【brpc學習實踐五】brpc自適應限流案例

自適應限流

服務的處理能力是有客觀上限的。當請求速度超過服務的處理速度時,服務就會過載。

如果服務持續過載,會導致越來越多的請求積壓,最終所有的請求都必須等待較長時間才能被處理,從而使整個服務處于癱瘓狀態。

與之相對的,如果直接拒絕掉一部分請求,反而能夠讓服務能夠"及時"處理更多的請求。對應的方法就是設置最大并發。

自適應限流能動態調整服務的最大并發,在保證服務不過載的前提下,讓服務盡可能多的處理請求。

使用場景

通常情況下要讓服務不過載,只需在上線前進行壓力測試,并通過little’s law計算出best_max_concurrency就可以了(并發度 = 時延 * QPS)。但在服務數量多,拓撲復雜,且處理能力會逐漸變化的局面下,使用固定的最大并發會帶來巨大的測試工作量,很不方便。自適應限流就是為了解決這個問題。

使用自適應限流前建議做到:

客戶端開啟了重試功能。服務端有多個節點。

這樣當一個節點返回過載時,客戶端可以向其他的節點發起重試,從而盡量不丟失流量。

brpc開啟自適應限流方法

目前只有method級別(即具體的rpc服務方法)支持自適應限流。如果要為某個method開啟自適應限流,只需要將它的最大并發設置為"auto"即可。

// Set auto concurrency limiter for all methods
brpc::ServerOptions options;
options.method_max_concurrency = "auto";// Set auto concurrency limiter for specific method
server.MaxConcurrencyOf("example.EchoService.Echo") = "auto";

原理

名詞及解釋

  • concurrency(并發度): 同時處理的請求數,又被稱為“并發度”。

  • max_concurrency:
    設置的最大并發度。超過并發的請求會被拒絕(返回ELIMIT錯誤),在集群層面,client應重試到另一臺server上去

  • best_max_concurrency:
    并發的物理含義是任務處理槽位,天然存在上限,這個上限就是best_max_concurrency,也就是最佳的最大并發度,一般推薦設置最大并發為該值,若max_concurrency設置的過大,則concurrency可能大于best_max_concurrency,任務將無法被及時處理而暫存在各種隊列中排隊,系統也會進入擁塞狀態。若max_concurrency設置的過小,則concurrency總是會小于best_max_concurrency,限制系統達到本可以達到的更高吞吐。

  • noload_latency:
    單純處理任務的延時,不包括排隊時間。另一種解釋是低負載的延時。由于正確處理任務得經歷必要的環節,其中會耗費cpu或等待下游返回,noload_latency是一個服務固有的屬性,但可能隨時間逐漸改變(由于內存碎片,壓力變化,業務數據變化等因素)。

  • min_latency:
    實際測定的latency中的較小值的ema,當concurrency不大于best_max_concurrency時,min_latency和noload_latency接近(可能輕微上升)。

  • peak_qps: 服務處理qps的上限。注意是處理或回復的qps而不是接收的qps。值取決于best_max_concurrency /
    noload_latency,這兩個量都是服務的固有屬性,故peak_qps也是服務的固有屬性,和擁塞狀況無關,但可能隨時間逐漸改變。

  • max_qps: 實際測定的qps中的較大值。由于qps具有上限,max_qps總是會小于peak_qps,不論擁塞與否。
    - Little’s Law

    在服務處于穩定狀態時: concurrency = latency * qps。 這是自適應限流的理論基礎。

當服務沒有超載時,隨著流量的上升,latency基本穩定(接近noload_latency),qps和concurrency呈線性關系一起上升。

當流量超過服務的peak_qps時,則concurrency和latency會一起上升,而qps會穩定在peak_qps。

假如一個服務的peak_qps和noload_latency都比較穩定,那么它的best_max_concurrency = noload_latency * peak_qps。

自適應限流就是要找到服務的noload_latency和peak_qps, 并將最大并發設置為靠近兩者乘積的一個值。

自適應限流計算公式

自適應限流會不斷的對請求進行采樣,當采樣窗口的樣本數量足夠時,會根據樣本的平均延遲和服務當前的qps計算出下一個采樣窗口的max_concurrency:

max_concurrency = max_qps * ((2+alpha) * min_latency - latency)
  • alpha為可接受的延時上升幅度,默認0.3。
  • latency是當前采樣窗口內所有請求的平均latency。
  • max_qps是最近一段時間測量到的qps的極大值。
  • min_latency是最近一段時間測量到的latency較小值的ema,是noload_latency的估算值。

注意:當計算出來的 max_concurrency 和當前的 max_concurrency 的值不同時,每次對 max_concurrency 的調整的比例有一個上限,讓 max_concurrency 的變化更為平滑。

