
Automating the Fine-Tuning and Deployment of AI Large Language Models on AWS with a CI/CD Pipeline and MLOps

Project overview:

小李哥 continues his daily series introducing cutting-edge AI solutions built on the Amazon Web Services (AWS) cloud platform, helping readers quickly get familiar with AWS AI best practices on this leading international cloud platform and apply them to their day-to-day work.

This installment shows how to use CodePipeline on AWS to automate the fine-tuning and deployment of a machine learning model. The automated workflow first creates a Step Functions state machine, which uses the managed machine learning service SageMaker to fine-tune a large language model and finally exposes a public URL endpoint for inference. The design is fully serverless and cloud native, providing a scalable and secure AI solution. The solution architecture is shown below:

[Solution architecture diagram]

Background knowledge for this solution

What is Amazon SageMaker?

Amazon SageMaker is AWS's one-stop machine learning service, designed to help developers and data scientists easily build, train, and deploy machine learning models. SageMaker provides tooling for the whole workflow, from data preparation through model training to model deployment, so users can run machine learning projects efficiently in the cloud.
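
For readers new to the SDK, here is a minimal sketch of talking to SageMaker programmatically with boto3 (which the later scripts in this walkthrough also rely on); the region is an assumption:

import boto3

# Create a SageMaker client (region is an assumption; use your own)
sagemaker = boto3.client('sagemaker', region_name='us-east-1')

# List the ten most recently created real-time inference endpoints
response = sagemaker.list_endpoints(MaxResults=10, SortBy='CreationTime', SortOrder='Descending')
for endpoint in response['Endpoints']:
    print(endpoint['EndpointName'], endpoint['EndpointStatus'])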

What is AWS Step Functions?

AWS Step Functions is a fully managed workflow orchestration service that lets users visually chain multiple AWS services together into automated processes. With Step Functions, developers can easily define and manage complex workflows, including branching decisions, parallel processing, error handling, and retry logic.
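
To make the "state machine" idea concrete, here is a minimal sketch using the AWS Step Functions Data Science SDK (the stepfunctions package used later in this article) that chains a Wait state and a no-op Pass state into a tiny workflow; the role ARN is a placeholder:

import logging
import stepfunctions
from stepfunctions.steps import Pass, Wait, Chain
from stepfunctions.workflow import Workflow

stepfunctions.set_stream_logger(level=logging.INFO)

# Two trivial states: wait 5 seconds, then a no-op Pass state
wait_state = Wait(state_id="WaitFiveSeconds", seconds=5)
done_state = Pass(state_id="Done")

# Chain them into a definition and wrap it in a Workflow object
definition = Chain([wait_state, done_state])
workflow = Workflow(
    name="hello-step-functions",
    definition=definition,
    role="arn:aws:iam::123456789012:role/stepfunction_workflow_role",  # placeholder ARN
)
print(workflow.definition.to_json(pretty=True))  # inspect the generated ASL JSON
# workflow.create() would actually create the state machine in your account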

Benefits of using a Step Functions state machine to automate model creation, fine-tuning, and deployment on SageMaker

With an AWS Step Functions state machine, developers can automate the creation, fine-tuning, and deployment of large models on Amazon SageMaker. Step Functions chains these steps into a visual workflow, simplifying the management of a complex machine learning pipeline. The benefits of automation include:

Higher efficiency

Automating repetitive tasks reduces manual intervention and speeds up model development and deployment.

Lower risk of errors

A predefined workflow ensures every step runs in order, reducing the chance of human error.

Better scalability

Handle machine learning tasks of any size, from small experiments to large-scale production deployments, with consistent workflow management.

Simpler operations

An automated process simplifies model monitoring and management, making it easy to adjust and optimize the machine learning pipeline at any time.

Using Step Functions to automate SageMaker operations not only improves development efficiency for machine learning projects, but also ensures the stability and repeatability of the entire process.

What this solution covers

1. Define the AWS Step Functions state machine configuration as SDK code

2. Set up a CI/CD pipeline with AWS CodePipeline to automate creation of the Step Functions workflow

3. Start the Step Functions workflow to automate the creation, fine-tuning, and deployment of the AI large language model

Step-by-step build instructions:

1. First, open the AWS console and go to the CodeCommit repository service. Click "Clone URL" to copy the URLs of the two repositories so we can clone them locally.

2. Next, go to Cloud9, AWS's cloud IDE. Create a new Cloud9 environment and click "Open" to launch it.

3. Run the following commands in the IDE terminal to download the model files in "genai-repo" locally:

git clone <genai-repo URL>
cd genai-repo

4. In the folder, create the two files "buildspec.yml" and "state_machine_manager.py"; these are the CI/CD build configuration file and the Step Functions definition script, respectively. Their contents are as follows:

"buildspec.yml": this is the configuration file for the CI/CD build step; its main job is to run the command "python state_machine_manager.py".

version: 0.2
phases:
  install:
    commands:
      - python --version
      - pip install --upgrade pip
      - pip install boto3
      - pip install --upgrade sagemaker
      - pip install --upgrade stepfunctions
  pre_build:
    commands:
      - cd $CODEBUILD_SRC_DIR
  build:
    commands:
      - echo Build started on `date`
      - cd $CODEBUILD_SRC_DIR
      - echo Current directory `ls -la`
      - echo Building the AWS Step-Function...
      - echo Path `pwd`
      - python state_machine_manager.py
  post_build:
    commands:
      - echo Build completed on `date`

"state_machine_manager.py": this script creates a Step Functions workflow that automates model creation, fine-tuning, and deployment on SageMaker. The workflow consists of multiple states, defined in the workflow_definition variable.

import boto3
import datetime
import logging
import stepfunctions
import sagemaker
from sagemaker import djl_inference
from sagemaker import image_uris
from sagemaker import Model
from stepfunctions import steps
from stepfunctions.steps import *
from stepfunctions.workflow import Workflow

iam = boto3.client('iam')
s3 = boto3.client('s3')
stepfunctions.set_stream_logger(level=logging.INFO)

### SET UP STEP FUNCTIONS ###
unique_timestamp = f"{datetime.datetime.now():%H-%M-%S}"  # hour-minute-second
state_machine_name = f'FineTuningLLM-{unique_timestamp}'
notebook_name = f'fine-tuning-llm-{unique_timestamp}'
succeed_state = Succeed("HelloWorldSuccessful")
fail_state = Fail("HelloWorldFailed")
new_model_name = f"trained-dolly-{unique_timestamp}"

try:
    # Get a list of all bucket names
    bucket_list = s3.list_buckets()
    # Filter bucket names starting with 'automate'
    bucket_names = [bucket['Name'] for bucket in bucket_list['Buckets'] if bucket['Name'].startswith('automate')]
    mybucket = bucket_names[0].strip("'[]")
except Exception as e:
    print(f"Error: {e}")

# Get the stepfunction_workflow_role
try:
    role = iam.get_role(RoleName='stepfunction_workflow_role')
    workflow_role = role['Role']['Arn']
except iam.exceptions.NoSuchEntityException:
    print("The role 'stepfunction_workflow_role' does not exist.")

# Get the sagemaker_exec_role
try:
    role2 = iam.get_role(RoleName='sagemaker_exec_role')
    sagemaker_exec_role = role2['Role']['Arn']
except iam.exceptions.NoSuchEntityException:
    print("The role 'sagemaker_exec_role' does not exist.")

# Create a SageMaker model object
model_data = "s3://{}/output/lora_model.tar.gz".format(mybucket)
image_uri = image_uris.retrieve(framework="djl-deepspeed",
                                version="0.22.1",
                                region="us-east-1")
trained_dolly_model = Model(image_uri=image_uri,
                            model_data=model_data,
                            predictor_cls=djl_inference.DJLPredictor,
                            role=sagemaker_exec_role)

# Create a retry configuration for SageMaker throttling exceptions. This is attached to
# the SageMaker steps to ensure they are retried until they run.
SageMaker_throttling_retry = stepfunctions.steps.states.Retry(
    error_equals=['ThrottlingException', 'SageMaker.AmazonSageMakerException'],
    interval_seconds=5,
    max_attempts=60,
    backoff_rate=1.25
)

# Create a state machine step to create the model
model_step = steps.ModelStep(
    'Create model',
    model=trained_dolly_model,
    model_name=new_model_name
)
# Add a retry configuration to the model_step
model_step.add_retry(SageMaker_throttling_retry)

# Create notebook for running SageMaker training job.
create_sagemaker_notebook = LambdaStep(
    state_id="Create training job",
    parameters={
        "FunctionName": "create_notebook_function",
        "Payload": {"notebook_name": notebook_name},
    },
)

# Get notebook status
get_notebook_status = LambdaStep(
    state_id="Get training job status",
    parameters={
        "FunctionName": "get_notebook_status_function",
        "Payload": {"notebook_name": notebook_name},
    },
)

# Choice state
response_notebook_status = Choice(state_id="Response to training job status")
wait_for_training_job = Wait(
    state_id="Wait for training job",
    seconds=150)
wait_for_training_job.next(get_notebook_status)

# Retry checking notebook status
# Note: 'trainningstatus' (sic) matches the key returned by the status Lambda below.
response_notebook_status.add_choice(
    rule=ChoiceRule.StringEquals(
        variable="$.Payload.trainningstatus", value="Failed"
    ),
    next_step=fail_state,
)
response_notebook_status.add_choice(
    rule=ChoiceRule.StringEquals(
        variable="$.Payload.trainningstatus", value="Stopped"
    ),
    next_step=fail_state,
)
response_notebook_status.add_choice(
    ChoiceRule.StringEquals(
        variable="$.Payload.trainningstatus", value="NotAvailable"
    ),
    next_step=fail_state,
)
inservice_rule = ChoiceRule.StringEquals(
    variable="$.Payload.trainningstatus", value="InService"
)
response_notebook_status.add_choice(
    ChoiceRule.Not(inservice_rule),
    next_step=wait_for_training_job,
)

# Create a step to generate an Amazon SageMaker endpoint configuration
endpoint_config_step = steps.EndpointConfigStep(
    "Create endpoint configuration",
    endpoint_config_name=new_model_name,
    model_name=new_model_name,
    initial_instance_count=1,
    instance_type='ml.g4dn.2xlarge'
)
# Add a retry configuration to the endpoint_config_step
endpoint_config_step.add_retry(SageMaker_throttling_retry)

# Create a step to generate an Amazon SageMaker endpoint
endpoint_step = steps.EndpointStep(
    "Create endpoint",
    endpoint_name=f"endpoint-{new_model_name}",
    endpoint_config_name=new_model_name
)
# Add a retry configuration to the endpoint_step
endpoint_step.add_retry(SageMaker_throttling_retry)

# Chain the steps together to generate a full AWS Step Function
workflow_definition = steps.Chain([
    create_sagemaker_notebook,
    wait_for_training_job,
    get_notebook_status,
    response_notebook_status,
    model_step,
    endpoint_config_step,
    endpoint_step
])

# Create an AWS Step Functions workflow based on inputs
basic_workflow = Workflow(
    name=state_machine_name,
    definition=workflow_definition,
    role=workflow_role,
)
jsonDef = basic_workflow.definition.to_json(pretty=True)
print('---------')
print(jsonDef)
print('---------')
basic_workflow.create()

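Once the build has run state_machine_manager.py and the state machine exists, you could also start an execution programmatically instead of from the console. A minimal sketch with the same SDK (the state machine ARN is a placeholder; the name is from the example run and will differ in your account):

from stepfunctions.workflow import Workflow

# Attach to the state machine created by the build
workflow = Workflow.attach("arn:aws:states:us-east-1:123456789012:stateMachine:FineTuningLLM-19-08-44")  # placeholder ARN
execution = workflow.execute()          # start a new execution of the fine-tuning pipeline
print(execution.describe()['status'])   # e.g. RUNNING
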
5. Next, push all the new files in the folder back to our repository:

git add *
git commit -m "initial commit"
git push

6. Next, go to the build service CodeBuild and create a new project.

7. Name the project "genai-build" and add the source repository for the build: set the repository to genai-repo and the branch to master.

8. Grant the build the necessary permissions, point it at the Buildspec configuration file, and click create.
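
If you prefer scripting over the console, steps 6-8 can also be done with boto3. A minimal sketch; the CodeCommit URL and service role ARN are placeholders, and buildspec.yml is picked up from the repo root by default:

import boto3

codebuild = boto3.client('codebuild', region_name='us-east-1')

codebuild.create_project(
    name='genai-build',
    source={
        'type': 'CODECOMMIT',
        # placeholder clone URL; use the one copied in step 1
        'location': 'https://git-codecommit.us-east-1.amazonaws.com/v1/repos/genai-repo',
    },
    sourceVersion='refs/heads/master',
    artifacts={'type': 'NO_ARTIFACTS'},
    environment={
        'type': 'LINUX_CONTAINER',
        'image': 'aws/codebuild/standard:7.0',
        'computeType': 'BUILD_GENERAL1_SMALL',
    },
    # placeholder service role ARN with CodeBuild permissions
    serviceRole='arn:aws:iam::123456789012:role/codebuild-genai-build-service-role',
)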

9. Next, go to CodePipeline and create a new CI/CD pipeline.

10. Name the pipeline "genai-pipeline" and assign it the required permissions.

11. First choose the pipeline's source: select CodeCommit as the type, "genai-repo" as the repository, and master as the branch.

12. In the Build stage, select the CodeBuild project "genai-build" we just created. Skip the deploy stage and click create.
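
The boto3 equivalent of steps 9-12 looks roughly like the sketch below; the role ARN and artifact bucket are placeholders:

import boto3

codepipeline = boto3.client('codepipeline', region_name='us-east-1')

codepipeline.create_pipeline(pipeline={
    'name': 'genai-pipeline',
    # placeholder role ARN; the console can create one for you
    'roleArn': 'arn:aws:iam::123456789012:role/codepipeline-genai-pipeline-role',
    'artifactStore': {'type': 'S3', 'location': 'my-codepipeline-artifact-bucket'},  # placeholder bucket
    'stages': [
        {
            'name': 'Source',
            'actions': [{
                'name': 'Source',
                'actionTypeId': {'category': 'Source', 'owner': 'AWS',
                                 'provider': 'CodeCommit', 'version': '1'},
                'configuration': {'RepositoryName': 'genai-repo', 'BranchName': 'master'},
                'outputArtifacts': [{'name': 'SourceOutput'}],
            }],
        },
        {
            'name': 'Build',
            'actions': [{
                'name': 'Build',
                'actionTypeId': {'category': 'Build', 'owner': 'AWS',
                                 'provider': 'CodeBuild', 'version': '1'},
                'configuration': {'ProjectName': 'genai-build'},
                'inputArtifacts': [{'name': 'SourceOutput'}],
            }],
        },
    ],
})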

13. Wait for the build stage to complete successfully, then go to the Step Functions service page.

14. On the Step Functions page we can see the state machine newly created by our CodeBuild run: "FineTuningLLM-19-08-44".

15. Clicking into the state machine shows the workflow definition we wrote earlier:

{
  "StartAt": "Create training job",
  "States": {
    "Create training job": {
      "Parameters": {
        "FunctionName": "create_notebook_function",
        "Payload": {
          "notebook_name": "fine-tuning-llm-19-08-44"
        }
      },
      "Resource": "arn:aws:states:::lambda:invoke",
      "Type": "Task",
      "Next": "Wait for training job"
    },
    "Wait for training job": {
      "Seconds": 150,
      "Type": "Wait",
      "Next": "Get training job status"
    },
    "Get training job status": {
      "Parameters": {
        "FunctionName": "get_notebook_status_function",
        "Payload": {
          "notebook_name": "fine-tuning-llm-19-08-44"
        }
      },
      "Resource": "arn:aws:states:::lambda:invoke",
      "Type": "Task",
      "Next": "Response to training job status"
    },
    "Response to training job status": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.Payload.trainningstatus",
          "StringEquals": "Failed",
          "Next": "HelloWorldFailed"
        },
        {
          "Variable": "$.Payload.trainningstatus",
          "StringEquals": "Stopped",
          "Next": "HelloWorldFailed"
        },
        {
          "Variable": "$.Payload.trainningstatus",
          "StringEquals": "NotAvailable",
          "Next": "HelloWorldFailed"
        },
        {
          "Not": {
            "Variable": "$.Payload.trainningstatus",
            "StringEquals": "InService"
          },
          "Next": "Wait for training job"
        }
      ],
      "Default": "Create model"
    },
    "Create model": {
      "Parameters": {
        "ExecutionRoleArn": "arn:aws:iam::903982278766:role/sagemaker_exec_role",
        "ModelName": "trained-dolly-19-08-44",
        "PrimaryContainer": {
          "Environment": {},
          "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.22.1-deepspeed0.9.2-cu118",
          "ModelDataUrl": "s3://automate-fine-tuning-e91ee010/output/lora_model.tar.gz"
        }
      },
      "Resource": "arn:aws:states:::sagemaker:createModel",
      "Type": "Task",
      "Next": "Create endpoint configuration",
      "Retry": [
        {
          "ErrorEquals": [
            "ThrottlingException",
            "SageMaker.AmazonSageMakerException"
          ],
          "IntervalSeconds": 5,
          "MaxAttempts": 60,
          "BackoffRate": 1.25
        }
      ]
    },
    "Create endpoint configuration": {
      "Resource": "arn:aws:states:::sagemaker:createEndpointConfig",
      "Parameters": {
        "EndpointConfigName": "trained-dolly-19-08-44",
        "ProductionVariants": [
          {
            "InitialInstanceCount": 1,
            "InstanceType": "ml.g4dn.2xlarge",
            "ModelName": "trained-dolly-19-08-44",
            "VariantName": "AllTraffic"
          }
        ]
      },
      "Type": "Task",
      "Next": "Create endpoint",
      "Retry": [
        {
          "ErrorEquals": [
            "ThrottlingException",
            "SageMaker.AmazonSageMakerException"
          ],
          "IntervalSeconds": 5,
          "MaxAttempts": 60,
          "BackoffRate": 1.25
        }
      ]
    },
    "Create endpoint": {
      "Resource": "arn:aws:states:::sagemaker:createEndpoint",
      "Parameters": {
        "EndpointConfigName": "trained-dolly-19-08-44",
        "EndpointName": "endpoint-trained-dolly-19-08-44"
      },
      "Type": "Task",
      "End": true,
      "Retry": [
        {
          "ErrorEquals": [
            "ThrottlingException",
            "SageMaker.AmazonSageMakerException"
          ],
          "IntervalSeconds": 5,
          "MaxAttempts": 60,
          "BackoffRate": 1.25
        }
      ]
    },
    "HelloWorldFailed": {
      "Type": "Fail"
    }
  }
}

16. In the Step Functions execution view we can see that all steps have completed. The two states "Create training job" and "Get training job status" invoke two different Python Lambda functions.

The Python code for "Create training job" is as follows:

import boto3
import base64
import os

def lambda_handler(event, context):
    aws_region = 'us-east-1'
    notebook_name = event["notebook_name"]
    # s3_bucket='automate-fine-tunning-gblpoc'
    notebook_file = 'lab-notebook.ipynb'
    iam = boto3.client('iam')
    # Create SageMaker and S3 clients
    sagemaker = boto3.client('sagemaker', region_name=aws_region)
    s3 = boto3.resource('s3', region_name=aws_region)
    s3_client = boto3.client('s3')
    s3_bucket = os.environ['s3_bucket']
    s3_prefix = "notebook_lifecycle"
    lifecycle_config_script = f"""#!/bin/bash
set -e
cd /home/ec2-user/SageMaker/
aws s3 cp s3://{s3_bucket}/{s3_prefix}/training_scripts.zip .
unzip training_scripts.zip
echo "Running training job..."
source /home/ec2-user/anaconda3/bin/activate pytorch_p310
chmod +x /home/ec2-user/SageMaker/converter.sh
chown ec2-user:ec2-user /home/ec2-user/SageMaker/converter.sh
nohup /home/ec2-user/SageMaker/converter.sh >> /home/ec2-user/SageMaker/nohup.out 2>&1 &
"""
    lifecycle_config_name = f'LCF-{notebook_name}'
    print(lifecycle_config_script)

    # Function to manage lifecycle configuration
    def manage_lifecycle_config(lifecycle_config_script):
        content = base64.b64encode(lifecycle_config_script.encode('utf-8')).decode('utf-8')
        try:
            # Create lifecycle configuration if not found
            sagemaker.create_notebook_instance_lifecycle_config(
                NotebookInstanceLifecycleConfigName=lifecycle_config_name,
                OnCreate=[{'Content': content}]
            )
        except sagemaker.exceptions.ClientError as e:
            print(e)

    # Get the role with the specified name
    try:
        role = iam.get_role(RoleName='sagemaker_exec_role')
        sagemaker_exec_role = role['Role']['Arn']
    except iam.exceptions.NoSuchEntityException:
        print("The role 'sagemaker_exec_role' does not exist.")

    # Try to describe the notebook instance to determine its status
    try:
        response = sagemaker.describe_notebook_instance(NotebookInstanceName=notebook_name)
    except sagemaker.exceptions.ClientError as e:
        print(e)
        if 'RecordNotFound' in str(e):
            manage_lifecycle_config(lifecycle_config_script)
            # Create a new SageMaker notebook instance if not found
            # Updated to 4xl by DWhite due to 12xl not being available. 7/18/2024
            sagemaker.create_notebook_instance(
                NotebookInstanceName=notebook_name,
                InstanceType='ml.g5.4xlarge',
                RoleArn=sagemaker_exec_role,
                LifecycleConfigName=lifecycle_config_name,
                VolumeSizeInGB=30
            )
        else:
            raise
    return {
        'statusCode': 200,
        'body': 'Notebook instance setup and lifecycle configuration applied.'
    }

"get training job status"的代码如下:

import boto3
import json
import os

s3 = boto3.client('s3')
sagemaker = boto3.client('sagemaker')
s3_bucket = os.environ['s3_bucket']

def lambda_handler(event, context):
    print(event)
    notebook_name = event["notebook_name"]
    notebook_status = "NotAvailable"
    training_job_status = 'NotAvailable'
    check_status = 'NotAvailable'
    # Try to describe the notebook instance to determine its status
    try:
        response = sagemaker.describe_notebook_instance(NotebookInstanceName=notebook_name)
        notebook_status = response['NotebookInstanceStatus']
        if notebook_status == 'InService':
            find_artifact = s3.list_objects_v2(
                Bucket=s3_bucket,
                Prefix='output/lora_model.tar.gz'
            )
            artifact_location = find_artifact.get('Contents', [])
            if not artifact_location:
                training_job_status = 'Creating'
                check_status = 'Creating'
            else:
                if 'output/lora_model.tar.gz' in str(artifact_location):
                    training_job_status = 'Completed'
                    check_status = 'InService'
        elif notebook_status == 'Failed':
            check_status = 'Failed'
        elif notebook_status == 'NotAvailable':
            check_status = 'NotAvailable'
        else:
            check_status = 'Pending'
        print(f"Notebook Status: {notebook_status}")
        print(f"Model on s3: {training_job_status}")
        print(f"Check status: {check_status}")
    except sagemaker.exceptions.ClientError as e:
        print(e)
    return {
        'statusCode': 200,
        'input': notebook_name,
        # 'trainningstatus' (sic) must match the "$.Payload.trainningstatus" key the state machine's Choice state reads
        'trainningstatus': check_status
    }

17. Once the Step Functions workflow has finished all its tasks, go to the SageMaker service, create a Jupyter Notebook instance, and open it.

18. Create a new Jupyter Notebook file and paste in the fine-tuning code. The excerpt below shows part of the fine-tuning code, which uses PEFT and LoRA to fine-tune the Dolly large language model.

EPOCHS = 10
LEARNING_RATE = 1e-4
MODEL_SAVE_FOLDER_NAME = "dolly-3b-lora"

training_args = TrainingArguments(
    output_dir=MODEL_SAVE_FOLDER_NAME,
    fp16=True,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    learning_rate=LEARNING_RATE,
    num_train_epochs=EPOCHS,
    logging_strategy="steps",
    logging_steps=100,
    evaluation_strategy="steps",
    eval_steps=100,
    save_strategy="steps",
    save_steps=20000,
    save_total_limit=10,
)
trainer = Trainer(
    model=model,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=split_dataset['train'],
    eval_dataset=split_dataset["test"],
    data_collator=data_collator,
)
model.config.use_cache = False  # silence the warnings. Please re-enable for inference!
trainer.train()

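The excerpt above assumes model has already been wrapped with LoRA adapters. A minimal sketch of that earlier step with the Hugging Face transformers and peft libraries might look like the following; the base checkpoint, target module names, and hyperparameters are illustrative assumptions, not taken from the article's notebook:

from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig, get_peft_model

base_model_name = "databricks/dolly-v2-3b"  # assumption: the 3B Dolly checkpoint
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
model = AutoModelForCausalLM.from_pretrained(base_model_name)

# LoRA: freeze the base weights and train small low-rank adapter matrices
lora_config = LoraConfig(
    r=16,                                # rank of the adapter matrices (illustrative)
    lora_alpha=32,                       # scaling factor (illustrative)
    target_modules=["query_key_value"],  # attention projection in GPT-NeoX-style models (assumption)
    lora_dropout=0.05,
    task_type="CAUSAL_LM",
)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()  # only a small fraction of weights are trainable
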
19. We also need a SageMaker lifecycle configuration script that kicks off the fine-tuning commands as part of the automated Step Functions workflow. The startup script is as follows:

#!/bin/bash
set -e
cd /home/ec2-user/SageMaker/
aws s3 cp s3://automate-fine-tuning-e91ee010/notebook_lifecycle/training_scripts.zip .
unzip training_scripts.zip
echo "Running training job..."
source /home/ec2-user/anaconda3/bin/activate pytorch_p310
chmod +x /home/ec2-user/SageMaker/converter.sh
chown ec2-user:ec2-user /home/ec2-user/SageMaker/converter.sh
nohup /home/ec2-user/SageMaker/converter.sh >> /home/ec2-user/SageMaker/nohup.out 2>&1 &

20. Finally, go to SageMaker's Endpoints section, where you can see the API endpoint URL of the successfully deployed AI large model.
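
With the endpoint in service, it can be called from any client via the sagemaker-runtime API. A minimal sketch; the endpoint name is from the example run, and the payload schema assumes the DJL inference container's default JSON format:

import json
import boto3

runtime = boto3.client('sagemaker-runtime', region_name='us-east-1')

response = runtime.invoke_endpoint(
    EndpointName='endpoint-trained-dolly-19-08-44',  # name from the example run; yours will differ
    ContentType='application/json',
    # assumption: DJL serving containers typically accept {"inputs": ...} payloads
    Body=json.dumps({"inputs": "Explain what LoRA fine-tuning is in one sentence."}),
)
print(response['Body'].read().decode('utf-8'))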

That covers all the steps for using AWS's CI/CD service CodePipeline together with a Step Functions workflow to automate the creation, fine-tuning, and deployment of an AI large language model on AWS. I hope you'll join me again for more cutting-edge generative AI development solutions.
