python的websocket方法教程

WebSocket是一種網絡通信協議,它在單個TCP連接上提供全雙工的通信信道。在本篇文章中,我們將探討如何在Python中使用WebSocket實現實時通信。

websockets是Python中最常用的網絡庫之一,也是websocket協議的Python實現。它不僅作為基礎組件在眾多項目中發揮著重要作用,其源碼也值得廣大“Python玩家”研究。
官網:https://github.com/python-websockets/websockets

1. 什么是WebSocket?

WebSocket協議是在2008年由Web應用程序設計師和開發人員創建的,目的是為了在Web瀏覽器和服務器之間提供更高效、更低延遲的雙向通信。它允許客戶端和服務器在任何時候發送消息,無需重新建立TCP連接。WebSocket可以在Web瀏覽器和服務器之間傳輸文本和二進制數據,使得構建實時Web應用程序變得更加簡單。

2. 在Python中使用WebSocket

Python中有多個庫可以幫助我們使用WebSocket,如:websockets、aiohttp等。在本文中,我們將使用websockets庫來演示WebSocket編程。

要安裝websockets庫,你可以使用pip:

pip install websockets

3. 創建WebSocket服務器

使用websockets庫,我們可以輕松地創建一個WebSocket服務器。以下是一個簡單的示例:

import asyncio
import websocketsasync def echo(websocket, path):async for message in websocket:print(f"Received message: {message}")await websocket.send(f"Echo: {message}")start_server = websockets.serve(echo, "localhost", 8765)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

在這個示例中,我們定義了一個名為echo的協程函數,它接收兩個參數:websocket和path。該函數使用async for循環讀取客戶端發送的消息,并將消息發送回客戶端。

然后,我們使用websockets.serve()函數創建一個WebSocket服務器,監聽本地主機的8765端口。最后,我們使用asyncio的事件循環啟動服務器。

4. 創建WebSocket客戶端

要創建一個WebSocket客戶端,我們同樣可以使用websockets庫。以下是一個簡單的客戶端示例:

import asyncio
import websocketsasync def main():async with websockets.connect("ws://localhost:8765") as websocket:message = "Hello, server!"await websocket.send(message)print(f"Sent: {message}")response = await websocket.recv()print(f"Received: {response}")asyncio.run(main())

在這個示例中,我們使用websockets.connect()函數建立與WebSocket服務器的連接。然后,我們使用send()方法向服務器發送消息,并使用recv()方法接收服務器的響應。

5. 總結

WebSocket協議為Web瀏覽器和服務器之間提供了實時雙向通信的能力,使得構建實時Web應用程序變得更加容易。在Python中,我們可以使用websockets庫輕松地實現WebSocket編程。

6. 通過websockets這個項目,從大型開源項目中學習asyncio庫。

一、asyncio.Transport
在官方文檔中,Transport被描述成對socket的抽象,它控制著如何傳輸數據。除了websockets,uvicorn、daphne等ASGI實現都會用到Transport。

Transport繼承于ReadTransport和WriteTransport,兩者都繼承于BaseTransport。顧名思義,Transport兼備讀和寫的功能,可以類比為讀寫socket對象。
在這里插入圖片描述

Transport對象提供以下常用函數——

is_reading:判斷該Transport是否在讀。

set_write_buffer_limits:設置寫入Transport的高和低水位。考慮到網絡狀況,有時不希望寫入過多的數據。

write、write_eof、write_line:為當前Transport寫入數據,分別表示寫入二進制數據、eof和二進制行數據。其中eof寫入后不會關閉Transport,但會flush數據。

abort:立刻關閉Transport,不接受新的數據。留在緩沖的數據也會丟失,后續調用Protocol的connection_lost函數。

在websockets中,Transport使用場景不多,一般都是通過Protocol對象的回調參數使用的。在websocket的初始化過程中,會設置Transport的最高水位。同樣,在這種場景下,該對象也是作為回調參數使用的。
在這里插入圖片描述

二、asyncio.Protocol
如果Transport是對socket的抽象,那么Protocol就是對協議的抽象。它提供了如何使用Transport的方式。
在這里插入圖片描述

