openstack內部rpc消息通信源碼分析

我們知道openstack內部消息隊列基于AMQP協議,默認使用的rabbitmq 消息隊列。談到rabbitmq,大家或許并不陌生,但或許會對oslo message有些陌生。openstack內部并不是直接使用rabbitmq,而是使用了oslo.message 。oslo.message 后端的driver支持rabbitmq,kafka,zeromq等消息隊列(目前只有rabbitmq能用于openstack) 。在 oslo message中封裝了OpenStack各組件內部進行消息通信的方法,并將方法中所使用的數據結構封裝為通用的類,以達到使用簡單快捷、擴展性強的目的。

下面是rabbitmq 支持的模式和場景,簡單回顧一下,包括簡單模式,工作隊列模式,訂閱發布模式,路由模式,topics模式,RPC模式。

d7dbbd8eb405330cc3c3c58b2f102645.png23d05c3eaff976a778dc8377992f7866.png

官網有詳細的使用說明, 具體rabbitmq 的使用可以參考 https://www.rabbitmq.com/tutorials/tutorial-one-python

前面提到了openstack內部消息通信其實是使用的oslo.message庫,關于oslo message 主要提供倆種主要功能:

遠程過程調用 RPC:一個服務進程可以調用其他遠程服務進程的方法。調用的方式:

rpc.call():遠程方法會被同步執行,調用者會被阻塞直到返回方法的結果,在一些調用時間較長的場合中使用會對效率有很大的影響。

rpc.cast():遠程服務的方法會被異步執行,調用者不會被阻塞,結果也無須立即返回,因為是異步,所以也要求調用者利用其他的方法來查詢這次遠程調用的結果。

事件通知:某一個服務進程將事件通知發送到消息總線上,所有在消息總線上且對該事件通知感興趣的服務進程都可以將該事件通知獲取并進行處理,執行的結果并不需要返回給事件發送者。這種方式不僅可以在項目組件內部的進程服務通信間實現,還可以在項目之間的通信中實現比如計量計費等。

Oslo.message中的幾個重要概念

  • server:rpc 服務端,包含一個或多個端點(Endpoint),每個端點包含一組遠程調用的方法,這組方法可以被客戶端通過transport對象遠程調用。創建Server對象時,需要指定Transport、Target和一組endpoint。

  • client:rpc 客戶端, 負責調用服務端提供的RPC接口。

  • exchange:rabbitmq中的概念,一種交換實現,負責把消息交換到相對應隊列上。

  • namespace:服務器端可以在一個主體上暴露多組方法,每組方法屬于一個命名空間。

  • method:方法,方法由一個名字和相關參數組成。

  • transport:顧名思義:運輸工具,就是運輸載體,一個傳送RPC 請求到服務器端并將響應返回給客戶端的底層消息系統。目前主要使用的transport有rabbitmq和qpid。

URL格式:Transport://user:password@hotst1:port[,host:port]/virtual_host

  • API version:每個命名空間都有一個版本號,當命名空間的接口變化時,這個版本號也會響應增加。向前兼容的修改只需要更改小版本號,向前不兼容的更改需要更改大版本號。

  • Target:目的地,指定某一個消息最終目的地的所有信息。Target中封裝了所有將要用到的信息,以確定應該將消息發送到何處或服務器正在偵聽什么信息。

下面講解一下組件cinder 組件內部的 rpc 通信, 從在rpc client和 rpc server端從代碼看具體實現,

RPC Client

當cinder-api 收到創建volume 云硬盤時,cinder-scheduler 調度完資源filter出合適的backend之后,在cinder.scheduler.rpcapi 代碼中,rpc client 發出創建volume 的rpc 請求 , 代碼是下面這樣的:

def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None, ? ? ? ? ? ? ? ?  request_spec=None, filter_properties=None, ? ? ? ? ? ? ? ?  backup_id=None): ?  volume.create_worker() ?  cctxt = self._get_cctxt() ?  msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id, ? ? ? ? ? ? ?  'request_spec': request_spec, ? ? ? ? ? ? ?  'filter_properties': filter_properties, ? ? ? ? ? ? ?  'volume': volume, 'backup_id': backup_id} ?  if not self.client.can_send_version('3.10'): ? ? ?  msg_args.pop('backup_id') ?  return cctxt.cast(ctxt, 'create_volume', **msg_args)

