Flink流批一體計算(24):Flink SQL之mysql維表實時關聯

目錄

1.維表

2.數據準備

創建源數據

創建維度表

創建Sink表

3.配置任務

Flink SQL創建kafka源表

Flink SQL創建MySQL維表

Flink SQL創建MySQL結果表

編寫計算任務

核驗數據


1.維表

目前在實時計算的場景中,大多數都使用過MySQL、Hbase、redis作為維表引擎存儲一些維度數據,然后在DataStream API中調用MySQL、Hbase、redis客戶端去獲取到維度數據進行維度擴充。

本案例采用MySQL創建維表,與創建MySQL sink表語法相同。

2.數據準備

創建源數據

重啟kafka,創建Topic:? case_kafka_mysql

寫入json格式的數據

? {"ts": "20201011","id": 8,"price_amt":211}

創建維度表

在MySQL中創建名為product_dim的表

CREATE TABLE `product_dim` (`id` bigint(11) NOT NULL,`coupon_price_amt` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

向數據表插入如下數據:

INSERT INTO `product_dim` VALUES (1, 1);
INSERT INTO `product_dim` VALUES (3, 1);
INSERT INTO `product_dim` VALUES (8, 1);
創建Sink表

在MySQL中創建名為sync_test_3的表

CREATE TABLE `sync_test_3` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

3.配置任務

Flink SQL創建kafka源表
create table flink_test_3 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'case_kafka_mysql','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test3','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');
Flink SQL創建MySQL維表
create table flink_test_3_dim (id BIGINT,coupon_price_amt BIGINT
)
WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'product_dim','username' = 'root','password' = 'Admin','lookup.max-retries' = '3','lookup.cache.max-rows' = 1000);

WITH參數

參數

說明

類型

備注

lookup.cache.max-rows

指定緩存的最大行數。如果超過該值,則最老的行記錄將會過期,會被新的記錄替換掉。

Integer

默認情況下,維表Cache是未開啟的。

lookup.cache.ttl

指定緩存中每行記錄的最大存活時間。如果某行記錄超過該時間,則該行記錄將會過期。

Duration

默認情況下,維表Cache是未開啟的。你可以設置lookup.cache.max-rows?lookup.cache.ttl參數來啟用維表Cache。啟用緩存時,采用的是LRU策略緩存。

lookup.cache.caching-missing-key

是否緩存空的查詢結果。

Boolean

參數取值如下:

true(默認值):緩存空的查詢結果。

false:不緩存空的查詢結果。

lookup.max-retries

查詢數據庫失敗的最大重試次數。

Integer

默認值為3

Flink SQL創建MySQL結果表
CREATE TABLE sync_test_3 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_3','username' = 'root','password' = 'Admin');
編寫計算任務
INSERT INTO sync_test_3
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_3 as aLEFT JOIN flink_test_3_dim? FOR SYSTEM_TIME AS OF? a.proctime? as bON b.id = a.id)
GROUP BY ts;
核驗數據
SELECT id, ts, total_gmv FROM sync_test_3;

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/209737.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/209737.shtml
英文地址,請注明出處:http://en.pswp.cn/news/209737.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

PTA:計算總分

題干 請編寫一個函數sum,函數的功能是:計算一個由結構體表示的包含多門課程成績組成的學生的總成績。 函數接口定義: double sumScore(struct student stu); 其中 stu是用戶傳入的參數。函數須返回學生的總成績。 裁判測試程序樣例&#x…

【華為數據之道學習筆記】3-7 報告數據治理

報告數據是指對數據進行處理加工后,用作業務決策依據的數據。它用于支持報告和報表的生成。 用于報告和報表的數據可以分為如下幾種。 用于報表項數據生成的事實表、指標數據、維度。 用于報表項統計和計算的統計函數、趨勢函數及報告規則。 用于報表和報告展示的…

AVFormatContext編解碼層:理論與實戰

文章目錄 前言一、FFmpeg 解碼流程二、FFmpeg 轉碼流程三、編解碼 API 詳解1、解碼 API 使用詳解2、編碼 API 使用詳解 四、編碼案例實戰1、示例源碼2、運行結果 五、解碼案例實戰1、示例源碼2、運行結果 前言 AVFormatContext 是一個貫穿始終的數據結構,很多函數都…

前后端分離項目跨域請求

一、前端vue項目 在項目中創建request.js文件,添加以下內容 import axios from "axios"; const api axios.create({ //這里配置的是后端服務提供的接口baseURL: "http://localhost:8080/web-demo",timeout: 1000} ); export default api; …

基于HSV空間色彩的圖像分割方法(含python代碼實現)

文章目錄 1. 介紹2. HSV顏色空間3. python實現HSV圖像分割3.1. 代碼實現3.2. 運行結果 1. 介紹 HSV顏色系統簡介: HSV 即使用色相(Hue)、飽和度(Saturation)、明度(Value)來表示色彩的一種方式…

HttpComponents: 領域對象的設計

1. HTTP協議 1.1 HTTP請求 HTTP請求由請求頭、請求體兩部分組成,請求頭又分為請求行(request line)和普通的請求頭組成。通過瀏覽器的開發者工具,我們能查看請求和響應的詳情。 下面是一個HTTP請求發送的完整內容。 POST https://track.abc.com/v4/tr…

根據對數器找規律、根據數據量猜題目解法

題目一 小虎去買蘋果,商店只提供兩種類型的塑料袋,每種類型都有任意數量。1)能裝下6個蘋果的袋子2)能裝下8個蘋果的袋子小虎可以自由使用兩種袋子來裝蘋果,但是小虎有強迫癥,他要求自己使用的袋子數量必須…

