0.對原教程的一些見解
其回顧完請求流程就是抽象了兩個接口,PeerPicker和PeerGetter。這樣操作,讀者閱讀時可能很難快速明白其含義,不好理解為什么就創建出兩個接口,感覺會比較疑惑。原教程的評論中也有討論這點。
?本教程就先不創建接口,而是使用struct方式,這樣可能好理解點。
1.節點請求處理的流程
先弄清楚我們查詢緩存的邏輯。
單節點:?
客戶發送查詢請求到節點A,該節點有緩存就立即返回,若是沒有就執行用戶設置的回調函數獲取值并添加到緩存中,然后返回。
分布式節點:
客戶端發送查詢請求到某個緩存節點,該節點會判斷該key是否在本地,若是不在本地,使用一致性哈希選擇節點,若不是在遠程節點,則就退回到本地節點處理;若在遠程節點,該節點會發送請求去訪問其他?node
?節點。(不是客戶端再去訪問其他節點)
從這可以看出,一個node要處理兩種請求,一個是來自客戶端的外部請求,一個是來自其他遠端節點的內部請求。
為了清晰,劃分職責,我們可以在一個node中啟動兩種HTTP服務,一個處理客戶端請求(APIServer), 一個處理節點之間的請求(CacheServer)。
2.HTTP客戶端
之前我們為?HTTPPool
?實現了服務端功能,通信不僅需要服務端還需要客戶端,因此,我們接下來先實現客戶端的功能。這個客戶端是節點作為客戶端去訪問其他節點。
- baseURL 表示將要訪問的遠程節點的地址,例如?
http://example.com/geecache/
。
type httpGetter struct {baseURL string
}func (h *httpGetter) Get(group string, key string) ([]byte, error) {//QueryEscape 對字符串進行轉義,以便可以將其安全地放置在 URL 查詢中。u := fmt.Sprintf("%v/%v/%v", h.baseURL,url.QueryEscape(group),url.QueryEscape(key))res, err := http.Get(u)if err != nil {return nil, err}defer res.Body.Close()if res.StatusCode != http.StatusOK {return nil, fmt.Errorf("server returned: %v", res.Status)}bytes, err := io.ReadAll(res.Body)if err != nil {return nil, fmt.Errorf("reading response body: %v", err)}return bytes, nil
}
3.回顧上一章節實現的單節點的訪問流程
func (g *Group) Get(key string) (ByteView, error) {//現在本地查詢if v, ok := g.mainCache.get(key); ok {return v, nil}return g.load(key)
}func (g *Group) load(key string) (ByteView, error) {bytes, err := g.getter.Get(key)if err != nil {return ByteView{}, err}value := ByteView{b: cloneByte(bytes)}g.mainCache.add(key, value)return value, nil
}
那很明顯是需要修改load方法,讓其可以去訪問遠程節點。
在load方法中,偽代碼如下。
func func (g *Group) load(key string) (ByteView, error){if 有遠程節點 {if 找到key所在的遠程節點 {本地作為客戶端去訪問該遠程節點}}沒有遠程節點,只能在本地調用回調函數去源地方獲取
}
要想在Group中訪問節點,那么就要在Group中存儲節點集合。
節點結合結構體Peers
那節點集合是不是又要創建一個結構體?那先試試創建一個結構體Peers。
因為 hash 環的 map 不是線程安全的,所以這里要加鎖。
成員變量?httpGetters
,映射遠程節點與對應的 httpGetter。(httpGetter就是個客戶端,是一個節點作為客戶端),每一個遠程節點對應一個 httpGetter,因為 httpGetter 與遠程節點的地址?baseURL
?有關,map的key是遠程節點的地址,比如"http://localhost:10000"
type Peers struct {addr string //這個是用于進行選擇節點時用來判斷是不是本地節點basePath stringmutex sync.Mutex //guards peersHashRing and httpGetterspeersHashRing *consistenthash.HashRinghttpGetters map[string]*httpGetter
}//這是HTTP服務端章節的HTTPPool,這是很相似的
type HTTPPool struct {addr stringbasePath string
}
那么該結構體Peers就要有添加遠程節點和通過key去獲取遠程節點的方法。
增添遠程節點方法Set
通過該方法可以知道其map的key是遠程節點的地址。
// 使用用例:Set("http://localhost:8001","http://localhost:8002")
func (p *Peers) Set(peers ...string) {p.mutex.Lock()defer p.mutex.Unlock()p.peersHashRing = consistenthash.NewHash(50, nil)p.peersHashRing.Add(peers...) //在 hash 環上添加真實節點和虛擬節點//存儲遠端節點信息p.httpGetters = make(map[string]*httpGetter)for _, peer := range peers {p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}}
}
通過key去獲取遠程節點的方法PickPeer
Peers結構體中的變量addr在這里派上用場了,返回的地址要是等于本身addr,那就返回false,不用自己作為客戶端再去訪問自己。
func (p *Peers) PickPeer(key string) (*httpGetter, bool) {p.mutex.Lock()defer p.mutex.Unlock()//這里返回的peer是個地址,可以查看(Peers).Set函數中的參數if peer := p.peersHashRing.Get(key); peer != "" && peer != p.addr {fmt.Println("pick peer ", peer)return p.httpGetters[peer], true}return &httpGetter{}, false
}
Peers這個結構體就實現了,可以看到其與HTTPPool是很相似的。對比HTTPPool,就是成員變量添加了一些,方法也添加了一些,也沒有改變HTTPPool原有的邏輯,只是擴張了。所以可以把Peers的內容添加到HTTPPool中去,具體的代碼就不在這里顯示了。
type HTTPPool struct {addr stringbasePath string//新添加的,把Peers內容增添到HTTPPool中mutex sync.MutexpeersHashRing *consistenthash.HashRinghttpGetters map[string]*httpGetter
}
4.集成,實現主流程
最后,我們需要將上述新增的功能集成在主流程(geecache.go)中。
在Group結構體中有改變。
新增?RegisterPeers()
?方法,將 peers?注入到 Group 中。
type Group struct {name stringmainCache cachegetter Getterpeers *Peers //添加了節點集合
}// 往分組內注冊節點集合
func (g *Group) RegisterPeers(peers *Peers) {if g.peers != nil {panic("RegisterPeerPicker called more than once")}g.peers = peers
}
最終再回到load函數,這個函數是需要修改的。
func (g *Group) load(key string) (value ByteView, err error) {if g.peers != nil { //有遠程節點的情況if peer, ok := g.peers.PickPeer(key); ok { //通過key找到該遠程節點if value, err = g.getFromPeer(peer, key); err == nil {return value, nil //找到值}log.Println("[GeeCache] Failed to get from peer", err)}}return g.getLocally(key) //回到本地處理
}func (g *Group) getFromPeer(peer *httpGetter, key string) (ByteView, error) {bytes, err := peer.Get(g.name, key)if err != nil {return ByteView{}, err}return ByteView{b: bytes}, nil
}func (g *Group) getLocally(key string) (ByteView, error) {bytes, err := g.getter.Get(key)if err != nil {return ByteView{}, err}value := ByteView{b: cloneByte(bytes)}g.mainCache.add(key, value)return value, nil
}
- 新增?
getFromPeer()
?方法,使用httpGetter 訪問遠程節點,獲取緩存值。 - 修改 load 方法,使用?
PickPeer()
?方法選擇節點,若非本機節點,則調用?getFromPeer()
?從遠程獲取。若是本機節點或失敗,則回退到?getLocally()
。
5. 測試
總結——緩存節點啟動的流程
- 創建 Group 對象.(用于存儲我們的緩存數據)
- 啟動緩存 http 服務.(創建 HTTPPool,添加節點信息,注冊到緩存分組中)
- 啟動 API 服務.(用于與客戶端進行交互)
?測試代碼:
var db = map[string]string{"Tom": "630","Jack": "589","Sam": "567",
}func main() {var port intvar api boolflag.IntVar(&port, "port", 8001, "Geecache server port")flag.BoolVar(&api, "api", false, "Start a api server?")flag.Parse()apiAddr := "http://localhost:9999"addrMap := map[int]string{8001: "http://localhost:8001",8002: "http://localhost:8002",8003: "http://localhost:8003",}var addrs []stringfor _, v := range addrMap {addrs = append(addrs, v)}gee := createGroup()if api {go startAPIServer(apiAddr, gee)}startCacheServer(addrMap[port], addrs, gee)time.Sleep(time.Second * 1000)
}func createGroup() *cache.Group {return cache.NewGroup("scores", 2<<10, cache.GetterFunc(func(key string) ([]byte, error) {if v, ok := db[key]; ok {return []byte(v), nil}return nil, fmt.Errorf("%s not exit", key)}))
}func startCacheServer(addr string, addrs []string, groups *cache.Group) {//HTTPPool是節點結合和HTTP服務端peers := cache.NewHTTPPool(addr, cache.DefaultBasePath)peers.Set(addrs...) //添加節點groups.RegisterPeers(peers) //注冊節點集合log.Println("geecache is running at", addr)http.ListenAndServe(addr[7:], peers)
}func startAPIServer(apiAddr string, groups *cache.Group) {http.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) {key := r.URL.Query().Get("key")view, err := groups.Get(key)if err != nil {http.Error(w, err.Error(), http.StatusInternalServerError)return}w.Header().Set("Content-Type", "application/octet-stream")w.Write(view.ByteSlice())})log.Println("fontend server is running at", apiAddr)http.ListenAndServe(apiAddr[7:], nil)
}
為了方便,我們將啟動的命令封裝為一個?shell
?腳本:
我們開啟了三個節點(都是在同一個臺機器上的,只是用不同端口來當做一個節點,進行區分)。
在端口8003的節點上開啟APIServer,用戶去訪問時候,都是訪問端口8003的那個節點。
#!/bin/bash#trap 命令用于在 shell 腳本退出時,刪掉臨時文件,結束在該shell腳本運行的后臺程序
trap "rm server;kill 0" EXITgo build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 &sleep 2
echo ">>> start test"
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &wait
結果
測試的時候,我們并發了 3 個請求??key=Tom
,從日志中可以看到,三次均選擇了節點?8001
,這是一致性哈希算法的功勞。
但是會有一個問題,同時向?8001
?發起了 3 次請求。試想,假如有 10 萬個在并發請求該數據呢?那就會向?8001
?同時發起 10 萬次請求,如果?8001
?又同時向數據庫發起 10 萬次查詢請求,很容易導致緩存被擊穿。
三次請求的結果是一致的,對于相同的 key,能不能只向?8001
?發起一次請求?這個問題下一次解決。
6.多節點的訪問流程圖
完整代碼:https://github.com/liwook/Go-projects/tree/main/go-cache/5-multi-nodes