由前面所提到的,cast和call分別對應異步和同步請求。當調用cast或者call時,通過oslo.message庫序列化消息體,通過 調用transport._send 發送到哪個target,transport 會調用對應driver 比如 AMQPDriverBase.send方法。從連接池中獲取到 rabbitmq connection 連接,根據消息類型,選擇通過topic exchange還是fanout exchange等模式 , 調用 kombu(類似于pika,但是支持重連策略以及連接池功能等)發送到對應的消息隊列中。

try:with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn:if notify:exchange = self._get_exchange(target)LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"" topic '%(topic)s'", {'exchange': exchange,'topic': target.topic})conn.notify_send(exchange, target.topic, msg, retry=retry)elif target.fanout:log_msg += "FANOUT topic '%(topic)s'" % {'topic': target.topic}LOG.debug(log_msg)conn.fanout_send(target.topic, msg, retry=retry)else:topic = target.topicexchange = self._get_exchange(target)if target.server:topic = '%s.%s' % (target.topic, target.server)LOG.debug(log_msg + "exchange '%(exchange)s'"" topic '%(topic)s'", {'exchange': exchange,'topic': topic})conn.topic_send(exchange_name=exchange, topic=topic,msg=msg, timeout=timeout, retry=retry,transport_options=transport_options)

那么send 完發送到隊列中后,服務端怎么就能執行到對應的方法呢,我們看下rpc server端的實現

RPC Server

以cinder-volume 為例,在cinder-volume 服務啟動時,會先初始化rpc 再啟動rpc server,其實每個服務都是這樣。

通過在cinder.service.Service.start 函數中,調用 messaging.get_rpc_server 構造rpc_server對象,調用rpc_server對象start方法啟動。

def18ff49fae31b290786323ccda38b3.jpeg

每個組件通過service start時 ,會啟動相關的rpc 服務。

if not rpc.initialized():rpc.init(CONF)endpoints = [self.manager]
endpoints.extend(self.manager.additional_endpoints)serializer = objects_base.CinderObjectSerializer(obj_version_cap)target = messaging.Target(topic=self.topic, server=self.host)
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()if self.topic == constants.VOLUME_TOPIC:target = messaging.Target(topic='%(topic)s.%(host)s' % {'topic': self.topic,'host': self.host},server=vol_utils.extract_host(self.host, 'host'))self.backend_rpcserver = rpc.get_server(target, endpoints,serializer)self.backend_rpcserver.start()

由上面可以看到在構造 messaging.get_rpc_server 實例時 ,傳入TRANSPORT,target,endpoints, json serializer,其中TRANSPORT 傳的rabbitmq,target 傳入的是包含當前服務 topic和本機名稱的target對象,endpoints這里傳的就是VolumeManager對象,serializer傳入的是json serializer,最終通過構造了RPCServer實例,self.rpcserver.start() 啟動會 啟動 max_workers 大小的eventlet 協程池,創建 listener , 并不斷處理incoming的message。RPCServer. processincoming 中取到message后,確認消息,并dispatch消息到對應的endpint上

class RPCServer(msg_server.MessageHandlingServer):def _process_incoming(self, incoming):try:res = self.dispatcher.dispatch(message)...........
def _do_dispatch(self, endpoint, method, ctxt, args):ctxt = self.serializer.deserialize_context(ctxt)new_args = dict()for argname, arg in args.items():new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)func = getattr(endpoint, method)result = func(ctxt, **new_args)return self.serializer.serialize_entity(ctxt, result)

oslo.message 在接收到dispatch的message后,解析message中的method,args,namespace,version等,并遍歷endpoints,如果endpoint含有對應method,則反射執行,并最終反序列化返回結果

總體來說:相比較其他消息隊列,比如kafka,redis,pulsar,rocketMQ ,rabbitmq算是功能比較豐富的消息隊列了,openstack社區實現的 oslo message 完美的基于rabbitmq 很好的實現了一套內部組件的消息通信功能。可能這種消息通信方式用的比較少,但是代碼實現上來說很有深度,尤其在較大型項目中,很有借鑒價值。