python門戶網站文件爬取并顯示

廣西南寧政府門面網站 import requests import os import io import numpy as np from concurrent.futures import ThreadPoolExecutor from bs4 import BeautifulSoup import time import pdfplumber import pandas as pd from docx import Document import docx import win32…

WordCount 源碼解析 Mapper,Reducer,Driver

創建包 com.nefu.mapreduce.wordcount ,開始編寫 Mapper , Reducer , Driver 用戶編寫的程序分成三個部分: Mapper 、 Reducer 和 Driver 。 ( 1 ) Mapper 階段 ? 用戶自定義的 Mapper 要繼承自己的父…

文件服務器搭建

文件服務器搭建 文件服務器有四個選擇: httpd(apache) 穩定,使用廣泛,服務器一般自帶,對于開發人員來說強烈推薦。 nginx 穩定高效,使用廣泛,linux命令可直接下載,對…

STM32CubeIDE串口空閑中斷實現不定長數據接收

STM32F051空閑中斷實現串口不定長數據接收 目的編程軟件配置串口開中斷中斷程序運行結果目的 在串口輸入不定長數據時,通過串口空閑中斷來斷幀接收數據。 編程軟件 STM32CubeIDE STM32CubeMX配置MCU。通過對端口配置,自動生成程序,減少編程量。 配置串口開中斷 配置串口…

redis中序列化問題,value包含全路徑類名解析

1. 問題 redis中保存的key-value格式 value直接存入的是實體對象,值中包含全路徑類名,在使用Jackson2JsonRedisSerializer和GenericJackson2JsonRedisSerializer解析器時報錯 報錯內容: com.fasterxml.jackson.databind.exc.InvalidTypeI…

《師兄啊師兄》第二季確認定檔!海神揚名,穩健回歸!

近日,《師兄啊師兄》第二季的定檔海報和PV終于發布,確認將于12月14日上午10點強勢回歸!這部備受矚目的國漫作品自第一季播出以來,便以其獨特的劇情設定和唯美的畫風,贏得了廣大觀眾的喜愛。如今,動畫第二季…

第一課【習題】給應用添加通知和提醒

構造進度條模板通知,name字段當前需要固定配置為downloadTemplate。 給通知設置分發時間,需要設置showDeliveryTime為false。 OpenHarmony提供后臺代理提醒功能,在應用退居后臺或退出后,計時和提醒通知功能被系統后臺代理接管…

Qt 5.15.2 三維顯示功能

Qt 5.15.2 三維顯示功能 三維顯示效果: .pro項目文件 QT core gui opengl 3dcore 3drender 3dinput 3dextrasgreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c17# You can make your code fail to compile if it uses deprecated APIs. # In ord…

2023年法國經銷商Solu-Watt來訪安科瑞-安科瑞 蔣靜

2023年4月10日上午9點,法國Solu-Watt公司Matthieu先生一行到安科瑞考察參觀工廠的智能化出入庫工作站、柔性化儀表生產車間及實驗室。自1992年以來,Solu-Watt在電氣設備市場中不斷發展。能夠提供量身定制的安裝有線電氣解決方案(電氣柜、接線…

如何用Qt配置git項目并上傳Gitee

1.進入到Qt項目文件夾內,打開 “Git Bash Here” 2.初始化,在“Git Bash Here”中輸入 git init 3.加入所有文件,在“Git Bash Here”中輸入 git add . (需要注意,git add 后面還有一個點) 4.添加備注,git com…

STL源碼剖析筆記——哈希表、unordered_set、unordered_map、unordered_mutiset、unordered_mutimap

系列文章目錄 STL源碼剖析筆記——迭代器 STL源碼剖析筆記——vector STL源碼剖析筆記——list STL源碼剖析筆記——deque、stack,queue STL源碼剖析筆記——Binary Heap、priority_queue STL源碼剖析筆記——AVL-tree、RB-tree、set、map、mutiset、mutimap STL源…

一套rk3588 rtsp服務器推流的 github 方案及記錄 -01

我不生產代碼,我只是代碼的搬運工,相信我,看完這個文章你的圖片一定能變成流媒體推出去。 訴求:使用opencv拉流,轉成bgr數據,需要把處理后的數據(BGR)編碼成264,然后推流…

字符串函數strtok

1.調用格式: 2.調用形式:char*strtok(char*p1,const char*p2),其中第二個是由分隔符組成的字符串,第一個為需要分隔的字符串 3.調用目的:將分隔符之間的字符串取出 4.調用時一般將源字符串拷貝后調用,因為此函數會將…