- 哨兵(centennial)負責接待客人,直接與調用方對接。
- 哨兵的核心組件包括service HUB和connection pool。
- service HUB用于與服務中心通信,獲取可提供服務的節點信息。
- connection pool用于緩存與index worker的連接,避免每次搜索時重新建立連接。
- 連接池初始化為空map。
- 提供函數獲取指定endpoint的GRPC連接。
- 函數首先檢查本地緩存中是否有可用連接,若無則創建新連接。
- 創建連接時默認立即返回,可選阻塞模式直到連接可用。
- 連接建立后放入緩存并返回。
- 哨兵提供添加、刪除和搜索三個核心功能。
- 添加功能:隨機選擇一臺index worker添加新文檔。
- 刪除功能:遍歷所有endpoint,并行刪除指定文檔。
- 搜索功能:將搜索請求發送到所有endpoint,合并搜索結果。
- 使用channel進行并發搜索結果的收集。
- 上游并發寫入channel,下游讀取channel數據到切片。
- 使用wait group等待所有搜索任務完成。
- 關閉channel后仍可讀取,確保讀取到所有數據。
package index_serviceimport ("context""fmt""github.com/jmh000527/criker-search/index_service/service_hub""github.com/jmh000527/criker-search/types""github.com/jmh000527/criker-search/utils""google.golang.org/grpc""google.golang.org/grpc/connectivity""google.golang.org/grpc/credentials/insecure""sync""sync/atomic""time"
)
type Sentinel struct {hub service_hub.ServiceHub connPool sync.Map
}
func NewSentinel(etcdServers []string) *Sentinel {return &Sentinel{hub: service_hub.GetServiceHubProxy(etcdServers, 3, 100), connPool: sync.Map{}, }
}
func (sentinel *Sentinel) GetGrpcConn(endpoint string) *grpc.ClientConn {v, exists := sentinel.connPool.Load(endpoint)if exists {conn := v.(*grpc.ClientConn)if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Shutdown {utils.Log.Printf("連接到 endpoint %s 的狀態為 %s", endpoint, conn.GetState().String())conn.Close()sentinel.connPool.Delete(endpoint)} else {return conn}}ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)defer cancel()grpcConn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())if err != nil {utils.Log.Printf("連接到 %s 的 gRPC 失敗,錯誤: %s", endpoint, err.Error())return nil}utils.Log.Printf("連接到 %s 的 gRPC 成功", endpoint)sentinel.connPool.Store(endpoint, grpcConn)return grpcConn
}
func (sentinel *Sentinel) AddDoc(doc types.Document) (int, error) {endpoint := sentinel.hub.GetServiceEndpoint(IndexService)if len(endpoint) == 0 {return 0, fmt.Errorf("未找到服務 %s 的有效節點", IndexService)}grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {return 0, fmt.Errorf("連接到 %s 的 gRPC 失敗", endpoint)}client := NewIndexServiceClient(grpcConn)affected, err := client.AddDoc(context.Background(), &doc)if err != nil {return 0, err}utils.Log.Printf("成功向 worker %s 添加 %d 個文檔", endpoint, affected.Count)return int(affected.Count), nil
}
func (sentinel *Sentinel) DeleteDoc(docId string) int {endpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return 0}var n int32wg := sync.WaitGroup{}wg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {utils.Log.Printf("連接到 %s 的 gRPC 失敗", endpoint)return}client := NewIndexServiceClient(grpcConn)affected, err := client.DeleteDoc(context.Background(), &DocId{docId})if err != nil {utils.Log.Printf("從 worker %s 刪除文檔 %s 失敗,錯誤: %s", endpoint, docId, err)return}if affected.Count > 0 {atomic.AddInt32(&n, affected.Count)utils.Log.Printf("從 worker %s 刪除文檔 %s 成功", endpoint, docId)}}(endpoint)}wg.Wait()return int(atomic.LoadInt32(&n))
}
func (sentinel *Sentinel) Search(query *types.TermQuery, onFlag, offFlag uint64, orFlags []uint64) []*types.Document {endpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return nil}docs := make([]*types.Document, 0, 1000)resultChan := make(chan *types.Document, 1000)var wg sync.WaitGroupwg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {utils.Log.Printf("連接到 %s 的 gRPC 連接失敗", endpoint)return}client := NewIndexServiceClient(grpcConn)searchResult, err := client.Search(context.Background(), &SearchRequest{Query: query,OnFlag: onFlag,OffFlag: offFlag,OrFlags: orFlags,})if err != nil {utils.Log.Printf("向 worker %s 執行查詢 %s 失敗,錯誤: %s", endpoint, query, err)return}if len(searchResult.Results) > 0 {utils.Log.Printf("向 worker %s 執行查詢 %s 成功,獲取到 %v 個文檔", endpoint, query, len(searchResult.Results))for _, result := range searchResult.Results {resultChan <- result}}}(endpoint)}signalChan := make(chan struct{})go func() {for doc := range resultChan {docs = append(docs, doc)}signalChan <- struct{}{}}()wg.Wait()close(resultChan)<-signalChanreturn docs
}
func (sentinel *Sentinel) Count() int {var n int32endpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return 0}var wg sync.WaitGroupwg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn != nil {client := NewIndexServiceClient(grpcConn)affected, err := client.Count(context.Background(), new(CountRequest))if err != nil {utils.Log.Printf("從 worker %s 獲取文檔數量失敗: %s", endpoint, err)}if affected.Count > 0 {atomic.AddInt32(&n, affected.Count)utils.Log.Printf("worker %s 共有 %d 個文檔", endpoint, affected.Count)}}}(endpoint)}wg.Wait()return int(atomic.LoadInt32(&n))
}
func (sentinel *Sentinel) Close() (err error) {sentinel.connPool.Range(func(key, value any) bool {conn := value.(*grpc.ClientConn)err = conn.Close()return true})sentinel.hub.Close()return
}