2 數據清洗、轉換
此實驗使用S3作為數據源
ETL:
E ?? extract ?? ???? 輸入
T ?? transform ??? 轉換
L ?? load ??????????? 輸出
大綱
- 2 數據清洗、轉換
- 2.1 架構圖
- 2.2 數據清洗
- 2.3 編輯腳本
- 2.3.1 連接數據源(s3)
- 2.3.2. 數據結構轉換
- 2.3.2 數據結構拆分、定義
- 2.3.3 清洗后的數據寫入新s3
- 2.3.4 運行作業
- 2.4 數據分區
- 2.4.1 編輯腳本
- 2.4.2 運行腳本
- 2.5 總結
2.1 架構圖
2.2 數據清洗
此步會將S3中的原始數據清洗成我們想要的自定義結構的數據。之后,我們可通過APIGateway+Lambda+Athena來實現一個無服務器的數據分析服務。
步驟 | 圖例 |
---|---|
1、入口 | ![]() |
2、創建Job(s3作為數據源,則Type選擇Spark,若為Kinesis等,選擇Stream Spark) | ![]() |
3、IAM角色需要有s3與Glue的權限 | ![]() |
4、選擇s3腳本位置,若已經完成腳本的編寫工作,則可以選擇第二項或第三項,若無則Glue會提供默認腳本 | ![]() |
5、安全配置參數 | ![]() |
6、數據源() | ![]() |
7、數據目標(我們會將清洗后的數據存儲到新的s3桶) | ![]() |
8、設計架構(在本案例中,我們會自定義腳本。所以不再在此處設計架構)(此處設計后,腳本會自動生成相關代碼) | ![]() |
9、保存 | ![]() |
2.3 編輯腳本
腳本中的args參數的鍵值需要從Job的安全配置參數中定義
2.3.1 連接數據源(s3)
#數據源
datasource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "datasource")
2.3.2. 數據結構轉換
mapped_readings = ApplyMapping.apply(frame = datasource, mappings = [("lclid", "string", "meter_id", "string"), \("datetime", "string", "reading_time", "string"), \("KWH/hh (per half hour)", "double", "reading_value", "double")], \transformation_ctx = "mapped_readings")
2.3.2 數據結構拆分、定義
mapped_readings_df = DynamicFrame.toDF(mapped_readings)mapped_readings_df = mapped_readings_df.withColumn("obis_code", lit(""))
mapped_readings_df = mapped_readings_df.withColumn("reading_type", lit("INT"))reading_time = to_timestamp(col("reading_time"), "yyyy-MM-dd HH:mm:ss")
mapped_readings_df = mapped_readings_df \.withColumn("week_of_year", weekofyear(reading_time)) \.withColumn("date_str", regexp_replace(col("reading_time").substr(1,10), "-", "")) \.withColumn("day_of_month", dayofmonth(reading_time)) \.withColumn("month", month(reading_time)) \.withColumn("year", year(reading_time)) \.withColumn("hour", hour(reading_time)) \.withColumn("minute", minute(reading_time)) \.withColumn("reading_date_time", reading_time) \.drop("reading_time")
2.3.3 清洗后的數據寫入新s3
# write data to S3
filteredMeterReads = DynamicFrame.fromDF(mapped_readings_df, glueContext, "filteredMeterReads")s3_clean_path = "s3://" + args['clean_data_bucket']glueContext.write_dynamic_frame.from_options(frame = filteredMeterReads,connection_type = "s3",connection_options = {"path": s3_clean_path},format = "parquet",transformation_ctx = "s3CleanDatasink")
2.3.4 運行作業
????執行成功后,狀態將變為"SUCCESS",失敗將會給出失敗信息,可在CloudWatch 中查看詳情
清洗后的數據保存到了s3
數據清洗完畢后,可通過上一篇中的爬網程序步驟,將清洗后的數據的結構創建表到數據目錄中,
此時我們可以使用Athena對清洗后的數據進行分析。
2.4 數據分區
接下來我們對數據進行分區處理(此處只提供了按天分區)
重新進行數據清洗中的創建Job操作后,重寫腳本
2.4.1 編輯腳本
連接數據源。表為上一步最后重新爬取生成的新表。
cleanedMeterDataSource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "cleanedMeterDataSource")
根據type與data_str分區
business_zone_bucket_path_daily = "s3://{}/daily".format(args['business_zone_bucket'])businessZone = glueContext.write_dynamic_frame.from_options(frame = cleanedMeterDataSource, \connection_type = "s3", \connection_options = {"path": business_zone_bucket_path_daily, "partitionKeys": ["reading_type", "date_str"]},\format = "parquet", \transformation_ctx = "businessZone")
2.4.2 運行腳本
分區后的數據結果:
再次創建、運行爬網程序,將會在數據目錄中生成新的分區表。
2.5 總結
到這一步,我們已經使用Glue ETL對s3桶中的數據進行了清洗、分區操作。在進行上篇中的Athena操作后,我們已經可以通過Athena直接查詢到清洗、分區后的數據集了。
接下來,我們會通過使用APIGateway+Lambda+Athena來構建一個無服務器的數據查詢分析服務。