使用ASIO的協程實現高并發服務器
在 C++ 網絡編程領域,Asio 庫提供了兩種主要的異步編程范式:傳統的回調模式和基于協程的現代模式,傳統的回調模式大家都很清楚,這里不多做介紹,本文主要介紹基于協程的模式,Asio 協程基于 C++20 標準協程,提供了更簡潔的異步編程模型。
ASIO協程和回調對比
下面先列舉一下ASIO使用回調和使用協程的區別,等看完區別后再講講ASIO使用協程的核心關鍵函數有哪些
讓我用一個更生動的定時器示例來展示協程如何優雅地解決回調地獄問題。我們將實現一個簡單但常見需求:
每隔3秒打印一次消息,共打印5次。
要實現上面這個功能,用傳統的回調實現是很麻煩的,下面是用回調實現的代碼(為了更清晰,這里不用lambda表達式):
#include <boost/asio.hpp>
#include <iostream>namespace asio = boost::asio;// 前向聲明
void timer_callback1(asio::steady_timer* timer, int count);
void timer_callback2(asio::steady_timer* timer, int count);
void timer_callback3(asio::steady_timer* timer, int count);
void timer_callback4(asio::steady_timer* timer, int count);
void timer_callback5(asio::steady_timer* timer, int count);
void final_callback(asio::steady_timer* timer, int count);// 處理第一次定時器到期
void timer_callback1(asio::steady_timer* timer, int count) {std::cout << "回調模式: 第1次打印 - 3秒已過\n";timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(timer_callback2, timer, count + 1));
}// 處理第二次定時器到期
void timer_callback2(asio::steady_timer* timer, int count) {std::cout << "回調模式: 第2次打印 - 3秒已過\n";timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(timer_callback3, timer, count + 1));
}// 處理第三次定時器到期
void timer_callback3(asio::steady_timer* timer, int count) {std::cout << "回調模式: 第3次打印 - 3秒已過\n";timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(timer_callback4, timer, count + 1));
}// 處理第四次定時器到期
void timer_callback4(asio::steady_timer* timer, int count) {std::cout << "回調模式: 第4次打印 - 3秒已過\n";timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(timer_callback5, timer, count + 1));
}// 處理第五次定時器到期
void timer_callback5(asio::steady_timer* timer, int count) {std::cout << "回調模式: 第5次打印 - 3秒已過\n";timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(final_callback, timer, count + 1));
}// 最終回調
void final_callback(asio::steady_timer* timer, int count) {std::cout << "回調模式: 完成" << count << "次打印\n";delete timer; // 清理資源
}// 啟動定時器序列
void start_timer_sequence(asio::io_context& io) {// 在堆上創建定時器(需要在整個序列中保持存活)asio::steady_timer* timer = new asio::steady_timer(io);// 設置第一次等待timer->expires_after(std::chrono::seconds(3));timer->async_wait(std::bind(timer_callback1, timer, 0));
}int main() {asio::io_context io;// 啟動回調地獄start_timer_sequence(io);io.run();return 0;
}
正如上面的例子,每個異步操作需要獨立的處理函數,5次操作需要6個函數(5個回調+1個啟動函數),邏輯上深層嵌套:start → callback1 → callback2 → ... → final
,回調函數形成鏈式調用,如果業務很復雜,你自己都不知道哪一個回調調用了哪一個,這是非常不友好的。c++11后有了lambda表達式這個情況有所好轉,但lambda嵌套lambda看的還是很費勁
上面代碼用協程來實現就比較簡單了:
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>namespace asio = boost::asio;// 協程版 - 單一函數順序執行
asio::awaitable<void> sequential_operations() {asio::steady_timer timer(co_await asio::this_coro::executor);// 操作1std::cout << "協程: 啟動操作1\n";timer.expires_after(std::chrono::seconds(3));co_await timer.async_wait(asio::use_awaitable);std::cout << "協程: 操作1完成 - 3秒已過\n";// 操作2std::cout << "協程: 啟動操作2\n";timer.expires_after(std::chrono::seconds(3));co_await timer.async_wait(asio::use_awaitable);std::cout << "協程: 操作2完成 - 3秒已過\n";// 操作3std::cout << "協程: 啟動操作3\n";timer.expires_after(std::chrono::seconds(3));co_await timer.async_wait(asio::use_awaitable);std::cout << "協程: 操作3完成 - 3秒已過\n";// 操作4std::cout << "協程: 啟動操作4\n";timer.expires_after(std::chrono::seconds(3));co_await timer.async_wait(asio::use_awaitable);std::cout << "協程: 操作4完成 - 3秒已過\n";// 操作5std::cout << "協程: 啟動操作5\n";timer.expires_after(std::chrono::seconds(3));co_await timer.async_wait(asio::use_awaitable);std::cout << "協程: 操作5完成 - 3秒已過\n";std::cout << "協程: 所有操作完成\n";co_return;
}int main() {asio::io_context io;asio::co_spawn(io, sequential_operations(), asio::detached);io.run();return 0;
}
使用協程可以讓整個序列在一個函數中實現,執行流程一目了然,從上到下順序執行,無嵌套,無跳轉,調整操作順序只需調整代碼順序,另外,錯誤處理上,一個 try/catch 塊覆蓋所有操作
協程與傳統回調對比
特性 | 協程 | 回調 |
---|---|---|
代碼結構 | 順序執行,類似同步代碼 | 嵌套回調,容易形成"回調地獄" |
可讀性 | 高,邏輯清晰 | 低,跳轉復雜 |
錯誤處理 | 使用 try/catch | 錯誤碼參數 |
局部變量 | 保持狀態 | 需要手動保持狀態 |
控制流 | 標準控制結構 | 手動狀態管理 |
資源管理 | RAII 自然適用 | 需要額外注意生命周期 |
Asio 協程通過 C++20 標準協程提供了更簡潔、更易讀的異步編程模型,同時保持了高性能特性。掌握 co_await
和 co_spawn
等關鍵機制,可以顯著提高網絡應用的開發效率和可維護性。
ASIO協程的核心概念和關鍵字
ASIO協程的關鍵概念和關鍵字如下:
1. co_await
-
作用:掛起當前協程,等待異步操作完成
-
用法:以asio的讀取數據的例子舉例,異步讀取等到讀取到數據后繼續執行數據處理,傳統的回調是這樣實現的:
// 讀取完成處理函數
void handle_read(boost::system::error_code ec, size_t n, std::shared_ptr<std::array<char, 1024>> buffer) {if (!ec) {std::cout << "讀取到 " << n << " 字節數據\n";} else {std::cerr << "讀取錯誤: " << ec.message() << "\n";}
}// 啟動讀取操作
void start_read(tcp::socket& socket) {// 使用 shared_ptr 管理緩沖區auto buffer = std::make_shared<std::array<char, 1024>>();socket.async_read_some(asio::buffer(*buffer),std::bind(handle_read, std::placeholders::_1, std::placeholders::_2, buffer));
}
使用協程是這樣實現的
asio::awaitable<void> read_data(tcp::socket& socket) {char data[1024];// 使用 co_await 等待異步讀取size_t n = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable);std::cout << "讀取到 " << n << " 字節數據\n";
}
協程直接"等待"操作完成,代碼線性執行,從這個例子也看到協程的另外一個好處,就是可以使用局部變量來管理緩沖區,因為從語法上看,協程它是在一個函數里執行,緩沖區生命周期不會釋放,而回調你只能用成員變量或者堆分配空間了,當然,你用lambda表達式也可以實現類型效果
2. co_return
和asio::awaitable
- 作用:
co_return
類似return
結束協程執行并返回值,這個返回值實際是asio::awaitable
//這里T是result對應類型
asio::awaitable<T> calculate_value() {co_return result; // 對于有返回值的協程
}asio::awaitable<void> do_something() {co_return; // 對于 void 協程
}
下面舉例一個具體的例子
//calculate_value函數將延時1秒后返回42
asio::awaitable<int> calculate_value() {asio::steady_timer timer(co_await asio::this_coro::executor);timer.expires_after(std::chrono::seconds(1));co_await timer.async_wait(asio::use_awaitable);// 使用 co_return 返回值co_return 42;
}
協程直接返回結果,類似同步函數,co_return
搭配asio::awaitable
使用
3. asio::use_awaitable
- 作用:將異步操作轉換為可等待對象,這個作用是告訴asio的函數這是一個協程函數,是為了和異步函數能進行重載區分用的,這個標志能讓asio的協程函數和異步回調函數都一樣,僅僅是傳入的參數不同
//協程版本
asio::awaitable<void> wait_for_timer() {
asio::steady_timer timer(co_await asio::this_coro::executor);// 使用 use_awaitable 使異步操作可等待
timer.expires_after(std::chrono::seconds(1));
co_await timer.async_wait(asio::use_awaitable);std::cout << "定時器完成\n";
}//回調版本
void wait_for_timer(asio::io_context& io) {auto timer = std::make_shared<asio::steady_timer>(io, std::chrono::seconds(1));timer->async_wait([](const asio::error_code& ec) {if (!ec) {std::cout << "定時器完成\n";} else {std::cerr << "定時器錯誤: " << ec.message() << "\n";}});
}
4. 啟動協程 (co_spawn
)
- 作用:
co_spawn
的作用是啟動一個協程
asio::co_spawn(executor, // 執行上下文 (io_context/strand)coroutine_function, // 協程函數completion_handler // 完成回調
);
completion_handler是完成處理程序選項有兩種:
asio::detached
: 不關心協程結束(最常用)
asio::co_spawn(executor, session(std::move(socket)), asio::detached);
- 自定義處理:
asio::co_spawn(executor, session(), [](std::exception_ptr e, int result) {if (e) std::rethrow_exception(e);std::cout << "Result: " << result << "\n";});
這里用ASIO回調方式寫一個最簡單的tcp服務器是這樣的:
// 會話狀態管理類
class Session : public std::enable_shared_from_this<Session> {
public:Session(tcp::socket socket) : socket_(std::move(socket)) {}void start() {start_read();}void start_read() {auto self = shared_from_this();auto buffer = std::make_shared<std::array<char, 1024>>();socket_.async_read_some(asio::buffer(*buffer),std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, buffer));}void handle_read(boost::system::error_code ec, size_t n, std::shared_ptr<std::array<char, 1024>> buffer) {if (!ec) {// 處理數據...start_read(); // 繼續讀取}}private:tcp::socket socket_;
};int main() {asio::io_context io;tcp::socket socket(io);// 創建會話并啟動auto session = std::make_shared<Session>(std::move(socket));session->start();io.run();
}
用協程可以變為這樣:
int main() {asio::io_context io;tcp::socket socket(io);// 使用 co_spawn 啟動協程asio::co_spawn(io, session(std::move(socket)), asio::detached);io.run();
}// 協程函數
asio::awaitable<void> session(tcp::socket socket) {// ...協程邏輯...co_return;
}
asio::co_spawn
啟用一個協程進入session函數,然后等待其完成。
上面例子中asio::detached說明啟動這個協程后就不管他的返回了,如果你要獲取返回,可以傳入一個回調(協程最終還是不能完全避免回調),例如:
// 完成處理函數
void handle_completion(std::exception_ptr e, int result) {if (e) {try {std::rethrow_exception(e);} catch (const std::exception& ex) {std::cerr << "計算錯誤: " << ex.what() << "\n";}} else {std::cout << "計算結果: " << result << "\n";}
}// 可能失敗的協程
asio::awaitable<int> compute_value() {// ...可能拋出異常的計算...co_return 42;
}int main() {asio::io_context io;// 啟動協程并指定完成處理函數asio::co_spawn(io, compute_value(), handle_completion);io.run();
}
ASIO協程的其它操作
1. 切換執行上下文
asio::dispatch
:協程執行上下文切換,它允許開發者精確控制協程在哪個線程或執行器上運行
// 切換到指定線程的 io_context
co_await asio::dispatch(target_io_context, asio::use_awaitable);
當協程執行到co_await asio::dispatch(...)
時:
- 掛起當前協程:暫停當前執行流程
- 調度到目標執行器:將協程續體(continuation)提交到目標執行器的隊列
- 在目標上下文恢復:當目標執行器調度該任務時,協程在目標線程恢復執行
例如:網絡請求后,有些復雜計算切換到別的線程進行操作:
asio::awaitable<void> process_request() {// 在網絡線程池處理請求Request req = co_await receive_request();// 切換到計算線程池處理CPU密集型任務co_await asio::dispatch(compute_pool, asio::use_awaitable);Result res = co_await heavy_computation(req);// 切回網絡線程池發送響應co_await asio::dispatch(network_pool, asio::use_awaitable);co_await send_response(res);
}
還有就是線程和GUI的切換,GUI線程一般不會和網絡請求在一個線程中
asio::awaitable<void> update_ui() {Data data = co_await fetch_data();// 切換到GUI線程更新界面co_await asio::dispatch(gui_executor, asio::use_awaitable);ui_label.set_text(data.message);ui_chart.update(data.values);
}
2. 協程取消
asio::cancellation_signal cancel_signal;// 啟動可取消協程
asio::co_spawn(executor, [](asio::cancellation_signal sig) -> asio::awaitable<void> {asio::steady_timer timer(co_await asio::this_coro::executor);timer.expires_after(10s);// 綁定取消信號co_await timer.async_wait(asio::bind_cancellation_slot(sig.slot(), asio::use_awaitable));}(cancel_signal),asio::detached
);// 在需要時取消
cancel_signal.emit(asio::cancellation_type::all);
協程操作符
協程操作符主要是||
和&&
,允許多個協程同時運行,并等待所有(&&
)或者某個(||
)協程完成。
1.||
操作符
下面舉個網絡編程中最常見的場景之一"帶超時的異步讀取"來展示協程的操作符,網絡編程經常有一個需求,就是你給對方寫一個數據,要n秒內等待對方回復,如果對方n秒內不回復,你要重發,最多重發m次,這個邏輯梳理為:
- 向對方發送請求數據
- 等待回復,設置超時時間(如3秒)
- 如果超時未收到回復,重發請求
- 最多重發M次(如3次)
用協程這樣實現的(注意,這里協程的||
操作符要include asio/experimental/awaitable_operators.hpp
):
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <system_error>
#include <chrono>namespace asio = boost::asio;
using asio::ip::tcp;
using namespace asio::experimental::awaitable_operators;
namespace chrono = std::chrono;// 帶超時重發的可靠請求
asio::awaitable<bool> reliable_request(tcp::socket& socket, const std::string& request,int max_retries = 3,chrono::seconds timeout = 3s)
{int attempt = 0;while (attempt <= max_retries) {attempt++;std::cout << "嘗試 #" << attempt << "/" << max_retries << "\n";try {// 發送請求co_await asio::async_write(socket, asio::buffer(request), asio::use_awaitable);// 準備接收響應std::string response;response.resize(1024);// 設置定時器asio::steady_timer timer(co_await asio::this_coro::executor);timer.expires_after(timeout);// 同時等待讀取和超時,這里演示了||操作符,這里面有兩個協程函數auto result = co_await (socket.async_read_some(asio::buffer(response), asio::use_awaitable) ||timer.async_wait(asio::use_awaitable));//只要上面有一個返回結果就會往下執行// 處理結果if (result.index() == 0) { // 收到響應size_t bytes_read = std::get<0>(result).first;response.resize(bytes_read);std::cout << "收到響應: " << response << "\n";co_return true;}else { // 超時std::cout << "請求超時, ";if (attempt < max_retries) {std::cout << "準備重試...\n";} else {std::cout << "已達最大重試次數\n";}}}catch (const std::exception& e) {std::cerr << "錯誤: " << e.what() << "\n";co_return false;}}co_return false; // 所有嘗試失敗
}// 示例使用
asio::awaitable<void> client_session(tcp::socket socket) {try {// 構造請求數據std::string request = "QUERY_DATA";// 發送可靠請求(最多重試3次,超時3秒)bool success = co_await reliable_request(socket, request, 3, 3s);if (success) {std::cout << "請求成功完成\n";} else {std::cout << "請求失敗\n";}}catch (const std::exception& e) {std::cerr << "會話錯誤: " << e.what() << "\n";}
}int main() {asio::io_context io;// 創建并連接socket(示例)tcp::socket socket(io);// 實際中應連接服務器: socket.connect(endpoint);// 啟動客戶端會話asio::co_spawn(io, client_session(std::move(socket)), asio::detached);io.run();return 0;
}
上面這個協程實現還是比較清晰的,如果用回調實現,則非常麻煩:
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <functional>namespace asio = boost::asio;
using asio::ip::tcp;
namespace chrono = std::chrono;// 前向聲明
class ReliableRequest;
void handle_write(boost::system::error_code ec, size_t, std::shared_ptr<ReliableRequest> request);
void handle_read(boost::system::error_code ec, size_t bytes_read,std::shared_ptr<ReliableRequest> request);
void handle_timeout(boost::system::error_code ec,std::shared_ptr<ReliableRequest> request);
void start_request(std::shared_ptr<ReliableRequest> request);// 請求狀態管理類
class ReliableRequest : public std::enable_shared_from_this<ReliableRequest> {
public:ReliableRequest(tcp::socket& socket, std::string request, int max_retries, chrono::seconds timeout): socket_(std::move(socket)), request_(std::move(request)), max_retries_(max_retries), timeout_(timeout), timer_(socket_.get_executor()), attempt_(0){}void start() {attempt_ = 0;start_attempt();}private:friend void handle_write(boost::system::error_code, size_t, std::shared_ptr<ReliableRequest>);friend void handle_read(boost::system::error_code, size_t,std::shared_ptr<ReliableRequest>);friend void handle_timeout(boost::system::error_code,std::shared_ptr<ReliableRequest>);void start_attempt() {attempt_++;std::cout << "嘗試 #" << attempt_ << "/" << max_retries_ << "\n";auto self = shared_from_this();// 啟動異步寫入asio::async_write(socket_, asio::buffer(request_),std::bind(handle_write, std::placeholders::_1, std::placeholders::_2, self));}void start_wait() {auto self = shared_from_this();response_buffer_ = std::make_shared<std::array<char, 1024>>();// 啟動異步讀取socket_.async_read_some(asio::buffer(*response_buffer_),std::bind(handle_read, std::placeholders::_1, std::placeholders::_2, self));// 啟動超時定時器timer_.expires_after(timeout_);timer_.async_wait(std::bind(handle_timeout, std::placeholders::_1, self));}void handle_success(size_t bytes_read) {std::string response(response_buffer_->data(), bytes_read);std::cout << "收到響應: " << response << "\n";if (completion_) completion_(true);}void handle_failure() {std::cout << "請求失敗\n";if (completion_) completion_(false);}void handle_retry() {if (attempt_ < max_retries_) {std::cout << "準備重試...\n";start_attempt();} else {std::cout << "已達最大重試次數\n";handle_failure();}}public:// 完成回調using CompletionHandler = std::function<void(bool)>;CompletionHandler completion_;tcp::socket socket_;std::string request_;int max_retries_;chrono::seconds timeout_;asio::steady_timer timer_;int attempt_;std::shared_ptr<std::array<char, 1024>> response_buffer_;
};// 寫入完成處理
void handle_write(boost::system::error_code ec, size_t, std::shared_ptr<ReliableRequest> request) {if (ec) {std::cerr << "寫入錯誤: " << ec.message() << "\n";request->handle_failure();return;}request->start_wait();
}// 讀取完成處理
void handle_read(boost::system::error_code ec, size_t bytes_read,std::shared_ptr<ReliableRequest> request) {// 取消定時器request->timer_.cancel();if (ec == asio::error::operation_aborted) {return; // 超時已處理}if (ec) {std::cerr << "讀取錯誤: " << ec.message() << "\n";request->handle_retry();return;}request->handle_success(bytes_read);
}// 超時處理
void handle_timeout(boost::system::error_code ec,std::shared_ptr<ReliableRequest> request) {if (ec == asio::error::operation_aborted) {return; // 讀取已完成}if (ec) {std::cerr << "定時器錯誤: " << ec.message() << "\n";request->handle_retry();return;}// 取消讀取操作request->socket_.cancel();std::cout << "請求超時, ";request->handle_retry();
}// 啟動可靠請求
void start_reliable_request(tcp::socket socket, std::string request,ReliableRequest::CompletionHandler completion,int max_retries = 3, chrono::seconds timeout = 3s) {auto request_obj = std::make_shared<ReliableRequest>(std::move(socket), std::move(request), max_retries, timeout);request_obj->completion_ = completion;request_obj->start();
}// 示例使用
void handle_request_complete(bool success) {if (success) {std::cout << "請求成功完成\n";} else {std::cout << "請求失敗\n";}
}int main() {asio::io_context io;// 創建socket(示例)tcp::socket socket(io);// 實際中應連接服務器// 啟動可靠請求start_reliable_request(std::move(socket), "QUERY_DATA", handle_request_complete,3, chrono::seconds(3));io.run();return 0;
}
2. &&
操作符
&&
操作符允許你同時啟動多個異步操作,等待所有操作完成,以元組形式獲取所有結果
auto [result1, result2] = co_await (async_op1(use_awaitable) &&async_op2(use_awaitable)
);
例如要多個接口結果執行完成后才返回,那么久可以用&&
操作符
asio::awaitable<SearchResults> search_products(string_view query) {// 并行搜索多個分類auto [electronics, clothing, books] = co_await (product_service.search("electronics", query) &&product_service.search("clothing", query) &&product_service.search("books", query));...//合并到resultsco_return results;
}
總結
Asio協程方便了C++高并發服務器的開發,在保持異步高性能的同時,提供同步代碼的簡潔性,同步執行流簡化調試和性能分析,隨著C++20標準的廣泛采用,協程已成為構建下一代高性能服務器的首選范式。
但要注意的是,協程并不會提高程序運行的效率,從原理上講,協程有可能還沒有回調高效,但這個性能并不會有太大的差距