接口介紹
頭文件
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/SyncClient.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
下面從功能介紹幾個類的概念
Value :保存鍵值對的 key 和 value
Event:記錄鍵值對是否改變或者被刪除的狀態
Response :etcd 服務器向客戶端的響應
KeepAlive : 客戶端向 etcd 保活數據
Client? : 客戶端類
Watcher : 檢測 etcd 服務器上鍵值對是否發生改變
下面是接口
namespace etcd {
class Value {bool is_dir();//判斷是否是一個目錄std::string const& key() //鍵值對的 key 值std::string const& as_string()//鍵值對的 val 值int64_t lease() //用于創建租約的響應中,返回租約 ID
}
//etcd 會監控所管理的數據的變化,一旦數據產生變化會通知客戶端
//在通知客戶端的時候,會返回改變前的數據和改變后的數據
class Event {enum class EventType {PUT, //鍵值對新增或數據發生改變DELETE_,//鍵值對被刪除INVALID,};enum EventType event_type() const Value& kv()const Value& prev_kv()
}
class Response {bool is_ok()std::string const& error_message()Value const& value()//當前的數值 或者 一個請求的處理結果Value const& prev_value()//之前的數值Value const& value(int index)//std::vector<Event> const& events();//觸發的事件
}
class KeepAlive {KeepAlive(Client const& client, int ttl, int64_t lease_id =
0);//返回租約 IDint64_t Lease();//停止保活動作void Cancel();
}
class Client {// etcd_url: "http://127.0.0.1:2379"Client(std::string const& etcd_url,std::string const& load_balancer = "round_robin");//Put a new key-value pair 新增一個鍵值對pplx::task<Response> put(std::string const& key, std::string const& value);//新增帶有租約的鍵值對 (一定時間后,如果沒有續租,數據自動刪除)pplx::task<Response> put(std::string const& key, std::string const& value,const int64_t leaseId);//獲取一個指定 key 目錄下的數據列表pplx::task<Response> ls(std::string const& key);
//創建并獲取一個存活 ttl 時間的租約pplx::task<Response> leasegrant(int ttl);//獲取一個租約保活對象,其參數 ttl 表示租約有效時間pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int
ttl);//撤銷一個指定的租約pplx::task<Response> leaserevoke(int64_t lease_id);//數據鎖pplx::task<Response> lock(std::string const& key);
}
class Watcher {Watcher(Client const& client, std::string const& key, //要監控的鍵值對 keystd::function<void(Response)> callback, //發生改變后的回調bool recursive = false); //是否遞歸監控目錄下的所有數據改變Watcher(std::string const& address, std::string const& key,std::function<void(Response)> callback, bool recursive = false);//阻塞等待,直到監控任務被停止bool Wait();bool Cancel();
}
二次封裝
????????設計兩個類?Register??Discover? ,Register? 用來向 etcd 服務器注冊鍵值對,它會向 etcd 服務器申請一個租約ID,負責數據保活。Discover?查看服務器鍵值對,并檢測鍵值對是否被改變或刪除。
? ? ? ??Register? 類只需封裝?Client?KeepAlive 類即可,通過KeepAlive 得到租約ID 并且向服務器不斷進行保活,Client?::put? 向 etcd 服務器添加或更改鍵值對。代碼如下
class Register
{public://構造,傳入 etcd 服務器地址Register(const std::string& url = "http://127.0.0.1:2379"):_client_ptr(std::make_shared<etcd::Client>(url)),_keep_alive_ptr(std::make_shared<etcd::KeepAlive>(*(_client_ptr.get()), 3)),_lease_id(_keep_alive_ptr->Lease()){}//析構,撤銷指定租約~Register(){_client_ptr->leaserevoke(_lease_id);}//向 etcd 添加服務void put(const std::string& key, const std::string& value){if(false == _client_ptr->put(key, value, _lease_id).get().is_ok()){LOG_ERROR("etcd 添加服務失敗 {} : {}", key, value);}else{LOG_INFO("etcd 添加服務成功 {} : {}", key, value);}}private:std::shared_ptr<etcd::Client> _client_ptr; //客戶端實體 std::shared_ptr<etcd::KeepAlive> _keep_alive_ptr; //保活int64_t _lease_id; //租約ID
};
????????Discover?封裝?Client?和?Watcher ,檢測 etcd 服務器鍵值對的變化。同時設置兩個回調函數當鍵值對改變時,調用回調函數。
class Discover
{using notify_callback = std::function<void(std::string, std::string)>;public://四個參數為: etcd 服務器地址, 根目錄, 服務添加回調,服務刪除回調Discover(std::string url, std::string base_dir, notify_callback put_cb, notify_callback del_cd):_client_ptr(std::make_shared<etcd::Client>(url)),_watcher_ptr(std::make_shared<etcd::Watcher>(*(_client_ptr.get()), base_dir, std::bind(&Discover::callback, this, std::placeholders::_1), true)),_put_cb(put_cb),_del_cb(del_cd){//ls查看根目錄下所有服務etcd::Response response = _client_ptr->ls(base_dir).get();if(false == response.is_ok()){LOG_ERROR("獲取服務失敗 {}", base_dir);}else{for(int i = 0; i < response.keys().size(); i++){if(_put_cb) _put_cb(response.value(i).key(), response.value(i).as_string());LOG_INFO("上線了 {} : {} 服務", response.value(i).key(), response.value(i).as_string());}}//監控根目錄下服務_watcher_ptr->Wait();}~Discover(){_watcher_ptr->Cancel();}private:void callback(const etcd::Response& resp){if(false == resp.is_ok()){LOG_ERROR("監控服務失敗" );return;}else{for(const etcd::Event& ev : resp.events()){if(ev.event_type() == etcd::Event::EventType::PUT){LOG_INFO("服務發生改變 {} : {} ---> {} : {}", ev.prev_kv().key(), ev.prev_kv().as_string(), ev.kv().key(), ev.kv().as_string());if(_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string());}else if(ev.event_type() == etcd::Event::EventType::DELETE_){LOG_INFO("下線了服務 {} : {}", ev.prev_kv().key(), ev.prev_kv().as_string());if(_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());}} }}private:std::shared_ptr<etcd::Client> _client_ptr; //客戶端實體 std::shared_ptr<etcd::Watcher> _watcher_ptr; //監控服務notify_callback _put_cb;notify_callback _del_cb;
};