Python + Pika RabbitMQ集群壓測完整方案

在這里插入圖片描述在這里插入圖片描述

一、最近搭建了個rabbitmq集群 三個磁盤節點,上生產環境之前想做個壓測,測試下穩定性,參考Deepseek做了如下測試方案
在這里插入圖片描述
二、核心代碼實現:

  1. 配置文件 (config.py)
import os
RABBITMQ_NODES = ['amqp://admin:123456@192.168.0.175:8101',
]QUEUE_CONFIG = {'queue_name': 'stress_test_queue','durable': True,'arguments': {'x-ha-policy': 'all',  # 鏡像隊列'x-queue-mode': 'lazy'  # 惰性隊列}
}TEST_PARAMS = {'producer_count': 20,      # 生產者進程數'consumer_count': 30,      # 消費者進程數'message_size': 1024,      # 消息大小(字節) 1KB'total_messages': 1000000, # 總消息量'test_duration': 600       # 測試時長(秒) 10分鐘
}
LOG_DIR = "logs"
CONSUMER_LOG_FILE = os.path.join(LOG_DIR, "consumer_stress_test.log")
PRODUCER_LOG_FILE = os.path.join(LOG_DIR, "producer_stress_test.log")
  1. 生產者實現 (producer.py)
import pika
import time
import random
import os
from config import RABBITMQ_NODES, QUEUE_CONFIG, TEST_PARAMS, PRODUCER_LOG_FILE# 確保日志目錄存在
os.makedirs(os.path.dirname(PRODUCER_LOG_FILE), exist_ok=True)def log_producer(message):"""記錄生產者日志"""print(message)with open(PRODUCER_LOG_FILE, 'a', encoding='utf-8') as f:f.write(message + '\n')def produce_messages():"""生產者函數"""# 隨機選擇集群節點node = random.choice(RABBITMQ_NODES)params = pika.URLParameters(node)try:connection = pika.BlockingConnection(params)channel = connection.channel()# 聲明隊列channel.queue_declare(queue=QUEUE_CONFIG['queue_name'],durable=QUEUE_CONFIG['durable'],arguments=QUEUE_CONFIG['arguments'])start_time = time.time()message_count = 0pid = os.getpid()# 生成測試消息message_body = os.urandom(TEST_PARAMS['message_size'])while (time.time() - start_time) < TEST_PARAMS['test_duration']:try:# 發送消息channel.basic_publish(exchange='',routing_key=QUEUE_CONFIG['queue_name'],body=message_body,properties=pika.BasicProperties(delivery_mode=2,  # 持久化消息timestamp=int(time.time() * 1000)  # 毫秒時間戳))message_count += 1# 每1000條打印一次if message_count % 1000 == 0:current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())log_producer(f"{current_time}, [Producer {pid}] 發送: {message_count}")except Exception as e:log_producer(f"生產者錯誤: {str(e)}")# 重連機制try:if connection and connection.is_open:connection.close()connection = pika.BlockingConnection(params)channel = connection.channel()except Exception as e:log_producer(f"重連失敗: {str(e)}")time.sleep(1)connection.close()log_producer(f"[Producer {pid}] 發送完成,總計發送: {message_count} 條消息")return message_countexcept Exception as e:log_producer(f"生產者初始化錯誤: {str(e)}")return 0
if __name__ == "__main__":produce_messages()
  1. 消費者實現 (consumer.py)
import pika
import time
import random
import os
import threading
from config import RABBITMQ_NODES, QUEUE_CONFIG, TEST_PARAMS, CONSUMER_LOG_FILEos.makedirs(os.path.dirname(CONSUMER_LOG_FILE), exist_ok=True)consumed_count = 0
total_latency = 0
min_latency = float('inf')
max_latency = 0connections = {}
lock = threading.Lock()def log_consumer(message):"""記錄消費者日志"""print(message)with open(CONSUMER_LOG_FILE, 'a', encoding='utf-8') as f:f.write(message + '\n')def get_channel():"""獲取或創建連接和通道"""with lock:pid = os.getpid()if pid not in connections or not connections[pid].is_open:node = random.choice(RABBITMQ_NODES)params = pika.URLParameters(node)connections[pid] = pika.BlockingConnection(params)return connections[pid].channel()def on_message(channel, method, properties, body):global consumed_count, total_latency, min_latency, max_latencycurrent_ts = time.time() * 1000latency = current_ts - properties.timestampconsumed_count += 1total_latency += latencymin_latency = min(min_latency, latency)max_latency = max(max_latency, latency)if consumed_count % 1000 == 0:pid = os.getpid()avg_latency = total_latency / consumed_countcurrent_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())log_consumer(f"{current_time}, [Consumer {pid}] 消費: {consumed_count}, 平均延遲: {avg_latency:.2f}ms")# 確認消息channel.basic_ack(delivery_tag=method.delivery_tag)def start_consumer():"""啟動消費者"""try:channel = get_channel()# QoS設置channel.basic_qos(prefetch_count=100)channel.queue_declare(queue=QUEUE_CONFIG['queue_name'],durable=QUEUE_CONFIG['durable'],arguments=QUEUE_CONFIG['arguments'])channel.basic_consume(queue=QUEUE_CONFIG['queue_name'],on_message_callback=on_message)log_consumer(f"消費者啟動 PID: {os.getpid()}")channel.start_consuming()except Exception as e:log_consumer(f"消費者錯誤: {str(e)}")# 嘗試重新連接time.sleep(5)start_consumer()
if __name__ == "__main__":start_consumer()```
  1. 壓測控制器 (controller.py)
import multiprocessing
import time
import os
from producer import produce_messages
from consumer import start_consumer
from config import TEST_PARAMS, CONSUMER_LOG_FILE
from report_generator import ReportGenerator  # 確保導入修復后的報告生成器def run_producers():"""啟動生產者進程"""producers = []for _ in range(TEST_PARAMS['producer_count']):p = multiprocessing.Process(target=produce_messages)p.start()producers.append(p)return producersdef run_consumers():"""啟動消費者進程"""consumers = []for _ in range(TEST_PARAMS['consumer_count']):p = multiprocessing.Process(target=start_consumer)p.start()consumers.append(p)return consumersdef monitor():"""監控壓測過程"""start_time = time.time()while time.time() - start_time < TEST_PARAMS['test_duration'] + 10:elapsed = time.time() - start_timeprint(f"[監控] 壓測已運行: {int(elapsed)}秒 / {TEST_PARAMS['test_duration']}秒")time.sleep(5)def main():"""主控制函數"""print("=" * 60)print("RabbitMQ集群壓測控制器")print("=" * 60)# 確保日志目錄存在os.makedirs(os.path.dirname(CONSUMER_LOG_FILE), exist_ok=True)# 清空日志文件open(CONSUMER_LOG_FILE, 'w').close()print("啟動消費者...")consumers = run_consumers()time.sleep(5)  # 等待消費者就緒print("啟動生產者...")producers = run_producers()print("啟動監控...")monitor()print("壓測完成,終止進程...")for p in producers:if p.is_alive():p.terminate()for c in consumers:if c.is_alive():c.terminate()print("所有進程已終止")# 生成壓測報告print("\n" + "=" * 60)print("生成壓測報告...")generator = ReportGenerator(CONSUMER_LOG_FILE)report_path = generator.generate_report()if report_path:print(f"報告已生成: {report_path}")else:print("報告生成失敗")
if __name__ == "__main__":main()

5、日志解析器 (report_generator.py)

import pandas as pd
import matplotlib# 設置為無頭模式,避免GUI問題
matplotlib.use('Agg')  # 必須在導入pyplot之前設置
import matplotlib.pyplot as plt
from datetime import datetime
import re
import os
import numpy as np
import sys
import platform# 檢查并導入兼容的 docx 模塊
try:from docx import Documentfrom docx.shared import Inches, Ptfrom docx.enum.text import WD_ALIGN_PARAGRAPHfrom docx.enum.table import WD_TABLE_ALIGNMENTDOCX_SUPPORT = True
except ImportError as e:print(f"警告: 無法導入 python-docx 庫,Word 報告功能將不可用。錯誤: {str(e)}")print("請運行: pip install python-docx --upgrade")DOCX_SUPPORT = Falseclass ReportGenerator:def __init__(self, log_file='consumer_stress_test.log'):# 獲取桌面路徑if platform.system() == 'Windows':self.desktop_path = os.path.join(os.environ['USERPROFILE'], 'Desktop')else:self.desktop_path = os.path.join(os.path.expanduser('~'), 'Desktop')self.log_file = log_fileself.report_docx = os.path.join(self.desktop_path, 'RabbitMQ集群壓測分析報告.docx')self.report_image = os.path.join(self.desktop_path, 'RabbitMQ壓測性能圖表.png')# 設置中文字體支持plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS']  # 多個回退選項plt.rcParams['axes.unicode_minus'] = Falsedef parse_logs(self):"""解析日志文件,提取時間戳、消費數量和延遲數據"""print(f"正在解析日志文件: {self.log_file}")data = []line_count = 0parsed_count = 0if not os.path.exists(self.log_file):print(f"錯誤: 日志文件不存在 - {self.log_file}")return pd.DataFrame()# 嘗試多種編碼encodings = ['utf-8', 'gbk', 'latin-1', 'cp1252']for encoding in encodings:try:with open(self.log_file, 'r', encoding=encoding) as f:for line in f:line_count += 1# 嘗試多種可能的日志格式if 'Consumed' in line or '消費' in line or '已處理' in line or '處理' in line:try:# 嘗試解析時間戳 - 支持多種格式timestamp_match = re.search(r'(\d{4}[-/]\d{1,2}[-/]\d{1,2} \d{1,2}:\d{1,2}:\d{1,2})',line)if timestamp_match:timestamp_str = timestamp_match.group(1)try:timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")except:timestamp = datetime.strptime(timestamp_str, "%Y/%m/%d %H:%M:%S")# 解析消息數量count = 0count_match = re.search(r'(Consumed|消費|已處理|處理):?\s*(\d+)', line,re.IGNORECASE)if count_match:count = int(count_match.group(2))else:# 嘗試直接查找數字num_match = re.search(r'\b(\d{3,})\b', line)if num_match:count = int(num_match.group(1))# 解析延遲latency = 0latency_match = re.search(r'(avg|平均|延遲|latency)[=::]?\s*([\d.]+)(ms|毫秒)?',line, re.IGNORECASE)if latency_match:latency = float(latency_match.group(2))data.append({'timestamp': timestamp,'count': count,'latency': latency})parsed_count += 1except Exception as e:print(f"解析錯誤 (行 {line_count}): {line.strip()} - {str(e)}")if parsed_count > 0:print(f"使用 {encoding} 編碼成功解析 {parsed_count} 行日志")breakelse:print(f"使用 {encoding} 編碼未找到有效數據")except UnicodeDecodeError:print(f"嘗試 {encoding} 編碼失敗,嘗試下一種...")continueexcept Exception as e:print(f"解析日志時發生錯誤: {str(e)}")return pd.DataFrame(data)def generate_charts(self, df):"""生成分析圖表并保存到桌面"""if df.empty:print("沒有有效數據,無法生成圖表")return dfprint("正在生成分析圖表...")try:plt.figure(figsize=(16, 12))# 消息消費速率plt.subplot(2, 2, 1)df = df.sort_values('timestamp')  # 確保按時間排序# 計算時間差和消息數差df['time_diff'] = df['timestamp'].diff().dt.total_seconds().fillna(0)df['count_diff'] = df['count'].diff().fillna(0)# 計算消息消費速率(避免除以零)df['msg_rate'] = np.where(df['time_diff'] > 0,df['count_diff'] / df['time_diff'],0)# 處理無限值和NaN值df['msg_rate'] = df['msg_rate'].replace([np.inf, -np.inf], np.nan)df['msg_rate'] = df['msg_rate'].ffill().bfill().fillna(0)plt.plot(df['timestamp'], df['msg_rate'], 'b-')plt.title('消息消費速率 (條/秒)', fontsize=14)plt.xlabel('時間', fontsize=12)plt.ylabel('消息數/秒', fontsize=12)plt.grid(True)# 累計消費消息數plt.subplot(2, 2, 2)plt.plot(df['timestamp'], df['count'], 'g-')plt.title('累計消費消息數', fontsize=14)plt.xlabel('時間', fontsize=12)plt.ylabel('總消息數', fontsize=12)plt.grid(True)# 延遲分布plt.subplot(2, 2, 3)plt.hist(df['latency'], bins=50, color='orange', alpha=0.7)plt.title('延遲分布', fontsize=14)plt.xlabel('延遲 (毫秒)', fontsize=12)plt.ylabel('頻率', fontsize=12)plt.grid(True)# 延遲隨時間變化plt.subplot(2, 2, 4)plt.plot(df['timestamp'], df['latency'], 'r-')plt.title('延遲隨時間變化', fontsize=14)plt.xlabel('時間', fontsize=12)plt.ylabel('延遲 (毫秒)', fontsize=12)plt.grid(True)plt.tight_layout()plt.savefig(self.report_image, dpi=300)print(f"分析圖表已保存到: {self.report_image}")except Exception as e:print(f"生成圖表時出錯: {str(e)}")import tracebacktraceback.print_exc()return dfdef create_text_report(self, df):"""創建文本格式的報告"""report_file = os.path.join(self.desktop_path, 'RabbitMQ壓測分析報告.txt')with open(report_file, 'w', encoding='utf-8') as f:f.write("=" * 60 + "\n")f.write("RabbitMQ集群壓測分析報告\n")f.write("=" * 60 + "\n\n")f.write(f"生成時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")f.write(f"日志文件: {os.path.abspath(self.log_file)}\n\n")if df.empty:f.write("沒有有效數據,無法生成報告\n")return report_file# 計算統計信息start_time = df['timestamp'].min()end_time = df['timestamp'].max()duration = (end_time - start_time).total_seconds()total_messages = df['count'].max() - df['count'].min()throughput = total_messages / duration if duration > 0 else 0# 延遲統計if not df['latency'].empty:latency_min = df['latency'].min()latency_max = df['latency'].max()latency_avg = df['latency'].mean()latency_90 = df['latency'].quantile(0.9)else:latency_min = latency_max = latency_avg = latency_90 = 0# 總體統計f.write("一、壓測總體統計\n")f.write("-" * 50 + "\n")f.write(f"測試開始時間: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")f.write(f"測試結束時間: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")f.write(f"測試持續時間: {duration:.2f} 秒\n")f.write(f"總消費消息數: {total_messages:,} 條\n")f.write(f"平均吞吐量: {throughput:.2f} 條/秒\n")f.write(f"最小延遲: {latency_min:.2f} 毫秒\n")f.write(f"平均延遲: {latency_avg:.2f} 毫秒\n")f.write(f"最大延遲: {latency_max:.2f} 毫秒\n")f.write(f"90%延遲: {latency_90:.2f} 毫秒\n\n")# 分析結論f.write("二、性能分析結論\n")f.write("-" * 50 + "\n")# 吞吐量評估if throughput == 0:f.write("吞吐量評估: 未檢測到有效吞吐量數據\n")elif throughput > 5000:f.write("吞吐量評估: 優秀 (>5000條/秒)\n")elif throughput > 2000:f.write("吞吐量評估: 良好 (2000-5000條/秒)\n")else:f.write("吞吐量評估: 需優化 (<2000條/秒)\n")# 延遲評估if latency_avg == 0:f.write("延遲評估: 未檢測到延遲數據\n")elif latency_avg < 50:f.write("延遲評估: 優秀 (<50ms)\n")elif latency_avg < 200:f.write("延遲評估: 良好 (50-200ms)\n")else:f.write("延遲評估: 需優化 (>200ms)\n")# 穩定性評估if latency_90 > 0 and latency_avg > 0:stability_ratio = latency_90 / latency_avgf.write(f"穩定性評估: 90%延遲是平均延遲的 {stability_ratio:.1f} 倍\n")if stability_ratio < 2:f.write("穩定性評估: 系統穩定性優秀\n")elif stability_ratio < 5:f.write("穩定性評估: 系統穩定性良好\n")else:f.write("穩定性評估: 系統穩定性較差\n")# 優化建議f.write("\n三、優化建議\n")f.write("-" * 50 + "\n")suggestions = ["1. 增加消費者數量以提高吞吐量","2. 提高預取值(prefetch count)優化消費速度","3. 檢查網絡延遲,確保RabbitMQ集群節點間通信正常","4. 使用消息批量處理減少網絡開銷","5. 優化消費者代碼,減少處理時間"]for suggestion in suggestions:f.write(suggestion + "\n")# 圖表信息if os.path.exists(self.report_image):f.write("\n四、性能圖表\n")f.write("-" * 50 + "\n")f.write(f"性能圖表已保存到: {self.report_image}\n")print(f"文本報告已保存到: {report_file}")return report_filedef generate_report(self):"""生成完整的壓測報告"""print("=" * 60)print("RabbitMQ壓測報告生成工具")print("=" * 60)# 解析日志df = self.parse_logs()if df.empty:print("沒有有效數據,無法生成報告")report_file = os.path.join(self.desktop_path, 'RabbitMQ壓測分析報告.txt')with open(report_file, 'w', encoding='utf-8') as f:f.write("沒有從日志文件中提取到有效數據,請檢查日志格式\n")f.write(f"日志文件: {os.path.abspath(self.log_file)}\n")print(f"錯誤報告已保存到: {report_file}")return report_file# 生成圖表self.generate_charts(df)# 生成報告return self.create_text_report(df)if __name__ == "__main__":import syslog_file = sys.argv[1] if len(sys.argv) > 1 else 'consumer_stress_test.log'generator = ReportGenerator(log_file)report_path = generator.generate_report()if report_path:print(f"\n報告已生成: {report_path}")else:print("\n報告生成失敗")

6、 監控腳本 (monitor.py)

import pika
import time
import json
from config import RABBITMQ_NODES, QUEUE_CONFIGdef monitor_queue():while True:try:connection = pika.BlockingConnection(pika.URLParameters(random.choice(RABBITMQ_NODES)))channel = connection.channel()# 獲取隊列狀態queue = channel.queue_declare(queue=QUEUE_CONFIG['queue_name'],passive=True)messages_ready = queue.method.message_countprint(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 積壓消息: {messages_ready}")connection.close()time.sleep(5)except Exception as e:print(f"監控錯誤: {str(e)}")time.sleep(10)if __name__ == "__main__":monitor_queue()

7、運行壓測程序:

python controller.py

8、結果展示

RabbitMQ集群壓測分析報告
生成時間: 2025-08-07 13:19:23
日志文件: C:\Users\Administrator\PycharmProjects\pythonProject13\rabbitmq\logs\consumer_stress_test.log
一、壓測總體統計
--------------------------------------------------
測試開始時間: 2025-08-07 13:09:27
測試結束時間: 2025-08-07 13:19:21
測試持續時間: 594.00 秒
總消費消息數: 137,000 條
平均吞吐量: 230.64 條/秒
最小延遲: 137895.78 毫秒
平均延遲: 673436.15 毫秒
最大延遲: 4793312.15 毫秒
90%延遲: 1471724.25 毫秒二、性能分析結論
--------------------------------------------------
吞吐量評估: 需優化 (<2000條/秒)
延遲評估: 需優化 (>200ms)
穩定性評估: 90%延遲是平均延遲的 2.2 倍
穩定性評估: 系統穩定性良好三、優化建議
--------------------------------------------------
1. 增加消費者數量以提高吞吐量
2. 提高預取值(prefetch count)優化消費速度
3. 檢查網絡延遲,確保RabbitMQ集群節點間通信正常
4. 使用消息批量處理減少網絡開銷
5. 優化消費者代碼,減少處理時間四、性能圖表
--------------------------------------------------
性能圖表已保存到: C:\Users\Administrator\Desktop\RabbitMQ壓測性能圖表.png

圖表展示
在這里插入圖片描述
這個方案有個缺陷就是測試完成后 還會有很多message堆積在那里,沒有被消費掉,所以我做了一個消費腳本,內容如下:

import pika
import threading
import random
import time
from config import RABBITMQ_NODES, QUEUE_CONFIG# 連接池管理
connection_pool = []
lock = threading.Lock()def get_connection():"""獲取或創建連接"""with lock:if connection_pool:return connection_pool.pop()else:node = random.choice(RABBITMQ_NODES)return pika.BlockingConnection(pika.URLParameters(node))def release_connection(connection):"""釋放連接到連接池"""with lock:if connection.is_open:connection_pool.append(connection)def consume_fast():"""高效消費消息的函數"""try:# 從連接池獲取連接connection = get_connection()channel = connection.channel()# 設置高預取值channel.basic_qos(prefetch_count=1000)# 定義消息處理回調def callback(ch, method, properties, body):# 這里可以添加實際的消息處理邏輯# 例如: process_message(body)# 立即確認消息(更安全)ch.basic_ack(delivery_tag=method.delivery_tag)# 開始消費channel.basic_consume(queue=QUEUE_CONFIG['queue_name'],on_message_callback=callback,auto_ack=False  # 手動確認)print(f"[{threading.current_thread().name}] 快速消費者啟動...")channel.start_consuming()except Exception as e:print(f"消費者錯誤: {str(e)}")finally:# 釋放連接回連接池if 'connection' in locals() and connection.is_open:release_connection(connection)if __name__ == "__main__":# 初始化連接池for _ in range(10):  # 創建10個初始連接node = random.choice(RABBITMQ_NODES)connection_pool.append(pika.BlockingConnection(pika.URLParameters(node)))print(f"已創建 {len(connection_pool)} 個初始連接")# 啟動多個快速消費者線程threads = []for i in range(20):  # 啟動20個消費者線程t = threading.Thread(target=consume_fast,name=f"FastConsumer-{i + 1}",daemon=True)t.start()threads.append(t)time.sleep(0.1)  # 避免同時啟動所有線程print("所有消費者線程已啟動,按 Ctrl+C 停止...")# 監控積壓消息try:while True:# 檢查隊列積壓try:connection = get_connection()channel = connection.channel()queue = channel.queue_declare(queue=QUEUE_CONFIG['queue_name'],passive=True)backlog = queue.method.message_countprint(f"當前積壓消息: {backlog}")release_connection(connection)# 如果積壓減少,可以考慮減少消費者if backlog < 1000:print("積壓已減少,可以安全停止腳本")breakexcept Exception as e:print(f"監控錯誤: {str(e)}")time.sleep(5)  # 每5秒檢查一次except KeyboardInterrupt:print("\n正在停止消費者...")# 清理print("關閉所有連接...")for conn in connection_pool:if conn.is_open:conn.close()

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/92439.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/92439.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/92439.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【第7話:相機模型3】自動駕駛IPM圖像投影拼接技術詳解及代碼示例

IPM圖像投影拼接技術詳解 IPM&#xff08;逆透視映射&#xff09;圖像投影拼接技術是一種在計算機視覺中廣泛應用的圖像處理方法&#xff0c;主要用于將多個透視視圖的圖像轉換為鳥瞰視圖并拼接成一個無縫的大場景圖像。該技術特別適用于自動駕駛、機器人導航和監控系統等領域&…

【測試工程思考】測試自動化基礎能力建設

1 回顧 傳統軟件研發體系下定義的軟件測試是從用戶視角設計的。測試是試圖窮盡用戶行為的工程&#xff0c;從測試用例&#xff08;use case&#xff09;的英文定義就可見一般。測試的邏輯資產就是用自然語言去描述用戶的操作行為或路徑。 但隨著軟件工程向分布式架構和敏捷交付…

進階向:AI聊天機器人(NLP+DeepSeek API)

什么是AI聊天機器人? AI聊天機器人是一種通過自然語言處理(NLP)技術模擬人類對話的智能程序系統。其核心是建立在機器學習算法和大型語言模型基礎上的對話引擎,能夠理解用戶的自然語言輸入,分析語境和意圖,并生成符合上下文的相關回復。 這類機器人系統通常包含以下幾個…

一個C#的段子

猜猜按鈕的結果是啥。 public partial class Form1 : Form { public Form1() { InitializeComponent(); } private void Form1_Load(object sender, EventArgs e) { } public static bool flag true; privat…

使用 gptqmodel 量化 Qwen3-Coder-30B-A3B-Instruct

代碼部分 : quantize_qwen3_coder_30b_a3b_instruct_gptq.py import os########## 環境變量設置 ########## # 當前可用的 CUDA 編號 os.environ["CUDA_VISIBLE_DEVICES"] "1" # GPU 顯存資源片段優化 os.environ["PYTORCH_CUDA_ALLOC_CONF"] …

基于python、django的疫苗接種管理系統

基于python、django的疫苗接種管理系統

Go語言實戰案例:使用sync.Map構建線程安全map

在并發編程中&#xff0c;共享資源的訪問是一個繞不開的問題。Go 中的 map 在并發讀寫時是不安全的&#xff0c;直接使用可能導致程序 panic。因此&#xff0c;在多協程同時訪問 Map 的場景下&#xff0c;必須采取有效的同步措施。本篇將通過一個實戰案例&#xff0c;介紹 Go 的…

關于vue2中對接海康攝像頭以及直播流rtsp或rtmp,后臺ffmpeg轉碼后通過ws實現

最近項目中需要對接攝像頭監控&#xff0c;海康攝像頭為rtsp流格式有一個軟件VLC media player&#xff0c;可以在線進行rtsp或者rtmp流播放&#xff0c;可用來測試流地址是否可用功能實現思路為后臺通過fmpeg把rtsp流進行轉碼&#xff0c;然后通過ws方式進行一幀一幀推送。&am…

Docker容器強制刪除及文件系統修復完整指南

Docker容器強制刪除及文件系統修復完整指南 故障現象與原因分析 ?故障表現?&#xff1a; ERROR: for c9ca40be974d_OpIsosMD_OB unable to remove filesystem unlinkat /data/docker/storage/containers/c9ca40be974d...: structure needs cleaning?根本原因?&#xff1a;…

Matplotlib 知識點總結

1. 基礎繪圖&#xff08;plot函數&#xff09;基本語法&#xff1a;plot([x], y, [fmt], [x2], y2, [fmt2], ..., **kwargs)功能特點&#xff1a;可繪制點、線和組合圖形自動生成x軸&#xff08;0-N-1&#xff09;當x未指定時示例&#xff1a;繪制兩點連線、多點不規則線等代碼…

高可用微服務架構實戰:Nacos集群+Nginx負載均衡,Spring Cloud無縫對接

"當你的注冊中心掛了&#xff0c;整個微服務就變成了無頭蒼蠅。" 這是我在生產環境踩坑后最痛的領悟。今天&#xff0c;我將分享如何用Nacos集群Nginx搭建堅如磐石的注冊中心&#xff0c;讓你的微服務永不迷路&#xff01; 在 Windows 環境下配置 Nacos 集群&#x…

Spark大數據處理實戰指南

Spark 簡介 Apache Spark 是一個開源的分布式計算框架,專為大規模數據處理而設計。它通過內存計算和優化的執行引擎顯著提升了數據處理速度,適用于批處理、實時流處理、機器學習和圖計算等場景。 核心特性 高性能:利用內存計算(In-Memory Processing)減少磁盤 I/O,比傳…

瀏覽器緩存機制全解析:強緩存與協商緩存

瀏覽器緩存是瀏覽器為提升頁面加載速度、減少服務器壓力和節省網絡帶寬&#xff0c;在本地存儲資源&#xff08;如 HTML、CSS、JS、圖片等&#xff09;的機制。其核心分為強緩存和協商緩存&#xff0c;并涉及多種 HTTP 頭字段和存儲位置。以下是詳細解析&#xff1a;?? 一、緩…

知識隨記-----Qt 實用技巧:自定義倒計時按鈕防止用戶頻繁點擊

Qt 技巧&#xff1a;實現自定義倒計時按鈕防止用戶頻繁點擊注冊 項目場景 在一個基于 Qt 開發的聊天應用中&#xff0c;用戶注冊時需要獲取驗證碼。為防止用戶頻繁點擊獲取驗證碼按鈕&#xff0c;需要實現一個倒計時功能&#xff0c;用戶點擊后按鈕進入倒計時狀態&#xff0c;倒…

Linux與Windows應急響應

本人首先進行了linux的應急響應&#xff0c;windows之后再進行 Linux與Windows應急響應初體驗1 linux應急響應1.1 賬戶&#xff1a;1.1.1 使用cat /etc/passwd命令查看passwd文件2.1.2 使用cat /etc/shadow命令查找shadow文件&#xff0c;該文件為密碼文件的存儲項1.2 入侵排查…

計算機網絡1-4:計算機網絡的定義和分類

目錄 計算機網絡的定義 計算機網絡的分類 計算機網絡的定義 計算機網絡的分類 按交換技術分類&#xff1a;電路交換網絡、報文交換網絡、分組交換網絡 按使用者分類&#xff1a;公用網、專用網 按傳輸介質分類&#xff1a;有線網絡、無線網絡 按覆蓋范圍分類&#xff1a;…

在QT中動態添加/刪除控件,伸縮因子該怎么處理

開發中遇到的問題[TOC](開發中遇到的問題)處理方式在我們的界面開發過程中&#xff0c;通常需要開發一些可以動態添加or刪除控件的容器&#xff0c;類似Tab頁一樣&#xff0c;為了美觀的話&#xff0c;我們通常使用伸縮因子將容器中的控件往一個方向擠&#xff0c;類似下面的控…

【設計模式精解】什么是代理模式?徹底理解靜態代理和動態代理

目錄 靜態代理 動態代理 JDK動態代理 CGLIB代理 JDK動態代理和CGLIB代理的區別 總結 代理模式簡單來說就是 我們使用代理對象來代替對真實對象(real object)的訪問&#xff0c;這樣就可以在不修改原目標對象的前提下&#xff0c;擴展目標對象的功能。 代理模式有靜態代理…

MCU AI/ML - 彌合智能和嵌入式系統之間的差距

作者&#xff1a;芯科科技產品營銷高級經理Gopinath Krishniah 人工智能&#xff08;AI&#xff09;和機器學習&#xff08;ML&#xff09;是使系統能夠從數據中學習、進行推理并隨著時間的推移提高性能的關鍵技術。這些技術通常用于大型數據中心和功能強大的GPU&#xff0c;但…

Redis中的sdshdr的len和alloc那塊的知識點詳解

文章目錄核心比喻&#xff1a;一個可以伸縮的水瓶場景一&#xff1a;創建一個新字符串場景二&#xff1a;追加字符串&#xff08;觸發“空間預分配”&#xff09;場景三&#xff1a;再次追加字符串&#xff08;利用空閑空間&#xff09;場景四&#xff1a;縮短字符串&#xff0…