用戶使用的Protocol直接繼承自BaseProtocol,并提供了六個Unimplemented函數需要用戶去實現——

connection_made:當連接建立時會執行該函數,該函數包含一個Transport類型的參數。

connection_lost:當連接丟失或者關閉時會執行該函數,該函數包含一個Exception類型的參數。

pause_writing:當Transport對象寫入的數據高于之前設置的高水位時被調用,一般會暫停數據的寫入。

resume_writing:當Transport對象寫入的數據低于之前設置的低水位時被調用,一般用于恢復數據寫入。

data_received:當有數據被接受時回調,該函數包含一個二進制對象data,用來表示接受的數據。

eof_received:當被Transport對象被調用write_eof時被調用。

在websockets中,server端的connection_made實現截圖如圖所示。在該函數中,websockets將用戶實現的handler封裝成task對象,并和websocket的server綁定。
在這里插入圖片描述

而在client端中實現如第一節截圖所示,只是在reader中注冊該Transport對象。

websockets的connection_lost函數實現方式如下。主要操作即更新狀態、關閉pings、更新對應的waiter狀態,以及維護reader對象。
在這里插入圖片描述

在其他函數的實現中,websockets也主要用到了reader對象完成數據流的暫停和恢復,以及數據的寫入。

從上面代碼實現可以看出,websockets通過reader代理完成數據流的操作。這個reader是一個asyncio.StreamReader對象。這個對象具體如何使用將在下一篇介紹。

附錄:進階版本:

python使用websockets庫
serve:在server端使用,等待客戶端的連接。如果連接成功,返回一個websocket。

connect: 在client端使用,用于建立連接。

send:發送數據

recv:接收數據

close:關閉連接

服務端

#!/usr/bin/python3
# 主要功能:創建1個基本的websocket server, 符合asyncio 開發要求
import asyncio
import websockets
from datetime import datetimeasync def handler(websocket):data = await websocket.recv()reply = f"Data received as \"{data}\".  time: {datetime.now()}"print(reply)await websocket.send(reply)print("Send reply")async def main():async with websockets.serve(handler, "localhost", 9999):await asyncio.Future()  # run foreverif __name__ == "__main__":asyncio.run(main())

客戶端

import asyncio
import websockets
import timeasync def ws_client(url):for i in range(1, 40):async with websockets.connect(url) as websocket:await websocket.send("Hello, I am PyPy.")response = await websocket.recv()print(response)time.sleep(1)asyncio.run(ws_client('ws://localhost:9999'))

服務端

import asyncio
import websocketsIP_ADDR = "127.0.0.1"
IP_PORT = "9090"# 握手,通過接收Hi,發送"success"來進行雙方的握手。
async def serverHands(websocket):while True:recv_text = await websocket.recv()print("recv_text=" + recv_text)if recv_text == "Hi":print("connected success")await websocket.send("success")return Trueelse:await websocket.send("connected fail")# 接收從客戶端發來的消息并處理,再返給客戶端success
async def serverRecv(websocket):while True:recv_text = await websocket.recv()print("recv:", recv_text)await websocket.send("success,get mess:"+ recv_text)# 握手并且接收數據
async def serverRun(websocket, path):print(path)await serverHands(websocket)await serverRecv(websocket)# main function
if __name__ == '__main__':print("======server======")server = websockets.serve(serverRun, IP_ADDR, IP_PORT)asyncio.get_event_loop().run_until_complete(server)asyncio.get_event_loop().run_forever()

客戶端

import asyncio
import websocketsIP_ADDR = "127.0.0.1"
IP_PORT = "9090"async def clientHands(websocket):while True:# 通過發送hello握手await websocket.send("Hi")response_str = await websocket.recv()# 接收"success"來進行雙方的握手if "success" in response_str:print("握手成功")return True# 向服務器端發送消息
async def clientSend(websocket):while True:input_text = input("input text: ")if input_text == "exit":print(f'"exit", bye!')await websocket.close(reason="exit")return Falseawait websocket.send(input_text)recv_text = await websocket.recv()print(f"{recv_text}")# 進行websocket連接
async def clientRun():ipaddress = IP_ADDR + ":" + IP_PORTasync with websockets.connect("ws://" + ipaddress) as websocket:await clientHands(websocket)await clientSend(websocket)# main function
if __name__ == '__main__':print("======client======")asyncio.get_event_loop().run_until_complete(clientRun())

