Golang | 搜索哨兵-對接分布式gRPC服務

  • 哨兵(centennial)負責接待客人,直接與調用方對接。
  • 哨兵的核心組件包括service HUB和connection pool。
  • service HUB用于與服務中心通信,獲取可提供服務的節點信息。
  • connection pool用于緩存與index worker的連接,避免每次搜索時重新建立連接。
  • 連接池初始化為空map。
  • 提供函數獲取指定endpoint的GRPC連接。
  • 函數首先檢查本地緩存中是否有可用連接,若無則創建新連接。
  • 創建連接時默認立即返回,可選阻塞模式直到連接可用。
  • 連接建立后放入緩存并返回。
  • 哨兵提供添加、刪除和搜索三個核心功能。
  • 添加功能:隨機選擇一臺index worker添加新文檔。
  • 刪除功能:遍歷所有endpoint,并行刪除指定文檔。
  • 搜索功能:將搜索請求發送到所有endpoint,合并搜索結果。
  • 使用channel進行并發搜索結果的收集。
  • 上游并發寫入channel,下游讀取channel數據到切片。
  • 使用wait group等待所有搜索任務完成。
  • 關閉channel后仍可讀取,確保讀取到所有數據。
package index_serviceimport ("context""fmt""github.com/jmh000527/criker-search/index_service/service_hub""github.com/jmh000527/criker-search/types""github.com/jmh000527/criker-search/utils""google.golang.org/grpc""google.golang.org/grpc/connectivity""google.golang.org/grpc/credentials/insecure""sync""sync/atomic""time"
)// Sentinel 哨兵前臺,與外部系統對接的接口。
type Sentinel struct {hub      service_hub.ServiceHub // 從 Hub 中獲取 IndexServiceWorker 的集合。可以直接訪問 ServiceHub,也可能通過代理模式進行訪問。connPool sync.Map               // 與各個 IndexServiceWorker 建立的 gRPC 連接池。緩存連接以避免每次請求都重新建立連接,提升效率。
}// NewSentinel 創建并返回一個 Sentinel 實例。
//
// 參數:
//   - etcdServers: 一個字符串數組,包含了 etcd 服務器的地址。
//
// 返回值:
//   - *Sentinel: 一個新的 Sentinel 實例。
func NewSentinel(etcdServers []string) *Sentinel {return &Sentinel{// hub: GetServiceHub(etcdServers, 10), // 直接訪問 ServiceHubhub:      service_hub.GetServiceHubProxy(etcdServers, 3, 100), // 使用代理模式訪問 ServiceHubconnPool: sync.Map{},                                          // 初始化 gRPC 連接池}
}// GetGrpcConn 向指定的 endpoint 建立 gRPC 連接。
// 如果連接已經存在于緩存中且狀態可用,則直接返回緩存的連接。
// 如果連接狀態不可用或不存在,則重新建立連接并存儲到緩存中。
//
// 參數:
//   - endpoint: 要連接的 gRPC 服務的地址。
//
// 返回值:
//   - *grpc.ClientConn: 返回與 endpoint 建立的 gRPC 連接,如果連接失敗則返回 nil。
func (sentinel *Sentinel) GetGrpcConn(endpoint string) *grpc.ClientConn {v, exists := sentinel.connPool.Load(endpoint)// 連接緩存中存在if exists {conn := v.(*grpc.ClientConn)// 如果連接狀態不可用,則從連接緩存中刪除if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Shutdown {utils.Log.Printf("連接到 endpoint %s 的狀態為 %s", endpoint, conn.GetState().String())conn.Close()sentinel.connPool.Delete(endpoint)} else {return conn}}// 連接到服務,控制連接超時ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)defer cancel()// 獲取 gRPC 連接// grpc.Dial 是異步連接,連接狀態為正在連接。// 如果設置了 grpc.WithBlock 選項,則會阻塞等待(等待握手成功)。// 需要注意的是,當未設置 grpc.WithBlock 時,ctx 超時控制對其無任何效果。grpcConn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())if err != nil {utils.Log.Printf("連接到 %s 的 gRPC 失敗,錯誤: %s", endpoint, err.Error())return nil}utils.Log.Printf("連接到 %s 的 gRPC 成功", endpoint)// 將 gRPC 連接緩存到連接池中sentinel.connPool.Store(endpoint, grpcConn)return grpcConn
}// AddDoc 向集群中的 IndexService 添加文檔。如果文檔已存在,會先刪除舊文檔再添加新文檔。
//
// 參數:
//   - doc: 要添加的文檔,類型為 types.Document。
//
// 返回值:
//   - int: 成功添加的文檔數量。
//   - error: 如果在添加文檔時出現錯誤,返回相應的錯誤信息。
func (sentinel *Sentinel) AddDoc(doc types.Document) (int, error) {// 根據負載均衡策略,選擇一個 IndexService 節點,將文檔添加到該節點endpoint := sentinel.hub.GetServiceEndpoint(IndexService)if len(endpoint) == 0 {return 0, fmt.Errorf("未找到服務 %s 的有效節點", IndexService)}// 創建到該節點的 gRPC 連接grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {return 0, fmt.Errorf("連接到 %s 的 gRPC 失敗", endpoint)}// 創建 gRPC 客戶端并進行調用client := NewIndexServiceClient(grpcConn)affected, err := client.AddDoc(context.Background(), &doc)if err != nil {return 0, err}utils.Log.Printf("成功向 worker %s 添加 %d 個文檔", endpoint, affected.Count)return int(affected.Count), nil
}// DeleteDoc 從集群中刪除與 docId 對應的文檔,返回成功刪除的文檔數量(通常不會超過 1)。
//
// 參數:
//   - docId: 要刪除的文檔的唯一標識符。
//
// 返回值:
//   - int: 成功刪除的文檔數量。
func (sentinel *Sentinel) DeleteDoc(docId string) int {// 獲取該服務的所有 endpointsendpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return 0}var n int32wg := sync.WaitGroup{}wg.Add(len(endpoints))for _, endpoint := range endpoints {// 并行地向各個 IndexServiceWorker 刪除對應的 docId 的文檔。// 正常情況下,只有一個 worker 上有該文檔。go func(endpoint string) {defer wg.Done()grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {utils.Log.Printf("連接到 %s 的 gRPC 失敗", endpoint)return}client := NewIndexServiceClient(grpcConn)affected, err := client.DeleteDoc(context.Background(), &DocId{docId})if err != nil {utils.Log.Printf("從 worker %s 刪除文檔 %s 失敗,錯誤: %s", endpoint, docId, err)return}if affected.Count > 0 {atomic.AddInt32(&n, affected.Count)utils.Log.Printf("從 worker %s 刪除文檔 %s 成功", endpoint, docId)}}(endpoint)}wg.Wait()return int(atomic.LoadInt32(&n))
}// Search 執行檢索操作,并返回文檔列表。
//
// 參數:
//   - query: 指定的檢索查詢條件,類型為 *types.TermQuery。
//   - onFlag: 開啟的標志位,類型為 uint64。
//   - offFlag: 關閉的標志位,類型為 uint64。
//   - orFlags: OR 標志位的切片,類型為 []uint64。
//
// 返回值:
//   - []*types.Document: 經過檢索的文檔列表,可能為空。
//
// 詳細描述:
//  1. 從服務中心獲取所有的 endpoints。
//  2. 使用 goroutines 并行地對每個 endpoint 執行檢索操作。
//  3. 將每個檢索結果發送到 resultChan 通道中。
//  4. 在另一個 goroutine 中,從 resultChan 通道中讀取結果,并將其存儲在 docs 切片中。
//  5. 等待所有的檢索操作完成后,關閉 resultChan,并等待從 resultChan 中讀取完所有結果。
//  6. 返回存儲的文檔列表。
func (sentinel *Sentinel) Search(query *types.TermQuery, onFlag, offFlag uint64, orFlags []uint64) []*types.Document {// 獲取該服務所有的 endpointsendpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return nil}// 用于存儲檢索結果的切片和通道docs := make([]*types.Document, 0, 1000)resultChan := make(chan *types.Document, 1000)// 使用 WaitGroup 并行開啟協程去每個 endpoint 執行檢索操作var wg sync.WaitGroupwg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()// 獲取 gRPC 連接grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {utils.Log.Printf("連接到 %s 的 gRPC 連接失敗", endpoint)return}client := NewIndexServiceClient(grpcConn)// 執行檢索請求searchResult, err := client.Search(context.Background(), &SearchRequest{Query:   query,OnFlag:  onFlag,OffFlag: offFlag,OrFlags: orFlags,})if err != nil {utils.Log.Printf("向 worker %s 執行查詢 %s 失敗,錯誤: %s", endpoint, query, err)return}if len(searchResult.Results) > 0 {utils.Log.Printf("向 worker %s 執行查詢 %s 成功,獲取到 %v 個文檔", endpoint, query, len(searchResult.Results))for _, result := range searchResult.Results {resultChan <- result}}}(endpoint)}// 啟動另一個 goroutine 從 resultChan 中獲取結果signalChan := make(chan struct{})go func() {for doc := range resultChan {docs = append(docs, doc)}// 讀取完成,通知主 goroutinesignalChan <- struct{}{}}()// 等待所有檢索操作完成wg.Wait()// 關閉 resultChan 通道close(resultChan)// 等待結果讀取完畢<-signalChanreturn docs
}// Count 獲取所有服務中的搜索條目數量。
//
// 參數:
//   - 無參數。
//
// 返回值:
//   - int: 所有服務中的文檔總數量。
//
// 詳細描述:
//  1. 從服務中心獲取所有的 endpoints。
//  2. 使用 goroutines 并行地對每個 endpoint 執行計數操作。
//  3. 將每個 worker 中的文檔數量累加到總計數中。
//  4. 等待所有計數操作完成后,返回文檔總數量。
func (sentinel *Sentinel) Count() int {var n int32// 獲取所有服務的 endpointsendpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return 0}var wg sync.WaitGroupwg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()// 獲取 gRPC 連接grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn != nil {client := NewIndexServiceClient(grpcConn)// 執行計數請求affected, err := client.Count(context.Background(), new(CountRequest))if err != nil {utils.Log.Printf("從 worker %s 獲取文檔數量失敗: %s", endpoint, err)}if affected.Count > 0 {// 累加計數atomic.AddInt32(&n, affected.Count)utils.Log.Printf("worker %s 共有 %d 個文檔", endpoint, affected.Count)}}}(endpoint)}// 等待所有計數操作完成wg.Wait()return int(atomic.LoadInt32(&n))
}// Close 關閉各個grpc client連接,關閉etcd client連接
func (sentinel *Sentinel) Close() (err error) {sentinel.connPool.Range(func(key, value any) bool {conn := value.(*grpc.ClientConn)err = conn.Close()return true})sentinel.hub.Close()return
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/85370.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/85370.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/85370.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

CSS3實現的賬號密碼輸入框提示效果

以下是通過CSS3實現輸入框提示效果的常用方法&#xff0c;包含浮動標簽和動態提示兩種經典實現方案&#xff1a; 一、浮動標簽效果 <div class"input-group"><input type"text" required><label>用戶名</label> </div><…

maven編譯時跳過test過程

如果代碼里有無法在打包環境中測試的部分&#xff0c;則直接運行mvn clean package&#xff0c;因為測試失敗&#xff0c;會導致打包失敗。目前有兩種方式可以跳過測試&#xff1a; 1. mvn clean package -DskipTests&#xff0c;這會跳過執行階須&#xff0c;但仍會生成測試所…

美業+智能體,解鎖行業轉化新密碼(2/6)

摘要&#xff1a;中國美業市場近年蓬勃發展&#xff0c;規模持續擴大&#xff0c;預計不久將突破萬億級別&#xff0c;但同時也面臨著諸多挑戰&#xff0c;如獲客成本攀升、服務質量不穩定、難以滿足消費者多元化個性化需求等。智能體技術的出現為美業帶來了新的發展機遇&#…

設計模式——責任鏈設計模式(行為型)

摘要 責任鏈設計模式是一種行為型設計模式&#xff0c;旨在將請求的發送者與接收者解耦&#xff0c;通過多個處理器對象按鏈式結構依次處理請求&#xff0c;直到某個處理器處理為止。它包含抽象處理者、具體處理者和客戶端等核心角色。該模式適用于多個對象可能處理請求的場景…

react/vue移動端項目,刷新頁面404的原因以及解決辦法

一 、 項目 移動端 二、背景 1、問題描述&#xff1a;react/vue移動端項目&#xff0c;正常的頁面操作跳轉&#xff0c;不會出現404的問題&#xff0c;但是一旦刷新&#xff0c;就會出現404報錯 2、產生原因&#xff1a; React Router是客戶端的路由&#xff0c;當再次刷新時…

數據結構-算法學習C++(入門)

目錄 03二進制和位運算04 選擇、冒泡、插入排序05 對數器06 二分搜索07 時間復雜度和空間復雜度08 算法和數據結構09 單雙鏈表09.1單雙鏈表及反轉09.2合并鏈表09.2兩數相加09.2分隔鏈表 013隊列、棧、環形隊列013.1隊列013.2棧013.3循環隊列 014棧-隊列的相互轉換014.1用棧實現…

用JS實現植物大戰僵尸(前端作業)

1. 先搭架子 整體效果&#xff1a; 點擊開始后進入主場景 左側是植物卡片 右上角是游戲的開始和暫停鍵 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevic…

深入理解設計模式之代理模式

深入理解設計模式之&#xff1a;代理模式 一、什么是代理模式&#xff1f; 代理模式&#xff08;Proxy Pattern&#xff09;是一種結構型設計模式。它為其他對象提供一種代理以控制對這個對象的訪問。代理對象在客戶端和目標對象之間起到中介作用&#xff0c;可以在不改變目標…

Ubuntu設置之初始化

安裝SSH服務 # 安裝 OpenSSH Server sudo apt update sudo apt install -y openssh-server# 檢查 SSH 服務狀態 sudo systemctl status ssh # Active: active (running) since Sat 2025-05-31 17:13:07 CST; 6s ago# 重啟服務 sudo systemctl restart ssh自定義分辨率 新…

【仿生機器人】極具前瞻性的架構——認知-情感-記憶“三位一體的仿生機器人系統架構

基于您的深度需求分析&#xff0c;我將為您設計一個全新的"認知-情感-記憶"三位一體的仿生機器人系統架構。以下是經過深度優化的解決方案&#xff1a; 一、核心架構升級&#xff08;三體認知架構&#xff09; 采用量子糾纏式架構設計&#xff1a; 認知三角&#xf…

Python量化交易12——Tushare全面獲取各種經濟金融數據

兩年前寫過Tushare的簡單使用&#xff1a; Python量化交易08——利用Tushare獲取日K數據_skshare- 現在更新一下吧&#xff0c;這兩年用過不少的金融數據庫&#xff0c;akshare&#xff0c;baostock&#xff0c;雅虎的&#xff0c;pd自帶的......發現還是Tushare最穩定最好用&…

python打卡day39@浙大疏錦行

知識點回顧 圖像數據的格式&#xff1a;灰度和彩色數據模型的定義顯存占用的4種地方 模型參數梯度參數優化器參數數據批量所占顯存神經元輸出中間狀態 batchisize和訓練的關系 1. 圖像數據格式 - 灰度圖像 &#xff1a;單通道&#xff0c;像素值范圍通常0-255&#xff0c;形狀為…

源碼解析(二):nnUNet

原文 &#x1f600; nnU-Net 是一個用于生物醫學圖像分割的自配置深度學習框架&#xff0c;可自動適應不同的數據集。可用于處理和訓練可能規模龐大的二維和三維醫學圖像。該系統分析數據集屬性并配置優化的基于 U-Net 的分割流程&#xff0c;無需手動參數調整或深度學習專業知…

clickhouse如何查看操作記錄,從日志來查看寫入是否成功

背景 插入表數據后&#xff0c;因為原本表中就有數據&#xff0c;一時間沒想到怎么查看插入是否成功&#xff0c;因為對數據源沒有很多的了解&#xff0c;這時候就想怎么查看下插入是否成功呢&#xff0c;于是就有了以下方法 具體方法 根據操作類型查找&#xff0c;比如inse…

udp 傳輸實時性測量

UDP&#xff08;用戶數據報協議&#xff09;是一種無連接的傳輸協議&#xff0c;適用于實時性要求較高的應用&#xff0c;如視頻流、音頻傳輸和游戲等。測量UDP傳輸的實時性可以通過多種工具和方法實現&#xff0c;以下是一些常見的方法和工具&#xff1a; 1. 使用 iperf 測試…

pikachu通關教程- over permission

如果使用A用戶的權限去操作B用戶的數據&#xff0c;A的權限小于B的權限&#xff0c;如果能夠成功操作&#xff0c;則稱之為越權操作。 越權漏洞形成的原因是后臺使用了 不合理的權限校驗規則導致的。 水平越權 當我們以Lucy賬號登錄&#xff0c;查詢個人信息時&#xff0c;會有…

nc 命令示例

nc -zv 實用示例 示例 1&#xff1a;測試單個 TCP 端口&#xff08;最常見&#xff09; 目標&#xff1a; 檢查主機 webserver.example.com 上的 80 端口 (HTTP) 是否開放。 nc -zv webserver.example.com 80成功輸出&#xff1a; Connection to webserver.example.com (19…

Redis是什么

注&#xff1a;本人不懂Redis是什么&#xff0c;問的大模型&#xff0c;讓它用生動淺顯的語言向我解釋。為了防止忘記&#xff0c;我把它說的記錄下來。接下來的解釋都是大模型生成的&#xff0c;如果有錯誤的地方歡迎指正 。 Redis 是什么&#xff1f;&#xff08;一句話解釋&…

CVE-2021-28164源碼分析與漏洞復現

漏洞概述 漏洞名稱&#xff1a;Jetty 路徑解析邏輯漏洞導致 WEB-INF 敏感信息泄露 漏洞編號&#xff1a;CVE-2021-28164 CVSS 評分&#xff1a;7.5 影響版本&#xff1a;Jetty 9.4.37 - 9.4.38 修復版本&#xff1a;Jetty ≥ 9.4.39 漏洞類型&#xff1a;路徑遍歷/信息泄露 C…

顛覆傳統!單樣本熵最小化如何重塑大語言模型訓練范式?

顛覆傳統&#xff01;單樣本熵最小化如何重塑大語言模型訓練范式&#xff1f; 大語言模型&#xff08;LLM&#xff09;的訓練往往依賴大量標注數據與復雜獎勵設計&#xff0c;但最新研究發現&#xff0c;僅用1條無標注數據和10步優化的熵最小化&#xff08;EM&#xff09;方法…