推薦閱讀:

高性能版云聯網實現原理

虛擬機磁盤熱切換實現方案

VPC場景虛機熱遷網絡無感

更多技術和產品文章,請關注👆

如果您對哪個產品感興趣,歡迎留言給我們,我們會定向邀文~

360智匯云是以"匯聚數據價值,助力智能未來"為目標的企業應用開放服務平臺,融合360豐富的產品、技術力量,為客戶提供平臺服務。
目前,智匯云提供數據庫、中間件、存儲、大數據、人工智能、計算、網絡、視聯物聯與通信等多種產品服務以及一站式解決方案,助力客戶降本增效,累計服務業務1000+。
智匯云致力于為各行各業的業務及應用提供強有力的產品、技術服務,幫助企業和業務實現更大的商業價值。
官網:https://zyun.360.cn 或搜索“360智匯云”
客服電話:4000052360

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/62164.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/62164.shtml
英文地址,請注明出處:http://en.pswp.cn/web/62164.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Python 3 和 MongoDB 的集成使用

Python 3 和 MongoDB 的集成使用 MongoDB 是一個流行的 NoSQL 數據庫,以其靈活的數據模型和強大的查詢功能而聞名。Python 3 作為一種廣泛使用的編程語言,與 MongoDB 的集成變得日益重要。本文將介紹如何在 Python 3 環境中集成和使用 MongoDB&#xff…

Postman自定義腳本Pre-request-script以及Test

這兩個都是我們進行自定義script腳本的地方,分別是在請求執行的前后運行。 我們舉兩個可能經常運用到的場景。 (一)請求A先執行,請求B使用請求A響應結果作為參數。如果我們不用自定義腳本,可能得先執行請求A,然后手動復制響應結果…

構建高效OTA旅游平臺的技術指南

1. 引言 在信息技術高速發展的今天,互聯網深刻地改變了人們的旅行方式。傳統的旅行社模式逐漸被在線旅游平臺所取代,OTA(Online Travel Agency,在線旅行社)旅游平臺應運而生,成為人們獲取旅游信息、預訂旅…

總結的一些MySql面試題

目錄 一:基礎篇 二:索引原理和SQL優化 三:事務原理 四:緩存策略 一:基礎篇 1:定義:按照數據結構來組織、存儲和管理數據的倉庫;是一個長期存儲在計算機內的、有組織的、可共享 的…

116. UE5 GAS RPG 實現擊殺掉落戰利品功能

這一篇,我們實現敵人被擊敗后,掉落戰利品的功能。首先,我們將創建一個新的結構體,用于定義掉落體的內容,方便我們設置掉落物。然后,我們實現敵人死亡時的掉落函數,并在藍圖里實現對應的邏輯&…

Excel技巧:如何批量調整excel表格中的圖片?

插入到excel表格中的圖片大小不一,如何做到每張圖片都完美的與單元格大小相同?并且能夠根據單元格來改變大小?今天分享,excel表格里的圖片如何批量調整大小。 方法如下: 點擊表格中的一個圖片,然后按住Ct…

智能合約

06-智能合約 0 啥是智能合約? 定義 智能合約,又稱加密合約,在一定條件下可直接控制數字貨幣或資產在各方之間轉移的一種計算機程序。 角色 區塊鏈網絡可視為一個分布式存儲服務,因為它存儲了所有交易和智能合約的狀態 智能合約還…

智慧油客:從初識、再識OceanBase,到全棧上線

今天,我們邀請了智慧油客的研發總監黃普友,為我們講述智慧油客與 OceanBase 初識、熟悉和結緣的故事。 智慧油客自2016年誕生以來,秉持新零售的思維,成功從過去二十年間以“以銷售產品為中心”的傳統思維模式,轉向“以…

【深度學習】手機SIM卡托缺陷檢測【附鏈接】

