使用 Maxwell 和 RabbitMQ 監控 Mysql Flowable 表變更

為什么需要監控數據庫變化?

當 Flowable 表中的數據發生變化(例如插入新任務、更新狀態或刪除記錄),我們可能需要觸發其他操作,比如通知用戶、更新儀表盤或啟動新流程。Maxwell 可以讀取 MySQL 的二進制日志(binlog),將變化事件以 JSON 格式發送到 RabbitMQ 消息隊列,供其他系統消費。

準備工作

你需要以下組件:

  • MySQL:運行 Flowable 的數據庫,需啟用二進制日志(binlog)。

  • RabbitMQ:消息隊列,用于接收 Maxwell 發送的事件。

  • Maxwell:讀取 MySQL binlog 并發送事件到 RabbitMQ。

  • Flowable:已部署,數據庫中有需要監控的表(例如 flowable.ACT_RU_TASK)。

步驟 1:配置 MySQL

確保 MySQL 啟用了二進制日志。以下是一個簡單的 Docker Compose 配置:

mysql:image: mysql:8.0container_name: mysqlports:- "3306:3306"volumes:- mysql_data:/var/lib/mysqlcommand:- "--log_bin=mysql-bin"- "--server_id=1"- "--binlog_format=ROW"environment:MYSQL_ROOT_PASSWORD: rootMYSQL_DATABASE: flowableMYSQL_USER: maxwellMYSQL_PASSWORD: maxwell_password
  • log_bin=mysql-bin:啟用二進制日志。

  • server_id=1:設置唯一服務器 ID。

  • binlog_format=ROW:使用 ROW 格式,適合 Maxwell 解析。

為 Maxwell 創建用戶并授予權限:

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell_password';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';

步驟 2:啟動 RabbitMQ

使用 Docker 運行 RabbitMQ:

rabbitmq:image: rabbitmq:3-managementcontainer_name: rabbitmqports:- "5672:5672"- "15672:15672"environment:RABBITMQ_DEFAULT_USER: guestRABBITMQ_DEFAULT_PASS: guest

訪問 http://localhost:15672(默認用戶/密碼:guest/guest)檢查 RabbitMQ 是否正常運行。

步驟 3:配置 Maxwell

Maxwell 讀取 MySQL 的 binlog 并發送事件到 RabbitMQ。創建一個配置文件 maxwell.properties:

log_level=INFO
host=mysql
port=3306
user=maxwell
password=maxwell_password
producer=rabbitmq
rabbitmq_host=rabbitmq
rabbitmq_port=5672
rabbitmq_user=guest
rabbitmq_pass=guest
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=topic
rabbitmq_routing_key_template=%database%.%table%
filter=exclude: *.*, include: flowable.*
replica_server_id=64
  • filter:只監控 flowable 數據庫的表。

  • rabbitmq_exchange:事件發送到 maxwell 交換機。

運行 Maxwell:

docker run -d --name maxwell \-v $(pwd)/maxwell.properties:/config.properties \--network=host \zendesk/maxwell:latest \bin/maxwell --config=/config.properties

注意:確保 Maxwell、MySQL 和 RabbitMQ 在同一網絡(例如使用 --network=host 或自定義 Docker 網絡)。

