物流實時數倉:數倉搭建(ODS)

系列文章目錄

物流實時數倉:采集通道搭建
物流實時數倉:數倉搭建


文章目錄

  • 系列文章目錄
  • 前言
  • 一、IDEA環境準備
    • 1.pom.xml
    • 2.目錄創建
  • 二、代碼編寫
    • 1.log4j.properties
    • 2.CreateEnvUtil.java
    • 3.KafkaUtil.java
    • 4.OdsApp.java
  • 三、代碼測試
  • 總結


前言

現在我們開始進行數倉的搭建,我們用Kafka來代替數倉的ods層。
基本流程為使用Flink從MySQL讀取數據然后寫入Kafka中


一、IDEA環境準備

1.pom.xml

寫入項目需要的配置

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>1.8</java.version><flink.version>1.17.0</flink.version><hadoop.version>3.2.3</hadoop.version><flink-cdc.version>2.3.0</flink-cdc.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.68</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-reload4j</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency></dependencies>

基本上項目需要的所有jar包都有了,不夠以后在加。

2.目錄創建

在這里插入圖片描述按照以上目錄結構進行目錄創建

二、代碼編寫

1.log4j.properties

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

2.CreateEnvUtil.java

這個文件中有兩個方法
創建初始化Flink的env
Flink連接mysql的MySqlSource