當服務處于低負載時,min_latency約等于noload_latency,此時計算出來的max_concurrency會高于concurrency,但低于best_max_concurrency,給流量上漲留探索空間。而當服務過載時,服務的qps約等于max_qps,同時latency開始明顯超過min_latency,此時max_concurrency則會接近concurrency,并通過定期衰減避免遠離best_max_concurrency,保證服務不會過載。
估算noload_latency

服務的noload_latency并非是一成不變的,自適應限流必須能夠正確的探測noload_latency的變化。當noload_latency下降時,是很容感知到的,因為這個時候latency也會下降。難點在于當latency上漲時,需要能夠正確的辨別到底是服務過載了,還是noload_latency上漲了。

可能的方案有:

取最近一段時間的最小latency來近似noload_latency
取最近一段時間的latency的各種平均值來預測noload_latency
收集請求的平均排隊等待時間,使用latency - queue_time作為noload_latency
每隔一段時間縮小max_concurrency,過一小段時間后以此時的latency作為noload_latency

方案1和方案2的問題在于:假如服務持續處于高負載,那么最近的所有latency都會高出noload_latency,從而使得算法估計的noload_latency不斷升高。

方案3的問題在于,假如服務的性能瓶頸在下游服務,那么請求在服務本身的排隊等待時間無法反應整體的負載情況。

方案4是最通用的,也經過了大量實驗的考驗。縮小max_concurrency和公式中的alpha存在關聯。讓我們做個假想實驗,若latency極為穩定并都等于min_latency,那么公式簡化為max_concurrency = max_qps * latency * (1 + alpha)。根據little’s law,qps最多為max_qps * (1 + alpha). alpha是qps的"探索空間",若alpha為0,則qps被鎖定為max_qps,算法可能無法探索到peak_qps。但在qps已經達到peak_qps時,alpha會使延時上升(已擁塞),此時測定的min_latency會大于noload_latency,一輪輪下去最終會導致min_latency不收斂。定期降低max_concurrency就是阻止這個過程,并給min_latency下降提供"探索空間"。
減少重測時的流量損失

每隔一段時間,自適應限流算法都會縮小max_concurrency,并持續一段時間,然后將此時的latency作為服務的noload_latency,以處理noload_latency上漲了的情況。測量noload_latency時,必須讓先服務處于低負載的狀態,因此對max_concurrency的縮小是難以避免的。

由于max_concurrency < concurrency時,服務會拒絕掉所有的請求,限流算法將"排空所有的經歷過排隊等待的請求的時間" 設置為 latency * 2 ,以確保用于計算min_latency的樣本絕大部分都是沒有經過排隊等待的。

由于服務的latency通常都不會太長,這種做法所帶來的流量損失也很小。
應對抖動

即使服務自身沒有過載,latency也會發生波動,根據Little’s Law,latency的波動會導致server的concurrency發生波動。

我們在設計自適應限流的計算公式時,考慮到了latency發生抖動的情況: 當latency與min_latency很接近時,根據計算公式會得到一個較高max_concurrency來適應concurrency的波動,從而盡可能的減少“誤殺”。同時,隨著latency的升高,max_concurrency會逐漸降低,以保護服務不會過載。

從另一個角度來說,當latency也開始升高時,通常意味著某處(不一定是服務本身,也有可能是下游服務)消耗了大量CPU資源,這個時候縮小max_concurrency也是合理的。
平滑處理

為了減少個別窗口的抖動對限流算法的影響,同時盡量降低計算開銷,計算min_latency時會通過使用EMA來進行平滑處理:

if latency < min_latency:
min_latency = latency * ema_alpha + (1 - ema_alpha) * min_latency
else:
do_nothing

估算peak_qps
提高qps增長的速度

當服務啟動時,由于服務本身需要進行一系列的初始化,tcp本身也有慢啟動等一系列原因。服務在剛啟動時的qps一定會很低。這就導致了服務啟動時的max_concurrency也很低。而按照上面的計算公式,當max_concurrency很低的時候,預留給qps增長的冗余concurrency也很低(即:alpha * max_qps * min_latency)。從而會影響當流量增加時,服務max_concurrency的增加速度。

假如從啟動到打滿qps的時間過長,這期間會損失大量流量。在這里我們采取的措施有兩個,

采樣方面,一旦采到的請求數量足夠多,直接提交當前采樣窗口,而不是等待采樣窗口的到時間了才提交
計算公式方面,當current_qps > 保存的max_qps時,直接進行更新,不進行平滑處理。

在進行了這兩個處理之后,絕大部分情況下都能夠在2秒左右將qps打滿。
平滑處理