步驟 4:測試監控

  1. 在 Flowable 數據庫中插入一條任務記錄:

    INSERT INTO flowable.ACT_RU_TASK (ID_, NAME_, PRIORITY_, CREATE_TIME_)
    VALUES ('task123', 'Test Task', 50, NOW());
  2. Maxwell 將捕獲變更并發送 JSON 事件到 RabbitMQ,例如:

    {"database": "flowable","table": "ACT_RU_TASK","type": "insert","data": {"id_": "task123","name_": "Test Task","priority_": 50,"create_time_": "2025-08-06 17:03:00"}
    }
  3. 在 RabbitMQ 管理界面(http://localhost:15672):

    • 創建一個隊列(例如 flowable_queue)。

    • 綁定到 maxwell 交換機,路由鍵為 flowable.*。

    • 檢查隊列中的消息。

步驟 5:消費事件

開發一個簡單的消費者來處理 RabbitMQ 的事件。以下是一個 Python 示例:

import pika
import jsonconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='flowable_queue')
channel.queue_bind(exchange='maxwell', queue='flowable_queue', routing_key='flowable.*')def callback(ch, method, properties, body):event = json.loads(body)print(f"Received event: {event}")if event['table'] == 'ACT_RU_TASK' and event['type'] == 'insert':print(f"New task created: ID={event['data']['id_']}, Name={event['data']['name_']}")channel.basic_consume(queue='flowable_queue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()

保存為 consumer.py 并運行:

pip install pika
python consumer.py

當插入新任務時,消費者會打印類似以下輸出:

Received event: {'database': 'flowable', 'table': 'ACT_RU_TASK', 'type': 'insert', ...}
New task created: ID=task123, Name=Test Task

總結

通過 Maxwell 和 RabbitMQ,我們可以輕松監控 Flowable 表的變更,并將事件發送到消息隊列供其他系統使用。這個方案簡單高效,適合實時數據處理場景。下一步,你可以:

  • 優化 Maxwell 過濾規則,只監控特定表(如 ACT_RU_TASK)。

  • 將事件集成到 Flowable 流程,觸發自動化任務。

  • 使用 Prometheus 監控 RabbitMQ 隊列性能。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/92652.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/92652.shtml
英文地址,請注明出處:http://en.pswp.cn/web/92652.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MySQL面試題及詳細答案 155道(041-060)

《前后端面試題》專欄集合了前后端各個知識模塊的面試題,包括html,javascript,css,vue,react,java,Openlayers,leaflet,cesium,mapboxGL,threejs&…

mysql_mcp_server_pro源碼部署及啟動報錯新手指南:讓智能體長出手來直接獲取到最底層的數據

文章目錄 源碼部署 1.克隆項目地址 2.創建虛擬環境 3.激活環境 4.進入項目目錄下 5.安裝依賴 6.進入到src目錄 7.在當前目錄下,新建一個.env文件 8.配置數據庫信息 9.啟動項目 10.啟動權限管理啟動項目 啟動報錯了: 問題現象與直接原因 解決方案與操作步驟 方案1:允許忽略未定…

jupyter服務器創建賬戶加映射對外賬戶地址

文章目錄一、創建test1-test10用戶(跳過已存在的test3)二、檢查必要組件是否安裝解決方法:用緊湊格式避免換行解析錯誤核心修復說明:使用方法:以下是根據需求生成的命令、檢查腳本及啟動腳本,按步驟執行即可…

DDR中的POD與ODT

一、POD(Pseudo Open Drain)技術1. 定義與工作原理POD(偽開漏) 是DDR4/LPDDR4引入的電壓標準與驅動架構,替代傳統的SSTL(Stub Series Terminated Logic)。其核心特征是將上拉電源從VDDQ改為VTT&…

企業架構之導論(1)

一、企業架構是什么 企業架構是對企業業務、數據、應用、技術四大核心領域及其相互關系的系統化描述與設計框架。它像一張“城市藍圖”,確保業務戰略能精準映射到IT落地: 本質:是連接業務戰略(做什么)與技術執行(怎么做)的結構化方法論。 核心組件: 業務架構:定義業…

實戰:在已有K8S集群如何新增和刪除Node節點

本篇文章將分享一下如何在已有集群添加新節點和刪除現有節點1 新增節點到K8S集群新增節點可以分為準備節點、配置節點和將其加入集群三步。1.1 準備新節點準備一個相同操作系統的主機作為新節點。參考以前部署的文章:實戰部署k8s 1.28版本集群,跟著操作到…

C++ 黑馬 內存分配模型

一, 內存分配模型內存總共有四個分區1 代碼區 主要用來存儲二進制代碼,由操作系統進行管理2 棧區 由編譯器自己進行釋放和分配,例如函數的傳遞的參數,局部變量,const修飾的局部常量等等....3 堆區 由程序員自己分配和釋放&am…

【華為倉頡編程語言】運行第一個倉頡程序

歡迎來到倉頡編程語言的第一個實戰課程。 上節課我們成功安裝了倉頡工具鏈,今天讓我們一起編寫并運行第一個倉頡程序。相信很多同學都還記得學習第一門編程語言時寫的"Hello World"程序,那種看到程序成功運行的激動心情。今天,我們…

利用DeepSeek改寫并增強測試Duckdb和sqlite的不同插入方法性能

在前文基礎上,好奇作為事務型數據庫的SQLite表現怎么樣,讓DeepSeek來幫忙。 提示詞 請仿照附件編寫用python插入sqlite數據的測試函數,如果sqlite3沒有對應方法就省略 import sqlite3 import pandas as pd import timemethods [字符串拼接, …

進程管理塊(PCB):操作系統進程管理的核心數據結構

進程管理塊(PCB):操作系統進程管理的核心數據結構在現代操作系統中,進程管理塊(Process Control Block, PCB) 是內核用來描述、管理和控制進程生命周期的最核心、最關鍵的數據結構。它就像是一個進程的“身…

線程的sleep、wait、join、yield如何使用?

sleep:讓線程睡眠,期間會出讓cpu,在同步代碼塊中,不會釋放鎖 wait(必須先獲得對應的鎖才能調用):讓線程進 入等待狀態,釋放當前線程持有的鎖資源線程只有在notify 或者notifyAll方法調用后才會被喚醒,然后去爭奪鎖. join: 線程之間協同方式,使…

2025年服裝智能跟單系統TOP3推薦榜單

TOP1領軍者首選推薦:金蝶服裝系統【★★★★★】 在服裝智能跟單系統的領域,金蝶服裝系統憑借其強大的功能和卓越的性能脫穎而出,成為眾多企業的首選。盡管本文標題提及的是另一份榜單,但值得一提的是,金蝶系統若參與評…

基于FFmpeg的B站視頻下載處理

起因是這樣的一天,本人在B站客戶端緩存了一個視頻,用于學習參考等學術交流,但是視頻和音頻卻是分開且通過Win Hex查看發現文件頭含有9個“30”,想到一個個手動刪字節不如讓程序取代,便有了本文章這一篇文章發布之前&am…

【Vue Router】路由模式、懶加載、守衛、權限、緩存

前言 Vue Router 是 Vue 生態中處理頁面跳轉的核心工具,它解決了單頁應用中 URL 管理、組件切換、狀態維護等關鍵問題,同時提供了豐富的功能(如動態路由、嵌套路由、路由守衛)。除了經常用到的路由配置以外,我們還需了…

Linux epoll 實現詳解 (fs/eventpoll.c)

核心數據結構分析 1. struct eventpoll (epoll 實例核心結構) c struct eventpoll {struct mutex mtx; // 保護 epoll 結構的互斥鎖wait_queue_head_t wq; // epoll_wait() 使用的等待隊列wait_queue_head_t poll_wait; // 文件 poll() 使用的等待隊列struc…

【牛客刷題】小紅的項鏈(字節跳動面試題)

文章目錄 一、題目介紹 1.1 輸入描述 1.2 輸出描述 1.3 示例 二、算法設計思路 三、流程圖 四、題解實現 五、復雜度分析 六、關鍵算法知識點 一、題目介紹 原題鏈接:https://www.nowcoder.com/practice/3da065cab096478eb603bbfca5af8b02 小紅將 n n n個珠子排成一排,然后…

【Html網頁模板】HTML炫酷星空(一閃一閃亮晶晶)

文章目錄專欄導讀功能預覽快速開始核心實現拆解1. 背景與基礎布局2. 背景層靜態星空(輕微閃爍)3. 前景層“亮晶晶”的閃爍小星星4. 交互與動效5. 行星裝飾可配置項與個性化建議初始化順序(入口)源碼結語專欄導讀 🔥&am…

第一天-CAN Signal信號的Multiplexor多路復用在DBC中實現

🚀 CAN總線的“變形金剛術”:Multiplexor多路復用信號深度揭秘在汽車電子江湖中,當數百個ECU爭相發送數據時,如何讓一條CAN報文像"變形金剛"一樣自由切換形態?Multiplexor(多路復用)技…

Code Exercising Day 10 of “Code Ideas Record“:StackQueue part02

文章目錄【150. Evaluate Reverse Polish Notation】【239. Sliding Window Maximum】【347. Top K Frequent Elements】【150. Evaluate Reverse Polish Notation】 Problem Link Approach: Use a stack. Push numbers onto the stack; when encountering an operator, pop t…

系統架構設計師備考之架構設計高級知識

1.系統架構設計基礎知識1.1.軟件架構概念軟件架構定義軟件架構(Software Architecture)或稱軟件體系結構,是指系統的一個或者多個結構,這些結構包括軟件的構件(可能是程序模塊、類或者是中間件)、構件的外部…