ASIO 避坑指南:高效、安全與穩健的異步網絡編程
引言
ASIO是很強大的一個異步io庫,但服務器除了高效以外,穩定也是關鍵,這篇文章主要總結了ASIO使用遇到的典型問題和坑:
- 如何榨干
io_context
的性能,讓CPU和網卡持續飽和工作? - 如何安全地關閉一個連接,避免資源泄漏或程序崩潰?(特別是當異步操作還在進行時)
- 如何正確地實現一個異步寫操作,確保數據完整發送且內存安全?
- 如何管理跨越多個異步操作的對象生命周期?
- 如何設計緩沖區(Buffer) 才能避免懸垂指針或數據競爭?
- 如何在多線程環境下安全地操作共享資源?
- 如何處理錯誤碼,哪些錯誤碼要特殊處理,例如
operation_aborted
和eof
?
上面這些問題處理不好,輕則導致性能低下、資源泄漏,重則引發程序崩潰、數據錯誤,是基于ASIO開發服務器必須要了解清楚的點。
一、最大化利用 I/O (榨干 io_context
的性能)
很多人包括很多博客對asio的多線程操作僅僅照搬例子進行介紹,沒有真正的線上實戰,asio多線程有兩種方法:
-
單一 I/O 上下文 + 多工作線程
- 單
io_context
實例 - 多個線程調用
io_context.run()
- 事件分發機制:操作系統將就緒事件分配給不同工作線程執行
- 單
-
多 I/O 上下文 (io_context per Thread)
- 每個線程獨占一個
io_context
實例 - 每個線程調用自己
io_context.run()
- 資源隔離:Socket/Timer 綁定到特定線程的
io_context
- 每個線程獨占一個
io_context
的核心是事件循環。一個線程調用 run()
通常足以高效處理數千連接,如果多個線程都執行 run()
,那么事件會分配到多個線程中執行,這樣你首先要考慮的是線程安全,每個回調如果調用了共享資源都需要枷鎖,這會降低運行效率。
單一 I/O 上下文 + 多工作線程
// 創建線程池執行同一個io_context
asio::io_context io;
asio::thread_pool pool(4); // 4個線程// 多個線程執行同一個io_context的run()
for(int i=0; i<4; ++i){asio::post(pool, [&]{ io.run(); });
}// 注意:所有Handler可能在不同線程執行!
socket.async_read_some(..., [](...){// 需要線程同步!可能被任意線程執行
});
優勢:
- 最佳負載均衡:內核自動分配事件到空閑線程
- 簡化資源管理:所有操作共享單一I/O上下文
劣勢:
- 鎖競爭開銷:共享資源訪問需要同步,抵消多線程收益
ASIO針對這種情況提供了 strand
進行序列化訪問
例如:
// 創建strand綁定到io_context
asio::strand<asio::io_context::executor_type> my_strand = asio::make_strand(io.get_executor());// 通過strand分發處理程序
socket.async_read_some(asio::bind_executor(my_strand, [](...){// 保證同一strand上的handler不會并發執行connections.erase(id); // 無需鎖!}
));
每線程獨立 I/O 上下文 (io_context per Thread)
這是推薦做法,經過本人驗證,能極大提高并發處理能力
// 每個線程擁有獨立io_context
std::vector<std::unique_ptr<asio::io_context>> io_contexts;
std::vector<std::thread> threads;for(int i=0; i<4; ++i){io_contexts.emplace_back(std::make_unique<asio::io_context>());threads.emplace_back([ioc=io_contexts.back().get()]{ioc->run(); // 每個線程運行自己的io_context});
}// 連接綁定到特定io_context
auto& io = *io_contexts[connection_id % 4];
tcp::socket socket(io);
優勢:
- 處理程序始終在同一線程執行,避免線程切換開銷
- 能更大程度發揮單個io的性能
保證異步操作鏈的持續
一個異步操作完成時,在其完成處理函數 (Completion Handler) 中發起下一個異步操作(如 async_read
后發起 async_write
,或繼續 async_read
),這樣可以保持 I/O 通道持續忙碌,避免輪詢
要注意的是,一定要避免在 回調 中做耗時同步操作阻塞事件循環。
高效 Buffer 管理
asio::buffer
是視圖: 它不擁有數據,只是指向現有內存塊的引用,處理不當會導致野指針、數據損壞或程序崩潰。底層數據必須在異步操作期間保持有效!
絕對要避免使用棧分配的內容做作為 Buffer,例如下面這個就是典型的錯誤:
//錯誤示范
void do_async_write(tcp::socket& socket) {char buffer[1024]; // 錯誤:棧分配緩沖區generate_data(buffer, 1024); // 填充數據// 異步寫操作 - 緩沖區可能在函數返回后失效!socket.async_write_some(asio::buffer(buffer, 1024),[](const asio::error_code& ec, size_t bytes) {// 此時原buffer棧幀已銷毀 - 野指針訪問!});
} // 函數退出,棧緩沖區被銷毀!
除非你用的是協程模式,否則不要用棧分配內存做 Buffer,因為異步操作結束后,棧內存會被回收,Buffer 就會變成無效的,過一段時間在執行回調你的buffer里面就是野指針,因此要嚴格保證 async_read
/async_write
使用的 buffer 底層內存在其整個操作期間(從調用開始到 Handler 執行結束)有效且不被修改
你應該使用智能指針來分配緩沖,并讓這個智能指針跟隨回調函數,直至回調函數結束,典型的就是讓lambda把這個智能指針捕獲,讓它跟著回調函數的生命周期。
void send_large_data(tcp::socket& socket) {// 使用shared_ptr管理堆緩沖區auto buf = std::make_shared<std::vector<char>>(generate_large_data());asio::async_write(socket, asio::buffer(*buf),// 捕獲智能指針延長生命周期[buf](const asio::error_code& ec, size_t) {// 緩沖區在lambda銷毀前保持有效});
}
或者作為session的成員變量
class Connection : public std::enable_shared_from_this<Connection> {std::array<char, 8192> buffer_; // 成員緩沖區void start_read() {auto self(shared_from_this());socket_.async_read_some(asio::buffer(buffer_),[self](const asio::error_code& ec, size_t length) {if (!ec) self->process_data(length);});}
};
如果是linux系統,還可以用零拷貝緩沖區注冊方法,讓io和回調都操作這個緩沖區,從而避免了數據拷貝。
// 注冊持久內存到io_context
auto buf = std::make_shared<std::array<char, 4096>>();
asio::io_context& ioc = socket.get_executor().context();// 顯式注冊緩沖區(Linux專屬優化)
const bool registered = asio::register_buffer(ioc, asio::buffer(*buf), asio::buffer_registration::permanent);socket.async_read_some(asio::buffer(*buf),[buf](const asio::error_code& ec, size_t bytes) {// 緩沖區保持注冊狀態});
這種尤其適合高頻小包數據的處理
安全關閉 Socket 和連接
關閉是異步編程中最容易出資源泄漏或崩潰的地方。關閉做的不好,會出現如下問題:
- 資源泄漏(文件描述符、內存)
- 大量CLOSE_WAIT狀態連接
- 程序崩潰(訪問已銷毀對象)
- 數據丟失(未發送完的數據)
Socket的關閉有shutdown()
和 close()
兩個行數
socket.shutdown
shutdown
可以理解為是關閉通知,有三種模式(shutdown_receive
, shutdown_send
, shutdown_both
),通知對端“我不會再發數據了”(shutdown_send
)或“我不想再收數據了”(shutdown_receive
)。
shutdown
執行后,后續的 async_read
會立即完成并返回 asio::error::shut_down
(如果接收端關閉),后續的 async_write
會立即完成并返回 asio::error::shut_down
(如果發送端關閉)。
需要注意的是,shutdown()
后,Socket 描述符依然有效。
socket.close
socket.close
會釋放系統資源(Socket 描述符),它會隱式地執行 shutdown(both)
。任何掛起(Pending)的異步操作(async_read
, async_write
, async_connect
等)會立即取消,它們的回調函數會被調用,并傳入 asio::error::operation_aborted
錯誤碼。
因此,在讀寫回調中,遇到asio::error::operation_aborted
錯誤碼要特殊處理,避免重復關閉
回調函數設計時,應檢查錯誤碼,如果是 operation_aborted
,通常意味著 Socket 正在被關閉/銷毀,回調函數應該:
- 忽略這個操作的結果。
- 清理相關的資源(如釋放為這次操作分配的 Buffer)。
- 避免再訪問Socket
當調用 socket.close()
取消操作時,包含 socket
的對象(如 connection
)可能正在被銷毀
安全關閉方法
關閉分服務器主動關閉,以及對方客戶端主動關閉,兩種不同方式的關閉處理方式不太一樣
- 服務器主動關閉
- 標記關閉開始,執行
shutdown(socket, asio::ip::tcp::socket::shutdown_receive);
// 告訴對方我不再接收數據了 - 檢查是否有待發送數據,無數據 → 立即關閉,有數據 → 等待當前寫操作完成
// 在管理類中觸發關閉
void ConnectionManager::stop_all() {for (auto& conn : connections_) {conn->safe_shutdown();}
}// Connection::safe_shutdown實現:
void safe_shutdown() {if (shutdown_initiated_.exchange(true)) return;// 1. 停止接收新數據socket_.shutdown(shutdown_receive);// 2. 檢查寫狀態if (!writing_) {final_close(); // 無待發數據直接關閉}// 否則等待進行中的寫操作完成
}
- 客戶端主動關閉
- 在
async_read
Handler 中檢測到error_code == asio::error::eof
(對方正常關閉發送端) - (可選)如果還有數據要發送,可以嘗試發送(但對方可能已關閉接收端,會出錯)。
- 調用
socket.close()
。
下面是一個客戶端的安全關閉示例:
void Connection::handle_read_error(asio::error_code ec) {if (ec == asio::error::eof) {// 客戶端發送了FIN包safe_shutdown();}else if (ec == asio::error::operation_aborted) {// 正常關閉過程中的取消// 不進行任何操作,連接即將銷毀return;}
}
因此,一個安全的關閉不僅僅是close,還要針對不同的錯誤碼來執行不同的關閉策略,在接收和發送的錯誤碼處理不一樣,建議一個回話應該對錯誤碼處理單獨提取一個函數,如下:
class Connection : public std::enable_shared_from_this<Connection> {
private:asio::ip::tcp::socket socket_;std::queue<std::vector<char>> write_queue_;bool writing_;std::atomic<bool> shutdown_initiated_;std::array<char, 1024> read_buffer_;
public:Connection(asio::ip::tcp::socket socket): socket_(std::move(socket)), writing_(false),shutdown_initiated_(false) {}void start() {read_header(); // 開始讀循環}void safe_shutdown() {if (shutdown_initiated_.exchange(true)) return;// 1. 停止接受新數據asio::error_code ec;socket_.shutdown(asio::ip::tcp::socket::shutdown_receive, ec);// 忽略錯誤:可能已關閉// 2. 檢查寫隊列if (!writing_) {// 無待發送數據,直接關閉final_close();} else {// 等待進行中的寫操作完成// final_close將在寫回調中調用}}private:void read_header() {auto self(shared_from_this());socket_.async_read_some(asio::buffer(read_buffer_),[this, self](asio::error_code ec, size_t length) {if (ec) {handle_read_error(ec);return;}process_data(length);read_header(); // 繼續讀});}void handle_read_error(asio::error_code ec) {if (ec == asio::error::eof) {// 客戶端正常關閉safe_shutdown();} else if (ec == asio::error::operation_aborted) {// 關閉過程中的正常取消} else {// 其他錯誤final_close();}}void async_write_data(std::vector<char> data) {// 將數據加入隊列bool write_in_progress = !write_queue_.empty();write_queue_.push(std::move(data));if (!write_in_progress && !writing_) {start_write_chain();}}void start_write_chain() {writing_ = true;auto self(shared_from_this());auto& buf = write_queue_.front();asio::async_write(socket_, asio::buffer(buf),[this, self](asio::error_code ec, size_t /*bytes*/) {writing_ = false;if (ec) {handle_write_error(ec);return;}write_queue_.pop();if (!write_queue_.empty()) {start_write_chain();} else if (shutdown_initiated_) {// 所有數據已發送,安全關閉final_close();}});}void handle_write_error(asio::error_code ec) {if (ec == asio::error::operation_aborted) {// 正常取消,忽略} else if (ec == asio::error::broken_pipe || ec == asio::error::connection_reset) {// 連接已斷開final_close();} else {// 其他錯誤處理safe_shutdown();}}void final_close() {asio::error_code ignore_ec;// 取消所有異步操作socket_.cancel(ignore_ec);// 關閉socketsocket_.shutdown(asio::ip::tcp::socket::shutdown_both, ignore_ec);socket_.close(ignore_ec);// 清理資源decltype(write_queue_) empty;std::swap(write_queue_, empty);}
};
上面的例子不僅展示了安全關閉,還包含了安全發送,關閉和發生接收關聯緊密,因此很難用一句函數就能涵蓋,上面的例子的發送用來一個隊列,這引出下一節,如何用asio進行高并發的安全的異步寫操作
安全的異步寫操作
除了上面提到的buffer有效性外,異步寫操作有其特定要點:
- 同一 Socket 上的多個并發
async_write
操作是未定義行為! TCP 是流協議,數據順序必須保證。 - 必須使用隊列(見上面的例子)。同一時間只允許一個
async_write
操作在進行,等回調完了之后,再寫入下一個
寫回調應該作如下工作:
- 檢查
error_code
(包括operation_aborted
,具體見上節安全關閉)。 - 處理
bytes_transferred
(通常成功時等于請求量)。 - 釋放或標記該次寫操作使用的 Buffer 可重用/釋放。
- **檢查寫隊列:如果隊列非空,取出下一批數據發起新的
async_write
,如果隊列為空,設置writing_ = false
。
如上面的例子所示,安全大并發的異步寫操作,應該如下:
class Connection : public std::enable_shared_from_this<Connection> {
private:asio::ip::tcp::socket socket_;std::queue<std::vector<char>> write_queue_;bool writing_;std::atomic<bool> shutdown_initiated_;std::array<char, 1024> read_buffer_;
public://這里省略其他函數,在安全關閉里已經展示完整代碼void async_write_data(std::vector<char> data) {// 將數據加入隊列bool write_in_progress = !write_queue_.empty();write_queue_.push(std::move(data));if (!write_in_progress && !writing_) {start_write_chain();}}void start_write_chain() {writing_ = true;auto self(shared_from_this());auto& buf = write_queue_.front();asio::async_write(socket_, asio::buffer(buf),[this, self](asio::error_code ec, size_t /*bytes*/) {writing_ = false;if (ec) {handle_write_error(ec);return;}write_queue_.pop();if (!write_queue_.empty()) {start_write_chain();} else if (shutdown_initiated_) {// 所有數據已發送,安全關閉final_close();}});}void handle_write_error(asio::error_code ec) {if (ec == asio::error::operation_aborted) {// 正常取消,忽略} else if (ec == asio::error::broken_pipe || ec == asio::error::connection_reset) {// 連接已斷開final_close();} else {// 其他錯誤處理safe_shutdown();}}
};
- 要有個寫隊列,示例中的
std::queue<std::shared_ptr<std::string>> write_queue_;
- 寫狀態標記
writing_
,主要作用是在關閉時,檢查是否還沒寫完,沒寫完就等寫完再關閉socket - 關閉標記
shutdown_initiated_
,這個主要作用是關閉標記,如果寫完發現已經關閉,直接調用close,通過shutdown_initiated_
和writing_
可以實現安全的關閉,同時保證寫數據不會丟失