使用ray擴展python應用之流式處理應用

流式處理就是數據一來,咱們就得趕緊處理,不能攢批再算。這里的實時不是指瞬間完成,而是要在數據產生的那一刻,或者非常接近那個時間點,就做出響應。這種處理方式,我們稱之為流式處理。

流式處理的應用場景

流式處理到底能干啥?它應用場景非常廣泛。

  • 日志分析。應用每天產生海量日志,邊生產邊分析,一旦發現異常,比如某個服務崩潰了,或者有安全事件發生,立刻就能報警,快速定位問題根源,大大縮短故障恢復時間。

  • 金融交易,流式處理就能實時監控每一筆交易,結合用戶行為模式、地理位置、交易金額等多維度信息,通過規則引擎或者機器學習模型,秒級識別出異常交易。

  • 網絡安全。實時監控網絡流量、系統日志、用戶登錄行為等等。通過建立正常的安全基線,任何偏離這個基線的異常活動,比如大量未授權訪問嘗試、異常的數據包傳輸,都能被流式系統迅速捕捉到。

  • 物流行業。GPS信號、傳感器數據源源不斷地傳入系統,通過流式處理,可以實時計算最優路徑,避開擁堵路段,動態調整配送計劃。這不僅提高了效率,還能降低油耗和運營成本。

  • 物聯網IoT。無數的傳感器設備,比如工廠里的機器、城市里的路燈、農田里的土壤濕度監測器,它們都在不停地產生數據。

  • 推薦引擎。每一次點擊、瀏覽、搜索,都被實時記錄下來,形成你的行為數據流。推薦系統實時分析這些數據,結合協同過濾、深度學習等算法,不斷更新你的興趣畫像,然后給你推送最相關的商品或內容。

Ray如何實現流式處理

了解了流式應用的重要性,我們來看看如何在 Ray 中實現它們。目前主要有兩種方式:

  1. 利用 Ray 提供的強大底層組件,比如 Actors、Task 并行、共享內存等,自己動手構建一套定制化的流式處理框架。這種方式靈活性高,但開發量也相對較大。

  2. 將 Ray 與現有的成熟流式引擎集成,比如 Apache Flink,通常會借助 Kafka 這樣的消息中間件來連接數據源和處理邏輯。

Ray 的定位不是要做一個獨立的、功能全面的流式系統,而是提供一個強大的計算平臺,讓開發者可以更方便地構建自己的流式應用。既然提到了集成,那為什么 Kafka 成為了流式應用中最受歡迎的消息中間件之一呢?Kafka 能夠以驚人的吞吐量處理海量數據流,同時保證數據的持久化存儲,這意味著你可以隨時回溯歷史數據進行分析。而且,Kafka 的水平擴展性非常好,可以通過增加 Broker 節點輕松應對數據量的增長。更重要的是,圍繞 Kafka 已經形成了一個非常成熟的生態系統,各種工具和庫層出不窮。

kafka和ray集成

這里只關注那些kafka與 Ray 集成時最相關的特性。很多人把 Kafka 當作消息隊列,比如 RabbitMQ,但其實它本質上是一個分布式日志系統

在這里插入圖片描述

它不像傳統的隊列那樣,消息發出去就沒了,Kafka 把每一條消息都當作一個記錄,按順序追加寫入到日志文件中。每條記錄可以包含 Key 和 Value,當然兩者都是可選的。生產者總是往日志的末尾寫入新消息。而消費者呢,它可以選擇從哪個位置開始讀取,這個位置叫做 Offset。這意味著,消費者可以讀取任意歷史消息,也可以只讀最新的消息。

這種基于日志的設計,帶來了幾個關鍵區別。

  • 消息的生命周期。傳統隊列里的消息,一旦被消費者成功消費,通常就從隊列里刪除了,是臨時的。而 Kafka 的消息是持久化的,會一直保存在磁盤上,直到達到配置的保留策略。這使得 Kafka 支持消息回溯。

  • 消費者管理。在隊列系統里,通常是 Broker 來管理消費者的 Offset,告訴消費者下次該從哪里讀。但在 Kafka 里,Offset 是由消費者自己負責管理的。Kafka 可以支持大量的消費者同時讀取同一個 Topic,因為每個消費者只需要記錄自己的 Offset 即可,互不干擾。

