非阻塞寫入核心:asyncio.StreamWriter 的流量控制與數據推送之道

asyncio 的異步編程框架中,如果說 asyncio.StreamReader 是你異步應用的數據輸入管道,那么 asyncio.StreamWriter 就是你異步應用的數據輸出管道。它是一個至關重要的組件,讓你能夠方便、高效且非阻塞地向連接的另一端(如 TCP 套接字)發送字節數據

你可以把 StreamWriter 想象成一個異步的、非阻塞的“數據發送器”。當你調用它的方法來發送數據時,它不會立即把所有數據都推送到網絡或底層I/O。相反,它會將數據放入自己的內部緩沖區,并通知事件循環有數據待發送。如果網絡繁忙,數據暫時無法發送,StreamWriter 會讓出控制權,讓事件循環去執行其他任務,直到數據可以被真正寫入后再繼續。


StreamWriter 的核心職能與價值

asyncio.StreamWriter 主要用于向異步 I/O 源(通常是網絡套接字或進程間的管道)高效地寫入字節數據。它的核心價值體現在以下幾個方面:

  1. 異步非阻塞寫入:這是其最核心的特性。StreamWriter 提供的所有寫入方法都是非阻塞的。當底層I/O(如網絡)因緩沖區滿而暫時無法接收更多數據時,寫入操作不會阻塞你的整個程序。StreamWriter 會暫停當前的寫入任務,允許事件循環處理其他協程,直到數據能夠被寫入。
  2. 智能內部數據緩沖StreamWriter 內部維護一個輸出緩沖區。你寫入的數據會先進入這個緩沖區。它會嘗試批量發送數據到操作系統底層,減少系統調用次數,提高發送效率。
  3. 流量控制 (Flow Control):這是 StreamWriter 的一個高級特性。當底層傳輸層(如 TCP 緩沖區)變得擁塞,無法立即發送更多數據時,StreamWriter 會自動暫停數據的寫入,直到擁塞緩解。這通過其 drain() 方法得到體現,它能確保數據被“排空”到網絡中,而不是無限堆積在應用程序層緩沖區。
  4. 優雅處理連接關閉StreamWriter 提供了明確的方法來關閉連接(如 close()),并且能夠讓你等待所有待發送數據發送完畢,確保數據的完整傳輸。

如何獲取 StreamWriter 實例?