為了減少個別窗口的抖動對限流算法的影響,同時盡量降低計算開銷,在計算max_qps時,會通過使用EMA來進行平滑處理:

if current_qps > max_qps:
max_qps = current_qps
else:
max_qps = current_qps * ema_alpha / 10 + (1 - ema_alpha / 10) * max_qps

將max_qps的ema參數置為min_latency的ema參數的十分之一的原因是: max_qps 下降了通常并不意味著極限qps也下降了。而min_latency下降了,通常意味著noload_latency確實下降了。
與netflix gradient算法的對比

netflix中的gradient算法公式為:max_concurrency = min_latency / latency * max_concurrency + queue_size。

其中latency是采樣窗口的最小latency,min_latency是最近多個采樣窗口的最小latency。min_latency / latency就是算法中的"梯度",當latency大于min_latency時,max_concurrency會逐漸減少;反之,max_concurrency會逐漸上升,從而讓max_concurrency圍繞在best_max_concurrency附近。

這個公式可以和本文的算法進行類比:
gradient算法中的latency和本算法的不同,前者的latency是最小值,后者是平均值。netflix的原意是最小值能更好地代表noload_latency,但實際上只要不對max_concurrency做定期衰減,不管最小值還是平均值都有可能不斷上升使算法不收斂。最小值并不能帶來額外的好處,反而會使算法更不穩定。
gradient算法中的max_concurrency / latency從概念上和qps有關聯(根據little’s law),但可能嚴重脫節。比如在重測 min_latency前,若所有latency都小于min_latency,那么max_concurrency會不斷下降甚至到0;但按照本算法,max_qps和min_latency仍然是穩定的,它們計算出的max_concurrency也不會劇烈變動。究其本質,gradient算法在迭代max_concurrency時,latency并不能代表實際并發為max_concurrency時的延時,兩者是脫節的,所以max_concurrency / latency的實際物理含義不明,與qps可能差異甚大,最后導致了很大的偏差。
gradient算法的queue_size推薦為sqrt(max_concurrency),這是不合理的。netflix對queue_size的理解大概是代表各種不可控環節的緩存,比如socket里的,和max_concurrency存在一定的正向關系情有可原。但在我們的理解中,這部分queue_size作用微乎其微,沒有或用常量即可。我們關注的queue_size是給concurrency上升留出的探索空間: max_concurrency的更新是有延遲的,在并發從低到高的增長過程中,queue_size的作用就是在max_concurrency更新前不限制qps上升。而當concurrency高時,服務可能已經過載了,queue_size就應該小一點,防止進一步惡化延時。這里的queue_size和并發是反向關系。

服務端代碼實現

