區塊鏈教程Fabric1.0源代碼分析流言算法Gossip服務端二

  區塊鏈教程Fabric1.0源代碼分析流言算法Gossip服務端二

Fabric 1.0源代碼筆記 之 gossip(流言算法) #GossipServer(Gossip服務端)

5.2、commImpl結構體方法

//conn.serviceConnection(),啟動連接服務
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error
//return &proto.Empty{}
func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error)func (c *commImpl) GetPKIid() common.PKIidType
//向指定節點發送消息
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)
//探測遠程節點是否有響應,_, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Probe(remotePeer *RemotePeer) error
//握手驗證遠程節點,_, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error)
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage
func (c *commImpl) PresumedDead() <-chan common.PKIidType
func (c *commImpl) CloseConn(peer *RemotePeer)
func (c *commImpl) Stop()//創建并啟動gRPC Server,以及注冊GossipServer實例
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
//將GossipServer實例注冊至peerServer
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
func extractRemoteAddress(stream stream) string
func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error) 
//創建gRPC Server,grpc.NewServer(serverOpts...)
func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte)//創建與服務端連接
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error)
//向指定節點發送消息
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage)
//return atomic.LoadInt32(&c.stopping) == int32(1)
func (c *commImpl) isStopping() bool
func (c *commImpl) emptySubscriptions()
func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error)
func (c *commImpl) disconnect(pkiID common.PKIidType)
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer proto.Signer) (*proto.SignedGossipMessage, error)
//代碼在gossip/comm/comm_impl.go

5.2.1、func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error)

創建并啟動gRPC Server,以及注冊GossipServer實例

func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) {var ll net.Listenervar s *grpc.Servervar certHash []byteif len(dialOpts) == 0 {//peer.gossip.dialTimeout,gRPC連接撥號的超時dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))}}if port > 0 {//創建gRPC Server,grpc.NewServer(serverOpts...)s, ll, secureDialOpts, certHash = createGRPCLayer(port)}commInst := &commImpl{selfCertHash:   certHash,PKIID:          idMapper.GetPKIidOfCert(peerIdentity),idMapper:       idMapper,logger:         util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),peerIdentity:   peerIdentity,opts:           dialOpts,secureDialOpts: secureDialOpts,port:           port,lsnr:           ll,gSrv:           s,msgPublisher:   NewChannelDemultiplexer(),lock:           &sync.RWMutex{},deadEndpoints:  make(chan common.PKIidType, 100),stopping:       int32(0),exitChan:       make(chan struct{}, 1),subscriptions:  make([]chan proto.ReceivedMessage, 0),}commInst.connStore = newConnStore(commInst, commInst.logger)if port > 0 {commInst.stopWG.Add(1)go func() {defer commInst.stopWG.Done()s.Serve(ll) //啟動gRPC Server}()//commInst注冊到gRPC Serverproto.RegisterGossipServer(s, commInst)}return commInst, nil
}//代碼在gossip/comm/comm_impl.go

5.2.2、func NewCommInstance(s grpc.Server, cert tls.Certificate, idStore identity.Mapper,peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,dialOpts ...grpc.DialOption) (Comm, error)

將GossipServer實例注冊至peerServer

func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,dialOpts ...grpc.DialOption) (Comm, error) {dialOpts = append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout)))//構造commImplcommInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)if cert != nil {inst := commInst.(*commImpl)inst.selfCertHash = certHashFromRawCert(cert.Certificate[0])}proto.RegisterGossipServer(s, commInst.(*commImpl))return commInst, nil
}//代碼在gossip/comm/comm_impl.go

//創建與服務端連接

5.2.3、func (c commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (connection, error)

func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {var err errorvar cc *grpc.ClientConnvar stream proto.Gossip_GossipStreamClientvar pkiID common.PKIidTypevar connInfo *proto.ConnectionInfovar dialOpts []grpc.DialOptiondialOpts = append(dialOpts, c.secureDialOpts()...)dialOpts = append(dialOpts, grpc.WithBlock())dialOpts = append(dialOpts, c.opts...)cc, err = grpc.Dial(endpoint, dialOpts...)cl := proto.NewGossipClient(cc)if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {cc.Close()return nil, err}ctx, cf := context.WithCancel(context.Background())stream, err = cl.GossipStream(ctx)connInfo, err = c.authenticateRemotePeer(stream)pkiID = connInfo.IDconn := newConnection(cl, cc, stream, nil)conn.pkiID = pkiIDconn.info = connInfoconn.logger = c.loggerconn.cancel = cfh := func(m *proto.SignedGossipMessage) {c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{conn:                conn,lock:                conn,SignedGossipMessage: m,connInfo:            connInfo,})}conn.handler = hreturn conn, nil
}
//代碼在gossip/comm/comm_impl.go

