Table of Contents
- aws (Study Notes, Lesson 33) A deeper dive into cdk
- What this lesson covers:
- 1. Using `aws athena`
- 1.1 What is `aws athena`
- 1.2 What is `aws glue`
- 1.3 Why `aws athena` and `aws glue` are used together
- 2. Hands-on practice with `aws athena`
- 2.1 Code link
- 2.2 Overall architecture
- 2.3 Code walkthrough
- 2.3.1 Create the `S3 bucket` for the test data
- 2.3.2 Create the `S3 bucket` that stores the query results
- 2.3.3 Sync the sample `json` data files to the `S3 bucket`
- 2.3.4 Create the `aws glue` `CfnDatabase`
- 2.3.5 Create the `Role` required by the `aws glue crawler`
- 2.3.6 Create the `aws glue crawler`
- 2.3.7 Create the `aws athena work group`
- 2.3.8 Create the `aws athena query`
- 2.3.9 Adjust the creation order
- 2.4 Run `aws cdk for athena`
- 2.4.1 Run the deployment
- 2.4.2 Run the `crawler`
- 2.4.3 View the `aws athena` `queries`
- 2.4.4 Execute the `aws athena` `queries`
- 2.4.5 View the execution results of the `aws athena` `queries`
aws (Study Notes, Lesson 33) A deeper dive into cdk
- Use `cdk` to generate `athena` and an `aws glue crawler`

What this lesson covers:
- Using `aws athena` together with an `aws glue crawler`
1. Using aws athena
1.1 What is aws athena
`aws athena` is a data analysis service provided by aws. It lets you analyze data stored on `S3` using the `SQL` language.
- It is a managed service, so there is nothing to maintain.
- It is built on an open-source framework (Presto/Trino).
- Billing is based on the amount of data each query scans.
- It provides encryption for the data.

Note: `aws athena` cannot perform `JOIN` operations against an `RDB` directly; its queries run over data files stored on `S3`, such as `csv` and `json`.
1.2 What is aws glue
`aws glue` is a managed ETL service provided by aws. It makes it easy to prepare and load data for analysis. The metadata that ties a `table` to its `schema` can be stored in the `aws glue data catalog`.
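To make "metadata stored in the catalog" concrete, here is a minimal `boto3` sketch (not part of this lesson's CDK project) that lists the tables of a Glue database and the columns the crawler inferred; the database name `log-database` anticipates the one created later in the exercise:

```python
import boto3

# minimal sketch: inspect the metadata a crawler stored in the Data Catalog
glue_client = boto3.client('glue')
response = glue_client.get_tables(DatabaseName='log-database')
for table in response['TableList']:
    # each table entry records the schema inferred from the underlying files
    columns = [col['Name'] for col in table['StorageDescriptor']['Columns']]
    print(table['Name'], columns)
```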
1.3 Why aws athena and aws glue are used together
By combining `aws athena` with `aws glue`, the `database` and `schema` created by `aws glue` can be queried with `aws athena`.
2. Hands-on practice with aws athena
2.1 Code link
Code link: aws-cdk-examples
2.2 Overall architecture
2.3 Code walkthrough
2.3.1 Create the S3 bucket for the test data
```python
# creating the buckets where the logs will be placed
logs_bucket = s3.Bucket(self, 'logs-bucket',
                        bucket_name=f"auditing-logs-{self.account}",
                        removal_policy=RemovalPolicy.DESTROY,
                        auto_delete_objects=True)
```
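The snippets in this walkthrough are fragments of a single CDK stack. They assume `aws-cdk-lib` (v2) imports along these lines at the top of the stack file (the exact import list is my assumption, not shown in the excerpt):

```python
from aws_cdk import (
    Stack,           # base class of the stack these snippets live in
    RemovalPolicy,   # used by the bucket definitions
    aws_s3 as s3,
    aws_s3_deployment as s3_deployment,
    aws_glue as glue,
    aws_iam as iam,
    aws_athena as athena,
)
```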
2.3.2 Create the S3 bucket that stores the query results
```python
# creating the bucket where the queries output will be placed
query_output_bucket = s3.Bucket(self, 'query-output-bucket',
                                bucket_name=f"auditing-analysis-output-{self.account}",
                                removal_policy=RemovalPolicy.DESTROY,
                                auto_delete_objects=True)
```
2.3.3 Sync the sample json data files to the S3 bucket
```python
# uploading the log files to the bucket as examples
s3_deployment.BucketDeployment(self, 'sample-files',
                               destination_bucket=logs_bucket,
                               sources=[s3_deployment.Source.asset('./log-samples')],
                               content_type='application/json',
                               retain_on_delete=False)
```
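Judging from the crawler targets defined in 2.3.6, the local `./log-samples` folder presumably contains `products/` and `users/` subfolders of `json` files; `BucketDeployment` copies that directory tree into the logs bucket at deploy time.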
2.3.4 Create the aws glue CfnDatabase
```python
# creating the Glue Database to serve as our Data Catalog
glue_database = glue.CfnDatabase(self, 'log-database',
                                 catalog_id=self.account,
                                 database_input=glue.CfnDatabase.DatabaseInputProperty(
                                     name="log-database"))
```
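`CfnDatabase` is the low-level (L1) CloudFormation construct for a Glue database; `catalog_id` identifies the Data Catalog by AWS account ID, which is why `self.account` is passed in.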
2.3.5 Create the Role required by the aws glue crawler
```python
# creating the permissions for the crawler to enrich our Data Catalog
glue_crawler_role = iam.Role(self, 'glue-crawler-role',
                             role_name='glue-crawler-role',
                             assumed_by=iam.ServicePrincipal(service='glue.amazonaws.com'),
                             managed_policies=[
                                 # Remember to apply the Least Privilege Principle and provide only the permissions needed to the crawler
                                 iam.ManagedPolicy.from_managed_policy_arn(self, 'AmazonS3FullAccess',
                                                                           'arn:aws:iam::aws:policy/AmazonS3FullAccess'),
                                 iam.ManagedPolicy.from_managed_policy_arn(self, 'AWSGlueServiceRole',
                                                                           'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole')
                             ])
```
Two managed policies are needed here: `AmazonS3FullAccess` and `AWSGlueServiceRole`.
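The inline comment about least privilege is worth acting on: instead of `AmazonS3FullAccess`, the role could be granted read access to just the logs bucket. A possible variation (my sketch, not the example project's code):

```python
# least-privilege variation (sketch): keep AWSGlueServiceRole,
# but drop AmazonS3FullAccess in favor of read access to one bucket
glue_crawler_role = iam.Role(self, 'glue-crawler-role',
                             role_name='glue-crawler-role',
                             assumed_by=iam.ServicePrincipal(service='glue.amazonaws.com'),
                             managed_policies=[
                                 iam.ManagedPolicy.from_managed_policy_arn(self, 'AWSGlueServiceRole',
                                                                           'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole')
                             ])
# grants read (GetObject / List) permissions scoped to this bucket only
logs_bucket.grant_read(glue_crawler_role)
```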
2.3.6 Create the aws glue crawler
```python
# creating the Glue Crawler that will automatically populate our Data Catalog. Don't forget to run the crawler
# as soon as the deployment finishes, otherwise our Data Catalog will be empty. Check out the README for more instructions
glue.CfnCrawler(self, 'logs-crawler',
                name='logs-crawler',
                database_name=glue_database.database_input.name,
                role=glue_crawler_role.role_name,
                targets={"s3Targets": [
                    {"path": f's3://{logs_bucket.bucket_name}/products'},
                    {"path": f's3://{logs_bucket.bucket_name}/users'}
                ]})
```
Here the `aws glue crawler` scans the `products` and `users` data files in the `S3 bucket`, infers the schema of the `json` files, and registers the corresponding tables in the `glue database`. Note that the crawler populates the Data Catalog with metadata only; the data itself stays on `S3`, where `aws athena` later queries it in place.
2.3.7 Create the aws athena work group
```python
# creating the Athena Workgroup to store our queries
work_group = athena.CfnWorkGroup(self, 'log-auditing-work-group',
                                 name='log-auditing',
                                 work_group_configuration=athena.CfnWorkGroup.WorkGroupConfigurationProperty(
                                     result_configuration=athena.CfnWorkGroup.ResultConfigurationProperty(
                                         output_location=f"s3://{query_output_bucket.bucket_name}",
                                         encryption_configuration=athena.CfnWorkGroup.EncryptionConfigurationProperty(
                                             encryption_option="SSE_S3"))))
```
`aws athena` is managed through `work groups`: after creating the `workgroup`, the queries are created inside it. This workgroup writes query results to the output bucket from 2.3.2, encrypted with `SSE_S3`.
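Once the workgroup exists, queries can also be started against it programmatically. A minimal `boto3` sketch (not part of the CDK stack; the SQL mirrors the named queries defined in the next step):

```python
import boto3

# minimal sketch: run a query inside the 'log-auditing' workgroup;
# the workgroup already defines the S3 output location and encryption
athena_client = boto3.client('athena')
execution = athena_client.start_query_execution(
    QueryString='SELECT * FROM "log-database"."products" LIMIT 10',
    WorkGroup='log-auditing',
)
print(execution['QueryExecutionId'])  # poll get_query_execution() with this ID
```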
2.3.8 Create the aws athena query
```python
# creating an example query to fetch all product events by date
product_events_by_date_query = athena.CfnNamedQuery(self, 'product-events-by-date-query',
                                                    database=glue_database.database_input.name,
                                                    work_group=work_group.name,
                                                    name="product-events-by-date",
                                                    query_string="SELECT * FROM \"log-database\".\"products\" WHERE \"date\" = '2024-01-19'")

# creating an example query to fetch all user events by date
user_events_by_date_query = athena.CfnNamedQuery(self, 'user-events-by-date-query',
                                                 database=glue_database.database_input.name,
                                                 work_group=work_group.name,
                                                 name="user-events-by-date",
                                                 query_string="SELECT * FROM \"log-database\".\"users\" WHERE \"date\" = '2024-01-22'")

# creating an example query to fetch all events by the user ID
all_events_by_userid_query = athena.CfnNamedQuery(self, 'all-events-by-userId-query',
                                                  database=glue_database.database_input.name,
                                                  work_group=work_group.name,
                                                  name="all-events-by-userId",
                                                  query_string="SELECT * FROM (\n"
                                                               "  SELECT transactionid, userid, username, domain, datetime, action FROM \"log-database\".\"products\" \n"
                                                               "UNION \n"
                                                               "  SELECT transactionid, userid, username, domain, datetime, action FROM \"log-database\".\"users\" \n"
                                                               ") WHERE \"userid\" = '123'")
```
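A side note on the last query: `UNION` deduplicates rows across the two tables; `UNION ALL` would scan the same data but skip the deduplication step, which is the cheaper choice when duplicates are impossible or acceptable.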
2.3.9 Adjust the creation order
```python
# adjusting the resource creation order
product_events_by_date_query.add_dependency(work_group)
user_events_by_date_query.add_dependency(work_group)
all_events_by_userid_query.add_dependency(work_group)
```
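Without these explicit dependencies, CloudFormation would not know that the named queries must be created after the workgroup: the queries reference the workgroup only by its name string (`work_group.name`), which does not create an implicit dependency the way a `Ref` or attribute reference would.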
2.4 Run aws cdk for athena
2.4.1 Run the deployment
```bash
python -m venv .venv
source .venv/Scripts/activate   # Windows (Git Bash); on Linux/macOS: source .venv/bin/activate
pip install -r requirements.txt
cdk synth
cdk --require-approval never deploy
```
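If the target account/region has never been used with CDK before, a one-time `cdk bootstrap` is needed before `cdk deploy`.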
2.4.2 Run the crawler
By default the `crawler` does not run automatically after deployment; it has to be started by hand, for example from the AWS console (or from code, as sketched below).
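Besides the console, the crawler can be started programmatically; a minimal `boto3` sketch (assuming default credentials and region):

```python
import boto3

# minimal sketch: start the deployed crawler and check its state
glue_client = boto3.client('glue')
glue_client.start_crawler(Name='logs-crawler')

state = glue_client.get_crawler(Name='logs-crawler')['Crawler']['State']
print(state)  # RUNNING while crawling, READY once it has finished
```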
Once the crawler finishes successfully, the `json` files in the `S3 bucket` have been crawled and the resulting tables are registered in the `aws glue database`.
2.4.3 View the aws athena queries
AWS Athena > Query editor > Saved queries > Workgroup > log auditing