StreamReader 一樣,你通常不會直接實例化 asyncio.StreamWriter。它總是與 StreamReader 成對出現,并通過 asyncio 庫的以下兩個核心函數獲得:

  1. 作為客戶端連接 (asyncio.open_connection())
    當你需要作為客戶端連接到遠程服務器時,asyncio.open_connection() 會建立 TCP 連接,并返回一個包含 (StreamReader, StreamWriter) 的元組。StreamWriter 用于向服務器發送數據。

    import asyncioasync def connect_and_write_client():host, port = 'localhost', 8888print(f"Trying to connect to {host}:{port}...")writer = None # Pre-define writer for finally blocktry:# open_connection returns (StreamReader, StreamWriter) tuplereader, writer = await asyncio.open_connection(host, port)print(f"Successfully connected to {host}:{port}")# --- Using StreamWriter to send data ---message_to_send = "Hello from async client!"print(f"Sending message: '{message_to_send}'")writer.write(message_to_send.encode()) # Write bytes to the bufferawait writer.drain() # Crucial: ensures data is pushed to the network# Optionally, read server's response using StreamReaderprint("Waiting for server response...")data = await reader.read(100)if data:response = data.decode()print(f"Received server response: '{response}'")else:print("Server did not send data or closed connection.")except ConnectionRefusedError:print(f"Connection refused. Make sure the server is running on {host}:{port}.")except Exception as e:print(f"Client encountered an unexpected error: {e}")finally:if writer and not writer.is_closing(): # Check if writer exists and is not already closingprint("Closing client connection...")writer.close() # Close the writer (and underlying socket)await writer.wait_closed() # Wait for the writer to fully closeprint("Connection fully closed.")if __name__ == "__main__":# To run this client example, first start a compatible server (e.g., the echo_server below).# asyncio.run(connect_and_write_client())pass # Not running directly to allow server to be started first
    
  2. 作為服務器處理連接 (asyncio.start_server())
    當你使用 asyncio.start_server() 啟動 TCP 服務器時,每當有新的客戶端連接到來,你提供的連接處理回調函數就會被調用。這個回調函數會接收到 (StreamReader, StreamWriter) 元組,其中 StreamWriter 代表了向該客戶端發送數據的通道。

    import asyncioasync def echo_server_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):"""Coroutine to handle each new client connection."""addr = writer.get_extra_info('peername') # Get client address infoprint(f"Accepted new connection from {addr!r}.")try:while True:# Read data from the client using StreamReaderdata = await reader.read(1024) if not data: # If data is empty, client has disconnectedprint(f"Client {addr!r} disconnected.")break # Exit loop to close connectionmessage = data.decode().strip()print(f"Received from {addr!r}: '{message}'")# --- Using StreamWriter to send a response ---response_message = f"Server received your message: {message}"print(f"Sending response to {addr!r}: '{response_message}'")writer.write(response_message.encode()) # Write bytes to the bufferawait writer.drain() # Crucial: ensures data is sent over the networkexcept asyncio.IncompleteReadError:print(f"Client {addr!r} disconnected before full read.")except ConnectionResetError:print(f"Client {addr!r} forcibly closed the connection.")except Exception as e:print(f"Error handling {addr!r}: {e}")finally:print(f"Closing connection with {addr!r}.")writer.close() # Close the writer (and underlying socket)await writer.wait_closed() # Wait for the writer to fully closeprint(f"Connection with {addr!r} fully closed.")async def run_echo_server():host, port = 'localhost', 8888server = await asyncio.start_server(echo_server_handler, host, port)addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)print(f"Server listening on {addrs}...")# server.serve_forever() keeps the server running until the event loop stopsasync with server:await server.serve_forever()if __name__ == "__main__":# Run this script to start the server.# After starting, you can connect with the client example above or# using 'telnet localhost 8888' in another terminal.asyncio.run(run_echo_server())
    

StreamWriter 的常用方法詳解

StreamWriter 提供了一系列用于發送數據和管理連接的異步方法。請注意,它主要操作字節數據

  1. writer.write(data)

    • 作用:將 data(必須是 bytesbytearray 對象)寫入 StreamWriter內部緩沖區
    • 行為:這是一個非協程方法,它會立即返回。數據并不會立即發送到網絡,而是被放置到寫入器的內部緩沖區中等待發送。
    • 重要性:這個方法僅僅是“排隊”數據,它不保證數據已經發送出去了。要確保數據被發送到網絡,你通常需要緊接著調用 await writer.drain()
    • 示例writer.write(b"Hello, Server!")
  2. await writer.drain()

    • 作用關鍵的異步方法,用于實現流量控制。它等待內部寫入緩沖區中的數據被完全“排空”(drain),即所有數據都已發送到操作系統底層的網絡緩沖區。
    • 行為:如果內部緩沖區中有數據待發送,且底層網絡緩沖區已滿或數據發送速度跟不上生產速度,drain()掛起當前協程,直到所有數據發送完畢,或底層網絡緩沖區有足夠的空間接收更多數據。
    • 重要性:在寫入大量數據或確保某個消息塊被完整發送后才能繼續執行后續邏輯時,drain() 至關重要。任何重要的寫入操作后,都應考慮調用 await writer.drain()。否則,數據可能仍在 Python 應用程序的緩沖區中,而未真正發出去。
    • 示例writer.write(b"Large data chunk"); await writer.drain()
  3. writer.writelines(list_of_data)

    • 作用:將一個包含 bytesbytearray 對象的列表寫入內部緩沖區。
    • 行為:與 write() 類似,這是一個非協程方法,僅將列表中的所有數據依次放入緩沖區。
    • 重要性:同樣需要結合 await writer.drain() 來確保數據發送。
    • 示例writer.writelines([b"Line 1\n", b"Line 2\n"])
  4. writer.can_write_eof()

    • 作用:同步方法,檢查傳輸層是否支持發送 EOF (End-Of-File) 標志。
    • 行為:返回 TrueFalse。對于 TCP 套接字,通常返回 True
    • 應用場景:在需要明確通知對端不再有數據發送,但又不立即關閉連接時(例如半關閉連接)。
  5. writer.write_eof()

    • 作用:發送一個 EOF 標志到對端。這表明此端將不再發送數據,但仍可以接收數據。
    • 行為:這是一個非協程方法,僅排隊發送 EOF 標志。
    • 重要性:發送 EOF 后,通常應調用 await writer.drain() 來確保 EOF 標志被送出。
  6. writer.close()

    • 作用:請求關閉寫入器及其關聯的底層傳輸層(如套接字)。
    • 行為:這是一個非協程方法,它會立即返回。它將關閉操作放入隊列。在調用 close() 之后,不應再進行寫入操作。
    • 重要性:通常在使用完 StreamWriter 后調用,以釋放資源。
  7. await writer.wait_closed()

    • 作用:等待寫入器完全關閉。
    • 行為:這是一個協程方法。它會掛起當前協程,直到 writer.close() 操作完成,并且所有底層資源都被釋放。
    • 重要性:在使用 writer.close() 后,強烈建議調用 await writer.wait_closed()。這能確保在程序繼續執行之前,所有待發送的數據都已嘗試發送,并且底層連接已正確關閉,避免資源泄露或數據丟失。這在 finally 塊中尤其有用。
    • 示例writer.close(); await writer.wait_closed()
  8. writer.is_closing()

    • 作用:同步方法,檢查寫入器是否正在關閉中。
    • 行為:返回 True 如果 close() 已經被調用,但 wait_closed() 還沒有完成。
    • 應用場景:在編寫復雜的連接管理邏輯時,可以用于判斷當前寫入器的狀態。
  9. writer.get_extra_info(name, default=None)

    • 作用:獲取關于底層傳輸層的額外信息,例如遠程地址、本地地址等。
    • 行為:返回指定名稱的信息。
    • 常見參數'peername' (遠程地址), 'sockname' (本地地址), 'compression', 'cipher' (SSL/TLS 信息)。
    • 示例peername = writer.get_extra_info('peername')

