為什么需要監控數據庫變化?
當 Flowable 表中的數據發生變化(例如插入新任務、更新狀態或刪除記錄),我們可能需要觸發其他操作,比如通知用戶、更新儀表盤或啟動新流程。Maxwell 可以讀取 MySQL 的二進制日志(binlog),將變化事件以 JSON 格式發送到 RabbitMQ 消息隊列,供其他系統消費。
準備工作
你需要以下組件:
MySQL:運行 Flowable 的數據庫,需啟用二進制日志(binlog)。
RabbitMQ:消息隊列,用于接收 Maxwell 發送的事件。
Maxwell:讀取 MySQL binlog 并發送事件到 RabbitMQ。
Flowable:已部署,數據庫中有需要監控的表(例如 flowable.ACT_RU_TASK)。
步驟 1:配置 MySQL
確保 MySQL 啟用了二進制日志。以下是一個簡單的 Docker Compose 配置:
mysql:image: mysql:8.0container_name: mysqlports:- "3306:3306"volumes:- mysql_data:/var/lib/mysqlcommand:- "--log_bin=mysql-bin"- "--server_id=1"- "--binlog_format=ROW"environment:MYSQL_ROOT_PASSWORD: rootMYSQL_DATABASE: flowableMYSQL_USER: maxwellMYSQL_PASSWORD: maxwell_password
log_bin=mysql-bin:啟用二進制日志。
server_id=1:設置唯一服務器 ID。
binlog_format=ROW:使用 ROW 格式,適合 Maxwell 解析。
為 Maxwell 創建用戶并授予權限:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell_password';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
步驟 2:啟動 RabbitMQ
使用 Docker 運行 RabbitMQ:
rabbitmq:image: rabbitmq:3-managementcontainer_name: rabbitmqports:- "5672:5672"- "15672:15672"environment:RABBITMQ_DEFAULT_USER: guestRABBITMQ_DEFAULT_PASS: guest
訪問 http://localhost:15672(默認用戶/密碼:guest/guest)檢查 RabbitMQ 是否正常運行。
步驟 3:配置 Maxwell
Maxwell 讀取 MySQL 的 binlog 并發送事件到 RabbitMQ。創建一個配置文件 maxwell.properties:
log_level=INFO
host=mysql
port=3306
user=maxwell
password=maxwell_password
producer=rabbitmq
rabbitmq_host=rabbitmq
rabbitmq_port=5672
rabbitmq_user=guest
rabbitmq_pass=guest
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=topic
rabbitmq_routing_key_template=%database%.%table%
filter=exclude: *.*, include: flowable.*
replica_server_id=64
filter:只監控 flowable 數據庫的表。
rabbitmq_exchange:事件發送到 maxwell 交換機。
運行 Maxwell:
docker run -d --name maxwell \-v $(pwd)/maxwell.properties:/config.properties \--network=host \zendesk/maxwell:latest \bin/maxwell --config=/config.properties
注意:確保 Maxwell、MySQL 和 RabbitMQ 在同一網絡(例如使用 --network=host 或自定義 Docker 網絡)。
步驟 4:測試監控
在 Flowable 數據庫中插入一條任務記錄:
INSERT INTO flowable.ACT_RU_TASK (ID_, NAME_, PRIORITY_, CREATE_TIME_) VALUES ('task123', 'Test Task', 50, NOW());
Maxwell 將捕獲變更并發送 JSON 事件到 RabbitMQ,例如:
{"database": "flowable","table": "ACT_RU_TASK","type": "insert","data": {"id_": "task123","name_": "Test Task","priority_": 50,"create_time_": "2025-08-06 17:03:00"} }
在 RabbitMQ 管理界面(http://localhost:15672):
創建一個隊列(例如 flowable_queue)。
綁定到 maxwell 交換機,路由鍵為 flowable.*。
檢查隊列中的消息。
步驟 5:消費事件
開發一個簡單的消費者來處理 RabbitMQ 的事件。以下是一個 Python 示例:
import pika
import jsonconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='flowable_queue')
channel.queue_bind(exchange='maxwell', queue='flowable_queue', routing_key='flowable.*')def callback(ch, method, properties, body):event = json.loads(body)print(f"Received event: {event}")if event['table'] == 'ACT_RU_TASK' and event['type'] == 'insert':print(f"New task created: ID={event['data']['id_']}, Name={event['data']['name_']}")channel.basic_consume(queue='flowable_queue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
保存為 consumer.py 并運行:
pip install pika
python consumer.py
當插入新任務時,消費者會打印類似以下輸出:
Received event: {'database': 'flowable', 'table': 'ACT_RU_TASK', 'type': 'insert', ...}
New task created: ID=task123, Name=Test Task
總結
通過 Maxwell 和 RabbitMQ,我們可以輕松監控 Flowable 表的變更,并將事件發送到消息隊列供其他系統使用。這個方案簡單高效,適合實時數據處理場景。下一步,你可以:
優化 Maxwell 過濾規則,只監控特定表(如 ACT_RU_TASK)。
將事件集成到 Flowable 流程,觸發自動化任務。
使用 Prometheus 監控 RabbitMQ 隊列性能。