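Xiao Li Ge (小李哥) will continue to introduce, each day, one cutting-edge AI solution built on the Amazon Web Services (AWS) cloud platform, helping readers quickly learn AWS AI best practices and apply them in their daily work.
This post shows how to use CodePipeline on AWS to automate the fine-tuning and deployment of a machine learning model. The automated workflow first creates a Step Functions state machine, the state machine fine-tunes a large language model on the managed machine learning service SageMaker, and the result is an externally exposed URL endpoint for inference. The architecture is designed to be cloud-native and serverless throughout, providing a scalable and secure AI solution. The solution architecture diagram is shown below: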
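Amazon SageMaker is the AWS machine learning service that helps developers and data scientists build, train, and deploy machine learning models. SageMaker provides tools for the whole process, from data preparation and model training to model deployment, so that users can run machine learning projects efficiently in the cloud.
AWS Step Functions is a fully managed workflow orchestration service that lets users chain multiple AWS services together visually to form an automated process. With Step Functions, developers can define and manage complex workflows that include branching decisions, parallel processing, error handling, and retry logic.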
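With an AWS Step Functions state machine, developers can automate the creation, fine-tuning, and deployment of large models on Amazon SageMaker. Step Functions chains these steps into a visual workflow, which simplifies the management of complex machine learning pipelines. The benefits of automation include:
Repetitive tasks are automated, which reduces manual intervention and speeds up model development and deployment.
A predefined workflow ensures that each step runs in order, which lowers the chance of human error.
Machine learning tasks of different sizes, from small experiments to large production deployments, are handled with the same workflow management.
Automated processes simplify model monitoring and management, making it easy to adjust and optimize the machine learning pipeline at any time.
Automating SageMaker operations with Step Functions not only improves development efficiency for machine learning projects, but also keeps the whole process stable and repeatable.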
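To make the idea of an ordered workflow with retry logic more concrete, here is a minimal sketch of a two-state workflow written as a plain Python dictionary in Amazon States Language (ASL). The state names, the Lambda function name `example_function`, and the retry values are hypothetical placeholders and are not part of this solution.

```python
import json

# Hypothetical two-state workflow expressed in Amazon States Language (ASL).
definition = {
    "StartAt": "Run task",
    "States": {
        "Run task": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {"FunctionName": "example_function"},  # placeholder name
            # Retry failed task attempts with exponential backoff.
            "Retry": [{
                "ErrorEquals": ["States.TaskFailed"],
                "IntervalSeconds": 5,
                "MaxAttempts": 3,
                "BackoffRate": 2.0,
            }],
            "Next": "Done",
        },
        "Done": {"Type": "Succeed"},
    },
}

# Serialize the definition to the JSON form that Step Functions stores.
asl_json = json.dumps(definition, indent=2)
print(asl_json)
```

The full solution later in this post builds a larger definition of the same shape, using the `stepfunctions` Python SDK instead of a hand-written dictionary.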
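1. First, open the AWS console, go to the CodeCommit repository service, and click "Clone URL" to copy the URL of each of the two repositories so that their code can be cloned locally.
2. Next, go to Cloud9, the AWS cloud IDE, create a new Cloud9 environment, and click "Open" to open it.
3. In the IDE terminal, run the following commands to download the model files in "genai-repo" to the local environment: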
- git clone <genai-repo URL>
- cd genai-repo
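4. In the folder, create the two files "buildspec.yml" and "state_machine_manager.py". They are the CI/CD build configuration file and the Step Functions state machine definition script, respectively. Their contents are as follows:
"buildspec.yml": this is the configuration file for the CI/CD code build. Its main job is to run the command "python state_machine_manager.py".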
- version: 0.2
-
- phases:
-   install:
-     commands:
-       - python --version
-       - pip install --upgrade pip
-       - pip install boto3
-       - pip install --upgrade sagemaker
-       - pip install --upgrade stepfunctions
-   pre_build:
-     commands:
-       - cd $CODEBUILD_SRC_DIR
-   build:
-     commands:
-       - echo Build started on `date`
-       - cd $CODEBUILD_SRC_DIR
-       - echo Current directory `ls -la`
-       - echo Building the AWS Step-Function...
-       - echo Path `pwd`
-       - python state_machine_manager.py
-   post_build:
-     commands:
-       - echo Build completed on `date`
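As a rough illustration of how the phases in a buildspec are sequenced, the sketch below mirrors an abbreviated version of the file as a Python dictionary and flattens its commands in the order CodeBuild runs the phases (install, pre_build, build, post_build). The helper name `commands_in_order` is our own, and the dictionary is a hand-written stand-in rather than a parsed YAML file.

```python
# Phase order CodeBuild follows when running a buildspec.
PHASE_ORDER = ["install", "pre_build", "build", "post_build"]

# Abbreviated, hand-written mirror of buildspec.yml (not parsed from the file).
buildspec = {
    "version": 0.2,
    "phases": {
        "build": {"commands": ["python state_machine_manager.py"]},
        "install": {"commands": [
            "pip install boto3",
            "pip install --upgrade sagemaker",
            "pip install --upgrade stepfunctions",
        ]},
        "post_build": {"commands": ["echo Build completed"]},
        "pre_build": {"commands": ["cd $CODEBUILD_SRC_DIR"]},
    },
}


def commands_in_order(spec):
    """Return all commands in the order the phases execute."""
    ordered = []
    for phase in PHASE_ORDER:
        ordered.extend(spec["phases"].get(phase, {}).get("commands", []))
    return ordered


for command in commands_in_order(buildspec):
    print(command)
```

The point of the ordering is that the `pip install` commands in the install phase finish before `state_machine_manager.py` imports `sagemaker` and `stepfunctions` in the build phase.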

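"state_machine_manager.py": this file creates a Step Functions state machine and defines the workflow that automatically creates, fine-tunes, and deploys the model on SageMaker. The workflow contains several states, and the full definition is held in the workflow_definition variable.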
- import boto3
- import datetime
- import random
- import uuid
- import logging
- import stepfunctions
- import sagemaker
- import io
- import json
- import sys
- from sagemaker import djl_inference
-
- from sagemaker import image_uris
- from sagemaker import Model
- from stepfunctions import steps
- from stepfunctions.steps import *
- from stepfunctions.workflow import Workflow
-
- iam = boto3.client('iam')
- s3 = boto3.client('s3')
-
- stepfunctions.set_stream_logger(level=logging.INFO)
-
- ### SET UP STEP FUNCTIONS ###
- # Hour-minute-second suffix (%M is minutes; %m would be the month)
- unique_timestamp = f"{datetime.datetime.now():%H-%M-%S}"
- state_machine_name = f'FineTuningLLM-{unique_timestamp}'
- notebook_name = f'fine-tuning-llm-{unique_timestamp}'
- succeed_state = Succeed("HelloWorldSuccessful")
- fail_state = Fail("HelloWorldFailed")
- new_model_name = f"trained-dolly-{unique_timestamp}"
-
- try:
-     # Get a list of all bucket names
-     bucket_list = s3.list_buckets()
-
-     # Filter bucket names starting with 'automate'
-     bucket_names = [bucket['Name'] for bucket in bucket_list['Buckets'] if bucket['Name'].startswith('automate')]
-     mybucket = bucket_names[0].strip("'[]")
- except Exception as e:
-     print(f"Error: {e}")
-
-
-
- # Get the stepfunction_workflow_role
- try:
-     role = iam.get_role(RoleName='stepfunction_workflow_role')
-     workflow_role = role['Role']['Arn']
- except iam.exceptions.NoSuchEntityException:
-     print("The role 'stepfunction_workflow_role' does not exist.")
-
- # Get the sagemaker_exec_role
- try:
-     role2 = iam.get_role(RoleName='sagemaker_exec_role')
-     sagemaker_exec_role = role2['Role']['Arn']
- except iam.exceptions.NoSuchEntityException:
-     print("The role 'sagemaker_exec_role' does not exist.")
-
- # Create a SageMaker model object
- model_data="s3://{}/output/lora_model.tar.gz".format(mybucket)
-
- image_uri = image_uris.retrieve(framework="djl-deepspeed",
-                                 version="0.22.1",
-                                 region="us-east-1")
- trained_dolly_model = Model(image_uri=image_uri,
-                             model_data=model_data,
-                             predictor_cls=djl_inference.DJLPredictor,
-                             role=sagemaker_exec_role)
-
- # Create a retry configuration for SageMaker throttling exceptions. This is attached to
- # the SageMaker steps to ensure they are retried until they run.
- SageMaker_throttling_retry = stepfunctions.steps.states.Retry(
-     error_equals=['ThrottlingException', 'SageMaker.AmazonSageMakerException'],
-     interval_seconds=5,
-     max_attempts=60,
-     backoff_rate=1.25
- )
- # Create a state machine step to create the model
- model_step = steps.ModelStep(
-     'Create model',
-     model=trained_dolly_model,
-     model_name=new_model_name
- )
- # Add a retry configuration to the model_step
- model_step.add_retry(SageMaker_throttling_retry)
-
- # Create notebook for running SageMaker training job.
- create_sagemaker_notebook = LambdaStep(
-     state_id="Create training job",
-     parameters={
-         "FunctionName": "create_notebook_function",
-         "Payload": {"notebook_name": notebook_name},
-     },
- )
- # Get notebook status
- get_notebook_status = LambdaStep(
-     state_id="Get training job status",
-     parameters={
-         "FunctionName": "get_notebook_status_function",
-         "Payload": {"notebook_name": notebook_name},
-     },
- )
-
- # Choice state
- response_notebook_status = Choice(state_id="Response to training job status")
- wait_for_training_job = Wait(
-     state_id="Wait for training job",
-     seconds=150)
- wait_for_training_job.next(get_notebook_status)
- # Fail on terminal statuses; otherwise keep polling until the status is InService
- response_notebook_status.add_choice(
-     rule=ChoiceRule.StringEquals(
-         variable="$.Payload.trainningstatus", value="Failed"
-     ),
-     next_step=fail_state,
- )
- response_notebook_status.add_choice(
-     rule=ChoiceRule.StringEquals(
-         variable="$.Payload.trainningstatus", value="Stopped"
-     ),
-     next_step=fail_state,
- )
- response_notebook_status.add_choice(
-     ChoiceRule.StringEquals(
-         variable="$.Payload.trainningstatus", value="NotAvailable"
-     ),
-     next_step=fail_state,
- )
- inservice_rule=ChoiceRule.StringEquals(
-     variable="$.Payload.trainningstatus", value="InService"
- )
- response_notebook_status.add_choice(
-     ChoiceRule.Not(inservice_rule),
-     next_step=wait_for_training_job,
- )
-
- # Create a step to generate an Amazon SageMaker endpoint configuration
- endpoint_config_step = steps.EndpointConfigStep(
-     "Create endpoint configuration",
-     endpoint_config_name=new_model_name,
-     model_name=new_model_name,
-     initial_instance_count=1,
-     instance_type='ml.g4dn.2xlarge'
- )
- # Add a retry configuration to the endpoint_config_step
- endpoint_config_step.add_retry(SageMaker_throttling_retry)
-
- # Create a step to generate an Amazon SageMaker endpoint
- endpoint_step = steps.EndpointStep(
-     "Create endpoint",
-     endpoint_name=f"endpoint-{new_model_name}",
-     endpoint_config_name=new_model_name
- )
- # Add a retry configuration to the endpoint_step
- endpoint_step.add_retry(SageMaker_throttling_retry)
-
- # Chain the steps together to generate a full AWS Step Function
- workflow_definition = steps.Chain([
-     create_sagemaker_notebook,
-     wait_for_training_job,
-     get_notebook_status,
-     response_notebook_status,
-     model_step,
-     endpoint_config_step,
-     endpoint_step
- ])
-
- # Create an AWS Step Functions workflow based on inputs
- basic_workflow = Workflow(
-     name=state_machine_name,
-     definition=workflow_definition,
-     role=workflow_role,
- )
-
- jsonDef = basic_workflow.definition.to_json(pretty=True)
-
- print('---------')
- print(jsonDef)
- print('---------')
-
- basic_workflow.create()
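The retry configuration attached to the SageMaker steps uses an interval of 5 seconds, a backoff rate of 1.25, and up to 60 attempts. The sketch below works out the resulting wait schedule, assuming the usual Step Functions behavior in which the first retry waits `IntervalSeconds` and each later retry multiplies the previous wait by `BackoffRate`. The helper name `retry_delays` is our own.

```python
def retry_delays(interval_seconds, max_attempts, backoff_rate):
    """Return the wait in seconds before each retry attempt."""
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]


# Same values as the Retry object in state_machine_manager.py.
delays = retry_delays(interval_seconds=5, max_attempts=60, backoff_rate=1.25)

print(f"first delay: {delays[0]:.2f} s")
print(f"tenth delay: {delays[9]:.2f} s")
print(f"last delay: {delays[-1] / 3600:.1f} h")
print(f"total wait if every retry is used: {sum(delays) / 86400:.1f} days")
```

Because the waits grow geometrically, the later attempts are spaced very far apart. If that is not the intent, a smaller `max_attempts` or a cap on the delay may be worth considering.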

5. Next, upload all the new files in the folder back to the repository:
- git add *
- git commit -m "initial commit"
- git push
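6. Next, go to the CodeBuild service and create a new build project.
7. Name the project "genai-build", add the source repository, set the repository to genai-repo, and select the master branch.
8. Assign the build a service role with the required permissions, specify the Buildspec build configuration file, and click Create.
9. Next, go to CodePipeline and create a new CI/CD pipeline.
10. Name the pipeline "genai-pipeline" and assign it a service role.
11. For the source stage of the pipeline, choose CodeCommit as the source provider, "genai-repo" as the repository, and master as the branch.
12. In the Build stage, select the CodeBuild project "genai-build" created earlier. Skip the deploy stage and click Create.
13. Wait for the build stage to complete successfully, then go to the Step Functions console.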
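Instead of watching the console, the stage status can also be checked from code. The sketch below is one possible way to do it with boto3's `get_pipeline_state` call. The helper names are our own, and the sample response is hand-written to match the shape of the API output rather than captured from a real pipeline.

```python
def summarize_pipeline_state(state):
    """Map each stage name to the status of its latest execution."""
    return {
        stage["stageName"]: stage.get("latestExecution", {}).get("status", "Unknown")
        for stage in state.get("stageStates", [])
    }


def fetch_pipeline_state(pipeline_name="genai-pipeline"):
    """Call CodePipeline; needs AWS credentials, so it is not run here."""
    import boto3  # imported lazily so the summary helper works offline
    return boto3.client("codepipeline").get_pipeline_state(name=pipeline_name)


# Hand-written sample shaped like a get_pipeline_state response.
sample_state = {
    "pipelineName": "genai-pipeline",
    "stageStates": [
        {"stageName": "Source", "latestExecution": {"status": "Succeeded"}},
        {"stageName": "Build", "latestExecution": {"status": "InProgress"}},
    ],
}

print(summarize_pipeline_state(sample_state))
```

In a real check, `summarize_pipeline_state(fetch_pipeline_state())` would be polled until the Build stage reports `Succeeded` or `Failed`.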
14. On the Step Functions page, we can see that the CodeBuild run has created a new state machine: "FineTuningLLM-19-08-44".
15. Click the state machine to view the workflow definition we defined earlier:
- {
-   "StartAt": "Create training job",
-   "States": {
-     "Create training job": {
-       "Parameters": {
-         "FunctionName": "create_notebook_function",
-         "Payload": { "notebook_name": "fine-tuning-llm-19-08-44" }
-       },
-       "Resource": "arn:aws:states:::lambda:invoke",
-       "Type": "Task",
-       "Next": "Wait for training job"
-     },
-     "Wait for training job": {
-       "Seconds": 150,
-       "Type": "Wait",
-       "Next": "Get training job status"
-     },
-     "Get training job status": {
-       "Parameters": {
-         "FunctionName": "get_notebook_status_function",
-         "Payload": { "notebook_name": "fine-tuning-llm-19-08-44" }
-       },
-       "Resource": "arn:aws:states:::lambda:invoke",
-       "Type": "Task",
-       "Next": "Response to training job status"
-     },
-     "Response to training job status": {
-       "Type": "Choice",
-       "Choices": [
-         { "Variable": "$.Payload.trainningstatus", "StringEquals": "Failed", "Next": "HelloWorldFailed" },
-         { "Variable": "$.Payload.trainningstatus", "StringEquals": "Stopped", "Next": "HelloWorldFailed" },
-         { "Variable": "$.Payload.trainningstatus", "StringEquals": "NotAvailable", "Next": "HelloWorldFailed" },
-         { "Not": { "Variable": "$.Payload.trainningstatus", "StringEquals": "InService" }, "Next": "Wait for training job" }
-       ],
-       "Default": "Create model"
-     },
-     "Create model": {
-       "Parameters": {
-         "ExecutionRoleArn": "arn:aws:iam::903982278766:role/sagemaker_exec_role",
-         "ModelName": "trained-dolly-19-08-44",
-         "PrimaryContainer": {
-           "Environment": {},
-           "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.22.1-deepspeed0.9.2-cu118",
-           "ModelDataUrl": "s3://automate-fine-tuning-e91ee010/output/lora_model.tar.gz"
-         }
-       },
-       "Resource": "arn:aws:states:::sagemaker:createModel",
-       "Type": "Task",
-       "Next": "Create endpoint configuration",
-       "Retry": [
-         {
-           "ErrorEquals": [ "ThrottlingException", "SageMaker.AmazonSageMakerException" ],
-           "IntervalSeconds": 5,
-           "MaxAttempts": 60,
-           "BackoffRate": 1.25
-         }
-       ]
-     },
-     "Create endpoint configuration": {
-       "Resource": "arn:aws:states:::sagemaker:createEndpointConfig",
-       "Parameters": {
-         "EndpointConfigName": "trained-dolly-19-08-44",
-         "ProductionVariants": [
-           {
-             "InitialInstanceCount": 1,
-             "InstanceType": "ml.g4dn.2xlarge",
-             "ModelName": "trained-dolly-19-08-44",
-             "VariantName": "AllTraffic"
-           }
-         ]
-       },
-       "Type": "Task",
-       "Next": "Create endpoint",
-       "Retry": [
-         {
-           "ErrorEquals": [ "ThrottlingException", "SageMaker.AmazonSageMakerException" ],
-           "IntervalSeconds": 5,
-           "MaxAttempts": 60,
-           "BackoffRate": 1.25
-         }
-       ]
-     },
-     "Create endpoint": {
-       "Resource": "arn:aws:states:::sagemaker:createEndpoint",
-       "Parameters": {
-         "EndpointConfigName": "trained-dolly-19-08-44",
-         "EndpointName": "endpoint-trained-dolly-19-08-44"
-       },
-       "Type": "Task",
-       "End": true,
-       "Retry": [
-         {
-           "ErrorEquals": [ "ThrottlingException", "SageMaker.AmazonSageMakerException" ],
-           "IntervalSeconds": 5,
-           "MaxAttempts": 60,
-           "BackoffRate": 1.25
-         }
-       ]
-     },
-     "HelloWorldFailed": { "Type": "Fail" }
-   }
- }
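A definition like this can be sanity-checked before deployment by confirming that every transition points at a state that actually exists. The following is a minimal sketch of such a check, using an abbreviated copy of the definition with the `Parameters`, `Resource`, and `Retry` fields left out. The helper names are our own, and the check does not cover `Catch` blocks or nested `Parallel`/`Map` states.

```python
def transition_targets(state):
    """Collect every state name that one state can transition to."""
    targets = []
    if "Next" in state:
        targets.append(state["Next"])
    if "Default" in state:
        targets.append(state["Default"])
    for choice in state.get("Choices", []):
        targets.append(choice["Next"])
    return targets


def undefined_targets(definition):
    """Return the set of referenced state names that are not defined."""
    states = definition["States"]
    missing = set()
    if definition["StartAt"] not in states:
        missing.add(definition["StartAt"])
    for state in states.values():
        missing.update(t for t in transition_targets(state) if t not in states)
    return missing


# Abbreviated copy of the workflow (task parameters and retries omitted).
definition = {
    "StartAt": "Create training job",
    "States": {
        "Create training job": {"Type": "Task", "Next": "Wait for training job"},
        "Wait for training job": {"Type": "Wait", "Seconds": 150, "Next": "Get training job status"},
        "Get training job status": {"Type": "Task", "Next": "Response to training job status"},
        "Response to training job status": {
            "Type": "Choice",
            "Choices": [
                {"Variable": "$.Payload.trainningstatus", "StringEquals": "Failed", "Next": "HelloWorldFailed"},
                {"Not": {"Variable": "$.Payload.trainningstatus", "StringEquals": "InService"},
                 "Next": "Wait for training job"},
            ],
            "Default": "Create model",
        },
        "Create model": {"Type": "Task", "Next": "Create endpoint configuration"},
        "Create endpoint configuration": {"Type": "Task", "Next": "Create endpoint"},
        "Create endpoint": {"Type": "Task", "End": True},
        "HelloWorldFailed": {"Type": "Fail"},
    },
}

print(undefined_targets(definition))
```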
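16. In the state machine's execution view, we can see that all steps have completed. Two of the states, "Create training job" and "Get training job status", each invoke a different Python Lambda function.
The Python code for "Create training job" is as follows: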
- import boto3
- import base64
- import os
-
-
- def lambda_handler(event, context):
-     aws_region = 'us-east-1'
-     notebook_name = event["notebook_name"]
-     # s3_bucket='automate-fine-tunning-gblpoc'
-
-     notebook_file = 'lab-notebook.ipynb'
-     iam = boto3.client('iam')
-
-     # Create SageMaker and S3 clients
-     sagemaker = boto3.client('sagemaker', region_name=aws_region)
-     s3 = boto3.resource('s3', region_name=aws_region)
-     s3_client = boto3.client('s3')
-     s3_bucket = os.environ['s3_bucket']
-     s3_prefix="notebook_lifecycle"
-
-     lifecycle_config_script = f"""#!/bin/bash
- set -e
- cd /home/ec2-user/SageMaker/
- aws s3 cp s3://{s3_bucket}/{s3_prefix}/training_scripts.zip .
- unzip training_scripts.zip
- echo "Running training job..."
- source /home/ec2-user/anaconda3/bin/activate pytorch_p310
- chmod +x /home/ec2-user/SageMaker/converter.sh
- chown ec2-user:ec2-user /home/ec2-user/SageMaker/converter.sh
- nohup /home/ec2-user/SageMaker/converter.sh >> /home/ec2-user/SageMaker/nohup.out 2>&1 &
- """
-
-     lifecycle_config_name = f'LCF-{notebook_name}'
-     print(lifecycle_config_script)
-
-     # Function to manage lifecycle configuration
-     def manage_lifecycle_config(lifecycle_config_script):
-         content = base64.b64encode(lifecycle_config_script.encode('utf-8')).decode('utf-8')
-         try:
-             # Create lifecycle configuration if not found
-             sagemaker.create_notebook_instance_lifecycle_config(
-                 NotebookInstanceLifecycleConfigName=lifecycle_config_name,
-                 OnCreate=[{'Content': content}]
-             )
-         except sagemaker.exceptions.ClientError as e:
-             print(e)
-
-     # Try to describe the notebook instance to determine its status
-     # Get the role with the specified name
-     try:
-         role = iam.get_role(RoleName='sagemaker_exec_role')
-         sagemaker_exec_role = role['Role']['Arn']
-     except iam.exceptions.NoSuchEntityException:
-         print("The role 'sagemaker_exec_role' does not exist.")
-
-     try:
-         response = sagemaker.describe_notebook_instance(NotebookInstanceName=notebook_name)
-     except sagemaker.exceptions.ClientError as e:
-         print(e)
-         if 'RecordNotFound' in str(e):
-             manage_lifecycle_config(lifecycle_config_script)
-             # Create a new SageMaker notebook instance if not found
-             # Updated to 4xl by DWhite due to 12xl not being available. 7/18/2024
-             sagemaker.create_notebook_instance(
-                 NotebookInstanceName=notebook_name,
-                 InstanceType='ml.g5.4xlarge',
-                 RoleArn=sagemaker_exec_role,
-                 LifecycleConfigName=lifecycle_config_name,
-                 VolumeSizeInGB=30
-             )
-
-         else:
-             raise
-
-     return {
-         'statusCode': 200,
-         'body': 'Notebook instance setup and lifecycle configuration applied.'
-     }
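The Lambda function passes the lifecycle script to SageMaker as base64-encoded text. The sketch below isolates that encoding step so it can be tried without AWS access. The bucket name `example-bucket` and the helper names are placeholders of our own, and the script body is shortened.

```python
import base64


def build_lifecycle_script(s3_bucket, s3_prefix="notebook_lifecycle"):
    """Assemble a shortened version of the on-create script."""
    lines = [
        "#!/bin/bash",
        "set -e",
        "cd /home/ec2-user/SageMaker/",
        f"aws s3 cp s3://{s3_bucket}/{s3_prefix}/training_scripts.zip .",
        "unzip training_scripts.zip",
        # Run the long training job in the background so the script returns quickly.
        "nohup /home/ec2-user/SageMaker/converter.sh >> /home/ec2-user/SageMaker/nohup.out 2>&1 &",
    ]
    return "\n".join(lines) + "\n"


def encode_lifecycle_script(script):
    """Base64-encode the script text for the OnCreate 'Content' field."""
    return base64.b64encode(script.encode("utf-8")).decode("utf-8")


script = build_lifecycle_script("example-bucket")  # placeholder bucket name
content = encode_lifecycle_script(script)

# Round trip: decoding the content gives back the original script.
decoded = base64.b64decode(content).decode("utf-8")
print(decoded == script)
```

As far as we understand the SageMaker documentation, lifecycle scripts have a short run-time limit, which is presumably why the script starts `converter.sh` with `nohup` in the background rather than waiting for training to finish.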

The code for "Get training job status" is as follows:
- import boto3
- import json
- import os
-
- s3 = boto3.client('s3')
- sagemaker = boto3.client('sagemaker')
- s3_bucket = os.environ['s3_bucket']
-
- def lambda_handler(event, context):
-     print(event)
-     notebook_name = event["notebook_name"]
-     notebook_status = "NotAvailable"
-     training_job_status = 'NotAvailable'
-     check_status = 'NotAvailable'
-     # Try to describe the notebook instance to determine its status
-     try:
-         response = sagemaker.describe_notebook_instance(NotebookInstanceName=notebook_name)
-         notebook_status = response['NotebookInstanceStatus']
-
-         if notebook_status == 'InService':
-             find_artifact = s3.list_objects_v2(
-                 Bucket=s3_bucket,
-                 Prefix='output/lora_model.tar.gz'
-             )
-             artifact_location = find_artifact.get('Contents',[])
-             if not artifact_location:
-                 training_job_status = 'Creating'
-                 check_status = 'Creating'
-             else:
-                 if 'output/lora_model.tar.gz' in str(artifact_location):
-                     training_job_status = 'Completed'
-                     check_status = 'InService'
-         elif notebook_status == 'Failed':
-             check_status = 'Failed'
-         elif notebook_status == 'NotAvailable':
-             check_status = 'NotAvailable'
-         else:
-             check_status = 'Pending'
-         print(f"Notebook Status: {notebook_status}")
-         print(f"Model on s3: {training_job_status}")
-         print(f"Check status: {check_status}")
-
-     except sagemaker.exceptions.ClientError as e:
-         print(e)
-
-
-     return {
-         'statusCode': 200,
-         'input': notebook_name,
-         'trainningstatus': check_status
-     }
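The status function and the Choice state together form a polling loop. The sketch below restates that logic as two pure functions so the routing can be followed without AWS. It is a simplified mirror of the code above, not a replacement for it, and the function names are our own.

```python
def derive_check_status(notebook_status, artifact_exists):
    """Simplified mirror of how the Lambda derives 'trainningstatus'."""
    if notebook_status == "InService":
        # Training is treated as finished once the model artifact is on S3.
        return "InService" if artifact_exists else "Creating"
    if notebook_status in ("Failed", "NotAvailable"):
        return notebook_status
    return "Pending"


def next_state(check_status):
    """Mirror of the Choice state's routing in the state machine."""
    if check_status in ("Failed", "Stopped", "NotAvailable"):
        return "HelloWorldFailed"
    if check_status != "InService":
        return "Wait for training job"
    return "Create model"


# Walk through a typical sequence of observations.
observations = [("Pending", False), ("InService", False), ("InService", True)]
for notebook_status, artifact_exists in observations:
    status = derive_check_status(notebook_status, artifact_exists)
    print(f"{notebook_status}, artifact={artifact_exists} -> {status} -> {next_state(status)}")
```

One design point this makes visible: a notebook that is `InService` does not end the loop by itself. The workflow moves on to "Create model" only after `output/lora_model.tar.gz` appears in the bucket.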

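17. After all tasks in the Step Functions workflow have finished, go to SageMaker, create a Jupyter Notebook instance, and open it.
18. Create a new Jupyter Notebook file and copy in the fine-tuning code. The excerpt below is taken from that code, which uses PEFT and LoRA to fine-tune the Dolly large language model.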
- EPOCHS = 10
- LEARNING_RATE = 1e-4
- MODEL_SAVE_FOLDER_NAME = "dolly-3b-lora"
-
- training_args = TrainingArguments(
-     output_dir=MODEL_SAVE_FOLDER_NAME,
-     fp16=True,
-     per_device_train_batch_size=1,
-     per_device_eval_batch_size=1,
-     learning_rate=LEARNING_RATE,
-     num_train_epochs=EPOCHS,
-     logging_strategy="steps",
-     logging_steps=100,
-     evaluation_strategy="steps",
-     eval_steps=100,
-     save_strategy="steps",
-     save_steps=20000,
-     save_total_limit=10,
- )
-
- trainer = Trainer(
-     model=model,
-     tokenizer=tokenizer,
-     args=training_args,
-     train_dataset=split_dataset['train'],
-     eval_dataset=split_dataset["test"],
-     data_collator=data_collator,
- )
- model.config.use_cache = False  # silence the warnings. Please re-enable for inference!
- trainer.train()
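To get a feel for what the step-based settings above mean in practice, the sketch below estimates the number of optimizer steps and how often evaluation and checkpointing would fire. The dataset size of 5000 examples is a made-up figure for illustration, and the formula is an approximation of how the trainer counts steps on a single device without gradient accumulation.

```python
import math


def total_optimizer_steps(num_examples, per_device_batch_size, epochs,
                          num_devices=1, grad_accum_steps=1):
    """Approximate the number of optimizer steps for a training run."""
    batches_per_epoch = math.ceil(num_examples / (per_device_batch_size * num_devices))
    steps_per_epoch = max(batches_per_epoch // grad_accum_steps, 1)
    return steps_per_epoch * epochs


# Hypothetical dataset size; batch size and epochs match the excerpt above.
steps = total_optimizer_steps(num_examples=5000, per_device_batch_size=1, epochs=10)

print(f"total optimizer steps: {steps}")
print(f"evaluations at eval_steps=100: {steps // 100}")
print(f"checkpoints at save_steps=20000: {steps // 20000}")
```

With a small dataset, `save_steps=20000` can exceed the total number of steps, in which case no intermediate checkpoint is written during training.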

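19. We also need to create a SageMaker lifecycle configuration script, which the automated Step Functions workflow uses to trigger the commands that start fine-tuning. The startup script is as follows: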
- #!/bin/bash
- set -e
- cd /home/ec2-user/SageMaker/
- aws s3 cp s3://automate-fine-tuning-e91ee010/notebook_lifecycle/training_scripts.zip .
- unzip training_scripts.zip
- echo "Running training job..."
- source /home/ec2-user/anaconda3/bin/activate pytorch_p310
- chmod +x /home/ec2-user/SageMaker/converter.sh
- chown ec2-user:ec2-user /home/ec2-user/SageMaker/converter.sh
-
- nohup /home/ec2-user/SageMaker/converter.sh >> /home/ec2-user/SageMaker/nohup.out 2>&1 &
20. Finally, go to the Endpoints page in SageMaker to see the API endpoint URL of the successfully deployed large language model.
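Once the endpoint is in service, it can be called through the SageMaker runtime API. The following is a minimal sketch using boto3's `invoke_endpoint`. The request body format (`inputs` plus `parameters`) is an assumption based on common DJL serving conventions and may need adjusting to match the model's inference handler. The helper names and the prompt are our own.

```python
import json


def build_request_body(prompt, max_new_tokens=128):
    """Build a JSON request body (assumed payload shape, adjust as needed)."""
    return json.dumps({
        "inputs": prompt,
        "parameters": {"max_new_tokens": max_new_tokens},
    })


def invoke(endpoint_name, prompt, region="us-east-1"):
    """Send a prompt to the endpoint; needs AWS credentials, so not run here."""
    import boto3  # imported lazily so the body builder works offline
    runtime = boto3.client("sagemaker-runtime", region_name=region)
    response = runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=build_request_body(prompt),
    )
    return json.loads(response["Body"].read())


# Offline check of the request body only.
print(build_request_body("What is AWS Step Functions?", max_new_tokens=64))
```

With the naming used in this walkthrough, the call would look like `invoke("endpoint-trained-dolly-19-08-44", "your prompt")`. Requests to the endpoint are signed with AWS credentials, which boto3 handles automatically.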
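These are all the steps for automating the creation, fine-tuning, and deployment of a large language model on AWS using the CI/CD service CodePipeline and a Step Functions workflow. Join me in future posts for more cutting-edge generative AI development solutions.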