背景
隊列還是非常重要的中間件,可以幫助我們:提高處理效率、完成更復雜的處理流程
最初,我覺得只要掌握一種消息隊列就夠了,現在想想挺好笑的。
過去的探索
因為我用python,而rabbitmq比較貼合快速和復雜的數據處理,然后就選了這款。一開始我覺得不靠譜,服務老是斷了,后來發現,是因為ubuntu做服務器如果用wifi的話天然容易這樣。當時不明白為什么,所以棄用了rabbitmq一段時間。
后來再次使用時,功能倒是大體ok了,在早期版本好像直接支持延時消息啥的,當時都把插件裝上搞好了。后來又覺得rabbitmq的并發太小了,在大規模etl的時候比較吃不消。所以又棄用了。
再后來,我想搞一些簡單,使用一點的隊列,于是搞了redis stream。在某些方面的確也還挺好用,速度和吞吐都還比較大。但是是吃內存的,而且在后來也出過一次問題:在維持十幾個隊列進行大量流轉時,我發現有些隊列失效了,要刪除才能釋放內存。這樣的隱患看起來更大,所以又多少被打入冷宮。
兜兜轉轉又回到了kafka。這個是最早我比較排斥的,因為看起來比價麻煩,而且更偏向java。但搭起來的確很好用,吞吐大,也不需要內存,主要依賴硬盤。但是kafka搭起來稍微麻煩一點,組要zookeeper,而且對網絡和機器資源的要求很高。有一次,我不確定是不是kafka的問題,把我一臺機器的網絡整個帶崩了。當然大部分時候也是沒問題的。
后來也陸續看了一些隊列,但類型也就是上面三種了,差異不大。還有個zmq,感覺是更底層的隊列轉發,沒去管了。
一些對應的結論:
- 1 rabbitmq 可以在稍微小的場景用,功能可以比較全面,方便重試等
- 2 redis 把持久化去掉,不需要了,這樣萬一stream出問題重啟服務就行
- 3 kafka 適合保存大量的對話日志,有7天滾動刪除,需要用資源稍微高一點的機器運行
新的思考
最近碰到一些問題,又觸發了我關于這方面的思考。
問題1:一個同事抱怨請求的微服務失敗率過高,任務失敗后他的重試比較難搞?
問題2:機器人會有很多零散的數據需要向量化,而現有的向量化微服務是處理批量的,這樣導致了能力無法輸出?
問題3:大模型不斷出新的模型,以及現有的接口價格還是稍貴(雖然已經是業內最低),如何能確保替換?
對于問題1,后來我發現還是數據連接失活的問題,已經解決掉了。但如果是大模型接口不穩定導致的問題,應該如何解決呢? – RabbitMQ
由于調用大模型處理的需求一般都是比較昂貴且緩慢的,這意味著天然的并發就不會太高。RabbitMQ即使在消息體很大的情況下,應該也能做到2000左右的并發(這個后續我可以壓一下),那樣在并發處理上就夠了。
然后利用rabbitmq本身豐富的機制,比如死信隊列這種來完成重試。
這樣可以應對接口的不穩定調用情況,減少我們自己進行失敗的檢查和調度。
對于問題2,應該就是做一個微批次服務了。服務端用隊列接收請求,只有到一定批次時或者到指定輪詢時間(1s),服務才會處理隊列的數據,此時就可以發揮服務的批量處理效率了。這對于矩陣處理類的服務特別有效,使用redis stream這樣簡單的隊列來完成這種服務正好。 – Redis Stream。
對于問題3,那么就是一個廣播的過程。用kafka比較合適,一方面可以支持很大的吞吐,然后對于不同的消費者,這時應該是不同的模型都可以重復消費。每一個input,只存一次,在kafka,然后可以被重復消費,消費的結果進行實時比對。勝利的模型上臺,失敗的模型退位。
還有一個比較讓我本能抗拒的問題,但其實應該是可以的,后面我也要嘗試。
【實時隊列服務】服務只是一個消息入口,并不直接處理,而是發到kafka。然后由多個worker盯著kafka進行消費。
這種間接服務是有點不靠譜的,拋開入口服務不談,這里有kafka隊列和worker兩個不穩定因素。但如果可行的話,這樣反而是比較好的:
- 1 數據存在歷史(7天緩存),必要的時候可以追溯和回放
- 2 數據存到kafka,可以有更高的彈性處理能力,對那些延時要求不高的,比如允許timeout 30秒的任務來說肯定是可以的
worker處理完之后進行返回 ,可以采用webhook, websocket或者sse的方式將結果實時的返給請求。
【復雜ETL流轉】將數據的處理抽象為在若干個kafka之間進行流轉。
這樣最大的好處是可以讓不同人/流程之間的交互變的簡單,可能會稍微費點硬盤,但應該是值得的。