目錄
Websocket技術背景
Websockec簡介
實現websocket通信程序
實驗環境:
服務端(阿里云ESC,VPC網絡):
客戶端1(本機):
通信模型:
實現功能邏輯:
源代碼:
服務端源代碼:
客戶端源代碼:
實驗效果:
總結
Websocket技術背景
在websocket協議誕生之前,應用層中用于web通信的協議基本由HTTP主導,毫無疑問,HTTP就是構建現代Web的基石,但是http存在比較致命的性能問題——它是一種請求-響應模型,這意味著他是一種短連接的通信協議,在get多個web資源時需要依靠多次請求響應來輪詢,這種通信導致了一種極其低效的通信問題,在實時通信時尤為明顯,因此websocket,基于socket優秀的長連接特性得以被擴展到應用層并得到廣泛使用
Websockec簡介
WebSocket 是一種基于TCP協議之上的應用層協議,2011年已被IETF的RFC文檔標準化,相比socket與socketserver,他天然支持全雙工通信、異步通信,通信效率極高,且由于是應用層標準協議,因此大部分瀏覽器能夠原生支持websocket協議,對瀏覽器兼容性極強
?
實現websocket通信程序
實驗環境:
服務端(阿里云ESC,VPC網絡):
操作系統:Ubuntu22.04
私網IP:172.29.42.239?
公網IP:47.111.23.151
編程語言:Python
數據庫:Mysql
客戶端1(本機):
操作系統:Windows11
編程語言:Python
通信模型:
C/S,由服務端處理具體的業務邏輯并轉發客戶端之間的消息
純異步通信邏輯
實現功能邏輯:
完整的注冊登錄邏輯
歡迎邏輯,分為發送給新用戶自己的單播歡迎邏輯和通告他人新用戶登錄的歡迎邏輯
顯示與隱藏IP歸屬地,缺省隱藏
shell命令調用(非交互式,交互式有點危險)
心跳報文邏輯,5秒hello,15秒dead
獲取在線用戶邏輯
獲取系統級信息邏輯,如主機名、操作系統、分區信息以及每個分區的具體情況、內存信息以及內存占用詳情、CPU信息、當前進程信息等
幫助邏輯,通過發送/help命令獲取到用戶可使用的全部命令
遞歸目錄樹邏輯
每條消息顯示當前時間邏輯
PS:以上,shell命令調用、獲取系統級信息這些邏輯的命令,不能對自己使用,只能對他人,另一個客戶端
源代碼:
服務端源代碼:
import datetime
from websockets.asyncio.server import ServerConnection,serve
import asyncio
from aioconsole import aprint,ainput
import json
from ip2region.binding.python import xdbSearcher
from datetime import datetime
import pymysql
from textwrap import dedentclient_list={}
client_heartbeat={}
location = False#數據庫查詢
async def db_select(user):db_conn = pymysql.connect(user='root',password='root',host='127.0.0.1',database='user_info')cur = db_conn.cursor()sql = ' select passwd from user_pwd where username = %s 'cur.execute(sql,user)pwd = cur.fetchall()db_conn.close()return pwd#數據庫更新
async def db_update(user,pwd):db_conn = pymysql.connect(user='root', password='root', host='127.0.0.1', database='user_info')cur = db_conn.cursor()sql = 'update user_pwd set passwd = %s where username = %s ;'cur.execute(sql, (pwd, user))db_conn.commit()db_conn.close()#數據庫插入
async def db_insert (user,pwd):db_conn = pymysql.connect(user='root', password='root', host='127.0.0.1', database='user_info')cur = db_conn.cursor()sql = 'insert into user_pwd(username,passwd) values(%s,%s)'cur.execute(sql, (user,pwd))db_conn.commit()db_conn.close()#注冊邏輯
async def register(websocket_conn,region,city,name_pwd):try:parts = name_pwd.split(' ', 1)username = parts[0]passwd = parts[1]print (parts)pwd = await db_select(username)if pwd:await websocket_conn.send('親愛的用戶,您的賬號已注冊,無需再次重復注冊,直接登錄即可')await broadcast_welcome(websocket_conn, region, city, choose='登錄')if not pwd:await db_insert(username, passwd)client_list[username] = websocket_conntry:for client_name, ws_conn in client_list.items():print ('進來了3')if username == client_name:print ('進來了4')await ws_conn.send(f'歡迎你,【{username}】,來自{region}{city}的朋友,歡迎你加入我們的戰略會議室!\n輸入help獲取幫助')else:print('進來了5')await ws_conn.send(f'歡迎來自{region}{city}的新同伴【{username}】加入了我們的戰略會議室!\n輸入help獲取幫助')except Exception as e:print (f'注冊邏輯中的歡迎消息邏輯捕獲到異常報錯:{e}')except Exception as e:print (f'注冊邏輯捕獲到異常報錯:{e}')#登錄邏輯
async def login(websocket_conn,region,city,name_pwd):try:parts = name_pwd.split(' ', 1)name = parts[0]passwd = parts[1]pwd = await db_select(name)try:if not pwd:await websocket_conn.send('親愛的用戶,您的賬號尚未注冊,請注冊后再加入我們的會議。')await broadcast_welcome(websocket_conn,region,city,choose='注冊')if pwd:pwd = pwd[0][0]if passwd != pwd:await websocket_conn.send('不好意思,親愛的用戶,您的密碼輸入錯誤,請重新嘗試。')await broadcast_welcome(websocket_conn, region, city,choose='登錄')if str(passwd) == str(pwd):await websocket_conn.send(f'登錄成功!')client_list[name] = websocket_connfor client_name, ws_conn in client_list.items():if name == client_name:await ws_conn.send(f'歡迎你,【{name}】,來自{region}{city}的朋友,歡迎你加入我們的戰略會議室!\n輸入help獲取幫助')if name != client_name:await ws_conn.send(f'歡迎來自{region}{city}的新同伴【{name}】加入了我們的戰略會議室!\n請輸入help獲取幫助')except Exception as e:print (f'登錄邏輯中的條件判定捕獲到異常報錯:{e}')except Exception as e:print (f'登錄邏輯捕獲到異常報錯:{e}')#歡迎邏輯,應用了注冊和登錄邏輯
async def broadcast_welcome(websocket_conn,region,city,choose):global client_listtry:name_pwd = await websocket_conn.recv()if '注冊' in choose:await register(websocket_conn, region, city, name_pwd)if '登錄' in choose:await login(websocket_conn,region, city,name_pwd)except Exception as e:print (f'歡迎邏輯中捕獲到異常報錯:{e}')#服務器通常轉發邏輯
async def forwarding_other(sender_name,sender_msg,city):global client_listglobal locationtry:if sender_msg != 'hold,connect,status' \and not sender_msg.startswith('/cmd') \and not sender_msg.startswith('/cmd_return') \and sender_msg != '/client_online' \and not sender_msg.startswith('/get') \and sender_msg != '/help' \and not sender_msg.startswith('/get_info') \and not sender_msg.startswith('/get_cpu_info') \and not sender_msg.startswith('/change_pwd'):time_now = datetime.now().strftime('%H:%M:%S')show_location_forward_msg = f'\r《{city}|({time_now})|{sender_name}>>{sender_msg}'normal_forward_msg = f'\r({time_now})|{sender_name}>>{sender_msg}'for client_name,ws_conn in client_list.items():if sender_msg == '/get_location':location = Trueif sender_msg == '/disable_location':location = Falseif client_name != sender_name and location == False:await ws_conn.send(normal_forward_msg)if client_name != sender_name and location == True:await ws_conn.send(show_location_forward_msg)except Exception as e:print (f'轉發邏輯捕獲到異常報錯:{e}')#Shell命令調用
async def sys_cmd_unicast(sender_name,sender_msg):global client_listtry:if sender_msg.startswith('/cmd') or sender_msg.startswith('/cmd_return'):parts = sender_msg.split(' ', 2)remote_user = parts[1]if sender_msg.startswith('/return_cmd'):remote_conn = client_list[remote_user]cmd_return = parts[2]remote_msg = cmd_returnawait remote_conn.send(remote_msg)if sender_msg.startswith('/cmd'):remote_conn = client_list[remote_user]cmd = parts[2]remote_msg = f'/cmd {sender_name} {cmd}'await remote_conn.send(remote_msg)except Exception as e:print (f'處理cmd命令的邏輯捕獲到異常報錯:{e}')#歸屬地查詢
async def get_ip_location(ip):# 指定 .xdb 文件路徑#D:/Python_Script/.venv/Lib/site-packages/ip2region/data/ip2region.xdb 我windows上存放位置#/usr/local/lib/python3.11/dist-packages/ip2region/data/ip2region.xdb 我ubuntu上存放位置try:dbfilepath = "/usr/local/lib/python3.11/dist-packages/ip2region/data/ip2region.xdb"# 初始化搜索對象searcher = xdbSearcher.XdbSearcher(dbfile=dbfilepath)# 查詢 IP 所在地區result1 = searcher.search(ip)location_parts = [part for part in result1.split('|') if part != '0' and part !='內網IP']if not location_parts:return '內網IP'return location_partsexcept Exception as e:print (f'歸屬地查詢庫的調用邏輯捕獲到異常報錯{e}')#心跳報文死亡消息的廣播邏輯
async def keepalive_broadcast_send(msg):global client_listtry:for client_name,ws_conn in client_list.items():await ws_conn.send(msg)except Exception as e:print (f'死亡廣播邏輯中捕獲到異常報錯:{e}')#監聽心跳報文
async def listen_keepalive(sender_msg,sender_name):global client_listglobal client_heartbeattry:if sender_msg == 'hold,connect,status':client_heartbeat[sender_name] = datetime.now()need_remove_client = []for name, last_time in client_heartbeat.items():if (datetime.now() - last_time).total_seconds() > 15:need_remove_client.append(name)for name in need_remove_client:ws_conn = client_list[name]del client_list[name]del client_heartbeat[name]await ws_conn.close()time_now = datetime.now().strftime('%H:%M:%S')dead_msg = f'噢!親愛的【{name}】,我們曾引以為傲的一員,在北京時間{time_now}的時刻永遠地停止了心跳,離開了這個世界,朋友們,留給我們的時間不多了,敵人日益強大,我們的同伴卻在不斷減少。'await keepalive_broadcast_send(dead_msg)except Exception as e:print (f'心跳報文邏輯中捕獲的異常報錯')#顯示IP歸屬地邏輯
async def show_ip_location(sender_msg,region,city,sender_name):global client_listtry:if sender_msg == '/get_location':location = f'{region}|{city}'msg = f'/post_location {location}'ws_conn = client_list[sender_name]await ws_conn.send(msg)except Exception as e:print (f'顯示IP歸屬地邏輯中捕獲到異常報錯:{e}')
#獲取在線用戶邏輯
async def get_online_client(sender_msg,sender_name):global client_listtry:name_list = ''if sender_msg == '/client_online' :times = 0for name in client_list:if times == 0 :name_list += nametimes +=1elif times > 0:name_list += '-' + nameawait aprint(name_list)msg = f'/online_client_list {name_list}'ws_conn = client_list[sender_name]await ws_conn.send(msg)except Exception as e:print (f'獲取在線用戶邏輯中捕獲到異常報錯:{e}')#獲取信息邏輯
async def get_info(sender_msg,sender_name):global client_listtry:if sender_msg.startswith('/get_sys_info') :parts = sender_msg.split(' ', 1)remote_user = parts[1]ws_conn = client_list[remote_user]cmd = dedent("""system = platform.system()version = platform.version()arch=platform.architecture()[0]arch=arch.replace('bit','')hostname=platform.node()print('')print('系統信息:')print(f'主機名:{hostname}')print(f'操作系統:{arch}位{system}{version}')""")msg = f'/post_info {sender_name} {cmd}'await ws_conn.send(msg)if sender_msg.startswith('/get_part_info'):parts = sender_msg.split(' ', 1)remote_user = parts[1]ws_conn = client_list[remote_user]cmd = dedent("""all=psutil.disk_partitions()print('')print('分區信息:')for i in all:dict_info = psutil.disk_usage(i[0])total = int(dict_info[0]/1024/1024/1024)used = int(dict_info[1]/1024/1024/1024)used_percent = dict_info[3]print(f'盤符:{i[0]} 文件系統:{i[2]} 分區總空間:{total}GB 分區已用空間{used}GB 分區已用空間占比{used_percent}%')""")msg = f'/post_info {sender_name} {cmd}'await ws_conn.send(msg)if sender_msg.startswith('/get_memory_info'):parts = sender_msg.split(' ', 1)remote_user = parts[1]ws_conn = client_list[remote_user]cmd = dedent("""print('')print('內存信息')all = psutil.virtual_memory()sum_memory=int(all[0]/1024/1024/1024)print(f'總內存:{sum_memory}GB')available_memory=int(all[1]/1024/1024/1024)print (f'可用內存:{available_memory}GB')used=int(all[3]/1024/1024/1024)print(f'已用內存{used}GB')used_percent=all[2]print(f'已用內存占比:{used_percent}%')""")msg = f'/post_info {sender_name} {cmd}'await ws_conn.send(msg)if sender_msg.startswith('/get_cpu_info'):parts = sender_msg.split(' ', 1)remote_user = parts[1]ws_conn = client_list[remote_user]cmd =dedent("""cpu_per_count = psutil.cpu_count()cpu_freq = psutil.cpu_freq()[2]/1000print('')print('CPU信息:')print(f'CPU核心數: {psutil.cpu_count(logical=False)}')print(f"CPU線程數: {cpu_per_count}")print(f"頻率: {cpu_freq}GHz")""")msg = f'/post_info {sender_name} {cmd}'await ws_conn.send(msg)if sender_msg.startswith('/get_process_info'):parts = sender_msg.split(' ',1)remote_user=parts[1]ws_conn = client_list[remote_user]cmd = dedent("""print('')print('當前進程信息:')for i in psutil.process_iter():print (i)""")msg = f'/post_info {sender_name} {cmd}'await ws_conn.send(msg)except Exception as e:print (f'獲取信息邏輯中捕獲到異常報錯{e}')
#幫助邏輯
async def help_cmd(sender_name,sender_msg):global client_listtry:if sender_msg == '/help':ws_conn = client_list[sender_name]dilimiter = "-" * 50help_content = f"""\r用戶命令\r{dilimiter}\r獲取CPU信息: /get_cpu_info [用戶名]\r獲取系統信息: /get_sys_info [用戶名]\r獲取內存信息: /get_memory_info [用戶名]\r獲取分區信息: /get_part_info [用戶名]\r獲取進程: /get_process_info [用戶名]\r非交互式調用Shell命令并獲取執行結果: /cmd [用戶名] [shell命令]\r獲取在線用戶列表:/client_online\r退出會議: [q] or [Q]\r顯示IP歸屬地信息: /get_location\r隱藏IP歸屬地信息: /disable_location\r獲取遞歸目錄樹: /get_path_tree [用戶名]\rps:輸入命令的時候不用填中括號[]\r幫助: /help\r{dilimiter}"""await ws_conn.send(help_content)except Exception as e:print (f'幫助邏輯中捕獲到異常報錯:{e}')#遞歸目錄樹的邏輯
async def get_path_tree(sender_name,sender_msg):global client_listtry:if sender_msg.startswith('/get_path_tree'):#/get_path_tree remote_name remote_pathprint ('tree,我來了')parts = sender_msg.split(' ',2)remote_name = parts[1]ws_conn = client_list[remote_name]remote_path = parts[2]msg = f'/get_path_tree {sender_name} {remote_path}'await ws_conn.send(msg)except Exception as e:print (f'遞歸目錄樹的邏輯捕獲到異常報錯{e}')async def change_pwd(sender_name,sender_msg):global client_listtry:if sender_msg.startswith('/change_pwd'):#/change_pwd new_pwdparts = sender_msg.split(' ',2)pwd = parts[1]ws_conn = client_list[sender_name]await db_update(sender_name,pwd)await ws_conn.send('密碼修改成功!')except Exception as e:print (f'改密邏輯捕獲到異常報錯:{e}')async def server_handle(websocket_conn:ServerConnection):address_port = websocket_conn.remote_addressip_address = address_port[0]location_parts = await get_ip_location(ip_address)if location_parts == '內網IP':region = '本地'city = '內網IP'else:region = location_parts[1]city = location_parts[2]try:choose = await websocket_conn.recv()await broadcast_welcome(websocket_conn,region,city,choose)except Exception as e:print (f'登錄|注冊的入口邏輯捕獲到異常報錯{e}')try:async for msg_dict_json in websocket_conn:msg_dict = json.loads(msg_dict_json)await aprint(msg_dict)sender_name = msg_dict['Name']sender_msg = msg_dict['Message']async with asyncio.TaskGroup() as TG:TG.create_task(sys_cmd_unicast(sender_name,sender_msg))TG.create_task(forwarding_other(sender_name,sender_msg,city))TG.create_task(listen_keepalive(sender_msg,sender_name))TG.create_task(show_ip_location(sender_msg,region,city,sender_name))TG.create_task(get_online_client(sender_msg,sender_name))TG.create_task(get_info(sender_msg,sender_name))TG.create_task(help_cmd(sender_name,sender_msg))TG.create_task(get_path_tree(sender_name,sender_msg))TG.create_task(change_pwd(sender_name,sender_msg))except Exception as e:print (f'雙向通信階段捕獲到異常報錯{e}')
async def main():try:async with serve(server_handle,'172.29.42.239',12345):print("WebSocket 服務器已啟動,監聽中...")await asyncio.Future()except Exception as e:print (f'啟動服務邏輯捕獲到異常報錯:{e}')if __name__ == '__main__':asyncio.run(main())
客戶端源代碼:
import os.path
import websockets
import asyncio
from aioconsole import aprint,ainput
import json
import subprocess
import sys
from io import StringIO
import psutil
import platform
import cpuinfo
from datetime import datetimeregion = ''
city = ''
location = Falseasync def send(ws_conn,name):global regionglobal cityglobal locationmsg = ''while 1:name_msg = {}time_now = datetime.now().strftime('%H:%M:%S')if location:msg = await ainput(f'《{region}{city}|({time_now})|{name}>>')if not location:msg = await ainput(f'({time_now})|{name}>>')if msg.upper() == 'Q':await ws_conn.close()await sys.exit()if msg == '/disable_location':location = Falseelse:name_msg['Name'] = namename_msg['Message'] = msgname_msg = json.dumps(name_msg)await ws_conn.send(name_msg)#獲取遞歸目錄樹
async def get_path_tree(path,depth=1):tree = ''dirname = os.path.basename(path)tree = tree +depth * '|-----' + dirname + '\n'files = os.listdir(path)for file in files:filepath = os.path.join(path,file)if os.path.isdir(filepath):tree = await get_path_tree(filepath,depth+1)if os.path.isfile(filepath):tree = tree + depth * '|-----' + '|-----' + file + '\n'return treeasync def recv(ws_conn,name):global locationglobal cityglobal regionwhile 1:msg = await ws_conn.recv()time_now = datetime.now().strftime('%H:%M:%S')if msg.startswith('/cmd'):parts = msg.split(' ',2)sender_name =parts[1]cmd_raw = parts[2]cmd_raw_split_num = cmd_raw.count(' ')if cmd_raw_split_num != 0:cmd = cmd_raw.split(' ',cmd_raw_split_num)else:cmd = cmd_rawresult = subprocess.run(cmd,text=True,shell=True,capture_output=True)cmd_result = result.stdoutmsg = f'/return_cmd {sender_name} {cmd_result}'name_unicate_msg = {}name_unicate_msg['Name'] = namename_unicate_msg['Message'] = msgname_unicate_msg_json = json.dumps(name_unicate_msg)await ws_conn.send(name_unicate_msg_json)if msg.startswith('/post_location'):parts = msg.split(' ',1)location_parts = parts[1]location_parts = location_parts.split('|',1)region = location_parts[0]city = location_parts[1]location = Trueif msg.startswith('/post_info'):parts = msg.split(' ',2)sender_name = parts[1]cmd = parts[2]output = StringIO()sys.stdout = outputexec(cmd)result_cmd = output.getvalue()sys.stdout = sys.__stdout__msg = f'/result_cmd {sender_name} {result_cmd}'print (msg)name_unicate_msg = {}name_unicate_msg['Name'] = namename_unicate_msg['Message'] = msgname_unicate_msg_json = json.dumps(name_unicate_msg)await ws_conn.send(name_unicate_msg_json)if msg.startswith('/get_path_tree'):parts = msg.split(' ',2)sender_name = parts[1]local_path = parts[2]tree = await get_path_tree(local_path)print (tree)msg = f'/result_cmd {sender_name} {tree}'name_unicate_msg = {}name_unicate_msg['Name'] = namename_unicate_msg['Message'] = msgname_unicate_msg_json = json.dumps(name_unicate_msg)await ws_conn.send(name_unicate_msg_json)if msg.startswith('/online_client_list'):parts = msg.split(' ',1)name_list = parts[1]n=1split_num = name_list.count('-')if split_num >0:name_list = name_list.split('-',split_num)n = 0for online_user in name_list:if n == 0:await aprint(f'\n在線用戶列表:')n += 1if n > 0 :await aprint(f' 用戶{n}:{online_user}')elif msg.startswith('/'):if location:await aprint(f'《{region}{city}|{name}>>', end='', flush=True)else:await aprint(f'{name}>>',end='',flush=True)else:if location:await aprint(f'\r{msg}\n《{region}{city}|({time_now})|{name}>>', end='', flush=True)else:await aprint(f'\r{msg}\n({time_now})|{name}>>', end='', flush=True)#客戶端注冊邏輯
async def register(ws_conn):await aprint("-" * 50)await aprint('注冊階段')await aprint("-" * 50)name = await ainput('請輸入用戶名:\n')pwd = await ainput('請輸入登錄代碼:\n')await aprint("-" * 50)name_pwd = f'{name} {pwd}'await ws_conn.send(name_pwd) # 發送賬號密碼return name#服務端登錄邏輯
async def login(ws_conn):await aprint("-" * 50)await aprint('登錄階段')await aprint("-" * 50)name = await ainput('請輸入用戶名:\n')pwd = await ainput('請輸入登錄代碼:\n')await aprint("-" * 50)name_pwd = f'{name} {pwd}'await ws_conn.send(name_pwd) # 發送賬號密碼return nameasync def create_name(ws_conn,choose):while 1:if '注冊' in choose:name = await register(ws_conn)print (name)return nameif '登錄' in choose:name = await login(ws_conn)msg = await ws_conn.recv()await aprint (msg)if '輸入錯誤' in msg:continueif '尚未注冊' in msg:await create_name(ws_conn,choose='注冊')if '登錄成功' in msg:return nameasync def keepalive(ws_conn,name):keepalive_packets = {}content = 'hold,connect,status'keepalive_packets['Name'] = namekeepalive_packets['Message'] = contentwhile 1:keepalive_packets_json = json.dumps(keepalive_packets)await ws_conn.send(keepalive_packets_json)await asyncio.sleep(5)async def client_handle():url = 'ws://47.111.23.151:12345'async with websockets.connect(url) as ws_conn:await aprint (r"""(`-').-> (`-') _ (`-') (`-') _ (`-') <-. (`-') (`-') _ (`-') _ (`-') _ <-. (`-')_ ( OO)_ ( OO).-/ _ <-.(OO ) ( OO).-/ ( OO).-> \(OO )_ ( OO).-/ ( OO).-/ ( OO).-> (_) \( OO) ) .->
(_)--\_) (,------. \-,-----. ,------,) (,------. / '._ ,--./ ,-.)(,------. (,------. / '._ ,-(`-') ,--./ ,--/ ,---(`-')
/ _ / | .---' | .--./ | /`. ' | .---' |'--...__) | `.' | | .---' | .---' |'--...__) | ( OO) | \ | | ' .-(OO )
\_..`--. (| '--. /_) (`-') | |_.' | (| '--. `--. .--' | |'.'| |(| '--. (| '--. `--. .--' | | ) | . '| |)| | .-, \
.-._) \ | .--' || |OO ) | . .' | .--' | | | | | | | .--' | .--' | | (| |_/ | |\ | | | '.(_/
\ / | `---. (_' '--'\ | |\ \ | `---. | | | | | | | `---. | `---. | | | |'-> | | \ | | '-' | `-----' `------' `-----' `--' '--' `------' `--' `--' `--' `------' `------' `--' `--' `--' `--' `-----'
""")await aprint(" ┌──────────────────────────────────────────────────────────────────────┐")await aprint(" │ ? Secret Meeting v1.0 ? │")await aprint(" │ Forever │")await aprint(" │? WebSocket Successful Connect to Server@47.111.23.151 │")await aprint(" └──────────────────────────────────────────────────────────────────────┘")while 1:option = input('【注冊】(register)還是【登錄】(login),請做出你的選擇\n1.注冊\n2.登錄\n')if option == '1':choose = '注冊'breakif option == '2':choose = '登錄'breakelse:continueawait ws_conn.send(choose)name = await create_name(ws_conn,choose)msg_welcome = await ws_conn.recv()await aprint(msg_welcome)async with asyncio.TaskGroup() as TG:TG.create_task(send(ws_conn,name))TG.create_task(recv(ws_conn,name))TG.create_task(keepalive(ws_conn,name))if __name__ == '__main__':asyncio.run(client_handle())
實驗效果:
服務端剛運行,開始監聽:
客戶端初次進入聊天室:
客戶端注冊后:
客戶端獲取幫助:
客戶端注冊登錄后,服務端的心跳開始運作:
總結:
這些功能各位可以自己去嘗試,我就不一一演示了,源碼自取即可。