Adaptive Concurrency Limiting
A service's processing capacity has an objective upper bound. When requests arrive faster than the service can process them, the service becomes overloaded.
If the overload persists, more and more requests pile up, until eventually every request has to wait a long time to be processed and the whole service is effectively paralyzed.
Conversely, rejecting part of the requests outright actually lets the service handle more requests "in time". The corresponding mechanism is setting a maximum concurrency.
Adaptive limiting dynamically adjusts the service's maximum concurrency so that the service handles as many requests as possible without becoming overloaded.
Use cases
Normally, to keep a service from overloading, it is enough to run a load test before going online and compute best_max_concurrency with Little's Law (concurrency = latency * QPS). But when there are many services, the topology is complex, and processing capacity drifts over time, a fixed maximum concurrency requires a huge amount of testing work and is very inconvenient. Adaptive limiting was designed to solve this problem.
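As a purely illustrative example: if a load test saturates a service at roughly 5000 QPS with an average latency of about 20 ms at that point, Little's Law gives best_max_concurrency ≈ 5000 × 0.02 s = 100, so a fixed max_concurrency of about 100 would be a reasonable choice for that single service.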
Before enabling adaptive limiting, it is recommended that:
- The client has retry enabled.
- The server has multiple nodes.
This way, when one node reports overload, the client can retry on other nodes and lose as little traffic as possible.
Enabling adaptive limiting in brpc
Currently only method-level adaptive limiting is supported (i.e., per concrete RPC method). To enable it for a method, simply set its maximum concurrency to "auto".
// Set auto concurrency limiter for all methods
brpc::ServerOptions options;
options.method_max_concurrency = "auto";

// Set auto concurrency limiter for specific method
server.MaxConcurrencyOf("example.EchoService.Echo") = "auto";
How it works
Terminology
- concurrency: the number of requests being processed at the same time, also called the "degree of concurrency".
- max_concurrency: the configured maximum concurrency. Requests beyond it are rejected (with an ELIMIT error); at the cluster level, the client should retry them on another server.
- best_max_concurrency: physically, concurrency corresponds to task-processing slots, which naturally have an upper bound. That bound is best_max_concurrency, the ideal maximum concurrency, and is generally the recommended value for max_concurrency. If max_concurrency is set too high, concurrency may exceed best_max_concurrency; tasks can no longer be processed in time and pile up in various queues, and the system becomes congested. If max_concurrency is set too low, concurrency always stays below best_max_concurrency, capping throughput the system could otherwise reach.
- noload_latency: the latency of purely processing a task, excluding queueing time; in other words, the latency under low load. Since processing a task correctly has to go through necessary steps that consume CPU or wait for downstream responses, noload_latency is an intrinsic property of the service, though it may drift gradually over time (due to memory fragmentation, load changes, changes in business data, etc.).
- min_latency: an EMA of the smaller values among the measured latencies. When concurrency does not exceed best_max_concurrency, min_latency is close to noload_latency (possibly slightly above it).
- peak_qps: the upper bound of the QPS the service can handle. Note that this is the QPS of processing/replying, not of receiving. Its value is best_max_concurrency / noload_latency; since both quantities are intrinsic properties of the service, peak_qps is intrinsic too and independent of congestion, though it may drift gradually over time.
- max_qps: the larger values among the measured QPS. Since QPS has an upper bound, max_qps is always below peak_qps, congested or not.
- Little's Law: when the service is in a steady state, concurrency = latency * qps. This is the theoretical basis of adaptive limiting.
When the service is not overloaded, latency stays essentially flat (close to noload_latency) as traffic rises, and qps and concurrency rise together linearly.
Once traffic exceeds the service's peak_qps, concurrency and latency rise together while qps stays flat at peak_qps.
If a service's peak_qps and noload_latency are both fairly stable, then its best_max_concurrency = noload_latency * peak_qps.
Adaptive limiting is about finding the service's noload_latency and peak_qps, and setting the maximum concurrency to a value close to their product.
The adaptive limiting formula
Adaptive limiting keeps sampling requests. Once a sampling window contains enough samples, it computes the max_concurrency for the next window from the samples' average latency and the service's current qps:
max_concurrency = max_qps * ((2 + alpha) * min_latency - latency)
- alpha is the acceptable rise in latency, 0.3 by default.
- latency is the average latency of all requests in the current sampling window.
- max_qps is the peak of the qps measured recently.
- min_latency is an EMA of the smaller latencies measured recently, an estimate of noload_latency.
Note: when the computed max_concurrency differs from the current max_concurrency, each adjustment to max_concurrency is capped at a maximum ratio, so that max_concurrency changes smoothly.
When the service is under low load, min_latency is approximately noload_latency, and the computed max_concurrency is above the current concurrency but below best_max_concurrency, leaving room for traffic to grow. When the service is overloaded, its qps is approximately max_qps while latency starts to clearly exceed min_latency, so max_concurrency stays close to concurrency and, with the periodic decay, is kept from drifting away from best_max_concurrency, ensuring the service does not overload.
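The update step can be written out as a minimal C++ sketch. This is not brpc's actual implementation; the WindowStats fields and NextMaxConcurrency name are illustrative, and the smoothing of the adjustment step mentioned above is omitted:

#include <algorithm>
#include <cstdint>

// Hypothetical per-window statistics fed into the formula (illustrative names).
struct WindowStats {
    double avg_latency_us;  // average latency of the sampling window
    double max_qps;         // recent peak of measured qps
    double min_latency_us;  // EMA of recent low latencies, estimate of noload_latency
};

// max_concurrency = max_qps * ((2 + alpha) * min_latency - latency).
// qps is requests/second and latencies are in microseconds, so the product is
// divided by 1,000,000 to yield a request count.
int64_t NextMaxConcurrency(const WindowStats& w, double alpha = 0.3) {
    const double next = w.max_qps *
        ((2 + alpha) * w.min_latency_us - w.avg_latency_us) / 1000000.0;
    return std::max<int64_t>(1, static_cast<int64_t>(next));
}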
Estimating noload_latency
A service's noload_latency is not constant, so adaptive limiting must be able to detect changes in it correctly. A drop in noload_latency is easy to notice, because latency drops with it. The hard part is telling, when latency rises, whether the service is overloaded or noload_latency itself has gone up.
Possible approaches:
1. Use the minimum latency over a recent period to approximate noload_latency.
2. Use various averages of recent latencies to predict noload_latency.
3. Collect the average queueing time of requests and use latency - queue_time as noload_latency.
4. Periodically shrink max_concurrency, and after a short while use the latency measured at that point as noload_latency.
The problem with approaches 1 and 2: if the service stays under high load, every recent latency is above noload_latency, so the estimated noload_latency keeps climbing.
The problem with approach 3: if the service's performance bottleneck is in a downstream service, the queueing time inside the service itself does not reflect the overall load.
Approach 4 is the most general and has stood up to extensive experiments. Shrinking max_concurrency is tied to the alpha in the formula. Consider a thought experiment: if latency were perfectly stable and always equal to min_latency, the formula would simplify to max_concurrency = max_qps * latency * (1 + alpha). By Little's Law, qps can then be at most max_qps * (1 + alpha). alpha is the "exploration room" for qps: with alpha = 0, qps is locked at max_qps and the algorithm may never discover peak_qps. But once qps has reached peak_qps, alpha drives latency up (the service is already congested); the measured min_latency then exceeds noload_latency, and round after round min_latency fails to converge. Periodically lowering max_concurrency interrupts this process and gives min_latency room to explore downwards.
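A minimal sketch of approach 4, assuming a hypothetical LimiterState and a fixed interval; the real implementation ties the shrink to its sampling windows and to the drain period described in the next section:

#include <algorithm>
#include <cstdint>

// Hypothetical limiter state; names, the interval and the shrink ratio are illustrative.
struct LimiterState {
    int64_t max_concurrency = 0;
    int64_t saved_max_concurrency = 0;
    int64_t last_remeasure_us = 0;
    bool remeasuring = false;
};

constexpr int64_t kRemeasureIntervalUs = 30LL * 1000 * 1000;  // illustrative 30s

// Periodically shrink max_concurrency so queueing disappears; the latency
// observed during this low-load period becomes the new basis for min_latency.
void MaybeStartRemeasure(int64_t now_us, LimiterState* s) {
    if (s->remeasuring || now_us - s->last_remeasure_us < kRemeasureIntervalUs) {
        return;
    }
    s->last_remeasure_us = now_us;
    s->saved_max_concurrency = s->max_concurrency;
    s->max_concurrency = std::max<int64_t>(1, s->max_concurrency / 2);
    s->remeasuring = true;
}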
Reducing traffic loss during re-measurement
Every so often, the algorithm shrinks max_concurrency and holds it there for a while, then takes the latency measured during that time as the service's noload_latency, to handle the case where noload_latency has risen. To measure noload_latency, the service must first be brought into a low-load state, so shrinking max_concurrency cannot be avoided.
Since the service rejects all requests while max_concurrency < concurrency, the algorithm sets the time for "draining all requests that have gone through queueing" to latency * 2, so that nearly all of the samples used to compute min_latency are requests that did not queue.
Because a service's latency is usually short, the traffic lost this way is also small.
Handling jitter
Even when the service itself is not overloaded, latency fluctuates; by Little's Law, these fluctuations make the server's concurrency fluctuate as well.
The formula was designed with latency jitter in mind: when latency is close to min_latency, the formula yields a relatively high max_concurrency to absorb the fluctuation in concurrency and minimize "false rejections". At the same time, as latency rises, max_concurrency decreases to keep the service from overloading.
Put another way, when latency starts to rise it usually means that something (not necessarily the service itself; it may be a downstream service) is consuming a lot of CPU, and shrinking max_concurrency is reasonable in that case too.
Smoothing
To reduce the impact of jitter in individual windows on the algorithm, and to keep the computational cost low, min_latency is smoothed with an EMA:
if latency < min_latency:
    min_latency = latency * ema_alpha + (1 - ema_alpha) * min_latency
else:
    do_nothing
Estimating peak_qps
Speeding up qps growth
When a service starts, it has to go through a series of initializations, TCP has slow start, and so on, so the qps right after startup is bound to be low. That makes the max_concurrency at startup low as well. And by the formula above, when max_concurrency is low, the concurrency headroom reserved for qps growth (i.e. alpha * max_qps * min_latency) is also low, which slows down how quickly max_concurrency can grow when traffic increases.
If it takes too long from startup to saturating qps, a lot of traffic is lost in the meantime. Two measures are taken here:
1. Sampling: once enough requests have been sampled, the current sampling window is submitted immediately instead of waiting for the window's time to expire.
2. The formula: when current_qps > the stored max_qps, max_qps is updated directly, without smoothing.
With these two measures, qps can be saturated within about 2 seconds in the vast majority of cases.
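The first measure can be sketched as follows; SampleWindow and both thresholds are illustrative assumptions, not brpc's internals:

#include <cstdint>

// Hypothetical sampling-window state (illustrative names and thresholds).
struct SampleWindow {
    int64_t start_time_us = 0;
    int32_t sample_count = 0;
};

constexpr int32_t kMinSamplesPerWindow = 100;
constexpr int64_t kWindowSizeUs = 1000 * 1000;  // 1 second

// Submit the window as soon as it holds enough samples instead of always
// waiting for the full window duration, so max_concurrency reacts faster
// while traffic is ramping up.
bool ShouldSubmitWindow(const SampleWindow& w, int64_t now_us) {
    return w.sample_count >= kMinSamplesPerWindow ||
           now_us - w.start_time_us >= kWindowSizeUs;
}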
Smoothing
To reduce the impact of jitter in individual windows on the algorithm, and to keep the computational cost low, max_qps is also smoothed with an EMA:
if current_qps > max_qps:
    max_qps = current_qps
else:
    max_qps = current_qps * ema_alpha / 10 + (1 - ema_alpha / 10) * max_qps
The EMA parameter for max_qps is set to one tenth of the EMA parameter for min_latency because a drop in max_qps usually does not mean that the achievable peak qps has dropped, whereas a drop in min_latency usually does mean that noload_latency has actually dropped.
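Putting the two smoothing rules together, a minimal C++ sketch (the function name and parameter layout are illustrative; this mirrors the pseudocode above rather than brpc's source):

// EMA updates for min_latency and max_qps, mirroring the pseudocode above.
// min_latency only follows samples downwards; max_qps follows increases
// immediately and otherwise decays ten times more slowly than min_latency.
void UpdateEstimates(double latency, double current_qps, double ema_alpha,
                     double* min_latency, double* max_qps) {
    if (latency < *min_latency) {
        *min_latency = latency * ema_alpha + (1 - ema_alpha) * (*min_latency);
    }
    if (current_qps > *max_qps) {
        *max_qps = current_qps;
    } else {
        const double qps_alpha = ema_alpha / 10;
        *max_qps = current_qps * qps_alpha + (1 - qps_alpha) * (*max_qps);
    }
}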
Comparison with Netflix's gradient algorithm
Netflix's gradient algorithm uses the formula: max_concurrency = min_latency / latency * max_concurrency + queue_size.
Here latency is the minimum latency within the sampling window, and min_latency is the minimum latency over several recent sampling windows. min_latency / latency is the "gradient" of the algorithm: when latency is above min_latency, max_concurrency gradually decreases; otherwise, max_concurrency gradually increases, so that max_concurrency hovers around best_max_concurrency.
This formula can be compared with the algorithm in this article:
The latency in the gradient algorithm differs from the one used here: the former is the window's minimum, the latter the average. Netflix's intent is that the minimum represents noload_latency better, but in practice, as long as max_concurrency is not periodically decayed, both the minimum and the average can keep rising and prevent the algorithm from converging. The minimum brings no extra benefit and only makes the algorithm less stable.
max_concurrency / latency in the gradient algorithm is conceptually related to qps (by Little's Law), but the two can be badly out of sync. For instance, before min_latency is re-measured, if every latency stays above min_latency, max_concurrency keeps shrinking and may even drop to 0; under the algorithm here, max_qps and min_latency remain stable in the same situation, and the max_concurrency they produce does not swing wildly. Fundamentally, when the gradient algorithm iterates max_concurrency, its latency does not represent the latency the service would have at a concurrency equal to max_concurrency; the two are decoupled, so the physical meaning of max_concurrency / latency is unclear, it can differ greatly from qps, and large errors result.
The gradient algorithm recommends queue_size = sqrt(max_concurrency), which is unreasonable. Netflix seems to treat queue_size as the buffering in various uncontrollable places, such as inside sockets, in which case a positive relationship with max_concurrency is understandable. In our understanding, however, that kind of queue_size has a negligible effect and could be omitted or replaced with a constant. The queue_size we care about is the exploration room left for concurrency to rise: updates to max_concurrency are delayed, and while concurrency is ramping up from low to high, the role of queue_size is to avoid capping qps before max_concurrency has been updated. When concurrency is high, on the other hand, the service may already be overloaded, so queue_size should be smaller to avoid worsening latency further. Here queue_size is inversely related to concurrency.
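For comparison, one step of the gradient update described above can be sketched as follows (a paraphrase of the published formula with illustrative names, not Netflix's actual code):

#include <cmath>

// One step of the gradient update: the limit shrinks when the window's
// minimal latency exceeds the recent min_latency, and grows otherwise.
// queue_size = sqrt(limit) is the choice criticized above.
double GradientNextLimit(double min_latency, double window_min_latency,
                         double current_limit) {
    const double gradient = min_latency / window_min_latency;
    const double queue_size = std::sqrt(current_limit);
    return gradient * current_limit + queue_size;
}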
Server-side implementation
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>
#include <butil/atomicops.h>
#include <butil/time.h>
#include <butil/logging.h>
#include <json2pb/json_to_pb.h>
#include <bthread/timer_thread.h>
#include <bthread/bthread.h>
#include <cstdlib>
#include <fstream>
#include "cl_test.pb.h"DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state ""(waiting for client to close connection before server stops)");
DEFINE_int32(server_bthread_concurrency, 4, "Configuring the value of bthread_concurrency, For compute max qps, ");
DEFINE_int32(server_sync_sleep_us, 2500, "Usleep time, each request will be executed once, For compute max qps");
// max qps = 1000 / 2.5 * 4
DEFINE_int32(control_server_port, 9000, "");
DEFINE_int32(echo_port, 9001, "TCP Port of echo server");
DEFINE_int32(cntl_port, 9000, "TCP Port of controller server");
DEFINE_string(case_file, "", "File path for test_cases");
DEFINE_int32(latency_change_interval_us, 50000, "Interval at which the server side changes the latency");
DEFINE_int32(server_max_concurrency, 0, "Echo Server's max_concurrency");
DEFINE_bool(use_usleep, false, "EchoServer uses ::usleep or bthread_usleep to simulate latency "
            "when processing requests");

bthread::TimerThread g_timer_thread;

int cast_func(void* arg) {
    return *(int*)arg;
}

void DisplayStage(const test::Stage& stage) {
    std::string type;
    switch (stage.type()) {
    case test::FLUCTUATE:
        type = "Fluctuate";
        break;
    case test::SMOOTH:
        type = "Smooth";
        break;
    default:
        type = "Unknown";
    }
    std::stringstream ss;
    ss << "Stage:[" << stage.lower_bound() << ':' << stage.upper_bound() << "]"
       << " , Type:" << type;
    LOG(INFO) << ss.str();
}

butil::atomic<int> cnt(0);
butil::atomic<int> atomic_sleep_time(0);
bvar::PassiveStatus<int> atomic_sleep_time_bvar(cast_func, &atomic_sleep_time);

namespace bthread {
DECLARE_int32(bthread_concurrency);
}

void TimerTask(void* data);

class EchoServiceImpl : public test::EchoService {
public:
    EchoServiceImpl() : _stage_index(0), _running_case(false) {};
    virtual ~EchoServiceImpl() {};

    void SetTestCase(const test::TestCase& test_case) {
        _test_case = test_case;
        _next_stage_start = _test_case.latency_stage_list(0).duration_sec() +
                            butil::gettimeofday_s();
        _stage_index = 0;
        _running_case = false;
        DisplayStage(_test_case.latency_stage_list(_stage_index));
    }

    void StartTestCase() {
        CHECK(!_running_case);
        _running_case = true;
        UpdateLatency();
    }

    void StopTestCase() {
        _running_case = false;
    }

    void UpdateLatency() {
        if (!_running_case) {
            return;
        }
        ComputeLatency();
        g_timer_thread.schedule(TimerTask, (void*)this,
            butil::microseconds_from_now(FLAGS_latency_change_interval_us));
    }

    virtual void Echo(google::protobuf::RpcController* cntl_base,
                      const test::NotifyRequest* request,
                      test::NotifyResponse* response,
                      google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        response->set_message("hello");
        ::usleep(FLAGS_server_sync_sleep_us);
        if (FLAGS_use_usleep) {
            ::usleep(_latency.load(butil::memory_order_relaxed));
        } else {
            bthread_usleep(_latency.load(butil::memory_order_relaxed));
        }
    }

    void ComputeLatency() {
        if (_stage_index < _test_case.latency_stage_list_size() &&
            butil::gettimeofday_s() > _next_stage_start) {
            ++_stage_index;
            if (_stage_index < _test_case.latency_stage_list_size()) {
                _next_stage_start += _test_case.latency_stage_list(_stage_index).duration_sec();
                DisplayStage(_test_case.latency_stage_list(_stage_index));
            }
        }
        if (_stage_index == _test_case.latency_stage_list_size()) {
            const test::Stage& latency_stage =
                _test_case.latency_stage_list(_stage_index - 1);
            if (latency_stage.type() == test::ChangeType::FLUCTUATE) {
                _latency.store((latency_stage.lower_bound() + latency_stage.upper_bound()) / 2,
                               butil::memory_order_relaxed);
            } else if (latency_stage.type() == test::ChangeType::SMOOTH) {
                _latency.store(latency_stage.upper_bound(), butil::memory_order_relaxed);
            }
            return;
        }
        const test::Stage& latency_stage = _test_case.latency_stage_list(_stage_index);
        const int lower_bound = latency_stage.lower_bound();
        const int upper_bound = latency_stage.upper_bound();
        if (latency_stage.type() == test::FLUCTUATE) {
            _latency.store(butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound,
                           butil::memory_order_relaxed);
        } else if (latency_stage.type() == test::SMOOTH) {
            int latency = lower_bound + (upper_bound - lower_bound) /
                double(latency_stage.duration_sec()) *
                (latency_stage.duration_sec() - _next_stage_start + butil::gettimeofday_s());
            _latency.store(latency, butil::memory_order_relaxed);
        } else {
            LOG(FATAL) << "Wrong Type:" << latency_stage.type();
        }
    }

private:
    int _stage_index;
    int _next_stage_start;
    butil::atomic<int> _latency;
    test::TestCase _test_case;
    bool _running_case;
};

void TimerTask(void* data) {
    EchoServiceImpl* echo_service = (EchoServiceImpl*)data;
    echo_service->UpdateLatency();
}

class ControlServiceImpl : public test::ControlService {
public:
    ControlServiceImpl() : _case_index(0) {
        LoadCaseSet(FLAGS_case_file);
        _echo_service = new EchoServiceImpl;
        if (_server.AddService(_echo_service, brpc::SERVER_OWNS_SERVICE) != 0) {
            LOG(FATAL) << "Fail to add service";
        }
        g_timer_thread.start(NULL);
    }
    virtual ~ControlServiceImpl() {
        _echo_service->StopTestCase();
        g_timer_thread.stop_and_join();
    };

    virtual void Notify(google::protobuf::RpcController* cntl_base,
                        const test::NotifyRequest* request,
                        test::NotifyResponse* response,
                        google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        const std::string& message = request->message();
        LOG(INFO) << message;
        if (message == "ResetCaseSet") {
            _server.Stop(0);
            _server.Join();
            _echo_service->StopTestCase();
            LoadCaseSet(FLAGS_case_file);
            _case_index = 0;
            response->set_message("CaseSetReset");
        } else if (message == "StartCase") {
            CHECK(!_server.IsRunning()) << "Continuous StartCase";
            const test::TestCase& test_case = _case_set.test_case(_case_index++);
            _echo_service->SetTestCase(test_case);
            brpc::ServerOptions options;
            options.max_concurrency = FLAGS_server_max_concurrency;
            _server.MaxConcurrencyOf("test.EchoService.Echo") = test_case.max_concurrency();
            _server.Start(FLAGS_echo_port, &options);
            _echo_service->StartTestCase();
            response->set_message("CaseStarted");
        } else if (message == "StopCase") {
            CHECK(_server.IsRunning()) << "Continuous StopCase";
            _server.Stop(0);
            _server.Join();
            _echo_service->StopTestCase();
            response->set_message("CaseStopped");
        } else {
            LOG(FATAL) << "Invalid message:" << message;
            response->set_message("Invalid Cntl Message");
        }
    }

private:
    void LoadCaseSet(const std::string& file_path) {
        std::ifstream ifs(file_path.c_str(), std::ios::in);
        if (!ifs) {
            LOG(FATAL) << "Fail to open case set file: " << file_path;
        }
        std::string case_set_json((std::istreambuf_iterator<char>(ifs)),
                                  std::istreambuf_iterator<char>());
        test::TestCaseSet case_set;
        std::string err;
        if (!json2pb::JsonToProtoMessage(case_set_json, &case_set, &err)) {
            LOG(FATAL) << "Fail to trans case_set from json to protobuf message: " << err;
        }
        _case_set = case_set;
        ifs.close();
    }

    brpc::Server _server;
    EchoServiceImpl* _echo_service;
    test::TestCaseSet _case_set;
    int _case_index;
};

int main(int argc, char* argv[]) {
    // Parse gflags. We recommend you to use gflags as well.
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
    bthread::FLAGS_bthread_concurrency = FLAGS_server_bthread_concurrency;
    brpc::Server server;
    ControlServiceImpl control_service_impl;
    if (server.AddService(&control_service_impl,
                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
        LOG(ERROR) << "Fail to add service";
        return -1;
    }
    if (server.Start(FLAGS_cntl_port, NULL) != 0) {
        LOG(ERROR) << "Fail to start EchoServer";
        return -1;
    }
    server.RunUntilAskedToQuit();
    return 0;
}
Client-side implementation
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/time.h>
#include <brpc/channel.h>
#include <bvar/bvar.h>
#include <bthread/timer_thread.h>
#include <json2pb/json_to_pb.h>
#include <fstream>
#include "cl_test.pb.h"

DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(cntl_server, "0.0.0.0:9000", "IP Address of server");
DEFINE_string(echo_server, "0.0.0.0:9001", "IP Address of server");
DEFINE_int32(timeout_ms, 3000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 0, "Max retries(not including the first RPC)");
DEFINE_int32(case_interval, 20, "Intervals for different test cases");
DEFINE_int32(client_qps_change_interval_us, 50000, "The interval for client changes the sending speed");
DEFINE_string(case_file, "", "File path for test_cases");

void DisplayStage(const test::Stage& stage) {
    std::string type;
    switch (stage.type()) {
    case test::FLUCTUATE:
        type = "Fluctuate";
        break;
    case test::SMOOTH:
        type = "Smooth";
        break;
    default:
        type = "Unknown";
    }
    std::stringstream ss;
    ss << "Stage:[" << stage.lower_bound() << ':' << stage.upper_bound() << "]"
       << " , Type:" << type;
    LOG(INFO) << ss.str();
}

uint32_t cast_func(void* arg) {
    return *(uint32_t*)arg;
}

butil::atomic<uint32_t> g_timeout(0);
butil::atomic<uint32_t> g_error(0);
butil::atomic<uint32_t> g_succ(0);
bvar::PassiveStatus<uint32_t> g_timeout_bvar(cast_func, &g_timeout);
bvar::PassiveStatus<uint32_t> g_error_bvar(cast_func, &g_error);
bvar::PassiveStatus<uint32_t> g_succ_bvar(cast_func, &g_succ);
bvar::LatencyRecorder g_latency_rec;

void LoadCaseSet(test::TestCaseSet* case_set, const std::string& file_path) {
    std::ifstream ifs(file_path.c_str(), std::ios::in);
    if (!ifs) {
        LOG(FATAL) << "Fail to open case set file: " << file_path;
    }
    std::string case_set_json((std::istreambuf_iterator<char>(ifs)),
                              std::istreambuf_iterator<char>());
    std::string err;
    if (!json2pb::JsonToProtoMessage(case_set_json, case_set, &err)) {
        LOG(FATAL) << "Fail to trans case_set from json to protobuf message: " << err;
    }
}

void HandleEchoResponse(brpc::Controller* cntl, test::NotifyResponse* response) {
    // std::unique_ptr makes sure cntl/response will be deleted before returning.
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    std::unique_ptr<test::NotifyResponse> response_guard(response);
    if (cntl->Failed() && cntl->ErrorCode() == brpc::ERPCTIMEDOUT) {
        g_timeout.fetch_add(1, butil::memory_order_relaxed);
        LOG_EVERY_N(INFO, 1000) << cntl->ErrorText();
    } else if (cntl->Failed()) {
        g_error.fetch_add(1, butil::memory_order_relaxed);
        LOG_EVERY_N(INFO, 1000) << cntl->ErrorText();
    } else {
        g_succ.fetch_add(1, butil::memory_order_relaxed);
        g_latency_rec << cntl->latency_us();
    }
}

void Expose() {
    g_timeout_bvar.expose_as("cl", "timeout");
    g_error_bvar.expose_as("cl", "failed");
    g_succ_bvar.expose_as("cl", "succ");
    g_latency_rec.expose("cl");
}

struct TestCaseContext {
    TestCaseContext(const test::TestCase& tc)
        : running(true)
        , stage_index(0)
        , test_case(tc)
        , next_stage_sec(test_case.qps_stage_list(0).duration_sec() + butil::gettimeofday_s()) {
        DisplayStage(test_case.qps_stage_list(stage_index));
        Update();
    }

    bool Update() {
        if (butil::gettimeofday_s() >= next_stage_sec) {
            ++stage_index;
            if (stage_index < test_case.qps_stage_list_size()) {
                next_stage_sec += test_case.qps_stage_list(stage_index).duration_sec();
                DisplayStage(test_case.qps_stage_list(stage_index));
            } else {
                return false;
            }
        }
        int qps = 0;
        const test::Stage& qps_stage = test_case.qps_stage_list(stage_index);
        const int lower_bound = qps_stage.lower_bound();
        const int upper_bound = qps_stage.upper_bound();
        if (qps_stage.type() == test::FLUCTUATE) {
            qps = butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound;
        } else if (qps_stage.type() == test::SMOOTH) {
            qps = lower_bound + (upper_bound - lower_bound) /
                double(qps_stage.duration_sec()) *
                (qps_stage.duration_sec() - next_stage_sec + butil::gettimeofday_s());
        }
        interval_us.store(1.0 / qps * 1000000, butil::memory_order_relaxed);
        return true;
    }

    butil::atomic<bool> running;
    butil::atomic<int64_t> interval_us;
    int stage_index;
    const test::TestCase test_case;
    int next_stage_sec;
};

void RunUpdateTask(void* data) {
    TestCaseContext* context = (TestCaseContext*)data;
    bool should_continue = context->Update();
    if (should_continue) {
        bthread::get_global_timer_thread()->schedule(RunUpdateTask, data,
            butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));
    } else {
        context->running.store(false, butil::memory_order_release);
    }
}

void RunCase(test::ControlService_Stub& cntl_stub, const test::TestCase& test_case) {
    LOG(INFO) << "Running case:`" << test_case.case_name() << '\'';
    brpc::Channel channel;
    brpc::ChannelOptions options;
    options.protocol = FLAGS_protocol;
    options.connection_type = FLAGS_connection_type;
    options.timeout_ms = FLAGS_timeout_ms;
    options.max_retry = FLAGS_max_retry;
    if (channel.Init(FLAGS_echo_server.c_str(), &options) != 0) {
        LOG(FATAL) << "Fail to initialize channel";
    }
    test::EchoService_Stub echo_stub(&channel);

    test::NotifyRequest cntl_req;
    test::NotifyResponse cntl_rsp;
    brpc::Controller cntl;
    cntl_req.set_message("StartCase");
    cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);
    CHECK(!cntl.Failed()) << "control failed";

    TestCaseContext context(test_case);
    bthread::get_global_timer_thread()->schedule(RunUpdateTask, &context,
        butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));

    while (context.running.load(butil::memory_order_acquire)) {
        test::NotifyRequest echo_req;
        echo_req.set_message("hello");
        brpc::Controller* echo_cntl = new brpc::Controller;
        test::NotifyResponse* echo_rsp = new test::NotifyResponse;
        google::protobuf::Closure* done = brpc::NewCallback(
            &HandleEchoResponse, echo_cntl, echo_rsp);
        echo_stub.Echo(echo_cntl, &echo_req, echo_rsp, done);
        ::usleep(context.interval_us.load(butil::memory_order_relaxed));
    }

    LOG(INFO) << "Waiting to stop case: `" << test_case.case_name() << '\'';
    ::sleep(FLAGS_case_interval);
    cntl.Reset();
    cntl_req.set_message("StopCase");
    cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);
    CHECK(!cntl.Failed()) << "control failed";
    LOG(INFO) << "Case `" << test_case.case_name() << "' finished:";
}

int main(int argc, char* argv[]) {
    // Parse gflags. We recommend you to use gflags as well.
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
    Expose();
    brpc::Channel channel;
    brpc::ChannelOptions options;
    options.protocol = FLAGS_protocol;
    options.connection_type = FLAGS_connection_type;
    options.timeout_ms = FLAGS_timeout_ms;
    if (channel.Init(FLAGS_cntl_server.c_str(), &options) != 0) {
        LOG(ERROR) << "Fail to initialize channel";
        return -1;
    }
    test::ControlService_Stub cntl_stub(&channel);

    test::TestCaseSet case_set;
    LoadCaseSet(&case_set, FLAGS_case_file);

    brpc::Controller cntl;
    test::NotifyRequest cntl_req;
    test::NotifyResponse cntl_rsp;
    cntl_req.set_message("ResetCaseSet");
    cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);
    CHECK(!cntl.Failed()) << "Cntl Failed";

    for (int i = 0; i < case_set.test_case_size(); ++i) {
        RunCase(cntl_stub, case_set.test_case(i));
    }

    LOG(INFO) << "EchoClient is going to quit";
    return 0;
}
proto
syntax="proto2";
package test;

option cc_generic_services = true;

message NotifyRequest {
    required string message = 1;
};

message NotifyResponse {
    required string message = 1;
};

enum ChangeType {
    FLUCTUATE = 1;  // Fluctuating between upper and lower bound
    SMOOTH = 2;     // Smoothly rising from the lower bound to the upper bound
}

message Stage {
    required int32 lower_bound = 1;
    required int32 upper_bound = 2;
    required int32 duration_sec = 3;
    required ChangeType type = 4;
}

message TestCase {
    required string case_name = 1;
    required string max_concurrency = 2;
    repeated Stage qps_stage_list = 3;
    repeated Stage latency_stage_list = 4;
}

message TestCaseSet {
    repeated TestCase test_case = 1;
}

service ControlService {
    rpc Notify(NotifyRequest) returns (NotifyResponse);
}

service EchoService {
    rpc Echo(NotifyRequest) returns (NotifyResponse);
};