基于AWS Serverless的Glue服務進行ETL(提取、轉換和加載)數據分析(二)——數據清洗、轉換

2 數據清洗、轉換

此實驗使用S3作為數據源

ETL:

E ?? extract ?? ???? 輸入
T ?? transform ??? 轉換
L ?? load ??????????? 輸出

大綱

  • 2 數據清洗、轉換
    • 2.1 架構圖
    • 2.2 數據清洗
    • 2.3 編輯腳本
      • 2.3.1 連接數據源(s3)
      • 2.3.2. 數據結構轉換
      • 2.3.2 數據結構拆分、定義
      • 2.3.3 清洗后的數據寫入新s3
      • 2.3.4 運行作業
    • 2.4 數據分區
      • 2.4.1 編輯腳本
      • 2.4.2 運行腳本
    • 2.5 總結

2.1 架構圖

在這里插入圖片描述

2.2 數據清洗

此步會將S3中的原始數據清洗成我們想要的自定義結構的數據。之后,我們可通過APIGateway+Lambda+Athena來實現一個無服務器的數據分析服務。

步驟圖例
1、入口在這里插入圖片描述
2、創建Job(s3作為數據源,則Type選擇Spark,若為Kinesis等,選擇Stream Spark)在這里插入圖片描述
3、IAM角色需要有s3與Glue的權限在這里插入圖片描述
4、選擇s3腳本位置,若已經完成腳本的編寫工作,則可以選擇第二項或第三項,若無則Glue會提供默認腳本在這里插入圖片描述
5、安全配置參數在這里插入圖片描述建議:添加參數–enable-auto-scaling為true。每次在我們執行Job任務時,會根據運行 ETL 任務的數據處理單元(DPU)的個數來分配動態IP,在我們子網的動態IP數低于DPU數時,Job將會執行失敗。此參數將會動態分配IP。
6、數據源()在這里插入圖片描述
7、數據目標(我們會將清洗后的數據存儲到新的s3桶)在這里插入圖片描述
8、設計架構(在本案例中,我們會自定義腳本。所以不再在此處設計架構)(此處設計后,腳本會自動生成相關代碼)在這里插入圖片描述
9、保存在這里插入圖片描述

2.3 編輯腳本

腳本中的args參數的鍵值需要從Job的安全配置參數中定義

2.3.1 連接數據源(s3)

#數據源
datasource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "datasource")

2.3.2. 數據結構轉換

mapped_readings = ApplyMapping.apply(frame = datasource, mappings = [("lclid", "string", "meter_id", "string"), \("datetime", "string", "reading_time", "string"), \("KWH/hh (per half hour)", "double", "reading_value", "double")], \transformation_ctx = "mapped_readings")

2.3.2 數據結構拆分、定義

mapped_readings_df = DynamicFrame.toDF(mapped_readings)mapped_readings_df = mapped_readings_df.withColumn("obis_code", lit(""))
mapped_readings_df = mapped_readings_df.withColumn("reading_type", lit("INT"))reading_time = to_timestamp(col("reading_time"), "yyyy-MM-dd HH:mm:ss")
mapped_readings_df = mapped_readings_df \.withColumn("week_of_year", weekofyear(reading_time)) \.withColumn("date_str", regexp_replace(col("reading_time").substr(1,10), "-", "")) \.withColumn("day_of_month", dayofmonth(reading_time)) \.withColumn("month", month(reading_time)) \.withColumn("year", year(reading_time)) \.withColumn("hour", hour(reading_time)) \.withColumn("minute", minute(reading_time)) \.withColumn("reading_date_time", reading_time) \.drop("reading_time")

2.3.3 清洗后的數據寫入新s3

# write data to S3
filteredMeterReads = DynamicFrame.fromDF(mapped_readings_df, glueContext, "filteredMeterReads")s3_clean_path = "s3://" + args['clean_data_bucket']glueContext.write_dynamic_frame.from_options(frame = filteredMeterReads,connection_type = "s3",connection_options = {"path": s3_clean_path},format = "parquet",transformation_ctx = "s3CleanDatasink")

2.3.4 運行作業

????執行成功后,狀態將變為"SUCCESS",失敗將會給出失敗信息,可在CloudWatch 中查看詳情

在這里插入圖片描述

在這里插入圖片描述


清洗后的數據保存到了s3


在這里插入圖片描述
數據清洗完畢后,可通過上一篇中的爬網程序步驟,將清洗后的數據的結構創建表到數據目錄中,
此時我們可以使用Athena對清洗后的數據進行分析。

