目錄
python tcp 框架 asyncio
websockets
python tcp 框架 asyncio
import asyncio
import json
import timeclass TCPClient:def __init__(self, host, port, heartbeat_interval=10):self.host = hostself.port = portself.heartbeat_interval = heartbeat_intervalself.reader = Noneself.writer = Noneself.connected = Falseself.last_recv_time = time.time()async def connect(self):while True:try:print(f"正在連接 {self.host}:{self.port} ...")self.reader, self.writer = await asyncio.open_connection(self.host, self.port)self.connected = Trueprint("? 已連接服務器")asyncio.create_task(self.send_heartbeat())asyncio.create_task(self.receive_loop())breakexcept Exception as e:print(f"連接失敗: {e},3秒后重試")await asyncio.sleep(3)async def send_heartbeat(self):while self.connected:try:await self.send({"type": "heartbeat"})await asyncio.sleep(self.heartbeat_interval)except Exception as e:print(f"心跳發送失敗: {e}")self.connected = Falsebreakasync def receive_loop(self):try:while self.connected:data = await self.reader.readline()if not data:print("? 服務器斷開連接")self.connected = Falsebreakself.last_recv_time = time.time()try:msg = json.loads(data.decode())self.on_message(msg)except json.JSONDecodeError:print(f"收到非JSON數據: {data}")except Exception as e:print(f"接收出錯: {e}")finally:self.connected = Falseawait self.connect() # 自動重連def on_message(self, msg):"""收到消息時觸發(你可以改成事件回調)"""print(f"📩 收到消息: {msg}")async def send(self, obj):if self.writer and not self.writer.is_closing():line = json.dumps(obj) + "\n"self.writer.write(line.encode())await self.writer.drain()else:print("? 未連接,無法發送")# === 創建全局客戶端實例 ===
client = TCPClient("127.0.0.1", 8888)async def main():await client.connect()# === 在任意地方調用發送 ===
async def send_message():await client.send({"type": "chat", "msg": "Hello Server"})if __name__ == "__main__":loop = asyncio.get_event_loop()loop.create_task(main())# 模擬3秒后在別的地方發消息loop.call_later(3, lambda: asyncio.create_task(send_message()))loop.run_forever()
websockets
安裝依賴
pip install websockets
示例代碼?python
編輯
import asyncio
import websockets
import json
import threadingSERVER_URI = "ws://127.0.0.1:8765"# 全局 websocket 引用,用于在其他地方發消息
ws_conn = Noneasync def heartbeat(ws):"""定時發送心跳包"""while True:try:await ws.send(json.dumps({"type": "ping"}))except Exception as e:print("心跳發送失敗:", e)breakawait asyncio.sleep(5) # 心跳間隔async def listen_messages(ws):"""監聽服務器消息"""try:async for message in ws:data = json.loads(message)print("收到消息:", data)except websockets.ConnectionClosed:print("連接已關閉")async def send_message(data):"""在其他地方調用的發消息方法"""global ws_connif ws_conn:await ws_conn.send(json.dumps(data))else:print("未連接服務器,無法發送")async def main():global ws_connasync with websockets.connect(SERVER_URI) as websocket:ws_conn = websocket# 并發執行 心跳 和 收消息await asyncio.gather(heartbeat(websocket),listen_messages(websocket))def start_client():"""啟動 WebSocket 客戶端"""asyncio.run(main())def send_from_other_thread(msg):"""從其他線程發送消息"""asyncio.run(send_message({"type": "chat", "text": msg}))if __name__ == "__main__":# 啟動 WebSocket 客戶端(獨立線程)t = threading.Thread(target=start_client, daemon=True)t.start()# 等待連接建立import timetime.sleep(2)# 從主線程模擬發送消息send_from_other_thread("Hello from main thread!")# 防止主線程退出t.join()