服務端

# -*- coding:utf8 -*-import json
import socket
import asyncio
import logging
import websockets
import multiprocessingIP = '127.0.0.1'
PORT_CHAT = 9090USERS ={}#提供聊天的后臺
async def ServerWs(websocket,path):logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',filename="chat.log",level=logging.INFO)# 握手await websocket.send(json.dumps({"type": "handshake"}))async for message in websocket:data = json.loads(message)message = ''# 用戶發信息if data["type"] == 'send':name = '404'for k, v in USERS.items():if v == websocket:name = kdata["from"] = nameif len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "user", "content": data["content"], "from": name})# 用戶注冊elif data["type"] == 'register':try:USERS[data["uuid"]] = websocketif len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "login", "content": data["content"], "user_list": list(USERS.keys())})except Exception as exp:print(exp)# 用戶注銷elif data["type"] == 'unregister':del USERS[data["uuid"]]if len(USERS) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "logout", "content": data["content"], "user_list": list(USERS.keys())})#打印日志logging.info(data)# 群發await asyncio.wait([user.send(message) for user in USERS.values()])def server_run():print("server")start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()if __name__ == "__main__":from multiprocessing import Processmultiprocessing.freeze_support()server = Process(target=server_run, daemon=False)server.start()

服務端

import asyncio
import websockets
import time
import json
import threading
# 功能模塊
class OutputHandler():async def run(self,message,send_ms,websocket):# 用戶發信息await send_ms(message, websocket)# 單發消息# await send_ms(message, websocket)# 群發消息#await s('hi起來')# 存儲所有的客戶端
Clients = {}# 服務端
class WS_Server():def __init__(self):self.ip = "127.0.0.1"self.port = 9090# 回調函數(發消息給客戶端)async def callback_send(self, msg, websocket=None):await self.sendMsg(msg, websocket)# 發送消息async def sendMsg(self, msg, websocket):print('sendMsg:', msg)# websocket不為空,單發,為空,群發消息if websocket != None:await websocket.send(msg)else:# 群發消息await self.broadcastMsg(msg)# 避免被卡線程await asyncio.sleep(0.2)# 群發消息async def broadcastMsg(self, msg):for user in Clients:await user.send(msg)# 針對不同的信息進行請求,可以考慮json文本async def runCaseX(self,jsonMsg,websocket):print('runCase')op = OutputHandler()# 參數:消息、方法、socketawait op.run(jsonMsg,self.callback_send,websocket)# 連接一個客戶端,起一個循環監聽async def echo(self,websocket, path):# 添加到客戶端列表# Clients.append(websocket)# 握手await websocket.send(json.dumps({"type": "handshake"}))# 循環監聽while True:# 接受信息try:# 接受文本recv_text = await websocket.recv()message = "Get message: {}".format(recv_text)# 返回客戶端信息await websocket.send(message)# 轉jsondata = json.loads(recv_text)# 用戶發信息if data["type"] == 'send':name = '404'for k, v in Clients.items():if v == websocket:name = kdata["from"] = nameif len(Clients) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "send", "content": data["content"], "from": name})await self.runCaseX(jsonMsg=message, websocket=websocket)# 用戶注冊elif data["type"] == 'register':try:Clients[data["uuid"]] = websocketif len(Clients) != 0:  # asyncio.wait doesn't accept an empty listmessage = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())})await self.runCaseX(jsonMsg=message, websocket=websocket)except Exception as exp:print(exp)# 用戶注銷elif data["type"] == 'unregister':del Clients[data["uuid"]]# 對message進行解析,跳進不同功能區# await self.runCaseX(jsonMsg=data,websocket=websocket)# 鏈接斷開except websockets.ConnectionClosed:print("ConnectionClosed...", path)# del Clientsbreak# 無效狀態except websockets.InvalidState:print("InvalidState...")# del Clientsbreak# 報錯except Exception as e:print("ws連接報錯",e)# del Clientsbreak# 啟動服務器async def runServer(self):async with websockets.serve(self.echo, self.ip, self.port):await asyncio.Future()  # run forever# 多協程模式,防止阻塞主線程無法做其他事情def WebSocketServer(self):asyncio.run(self.runServer())# 多線程啟動def startServer(self):# 多線程啟動,否則會堵塞thread = threading.Thread(target=self.WebSocketServer)thread.start()# thread.join()if __name__=='__main__':print("server")s = WS_Server()s.startServer()

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/208338.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/208338.shtml
英文地址,請注明出處:http://en.pswp.cn/news/208338.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