StreamWriter 的內部工作原理(簡述)

當你調用 writer.write() 方法時:

  1. 數據首先被添加到 StreamWriter內部輸出緩沖區
  2. StreamWriter 會通知事件循環,表示其內部緩沖區中有數據待發送。
  3. 事件循環會在合適的時機(通常是當底層網絡套接字準備好接收數據時),將緩沖區中的數據異步地寫入到底層套接字。
  4. 如果你調用 await writer.drain(),并且此時底層網絡緩沖區已滿或數據發送有延遲,drain()掛起當前協程。事件循環會繼續處理其他任務,直到底層緩沖區有空間或數據發送完畢后,再恢復被掛起的協程。
  5. 當你調用 writer.close() 時,StreamWriter 會嘗試發送所有剩余的緩沖數據,然后關閉底層套接字。await writer.wait_closed() 則會等待這一系列關閉操作完成。

這種設計確保了數據發送的高效性和非阻塞性,即使網絡擁塞,你的應用程序也能保持響應。


StreamWriterStreamReader 的協同

StreamReaderStreamWriter 幾乎總是成對出現,共同代表了一個完整的、雙向的異步通信通道。它們通過底層的 asyncio 傳輸層(Transport)進行協作,傳輸層負責具體的網絡 I/O。

理解如何同時使用 StreamReader 進行數據接收和 StreamWriter 進行數據發送,是構建任何基于 asyncio 的網絡應用(無論是客戶端還是服務器)的核心。它們提供了一個高層次的抽象,讓你可以專注于應用邏輯,而不必陷入底層的套接字細節和非阻塞 I/O 的復雜性中。


總結

asyncio.StreamWriterasyncio 框架中用于非阻塞數據發送的核心組件。它通過智能的內部緩沖和流量控制機制,確保數據能夠高效且可靠地從你的異步應用程序發送到網絡或其他 I/O 目標。

掌握 StreamWriterwrite()drain()(尤其重要!)、close()wait_closed() 方法,是編寫健壯、高性能、響應迅速的異步網絡服務和客戶端的關鍵。它與 StreamReader 共同構筑了 asyncio 異步 I/O 的強大基石。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/914454.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/914454.shtml
英文地址,請注明出處:http://en.pswp.cn/news/914454.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