package com.atguigu.tms.realtime.utils;import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;import java.util.HashMap;public class CreateEnvUtil {public static StreamExecutionEnvironment getStreamEnv(String[] args) {// 1.1 指定流處理環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.檢查點相關設置// 2.1 開啟檢查點env.enableCheckpointing(6000L, CheckpointingMode.EXACTLY_ONCE);// 2.2 設置檢查點的超時時間env.getCheckpointConfig().setCheckpointTimeout(120000L);// 2.3 設置job取消之后 檢查點是否保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.4 設置兩個檢查點之間的最小時間間隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);// 2.5 設置重啟策略env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));// 2.6 設置狀態后端env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck");// 2.7 設置操作hdfs用戶// 獲取命令行參數ParameterTool parameterTool = ParameterTool.fromArgs(args);String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu");System.setProperty("HADOOP_USER_NAME", hdfsUserName);return env;}public static MySqlSource<String> getMysqlSource(String option, String serverId, String[] args) {ParameterTool parameterTool = ParameterTool.fromArgs(args);String mysqlHostname = parameterTool.get("hadoop-user-name", "hadoop102");int mysqlPort = Integer.parseInt(parameterTool.get("mysql-port", "3306"));String mysqlUsername = parameterTool.get("mysql-username", "root");String mysqlPasswd = parameterTool.get("mysql-passwd", "000000");option = parameterTool.get("start-up-option", option);serverId = parameterTool.get("server-id", serverId);// 創建配置信息 Map 集合,將 Decimal 數據類型的解析格式配置 k-v 置于其中HashMap config = new HashMap<>();config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());// 將前述 Map 集合中的配置信息傳遞給 JSON 解析 Schema,該 Schema 將用于 MysqlSource 的初始化JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema =new JsonDebeziumDeserializationSchema(false, config);MySqlSourceBuilder<String> builder = MySqlSource.<String>builder().hostname(mysqlHostname).port(mysqlPort).username(mysqlUsername).password(mysqlPasswd).deserializer(jsonDebeziumDeserializationSchema);switch (option) {// 讀取實時數據case "dwd":String[] dwdTables = new String[]{"tms.order_info","tms.order_cargo","tms.transport_task","tms.order_org_bound"};return builder.databaseList("tms").tableList(dwdTables).startupOptions(StartupOptions.latest()).serverId(serverId).build();// 讀取維度數據case "realtime_dim":String[] realtimeDimTables = new String[]{"tms.user_info","tms.user_address","tms.base_complex","tms.base_dic","tms.base_region_info","tms.base_organ","tms.express_courier","tms.express_courier_complex","tms.employee_info","tms.line_base_shift","tms.line_base_info","tms.truck_driver","tms.truck_info","tms.truck_model","tms.truck_team"};return builder.databaseList("tms").tableList(realtimeDimTables).startupOptions(StartupOptions.initial()).serverId(serverId).build();}Log.error("不支持操作類型");return null;}
}

3.KafkaUtil.java

該文件中有一個方法,創建Flink連接Kafka需要的Sink

package com.atguigu.tms.realtime.utils;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerConfig;public class KafkaUtil {private static final String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";public static KafkaSink<String> getKafkaSink(String topic, String transIdPrefix, String[] args) {// 將命令行參數對象封裝為 ParameterTool 類對象ParameterTool parameterTool = ParameterTool.fromArgs(args);// 提取命令行傳入的 key 為 topic 的配置信息,并將默認值指定為方法參數 topic// 當命令行沒有指定 topic 時,會采用默認值topic = parameterTool.get("topic", topic);// 如果命令行沒有指定主題名稱且默認值為 null 則拋出異常if (topic == null) {throw new IllegalArgumentException("主題名不可為空:命令行傳參為空且沒有默認值!");}// 獲取命令行傳入的 key 為 bootstrap-servers 的配置信息,并指定默認值String bootstrapServers = parameterTool.get("bootstrap-severs", KAFKA_SERVER);// 獲取命令行傳入的 key 為 transaction-timeout 的配置信息,并指定默認值String transactionTimeout = parameterTool.get("transaction-timeout", 15 * 60 * 1000 + "");return KafkaSink.<String>builder().setBootstrapServers(bootstrapServers).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(topic).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setTransactionalIdPrefix(transIdPrefix).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout).build();}public static KafkaSink<String> getKafkaSink(String topic, String[] args) {return getKafkaSink(topic, topic + "_trans", args);}
}

4.OdsApp.java

Ods層的app創建,負責讀取和寫入數據

package com.atguigu.tms.realtime.app.ods;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;public class OdsApp {public static void main(String[] args) throws Exception {// 1.獲取流處理環境并指定檢查點StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);env.setParallelism(4);// 2 使用FlinkCDC從MySQL中讀取數據-事實數據String dwdOption = "dwd";String dwdServerId = "6030";String dwdsourceName = "ods_app_dwd_source";mysqlToKafka(dwdOption, dwdServerId, dwdsourceName, env, args);// 3 使用FlinkCDC從MySQL中讀取數據-維度數據String realtimeDimOption = "realtime_dim";String realtimeDimServerId = "6040";String realtimeDimsourceName = "ods_app_realtimeDim_source";mysqlToKafka(realtimeDimOption, realtimeDimServerId, realtimeDimsourceName, env, args);env.execute();}public static void mysqlToKafka(String option, String serverId, String sourceName, StreamExecutionEnvironment env, String[] args) {MySqlSource<String> MySqlSource = CreateEnvUtil.getMysqlSource(option, serverId, args);SingleOutputStreamOperator<String> dwdStrDS = env.fromSource(MySqlSource, WatermarkStrategy.noWatermarks(), sourceName).setParallelism(1).uid(option + sourceName);// 3 簡單ETLSingleOutputStreamOperator<String> processDS = dwdStrDS.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) {try {JSONObject jsonObj = JSONObject.parseObject(jsonStr);if (jsonObj.getJSONObject("after") != null && !"d".equals(jsonObj.getString("op"))) {
//                                System.out.println(jsonObj);Long tsMs = jsonObj.getLong("ts_ms");jsonObj.put("ts", tsMs);jsonObj.remove("ts_ms");String jsonString = jsonObj.toJSONString();out.collect(jsonString);}} catch (Exception e) {Log.error("從Flink-CDC得到的數據不是一個標準的json格式",e);}}}).setParallelism(1);// 4 按照主鍵進行分組,避免出現亂序KeyedStream<String, String> keyedDS = processDS.keyBy((KeySelector<String, String>) jsonStr -> {JSONObject jsonObj = JSON.parseObject(jsonStr);return jsonObj.getJSONObject("after").getString("id");});//將數據寫入KafkakeyedDS.sinkTo(KafkaUtil.getKafkaSink("tms_ods", sourceName + "_transPre", args)).uid(option + "_ods_app_sink");}
}

三、代碼測試

在虛擬機啟動我們需要的組件,目前需要hadoop、zk、kafka和MySQL。
在這里插入圖片描述
先開一個消費者進行消費。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_ods

然后運行OdsApp.java
他會先讀取維度數據,因為維度數據需要全量更新之前的數據。
在這里插入圖片描述
當他消費結束后,我們運行jar包,獲取事實數據。

java -jar tms-mock-2023-01-06.jar 

如果能消費到新數據,代表通道沒問題,ODS層創建完成。

在這里插入圖片描述


總結

至此ODS搭建完成。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/166491.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/166491.shtml
英文地址,請注明出處:http://en.pswp.cn/news/166491.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

信息系統項目管理師-質量管理論文提綱

快速導航 1.信息系統項目管理師-項目整合管理 2.信息系統項目管理師-項目范圍管理 3.信息系統項目管理師-項目進度管理 4.信息系統項目管理師-項目成本管理 5.信息系統項目管理師-項目質量管理 6.信息系統項目管理師-項目資源管理 7.信息系統項目管理師-項目溝通管理 8.信息系…

當內容創作進入 AGI 時代,你也可以成為「神筆馬良」

我神筆馬良的童話故事我們或多或少都聽過&#xff0c;一支神筆在手&#xff0c;想畫什么就能畫出什么&#xff0c;栩栩如生。創造者的理解力、想象力和創作力都能通過這支神筆釋放。 近一年&#xff0c;隨著 AIGC 內容生產工具的快速出圈&#xff0c;有人把 Stable Diffusion、…

Sublime Text 4168最新代碼編輯

Sublime Text是一款功能強大的文本編輯器&#xff0c;具有以下主要功能&#xff1a; 支持多種編程語言的語法高亮和代碼自動完成功能&#xff0c;包括Python、JavaScript、HTML、CSS等。提供代碼片段&#xff08;Snippet&#xff09;功能&#xff0c;可以將常用的代碼片段保存…

JSP EL 算數運算符邏輯運算符

除了 empty 我們這邊還有一些基本的運算符 第一種 等等于 jsp代碼如下 <% page contentType"text/html; charsetUTF-8" pageEncoding"UTF-8" %> <%request.setCharacterEncoding("UTF-8");%> <!DOCTYPE html> <html> …

JVM-基礎

jdk7及以前&#xff1a; 通過-XX:PermSize 來設置永久代初始分配空間&#xff0c;默認值是20.75m -XX:MaxPermSize來設定永久代最大可分配空間&#xff0c;32位是64m&#xff0c;64位是82m jdk8及之后&#xff1a; 通過-XX:MetaspaceSize 來設置永久代初始分配空間&#xff…

概要設計文檔案例分享

1引言 1.1編寫目的 1.2項目背景 1.3參考資料 2系統總體設計 2.1整體架構 2.2整體功能架構 2.3整體技術架構 2.4運行環境設計 2.5設計目標 3系統功能模塊設計 3.1個人辦公 4性能設計 4.1響應時間 4.2并發用戶數 5接口設計 5.1接口設計原則 5.2接口實現方式 6運行設計 6.1運行模塊…

GZ031 應用軟件系統開發賽題第4套

2023年全國職業院校技能大賽 應用軟件系統開發賽項&#xff08;高職組&#xff09; 賽題第4套 工位號&#xff1a; 2023年4月 競賽說明 一、項目背景 黨的二十大報告指出&#xff0c;要加快建設制造強國、數字中國&#xff0c;推動制造業高端化、智能化、…

骨傳導耳機的優缺點都有哪些?骨傳導耳機值得入手嗎?

骨傳導耳機的優點還是很多的&#xff0c;相比于傳統耳機&#xff0c;骨傳導耳機要更值得入手&#xff01; 下面讓我們了解下骨傳導耳機的優缺點都有哪些&#xff1a; 一、優點 1、使用更安全 傳統的耳機&#xff0c;在使用時會聽不到外界的聲音&#xff0c;而骨傳導耳機通過…

“java.lang.IllegalStateException: No ConfigurableListableBeanFactory set“,缺少配置

一、錯誤分析 做品優購項目的運營商安全登錄時&#xff0c;運行項目后&#xff0c;瀏覽器訪問模板頁&#xff0c;模板頁的表格無法正常顯示&#xff0c;報錯信息如下&#xff1a; SEVERE: StandardWrapper.Throwable java.lang.IllegalStateException: No ConfigurableLista…

Java視頻流處理技術分享

引言 在現代互聯網時代&#xff0c;視頻流處理成為了許多應用的重要組成部分。無論是實時視頻聊天、在線直播還是視頻會議&#xff0c;都需要高效的視頻流處理技術來保證用戶體驗。Java作為一種強大的編程語言&#xff0c;也在視頻流處理領域發揮著重要的作用。本文將深入探討…

Linux 6.7全面改進x86 CPU微碼加載方式

導讀最近&#xff0c;社區在清理 Linux 上的 Intel/AMD x86 CPU 微代碼加載方面做了大量的工作&#xff0c;這些工作現已合并到 Linux 6.7 中。 由于在啟動時加載 CPU 微代碼對于減少不斷出現的新 CPU 安全漏洞以及有時解決功能問題非常重要&#xff0c;Thomas Gleixner 最近開…

AGV調整Matlab實現

% 用二維數組代替地圖和場地信息 % 可用場地&#xff1a;0 % 小車本身&#xff1a;1 % 貨物點及入庫點&#xff1a;2 % 地圖邊界: 100 % AGV出發區&#xff1a;11 % 監測區&#xff1a;12 % 充電區&#xff1a;13 % 生產區A1、A2&#xff1a;14 % 生產區B3、B4、B5&#xff1a…

C百題--7.輸出乘法表

1.問題描述 輸出9*9乘法表 2.解決思路 利用99乘法表行和列之間的關系&#xff0c;進行輸出 注意&#xff1a;%-2d 2代表占兩個字符&#xff1b;-代表左對齊 3.代碼實現 #include<stdio.h> int main(){for(int i1;i<9;i){for(int j1;j<i;j){printf("%d*%d…

微信小程序埋點

使用如下代碼封裝一下&#xff0c;例如封裝在log.js文件里面&#xff1a; var log wx.getRealtimeLogManager ? wx.getRealtimeLogManager() : nullmodule.exports {debug() {if (!log) returnlog.debug.apply(log, arguments)},info() {if (!log) returnlog.info.apply(l…

深入學習pytorch筆記

兩個重要的函數 dir()&#xff1a; 一個內置函數&#xff0c;用于列出對象的所有屬性和方法 help()&#xff1a;一個內置函數&#xff0c;用于獲取關于Python對象、模塊、函數、類等的詳細信息 Dateset類 Dataset&#xff1a;pytorch中的一個類&#xff0c;開發者在訓練和…

抖音電商品牌力不足咋辦?如何升級或強開旗艦店、官方旗艦店?我們有妙招!

隨著抖音電商的發展&#xff0c;越來越多的商家蜂擁而至&#xff0c;入駐經營抖音小店... 然而我們在開店的時候&#xff0c;選擇開通官方旗艦店、旗艦店、專營店或專賣店&#xff0c;卻被系統提示為你的商標品牌力不足&#xff0c;無法開通官方旗艦店、旗艦店、專營店、專賣店…

Android手電筒、閃光燈、torch、flash

1. 僅開啟手電筒 單純的開啟手電筒我們可以使用CameraManager的.setTorchMode()方法。 cameraCharacteristics.get(CameraCharacteristics.FLASH_INFO_AVAILABLE)獲取該相機特征是否可獲取閃光燈。 CameraManager cameraManager (CameraManager) getSystemService(CAMERA_SE…

在 vscode 中的json文件寫注釋,不報錯的解決辦法

打開 vscode 的「設置」&#xff0c;搜索&#xff1a;files: associations&#xff0c;然后添加 *.json jsonc最后

Nginx 配置錯誤導致的漏洞

目錄 1. CRLF注入漏洞 Bottle HTTP頭注入漏洞 2.目錄穿越漏洞 3. http add_header被覆蓋 本篇要復現的漏洞實驗有一個網站直接為我們提供了Docker的環境&#xff0c;我們只需要下載下來就可以使用&#xff1a; Docker環境的安裝可以參考&#xff1a;Docker安裝 漏洞環境的…

Docker rm 命令

docker rm&#xff1a;刪除一個或多個容器。 語法&#xff1a; docker rm [OPTIONS] CONTAINER [CONTAINER...]OPTIONS說明&#xff1a; -f&#xff1a;通過SIGKILL信號強制刪除一個運行中的容器。 -l &#xff1a;移除容器間的網絡連接&#xff0c;而非容器本身。 -v &…