6、connectionStore和connection結構體及方法

6.1、connection結構體及方法

type connection struct {cancel       context.CancelFuncinfo         *proto.ConnectionInfooutBuff      chan *msgSendinglogger       *logging.Logger                 // loggerpkiID        common.PKIidType                // pkiID of the remote endpointhandler      handler                         // function to invoke upon a message receptionconn         *grpc.ClientConn                // gRPC connection to remote endpointcl           proto.GossipClient              // gRPC stub of remote endpointclientStream proto.Gossip_GossipStreamClient // client-side stream to remote endpointserverStream proto.Gossip_GossipStreamServer // server-side stream to remote endpointstopFlag     int32                           // indicates whether this connection is in process of stoppingstopChan     chan struct{}                   // a method to stop the server-side gRPC call from a different go-routinesync.RWMutex                                 // synchronizes access to shared variables
}//構造connection
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection
//關閉connection
func (conn *connection) close()
//atomic.LoadInt32(&(conn.stopFlag)) == int32(1)
func (conn *connection) toDie() bool
//conn.outBuff <- m,其中m為msgSending{envelope: msg.Envelope,onErr: onErr,}
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error))
//go conn.readFromStream(errChan, msgChan)、go conn.writeToStream(),同時msg := <-msgChan,conn.handler(msg)
func (conn *connection) serviceConnection() error
//循環不間斷從conn.outBuff取數據,然后stream.Send(m.envelope)
func (conn *connection) writeToStream()
//循環不間斷envelope, err := stream.Recv()、msg, err := envelope.ToGossipMessage()、msgChan <- msg
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage)
//獲取conn.serverStream
func (conn *connection) getStream() stream
//代碼在gossip/comm/conn.go

6.2、connectionStore結構體及方法

type connectionStore struct {logger           *logging.Logger          // loggerisClosing        bool                     // whether this connection store is shutting downconnFactory      connFactory              // creates a connection to remote peersync.RWMutex                              // synchronize access to shared variablespki2Conn         map[string]*connection   //connection map, key為pkiID,value為connectiondestinationLocks map[string]*sync.RWMutex //mapping between pkiIDs and locks,// used to prevent concurrent connection establishment to the same remote endpoint
}//構造connectionStore
func newConnStore(connFactory connFactory, logger *logging.Logger) *connectionStore
//從connection map中獲取連接,如無則創建并啟動連接,并寫入connection map中
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)
//連接數量
func (cs *connectionStore) connNum() int
//關閉指定連接
func (cs *connectionStore) closeConn(peer *RemotePeer)
//關閉所有連接
func (cs *connectionStore) shutdown()
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *proto.ConnectionInfo) *connection
//注冊連接
func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo, serverStream proto.Gossip_GossipStreamServer) *connection
//關閉指定連接
func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType) 
//代碼在gossip/comm/conn.go

6.2.1、func (cs connectionStore) getConnection(peer RemotePeer) (*connection, error)

func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {cs.RLock()isClosing := cs.isClosingcs.RUnlock()pkiID := peer.PKIIDendpoint := peer.Endpointcs.Lock()destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]if !hasConnected {destinationLock = &sync.RWMutex{}cs.destinationLocks[string(pkiID)] = destinationLock}cs.Unlock()destinationLock.Lock()cs.RLock()//從connection map中獲取conn, exists := cs.pki2Conn[string(pkiID)]if exists {cs.RUnlock()destinationLock.Unlock()return conn, nil}cs.RUnlock()//創建連接createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)destinationLock.Unlock()conn = createdConnectioncs.pki2Conn[string(createdConnection.pkiID)] = conngo conn.serviceConnection() //啟動連接的消息接收處理、以及向對方節點發送消息return conn, nil
}
//代碼在gossip/comm/conn.go

7、ChannelDeMultiplexer結構體及方法(多路復用器)