控制臺打開mysql服務報錯解決辦法

控制臺打開mysql服務報錯解決辦法這個MySQL錯誤表示訪問被拒絕,通常是因為沒有提供正確的用戶名和密碼。以下是幾種解決方法: 方法1:指定用戶名和密碼連接 mysql -u root -p然后輸入root用戶的密碼。 方法2:如果忘記了root密碼&am…

Unsloth 實戰:DeepSeek-R1 模型高效微調指南(下篇)

食用指南 本系列因篇幅原因拆分為上下兩篇: 上篇以基礎環境搭建為主,介紹了 Unsloth 框架、基座模型下載、導入基座模型、數據集下載/加載/清洗、SwanLab 平臺賬號注冊。 下篇(本文)以實戰微調為主,介紹預訓練、全量…

Ubuntu安裝Jenkins

Ubuntu安裝Jenkins方法1:使用官方的Jenkins倉庫1. 添加Jenkins倉庫2. 更新軟件包列表3. 安裝Jenkins4. 啟動Jenkins服務5. 設置Jenkins開機啟動6. 查找初始管理員密碼7. 訪問Jenkins方法2:使用Snap包(適用于較新的Ubuntu版本)1. 安…

ubuntu22.04下配置qt5.15.17開發環境

自從qt5.15版本開始,不再提供免費的離線安裝包,只能通過源碼自行編譯。剛好最近需要在ubuntu22.04下配置qt開發環境,于是寫篇文章記錄配置的過程。 其實一開始是想配置qt5.15.2的,但是在編譯配置參數這一步驟中出現如下報錯 em…

S7-1200 與 S7-300 CPS7-400 CP UDP 通信 Step7 項目編程

S7-1200 CPU 與S7-300 CP STEP7 UDP通信S7-1200 與 S7-300 CP 之間的以太網通信可以通過 UDP 協議來實現,使用的通信指令是在S7-1200 CPU 側調用通信-開放式用戶通信TSEND_C,TRCV_C指令或TCON,TDISCON,TUSEND,TURCV 指…

基于YOLOv11的無人機目標檢測實戰(Windows環境)

1. 環境搭建 1.1 硬件與操作系統 操作系統:Windows 11 CPU:Intel i7-9700 GPU:NVIDIA RTX 2080(8GB顯存) 1.2 安裝CUDA和cuDNN 由于YOLOv11依賴PyTorch的GPU加速,需要安裝CUDA和cuDNN: 安…

Spring Cloud分布式配置中心:架構設計與技術實踐

從單體到微服務:Spring Cloud 開篇與微服務設計 Spring Cloud服務注冊與發現:架構設計與技術實踐深度分析 在以往分享中,碼友們已經掌握了微服務的設計和注冊中心的設計,部分聰明的碼友已經察覺了,已經到了需要設計一個…

15.2 Common Criteria合規

目錄1. Common Criteria簡介1.1 CC評估要素1.2 CC與TF-A的關系2. TF-A的CC合規要求2.1 安全功能需求2.2 開發過程要求3. TF-A的CC合規實現3.1 關鍵安全機制3.2 開發流程控制4. CC認證實踐指南4.1 認證準備步驟4.2 典型挑戰與解決方案4.3 已認證案例參考5. 持續合規建議1. Commo…

【前端:Typst】--let關鍵字的用法

在 Typst 中,#let 命令是用于定義變量和函數的核心指令,其用法非常靈活。以下是詳細的用法說明和示例。 目錄 1.基礎變量定義 2.函數定義 3.默認參數 4.內容塊參數(Content Blocks) 5.遞歸函數 1.基礎變量定義 // 定義簡單…

Qt輪廓分析設計+算法+避坑

輪廓分析擬合方面我現在只考慮矩形擬合和圓形擬合細分的話,橢圓擬合,矩形擬合,最小外接矩形,最小外接圓。對于一張圖像可能有不同的圖形,不同的圓,不同的矩形,我需要對其進行篩選,也…

C++中STL六大組件List的簡單介紹