Kafka 也像消息隊列一樣,用 Topic 來組織數據。但 Kafka 的 Topic 是一個純粹的邏輯概念,它下面實際上是由多個 Partition 組成的。你可以把 Partition 理解為 Topic 的物理分片。為什么要這樣做?主要是為了實現水平擴展和并行處理。每個 Partition 內部的數據是有序的,但不同 Partition 之間的數據是無序的。生產者寫入數據時,會根據一定的策略選擇寫入哪個 Partition。那么,生產者是怎么決定把消息寫到哪個 Partition 的呢?主要有兩種情況。

  • 如果你沒有指定 Key,Kafka 默認會采用輪詢的方式,均勻地把消息分配到不同的 Partition。這樣可以保證負載均衡。
  • 你給消息指定一個 Key,比如用戶的 ID 或者訂單號。Kafka 默認會使用 Key 的 Hash 值來決定寫入哪個 Partition。這樣做的好處是,同一個 Key 的所有消息,都會被寫入同一個 Partition,保證了該 Key 下消息的順序性。
  • 如果有特殊需求,也可以實現自定義的 Partitioning 策略。

記住,Partition 內部消息是有序的,跨 Partition 的消息是無序的。有了 Partition,怎么讓消費者高效地讀取呢?這就引出了 Consumer Group 的概念。你可以把多個消費者組成一個組,讓它們共同消費同一個 Topic 的消息。Kafka 會把這個 Topic 的所有 Partition 分配給這個 Consumer Group 里的消費者。

在這里插入圖片描述

比如,一個 Topic 有 10 個 Partition,你在一個 Group 里放了 5 個消費者,那么 Kafka 會把每個消費者分配到 2 個 Partition。這樣,每個消費者就可以并行地從自己的 Partition 里讀取消息,大大提高了整體的消費速度。所以,想提升消費能力,要么增加消費者數量,要么增加 Partition 數量。Kafka 提供了豐富的 API 來支持各種操作。主要有五大類:

  • Producer API 用來發送消息;
  • Consumer API 用來讀取消息;
  • AdminClient API 用來管理 Topic、Broker 等元數據;
  • Streams API 提供了更高級的流處理能力,可以直接在 Kafka 上做轉換;
  • Connect API 則是用來連接 Kafka 和外部系統的,比如數據庫、搜索引擎等。

Kafka 本身只關心字節數組,所以我們需要把實際的數據結構序列化成字節數組才能發送,這個過程叫做 Marshaling。常用的格式有很多,比如 Avro、Protobuf、JSON、甚至是 Python 的 Pickle。選擇哪種格式取決于你的具體需求,比如性能、消息大小、是否需要 Schema 定義、擴展性以及語言兼容性。另外要注意一點,Kafka 本身不保證消息的唯一性,也就是說,可能會出現重復消息。所以,確保消息只被處理一次的責任落在了消費者身上,通常需要消費者自己記錄 Offset 并提交。

示例代碼

現在我們把 Kafka 和 Ray 結合起來。為什么用 Ray Actors 來封裝 Kafka 的 Consumer 和 Producer 呢?

  • 對于 Kafka Consumer,它通常需要在一個無限循環里運行,不斷拉取消息,并且需要記住自己已經讀到哪里了,也就是維護 Offset。這正好符合 Ray Actor 的特點:一個 Actor 就是一個獨立的狀態服務。所以,把 Kafka Consumer 實現為一個 Ray Actor,非常自然。
  • 對于 Producer,雖然它本身不需要維護狀態,但把它放在一個 Actor 里,我們可以方便地異步調用 produce 方法,向任何 Kafka Topic 發送消息,而無需為每個 Topic 創建一個獨立的 Producer 實例,簡化了管理。

這是一個簡單的 Kafka Producer Actor 的實現。