type ChannelDeMultiplexer struct {channels []*channellock     *sync.RWMutexclosed   int32
}//構造ChannelDeMultiplexer
func NewChannelDemultiplexer() *ChannelDeMultiplexer
//atomic.LoadInt32(&m.closed) == int32(1)
func (m *ChannelDeMultiplexer) isClosed() bool
//關閉
func (m *ChannelDeMultiplexer) Close() 
//添加通道
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{} 
//挨個通道發送消息
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) 

轉載于:https://blog.51cto.com/14041296/2311323

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/253181.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/253181.shtml
英文地址,請注明出處:http://en.pswp.cn/news/253181.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

一段H264數據的分析

&#xfeff;&#xfeff;目錄(?)[-] 分析00 00 00 01 67 42 00 1E 99 A0 B1 31 00 00 00 01分析00 00 00 01 68 CE 38 80 00 00 00 01 分析00 00 00 01 67 42 00 1E 99 A0 B1 31 00 00 00 01 H264的數據流分為兩種&#xff0c;一種是NAL UNIT stream(RTP),一種是 bits stream…

海華模組:WIFI、BT、SoC模組列表

各種模塊廣泛應用于網絡攝像頭、智能機器人、兒童故事機、詞典筆、智能音箱、智能家電等需要實現無線聯網設備的消費類電子產品。 模塊化有很大的有點&#xff1a;集成設計、減少調試工作&#xff0c;避開開發盲區、加速將產品推向市場&#xff01; 下面介紹下海華各類通訊模…

JAVA-初步認識-第七章-構造函數和一般函數的區別

一. 構造函數是對象一創建&#xff0c;就被調用了。(調用這個詞很特殊&#xff0c;是涉及到實體時&#xff0c;才會有調用的過程) 還有一點想說的是&#xff0c;構造函數的聲明應該是固定的&#xff0c;不然沒法隨著對象的創建一起執行&#xff0c;必須是類名括號的形式。 二. …

深入理解哈希表

轉自&#xff1a;https://bestswifter.com/hashtable/ 這篇文章由一個簡單的問題引出: 有兩個字典&#xff0c;分別存有 100 條數據和 10000 條數據&#xff0c;如果用一個不存在的 key 去查找數據&#xff0c;在哪個字典中速度更快&#xff1f; 有些計算機常識的讀者都會立刻回…

Linux服務器ftp+httpd部署

一、ftp安裝 1、安裝vsftpd 命令&#xff1a;yum -y install vsftpd 2、修改ftp配置文件 命令&#xff1a;vim /etc/vsftpd/vsftpd.conf 3、按i進入insert模式后&#xff0c;按以下要求修改 anonymous_enableYES 改為anonymous_enableNO chroot_local_userYES #去掉前面的注釋 …

高清網絡攝像機主流芯片方案之安霸、TI和海思對比

高清網絡視頻監控發展到今天&#xff0c;市場也開始進入真正的高清時代&#xff0c;諸多有實力的高清攝像機廠家的產品線也逐漸完善起來&#xff0c;高清網絡視頻監控的配套產品有更加豐富和成熟。與此同時困擾很多人的高清網絡攝像機與后端平臺或者與后端NVR互聯互通的問題也在…

ios審核4.3被拒,快速通過IOS4.3問題

最近有許多開發者遇到了因為審核條款 4.3&#xff08;后文統一簡稱 4.3&#xff09;審核條款 4.3&#xff08;后文統一簡稱 4.3&#xff09;&#xff0c;這種情況 常見于大家上傳重復應用的時候&#xff0c;因為App Store 已經有了很多相似的應用 而被打回&#xff0c;今天我們…

正基模組:WIFI/BT/GPS/FM模組列表

各種模塊廣泛應用于網絡攝像頭、智能機器人、兒童故事機、詞典筆、智能音箱、智能家電等需要實現無線聯網設備的消費類電子產品。 模組由于其特性&#xff0c;給終端硬件開發帶來巨大的便利性和實用性&#xff0c;具體小結如下&#xff1a; Feature特點:1. 模塊均采用郵票孔形…

計算機網絡基礎教程---強烈推薦!來自銳捷官方網站

一、計算機網絡基礎教程 說明&#xff1a;每個教程的時間大約為6分鐘&#xff0c;以問題為導向&#xff0c;以項目為驅動。1、第一章 IPV4地址介紹 http://www.ruijie.com.cn/fw/zxpx/4092、第二章 TCP/IP協議簇介紹 http://www.ruijie.com.cn/fw/zxpx/4103、第三章 ARP協議工作…

楊冪掐點祝福唐嫣,打破不和傳言,情感營銷還能這么玩?

