WebSocket是一種網絡通信協議,它在單個TCP連接上提供全雙工的通信信道。在本篇文章中,我們將探討如何在Python中使用WebSocket實現實時通信。
websockets是Python中最常用的網絡庫之一,也是websocket協議的Python實現。它不僅作為基礎組件在眾多項目中發揮著重要作用,其源碼也值得廣大“Python玩家”研究。
官網:https://github.com/python-websockets/websockets
1. 什么是WebSocket?
WebSocket協議是在2008年由Web應用程序設計師和開發人員創建的,目的是為了在Web瀏覽器和服務器之間提供更高效、更低延遲的雙向通信。它允許客戶端和服務器在任何時候發送消息,無需重新建立TCP連接。WebSocket可以在Web瀏覽器和服務器之間傳輸文本和二進制數據,使得構建實時Web應用程序變得更加簡單。
2. 在Python中使用WebSocket
Python中有多個庫可以幫助我們使用WebSocket,如:websockets、aiohttp等。在本文中,我們將使用websockets庫來演示WebSocket編程。
要安裝websockets庫,你可以使用pip:
pip install websockets
3. 創建WebSocket服務器
使用websockets庫,我們可以輕松地創建一個WebSocket服務器。以下是一個簡單的示例:
import asyncio
import websocketsasync def echo(websocket, path):async for message in websocket:print(f"Received message: {message}")await websocket.send(f"Echo: {message}")start_server = websockets.serve(echo, "localhost", 8765)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
在這個示例中,我們定義了一個名為echo的協程函數,它接收兩個參數:websocket和path。該函數使用async for循環讀取客戶端發送的消息,并將消息發送回客戶端。
然后,我們使用websockets.serve()函數創建一個WebSocket服務器,監聽本地主機的8765端口。最后,我們使用asyncio的事件循環啟動服務器。
4. 創建WebSocket客戶端
要創建一個WebSocket客戶端,我們同樣可以使用websockets庫。以下是一個簡單的客戶端示例:
import asyncio
import websocketsasync def main():async with websockets.connect("ws://localhost:8765") as websocket:message = "Hello, server!"await websocket.send(message)print(f"Sent: {message}")response = await websocket.recv()print(f"Received: {response}")asyncio.run(main())
在這個示例中,我們使用websockets.connect()函數建立與WebSocket服務器的連接。然后,我們使用send()方法向服務器發送消息,并使用recv()方法接收服務器的響應。
5. 總結
WebSocket協議為Web瀏覽器和服務器之間提供了實時雙向通信的能力,使得構建實時Web應用程序變得更加容易。在Python中,我們可以使用websockets庫輕松地實現WebSocket編程。
6. 通過websockets這個項目,從大型開源項目中學習asyncio庫。
一、asyncio.Transport
在官方文檔中,Transport被描述成對socket的抽象,它控制著如何傳輸數據。除了websockets,uvicorn、daphne等ASGI實現都會用到Transport。
Transport繼承于ReadTransport和WriteTransport,兩者都繼承于BaseTransport。顧名思義,Transport兼備讀和寫的功能,可以類比為讀寫socket對象。
Transport對象提供以下常用函數——
is_reading:判斷該Transport是否在讀。
set_write_buffer_limits:設置寫入Transport的高和低水位。考慮到網絡狀況,有時不希望寫入過多的數據。
write、write_eof、write_line:為當前Transport寫入數據,分別表示寫入二進制數據、eof和二進制行數據。其中eof寫入后不會關閉Transport,但會flush數據。
abort:立刻關閉Transport,不接受新的數據。留在緩沖的數據也會丟失,后續調用Protocol的connection_lost函數。
在websockets中,Transport使用場景不多,一般都是通過Protocol對象的回調參數使用的。在websocket的初始化過程中,會設置Transport的最高水位。同樣,在這種場景下,該對象也是作為回調參數使用的。
二、asyncio.Protocol
如果Transport是對socket的抽象,那么Protocol就是對協議的抽象。它提供了如何使用Transport的方式。
用戶使用的Protocol直接繼承自BaseProtocol,并提供了六個Unimplemented函數需要用戶去實現——
connection_made:當連接建立時會執行該函數,該函數包含一個Transport類型的參數。
connection_lost:當連接丟失或者關閉時會執行該函數,該函數包含一個Exception類型的參數。
pause_writing:當Transport對象寫入的數據高于之前設置的高水位時被調用,一般會暫停數據的寫入。
resume_writing:當Transport對象寫入的數據低于之前設置的低水位時被調用,一般用于恢復數據寫入。
data_received:當有數據被接受時回調,該函數包含一個二進制對象data,用來表示接受的數據。
eof_received:當被Transport對象被調用write_eof時被調用。
在websockets中,server端的connection_made實現截圖如圖所示。在該函數中,websockets將用戶實現的handler封裝成task對象,并和websocket的server綁定。
而在client端中實現如第一節截圖所示,只是在reader中注冊該Transport對象。
websockets的connection_lost函數實現方式如下。主要操作即更新狀態、關閉pings、更新對應的waiter狀態,以及維護reader對象。
在其他函數的實現中,websockets也主要用到了reader對象完成數據流的暫停和恢復,以及數據的寫入。
從上面代碼實現可以看出,websockets通過reader代理完成數據流的操作。這個reader是一個asyncio.StreamReader對象。這個對象具體如何使用將在下一篇介紹。
附錄:進階版本:
python使用websockets庫
serve:在server端使用,等待客戶端的連接。如果連接成功,返回一個websocket。
connect: 在client端使用,用于建立連接。
send:發送數據
recv:接收數據
close:關閉連接
服務端
#!/usr/bin/python3
# 主要功能:創建1個基本的websocket server, 符合asyncio 開發要求
import asyncio
import websockets
from datetime import datetimeasync def handler(websocket):data = await websocket.recv()reply = f"Data received as \"{data}\". time: {datetime.now()}"print(reply)await websocket.send(reply)print("Send reply")async def main():async with websockets.serve(handler, "localhost", 9999):await asyncio.Future() # run foreverif __name__ == "__main__":asyncio.run(main())
客戶端
import asyncio
import websockets
import timeasync def ws_client(url):for i in range(1, 40):async with websockets.connect(url) as websocket:await websocket.send("Hello, I am PyPy.")response = await websocket.recv()print(response)time.sleep(1)asyncio.run(ws_client('ws://localhost:9999'))
服務端
import asyncio
import websocketsIP_ADDR = "127.0.0.1"
IP_PORT = "9090"# 握手,通過接收Hi,發送"success"來進行雙方的握手。
async def serverHands(websocket):while True:recv_text = await websocket.recv()print("recv_text=" + recv_text)if recv_text == "Hi":print("connected success")await websocket.send("success")return Trueelse:await websocket.send("connected fail")# 接收從客戶端發來的消息并處理,再返給客戶端success
async def serverRecv(websocket):while True:recv_text = await websocket.recv()print("recv:", recv_text)await websocket.send("success,get mess:"+ recv_text)# 握手并且接收數據
async def serverRun(websocket, path):print(path)await serverHands(websocket)await serverRecv(websocket)# main function
if __name__ == '__main__':print("======server======")server = websockets.serve(serverRun, IP_ADDR, IP_PORT)asyncio.get_event_loop().run_until_complete(server)asyncio.get_event_loop().run_forever()
客戶端
import asyncio
import websocketsIP_ADDR = "127.0.0.1"
IP_PORT = "9090"async def clientHands(websocket):while True:# 通過發送hello握手await websocket.send("Hi")response_str = await websocket.recv()# 接收"success"來進行雙方的握手if "success" in response_str:print("握手成功")return True# 向服務器端發送消息
async def clientSend(websocket):while True:input_text = input("input text: ")if input_text == "exit":print(f'"exit", bye!')await websocket.close(reason="exit")return Falseawait websocket.send(input_text)recv_text = await websocket.recv()print(f"{recv_text}")# 進行websocket連接
async def clientRun():ipaddress = IP_ADDR + ":" + IP_PORTasync with websockets.connect("ws://" + ipaddress) as websocket:await clientHands(websocket)await clientSend(websocket)# main function
if __name__ == '__main__':print("======client======")asyncio.get_event_loop().run_until_complete(clientRun())
服務端
# -*- coding:utf8 -*-import json
import socket
import asyncio
import logging
import websockets
import multiprocessingIP = '127.0.0.1'
PORT_CHAT = 9090USERS ={}#提供聊天的后臺
async def ServerWs(websocket,path):logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',filename="chat.log",level=logging.INFO)# 握手await websocket.send(json.dumps({"type": "handshake"}))async for message in websocket:data = json.loads(message)message = ''# 用戶發信息if data["type"] == 'send':name = '404'for k, v in USERS.items():if v == websocket:name = kdata["from"] = nameif len(USERS) != 0: # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "user", "content": data["content"], "from": name})# 用戶注冊elif data["type"] == 'register':try:USERS[data["uuid"]] = websocketif len(USERS) != 0: # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "login", "content": data["content"], "user_list": list(USERS.keys())})except Exception as exp:print(exp)# 用戶注銷elif data["type"] == 'unregister':del USERS[data["uuid"]]if len(USERS) != 0: # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "logout", "content": data["content"], "user_list": list(USERS.keys())})#打印日志logging.info(data)# 群發await asyncio.wait([user.send(message) for user in USERS.values()])def server_run():print("server")start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()if __name__ == "__main__":from multiprocessing import Processmultiprocessing.freeze_support()server = Process(target=server_run, daemon=False)server.start()
服務端
import asyncio
import websockets
import time
import json
import threading
# 功能模塊
class OutputHandler():async def run(self,message,send_ms,websocket):# 用戶發信息await send_ms(message, websocket)# 單發消息# await send_ms(message, websocket)# 群發消息#await s('hi起來')# 存儲所有的客戶端
Clients = {}# 服務端
class WS_Server():def __init__(self):self.ip = "127.0.0.1"self.port = 9090# 回調函數(發消息給客戶端)async def callback_send(self, msg, websocket=None):await self.sendMsg(msg, websocket)# 發送消息async def sendMsg(self, msg, websocket):print('sendMsg:', msg)# websocket不為空,單發,為空,群發消息if websocket != None:await websocket.send(msg)else:# 群發消息await self.broadcastMsg(msg)# 避免被卡線程await asyncio.sleep(0.2)# 群發消息async def broadcastMsg(self, msg):for user in Clients:await user.send(msg)# 針對不同的信息進行請求,可以考慮json文本async def runCaseX(self,jsonMsg,websocket):print('runCase')op = OutputHandler()# 參數:消息、方法、socketawait op.run(jsonMsg,self.callback_send,websocket)# 連接一個客戶端,起一個循環監聽async def echo(self,websocket, path):# 添加到客戶端列表# Clients.append(websocket)# 握手await websocket.send(json.dumps({"type": "handshake"}))# 循環監聽while True:# 接受信息try:# 接受文本recv_text = await websocket.recv()message = "Get message: {}".format(recv_text)# 返回客戶端信息await websocket.send(message)# 轉jsondata = json.loads(recv_text)# 用戶發信息if data["type"] == 'send':name = '404'for k, v in Clients.items():if v == websocket:name = kdata["from"] = nameif len(Clients) != 0: # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "send", "content": data["content"], "from": name})await self.runCaseX(jsonMsg=message, websocket=websocket)# 用戶注冊elif data["type"] == 'register':try:Clients[data["uuid"]] = websocketif len(Clients) != 0: # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())})await self.runCaseX(jsonMsg=message, websocket=websocket)except Exception as exp:print(exp)# 用戶注銷elif data["type"] == 'unregister':del Clients[data["uuid"]]# 對message進行解析,跳進不同功能區# await self.runCaseX(jsonMsg=data,websocket=websocket)# 鏈接斷開except websockets.ConnectionClosed:print("ConnectionClosed...", path)# del Clientsbreak# 無效狀態except websockets.InvalidState:print("InvalidState...")# del Clientsbreak# 報錯except Exception as e:print("ws連接報錯",e)# del Clientsbreak# 啟動服務器async def runServer(self):async with websockets.serve(self.echo, self.ip, self.port):await asyncio.Future() # run forever# 多協程模式,防止阻塞主線程無法做其他事情def WebSocketServer(self):asyncio.run(self.runServer())# 多線程啟動def startServer(self):# 多線程啟動,否則會堵塞thread = threading.Thread(target=self.WebSocketServer)thread.start()# thread.join()if __name__=='__main__':print("server")s = WS_Server()s.startServer()