一、最近搭建了一個 RabbitMQ 集群(三個磁盤節點),上生產環境之前想做個壓測,測試一下穩定性,參考 DeepSeek 做了如下測試方案:
二、核心代碼實現:
- 配置文件 (config.py)
import os
# Cluster entry points (AMQP URLs).  Producers and consumers each pick one at
# random, so every node of the cluster should be listed here.
# NOTE(review): only one of the three cluster nodes is listed; add the others.
# NOTE(review): credentials are hard-coded in source; consider the environment.
RABBITMQ_NODES = [
    'amqp://admin:123456@192.168.0.175:8101',
]

QUEUE_CONFIG = {
    'queue_name': 'stress_test_queue',
    'durable': True,
    'arguments': {
        # Intended as a mirrored queue.  NOTE(review): since RabbitMQ 3.0 mirroring
        # is configured with a policy (ha-mode), not with this queue argument —
        # confirm the queue is really mirrored, or consider a quorum queue.
        'x-ha-policy': 'all',
        # Lazy queue.  NOTE(review): ignored by RabbitMQ 3.12+, where classic
        # queues always behave this way.
        'x-queue-mode': 'lazy',
    },
}

TEST_PARAMS = {
    'producer_count': 20,       # number of producer processes
    'consumer_count': 30,       # number of consumer processes
    'message_size': 1024,       # message body size in bytes (1 KB)
    'total_messages': 1000000,  # total message target (producer.py stops on test_duration instead)
    'test_duration': 600,       # test duration in seconds (10 minutes)
}

# Log paths are relative to the current working directory.
LOG_DIR = "logs"
CONSUMER_LOG_FILE = os.path.join(LOG_DIR, "consumer_stress_test.log")
PRODUCER_LOG_FILE = os.path.join(LOG_DIR, "producer_stress_test.log")
- 生產者實現 (producer.py)
import pika
import time
import random
import os
from config import RABBITMQ_NODES, QUEUE_CONFIG, TEST_PARAMS, PRODUCER_LOG_FILE# 確保日志目錄存在
# Make sure the log directory exists before anything is written to it.
os.makedirs(os.path.dirname(PRODUCER_LOG_FILE), exist_ok=True)