#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>
#include <butil/atomicops.h>
#include <butil/time.h>
#include <butil/logging.h>
#include <json2pb/json_to_pb.h>
#include <bthread/timer_thread.h>
#include <bthread/bthread.h>#include <cstdlib>
#include <fstream>
#include "cl_test.pb.h"DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state ""(waiting for client to close connection before server stops)");
DEFINE_int32(server_bthread_concurrency, 4, "Configuring the value of bthread_concurrency, For compute max qps, ");
DEFINE_int32(server_sync_sleep_us, 2500, "Usleep time, each request will be executed once, For compute max qps");
// max qps = 1000 / 2.5 * 4 DEFINE_int32(control_server_port, 9000, "");
DEFINE_int32(echo_port, 9001, "TCP Port of echo server");
DEFINE_int32(cntl_port, 9000, "TCP Port of controller server");
DEFINE_string(case_file, "", "File path for test_cases");
DEFINE_int32(latency_change_interval_us, 50000, "Intervalt for server side changes the latency");
DEFINE_int32(server_max_concurrency, 0, "Echo Server's max_concurrency");
DEFINE_bool(use_usleep, false, "EchoServer uses ::usleep or bthread_usleep to simulate latency ""when processing requests");bthread::TimerThread g_timer_thread;int cast_func(void* arg) {return *(int*)arg;
}void DisplayStage(const test::Stage& stage) {std::string type;switch(stage.type()) {case test::FLUCTUATE: type = "Fluctuate";break;case test::SMOOTH:type = "Smooth";break;default:type = "Unknown";}std::stringstream ss;ss << "Stage:[" << stage.lower_bound() << ':' << stage.upper_bound() <<  "]"<< " , Type:" << type;LOG(INFO) << ss.str();
}butil::atomic<int> cnt(0);
butil::atomic<int> atomic_sleep_time(0);
bvar::PassiveStatus<int> atomic_sleep_time_bvar(cast_func, &atomic_sleep_time);namespace bthread {
DECLARE_int32(bthread_concurrency);
}void TimerTask(void* data);class EchoServiceImpl : public test::EchoService {
public:EchoServiceImpl() : _stage_index(0), _running_case(false) {};virtual ~EchoServiceImpl() {};void SetTestCase(const test::TestCase& test_case) {_test_case = test_case;_next_stage_start = _test_case.latency_stage_list(0).duration_sec() + butil::gettimeofday_s();_stage_index = 0;_running_case = false;DisplayStage(_test_case.latency_stage_list(_stage_index));}void StartTestCase() {CHECK(!_running_case);_running_case = true;UpdateLatency();}void StopTestCase() {_running_case = false;}void UpdateLatency() {if (!_running_case) {return;}ComputeLatency();g_timer_thread.schedule(TimerTask, (void*)this, butil::microseconds_from_now(FLAGS_latency_change_interval_us));}virtual void Echo(google::protobuf::RpcController* cntl_base,const test::NotifyRequest* request,test::NotifyResponse* response,google::protobuf::Closure* done) {brpc::ClosureGuard done_guard(done); response->set_message("hello");::usleep(FLAGS_server_sync_sleep_us);if (FLAGS_use_usleep) {::usleep(_latency.load(butil::memory_order_relaxed));} else {bthread_usleep(_latency.load(butil::memory_order_relaxed));}}void ComputeLatency() {if (_stage_index < _test_case.latency_stage_list_size() &&butil::gettimeofday_s() > _next_stage_start) {++_stage_index;if (_stage_index < _test_case.latency_stage_list_size()) {_next_stage_start += _test_case.latency_stage_list(_stage_index).duration_sec();DisplayStage(_test_case.latency_stage_list(_stage_index));}}if (_stage_index == _test_case.latency_stage_list_size()) {const test::Stage& latency_stage = _test_case.latency_stage_list(_stage_index - 1);if (latency_stage.type() == test::ChangeType::FLUCTUATE) {_latency.store((latency_stage.lower_bound() + latency_stage.upper_bound()) / 2,butil::memory_order_relaxed);} else if (latency_stage.type() == test::ChangeType::SMOOTH) {_latency.store(latency_stage.upper_bound(), butil::memory_order_relaxed);}return;}const test::Stage& latency_stage = _test_case.latency_stage_list(_stage_index);const int lower_bound = latency_stage.lower_bound();const int upper_bound = latency_stage.upper_bound();if (latency_stage.type() == test::FLUCTUATE) {_latency.store(butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound,butil::memory_order_relaxed); } else if (latency_stage.type() == test::SMOOTH) {int latency = lower_bound + (upper_bound - lower_bound) / double(latency_stage.duration_sec()) * (latency_stage.duration_sec() - _next_stage_start + butil::gettimeofday_s());_latency.store(latency, butil::memory_order_relaxed);} else {LOG(FATAL) << "Wrong Type:" << latency_stage.type();}}private:int _stage_index;int _next_stage_start;butil::atomic<int> _latency;test::TestCase _test_case;bool _running_case;
};void TimerTask(void* data) {EchoServiceImpl* echo_service = (EchoServiceImpl*)data;echo_service->UpdateLatency();
}class ControlServiceImpl : public test::ControlService {
public:ControlServiceImpl() : _case_index(0) {LoadCaseSet(FLAGS_case_file);_echo_service = new EchoServiceImpl;if (_server.AddService(_echo_service,brpc::SERVER_OWNS_SERVICE) != 0) {LOG(FATAL) << "Fail to add service";}g_timer_thread.start(NULL);}virtual ~ControlServiceImpl() { _echo_service->StopTestCase();g_timer_thread.stop_and_join(); };virtual void Notify(google::protobuf::RpcController* cntl_base,const test::NotifyRequest* request,test::NotifyResponse* response,google::protobuf::Closure* done) {brpc::ClosureGuard done_guard(done);const std::string& message = request->message();LOG(INFO) << message;if (message == "ResetCaseSet") {_server.Stop(0);_server.Join();_echo_service->StopTestCase();LoadCaseSet(FLAGS_case_file);_case_index = 0;response->set_message("CaseSetReset");} else if (message == "StartCase") {CHECK(!_server.IsRunning()) << "Continuous StartCase";const test::TestCase& test_case = _case_set.test_case(_case_index++);_echo_service->SetTestCase(test_case);brpc::ServerOptions options;options.max_concurrency = FLAGS_server_max_concurrency;_server.MaxConcurrencyOf("test.EchoService.Echo") = test_case.max_concurrency();_server.Start(FLAGS_echo_port, &options);            _echo_service->StartTestCase();response->set_message("CaseStarted");} else if (message == "StopCase") {CHECK(_server.IsRunning()) << "Continuous StopCase";_server.Stop(0);_server.Join();_echo_service->StopTestCase();response->set_message("CaseStopped");} else {LOG(FATAL) << "Invalid message:" << message;response->set_message("Invalid Cntl Message");}}private:void LoadCaseSet(const std::string& file_path) {std::ifstream ifs(file_path.c_str(), std::ios::in);  if (!ifs) {LOG(FATAL) << "Fail to open case set file: " << file_path;}std::string case_set_json((std::istreambuf_iterator<char>(ifs)),  std::istreambuf_iterator<char>()); test::TestCaseSet case_set;std::string err;if (!json2pb::JsonToProtoMessage(case_set_json, &case_set, &err)) {LOG(FATAL) << "Fail to trans case_set from json to protobuf message: "<< err;}_case_set = case_set;ifs.close();}brpc::Server _server;EchoServiceImpl* _echo_service;test::TestCaseSet _case_set;int _case_index;
};int main(int argc, char* argv[]) {// Parse gflags. We recommend you to use gflags as well.GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);bthread::FLAGS_bthread_concurrency= FLAGS_server_bthread_concurrency;brpc::Server server;ControlServiceImpl control_service_impl;if (server.AddService(&control_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {LOG(ERROR) << "Fail to add service";return -1;}if (server.Start(FLAGS_cntl_port, NULL) != 0) {LOG(ERROR) << "Fail to start EchoServer";return -1;}server.RunUntilAskedToQuit();return 0;
}