2.4 數據分區

接下來我們對數據進行分區處理(此處只提供了按天分區
重新進行數據清洗中的創建Job操作后,重寫腳本

2.4.1 編輯腳本

連接數據源。表為上一步最后重新爬取生成的新表。

cleanedMeterDataSource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "cleanedMeterDataSource")

根據type與data_str分區

business_zone_bucket_path_daily = "s3://{}/daily".format(args['business_zone_bucket'])businessZone = glueContext.write_dynamic_frame.from_options(frame = cleanedMeterDataSource, \connection_type = "s3", \connection_options = {"path": business_zone_bucket_path_daily, "partitionKeys": ["reading_type", "date_str"]},\format = "parquet", \transformation_ctx = "businessZone")

2.4.2 運行腳本

分區后的數據結果:
在這里插入圖片描述
再次創建、運行爬網程序,將會在數據目錄中生成新的分區表。

2.5 總結

到這一步,我們已經使用Glue ETL對s3桶中的數據進行了清洗、分區操作。在進行上篇中的Athena操作后,我們已經可以通過Athena直接查詢到清洗、分區后的數據集了。
接下來,我們會通過使用APIGateway+Lambda+Athena來構建一個無服務器的數據查詢分析服務。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/208093.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/208093.shtml
英文地址,請注明出處:http://en.pswp.cn/news/208093.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

FFmpeg開發筆記(六)如何訪問Github下載FFmpeg源碼

學習FFmpeg的時候,經常要到GitHub下載各種開源代碼,比如FFmpeg的源碼頁面位于https://github.com/FFmpeg/FFmpeg。然而國內訪問GitHub很不穩定,經常打不開該網站,比如在命令行執行下面的ping命令。 ping github.com 上面的ping結…

初識Linux:權限(1)

目錄 提示:以下指令均在Xshell 7 中進行 Linux 的權限 內核: 查看操作系統版本 查看cpu信息 查看內存信息 外部程序: 用戶: 普通用戶變為超級用戶: su 和 su-的區別: root用戶變成普通用戶&#…

KALI LINUX信息收集

預計更新 第一章 入門 1.1 什么是Kali Linux? 1.2 安裝Kali Linux 1.3 Kali Linux桌面環境介紹 1.4 基本命令和工具 第二章 信息收集 1.1 網絡掃描 1.2 端口掃描 1.3 漏洞掃描 1.4 社交工程學 第三章 攻擊和滲透測試 1.1 密碼破解 1.2 暴力破解 1.3 漏洞利用 1.4 …

什么是SSL證書?

當我們網上購物或銀行業務時,為了安全起見,我們希望看到網站的地址欄上有“HTTPS”和安全鎖圖標。但是這個“HTTPS”和鎖定圖標實際上意味著什么?要回答這些問題,我們需要了解 HTTPS、SSL 協議和 SSL 證書。 關于HTTPS、SSL和SSL…

風控反欺詐安全學習路標

1. 金融和支付領域知識 - 了解金融和支付領域的基本概念、業務流程和風險特點。 - 學習金融機構的監管要求和合規措施,如KYC(了解你的客戶)和AML(反洗錢)。 2. 數據分析和挖掘技術 - 學習數據分析和數據挖掘的基本原理…

fastadmin獲取關聯表數據select渲染

