怎么結合呢?
我們先來回顧一下一致性哈希代碼實現里面的結構
// Consistent holds the information about the members of the consistent hash circle.
type Consistent struct {mu sync.RWMutex // 讀寫鎖,用于保護并發訪問共享數據config Config // 存儲配置信息hasher Hasher // 存儲哈希器實例,直接從 config 復制過來sortedSet []uint64 // 存儲所有虛擬節點(通過成員名稱+副本索引哈希得到)的哈希值,并保持升序排列。這是哈希環的骨架。partitionCount uint64 // 邏輯分區的總數,從 config.PartitionCount 轉換而來loads map[string]float64 // 存儲每個真實成員當前的負載(即它擁有的邏輯分區數量)members map[string]*Member // 存儲所有真實成員的映射,鍵是成員的 String() 返回值,值是指向 Member 接口的指針partitions map[int]*Member // 核心映射:存儲每個邏輯分區ID(0到PartitionCount-1)對應的真實成員(指向 Member 接口的指針)ring map[uint64]*Member // 存儲虛擬節點哈希值到真實成員的映射。它是 sortedSet 的補充,sortedSet 提供有序列表,ring 提供哈希值到成員的查找。
}
現在就是加入一個新的服務器后,會修改什么內容
func (c *Consistent) add(member Member) {for i := 0; i < c.config.ReplicationFactor; i++ {key := []byte(fmt.Sprintf("%s%d", member.String(), i))h := c.hasher.Sum64(key)c.ring[h] = &memberc.sortedSet = append(c.sortedSet, h)}// sort hashes ascendinglysort.Slice(c.sortedSet, func(i int, j int) bool {return c.sortedSet[i] < c.sortedSet[j]})// Storing member at this map is useful to find backup members of a partition.c.members[member.String()] = &member
}
我們每次增加服務器都會修改sortedset以及ring(虛擬節點和服務器的映射),但是如果我們能夠得到服務器列表,那么是可以在本地構建sortedset和ring
分區和服務器映射需要存儲到區塊鏈中嗎?不需要
它是通過確定性算法計算出來的:partitions 的內容是根據成員列表、虛擬節點哈希環和有界負載等參數,通過 distributePartitions() 方法計算得出的。這個過程是確定性的,只要輸入(成員列表、配置)相同,輸出(partitions 映射)也一定相同。
它與 sortedSet 是強關聯的:partitions 映射的構建依賴于 sortedSet 和 ring。distributePartitions() 函數正是通過遍歷 sortedSet 來為每個邏輯分區尋找歸屬成員的。
它是高頻查詢的關鍵:Consistent.LocateKey 方法的最后一步就是查詢 partitions 映射。如果這個映射也需要從區塊鏈上獲取,那么每次鍵的定位都會變成一個慢速的區塊鏈查詢,這會徹底破壞系統的性能。
下面開始構建
鏈碼
我們先創建一個鏈碼
找一個位置存下下面的代碼
package mainimport ("encoding/json""fmt""log"// "strconv"// "github.com/hyperledger/fabric-chaincode-go/shim""github.com/hyperledger/fabric-contract-api-go/contractapi"
)// ConsistentHashManager 鏈碼的智能合約結構體
type ConsistentHashManager struct {contractapi.Contract
}// 成員列表鍵,用于在世界狀態中存儲成員列表
const membersKey = "all_members_key"// Member represents a member in the consistent hash ring.
// This is the same structure we used in our previous local Go program.
type Member struct {Name string `json:"name"`
}// InitLedger 初始化鏈碼。
// 在這里,我們將創建一個空的成員列表并將其寫入世界狀態。
func (s *ConsistentHashManager) InitLedger(ctx contractapi.TransactionContextInterface) error {log.Printf("Initializing the ledger with an empty member list.")// 創建一個空成員列表members := []Member{}membersJSON, err := json.Marshal(members)if err != nil {return err}// 將空成員列表寫入世界狀態return ctx.GetStub().PutState(membersKey, membersJSON)
}// AddMember 添加一個新成員到哈希環中。
func (s *ConsistentHashManager) AddMember(ctx contractapi.TransactionContextInterface, memberName string) error {log.Printf("Attempting to add member: %s", memberName)// 從世界狀態中讀取當前成員列表membersJSON, err := ctx.GetStub().GetState(membersKey)if err != nil {return fmt.Errorf("failed to read from world state: %w", err)}// 如果列表不存在,創建一個空列表if membersJSON == nil {membersJSON = []byte("[]")}// 反序列化成員列表var members []Membererr = json.Unmarshal(membersJSON, &members)if err != nil {return fmt.Errorf("failed to unmarshal member list: %w", err)}// 檢查成員是否已存在for _, member := range members {if member.Name == memberName {return fmt.Errorf("member '%s' already exists", memberName)}}// 添加新成員members = append(members, Member{Name: memberName})// 將更新后的列表序列化membersJSON, err = json.Marshal(members)if err != nil {return err}// 將更新后的列表寫入世界狀態return ctx.GetStub().PutState(membersKey, membersJSON)
}// RemoveMember 從哈希環中移除一個成員。
func (s *ConsistentHashManager) RemoveMember(ctx contractapi.TransactionContextInterface, memberName string) error {log.Printf("Attempting to remove member: %s", memberName)// 從世界狀態中讀取當前成員列表membersJSON, err := ctx.GetStub().GetState(membersKey)if err != nil {return fmt.Errorf("failed to read from world state: %w", err)}if membersJSON == nil {return fmt.Errorf("member list is empty, cannot remove '%s'", memberName)}// 反序列化成員列表var members []Membererr = json.Unmarshal(membersJSON, &members)if err != nil {return fmt.Errorf("failed to unmarshal member list: %w", err)}// 查找并移除成員found := falsefor i, member := range members {if member.Name == memberName {// 移除切片中的元素members = append(members[:i], members[i+1:]...)found = truebreak}}if !found {return fmt.Errorf("member '%s' not found", memberName)}// 將更新后的列表序列化membersJSON, err = json.Marshal(members)if err != nil {return err}// 將更新后的列表寫入世界狀態return ctx.GetStub().PutState(membersKey, membersJSON)
}// GetMembers 查詢并返回當前所有成員。
func (s *ConsistentHashManager) GetMembers(ctx contractapi.TransactionContextInterface) ([]*Member, error) {log.Println("Querying for all members.")// 從世界狀態中讀取成員列表membersJSON, err := ctx.GetStub().GetState(membersKey)if err != nil {return nil, fmt.Errorf("failed to read from world state: %w", err)}if membersJSON == nil {log.Println("No members found, returning empty list.")return []*Member{}, nil}// 反序列化成員列表var members []Membererr = json.Unmarshal(membersJSON, &members)if err != nil {return nil, fmt.Errorf("failed to unmarshal member list: %w", err)}// 將 []Member 轉換為 []*Member 以符合 contractapi 接口var result []*Memberfor i := range members {result = append(result, &members[i])}return result, nil
}func main() {chaincode, err := contractapi.NewChaincode(&ConsistentHashManager{})if err != nil {log.Panicf("Error creating consistent hash manager chaincode: %v", err)}if err := chaincode.Start(); err != nil {log.Panicf("Error starting consistent hash manager chaincode: %v", err)}
}
然后我們再運行
go mod init consistent-hash-manager # 你可以替換成你喜歡的模塊名
go mod tidy
下面就可以去部署網絡,部署鏈碼
./network.sh deployCC -ccn consistent-hash-manager -ccp ../chaincode/ConsistentHashManager -ccv 1.0 -ccl go -c mychannel
接著我們運行我們的客戶端代碼
package mainimport ("bytes""crypto/x509""encoding/json""fmt""log""os""path"// "time""github.com/hyperledger/fabric-gateway/pkg/client""github.com/hyperledger/fabric-gateway/pkg/identity""google.golang.org/grpc""google.golang.org/grpc/credentials"
)// 定義鏈碼名稱、通道名稱和網絡配置路徑
const (mspID = "Org1MSP"cryptoPath = "../../test-network/organizations/peerOrganizations/org1.example.com"certPath = cryptoPath + "/users/User1@org1.example.com/msp/signcerts"keyPath = cryptoPath + "/users/User1@org1.example.com/msp/keystore"tlsCertPath = cryptoPath + "/peers/peer0.org1.example.com/tls/ca.crt"peerEndpoint = "localhost:7051" // Fabric Peer 的 Gateway 服務地址gatewayPeer = "peer0.org1.example.com" // Gateway Peer 的主機名,用于 TLS 驗證channelName = "mychannel"chaincodeName = "consistent-hash-manager" // 使用我們自己的鏈碼名稱
)// Member 結構體,用于解析鏈碼返回的成員列表
type Member struct {Name string `json:"name"`
}// newGrpcConnection 創建與 Gateway 服務器的 gRPC 連接
func newGrpcConnection() *grpc.ClientConn {log.Println("--> Creating gRPC client connection...")certificatePEM, err := os.ReadFile(tlsCertPath)if err != nil {log.Fatalf("Failed to read TLS certificate file at %s: %v", tlsCertPath, err)}certPool := x509.NewCertPool()if !certPool.AppendCertsFromPEM(certificatePEM) {log.Fatalf("Failed to append TLS CA certificate to pool")}transportCredentials := credentials.NewClientTLSFromCert(certPool, gatewayPeer)connection, err := grpc.NewClient(peerEndpoint, grpc.WithTransportCredentials(transportCredentials), grpc.WithBlock())if err != nil {log.Fatalf("Failed to create gRPC connection: %v", err)}log.Println("--> gRPC client connection created successfully.")return connection
}// newIdentity 為 Gateway 連接創建一個客戶端身份 (X.509 證書)
func newIdentity() *identity.X509Identity {log.Println("--> Creating new client identity...")certificatePEM, err := readFirstFile(certPath)if err != nil {log.Fatalf("Failed to read certificate file from %s: %v", certPath, err)}certificate, err := identity.CertificateFromPEM(certificatePEM)if err != nil {log.Fatalf("Failed to parse identity certificate: %v", err)}id, err := identity.NewX509Identity(mspID, certificate)if err != nil {log.Fatalf("Failed to create X.509 identity: %v", err)}log.Println("--> Client identity created successfully.")return id
}// newSign 創建一個函數,該函數使用私鑰從消息摘要生成數字簽名。
func newSign() identity.Sign {log.Println("--> Creating new private key signer...")privateKeyPEM, err := readFirstFile(keyPath)if err != nil {log.Fatalf("Failed to read private key file from %s: %v", keyPath, err)}privateKey, err := identity.PrivateKeyFromPEM(privateKeyPEM)if err != nil {log.Fatalf("Failed to parse private key: %v", err)}sign, err := identity.NewPrivateKeySign(privateKey)if err != nil {log.Fatalf("Failed to create private key signer: %v", err)}log.Println("--> Private key signer created successfully.")return sign
}// readFirstFile 從指定目錄中讀取第一個文件
func readFirstFile(dirPath string) ([]byte, error) {dir, err := os.Open(dirPath)if err != nil {return nil, fmt.Errorf("failed to open directory %s: %w", dirPath, err)}defer dir.Close()fileNames, err := dir.Readdirnames(1)if err != nil {return nil, fmt.Errorf("failed to read file names from directory %s: %w", dirPath, err)}if len(fileNames) == 0 {return nil, fmt.Errorf("no files found in directory: %s", dirPath)}filePath := path.Join(dirPath, fileNames[0])fileContent, err := os.ReadFile(filePath)if err != nil {return nil, fmt.Errorf("failed to read file %s: %w", filePath, err)}return fileContent, nil
}// formatJSON 格式化 JSON 數據,使其更易讀
func formatJSON(data []byte) string {if len(data) == 0 {return "[]" // 如果數據為空,返回一個空的 JSON 數組字符串}var prettyJSON bytes.Bufferif err := json.Indent(&prettyJSON, data, "", " "); err != nil {log.Printf("Warning: Failed to parse JSON, returning raw data. Error: %v", err)return string(data)}return prettyJSON.String()
}func main() {// 1. 創建 gRPC 客戶端連接clientConnection := newGrpcConnection()defer clientConnection.Close()// 2. 創建客戶端身份和簽名函數id := newIdentity()sign := newSign()// 3. 連接 Fabric Gatewaygw, err := client.Connect(id,client.WithSign(sign),client.WithClientConnection(clientConnection),)if err != nil {log.Fatalf("Failed to connect to Gateway: %v", err)}defer gw.Close()// 獲取網絡和合約對象network := gw.GetNetwork(channelName)contract := network.GetContract(chaincodeName)// memberNameToAdd := "node-a"// // 4. 調用 AddMember 提交事務// log.Printf("\n--> 提交 AddMember 事務以添加 '%s'...", memberNameToAdd)// _, err = contract.SubmitTransaction("AddMember", memberNameToAdd)// if err != nil {// log.Fatalf("Failed to submit AddMember transaction: %v", err)// }// log.Printf("成員 '%s' 添加成功。", memberNameToAdd)// memberNameToAdd2 := "node-b"// // 4. 調用 AddMember 提交事務// log.Printf("\n--> 提交 AddMember 事務以添加 '%s'...", memberNameToAdd2)// _, err = contract.SubmitTransaction("AddMember", memberNameToAdd2)// if err != nil {// log.Fatalf("Failed to submit AddMember transaction: %v", err)// }// log.Printf("成員 '%s' 添加成功。", memberNameToAdd2)// 5. 調用 GetMembers 查詢當前成員列表log.Println("\n--> 查詢所有成員...")result, err := contract.EvaluateTransaction("GetMembers")if err != nil {log.Fatalf("Failed to evaluate GetMembers transaction: %v", err)}log.Printf("當前成員列表:\n%s", formatJSON(result))// // 6. 調用 RemoveMember 提交事務// log.Printf("\n--> 提交 RemoveMember 事務以移除 '%s'...", memberNameToAdd)// _, err = contract.SubmitTransaction("RemoveMember", memberNameToAdd)// if err != nil {// log.Fatalf("Failed to submit RemoveMember transaction: %v", err)// }// log.Printf("成員 '%s' 移除成功。", memberNameToAdd)// // 7. 再次調用 GetMembers 查詢以確認移除// log.Println("\n--> 再次查詢所有成員以確認移除...")// result, err = contract.EvaluateTransaction("GetMembers")// if err != nil {// log.Fatalf("Failed to evaluate GetMembers transaction: %v", err)// }// log.Printf("移除后成員列表:\n%s", formatJSON(result))// log.Println("\n所有操作已完成。")
}