客戶端代碼實現

#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/time.h>
#include <brpc/channel.h>
#include <bvar/bvar.h>
#include <bthread/timer_thread.h>
#include <json2pb/json_to_pb.h>#include <fstream>
#include "cl_test.pb.h"DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(cntl_server, "0.0.0.0:9000", "IP Address of server");
DEFINE_string(echo_server, "0.0.0.0:9001", "IP Address of server");
DEFINE_int32(timeout_ms, 3000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 0, "Max retries(not including the first RPC)"); 
DEFINE_int32(case_interval, 20, "Intervals for different test cases");
DEFINE_int32(client_qps_change_interval_us, 50000, "The interval for client changes the sending speed");
DEFINE_string(case_file, "", "File path for test_cases");void DisplayStage(const test::Stage& stage) {std::string type;switch(stage.type()) {case test::FLUCTUATE: type = "Fluctuate";break;case test::SMOOTH:type = "Smooth";break;default:type = "Unknown";}std::stringstream ss;ss << "Stage:[" << stage.lower_bound() << ':' << stage.upper_bound() <<  "]"<< " , Type:" << type;LOG(INFO) << ss.str();
}uint32_t cast_func(void* arg) {return *(uint32_t*)arg;
}butil::atomic<uint32_t> g_timeout(0);
butil::atomic<uint32_t> g_error(0);
butil::atomic<uint32_t> g_succ(0);
bvar::PassiveStatus<uint32_t> g_timeout_bvar(cast_func, &g_timeout);
bvar::PassiveStatus<uint32_t> g_error_bvar(cast_func, &g_error);
bvar::PassiveStatus<uint32_t> g_succ_bvar(cast_func, &g_succ);
bvar::LatencyRecorder g_latency_rec;void LoadCaseSet(test::TestCaseSet* case_set, const std::string& file_path) {std::ifstream ifs(file_path.c_str(), std::ios::in);  if (!ifs) {LOG(FATAL) << "Fail to open case set file: " << file_path;}std::string case_set_json((std::istreambuf_iterator<char>(ifs)),  std::istreambuf_iterator<char>()); std::string err;if (!json2pb::JsonToProtoMessage(case_set_json, case_set, &err)) {LOG(FATAL) << "Fail to trans case_set from json to protobuf message: "<< err;}
}void HandleEchoResponse(brpc::Controller* cntl,test::NotifyResponse* response) {// std::unique_ptr makes sure cntl/response will be deleted before returning.std::unique_ptr<brpc::Controller> cntl_guard(cntl);std::unique_ptr<test::NotifyResponse> response_guard(response);if (cntl->Failed() && cntl->ErrorCode() == brpc::ERPCTIMEDOUT) {g_timeout.fetch_add(1, butil::memory_order_relaxed);LOG_EVERY_N(INFO, 1000) << cntl->ErrorText();} else if (cntl->Failed()) {g_error.fetch_add(1, butil::memory_order_relaxed);LOG_EVERY_N(INFO, 1000) << cntl->ErrorText();} else {g_succ.fetch_add(1, butil::memory_order_relaxed);g_latency_rec << cntl->latency_us();}}void Expose() {g_timeout_bvar.expose_as("cl", "timeout");g_error_bvar.expose_as("cl", "failed");g_succ_bvar.expose_as("cl", "succ");g_latency_rec.expose("cl");
}struct TestCaseContext {TestCaseContext(const test::TestCase& tc) : running(true), stage_index(0), test_case(tc), next_stage_sec(test_case.qps_stage_list(0).duration_sec() + butil::gettimeofday_s()) {DisplayStage(test_case.qps_stage_list(stage_index));Update();}bool Update() {if (butil::gettimeofday_s() >= next_stage_sec) {++stage_index;if (stage_index < test_case.qps_stage_list_size()) {next_stage_sec += test_case.qps_stage_list(stage_index).duration_sec(); DisplayStage(test_case.qps_stage_list(stage_index));} else {return false;}}int qps = 0;const test::Stage& qps_stage = test_case.qps_stage_list(stage_index);const int lower_bound = qps_stage.lower_bound();const int upper_bound = qps_stage.upper_bound();if (qps_stage.type() == test::FLUCTUATE) {qps = butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound;} else if (qps_stage.type() == test::SMOOTH) {qps = lower_bound + (upper_bound - lower_bound) / double(qps_stage.duration_sec()) * (qps_stage.duration_sec() - next_stage_sec+ butil::gettimeofday_s());}interval_us.store(1.0 / qps * 1000000, butil::memory_order_relaxed);return true;}butil::atomic<bool> running;butil::atomic<int64_t> interval_us;int stage_index;const test::TestCase test_case;int next_stage_sec;
};void RunUpdateTask(void* data) {TestCaseContext* context = (TestCaseContext*)data;bool should_continue = context->Update();if (should_continue) {bthread::get_global_timer_thread()->schedule(RunUpdateTask, data, butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));} else {context->running.store(false, butil::memory_order_release);}
}void RunCase(test::ControlService_Stub &cntl_stub, const test::TestCase& test_case) {LOG(INFO) << "Running case:`" << test_case.case_name() << '\'';brpc::Channel channel;brpc::ChannelOptions options;options.protocol = FLAGS_protocol;options.connection_type = FLAGS_connection_type;options.timeout_ms = FLAGS_timeout_ms;options.max_retry = FLAGS_max_retry;if (channel.Init(FLAGS_echo_server.c_str(), &options) != 0) {LOG(FATAL) << "Fail to initialize channel";}test::EchoService_Stub echo_stub(&channel);test::NotifyRequest cntl_req;test::NotifyResponse cntl_rsp;brpc::Controller cntl;cntl_req.set_message("StartCase");cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);CHECK(!cntl.Failed()) << "control failed";TestCaseContext context(test_case);bthread::get_global_timer_thread()->schedule(RunUpdateTask, &context, butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));while (context.running.load(butil::memory_order_acquire)) {test::NotifyRequest echo_req;echo_req.set_message("hello");brpc::Controller* echo_cntl = new brpc::Controller;test::NotifyResponse* echo_rsp = new test::NotifyResponse;google::protobuf::Closure* done = brpc::NewCallback(&HandleEchoResponse, echo_cntl, echo_rsp);echo_stub.Echo(echo_cntl, &echo_req, echo_rsp, done);::usleep(context.interval_us.load(butil::memory_order_relaxed));}LOG(INFO) << "Waiting to stop case: `" << test_case.case_name() << '\'';::sleep(FLAGS_case_interval);cntl.Reset();cntl_req.set_message("StopCase");cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);CHECK(!cntl.Failed()) << "control failed";LOG(INFO) << "Case `" << test_case.case_name() << "' finshed:";
}int main(int argc, char* argv[]) {// Parse gflags. We recommend you to use gflags as well.GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);Expose();brpc::Channel channel;brpc::ChannelOptions options;options.protocol = FLAGS_protocol;options.connection_type = FLAGS_connection_type;options.timeout_ms = FLAGS_timeout_ms;if (channel.Init(FLAGS_cntl_server.c_str(), &options) != 0) {LOG(ERROR) << "Fail to initialize channel";return -1;}test::ControlService_Stub cntl_stub(&channel);test::TestCaseSet case_set;LoadCaseSet(&case_set, FLAGS_case_file);brpc::Controller cntl;test::NotifyRequest cntl_req;test::NotifyResponse cntl_rsp;cntl_req.set_message("ResetCaseSet");cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);CHECK(!cntl.Failed()) << "Cntl Failed";for (int i = 0; i < case_set.test_case_size(); ++i) {RunCase(cntl_stub, case_set.test_case(i));}LOG(INFO) << "EchoClient is going to quit";return 0;
}

