[Data Pipeline] Kafka消息 | Redis緩存 | Docker部署(Lambda架構)

在這里插入圖片描述

第七章:Kafka消息系統(實時流處理)

歡迎回到數據探索之旅!

  • 在前六章中,我們構建了強大的**批量處理流水線**。

  • 通過Airflow DAG(批量任務編排)協調Spark作業(數據處理),數據從MySQL數據庫(源系統)經數據層(青銅、白銀、黃金)存入MinIO存儲(數據湖),并通過數據質量檢查確保數據可靠性。這種批量處理非常適合生成日報和分析。

  • 但若需要即時響應數據變化呢?例如客戶完成訂單后立即推薦相關商品,此時無法等待每日批量作業。

這就需要**實時數據流處理——這正是Kafka消息系統**的核心價值

實時流處理解決的問題

批量處理存在固有延遲(數小時至數天),無法滿足以下場景:

  • 欺詐檢測(即時攔截可疑交易)
  • 個性化推薦(購物后實時推薦)
  • 實時儀表盤(銷售數據動態更新
  • 事件驅動告警

Kafka:高速數據傳送帶

Apache Kafka是構建實時數據管道的分布式流平臺

其核心設計猶如高速傳送帶,具備:

  • 高吞吐(每秒百萬級消息處理)
  • 低延遲(毫秒級響應)
  • 持久化存儲(消息可回溯)
  • 水平擴展能力

核心概念解析

  • 消息(事件):數據變更的最小單元,例如MySQL表的新增記錄
  • 主題(Topic):數據分類通道(如mysql.coffee_shop.order_details存儲訂單明細變更)
  • 生產者(Producer):數據寫入端(如監控MySQL的Debezium連接器)
  • 消費者(Consumer):數據讀取端(如推薦服務)
  • 代理(Broker):Kafka服務節點,組成高可用集群

實時推薦系統工作流

訂單數據實時處理流程:

在這里插入圖片描述

整個過程可在500ms內完成,實現秒級響應

數據攝取:Kafka Connect與Debezium

**變更數據捕獲(CDC)**是實現實時數據攝取的關鍵技術

# docker-compose.yaml配置片段
connect:image: confluentinc/cp-kafka-connectcommand:- bash- -c- |# 安裝Debezium MySQL連接器confluent-hub install debezium/debezium-connector-mysql/etc/confluent/docker/runenvironment:CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"  # Kafka集群地址

Debezium通過解析MySQL二進制日志(binlog),將數據變更轉化為標準事件格式。示例事件消息:

{"payload": {"after": {"order_id": "ORD_20230619_001","product_id": "COFFEE_BEAN_ESPRESSO","quantity": 2},"op": "c",  // 操作類型:c=新增,u=更新,d=刪除"ts_ms": 1687189200000  // 事件時間戳}
}

數據消費:Python消費者實現

實時推薦服務的消費者核心代碼:

# kafka_client.py消費者工作線程
def consumer_worker(worker_id: int):# 初始化Kafka連接handler = KafkaHandler(["kafka-1:9092", "kafka-2:9092"])consumer = handler.get_consumer(topic="mysql.coffee_shop.order_details",group_id="realtime-recs")producer = handler.get_producer()while True:# 批量拉取消息(每秒輪詢)messages = consumer.poll(timeout_ms=1000)for msg in messages.items():for record in msg.value:# 處理事件(調用推薦邏輯)process_recommendation(record, producer)def process_recommendation(record, producer):# 從Redis獲取用戶畫像(詳見第八章)user_profile = redis.get(f"user:{record['user_id']}")if user_profile["tier"] == "DIAMOND":# 生成推薦并發送至下游主題suggestion = {"order_id": record["order_id"],"suggested_product": "COFFEE_GRINDER"}producer.send("order_suggestions", suggestion)

該實現包含以下關鍵技術點

  1. 消費者組(group_id)實現負載均衡
  2. 自動提交偏移量(enable_auto_commit=True)
  3. 批量消息處理提升吞吐量

基礎設施部署

Docker Compose定義的核心服務:

services:kafka-1:image: bitnami/kafka:3.5.1ports:- 29092:29092  # 外部訪問端口environment:KAFKA_CFG_NODE_ID: 1  # 節點標識KAFKA_CFG_LISTENERS: PLAINTEXT://:9092kafka-ui:image: provectuslabs/kafka-uiports:- 8000:8080  # 監控界面端口environment:KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS: kafka-1:9092

關鍵組件說明:

  • 雙節點Kafka集群(kafka-1/kafka-2)保障高可用
  • Kafka UI提供可視化監控(http://localhost:8000)
  • 初始化服務(init-kafka)自動創建主題和分區

價值總結

Kafka實時流處理系統與批量處理管道形成互補:

批量處理實時流處理
延遲小時級毫秒級
吞吐極高
用例歷史分析即時響應
存儲數據湖持久化短期事件保留

這種混合架構同時滿足企業對歷史數據分析和實時決策的需求$CITE_14 $CITE_17。

下一章:Redis緩存/存儲


第八章:Redis緩存/存儲

詳細專欄:Redis文檔學習

在第七章:Kafka消息系統(實時流處理)中,我們了解到Kafka如何實現實時數據流動以支持商品推薦服務。

實時推薦需要極速訪問用戶等級、支付方式和商品信息,這正是Redis緩存/存儲的核心價值

Redis核心特性

Redis是開源的內存數據結構存儲系統,具備以下關鍵能力:

  • 亞毫秒級響應:數據存儲在內存而非磁盤,訪問速度比傳統數據庫快100倍
  • 豐富數據結構:支持字符串、哈希、集合、有序集合等數據結構
  • 數據持久化:支持RDB快照AOF日志兩種持久化方式
  • 高可用架構:支持主從復制集群部署

項目中的Redis應用

在我們的咖啡銷售數據管道中,Redis承擔兩大核心角色:

1. 查找數據緩存(靜態數據加速)

通過lookup_data_cache.py腳本定時從MySQL加載三類核心數據到Redis

# 來源: scripts/database/lookup_data_cache.py
# 鉆石客戶ID存儲為集合
r.sadd("diamond_customers", customer_id)# 支付方式ID存儲為集合
r.sadd("bank_acb_payment", payment_method_id)# 商品詳情存儲為哈希
r.hset(f"product:{product_id}", mapping={"name": "濃縮咖啡", "unit_price": 25})

實時服務通過redis_static連接訪問這些數據

# 檢查鉆石客戶(時間復雜度O(1))
is_diamond = redis_static.sismember("diamond_customers", "CUST_202306001")# 獲取商品詳情(哈希全量讀取)
product_info = redis_static.hgetall("product:COFFEE_BEAN_001")

2. 訂單狀態管理(動態數據暫存)

使用redis_dynamic連接處理實時訂單流

# 訂單計數器遞增(原子操作保證線程安全)
current_count = redis_dynamic.incr(f"message_count:ORDER_20230619_001")# 存儲已購商品集合(自動去重)
redis_dynamic.sadd(f"ordered_products:ORDER_20230619_001", "COFFEE_GRINDER")# 設置訂單狀態(帶90秒過期時間)
redis_dynamic.setex(f"order_status:ORDER_20230619_001", 90, "completed")

Redis架構優勢

維度傳統數據庫Redis
響應時間10-100ms0.1-1ms
QPS~1k~1M
數據結構固定表結構多種靈活結構
持久化強持久化可配置持久化

Docker部署配置

Redis服務在docker-compose.yaml中的定義

services:redis:image: redis:7.0.12ports:- "6379:6379"volumes:- ./redis_data:/data  # 數據持久化目錄command: ["redis-server", "--save 60 1000", "--appendonly yes"]

關鍵配置說明:

  • --save 60 100060秒內有1000次寫入則觸發RDB快照
  • --appendonly yes:啟用AOF日志記錄所有寫操作

數據流可視化

Redis在實時推薦中的交互流程:

在這里插入圖片描述

總結

Redis通過內存存儲和高效數據結構,在實時推薦中實現:

  1. 查詢加速:將鉆石客戶檢查從10ms級優化至0.1ms級
  2. 狀態同步:可靠跟蹤分布式環境下的訂單處理進度
  3. 資源解耦:降低MySQL負載峰值壓力達80%

這種緩存+暫存的雙重模式,使實時推薦服務能在500ms內完成從事件接收到推薦生成的完整流程

下一章:Docker Compose環境


第九章:Docker Compose環境

詳細專欄:Docker 云原生

歡迎回到咖啡銷售數據管道核心概念系列的最終章!我們已經深入探討了各個組件

  • 數據來源(第一章:MySQL數據庫(源系統))
  • 數據處理引擎(第二章:Spark作業(數據處理))
  • 分層存儲(第三章:MinIO存儲(數據湖)和第四章:數據層(青銅、白銀、黃金))
  • 工作流調度(第五章:Airflow DAG(批量編排))
  • 數據質量保障(第六章:數據質量檢查)
  • 實時事件處理(第七章:Kafka消息系統(實時流))
  • 以及高速緩存(第八章:Redis緩存/存儲)

想象組裝復雜機械或指揮大型樂團——每個零件或樂手都有特定角色。

如何確保它們協同運作

這正是Docker Compose要解決的核心挑戰:將Spark、Airflow、Kafka、數據庫等異構服務整合為有機整體。

Docker Compose核心價值

Docker Compose是通過YAML文件定義和管理多容器應用的工具,具備三大核心能力

在這里插入圖片描述

藍圖文件解析

項目包含兩個核心配置文件:

  • docker-compose.yaml:定義實時服務組件
  • docker-compose-batch.yaml:定義批處理服務組件

服務定義范式

# 摘自 docker-compose-batch.yaml
services:minio:image: minio/minio:latestcontainer_name: minioports:- "9000:9000"  # S3 API端口- "9001:9001"  # 控制臺端口environment:MINIO_ROOT_USER: minioadminMINIO_ROOT_PASSWORD: minioadminvolumes:- ./volumes/minio:/data  # 數據持久化路徑networks:- myNetwork

關鍵配置說明:

  • image:指定Docker鏡像版本,確保環境一致性
  • ports:端口映射遵循主機端口:容器端口格式
  • volumes:數據卷實現主機與容器的路徑映射
  • networks:自定義網絡實現服務發現

網絡拓撲架構

networks:myNetwork:  # 自定義覆蓋網絡driver: bridgeattachable: true

網絡特性:

  • 服務間通過服務名互訪(如spark-master:7077)
  • 隔離外部網絡干擾,提升安全性
  • 支持跨compose文件網絡共享

數據卷設計

volumes:- ./airflow/dags:/opt/airflow/dags  # DAG文件同步- ./volumes/postgres:/var/lib/postgresql/data  # 元數據持久化

數據管理策略:

  • 批處理數據:MinIO卷映射實現數據湖持久化
  • 元數據存儲:PostgreSQL卷保障任務狀態不丟失
  • 日志文件:主機目錄映射方便問題排查

💡完整服務矩陣

服務類型包含組件通信協議
批處理服務Spark Master/Worker, AirflowHTTP/8080, JDBC/7077
實時服務Kafka集群, Redis, Kafka ConnectTCP/9092, TCP/6379
存儲服務MinIO, PostgreSQLS3/9000, JDBC/5432
監控服務Prometheus, Grafana, Kafka UIHTTP/3000, Web/8000

環境管理指令集

全棧啟動命令

docker compose -f docker-compose.yaml -f docker-compose-batch.yaml up -d

參數解析:

  • -f:指定多個compose文件實現模塊化配置
  • -d后臺守護模式運行
  • 啟動順序通過depends_on字段控制

運維監控命令

# 查看容器運行狀態
docker compose -f docker-compose.yaml ps# 查看實時日志
docker compose logs -f spark-master# 彈性擴容Spark Worker
docker compose up -d --scale spark-worker=3

系統協同原理

在這里插入圖片描述

協同要點:

  1. 批量處理流:Airflow通過SparkSubmitOperator提交作業到Spark集群,實現ETL流水線
  2. 實時處理流Kafka Connect監控MySQL binlog生成CDC事件,觸發實時推薦計算
  3. 監控告警流:各組件暴露Metrics端點,Prometheus采集后通過Grafana展示

核心優勢

環境一致性保障

  • 開發、測試、生產環境使用相同鏡像版本(如bitnami/kafka:3.5.1)
  • 避免"在我機器上能跑"的問題,實現跨平臺兼容

資源隔離控制

# 限制容器資源配額
deploy:resources:limits:cpus: '0.50'memory: 1024Mreservations:cpus: '0.25'memory: 512M

資源管理策略:

  • Spark Worker按計算需求分配CPU/MEM
  • Kafka Broker根據吞吐量配置資源上限
  • 關鍵服務(如PostgreSQL)預留基礎資源

快速擴縮容能力

# 擴展Kafka Broker節點
docker compose up -d --scale kafka=3# 縮減Spark Worker節點
docker compose up -d --scale spark-worker=2

動態調整策略:

  • 批量任務高峰期擴展Spark計算節點
  • 大促期間增加Kafka分區和消費者組實例

總結

Docker Compose通過聲明式配置將復雜的多服務系統抽象為可版本控制的藍圖文件,實現:

  1. 一鍵部署:15+組件通過單個命令啟動
  2. 服務發現自定義網絡實現容器間域名解析
  3. 數據治理卷映射策略保障數據生命周期
  4. 資源管控:CPU/內存配額限制防止資源爭搶

該方案使我們的數據管道具備企業級可維護性,支持從開發環境到生產環境的平滑過渡。

通過組合批量處理與實時處理組件,構建出完整的Lambda架構(詳見 架構專欄)實現。

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/85382.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/85382.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/85382.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

jquery 賦值時不觸發change事件解決——仙盟創夢IDE

一、傳統方法jquey change $(#village_id).trigger(change);$("#village_id").val(99);$("#village_id").change(); 不生效 二、傳統方法jquey $(#village_id).trigger(change); 四、傳統方法jquey <input type"text" /> <button…

Android | 簽名安全

檢驗和簽名 校驗開發者在數據傳送時采用的一種校正數據的一種方式&#xff0c; 常見的校驗有:簽名校驗(最常見)、dexcrc校驗、apk完整性校驗、路徑文件校驗等。 通過對 Apk 進行簽名&#xff0c;開發者可以證明對 Apk 的所有權和控制權&#xff0c;可用于安裝和更新其應用。…

Android14 耳機按鍵拍照

在相機拍照預覽界面 通過耳機按鍵實現拍照功能 耳機按鍵定義 frameworks/base/core/java/android/view/KeyEvent.java public static final int KEYCODE_HEADSETHOOK 79;相機界面 拍照邏輯 DreamCamera2\src\com\android\camera\PhotoModule.java Override public bool…

【AI作畫】第2章comfy ui的一般輸入節點,文本框的類型和輸入形式

目錄 CLIP文本編碼器 條件輸出和文本輸出 轉換某一變量為輸入 展示作品集 在默認的工作流之外&#xff0c;我們如何自己添加節點呢&#xff1f; 一般我們用到的sampler采樣器在“鼠標右鍵——添加節點——采樣——K采樣器” 我們用的clip文本編碼器在“鼠標右鍵——添加節…

vue3仿高德地圖官網路況預測時間選擇器

<template><div class"time-axis-container"><div class"time-axis" ref"axisRef"><!-- 刻度線 - 共25個刻度(0-24) --><divv-for"hour in 25":key"hour - 1"class"tick-mark":class&…

ZArchiver:高效解壓縮,輕松管理文件

在數字時代&#xff0c;文件的壓縮與解壓已成為我們日常操作中不可或缺的一部分。無論是接收朋友分享的大文件&#xff0c;還是下載網絡資源&#xff0c;壓縮包的處理都極為常見。ZArchiver正是一款為安卓用戶精心打造的解壓縮軟件&#xff0c;它以強大的功能、簡潔的界面和高效…

1432.改變一個整數能得到的最大差值

貪心思想&#xff0c;為了得到最大差&#xff0c;想辦法變成一個最大的數和一個最小的數。 這里有規則&#xff0c;從最高位開始&#xff0c; 變成最大&#xff0c;如果<9&#xff0c;則將該數位代表的數都變成9&#xff0c;如果該數位已經是9了&#xff0c;則將下一個數位…

前端跨域解決方案(4):postMessage

1 postMessage 核心 postMessage 是現代瀏覽器提供的跨域通信標準 API&#xff0c;允許不同源的窗口&#xff08;如主頁面與 iframe、彈出窗口、Web Worker&#xff09;安全交換數據。相比其他跨域方案&#xff0c;它的核心優勢在于&#xff1a; 雙向通信能力&#xff1a;支持…

大語言模型指令集全解析

在大語言模型的訓練與優化流程中&#xff0c;指令集扮演著關鍵角色&#xff0c;它直接影響模型對任務的理解與執行能力。以下對常見指令集展開詳細介紹&#xff0c;涵蓋構建方式、規模及適用場景&#xff0c;助力開發者精準選用 為降低指令數據構建成本&#xff0c;學術界和工…

OpenCV CUDA模塊設備層-----用于封裝CUDA紋理對象+ROI偏移量的一個輕量級指針類TextureOffPtr()

操作系統&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 編程語言&#xff1a;C11 算法描述 TextureOffPtr<T, R> 是 OpenCV 的 CUDA 模塊&#xff08;opencv_cudev&#xff09;中用于封裝 CUDA 紋理對象 ROI 偏移量 的一個輕量級指…

Python 數據分析10

2.3.3其他 除了前面所介紹的常用語數據挖掘建模的庫之外&#xff0c;還有許多庫也運用于數據挖掘建模&#xff0c;如jieba、SciPy、OpenCV、Pillow等。 1.jieba jieba是一個被廣泛使用的Python第三方中文分詞庫。jieba使用簡單&#xff0c;并且支持Python、R、C等多種編程語言的…

css 制作一個可以旋轉的水泵效果

如圖&#xff0c;項目里面有一個小圖片可以旋轉&#xff0c;達到看起來像是一個在工作的水泵。我使用css旋轉動畫實現。 一、HTML結構部分 <div className"ceshixuanzhuan"><img src{lunkuo} className"lunkuo"/><img src{yepian} classN…

數據結構期末程序題型

一、 隊列 1、簡單模擬隊列排列 #include<bits/stdc.h> using namespace std; int main(){int n;cin>>n;queue<int>q;string str;while(true){cin>>str;if(str"#")break;if(str"In"){int t;cin>>t;if(q.size()<n){q.pu…

SpringCloud+Vue汽車、單車充電樁源碼實現:從架構設計到核心模塊解析

智慧充電管理平臺技術實現&#xff1a;從架構設計到核心模塊解析 智慧充電管理平臺作為新能源汽車生態的核心基礎設施&#xff0c;需要實現充電設備管理、訂單處理、數據統計分析等復雜功能。本文將從技術架構、核心模塊設計、關鍵技術實現三個維度&#xff0c;深度解析平臺的…

Kafka入門及實戰應用指南

1、Kafka概述 Apache Kafka是由LinkedIn公司于2010年開發的一款分布式消息系統&#xff0c;旨在解決當時傳統消息隊列&#xff08;如ActiveMQ、RabbitMQ&#xff09;在高吞吐量和實時性場景下的性能瓶頸。隨著LinkedIn內部對實時日志處理、用戶行為追蹤等需求的激增&#xff0…

智能指針 c++

C 智能指針詳解 智能指針是 C11 引入的內存管理工具&#xff0c;位于 <memory> 頭文件中&#xff0c;用于自動管理動態分配的內存&#xff0c;防止內存泄漏。主要類型如下&#xff1a; 1. std::unique_ptr (獨占所有權) 特點&#xff1a;唯一擁有所指對象&#xff0c;不…

Python應用八股文

大家好!在 Python 學習的道路上&#xff0c;掌握一些基礎知識要點至關重要&#xff0c;這些要點常被稱為“Python 八股”。以下是對它們的簡易總結&#xff0c;幫助你快速回顧和鞏固 Python 的核心概念。 一、數據結構 列表&#xff08;List&#xff09;&#xff1a;有序可變序…

【技術深度】領碼SPARK破解微服務數據依賴困局:架構設計與實踐指南

——深度解析分布式數據冗余與異步消息機制&#xff0c;驅動企業數字化轉型加速 ? 核心摘要 本文從技術架構與工程實現的角度&#xff0c;系統講解領碼SPARK融合平臺如何精準解決微服務架構下數據依賴“卡脖子”問題。通過設計高效的數據冗余模型和完善的異步消息更新機制&am…

關于前端的防抖和節流

給我解釋下 前端開發中的防抖和節流 并舉個具體的例子 防抖&#xff08;Debounce&#xff09;與節流&#xff08;Throttle&#xff09;詳解 在前端開發中&#xff0c;防抖&#xff08;Debounce&#xff09; 和 節流&#xff08;Throttle&#xff09; 是兩種優化高頻觸發事件的…

React-router 多類型歷史記錄棧

react-router 為了滿足開發者更多路由歷史存儲場景&#xff0c;提供了以下幾種模式&#xff1a; 瀏覽器原生歷史記錄 瀏覽器 hash 內存型 服務端記錄 以上實現分別對應于一下 API 實現&#xff1a; createBrowserRouter&#xff1a;瀏覽器提供的歷史管理。 createHashRou…