發現今天的蜂蜜泡水特別地甜&#xff0c;舍友說&#xff0c;同樣地蜂蜜同樣多的水泡出來的水有什么不一樣&#xff0c;肯定是你心情變好了。說得好像也有道理&#xff0c;想想最近這么多甜蜜的事&#xff0c;一開始是穎寶結婚&#xff0c;不久唐嫣和羅晉也宣布結婚&#xff0c;…

RTP/RTCP協議介紹

1流媒體協議 當前在Internet上傳輸音頻和視頻等信息主要有兩種方式&#xff1a;下載和流式傳輸。 下載情況下&#xff0c;用戶需要先下載整個媒體文件到本地&#xff0c;然后才能播放媒體文件。流式傳輸是指傳輸之前首先對多媒體進行預處理(降低質量和高效壓縮)&#xff0c;然后…

推薦一款軟件(作業)

在過去&#xff0c;每當我遇見不認識的英文單詞時我的解決方法是:查閱英漢詞典&#xff0c;后來在我擁有手機之后&#xff0c;我的解決方法是&#xff1a;上網百度&#xff0c;而現在我的解決方法是&#xff1a;“有道翻譯官”。是的&#xff0c;我要介紹的這款軟件便是“有道翻…

網易有道最新力作 有道詞典筆3 結構拆解

2020年12月1日&#xff0c;有道品牌推出了一款硬件新品&#xff0c;名叫有道詞典筆3。 網易有道于2019年8月推出可以“一掃查詞”的有道詞典筆2代&#xff0c;搭載了OCR&#xff08;光學字符識別&#xff09;技術的產品&#xff0c;大大改變了傳統的學習方式&#xff0c;查詞效…

DataGridView動態添加新行的兩種方法

簡單介紹如何為DataGridView控件動態添加新行的兩種方 法&#xff1a; 方法一&#xff1a; int indexthis.dataGridView1.Rows.Add();this.dataGridView1.Rows[index].Cells[0].Value "1"; this.dataGridView1.Rows[index].Cells[1].Value "2"; this.dat…

使用glew和glad 新建窗口

一、添加頭文件 首先&#xff0c;將頭文件加到項目的.cpp文件中 1 #include <glad/glad.h> 2 #include <GLFW/glfw3.h> 注&#xff1a; 包含glad的頭文件一定要在包含glfw的頭文件之前使用。因為glad的頭文件包含了正確的openGL頭文件&#xff08;例如GL/gl.h&…

有道詞典筆3新增功能掃讀和點讀是怎么集成的?

2020年12月1日&#xff0c;有道品牌推出了一款硬件新品&#xff0c;名叫有道詞典筆3。 相對有道于2019年8月推出后來被稱為“爆品”的有道詞典筆2來說&#xff0c;有道3硬件最大最明顯差別是屏幕變的更大了&#xff0c;同時增加了點讀功能&#xff08;點讀筆點讀特定教材的功能…

??RTP協議分析

RTP協議分析 一&#xff0e; RTP協議背景.......................................................................................................... 1 二&#xff0e; RTP協議原理及工作機制........................................................................…

mongodb 部署

安裝mongodb-3.4 1&#xff09;將安裝包上傳至服務器 2&#xff09;對壓縮文件進行解壓 tar -zxvf mongodb-linux-x86_64-suse12-v3.4-latest.tar.gz 3&#xff09;把解壓出來的文件修改一下名字&#xff0c;并挪到指定安裝路徑 sudo mv mongodb-linux-x86_64-suse12-3.4.6-22-…

如何選擇一款優秀的兒童讀寫臺燈?

如何選擇一款優秀的兒童閱讀臺燈&#xff1f;除了品牌、外觀、材質、價格等因素外&#xff0c;最關鍵的是技術參數。 先說結論&#xff0c;滿足如下幾點參數&#xff0c;當數優選&#xff1a; 1-光通量&#xff1a;500lm以上 2-顯色指數&#xff1a;≥95 3-色溫&#xff1a…

Python與操作系統有關的模塊

Os模塊Python的標準庫中的os模塊主要涉及普遍的操作系統功能。可以在Linux和Windows下運行&#xff0c;與平臺無關。os.sep 可以取代操作系統特定的路徑分割符。os.name字符串指示你正在使用的平臺。比如對于Windows&#xff0c;它是’nt’&#xff0c;而對于Linux/Unix用戶&am…