1.服務端
"""
@File: rabbitmq_server.py
@Date: 2025/6/26 10:42
@Author: xxx
@Description:
1. RabbitMQ服務端,支持多節點命令執行
2. 作為被控節點運行,可接收定向命令并返回結果
""" import ssl
import pika
import time
import json
import socket
import logging
import subprocess
import configparser
logger = logging. getLogger( )
logger. setLevel( logging. DEBUG)
file_handler = logging. FileHandler( 'rabbitmq_server.log' , encoding= 'utf-8' )
file_handler. setLevel( logging. DEBUG)
file_handler. setFormatter( logging. Formatter( '%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s' , datefmt= "%Y-%m-%d %H:%M:%S" ) )
logger. addHandler( file_handler)
console_handler = logging. StreamHandler( )
console_handler. setLevel( logging. INFO)
console_handler. setFormatter( logging. Formatter( '%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s' , datefmt= "%Y-%m-%d %H:%M:%S" ) )
logger. addHandler( console_handler) RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf" class RabbitMQServer : """RabbitMQ RPC服務器類功能:接收并執行來自客戶端的定向命令""" def __init__ ( self, node_name= None , mq_user= "rabbitmq" , mq_password= "rabbitmq@123" , mq_virtual_host= "/" , mq_host= None , mq_port= 5671 , mq_ca= "/opt/ssl/ca_certificate.pem" ) : """初始化RabbitMQ服務端:param node_name: 節點名稱標識(唯一):param mq_user: RabbitMQ用戶名:param mq_password: RabbitMQ密碼:param mq_virtual_host: 虛擬主機:param mq_host: RabbitMQ服務器IP:param mq_port: RabbitMQ服務端口:param mq_ca: SSL證書路徑""" self. NODE_NAME = node_name if node_name else socket. gethostname( ) self. RABBITMQ_USER = mq_userself. RABBITMQ_UNLOCK_CODE = mq_passwordself. RABBITMQ_VIRTUAL_HOST = mq_virtual_hostself. RABBITMQ_HOST = mq_host if mq_host else self. get_option( RABBITMQ_HOST_CONF, "global" , "rabbitmq_host" ) self. RABBITMQ_PORT = mq_portself. SSL_CA_PATH = mq_caself. _setup_connection( ) def get_option ( self, file_path, section, option) : """獲取 file_path 配置項值,若配置文件沒有,返回空字符串:param section: section字符串,例如:'global':param option: key值,例如:'manage_nodes':return: 字符串類型數據""" parser = configparser. ConfigParser( ) parser. read( file_path) if not parser. has_option( section, option) : return "" else : return parser. get( section, option) def _get_ssl_options ( self) : """配置SSL安全連接選項""" context = ssl. SSLContext( ssl. PROTOCOL_TLSv1_2) context. load_verify_locations( self. SSL_CA_PATH) return pika. SSLOptions( context, "localhost" ) def _setup_connection ( self) : """建立RabbitMQ連接并設置隊列""" credentials = pika. PlainCredentials( self. RABBITMQ_USER, self. RABBITMQ_UNLOCK_CODE ) connection_params = pika. ConnectionParameters( host= self. RABBITMQ_HOST, port= self. RABBITMQ_PORT, virtual_host= self. RABBITMQ_VIRTUAL_HOST, credentials= credentials, ssl_options= self. _get_ssl_options( ) , heartbeat= 600 ) self. connection = pika. BlockingConnection( connection_params) self. channel = self. connection. channel( ) self. channel. queue_declare( queue= self. NODE_NAME, durable= True ) self. channel. basic_qos( prefetch_count= 1 ) self. channel. basic_consume( queue= self. NODE_NAME, on_message_callback= self. _execute_command, auto_ack= False ) def _execute_command ( self, ch, method, props, body) : """執行接收到的命令并返回結果""" try : message = json. loads( body. decode( 'utf-8' ) ) command = message. get( 'command' , '' ) target = message. get( 'target' , '' ) logger. info( f" [x] 收到( { target} )命令: { command} " ) if target != self. NODE_NAME: logger. warning( f" [x] 收到非本節點( { self. NODE_NAME} )命令,已忽略" ) ch. basic_ack( delivery_tag= method. delivery_tag) return logger. info( f" [*] 執行命令 【 { command} 】..." ) try : output = subprocess. check_output( command, shell= True , stderr= subprocess. STDOUT, timeout= 60 ) response = output. decode( 'utf-8' ) except subprocess. TimeoutExpired: response = "Error: Command timed out" except subprocess. CalledProcessError as e: response = f"Error: { e. output. decode( 'utf-8' ) } " except Exception as e: response = f"System Error: { str ( e) } " ch. basic_publish( exchange= '' , routing_key= props. reply_to, properties= pika. BasicProperties( correlation_id= props. correlation_id, delivery_mode= 2 ) , body= response. encode( 'utf-8' ) ) logger. info( f" [*] 命令執行完成" ) ch. basic_ack( delivery_tag= method. delivery_tag) except Exception as e: logger. exception( f" [x] 消息處理異常: { str ( e) } " ) ch. basic_nack( delivery_tag= method. delivery_tag) def start ( self, max_retries= 5 , retry_delay= 10 ) : """啟動RabbitMQ服務并持續監聽消息功能:管理服務生命周期,處理連接異常和重試邏輯:param max_retries: 最大重試次數,默認5次:param retry_delay: 重試間隔時間(秒),默認10秒:return:""" retry_count = 0 while True : try : logger. info( f" [*] { self. NODE_NAME} 節點服務啟動 (嘗試 { retry_count + 1 } / { max_retries} )" ) logger. info( f" [*] 等待隊列 { self. NODE_NAME} 中的請求..." ) if not hasattr ( self, 'connection' ) or self. connection. is_closed: self. _setup_connection( ) self. channel. start_consuming( ) except pika. exceptions. AMQPConnectionError as e: retry_count += 1 logger. exception( f"連接失敗: { str ( e) } " ) if retry_count >= max_retries: logger. error( " [x] 達到最大重試次數,終止服務" ) self. close( ) break logger. warning( f" [*] { retry_delay} 秒后嘗試重新連接..." ) time. sleep( retry_delay) except KeyboardInterrupt: logger. error( "\n [x] 接收到終止信號" ) self. close( ) logger. error( " [x] 服務已停止" ) break except Exception as e: logger. exception( f"服務異常: { str ( e) } " ) time. sleep( retry_delay) def close ( self) : """安全關閉RabbitMQ連接功能:清理資源,確保連接被正確關閉:return:""" if hasattr ( self, 'connection' ) and not self. connection. is_closed: self. connection. close( ) logger. info( " [x] 連接已安全關閉" ) if __name__ == '__main__' : server = RabbitMQServer( ) try : server. start( ) except KeyboardInterrupt: logger. error( "\n [x] 接收到終止信號" ) server. close( ) logger. error( " [x] 服務已停止" )
2.客戶端
"""
@File: rabbitmq_client.py
@Date: 2025/6/26 10:43
@Author: xxx
@Description:
1. RabbitMQ客戶端類,支持向指定節點發送SSH命令
2. 作為控制端運行,可定向發送命令并接收執行結果
""" import ssl
import pika
import time
import uuid
import json
import socket
import logging
import configparser
logger = logging. getLogger( )
logger. setLevel( logging. DEBUG)
file_handler = logging. FileHandler( 'rabbitmq_client.log' , encoding= 'utf-8' )
file_handler. setLevel( logging. DEBUG)
file_handler. setFormatter( logging. Formatter( '%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s' , datefmt= "%Y-%m-%d %H:%M:%S" ) )
logger. addHandler( file_handler)
console_handler = logging. StreamHandler( )
console_handler. setLevel( logging. INFO)
console_handler. setFormatter( logging. Formatter( '%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s' , datefmt= "%Y-%m-%d %H:%M:%S" ) )
logger. addHandler( console_handler) RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf" class RabbitMQClient : """RabbitMQ RPC客戶端類功能:向指定節點發送命令并獲取執行結果""" def __init__ ( self, mq_user= "rabbitmq" , mq_password= "rabbitmq@123" , mq_virtual_host= "/" , mq_host= None , mq_port= 5671 , mq_ca= "/opt/ssl/ca_certificate.pem" ) : """初始化RabbitMQ客戶端:param mq_user: RabbitMQ用戶名:param mq_password: RabbitMQ密碼:param mq_virtual_host: 虛擬主機:param mq_host: RabbitMQ服務器IP:param mq_port: RabbitMQ服務端口:param mq_ca: SSL證書路徑""" self. RABBITMQ_USER = mq_userself. RABBITMQ_UNLOCK_CODE = mq_passwordself. RABBITMQ_VIRTUAL_HOST = mq_virtual_hostself. RABBITMQ_HOST = mq_host if mq_host else self. get_option( RABBITMQ_HOST_CONF, "global" , "rabbitmq_host" ) self. RABBITMQ_PORT = mq_portself. SSL_CA_PATH = mq_caself. response = None self. corr_id = None logger. info( " [x] 正在建立連接 ..." ) self. _connect( ) logger. info( " [x] 連接建立成功" ) def get_option ( self, file_path, section, option) : """獲取 file_path 配置項值,若配置文件沒有,返回空字符串:param section: section字符串,例如:'global':param option: key值,例如:'manage_nodes':return: 字符串類型數據""" parser = configparser. ConfigParser( ) parser. read( file_path) if not parser. has_option( section, option) : return "" else : return parser. get( section, option) def _connect ( self) : """建立RabbitMQ連接并初始化回調隊列功能:配置安全連接參數、創建通信信道、設置消息回調處理:return:""" ssl_context = ssl. SSLContext( ssl. PROTOCOL_TLSv1_2) ssl_context. load_verify_locations( self. SSL_CA_PATH) ssl_options = pika. SSLOptions( ssl_context, "localhost" ) credentials = pika. PlainCredentials( self. RABBITMQ_USER, self. RABBITMQ_UNLOCK_CODE ) connection_params = pika. ConnectionParameters( host= self. RABBITMQ_HOST, port= self. RABBITMQ_PORT, virtual_host= self. RABBITMQ_VIRTUAL_HOST, credentials= credentials, ssl_options= ssl_options, heartbeat= 60 ) self. connection = pika. BlockingConnection( connection_params) self. channel = self. connection. channel( ) result = self. channel. queue_declare( queue= '' , exclusive= True ) self. callback_queue = result. method. queueself. channel. basic_consume( queue= self. callback_queue, on_message_callback= self. _on_response, auto_ack= False ) def _on_response ( self, ch, method, props, body) : """RPC模式下的響應消息回調處理函數功能:匹配并接收服務端返回的命令執行結果處理邏輯:1.通過correlation_id匹配對應的請求2.將二進制消息體解碼為字符串3.存儲結果供execute_command方法獲取:param ch: (pika.channel.Channel): 接收到消息的信道對象:param method: (pika.spec.Basic.Deliver): 包含投遞信息(如delivery_tag):param props: (pika.spec.BasicProperties): 消息屬性(含correlation_id等):param body: (bytes): 消息體內容(服務端返回的執行結果):return:""" try : if self. corr_id == props. correlation_id: self. response = body. decode( 'utf-8' ) except UnicodeDecodeError as e: self. response = f"解碼失敗: { str ( e) } " def execute_command ( self, command, target_node= None , timeout= 60 ) : """向指定RabbitMQ節點發送命令并獲取執行結果(RPC模式):param command (str): 要執行的shell命令字符串(如"ls -l"):param target_node (str): 目標節點標識,對應服務端的隊列名- 默認None表示發送到當前主機節點:param timeout (int): 等待響應的超時時間(秒),默認60秒:return str: 命令執行結果文本異常:TimeoutError: 超過指定時間未收到響應時拋出AMQP相關異常: 消息發送失敗時拋出向指定節點執行遠程命令""" self. response = None self. corr_id = str ( uuid. uuid4( ) ) if not target_node: target_node = socket. gethostname( ) message = { "command" : command, "target" : target_node, "timestamp" : time. time( ) } self. channel. basic_publish( exchange= '' , routing_key= target_node, properties= pika. BasicProperties( reply_to= self. callback_queue, correlation_id= self. corr_id, ) , body= json. dumps( message) . encode( 'utf-8' ) ) start_time = time. time( ) while self. response is None : self. connection. process_data_events( ) if time. time( ) - start_time > timeout: raise TimeoutError( f"等待節點 { target_node} 響應超時" ) time. sleep( 0.1 ) return self. response def close ( self) : """安全關閉RabbitMQ連接功能:1. 清理網絡連接資源2. 自動刪除臨時隊列(exclusive隊列)3. 防止資源泄漏:return:""" if self. connection and not self. connection. is_closed: self. connection. close( ) logger. warning( " [x] 連接已關閉" ) if __name__ == '__main__' : client = RabbitMQClient( ) try : nodes = [ "node247" , "node248" , "node249" ] for node in nodes: try : logger. info( f"\n向節點 { node} 執行命令: hostname" ) logger. info( client. execute_command( command= "hostname" , target_node= node) ) except Exception as e: logger. exception( f"節點 { node} 執行失敗: { str ( e) } " ) try : logger. info( f"\n向節點 { node} 執行命令: ls -l /opt/" ) logger. info( client. execute_command( command= "ls -l /opt/" , target_node= node) ) except Exception as e: logger. exception( f"節點 { node} 執行失敗: { str ( e) } " ) try : logger. info( f"\n向節點 { node} 執行命令: date" ) logger. info( client. execute_command( command= "date" , target_node= node) ) except Exception as e: logger. exception( f"節點 { node} 執行失敗: { str ( e) } " ) finally : client. close( )
3.調用結果
192.168 .120 .17 node17
192.168 .120 .18 node18
192.168 .120 .19 node19python3 rabbitmq_server. py
192.168 .120 .17 node17python3 rabbitmq_client. py