proto

syntax="proto2";
package test;option cc_generic_services = true;message NotifyRequest {required string message = 1;
};message NotifyResponse {required string message = 1;
};enum ChangeType {FLUCTUATE = 1;  // Fluctuating between upper and lower bound SMOOTH = 2;     // Smoothly rising from the lower bound to the upper bound 
}message Stage {required int32 lower_bound = 1;required int32 upper_bound = 2;required int32 duration_sec = 3;required ChangeType type = 4; 
}message TestCase {required string case_name = 1;required string max_concurrency = 2;repeated Stage qps_stage_list = 3;repeated Stage latency_stage_list = 4;
}message TestCaseSet {repeated TestCase test_case = 1;
}service ControlService {rpc Notify(NotifyRequest) returns (NotifyResponse);
}service EchoService {rpc Echo(NotifyRequest) returns (NotifyResponse);
};

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/162045.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/162045.shtml
英文地址,請注明出處:http://en.pswp.cn/news/162045.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

PHP中間件實現

目錄 1、簡單中間實現 2、使用閉包函數實現中間件 在PHP中&#xff0c;中間件是一種常用的設計模式&#xff0c;用于處理請求和響應&#xff0c;它可以在請求到達目標處理程序之前或響應發送給客戶端之前執行一些特定的邏輯。中間件提供了一種靈活的方式來修改或擴展應用程序的…