一、前言C非常重視效率&#xff0c;對效率有損失的代碼常常是能省則省。使用list要包含的頭文件是<list>&#xff0c;要包含頭文件就是#iinclude <list>&#xff0c;List肯定是一種鏈表&#xff0c;我們不妨回憶一下那種鏈表插入刪除效率最快也就是最簡單&#xff…

第十五節:Vben Admin 最新 v5.0 (vben5) + Python Flask 快速入門 - vue前端 生產部署

Vben Admin vben5 系列文章目錄 ?? 基礎篇 ? 第一節:Vben Admin 最新 v5.0 (vben5) + Python Flask 快速入門 ? 第二節:Vben Admin 最新 v5.0 (vben5) + Python Flask 快速入門 - Python Flask 后端開發詳解(附源碼) ? 第三節:Vben Admin 最新 v5.0 (vben5) + Python …

背包初步(0-1背包、完全背包)

當月光灑在我的臉上 我想我就快變了模樣 有一種叫做撕心裂肺的湯 喝了它有神奇的力量 動態規劃初步&#xff08;完全背包&#xff09; 目錄動態規劃初步&#xff08;完全背包&#xff09;0-1背包簡介完全背包檢查數組是否存在有效劃分&#xff08;前綴劃分DP&#xff09;單詞拆…

Linux驅動06 --- UDP

目錄 一、UDP 1.1 介紹 1.2 UDP 的通信方式 1.3 單播 發送函數 接收函數 1.4 廣播 1.5 組播/多播 一、UDP 1.1 介紹 傳輸層的另外一個協議 面向無連接&#xff0c;不穩定&#xff0c;速度快&#xff0c;可以一對多 UDP&#xff08;User Datagram Protocol&…

AJAX 投票:技術解析與應用場景

AJAX 投票:技術解析與應用場景 引言 隨著互聯網技術的不斷發展,Web應用的用戶體驗越來越受到重視。AJAX(Asynchronous JavaScript and XML)作為一種重要的技術,在實現異步數據交互方面發揮著關鍵作用。本文將深入探討AJAX投票系統的技術原理、應用場景以及優化策略。 A…

【字節跳動】數據挖掘面試題0017:推薦算法:雙塔模型,怎么把內容精準地推送給用戶

文章大綱 雙塔模型:推薦算法中的“高效匹配引擎一、雙塔模型的核心思想:“分而治之” 的匹配邏輯二、雙塔模型的結構:從特征輸入到相似度輸出1. 輸入層:特征的 “原材料處理”2. 塔網絡層:用戶與物品的“個性化編碼”3. 交互層:向量相似度的“偏好打分”三、雙塔模型的優…

7月14日日記

數學類今天考完最后一科英語放假回家了。有點羨慕他們。今天英語成績出來了&#xff0c;我是89分&#xff0c;一開始有點失望&#xff0c;感覺沒有上90&#xff0c;這是一個很好的沖擊4.0 的機會。但是后來一想好像也沒什么可惜的&#xff0c;這個分數還是很高的。舍友小林是90…

js的局部變量和全局變量

全局變量常常定義在函數外&#xff0c;具有全局定義域&#xff0c;在整個js代碼的任何地方都可以使用&#xff0c;這個就叫全局變量局部變量定義在函數內部&#xff0c;只在當前函數的定義域可以被使用&#xff0c;而且不同的函數可以定義相同的局部變量&#xff0c;他們之間相…

C++ 多態詳解:從概念到實現原理----《Hello C++ Wrold!》(14)--(C/C++)

文章目錄前言多態的概念多態的定義和實現虛函數虛函數的重寫(覆蓋)多態的構成條件override 和 final&#xff08;C11提出&#xff09;finaloverride重載、覆蓋(重寫)、隱藏(重定義)的對比抽象類接口繼承和實現繼承多態的原理虛函數表(也叫做虛表)引申:虛表的打印多態的原理靜態…

Node.js + Express的數據庫AB View切換方案設計

方案總覽數據導入過程&#xff1a; - 根據控制表判斷當前活躍組&#xff08;假設當前活躍的是a&#xff0c;那么接下來要導入到b&#xff09;。 - 清空非活躍表&#xff08;即b表&#xff09;的數據&#xff0c;然后將新數據導入到b表。 - 切換控制表&#xff0c;將活…