def log_producer(message):
    """Print *message* and append it to the producer log file."""
    print(message)
    with open(PRODUCER_LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(message + '\n')


def _close_quietly(connection):
    """Close *connection* if it is still open; log instead of raising on failure."""
    if connection is None or not connection.is_open:
        return
    try:
        connection.close()
    except Exception as e:
        log_producer(f"關閉連接失敗: {str(e)}")


def produce_messages():
    """Publish persistent messages to the test queue for TEST_PARAMS['test_duration'] seconds.

    Connects to a random node from RABBITMQ_NODES, declares the queue, then
    publishes one pre-generated random body of TEST_PARAMS['message_size'] bytes
    in a loop without any pause.  On a publish error the connection is replaced
    (waiting 1 s when reconnecting fails).

    NOTE(review): publisher confirms are not enabled and publishing is not
    rate-limited, so the queue backlog grows whenever consumers fall behind.

    Returns:
        The number of messages published, or 0 if the initial connection or
        queue declaration failed.
    """
    node = random.choice(RABBITMQ_NODES)
    params = pika.URLParameters(node)
    connection = None
    try:
        connection = pika.BlockingConnection(params)
        channel = connection.channel()
        channel.queue_declare(
            queue=QUEUE_CONFIG['queue_name'],
            durable=QUEUE_CONFIG['durable'],
            arguments=QUEUE_CONFIG['arguments'],
        )
    except Exception as e:
        log_producer(f"生產者初始化錯誤: {str(e)}")
        _close_quietly(connection)
        return 0

    start_time = time.time()
    message_count = 0
    pid = os.getpid()
    # One random body reused for every message; only its size matters here.
    message_body = os.urandom(TEST_PARAMS['message_size'])

    try:
        while (time.time() - start_time) < TEST_PARAMS['test_duration']:
            try:
                channel.basic_publish(
                    exchange='',
                    routing_key=QUEUE_CONFIG['queue_name'],
                    body=message_body,
                    properties=pika.BasicProperties(
                        delivery_mode=2,  # persistent message
                        # Publish time in milliseconds; the consumer subtracts it
                        # from its own clock to compute latency.
                        timestamp=int(time.time() * 1000),
                    ),
                )
                message_count += 1
                # Log progress every 1000 messages.
                if message_count % 1000 == 0:
                    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    log_producer(f"{current_time}, [Producer {pid}] 發送: {message_count}")
            except Exception as e:
                log_producer(f"生產者錯誤: {str(e)}")
                # Replace the broken connection.
                try:
                    _close_quietly(connection)
                    connection = pika.BlockingConnection(params)
                    channel = connection.channel()
                except Exception as e:
                    log_producer(f"重連失敗: {str(e)}")
                    time.sleep(1)
    finally:
        # After a failed reconnect the connection is already closed; closing it
        # unconditionally used to raise here and turn the result into 0.
        _close_quietly(connection)

    log_producer(f"[Producer {pid}] 發送完成,總計發送: {message_count} 條消息")
    return message_count


if __name__ == "__main__":
    produce_messages()
- 消費者實現 (consumer.py)
import pika
import time
import random
import os
import threading
from config import RABBITMQ_NODES, QUEUE_CONFIG, TEST_PARAMS, CONSUMER_LOG_FILE

# Make sure the log directory exists before anything is written to it.
os.makedirs(os.path.dirname(CONSUMER_LOG_FILE), exist_ok=True)

# Consumption statistics updated by on_message().  Every consumer process has
# its own copy, so these are per-process numbers, not cluster-wide totals.
consumed_count = 0
total_latency = 0
min_latency = float('inf')
max_latency = 0

# One BlockingConnection per process id, managed by get_channel().
connections = {}
# Guards `connections`.  NOTE(review): the controller runs consumers as separate
# processes, so this thread lock only matters if threads share one process.
lock = threading.Lock()

# Number of messages that carried a publish timestamp (denominator of the average).
latency_samples = 0


def log_consumer(message):
    """Print *message* and append it to the consumer log file."""
    print(message)
    with open(CONSUMER_LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(message + '\n')


def get_channel():
    """Return a new channel on this process's connection, connecting first if needed."""
    with lock:
        pid = os.getpid()
        if pid not in connections or not connections[pid].is_open:
            node = random.choice(RABBITMQ_NODES)
            params = pika.URLParameters(node)
            connections[pid] = pika.BlockingConnection(params)
        return connections[pid].channel()


def _drop_connection():
    """Close this process's connection (if any) so the next get_channel() reconnects.

    Closing the connection also returns unacknowledged deliveries to the queue.
    """
    with lock:
        connection = connections.pop(os.getpid(), None)
    if connection is not None and connection.is_open:
        try:
            connection.close()
        except Exception as e:
            log_consumer(f"消費者錯誤: {str(e)}")


def on_message(channel, method, properties, body):
    """Update this process's latency statistics, log progress, and ack the message."""
    global consumed_count, total_latency, min_latency, max_latency, latency_samples
    consumed_count += 1

    # The producer stamps messages with its publish time in milliseconds.
    # Messages from other publishers may carry no timestamp; they are still
    # acked (instead of crashing the consumer and being redelivered forever)
    # but do not contribute to the latency statistics.
    if properties.timestamp is not None:
        latency = time.time() * 1000 - properties.timestamp
        latency_samples += 1
        total_latency += latency
        min_latency = min(min_latency, latency)
        max_latency = max(max_latency, latency)

    if consumed_count % 1000 == 0:
        pid = os.getpid()
        avg_latency = total_latency / latency_samples if latency_samples else 0
        current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        # The average stays first on the line: the report parser picks up the
        # first latency value it finds.
        log_consumer(
            f"{current_time}, [Consumer {pid}] 消費: {consumed_count}, 平均延遲: {avg_latency:.2f}ms"
            f", 最小延遲: {min_latency:.2f}ms, 最大延遲: {max_latency:.2f}ms"
        )

    # Acknowledge the message.
    channel.basic_ack(delivery_tag=method.delivery_tag)


def start_consumer():
    """Consume from the test queue until stop_consuming() is called or the process ends.

    Any error is logged, the connection is dropped, and the consumer reconnects
    after 5 seconds.
    """
    # A loop rather than recursion: the original retried by calling itself, which
    # grew the call stack on every failure.
    while True:
        try:
            channel = get_channel()
            # At most 100 unacknowledged messages per consumer.
            channel.basic_qos(prefetch_count=100)
            channel.queue_declare(
                queue=QUEUE_CONFIG['queue_name'],
                durable=QUEUE_CONFIG['durable'],
                arguments=QUEUE_CONFIG['arguments'],
            )
            channel.basic_consume(
                queue=QUEUE_CONFIG['queue_name'],
                on_message_callback=on_message,
            )
            log_consumer(f"消費者啟動 PID: {os.getpid()}")
            channel.start_consuming()
            return  # start_consuming() only returns after stop_consuming()
        except Exception as e:
            log_consumer(f"消費者錯誤: {str(e)}")
            _drop_connection()
            time.sleep(5)


if __name__ == "__main__":
    start_consumer()
- 壓測控制器 (controller.py)
import multiprocessing
import time
import os
from producer import produce_messages
from consumer import start_consumer
from config import TEST_PARAMS, CONSUMER_LOG_FILE
from report_generator import ReportGenerator


def run_producers():
    """Start TEST_PARAMS['producer_count'] producer processes and return them."""
    producers = []
    for _ in range(TEST_PARAMS['producer_count']):
        p = multiprocessing.Process(target=produce_messages)
        p.start()
        producers.append(p)
    return producers


def run_consumers():
    """Start TEST_PARAMS['consumer_count'] consumer processes and return them."""
    consumers = []
    for _ in range(TEST_PARAMS['consumer_count']):
        p = multiprocessing.Process(target=start_consumer)
        p.start()
        consumers.append(p)
    return consumers


def _stop_processes(processes, timeout=10):
    """Terminate every process that is still alive, then wait for each to exit."""
    for p in processes:
        if p.is_alive():
            p.terminate()
    # join() reaps the terminated children instead of leaving them unreaped.
    for p in processes:
        p.join(timeout)


def monitor():
    """Print the elapsed time every 5 s until test_duration + 10 s have passed."""
    start_time = time.time()
    while time.time() - start_time < TEST_PARAMS['test_duration'] + 10:
        elapsed = time.time() - start_time
        print(f"[監控] 壓測已運行: {int(elapsed)}秒 / {TEST_PARAMS['test_duration']}秒")
        time.sleep(5)


def main():
    """Run the stress test: start consumers, then producers, wait, stop all, write the report."""
    print("=" * 60)
    print("RabbitMQ集群壓測控制器")
    print("=" * 60)

    # Make sure the log directory exists, then start from an empty consumer log.
    os.makedirs(os.path.dirname(CONSUMER_LOG_FILE), exist_ok=True)
    open(CONSUMER_LOG_FILE, 'w').close()

    print("啟動消費者...")
    consumers = run_consumers()
    time.sleep(5)  # give the consumers time to connect

    print("啟動生產者...")
    producers = run_producers()

    print("啟動監控...")
    monitor()

    print("壓測完成,終止進程...")
    _stop_processes(producers)
    # NOTE: consumers are stopped even if the queue still holds a backlog;
    # any remaining messages have to be drained separately.
    _stop_processes(consumers)
    print("所有進程已終止")

    print("\n" + "=" * 60)
    print("生成壓測報告...")
    generator = ReportGenerator(CONSUMER_LOG_FILE)
    report_path = generator.generate_report()
    if report_path:
        print(f"報告已生成: {report_path}")
    else:
        print("報告生成失敗")


if __name__ == "__main__":
    main()
5、日志解析器 (report_generator.py)
import pandas as pd
import matplotlib# 設置為無頭模式,避免GUI問題
matplotlib.use('Agg') # 必須在導入pyplot之前設置
import matplotlib.pyplot as plt
from datetime import datetime
import re
import os
import numpy as np
import sys
import platform# 檢查并導入兼容的 docx 模塊
# Optional Word-report support; the rest of the generator works without it.
try:
    from docx import Document
    from docx.shared import Inches, Pt
    from docx.enum.text import WD_ALIGN_PARAGRAPH
    from docx.enum.table import WD_TABLE_ALIGNMENT
    DOCX_SUPPORT = True
except ImportError as e:
    print(f"警告: 無法導入 python-docx 庫,Word 報告功能將不可用。錯誤: {str(e)}")
    print("請運行: pip install python-docx --upgrade")
    DOCX_SUPPORT = False


class ReportGenerator:
    """Parse the consumer stress-test log and write a chart plus a text report to the desktop.

    Consumer log lines look like
    ``2025-08-07 13:09:27, [Consumer 1234] 消費: 1000, 平均延遲: 12.34ms``.
    Many consumer processes append to the same log, each with its own running
    counter, so counts are aggregated per process (the ``pid`` column) before
    being combined.  Lines without a ``[Consumer <pid>]`` tag form one stream.
    """

    # Timestamps such as 2025-08-07 13:09:27 or 2025/8/7 13:09:27.
    _TIMESTAMP_RE = re.compile(r'(\d{4}[-/]\d{1,2}[-/]\d{1,2} \d{1,2}:\d{1,2}:\d{1,2})')
    _COUNT_RE = re.compile(r'(Consumed|消費|已處理|處理)[:：]?\s*(\d+)', re.IGNORECASE)
    _BARE_NUMBER_RE = re.compile(r'\b(\d{3,})\b')
    _LATENCY_RE = re.compile(r'(avg|平均|延遲|latency)[=:：]?\s*([\d.]+)(ms|毫秒)?', re.IGNORECASE)
    _PID_RE = re.compile(r'\[Consumer (\d+)\]')
    # A line is only considered when it contains one of these keywords.
    _KEYWORDS = ('Consumed', '消費', '已處理', '處理')
    # Bucket width, in seconds, of the consumption-rate chart.
    _RATE_WINDOW_SECONDS = 10

    def __init__(self, log_file='consumer_stress_test.log'):
        # Reports are written to the current user's desktop.
        if platform.system() == 'Windows':
            self.desktop_path = os.path.join(os.environ['USERPROFILE'], 'Desktop')
        else:
            self.desktop_path = os.path.join(os.path.expanduser('~'), 'Desktop')
        self.log_file = log_file
        self.report_docx = os.path.join(self.desktop_path, 'RabbitMQ集群壓測分析報告.docx')
        self.report_image = os.path.join(self.desktop_path, 'RabbitMQ壓測性能圖表.png')
        # Fonts able to render the Chinese chart labels (several fallbacks).
        plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS']
        plt.rcParams['axes.unicode_minus'] = False

    def _parse_line(self, line):
        """Return a record dict for one log line, or None if the line has no timestamp.

        Raises:
            ValueError: if the timestamp, count or latency text cannot be converted.
        """
        timestamp_match = self._TIMESTAMP_RE.search(line)
        if not timestamp_match:
            return None
        timestamp_str = timestamp_match.group(1)
        try:
            timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            timestamp = datetime.strptime(timestamp_str, "%Y/%m/%d %H:%M:%S")

        count = 0
        count_match = self._COUNT_RE.search(line)
        if count_match:
            count = int(count_match.group(2))
        else:
            # Fall back to the first number with at least three digits.
            num_match = self._BARE_NUMBER_RE.search(line)
            if num_match:
                count = int(num_match.group(1))

        latency = 0
        latency_match = self._LATENCY_RE.search(line)
        if latency_match:
            latency = float(latency_match.group(2))

        pid_match = self._PID_RE.search(line)
        return {
            'timestamp': timestamp,
            'count': count,
            'latency': latency,
            'pid': pid_match.group(1) if pid_match else '',
        }

    def _collect_records(self, encoding, records):
        """Append a record for every parseable line of the log, decoded with *encoding*.

        Records are appended as they are read, so a caller that catches a
        UnicodeDecodeError still sees the lines parsed before the failure.
        """
        with open(self.log_file, 'r', encoding=encoding) as f:
            for line_number, line in enumerate(f, start=1):
                if not any(keyword in line for keyword in self._KEYWORDS):
                    continue
                try:
                    record = self._parse_line(line)
                except ValueError as e:
                    print(f"解析錯誤 (行 {line_number}): {line.strip()} - {str(e)}")
                    continue
                if record is not None:
                    records.append(record)

    def parse_logs(self):
        """Parse the log into a DataFrame with timestamp, count, latency and pid columns.

        Several encodings are tried in turn.  Each attempt starts from an empty
        list (rows from a failed attempt used to be kept and then appended a
        second time).  If no encoding yields rows cleanly, the largest partial
        result is returned.  An empty DataFrame means nothing could be parsed.
        """
        print(f"正在解析日志文件: {self.log_file}")
        if not os.path.exists(self.log_file):
            print(f"錯誤: 日志文件不存在 - {self.log_file}")
            return pd.DataFrame()

        best = []
        for encoding in ['utf-8', 'gbk', 'latin-1', 'cp1252']:
            records = []
            try:
                self._collect_records(encoding, records)
            except UnicodeDecodeError:
                print(f"嘗試 {encoding} 編碼失敗,嘗試下一種...")
            except Exception as e:
                print(f"解析日志時發生錯誤: {str(e)}")
            else:
                if records:
                    print(f"使用 {encoding} 編碼成功解析 {len(records)} 行日志")
                    return pd.DataFrame(records)
                print(f"使用 {encoding} 編碼未找到有效數據")
            if len(records) > len(best):
                best = records
        return pd.DataFrame(best)

    @staticmethod
    def _process_keys(df):
        """Return the per-row consumer-process key ('' when a row has no pid)."""
        if 'pid' in df.columns:
            return df['pid'].fillna('')
        return pd.Series('', index=df.index)

    def _ordered_with_increments(self, df):
        """Return a time-sorted copy of *df* with a per-process ``count_diff`` column.

        Each process logs its own running total, so the messages consumed between
        two lines are the difference of consecutive counts *of the same process*.
        The first line of a process contributes its whole count (the consumer's
        counter starts at 0).
        """
        ordered = df.sort_values('timestamp', kind='mergesort').copy()
        keys = self._process_keys(ordered)
        ordered['count_diff'] = ordered.groupby(keys)['count'].diff().fillna(ordered['count'])
        return ordered

    def generate_charts(self, df):
        """Draw rate, cumulative-count and latency charts and save them as a PNG on the desktop.

        Returns:
            The time-sorted frame with added ``count_diff`` and ``total_consumed``
            columns, or *df* unchanged when it is empty or charting fails early.
        """
        if df.empty:
            print("沒有有效數據,無法生成圖表")
            return df

        print("正在生成分析圖表...")
        try:
            plt.figure(figsize=(16, 12))
            df = self._ordered_with_increments(df)
            # Running total over all processes, in time order.
            df['total_consumed'] = df['count_diff'].cumsum()

            # Consumption rate: messages per fixed time bucket divided by the bucket
            # width.  Differencing consecutive lines is meaningless here because
            # lines from different processes are interleaved.
            plt.subplot(2, 2, 1)
            window = self._RATE_WINDOW_SECONDS
            per_bucket = (
                df.set_index('timestamp')['count_diff']
                .resample(pd.Timedelta(seconds=window))
                .sum()
            )
            msg_rate = per_bucket / window
            plt.plot(msg_rate.index, msg_rate.values, 'b-')
            plt.title('消息消費速率 (條/秒)', fontsize=14)
            plt.xlabel('時間', fontsize=12)
            plt.ylabel('消息數/秒', fontsize=12)
            plt.grid(True)

            # Cumulative consumed messages.
            plt.subplot(2, 2, 2)
            plt.plot(df['timestamp'], df['total_consumed'], 'g-')
            plt.title('累計消費消息數', fontsize=14)
            plt.xlabel('時間', fontsize=12)
            plt.ylabel('總消息數', fontsize=12)
            plt.grid(True)

            # NOTE: each latency value is the running average logged by one
            # consumer process, not the latency of a single message.
            plt.subplot(2, 2, 3)
            plt.hist(df['latency'], bins=50, color='orange', alpha=0.7)
            plt.title('延遲分布', fontsize=14)
            plt.xlabel('延遲 (毫秒)', fontsize=12)
            plt.ylabel('頻率', fontsize=12)
            plt.grid(True)

            plt.subplot(2, 2, 4)
            plt.plot(df['timestamp'], df['latency'], 'r-')
            plt.title('延遲隨時間變化', fontsize=14)
            plt.xlabel('時間', fontsize=12)
            plt.ylabel('延遲 (毫秒)', fontsize=12)
            plt.grid(True)

            plt.tight_layout()
            plt.savefig(self.report_image, dpi=300)
            print(f"分析圖表已保存到: {self.report_image}")
        except Exception as e:
            print(f"生成圖表時出錯: {str(e)}")
            import traceback
            traceback.print_exc()
        finally:
            # Release the figure so repeated runs do not accumulate open figures.
            plt.close()
        return df

    def create_text_report(self, df):
        """Write the plain-text report to the desktop and return its path.

        Totals and throughput are summed per consumer process: every process
        logs its own counter, so counts of different processes cannot be
        subtracted from one another.
        """
        report_file = os.path.join(self.desktop_path, 'RabbitMQ壓測分析報告.txt')
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write("=" * 60 + "\n")
            f.write("RabbitMQ集群壓測分析報告\n")
            f.write("=" * 60 + "\n\n")
            f.write(f"生成時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"日志文件: {os.path.abspath(self.log_file)}\n\n")

            if df.empty:
                f.write("沒有有效數據,無法生成報告\n")
                return report_file

            start_time = df['timestamp'].min()
            end_time = df['timestamp'].max()
            duration = (end_time - start_time).total_seconds()
            # Progress of each process inside the logged window (last count minus
            # first count), summed over all processes.
            per_process = df.groupby(self._process_keys(df))['count']
            total_messages = int((per_process.max() - per_process.min()).sum())
            throughput = total_messages / duration if duration > 0 else 0

            # NOTE: the latency column holds per-process running averages, so these
            # are statistics of those averages rather than of individual messages.
            if not df['latency'].empty:
                latency_min = df['latency'].min()
                latency_max = df['latency'].max()
                latency_avg = df['latency'].mean()
                latency_90 = df['latency'].quantile(0.9)
            else:
                latency_min = latency_max = latency_avg = latency_90 = 0

            # Overall statistics.
            f.write("一、壓測總體統計\n")
            f.write("-" * 50 + "\n")
            f.write(f"測試開始時間: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"測試結束時間: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"測試持續時間: {duration:.2f} 秒\n")
            f.write(f"總消費消息數: {total_messages:,} 條\n")
            f.write(f"平均吞吐量: {throughput:.2f} 條/秒\n")
            f.write(f"最小延遲: {latency_min:.2f} 毫秒\n")
            f.write(f"平均延遲: {latency_avg:.2f} 毫秒\n")
            f.write(f"最大延遲: {latency_max:.2f} 毫秒\n")
            f.write(f"90%延遲: {latency_90:.2f} 毫秒\n\n")

            # Conclusions.
            f.write("二、性能分析結論\n")
            f.write("-" * 50 + "\n")

            if throughput == 0:
                f.write("吞吐量評估: 未檢測到有效吞吐量數據\n")
            elif throughput > 5000:
                f.write("吞吐量評估: 優秀 (>5000條/秒)\n")
            elif throughput > 2000:
                f.write("吞吐量評估: 良好 (2000-5000條/秒)\n")
            else:
                f.write("吞吐量評估: 需優化 (<2000條/秒)\n")

            if latency_avg == 0:
                f.write("延遲評估: 未檢測到延遲數據\n")
            elif latency_avg < 50:
                f.write("延遲評估: 優秀 (<50ms)\n")
            elif latency_avg < 200:
                f.write("延遲評估: 良好 (50-200ms)\n")
            else:
                f.write("延遲評估: 需優化 (>200ms)\n")

            if latency_90 > 0 and latency_avg > 0:
                stability_ratio = latency_90 / latency_avg
                f.write(f"穩定性評估: 90%延遲是平均延遲的 {stability_ratio:.1f} 倍\n")
                if stability_ratio < 2:
                    f.write("穩定性評估: 系統穩定性優秀\n")
                elif stability_ratio < 5:
                    f.write("穩定性評估: 系統穩定性良好\n")
                else:
                    f.write("穩定性評估: 系統穩定性較差\n")

            # Suggestions.
            f.write("\n三、優化建議\n")
            f.write("-" * 50 + "\n")
            suggestions = [
                "1. 增加消費者數量以提高吞吐量",
                "2. 提高預取值(prefetch count)優化消費速度",
                "3. 檢查網絡延遲,確保RabbitMQ集群節點間通信正常",
                "4. 使用消息批量處理減少網絡開銷",
                "5. 優化消費者代碼,減少處理時間",
            ]
            for suggestion in suggestions:
                f.write(suggestion + "\n")

            # Chart location, if the chart was produced.
            if os.path.exists(self.report_image):
                f.write("\n四、性能圖表\n")
                f.write("-" * 50 + "\n")
                f.write(f"性能圖表已保存到: {self.report_image}\n")

        print(f"文本報告已保存到: {report_file}")
        return report_file

    def generate_report(self):
        """Parse the log, draw the charts and write the text report; return the report path."""
        print("=" * 60)
        print("RabbitMQ壓測報告生成工具")
        print("=" * 60)

        df = self.parse_logs()
        if df.empty:
            print("沒有有效數據,無法生成報告")
            report_file = os.path.join(self.desktop_path, 'RabbitMQ壓測分析報告.txt')
            with open(report_file, 'w', encoding='utf-8') as f:
                f.write("沒有從日志文件中提取到有效數據,請檢查日志格式\n")
                f.write(f"日志文件: {os.path.abspath(self.log_file)}\n")
            print(f"錯誤報告已保存到: {report_file}")
            return report_file

        self.generate_charts(df)
        return self.create_text_report(df)


if __name__ == "__main__":
    log_file = sys.argv[1] if len(sys.argv) > 1 else 'consumer_stress_test.log'
    generator = ReportGenerator(log_file)
    report_path = generator.generate_report()
    if report_path:
        print(f"\n報告已生成: {report_path}")
    else:
        print("\n報告生成失敗")
6、 監控腳本 (monitor.py)
import pika
import time
import json
import random

from config import RABBITMQ_NODES, QUEUE_CONFIG


def monitor_queue():
    """Print the number of ready messages in the test queue every 5 seconds, forever.

    Each sample opens a fresh connection to a random node and closes it again.
    On error the message is printed and the next attempt waits 10 seconds.
    NOTE: the message_count returned by a passive queue_declare excludes
    messages delivered to consumers but not yet acknowledged.
    """
    while True:
        connection = None
        try:
            connection = pika.BlockingConnection(pika.URLParameters(random.choice(RABBITMQ_NODES)))
            channel = connection.channel()
            # passive=True only inspects the queue; it fails if the queue does not exist.
            queue = channel.queue_declare(queue=QUEUE_CONFIG['queue_name'], passive=True)
            messages_ready = queue.method.message_count
            print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 積壓消息: {messages_ready}")
        except Exception as e:
            print(f"監控錯誤: {str(e)}")
            delay = 10
        else:
            delay = 5
        finally:
            # Close on both paths; the error path used to leak the connection.
            if connection is not None and connection.is_open:
                try:
                    connection.close()
                except Exception as e:
                    print(f"監控錯誤: {str(e)}")
        time.sleep(delay)


if __name__ == "__main__":
    monitor_queue()
7、運行壓測程序:
python controller.py
8、結果展示(注意:下面報告中的“總消費消息數”和“平均吞吐量”是把 30 個消費者進程各自的計數混在一起計算的,並非集群總量;延遲統計的是各進程日志中的累計平均延遲,而非單條消息的延遲)
RabbitMQ集群壓測分析報告
生成時間: 2025-08-07 13:19:23
日志文件: C:\Users\Administrator\PycharmProjects\pythonProject13\rabbitmq\logs\consumer_stress_test.log
一、壓測總體統計
--------------------------------------------------
測試開始時間: 2025-08-07 13:09:27
測試結束時間: 2025-08-07 13:19:21
測試持續時間: 594.00 秒
總消費消息數: 137,000 條
平均吞吐量: 230.64 條/秒
最小延遲: 137895.78 毫秒
平均延遲: 673436.15 毫秒
最大延遲: 4793312.15 毫秒
90%延遲: 1471724.25 毫秒

二、性能分析結論
--------------------------------------------------
吞吐量評估: 需優化 (<2000條/秒)
延遲評估: 需優化 (>200ms)
穩定性評估: 90%延遲是平均延遲的 2.2 倍
穩定性評估: 系統穩定性良好

三、優化建議
--------------------------------------------------
1. 增加消費者數量以提高吞吐量
2. 提高預取值(prefetch count)優化消費速度
3. 檢查網絡延遲,確保RabbitMQ集群節點間通信正常
4. 使用消息批量處理減少網絡開銷
5. 優化消費者代碼,減少處理時間

四、性能圖表
--------------------------------------------------
性能圖表已保存到: C:\Users\Administrator\Desktop\RabbitMQ壓測性能圖表.png
圖表展示
這個方案有個缺陷:測試完成後,隊列裡還會堆積大量消息沒有被消費掉。原因是生產者不限速地持續發送,消費速度跟不上(報告中最小延遲已達約 138 秒,說明消息在隊列中排隊等待),而壓測結束時控制器直接終止了消費者進程。所以我另外寫了一個消費腳本來清空積壓,內容如下:
import pika
import threading
import random
import time
from config import RABBITMQ_NODES, QUEUE_CONFIG# 連接池管理
# Idle connections for the monitoring loop in the main thread.
connection_pool = []
# Guards connection_pool and active_consumers.
lock = threading.Lock()
# (connection, channel) of every running consumer thread, so the main thread can stop them.
active_consumers = []
# The script stops once the queue reports at most this many ready messages
# (the old threshold of 1000 left up to 999 messages behind) ...
BACKLOG_EXIT_THRESHOLD = 0
# ... on this many consecutive checks, so deliveries already in flight get acked.
BACKLOG_EXIT_CHECKS = 2


def get_connection():
    """Take an idle connection from the pool, or open a new one to a random node."""
    with lock:
        if connection_pool:
            return connection_pool.pop()
        node = random.choice(RABBITMQ_NODES)
        return pika.BlockingConnection(pika.URLParameters(node))


def release_connection(connection):
    """Return *connection* to the pool if it is still open."""
    with lock:
        if connection.is_open:
            connection_pool.append(connection)


def consume_fast():
    """Ack every message from the test queue until stop_consuming() is requested.

    Each consumer thread opens its own connection: pika connections must be
    created and used in a single thread, so they are not borrowed from the
    pool, which is filled by the main thread.
    """
    connection = None
    try:
        node = random.choice(RABBITMQ_NODES)
        connection = pika.BlockingConnection(pika.URLParameters(node))
        channel = connection.channel()
        # High prefetch so that every consumer always has messages to work on.
        channel.basic_qos(prefetch_count=1000)

        def callback(ch, method, properties, body):
            # Real message handling would go here, e.g. process_message(body).
            # Ack only after handling, so unhandled messages can be redelivered.
            ch.basic_ack(delivery_tag=method.delivery_tag)

        channel.basic_consume(
            queue=QUEUE_CONFIG['queue_name'],
            on_message_callback=callback,
            auto_ack=False,  # manual acknowledgement
        )
        with lock:
            active_consumers.append((connection, channel))
        print(f"[{threading.current_thread().name}] 快速消費者啟動...")
        channel.start_consuming()
    except Exception as e:
        print(f"消費者錯誤: {str(e)}")
    finally:
        # Closed in the thread that owns the connection.
        if connection is not None and connection.is_open:
            try:
                connection.close()
            except Exception as e:
                print(f"消費者錯誤: {str(e)}")


def _stop_consumers(threads, timeout=10):
    """Ask every consumer thread to stop consuming, then wait for the threads to finish."""
    with lock:
        consumers = list(active_consumers)
    for connection, channel in consumers:
        try:
            # add_callback_threadsafe is the one pika call that may be made from
            # another thread; stop_consuming then runs in the consumer's own thread.
            connection.add_callback_threadsafe(channel.stop_consuming)
        except Exception as e:
            print(f"消費者錯誤: {str(e)}")
    for t in threads:
        t.join(timeout)


def _queue_backlog():
    """Return the number of ready messages in the test queue, using a pooled connection."""
    connection = get_connection()
    channel = connection.channel()
    try:
        queue = channel.queue_declare(queue=QUEUE_CONFIG['queue_name'], passive=True)
        return queue.method.message_count
    finally:
        # Close the channel every time: the original loop opened a new channel on
        # the same pooled connection every 5 s and never closed any of them.
        if channel.is_open:
            channel.close()
        release_connection(connection)


if __name__ == "__main__":
    # The pool only serves the monitoring loop below; consumers open their own connections.
    node = random.choice(RABBITMQ_NODES)
    connection_pool.append(pika.BlockingConnection(pika.URLParameters(node)))
    print(f"已創建 {len(connection_pool)} 個初始連接")

    threads = []
    for i in range(20):  # 20 consumer threads
        t = threading.Thread(
            target=consume_fast,
            name=f"FastConsumer-{i + 1}",
            daemon=True,
        )
        t.start()
        threads.append(t)
        time.sleep(0.1)  # stagger the start-up
    print("所有消費者線程已啟動,按 Ctrl+C 停止...")

    quiet_checks = 0
    try:
        while True:
            try:
                backlog = _queue_backlog()
                print(f"當前積壓消息: {backlog}")
                # message_count only covers ready messages, not unacked deliveries.
                quiet_checks = quiet_checks + 1 if backlog <= BACKLOG_EXIT_THRESHOLD else 0
                if quiet_checks >= BACKLOG_EXIT_CHECKS:
                    print("積壓已減少,可以安全停止腳本")
                    break
            except Exception as e:
                print(f"監控錯誤: {str(e)}")
            time.sleep(5)  # check every 5 seconds
    except KeyboardInterrupt:
        print("\n正在停止消費者...")

    _stop_consumers(threads)

    print("關閉所有連接...")
    for conn in connection_pool:
        if conn.is_open:
            conn.close()