php public function piliangadd(){if (false === $this->request->isPost()) {$fenlei_list = Db::name(fenlei)->order(weigh desc)->select();$this</

每天五分鐘計算機視覺:稠密連接網絡(DenseNet)

本文重點 在前面的課程中我們學習了殘差網絡ResNet,而DenseNet可以看成是ResNet的后續,我們看一下圖就可以看出二者的主要區別了。 特點 DenseNet是一種卷積神經網絡,它的特點是每一層都直接連接到所有后續層。這意味著,每一層都接收來自前一層的輸出,并將其作為輸入傳遞…

Flyway——Oracle創建前綴索引

文章目錄 前言創建一般索引的語法前綴索引 前言 索引有助于提升數據庫表的查詢速率&#xff0c;極大的縮減查詢的時間。但索引的創建需要考慮的因素很多&#xff0c;并非索引越多越好&#xff01; 創建一般索引的語法 oracle創建一般的常見索引&#xff0c;語法如下所示&…

n個人排成一圈,數數123離隊

#include<stdio.h> int main() { int i, n100,k0,j0,a[1000]{0};//k&#xff1a;數數123的變量&#xff0c;j記錄離開隊列人數的變量scanf("%d",&n);for(int ii0; ii<n; ii){ for( i0; i<n; i){// printf("wei%d ",i);if((a[i]0)&&…

掌握Line多開技術,打造私人專屬空間

掌握Line多開技術&#xff0c;打造私人專屬空間 在現代社交網絡的時代&#xff0c;人們經常需要同時處理多個社交賬號&#xff0c;例如工作、家庭、朋友等不同領域的社交關系。而對于Line這樣的主流社交應用來說&#xff0c;多開技術可以讓用戶更便捷地管理多個賬號&#xff0…

數據結構線性表-棧和隊列的實現

1. 棧(Stack) 1.1 概念 棧&#xff1a;一種特殊的線性表&#xff0c;其只允許在固定的一端進行插入和刪除元素操作。進行數據插入和刪除操作的一端稱為棧 頂&#xff0c;另一端稱為棧底。棧中的數據元素遵守后進先出LIFO&#xff08;Last In First Out&#xff09;的原則。 …

Vue學習計劃-Vue2--Vue核心(三)methods和computed

Vue 1. 事件 v-on 基礎 使用 v-on:xxx或者xxx綁定事件&#xff0c;其中xxx是事件名 事件的回調需要配置在methods對象中&#xff0c;最終會在vm上 methods中配置函數&#xff0c;不要用箭頭函數&#xff0c;否則this就不是vm了 methods中配置函數&#xff0c;都是被Vue管…

Seata使用

本文以seata-server-1.5.2&#xff0c;以配置中心、注冊中心使用Nacos&#xff0c;store.modedb&#xff08;mysql&#xff09;為例進行操作。 一、Seata Server端 1、下載seata server 鏈接: http://seata.io/zh-cn/blog/download.html下載壓縮包&#xff0c;解壓至非中文目錄…

Java技術棧 —— 微服務框架Spring Cloud —— Ruoyi-Cloud 學習(一)

Ruoyi-cloud 項目學習 一、項目環境搭建與啟動1.1 nacos安裝部署1.1.1 nacos安裝、啟動1.1.2 nacos部署 1.2 seata安裝部署1.3 后端部署與運行1.3.1 ruoyi-modules-file模塊運行報錯 1.4 nginx安裝、部署、配置與啟動1.5 redis安裝與部署1.6 前段框架知識1.7 項目啟動1.8 參考 …

實用方法 | 搭建真正滿足用戶需求的在線幫助中心

隨著互聯網的普及和信息技術的快速發展&#xff0c;客戶服務和支持變得越來越重要。為了提高客戶滿意度和維持良好的品牌形象&#xff0c;越來越多企業都開始搭建自己的在線幫助中心。 不知從何下手&#xff1f;細想一下&#xff0c;搭建在線幫助中心主要就是為了解決用戶的問…

根據java類名找出當前是哪個Excel中的sheet

pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 …

shell_81.Linux在命令行中創建使用函數

在命令行中使用函數 在命令行中創建函數 兩種方法 單行方式來定義函數&#xff1a; $ function divem { echo $[ $1 / $2 ]; } $ divem 100 5 20 $ 當你在命令行中定義函數時&#xff0c;必須在每個命令后面加個分號&#xff0c;這樣 shell 就能知道哪里是命令的起止了&am…

反射實現tomcat

獲取類信息的方法 1.通過類對象 x.getClass() 2.通過class.forname方法 Class.forname(className);這里className是存儲類名的字符串 3.通過類名.class 類名.class 通過類名創建對象 類名.newInstance&#xff08;&#xff09;&#xff1b; 反射可以看到類的一切信息&#xff1…

C語言聯合和枚舉講解

目錄 聯合體的大小 聯合體如何省空間 巧用聯合體 聯合判斷大小端&#xff08;驚為天人&#xff0c;大佬寫的&#xff0c;我借鑒&#xff09; 枚舉 枚舉類型的使用 首先我們先看一下菜鳥教程中的對C語言聯合體的說明 聯合體的大小 #include <stdio.h> union u {char…

Proteus仿真--基于ADC0808設計的調溫報警器

本文介紹基于ADC0808實現的調溫報警器設計&#xff08;完整仿真源文件及代碼見文末鏈接&#xff09; 溫度調節使用滑動變阻器模擬實現&#xff0c;ADC0808采集信號并輸出在LCD上面顯示&#xff0c;報警系統是LED燈和蜂鳴器實現聲光電報警 仿真圖如下 仿真運行視頻 Proteus仿真…