@ray.remote
class KafkaProducer:def __init__(self, server: str = 'localhost:9092'):from confluent_kafka import Producerconf = {'bootstrap.servers': server}self.producer = Producer(**conf)def produce(self, data: dict, key: str = None, topic: str = 'test'):def delivery_callback(err, msg):if err:print(f'Message failed delivery: {err}')else:print(f'Message delivered to topic {msg.topic()} partition 'f'{msg.partition()} offset {msg.offset()}')binary_key = Noneif key is not None:binary_key = key.encode('UTF8')self.producer.produce(topic=topic, value=json.dumps(data).encode('UTF8'),key=binary_key, callback=delivery_callback)self.producer.poll(0)def destroy(self):self.producer.flush(30)

它使用了 confluent_kafka 庫,這是 Python 中常用的 Kafka 客戶端。

  • 在 init 方法里,我們根據 broker 地址初始化一個 Kafka Producer 對象。produce 方法就是我們用來發送消息的接口,它接收數據、可選的 key 和 topic 名稱。內部,它會調用 Kafka Producer 的 produce 方法,這里我們用了 json.dumps 把 Python 字典序列化成 JSON 字符串,再 encode 成字節。
  • delivery_callback 是一個回調函數,用來處理消息發送成功或失敗的情況。
  • destroy 方法在 Actor 銷毀前調用,確保所有待發送的消息都被 flush 出去。

這是 Kafka Consumer Actor 的實現。

@ray.remote
class KafkaConsumer:def __init__(self, callback, group: str = 'ray', server: str = 'localhost:9092',topic: str = 'test', restart: str = 'latest'):from confluent_kafka import Consumerfrom uuid import uuid4# Configurationconsumer_conf = {'bootstrap.servers': server,   # bootstrap server'group.id': group,                      # group ID'session.timeout.ms': 6000,            # session tmout'auto.offset.reset': restart}          # restart# Create Consumer instanceself.consumer = Consumer(consumer_conf)self.topic = topicself.callback = callbackself.id = str(uuid4())def start(self):self.run = Truedef print_assignment(consumer, partitions):print(f'Consumer: {self.id}')print(f'Assignment: {partitions}')# Subscribe to topicsself.consumer.subscribe([self.topic], on_assign = print_assignment)while self.run:msg = self.consumer.poll(timeout=1.0)if msg is None:continueif msg.error():print(f'Consumer error: {msg.error()}')continueelse:# Proper messageself.callback(self.id, msg)def stop(self):self.run = Falsedef destroy(self):self.consumer.close()

同樣使用了 confluent_kafka 庫。

  • init 方法里,除了 broker 地址,還需要配置 group.id、session.timeout.ms、auto.offset.reset 等參數。group.id 是 Consumer Group 的標識,auto.offset.reset 決定了消費者啟動時沒有 Offset 或者 Offset 不存在時的行為,比如 latest 表示從最新的消息開始讀。

  • start 方法啟動了一個無限循環,使用 consumer.poll 拉取消息。如果收到消息,就調用傳入的 callback 函數進行處理。

  • stop 方法通過設置 run 為 False 來停止循環。

  • destroy 方法則關閉 Kafka Consumer 連接。

測試函數

def print_message(consumer_id: str, msg):print(f"Consumer {consumer_id} new message: topic={msg.topic()}  "f"partition= {msg.partition()}  offset={msg.offset()} "f"key={msg.key().decode('UTF8')}")print(json.loads(msg.value().decode('UTF8')))# Start Ray
ray.init()# Start consumers and producers
n_ = 5     # Number of consumers
consumers = [KafkaConsumer.remote(print_message) for _ in range(n_consumers)]
producer = KafkaProducer.remote()
refs = [c.start.remote() for c in consumers]# publish messages
user_name = 'john'
user_favorite_color = 'blue'try:while True:user = {'name': user_name,'favorite_color': user_favorite_color,'favorite_number': randint(0, 1000)}producer.produce.remote(user, str(randint(0, 100)))sleep(1)# end gracefully
except KeyboardInterrupt:for c in consumers:c.stop.remote()
finally:for c in consumers:c.destroy.remote()producer.destroy.remote()ray.kill(producer)