pyside/qt03——人機協同的編程教學—直接面向chatGPT實戰開發(做中學,事上練)

先大概有個草圖框架,一點點豐富 我糾結好久,直接用Python寫UI代碼 還是用designer做UI 再轉Python呢, 因為不管怎么樣都要轉成Python代碼, 想了想還是學一下designer吧,有個中介,有直觀理解。 直接這樣也可…

智能優化算法應用:基于食肉植物算法無線傳感器網絡(WSN)覆蓋優化 - 附代碼

智能優化算法應用:基于食肉植物算法無線傳感器網絡(WSN)覆蓋優化 - 附代碼 文章目錄 智能優化算法應用:基于食肉植物算法無線傳感器網絡(WSN)覆蓋優化 - 附代碼1.無線傳感網絡節點模型2.覆蓋數學模型及分析3.食肉植物算法4.實驗參數設定5.算法結果6.參考…

設計并實現一個多線程圖書館管理系統,涉及數據庫操作

沒有實現全部功能,希望路過的大佬,可以實現全部功能,在評論區聊聊 創建數據庫library-demo CREATE DATABASE library-demo創建圖書表book CREATE TABLE book (bookId int(11) NOT NULL AUTO_INCREMENT COMMENT 圖書ID,bookName varchar(15)…

QUIC協議對比TCP網絡性能測試模擬弱網測試

QUIC正常外網壓測數據---時延diff/ms如下圖: QUIC弱網外網壓測數據 TCP正常外網壓測數據 TCP弱網外網壓測數據 結論: 在弱網情況下,TCP和QUIC協議的表現會有所不同。下面是它們在弱網環境中的性能對比: 連接建立:…

HarmonyOS創建JavaScript(類 Web開發模式)項目

上文 HarmonyOS帶大家創建自己的第一個Page頁面并實現路由跳轉(ArkTS)帶大家創建了我們項目中第一個自己創建的page 并完成了一個跳轉邏輯的編寫 上文的開發模式是 ArkTS 的 也被稱為 聲明式開發范式 還有一種 javaScript的 類Web開發模式 這種方式就類似于我們傳統的前端開發模…

基于微群機器人的二次開發

請求URL: http://域名地址/modifyGroupName 請求方式: POST 請求頭Headers: Content-Type:application/jsonAuthorization:login接口返回 參數: 參數名必選類型說明wId是String登錄實例標識chatRoom…

讀書筆記-《數據結構與算法》-摘要2[冒泡排序]

冒泡排序 核心:冒泡,持續比較相鄰元素,大的挪到后面,因此大的會逐步往后挪,故稱之為冒泡。 public class BubbleSort {public static void main(String[] args) {int unsortedArray[] new int[]{6, 5, 3, 1, 8, 7, 2…

Leetcode每日一題學習訓練——Python3版(到達首都的最少油耗)

版本說明 當前版本號[20231205]。 版本修改說明20231205初版 目錄 文章目錄 版本說明目錄到達首都的最少油耗理解題目代碼思路參考代碼 原題可以點擊此 2477. 到達首都的最少油耗 前去練習。 到達首都的最少油耗 ? 給你一棵 n 個節點的樹(一個無向、連通、無環…

倒計時模塊復習

經典回顧倒計時 倒計時的基本布局介紹。 一個內容區域和一個輸入區域,內容區域進行劃分 直接使用flex布局會更快一點。 js代碼 我們利用一下模塊化思想,直接把獲得時間這個功能寫成一個函數。方便后續的調用 function getTime() {const date new Date…

MES管理系統通過哪些方面提升產品質量管理水平

在當今高度競爭的市場環境中,質量成為了企業生存和發展的關鍵因素。工廠作為生產產品的核心場所,其質量管理水平直接影響到產品的質量和企業的聲譽。為了應對這一挑戰,許多工廠引入了MES管理系統解決方案。本文將探討MES管理系統如何幫助工廠…

【UE5】監控攝像頭效果(上)

目錄 效果 步驟 一、視角切換 二、攝像頭畫面后期處理 三、在場景中顯示攝像頭畫面 效果 步驟 一、視角切換 1. 新建一個Basic關卡,添加第三人稱游戲資源到項目瀏覽器 2. 新建一個Actor藍圖,這里命名為“BP_SecurityCamera” 打開“BP_Securit…

模電筆記。。。。

模電 2.8 蜂鳴器 按照蜂鳴器驅動方式分為有源蜂鳴器和無源蜂鳴器 有源的有自己的震蕩電路,無源的要寫代碼控制。 里面有個線圈,相當于電感,儲能,通直隔交。 蜂鳴器的參數:額定電壓,工作電壓&#xff0…

【CCF-B】1/2區,錄用見刊極快!2個月錄用!

計算機類 ? 好刊解讀 今天小編帶來Taylor and Francis旗下計算機領域快刊,CCF-B類推薦的期刊解讀,期刊審稿周期短,投稿友好,如您有投稿需求,可作為重點關注!后文有相關領域真實發表案例,供您投…

防水,也不怕水。Mate X5是如何做到讓你濕手濕屏也不影響操作的?

相信不少人都碰到過當手機屏幕存在小水珠時,觸控變得不靈敏,或者出現“幽靈觸屏”,指東打西的情況。 尤其是在洗澡、做飯,或者在戶外遇到下雨天氣時,如果打濕的手機收到重要聊天消息或者電話,卻因為濕屏導…

TS學習——面向對象

面向對象是程序中一個非常重要的思想,它被很多同學理解成了一個比較難,比較深奧的問題,其實不然。面向對象很簡單,簡而言之就是程序之中所有的操作都需要通過對象來完成。 舉例來說: 操作瀏覽器要使用window對象操作網…

生成fip.bin在Milkv-duo上跑rtthread的相關嘗試,及其問題分析

前言 (1)PLCT實驗室實習生長期招聘:招聘信息鏈接 (2)本來是想在Milkv-duo上跑rtthread的,做了很多努力,一直沒有結果。雖然不知道最終能不能成功做出來,還是把自己的相關努力分享出來…

MDK官網如何下載stm32支持包

網站:https://www.keil.com/demo/eval/arm.htm 1 2 3點這個下載

基于Mint Mate 21.2 Victoria 的Anjuta安裝與測試

序言 Linux mint mate 21.2 命名為 victoria 版,在vmware虛擬機中安裝按提示默認安裝即可,不做更多記錄。mint mate的優點是穩定,窗口質感好。安裝完成后,需要關注一些常用功能配置。主要有:顯示器調整、桌面調整、工…

當然熱門的原創改寫改寫大全【2023最新】

在信息時代,隨著科技的不斷發展,改寫軟件逐漸成為提高文案質量和寫作效率的重要工具。本文將專心分享一些好用的改寫軟件,其中包括百度文心一言智能寫作以及147SEO改寫軟件。這些工具不僅支持批量改寫,而且在發布到各大平臺后能夠…

python爬取 HTTP_2 網站超時問題的解決方案

問題背景 在進行網絡數據爬取時,使用 Python 程序訪問支持 HTTP/2 協議的網站時,有時會遇到超時問題。這可能會導致數據獲取不完整,影響爬蟲程序的正常運行。 問題描述 在實際操作中,當使用 Python 編寫的爬蟲程序訪問支持 HTT…