1. WebSocket
WebSocket
是一種在單個TCP
連接上進行全雙工通信的協議,它在現代 Web
開發和網絡應用中發揮著重要作用。在 WebSocket
出現之前,實現服務器與客戶端實時通信主要采用輪詢Polling
和長輪詢Long - Polling
等技術。輪詢是客戶端定時向服務器發送請求詢問是否有新數據;長輪詢則是客戶端發送請求后,服務器若沒有新數據就保持連接,直到有新數據才響應。這些方式存在效率低、資源消耗大等問題。為了解決這些問題,WebSocket
協議應運而生,它由 HTML5
標準定義,旨在提供一種高效、實時的雙向通信機制。
1.1 工作原理
Websocket
協議的主流版本為Websocket 13
,由RFC 6455
定義,基于TCP
協議實現。WebSocket
的實現原理基于其獨特的協議設計和通信機制,核心在于建立持久化的全雙工連接,突破傳統 HTTP
請求-響應模式的限制。以下是其實現原理的詳細解析:
-
握手階段【協議升級】:
WebSocket
通過HTTP
協議發起握手,升級到WebSocket
協議,過程如下:-
客戶端請求:客戶端發送
HTTP
請求,攜帶Upgrade: websocket
頭,表明希望升級到WebSocket
協議。GET /chat HTTP/1.1 Host: example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== # 隨機16字節 Base64 編碼,客戶端生成的隨機值,用于服務端驗證 Sec-WebSocket-Version: 13 // 指定協議版本 Origin: https://example.com
-
服務端響應:服務端驗證請求后返回
101 Switching Protocols
響應,完成協議升級。HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= # 由客戶端 Key + GUID 哈希生成
-
握手完成:成功握手后,
TCP
連接保持打開狀態,后續通信直接使用Websocket
協議幀,不在依賴Http
。
-
-
數據傳輸階段:一旦連接升級成功,就建立了一個全雙工的
WebSocket
連接。客戶端和服務器可以在這個連接上隨時向對方發送數據,而不需要像HTTP
請求那樣每次都重新建立連接。 -
關閉連接階段:當一方想要關閉連接時,會發送一個關閉幀,另一方收到后也發送一個關閉幀進行確認,然后雙方關閉連接。
1.2 特點與應用場景
Websocket
一些有以下特點:
- 全雙工通信:客戶端和服務器可以同時、獨立地向對方發送數據,這使得實時通信變得更加高效。例如,在在線聊天應用中,用戶可以隨時發送和接收消息。
- 低延遲:由于避免了
HTTP
請求的開銷,WebSocket
連接建立后數據傳輸的延遲非常低,適合對實時性要求較高的應用,如金融交易系統、實時游戲等。 - 較少的控制開銷:
WebSocket
協議在連接建立后,只需要較少的控制信息來維持連接,相比于HTTP
請求,減少了額外的頭部信息,降低了帶寬消耗。 - 跨平臺兼容性:現代瀏覽器和大多數服務器端框架都支持
WebSocket
協議,使得開發者可以方便地在不同平臺上實現WebSocket
通信。
基于以上特點,能夠體現出Websocket
協議的優勢,以下是該協議的常用場景:
- 實時聊天應用:如在線客服系統、即時通訊軟件等,用戶可以實時發送和接收消息,實現高效的溝通。
- 實時數據更新:股票行情、體育賽事比分等需要實時更新數據的場景,服務器可以實時將最新數據推送給客戶端。
- 多人在線游戲:在多人在線游戲中,玩家的操作和狀態需要實時同步,
WebSocket
可以保證游戲的流暢性和實時性。 - 多人協作:多個用戶可以同時編輯同一個文檔,服務器實時將每個用戶的編輯操作同步給其他用戶。
2. Python
中使用Websocket
在 Python
中實現 WebSocket
協議通常借助第三方庫,主流選擇包括 websockets
【輕量異步】、Tornado
【異步框架】和 Django Channels
【集成 Django 生態】。
庫 | 特點 | 適用場景 |
---|---|---|
websockets | 輕量級異步庫,API 簡潔,支持 Python 3.6+ 。 | 快速搭建簡單 WebSocket 服務 |
Tornado | 異步網絡框架,內置 WebSocket 支持,適合復雜應用。 | 高性能實時服務,如聊天服務器 |
Django Channels | 集成 Django ,支持 WebSocket 、HTTP/2 等協議,需搭配 ASGI 服務器。 | Django 項目中的實時功能擴展 |
2.1 websockets API
websockets
是一個用于在 Python
中實現 WebSocket
協議的異步庫,適用于構建實時通信應用。安裝指令為pip install websockets
,該庫的常用API
如下:
-
服務端
API
-
websockets.serve()
:創建WebSocket
服務器。其參數如下:-
handler
:處理客戶端連接的異步函數,需接收websocket
和path
參數。 -
host
/port
:綁定地址和端口。 -
ping_interval
/ping_timeout
:心跳檢測間隔和超時時間,默認禁用。 -
origins
:允許的跨域來源,列表形式,如origins=["https://example.com"]
。 -
ssl
:SSL
上下文,用于wss://
安全連接。async def handler(websocket, path):passserver = websockets.serve(handler, "localhost", 8765, ping_interval=30)
-
-
WebSocketServer
對象:通過serve()
返回的服務器對象,通常用async with
管理生命周期。async with websockets.serve(...) as server:await server.wait_closed() # 阻塞直到服務器關閉
-
-
客戶端常用
API
-
websockets.connect()
:連接到WebSocket
服務器。其參數如下:-
uri
:服務器地址,如ws://localhost:8765
。 -
ping_interval
/ping_timeout
:客戶端心跳配置。 -
ssl
:用于HTTPS
的SSL
上下文。async with websockets.connect("ws://localhost:8765") as websocket:await websocket.send("Hello")
-
-
-
連接對象``WebSocketCommonProtocol
:無論是服務端還是客戶端的連接,均通過
websocket` 對象操作,核心方法如下。-
發送消息:
await websocket.send(message)
,發送文本或二進制數據。await websocket.send("Hello!") await websocket.send(json.dumps({"data": "test"}).encode())
-
接收消息:
await websocket.recv()
,接收一條文本或二進制消息。// 循環接收 async for message in websocket:print(f"Received: {message}")
-
關閉連接:
await websocket.close(code=1000, reason="")
,主動關閉連接,支持狀態碼和原因,如1000
【正常關閉】、1001
【服務器終止】、1002
【協議錯誤】。await websocket.close(code=1000, reason="Done")
-
連接狀態
websocket.open
:連接是否處于打開狀態。websocket.closed
:連接是否已關閉。
-
-
異常處理
-
websockets.exceptions.ConnectionClosed
:當連接意外時關閉拋出。try:await websocket.recv() except websockets.exceptions.ConnectionClosed as e:print(f"Connection closed: {e.code}, {e.reason}")
-
InvalidHandshake
:握手失敗,如服務器未實現WebSocket
。 -
InvalidMessage
:消息格式錯誤。
-
-
配置
SSL/TLS
【配置安全連接】-
服務端配置
import sslssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ssl_context.load_cert_chain("server.crt", "server.key")server = websockets.serve(handler, "localhost", 8765, ssl=ssl_context)
-
客戶端配置
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ssl_context.load_verify_locations("server.crt")async with websockets.connect("wss://localhost:8765", ssl=ssl_context) as ws:pass
-
2.2 async
與await
在 Python
里,async
和 await
是異步編程的核心特性,它們基于 asyncio
庫實現,能讓代碼在處理 I/O
密集型任務時更高效。
-
async
關鍵字-
定義協程函數:
async
用于定義協程函數。協程函數是一種特殊的函數,它不會像普通函數那樣直接執行,而是返回一個協程對象。要執行協程函數,需要將其放到事件循環中。在下面例子中,greet
是一個協程函數,調用它時并不會立即執行函數體中的代碼,而是返回一個協程對象。import asyncio# 使用 async 定義協程函數 async def greet():print("開始執行協程函數")await asyncio.sleep(1) # 模擬 I/O 操作print("協程函數執行結束")return "Hello, World!"# 調用協程函數,返回協程對象 coro = greet() print(type(coro)) # <class 'coroutine'>
-
異步生成器:
async
還能用于定義異步生成器,異步生成器可以在異步環境中逐個生成值。在下面例子中,async_generator
是一個異步生成器,async for
用于迭代異步生成器中的值。import asyncio# 定義異步生成器 async def async_generator():for i in range(3):await asyncio.sleep(1)yield iasync def main():async for num in async_generator():print(num)asyncio.run(main())
-
await
關鍵字-
暫停協程執行:
await
只能在協程函數內部使用,它用于暫停當前協程的執行,等待一個可等待對象【另一個協程、Future
對象、Task
對象】完成,并返回其結果。當遇到await
時,控制權會暫時交回給事件循環,事件循環可以去執行其他任務。在下面例子中,main
協程函數里的await fetch_data()
會暫停main
協程的執行,直到fetch_data
協程執行完畢,然后將fetch_data
的返回值賦給result
。import asyncioasync def fetch_data():print("開始獲取數據")await asyncio.sleep(2) # 模擬耗時的數據獲取操作print("數據獲取完成")return "Data"async def main():result = await fetch_data() # 等待 fetch_data 協程完成print(f"獲取到的數據: {result}")# 創建事件循環并運行主協程 asyncio.run(main())
-
處理多個可等待對象:可以使用
asyncio.gather()
同時運行多個協程,await
會等待所有協程都完成。在下面例子中,asyncio.gather(task1(), task2())
會并發運行task1
和task2
協程,await
會等待這兩個協程都完成,然后將它們的返回值以列表形式存儲在results
中。import asyncioasync def task1():print("Task 1 開始")await asyncio.sleep(1)print("Task 1 完成")return 1async def task2():print("Task 2 開始")await asyncio.sleep(2)print("Task 2 完成")return 2async def main():results = await asyncio.gather(task1(), task2())print(f"所有任務的結果: {results}")asyncio.run(main())
-
-
在下面例子中,fetch
是一個協程函數,用于發送 HTTP
請求并返回響應內容。main
協程函數中,使用 asyncio.gather()
并發執行多個 fetch
協程,await
會等待所有請求都完成,然后打印每個響應內容的長度。
import asyncio
import aiohttp# 異步函數,用于發送 HTTP 請求
async def fetch(session, url):async with session.get(url) as response:return await response.text()# 主協程函數,用于并發執行多個請求
async def main():urls = ["http://example.com","http://example.org","http://example.net"]async with aiohttp.ClientSession() as session:tasks = [fetch(session, url) for url in urls]results = await asyncio.gather(*tasks)for result in results:print(len(result))# 運行主協程
asyncio.run(main())
2.2 實時日志監控
// 服務端代碼
import asyncio
import websockets
import timeconnected = set()async def log_monitor(websocket, path):connected.add(websocket)try:# 模擬持續發送日志while True:log = f"Log at {time.ctime()}"await websocket.send(log)await asyncio.sleep(1)except websockets.exceptions.ConnectionClosed:passfinally:connected.remove(websocket)async def main():async with websockets.serve(log_monitor, "localhost", 8765):await asyncio.Future()asyncio.run(main())
// 客戶端代碼
import asyncio
import websocketsasync def log_client():async with websockets.connect("ws://localhost:8765") as websocket:async for log in websocket:print(f"Received log: {log}")asyncio.run(log_client())