額外的閱讀材料

  • https://www.anyscale.com/blog/serverless-kafka-stream-processing-with-ray

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/81923.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/81923.shtml
英文地址,請注明出處:http://en.pswp.cn/web/81923.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

火狐安裝自動錄制表單教程——仙盟自動化運營大衍靈機——仙盟創夢IDE

打開火狐插件頁面 安裝完成 使用 功能 錄制瀏覽器操作 錄入地址 開始操作 錄制完成 在當今快速發展的軟件開發生態中,自動化測試已從一種新興技術手段,轉變為保障軟件質量與開發效率不可或缺的關鍵環節。其重要性體現在多個維度,同時&#x…

小程序 - 視圖與邏輯

個人簡介 ??????個人主頁: 魔術師 ??學習方向: 主攻前端方向,正逐漸往全棧發展 ??個人狀態: 研發工程師,現效力于政務服務網事業 ????人生格言: “心有多大,舞臺就有多大。” ??推薦學習: ??Vue2 ??Vue3 ??Vue2/3項目實戰 ??Node.js實戰 ??T…

【LLM應用開發】上下文記憶的解決方案(主流全面)

一、前言 上下文記憶(Contextual Memory)解決方案的作用: 提升 AI(尤其是大語言模型,LLM)的對話連貫性和個性化。 本文將介紹幾個主流的實現方式。 二、🧠 什么是上下文記憶? 在對…

C/C++ 面試復習筆記(2)

