我們知道openstack內部消息隊列基于AMQP協議,默認使用的rabbitmq 消息隊列。談到rabbitmq,大家或許并不陌生,但或許會對oslo message有些陌生。openstack內部并不是直接使用rabbitmq,而是使用了oslo.message 。oslo.message 后端的driver支持rabbitmq,kafka,zeromq等消息隊列(目前只有rabbitmq能用于openstack) 。在 oslo message中封裝了OpenStack各組件內部進行消息通信的方法,并將方法中所使用的數據結構封裝為通用的類,以達到使用簡單快捷、擴展性強的目的。
下面是rabbitmq 支持的模式和場景,簡單回顧一下,包括簡單模式,工作隊列模式,訂閱發布模式,路由模式,topics模式,RPC模式。
官網有詳細的使用說明, 具體rabbitmq 的使用可以參考 https://www.rabbitmq.com/tutorials/tutorial-one-python
前面提到了openstack內部消息通信其實是使用的oslo.message庫,關于oslo message 主要提供倆種主要功能:
遠程過程調用 RPC:一個服務進程可以調用其他遠程服務進程的方法。調用的方式:
rpc.call():遠程方法會被同步執行,調用者會被阻塞直到返回方法的結果,在一些調用時間較長的場合中使用會對效率有很大的影響。
rpc.cast():遠程服務的方法會被異步執行,調用者不會被阻塞,結果也無須立即返回,因為是異步,所以也要求調用者利用其他的方法來查詢這次遠程調用的結果。
事件通知:某一個服務進程將事件通知發送到消息總線上,所有在消息總線上且對該事件通知感興趣的服務進程都可以將該事件通知獲取并進行處理,執行的結果并不需要返回給事件發送者。這種方式不僅可以在項目組件內部的進程服務通信間實現,還可以在項目之間的通信中實現比如計量計費等。
Oslo.message中的幾個重要概念:
server:rpc 服務端,包含一個或多個端點(Endpoint),每個端點包含一組遠程調用的方法,這組方法可以被客戶端通過transport對象遠程調用。創建Server對象時,需要指定Transport、Target和一組endpoint。
client:rpc 客戶端, 負責調用服務端提供的RPC接口。
exchange:rabbitmq中的概念,一種交換實現,負責把消息交換到相對應隊列上。
namespace:服務器端可以在一個主體上暴露多組方法,每組方法屬于一個命名空間。
method:方法,方法由一個名字和相關參數組成。
transport:顧名思義:運輸工具,就是運輸載體,一個傳送RPC 請求到服務器端并將響應返回給客戶端的底層消息系統。目前主要使用的transport有rabbitmq和qpid。
URL格式:Transport://user:password@hotst1:port[,host:port]/virtual_host
API version:每個命名空間都有一個版本號,當命名空間的接口變化時,這個版本號也會響應增加。向前兼容的修改只需要更改小版本號,向前不兼容的更改需要更改大版本號。
Target:目的地,指定某一個消息最終目的地的所有信息。Target中封裝了所有將要用到的信息,以確定應該將消息發送到何處或服務器正在偵聽什么信息。
下面講解一下組件cinder 組件內部的 rpc 通信, 從在rpc client和 rpc server端從代碼看具體實現,
RPC Client
當cinder-api 收到創建volume 云硬盤時,cinder-scheduler 調度完資源filter出合適的backend之后,在cinder.scheduler.rpcapi 代碼中,rpc client 發出創建volume 的rpc 請求 , 代碼是下面這樣的:
def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None, ? ? ? ? ? ? ? ? request_spec=None, filter_properties=None, ? ? ? ? ? ? ? ? backup_id=None): ? volume.create_worker() ? cctxt = self._get_cctxt() ? msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id, ? ? ? ? ? ? ? 'request_spec': request_spec, ? ? ? ? ? ? ? 'filter_properties': filter_properties, ? ? ? ? ? ? ? 'volume': volume, 'backup_id': backup_id} ? if not self.client.can_send_version('3.10'): ? ? ? msg_args.pop('backup_id') ? return cctxt.cast(ctxt, 'create_volume', **msg_args)
由前面所提到的,cast和call分別對應異步和同步請求。當調用cast或者call時,通過oslo.message庫序列化消息體,通過 調用transport._send 發送到哪個target,transport 會調用對應driver 比如 AMQPDriverBase.send方法。從連接池中獲取到 rabbitmq connection 連接,根據消息類型,選擇通過topic exchange還是fanout exchange等模式 , 調用 kombu(類似于pika,但是支持重連策略以及連接池功能等)發送到對應的消息隊列中。
try:with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn:if notify:exchange = self._get_exchange(target)LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"" topic '%(topic)s'", {'exchange': exchange,'topic': target.topic})conn.notify_send(exchange, target.topic, msg, retry=retry)elif target.fanout:log_msg += "FANOUT topic '%(topic)s'" % {'topic': target.topic}LOG.debug(log_msg)conn.fanout_send(target.topic, msg, retry=retry)else:topic = target.topicexchange = self._get_exchange(target)if target.server:topic = '%s.%s' % (target.topic, target.server)LOG.debug(log_msg + "exchange '%(exchange)s'"" topic '%(topic)s'", {'exchange': exchange,'topic': topic})conn.topic_send(exchange_name=exchange, topic=topic,msg=msg, timeout=timeout, retry=retry,transport_options=transport_options)
那么send 完發送到隊列中后,服務端怎么就能執行到對應的方法呢,我們看下rpc server端的實現
RPC Server
以cinder-volume 為例,在cinder-volume 服務啟動時,會先初始化rpc 再啟動rpc server,其實每個服務都是這樣。
通過在cinder.service.Service.start 函數中,調用 messaging.get_rpc_server 構造rpc_server對象,調用rpc_server對象start方法啟動。
每個組件通過service start時 ,會啟動相關的rpc 服務。
if not rpc.initialized():rpc.init(CONF)endpoints = [self.manager]
endpoints.extend(self.manager.additional_endpoints)serializer = objects_base.CinderObjectSerializer(obj_version_cap)target = messaging.Target(topic=self.topic, server=self.host)
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()if self.topic == constants.VOLUME_TOPIC:target = messaging.Target(topic='%(topic)s.%(host)s' % {'topic': self.topic,'host': self.host},server=vol_utils.extract_host(self.host, 'host'))self.backend_rpcserver = rpc.get_server(target, endpoints,serializer)self.backend_rpcserver.start()
由上面可以看到在構造 messaging.get_rpc_server 實例時 ,傳入TRANSPORT,target,endpoints, json serializer,其中TRANSPORT 傳的rabbitmq,target 傳入的是包含當前服務 topic和本機名稱的target對象,endpoints這里傳的就是VolumeManager對象,serializer傳入的是json serializer,最終通過構造了RPCServer實例,self.rpcserver.start() 啟動會 啟動 max_workers 大小的eventlet 協程池,創建 listener , 并不斷處理incoming的message。RPCServer. processincoming 中取到message后,確認消息,并dispatch消息到對應的endpint上
class RPCServer(msg_server.MessageHandlingServer):def _process_incoming(self, incoming):try:res = self.dispatcher.dispatch(message)...........
def _do_dispatch(self, endpoint, method, ctxt, args):ctxt = self.serializer.deserialize_context(ctxt)new_args = dict()for argname, arg in args.items():new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)func = getattr(endpoint, method)result = func(ctxt, **new_args)return self.serializer.serialize_entity(ctxt, result)
oslo.message 在接收到dispatch的message后,解析message中的method,args,namespace,version等,并遍歷endpoints,如果endpoint含有對應method,則反射執行,并最終反序列化返回結果
總體來說:相比較其他消息隊列,比如kafka,redis,pulsar,rocketMQ ,rabbitmq算是功能比較豐富的消息隊列了,openstack社區實現的 oslo message 完美的基于rabbitmq 很好的實現了一套內部組件的消息通信功能。可能這種消息通信方式用的比較少,但是代碼實現上來說很有深度,尤其在較大型項目中,很有借鑒價值。
推薦閱讀:
高性能版云聯網實現原理
虛擬機磁盤熱切換實現方案
VPC場景虛機熱遷網絡無感
更多技術和產品文章,請關注👆
如果您對哪個產品感興趣,歡迎留言給我們,我們會定向邀文~
360智匯云是以"匯聚數據價值,助力智能未來"為目標的企業應用開放服務平臺,融合360豐富的產品、技術力量,為客戶提供平臺服務。
目前,智匯云提供數據庫、中間件、存儲、大數據、人工智能、計算、網絡、視聯物聯與通信等多種產品服務以及一站式解決方案,助力客戶降本增效,累計服務業務1000+。
智匯云致力于為各行各業的業務及應用提供強有力的產品、技術服務,幫助企業和業務實現更大的商業價值。
官網:https://zyun.360.cn 或搜索“360智匯云”
客服電話:4000052360