查看當前laravel版本三種方法(筆記二)

1、在終端中使用 Artisan 命令&#xff1a;在 Laravel 項目的根目錄下&#xff0c;打開終端&#xff08;命令行界面&#xff09;&#xff0c;然后運行以下命令&#xff1a; php artisan --version 2、控制器中打印版本 var_dump(app()->version()); 3、在 Laravel 項目的根目…

【kubernetes】k8s架構之節點

文章目錄 1、集群架構示意圖2、概述3、管理3.1 節點名稱唯一性3.2 節點自注冊3.3 手動節點管理 4、節點狀態4.1 地址&#xff08;Addresses&#xff09;4.2 狀況&#xff08;Condition&#xff09;4.3 容量&#xff08;Capacity&#xff09;與可分配&#xff08;Allocatable&am…

PTA-輸出三角形面積和周長

本題要求編寫程序&#xff0c;根據輸入的三角形的三條邊a、b、c&#xff0c;計算并輸出面積和周長。注意&#xff1a;在一個三角形中&#xff0c; 任意兩邊之和大于第三邊。三角形面積計算公式&#xff1a;areas(s?a)(s?b)(s?c)?&#xff0c;其中s(abc)/2。 輸入格式&…

某60區塊鏈安全之Call函數簇濫用實戰二學習記錄

區塊鏈安全 文章目錄 區塊鏈安全Call函數簇濫用實戰二實驗目的實驗環境實驗原理實驗內容實驗步驟EXP利用 Call函數簇濫用實戰二 實驗目的 學會使用python3的web3模塊 學會并區分以太坊call、staticcall、delegatecall三種函數調用的特點 找到合約漏洞進行分析并形成利用 實驗…

關于git hooks

Git hooks 是一種在 Git 倉庫中觸發自定義腳本的機制。這些腳本可以在特定的 Git 操作&#xff08;如提交、推送、合并等&#xff09;發生時執行。通過使用 Git hooks&#xff0c;你可以在版本控制的不同階段自動運行腳本&#xff0c;以執行一些定制化的操作。 在 Git 中&…

03梯度下降

目錄 lambda基礎知識 代碼 核心算法&#xff1a; lambda基礎知識 lambda 是 Python 中的一個關鍵字&#xff0c;用于創建匿名函數。匿名函數是一種沒有具體名稱的小型、臨時的函數&#xff0c;通常用于一次性的、簡單的操作。lambda 函數的語法如下&#xff1a;python Copy c…

高效運維工具,助力運維服務商為企業用戶提供IT遠程維保服務

一、背景介紹 隨著科技的迅速發展和信息化建設的不斷推進&#xff0c;IT運維在中小企業中的地位逐漸提升。IT運維是指通過技術手段和工具&#xff0c;對企業的IT基礎設施進行監控、管理和維護&#xff0c;以確保企業信息系統的穩定運行和業務的持續發展。 然而&#xff0c;對于…

計算3個點的6種分布在平面上的占比

假設平面的尺寸是6*6&#xff0c;用11的方式構造2&#xff0c;在用21的方式構造3 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 3 3 3 x 3 3 2 2 2 1 2 2 2 2 2 1 2 2 在平面上有一個點x&#xff0c;11的操作吧平面分成了3部分2a1&#xff0c;2a…