C語言如何實現快速排序算法? 答案:快排是一種分治算法,選擇一個基準元素,將數據劃分成兩部分,然后遞歸排序 補充: void quick_sort(int arr[], int start, int end) {//判斷是否需要排序if (start > …

2025吉林CCPC 題解(前六題)

// Problem: J - Odd-Even Game // Contest: Virtual Judge - sdccpc20250527 // URL: https://vjudge.net/contest/719585#problem/J // Memory Limit: 1024 MB // Time Limit: 1000 ms // 簽到題 // Powered by CP Editor (https://cpeditor.org)#include <bits/std…

Q: dify知識庫模塊主要庫表和字段

【回到目錄】~~~~【回到問題集】 Q: dify知識庫模塊主要庫表和字段 A: 表1&#xff1a;datasets 知識庫表 name 知識庫名稱 index_struct 向量索引node 表2&#xff1a;document 文檔表 name 文檔名稱 word_count 字數 doc_form 分段類型(hierarchical_model、qa_model、te…

NodeMediaEdge快速上手

NodeMediaEdge快速上手 簡介 NodeMediaEdge是一款部署在監控攝像機網絡前端中&#xff0c;拉取Onvif或者rtsp/rtmp/http視頻流并使用rtmp/kmp推送到公網流媒體服務器的工具。 通過云平臺協議注冊到NodeMediaServer后&#xff0c;可以同NodeMediaServer結合使用。使用圖形化的…

通用前端框架項目靜態部署到Hugging Face Space的實踐指南

背景介紹 在輕量級展示前端項目的場景中,Hugging Face Space 提供了一個便捷的靜態托管平臺。需求是將無后端服務的Vite的 Vue項目部署到Hugging Face Space 上。其實無論是基于Vite的Vue/React項目,還是使用Webpack構建的工程化方案,都可以通過兩種方式將其部署到Space:自…

Android studio 查看aar源碼出現/* compiled code */

如圖查看aar源碼時看不到具體實現&#xff0c;在排除是sdk版本導致的問題后&#xff0c;下面說解決方法 打開設置&#xff0c;找到插件 輸入decompiler 搜索 這個是自帶的反編譯工具&#xff0c;啟用就好了

Spark實時流數據處理實例(SparkStreaming通話記錄消息處理)

所用資源&#xff1a; 通過網盤分享的文件&#xff1a;spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar等4個文件 鏈接: https://pan.baidu.com/s/1zYHu29tLgDvS_L2Ud-22ZA?pwdhnpg 提取碼: hnpg 1.需求分析 &#xff1a; 假定有一個手機通信計費系統&#xff0c;用戶通…

Vue3處理number輸入框避免NaN

在 Vue3 中處理 number 類型輸入框避免顯示 NaN&#xff0c;核心在于正確處理用戶輸入的非數字值。以下是幾種解決方案&#xff1a; 方案1&#xff1a;使用字符串中轉 計算屬性&#xff08;推薦&#xff09; vue 復制 下載 <template><input v-model"input…

Python自動化之selenium語句——瀏覽器設置顯示尺寸、截圖、刷新網頁

目錄 一、瀏覽器設置最大化、最小化 1.瀏覽器最大化 2.瀏覽器最小化 二、瀏覽器打開的位置、尺寸 1.瀏覽器打開位置 2.瀏覽器打開尺寸 三、瀏覽器截圖 1.截圖語句 2.運行成功后查看 四、刷新網頁 上一節實現了打開瀏覽器、打開指定網址、關閉瀏覽器的操作&#xff0c…

model.classifier:分類頭

model.classifier:分類頭 分類頭(model.classifier)含義 在基于Transformer架構的模型(如BERT、GPT等 )用于分類任務時,“分類頭(model.classifier)” 是模型的一個重要組成部分。以Hugging Face的Transformers庫為例,許多預訓練模型在完成通用的預訓練任務(如語言…

4.1.2 操作數據集

在本實戰中&#xff0c;我們深入學習了Spark SQL的操作數據集&#xff0c;包括了解Spark會話、準備數據文件、啟動Spark Shell以及獲取和操作學生數據集。通過Spark Shell&#xff0c;我們可以直接使用SparkSession實例來加載、轉換和處理數據。我們學習了如何將文本文件加載為…

LangChain整合Milvus向量數據庫實戰:數據新增與刪除操作

導讀&#xff1a;在AI應用開發中&#xff0c;向量數據庫已成為處理大規模語義搜索和相似性匹配的核心組件。本文通過詳實的代碼示例&#xff0c;深入探討LangChain框架與Milvus向量數據庫的集成實踐&#xff0c;為開發者提供生產級別的向量數據管理解決方案。 文章聚焦于向量數…

從根源解決Augment免費額度限制問題:Windows詳細教程

從根源解決Augment免費額度限制問題&#xff1a;Windows詳細教程 本文將詳細介紹如何在Windows系統上解決Augment AI助手的"Too many free trials"限制問題&#xff0c;通過清理VS Code緩存和修改設備ID實現無限制使用Augment的方法。 視頻地址 augment從根源上解決免…

IoTDB 集成 DBeaver,簡易操作實現時序數據清晰管理

數據結構一目了然&#xff0c;跨庫分析輕松實現&#xff0c;方便 IoTDB “內部構造”管理&#xff01; 隨著物聯網場景對時序數據處理需求激增&#xff0c;時序數據庫與數據庫管理工具的集成尤為關鍵。作為數據資產的 “智能管家”&#xff0c;借助數據庫管理工具的可視化操作界…

應用層協議http(無代碼版)

目錄 認識URL urlencode 和 urldecode HTTP 協議請求與響應格式 HTTP 的請求方法 GET 方法 POST 方法 HTTP 的狀態碼 HTTP 常見 Header Location 關于 connection 報頭 HTTP版本 遠程連接服務器工具 setsockopt 我們來學習應用層協議http。 雖然我們說, 應用層協…

Cangjie 中的值類型與引用類型

1. 值類型和引用類型 1.1 值的存儲方式 所有變量在底層實現中&#xff0c;都會關聯一個具體的“值”&#xff0c;這個值可能存儲在 內存地址 或 寄存器 中。 寄存器用于優化常用變量的訪問速度。只有局部、小、頻繁使用的變量才更可能被分配到寄存器中。實際行為由編譯器根據…

使用el-input數字校驗,輸入漢字之后校驗取消不掉

先說說復現方式 本來input是只能輸入數字的&#xff0c;然后你不小心輸入了漢字&#xff0c;觸發校驗了&#xff0c;然后這時候&#xff0c;你發現校驗取消不掉了 就這樣了 咋辦啊&#xff0c;你一看校驗沒錯啊&#xff0c;各種number啥的也寫了,發現沒問題啊 <el-inputv…