文章目錄
- aws(學習筆記第四十三課) s3_sns_sqs_lambda_chain
- 學習內容:
- 1. 整體架構
- 1.1 代碼鏈接
- 1.2 整體架構
- 1.3 測試代碼需要的修改
- 1.3.1 `unit test`代碼中引入`stack`的修改
- 1.3.2 `test_outputs_created`代碼中把錯誤的去掉
- 2. 代碼解析
- 2.1 生成`dead_letter_queue`死信隊列和`upload_queue`文件上傳隊列
- 2.3 創建`sqs topic`并對其進行訂閱`subscribe`
- 2.4 創建`S3 bucket`
- 2.5 為`S3 bucket`綁定`SNS Topic`
- 2.6 創建`lambda`函數
- 2.7 對`upload_queue`綁定`lambda`函數
- 2.8 輸出`cdk`的`output`
- 2.8.1 `UploadFileToS3Example`
- 2.8.2 `UploadSqsQueueUrl`
- 2.8.3 `LambdaFunctionName`
- 2.8.4 `LambdaFunctionLogGroupName`
- 3 執行`cdk`
- 3.1 執行`cdk`
- 3.2 執行`cdk`的結果
- 3.3 `output`檢查
- 3.4 向`S3 bucket`上傳`csv`文件
- 4 使用`unit test`進行`aws cdk`的測試
- 4.1 `unit test`代碼
- 4.1.1 創建`mock stack`
- 4.1.2 創建`test`
- 4.2 進行`unit test`
aws(學習筆記第四十三課) s3_sns_sqs_lambda_chain
- 使用
lambda
監視S3 upload event
,并使用unit test
進行aws cdk
的測試
學習內容:
- 使用
dead_letter_queue
死信隊列 - 使用
sqs
的隊列 - 使用
unit test
進行aws cdk
的測試
1. 整體架構
1.1 代碼鏈接
代碼連接(s3_sns_sqs_lambda_chain)
1.2 整體架構
1.3 測試代碼需要的修改
1.3.1 unit test
代碼中引入stack
的修改
import aws_cdk as cdk
import pytestfrom s3_sns_sqs_lambda_chain.s3_sns_sqs_lambda_chain_stack import S3SnsSqsLambdaChainStack
s3_sns_sqs_lambda_chain
這里如果不加上,那么會報錯。
1.3.2 test_outputs_created
代碼中把錯誤的去掉
def test_outputs_created(template):"""Test for CloudFormation Outputs- Upload File To S3 Instructions- Queue Url- Lambda Function Name- Lambda Function Log Group Name"""# template.hastemplate.has_output("UploadFileToS3Example", {})template.has_output("UploadSqsQueueUrl", {})template.has_output("LambdaFunctionName", {})template.has_output("LambdaFunctionLogGroupName", {})
template.has
這里要去掉,如果不去掉就會執行錯誤。
2. 代碼解析
2.1 生成dead_letter_queue
死信隊列和upload_queue
文件上傳隊列
# Note: A dead-letter queue is optional but it helps capture any failed messagesdlq = sqs.Queue(self,id="dead_letter_queue_id",retention_period=Duration.days(7))dead_letter_queue = sqs.DeadLetterQueue(max_receive_count=1,queue=dlq)upload_queue = sqs.Queue(self,id="sample_queue_id",dead_letter_queue=dead_letter_queue,visibility_timeout = Duration.seconds(LAMBDA_TIMEOUT * 6))
2.3 創建sqs topic
并對其進行訂閱subscribe
sqs_subscription = sns_subs.SqsSubscription(upload_queue,raw_message_delivery=True)upload_event_topic = sns.Topic(self,id="sample_sns_topic_id")# This binds the SNS Topic to the SQS Queueupload_event_topic.add_subscription(sqs_subscription)
2.4 創建S3 bucket
# Note: Lifecycle Rules are optional but are included here to keep costs# low by cleaning up old files or moving them to lower cost storage optionss3_bucket = s3.Bucket(self,id="sample_bucket_id",block_public_access=s3.BlockPublicAccess.BLOCK_ALL,versioned=True,lifecycle_rules=[s3.LifecycleRule(enabled=True,expiration=Duration.days(365),transitions=[s3.Transition(storage_class=s3.StorageClass.INFREQUENT_ACCESS,transition_after=Duration.days(30)),s3.Transition(storage_class=s3.StorageClass.GLACIER,transition_after=Duration.days(90)),])])
這里考慮成本,所以30days
之后s3.StorageClass.INFREQUENT_ACCESS
,所以90days
之后s3.StorageClass.GLACIER
。
2.5 為S3 bucket
綁定SNS Topic
# Note: If you don't specify a filter all uploads will trigger an event.# Also, modifying the event type will handle other object operations# This binds the S3 bucket to the SNS Topics3_bucket.add_event_notification(s3.EventType.OBJECT_CREATED_PUT,s3n.SnsDestination(upload_event_topic),s3.NotificationKeyFilter(prefix="uploads", suffix=".csv"))
2.6 創建lambda
函數
function = _lambda.Function(self, "lambda_function",runtime=_lambda.Runtime.PYTHON_3_9,handler="lambda_function.handler",code=_lambda.Code.from_asset(path=lambda_dir),timeout = Duration.seconds(LAMBDA_TIMEOUT))
lambda
函數的實現:
def handler(event, context):# output event to logsprint(event)return {'statusCode': 200,'body': event}
2.7 對upload_queue
綁定lambda
函數
# This binds the lambda to the SQS Queueinvoke_event_source = lambda_events.SqsEventSource(upload_queue)function.add_event_source(invoke_event_source)
2.8 輸出cdk
的output
2.8.1 UploadFileToS3Example
這里創建了一個CfnOutput
,用于輸出上傳本地文件到S3 bucket
上的示例command
。
CfnOutput(self,"UploadFileToS3Example",value="aws s3 cp <local-path-to-file> s3://{}/".format(s3_bucket.bucket_name),description="Upload a file to S3 (using AWS CLI) to trigger the SQS chain",)
2.8.2 UploadSqsQueueUrl
這里輸出一個Output
,用于表示upload_queue
的queue_url
。
CfnOutput(self,"UploadSqsQueueUrl",value=upload_queue.queue_url,description="Link to the SQS Queue triggered on S3 uploads",)
2.8.3 LambdaFunctionName
這里輸出一個Output
,用于表示lambda
的函數名。
CfnOutput(self,"LambdaFunctionName",value=function.function_name,)
2.8.4 LambdaFunctionLogGroupName
這里輸出一個Output
,用于表示lambda
函數的log
輸出的log group
。
CfnOutput(self,"LambdaFunctionLogGroupName",value=function.log_group.log_group_name,)
3 執行cdk
3.1 執行cdk
python -m venv .venv
source ./.venv/Scripts/activate
pip install -r requirement.txt
cdk --require-approval never deploy
3.2 執行cdk
的結果
3.3 output
檢查
3.4 向S3 bucket
上傳csv
文件
aws s3 cp ./uploads_bbb.csv s3://s3-sns-sqs-lambda-stack-samplebucketideae240bf-l2je2slhprg2/
這里,每次上傳csv
文件都會觸發lambda
函數調用。
4 使用unit test
進行aws cdk
的測試
4.1 unit test
代碼
4.1.1 創建mock stack
@pytest.fixture
def template():"""Generate a mock stack that embeds the orchestrator construct for testing"""script_dir = pathlib.Path(__file__).parentlambda_dir = str(script_dir.joinpath("..", "..", "lambda"))app = cdk.App()stack = S3SnsSqsLambdaChainStack(app,"s3-sns-sqs-lambda-stack",lambda_dir=lambda_dir)return cdk.assertions.Template.from_stack(stack)
4.1.2 創建test
測試topic
關聯
def test_sns_topic_created(template):"""Test for SNS Topic and Subscription: S3 Uploadvent Notification"""template.resource_count_is("AWS::SNS::Subscription", 1)template.resource_count_is("AWS::SNS::Topic", 1)template.resource_count_is("AWS::SNS::TopicPolicy", 1)
測試queue
關聯
def test_sqs_queue_created(template):"""Test for SQS Queue:- Queue to process uploads- Dead-letter Queue"""template.resource_count_is("AWS::SQS::Queue", 2)template.resource_count_is("AWS::SQS::QueuePolicy", 1)
測試lambda
關聯
def test_lambdas_created(template):"""Test for Lambdas created:- Sample Lambda- Bucket Notification Handler (automatically provisioned)- Log Retention (automatically provisioned)"""template.resource_count_is("AWS::Lambda::Function", 3)template.resource_count_is("AWS::Lambda::EventSourceMapping", 1)
測試output
關聯
def test_outputs_created(template):"""Test for CloudFormation Outputs- Upload File To S3 Instructions- Queue Url- Lambda Function Name- Lambda Function Log Group Name"""template.has_output("UploadFileToS3Example", {})template.has_output("UploadSqsQueueUrl", {})template.has_output("LambdaFunctionName", {})template.has_output("LambdaFunctionLogGroupName", {})
4.2 進行unit test
- 進行依賴包的安裝
pip install -r requirements-dev.txt
- 進行
unit test
pytest