一、手機SIM卡托用途 SIM卡托是用于固定和保護SIM卡的部件,通過連接SIM卡與手機主板的方式,允許設備訪問移動網絡,用戶可以通過SIM卡進行通話、發送短信和使用數據服務。 二、手機SIM卡托不良影響 SIM卡接觸不良,造成信號中斷&…

高新技術企業復審需要哪些材料?

高新技術企業復審需要準備以下材料: 《高新技術企業認定復審申請書》;高新技術企業證書;企業營業執照副本、稅務登記證書(復印件);企業職工人數、學歷結構以及研發人員占企業職工的比例證明;五…

消防物證管理系統|DW-S404實現消防物證智能化管理

一、系統概述 智慧消防物證管理系統DW-S404系統旨在借助現代信息技術,達成消防物證管理的高效化、安全化及智能化管理目標。該系統運用物聯網、大數據、云計算等先進技術,實現對消防物證從產生到銷毀的全生命周期跟蹤與監控,從而增強物證管理…

Odoo :一款免費且開源的食品生鮮領域ERP管理系統

文 / 貝思納斯 Odoo金牌合作伙伴 引言 提供業財人資稅的精益化管理,實現研產供銷的融通、食品安全的追蹤與溯源,達成渠道的扁平化以及直面消費者的 D2C 等數字化解決方案,以此提升運營效率與核心競爭力,支撐高質量的變速擴張。…

如何部署vue項目到Github Pages

1.創建vue項目 npm create vitelatest my-vue-app -- --template vue 2.創建github倉庫 3.連接倉庫 在項目根目錄右鍵選擇open git base here,如果沒有安裝git請先安裝git。 初始化倉庫 $ git init $ git add . $ git commit -m "init"將項目與倉庫連…

Dubbo應用篇

文章目錄 一、Dubbo簡介二、SSM項目整合Dubbo1.生產者方配置2.消費者方配置 三、Spring Boot 項目整合Dubbo1.生產者方配置2.消費者方配置 四、應用案例五、Dubbo配置的優先級別1. 方法級配置(Highest Priority)2. 接口級配置3. 消費者/提供者級配置4. 全…

ubuntu的matlab使用心得

1.讀取視頻 v VideoReader(2222.mp4);出問題,報錯: matlab 錯誤使用 VideoReader/initReader (第 734 行) 由于出現意外錯誤而無法讀取文件。原因: Unable to initialize the video properties 出錯 audiovideo.internal.IVideoReader (第 136 行) init…

消息中間件-Kafka1-實現原理

消息中間件-Kafka 一、kafka簡介 1、概念 Kafka是最初由Linkedin公司開發,是一個分布式、支持分區(partition)、多副本的(replica),基于zookeeper協調的分布式消息系統,它的最大的特性就是可以…

如何利用“一鍵生成ppt”減輕工作壓力

隨著數字化的快速發展,PPT設計這一傳統任務也迎來了新的變化。過去,制作一個簡潔、專業的PPT需要花費大量時間與精力。但現在借助科技的力量,一鍵生成PPT的夢想成真了。從智能生成ppt到ai生成ppt的技術不斷進步,令我們能夠體驗到更…

創造未來:The Sandbox 創作者訓練營如何賦能全球創造者

創作者訓練營讓創造者有能力打造下一代數字體驗。通過促進合作和提供尖端工具,The Sandbox 計劃確保今天的元宇宙是由一個個創造者共同打造。 2024 年 5 月,The Sandbox 推出了「創作者訓練營」系列,旨在重新定義數字創作。「創作者訓練營」系…

Docker多架構鏡像構建踩坑記

背景 公司為了做信創項目的亮點,需要將現有的一套在X86上運行的應用系統遷移到ARM服務器上運行,整個項目通過后端Java,前端VUEJS開發通過CICD做成Docker鏡像在K8S里面運行。但是當前的CICD產品不支持ARM的鏡像構建,于是只能手工構…

python學opencv|讀取圖像(三)放大和縮小圖像

【1】引言 前序已經學習了常規的圖像讀取操作和圖像保存技巧,相關文章鏈接為: python學opencv|讀取圖像-CSDN博客 python學opencv|讀取圖像(二)保存彩色圖像-CSDN博客 今天我們更近一步,學習放大和縮小圖像的技巧&…