海康Visionmaster-模塊索引:MFC 模塊索引異常解決 辦法

現象&#xff1a;文件編碼格式為 UTF-8 不帶簽名編碼格式&#xff0c;模塊索引會出現 模塊無法找到異常 更改文件類型為 UTF-8 帶簽名格式或 vs 默認 GBK2312 編碼格式

JMeter處理接口簽名sign

寫接口腳本的時候&#xff0c;很多接口涉及到簽名&#xff0c;今天介紹下用JMeter編寫簽名腳本的方法。 舉個例子&#xff0c;開啟紅包接口&#xff0c;請求方式為post POST /v1/api/red/open json請求參數 { "red_id":1, "timestamp":"1667033841…

2023年中國邊緣計算網關現狀及發展趨勢分析[圖]

邊緣計算網關是一種可以在設備上運行本地計算、消息通信、數據緩存等功能的工業智能網關&#xff0c;可以在無需聯網的情況下實現設備的本地聯動以及數據處理分析。邊緣計算網關是一種連接物聯網設備和云端服務的關鍵技術&#xff0c;它可以在設備和云端之間建立一個安全、高效…

實例講解Simulink的MATLAB Function模塊

內容 MATLAB Function是一個支持使用M語言編寫模塊功能,并能夠將所編寫的M語言生成C代碼&#xff0c;用于開發桌面應用和嵌入式應用的模塊。它支持的 MATLAB內建函數比 Fcn模塊要廣泛&#xff0c;除去基本的四則運算、邏輯操作符和關系操作符&#xff0c;還可以調用MATLAB各種…

代碼隨想錄算法訓練營第四十三天【動態規劃part05】 | 1049. 最后一塊石頭的重量 II、494. 目標和、474.一和零

1049. 最后一塊石頭的重量 II 題目鏈接&#xff1a; 力扣&#xff08;LeetCode&#xff09;官網 - 全球極客摯愛的技術成長平臺 求解思路&#xff1a; 等于把石頭盡量分成重量相同的兩堆 動規五部曲 確定dp數組及其下標含義&#xff1a;容量為j的背包&#xff0c;最多能裝…

logstash安裝和使用

官網&#xff1a;https://www.elastic.co/cn/logstash/ 1.上傳Linux安裝包 2.解壓安裝包且重命名 [rootVM-4-10-centos logstash]# tar -zxvf logstash-8.11.1-linux-x86_64.tar.gz -C ../software/[rootVM-4-10-centos logstash]# mv logstash-8.11.1/ logstash3.啟動測試 …

國產遙感影像處理軟件 GSRS,真是很方便

兼容國內外絕大多數衛星遙感影像格式&#xff1b;高效的影像查看&#xff0c;比如漫游、放大、縮小、查看影像像素灰度值、影像地理坐標、影像投影坐標系等等&#xff1b;人機交互影像裁剪&#xff0c;任何繪制裁剪區域&#xff0c;輸出裁剪影像&#xff1b;具備影像基本處理功…

基于Haclon的Blob分析

任務要求&#xff1a; 請用BLOB分析的方法計算圖中所有灰度值在120和255之間的像素構成的8連通區域的面積與中心點坐標。 Blob基礎&#xff1a; 分析過程&#xff1a;首先獲取圖像&#xff0c;然后根據特征對原始圖像進行閾值分割&#xff08;區分背景像素和前景像素&#xf…

洛谷 P4552 [Poetize6] IncDec Sequence

挺好的一道思維題。 分析 因為是對區間修改&#xff0c;多次修改肯定會超時&#xff0c;很容易想到差分。 那么原題的對區間修改就可以轉換為下面三個操作&#xff08;均在差分數組中&#xff09;&#xff1a; 1. 任選一個數1 2. 任選一個數-1 3. 任選兩個數1和-1 進一步考…

貪心算法及相關例題

目錄 什么是貪心算法&#xff1f; leetcode455題.分發餅干 leetcode376題.擺動序列 leetcode55題.跳躍游戲I leetcode45題.跳躍游戲II leetcode621題.任務調度器 leetcode435題.無重疊空間 leetcode135題.分發糖果 什么是貪心算法&#xff1f; 貪心算法更多的是一種思…

《QT從基礎到進階·三十七》QWidget實現左側導航欄效果

NavigationBarPlugin插件類實現了對左側導航欄的管理&#xff0c;我們可以在導航欄插件中添加界面&#xff0c;并用鼠標點擊導航欄能夠切換對應的界面。 源碼在文章末尾 實現效果如下&#xff1a; NavigationBarPlugin實現的接口如下&#xff1a; class NAVIGATIONBAR_EXP…