分布式的消息流平臺之Pulsar

Pulsar 流處理詳解

Apache Pulsar 是一個分布式的消息流平臺,集成了**消息隊列(MQ)流處理(Stream Processing)**能力。Pulsar 不僅提供低延遲、高吞吐的消息傳輸能力,還支持基于 Pulsar Functions、Flink、Spark Streaming 的流式處理能力。

本篇詳細介紹 Pulsar 的流處理能力,涵蓋 核心概念、流處理模式、編程模型、集成生態、應用場景 等方面。


1. Pulsar 流處理概述

(1)Pulsar 的流處理能力

Pulsar 主要通過以下方式實現流處理:

  • Pulsar Functions:輕量級流處理框架,適用于簡單的 ETL、數據轉換、事件處理等任務。
  • Flink & Spark Streaming 集成:Pulsar 提供 Flink 和 Spark Streaming 連接器,支持復雜流處理任務,如窗口計算、數據聚合、模式匹配等。
  • Pulsar IO:內置的 Source/Sink 連接器,支持數據流的輸入輸出,如 Kafka、Elasticsearch、JDBC、HDFS 等。

(2)Pulsar 流處理 VS 傳統流處理

特性Pulsar FunctionsFlink on PulsarKafka Streams
復雜度低(適合輕量任務)高(適合復雜任務)中等(偏向事件流處理)
集成性內置在 Pulsar 中需集成 Flink/Spark依賴 Kafka
擴展性高(自動擴展)高(分布式計算)中等(依賴 Kafka 集群)
窗口計算支持基本窗口計算強大,支持滾動、滑動、會話窗口支持窗口操作

2. Pulsar 流處理核心概念

(1)Pulsar Functions

Pulsar Functions 是一種輕量級計算框架,專為 Pulsar 設計,允許開發者編寫無狀態(Stateless)或有狀態(Stateful)的流處理邏輯,并直接運行在 Pulsar 集群中,而無需額外的計算框架(如 Flink 或 Spark)。

Pulsar Functions 關鍵特性
  • 輕量級:無需外部計算框架,適用于簡單任務。
  • 原生集成:與 Pulsar 主題(Topic)無縫對接,延遲低。
  • 內置管理:支持負載均衡、故障恢復。
  • 支持多種語言:可用 Java、Python、Go 編寫。
Pulsar Functions 編程模型

Pulsar Functions 的計算邏輯類似于 map-reduce,用戶編寫 Function(函數) 處理輸入數據,并將結果寫入另一個 Pulsar 主題。

示例:Java 版 Pulsar Function

public class MyFunction implements Function<String, String> {@Overridepublic String process(String input, Context context) {return input.toUpperCase(); // 處理邏輯:轉換為大寫}
}

注冊 Pulsar Function:

pulsar-admin functions create \--tenant public --namespace default \--name my-function \--inputs persistent://public/default/input-topic \--output persistent://public/default/output-topic \--classname MyFunction \--jar my-function.jar

(2)Pulsar IO

Pulsar IO 提供了開箱即用的 Source(數據源)和 Sink(數據輸出)連接器,允許 Pulsar 作為數據流的中心,連接各種外部存儲和計算系統。

常見 Source/Sink 連接器
類型連接器示例
數據庫MySQL、PostgreSQL、MongoDB
消息系統Kafka、RabbitMQ
存儲系統HDFS、S3、Elasticsearch
計算引擎Flink、Spark

示例:啟動一個 Kafka Source 連接器

pulsar-admin sources create \--name kafka-source \--tenant public --namespace default \--source-type kafka \--destination-topic-name persistent://public/default/kafka-topic \--source-config '{"bootstrapServers": "kafka-broker:9092","topic": "source-topic"}'

(3)Pulsar + Flink/Spark Streaming

Pulsar 也可作為 Flink / Spark Streaming 的流式數據源,支持復雜計算,如:

  • 窗口計算(Tumbling, Sliding, Session Window)
  • 聚合計算(sum, avg, count)
  • 狀態管理(Stateful Processing)
  • 事件模式檢測(CEP)

示例:Flink Pulsar 讀取流數據

PulsarSource<String> source = PulsarSource.builder().setServiceUrl("pulsar://localhost:6650").setTopics("persistent://public/default/input-topic").setDeserializationSchema(SimpleStringSchema.class).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");stream.map(value -> value.toUpperCase()).print();env.execute();

3. Pulsar 流處理的運行模式

Pulsar Functions 支持三種運行模式:

運行模式說明
本地模式(LocalRun)在本地測試和運行 Functions
進程模式(Process)在 Pulsar Worker 進程中獨立運行
Kubernetes 模式(K8s)在 Kubernetes 集群中運行 Pulsar Functions

示例:在 Kubernetes 上運行 Pulsar Function

pulsar-admin functions create \--name my-k8s-function \--runtime JAVA \--inputs persistent://public/default/input-topic \--output persistent://public/default/output-topic \--parallelism 3 \--jar my-function.jar \--kubernetes-namespace pulsar

4. Pulsar 流處理應用場景

(1)實時數據流處理

  • 實時 ETL:流式數據清洗、轉換,存入數據湖或數據倉庫(Iceberg、Doris)。
  • 用戶行為分析:分析用戶操作日志,計算熱點數據。

(2)事件驅動架構(EDA)

  • 金融風控:實時監控交易流,檢測欺詐行為。
  • IoT 監控:處理物聯網傳感器數據,異常報警。

(3)數據同步 & 數據管道

  • CDC 數據同步:從 MySQL/PostgreSQL 讀取變更數據,實時寫入 Pulsar 供下游消費。
  • 消息系統橋接:Kafka → Pulsar → Flink,實現高效流數據處理。

5. 總結

Pulsar 提供強大的流處理能力,主要包括:

  1. Pulsar Functions(輕量級流處理)
  2. Pulsar IO(數據連接器)
  3. Flink / Spark Streaming(復雜流計算)
  4. 多種運行模式(Local、Process、K8s)

Pulsar 適用于高吞吐、低延遲的流式數據處理場景,可用于數據管道、事件驅動架構、實時分析等領域。

如果你的應用場景需要 流處理 + 消息隊列,Pulsar 是一個值得考慮的方案!🚀

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/73914.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/73914.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/73914.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【C++多線程】thread

C中的std::thread是C11引入的線程庫的一部分&#xff0c;提供了創建和管理線程的能力。它封裝了操作系統的線程接口&#xff0c;使得在C中更方便地進行多線程編程。 1. std::thread 的定義 std::thread 類位于<thread>頭文件中&#xff0c;定義在std命名空間下&#xff…

【css酷炫效果】純CSS實現故障文字特效

【css酷炫效果】純CSS實現故障文字特效 緣創作背景html結構css樣式完整代碼基礎版進階版(3D效果) 效果圖 想直接拿走的老板&#xff0c;鏈接放在這里&#xff1a;https://download.csdn.net/download/u011561335/90492053 緣 創作隨緣&#xff0c;不定時更新。 創作背景 剛…

uniapp配置代理解決跨域問題

兩種方式&#xff1a; 1、manifest.json中配置 "h5" : {"template" : "static/index.html","devServer" : {"port" : 9090,"https" : false,"proxy":{"/prod-api":{"target":&quo…

物聯網為什么用MQTT不用 HTTP 或 UDP?

先來兩個代碼對比&#xff0c;上傳溫度數據給服務器。 MQTT代碼示例 // MQTT 客戶端連接到 MQTT 服務器 mqttClient.connect("mqtt://broker.server.com:8883", clientId) // 訂閱特定主題 mqttClient.subscribe("sensor/data", qos1) // …

Flutter:頁面滾動,導航欄背景顏色過渡動畫

記錄&#xff1a;導航默認透明&#xff0c;頁面發生滾動后&#xff0c;導航背景色由0-1&#xff0c;過渡到白色背景。 view import package:ducafe_ui_core/ducafe_ui_core.dart; import package:flutter/material.dart; import package:get/get.dart; import package:redo…

STM32 —— MCU、MPU、ARM、FPGA、DSP

在嵌入式系統中&#xff0c;MCU、MPU、ARM、FPGA和DSP是核心組件&#xff0c;各自在架構、功能和應用場景上有顯著差異。以下從專業角度詳細解析這些概念&#xff1a; 一、 MCU&#xff08;Microcontroller Unit&#xff0c;微控制器單元&#xff09; 核心定義 集成系統芯片&a…

批量刪除 PPT 空白幻燈片頁面

如果我們需要刪除 PPT 文檔中的空白幻燈片頁面&#xff0c;我們可以借助 Office 工具來完成&#xff0c;但是如果是大量的 PPT 文檔需要批量刪除空白幻燈片頁面&#xff0c;那就需要使用專業的批量處理工具來完成&#xff0c;今天就給大家介紹一種批量刪除 PPT 空白幻燈片頁面的…

【canvas】一鍵自動布局:如何讓流程圖節點自動找到最佳位置

一鍵自動布局&#xff1a;如何讓流程圖節點自動找到最佳位置 引言 在流程圖、拓撲圖和系統架構圖設計中&#xff0c;節點布局往往是最令人頭疼的問題。如果手動調整每個節點位置&#xff0c;不僅耗時費力&#xff0c;還難以保證美觀性和一致性。本文將深入解析如何實現自動布…

【平臺優化】大數據集群一個客戶端參數引起的任務性能差的問題

大數據集群一個客戶端參數引起的任務性能差的問題 背景介紹排查過程任務慢的具體原因Executor中數據內存往磁盤溢寫結果數據寫入分區路徑 分析解決方案 結語&思考 背景介紹 隨著業務量不斷擴大&#xff0c;平臺逐步發展成HDFS多聯邦的架構&#xff0c;這個過程中&#xff…

【微信小程序變通實現DeepSeek支持語音】

微信小程序實現錄音轉文字&#xff0c;并調用后端服務&#xff08;Node.js&#xff09;進行語音識別和&#xff0c;然后調用DeepSeek 處理的完整實現。 整體架構 前端&#xff08;微信小程序&#xff09;&#xff1a; 實現錄音功能。將錄音文件上傳到后端。接收后端返回的語音…

uniapp常用組件

寫在前面 今天將uniapp中的組件都過了一遍&#xff0c;上手難度不大&#xff0c;但是還是遇到了一些問題&#xff1a; HBuilder實在是太難用&#xff0c;不管是插件生態還是設計之類的&#xff0c;總之就是用的哪哪不順手雖然打開內置瀏覽器是挺方便的&#xff0c;但是不知道…

【Linux】應用層自定義協議 + 序列化和反序列化

應用層自定義協議 序列化和反序列化 一.應用層1.再談 "協議"2.序列化 和 反序列化 二. Jsoncpp1.序列化2.反序列化 三. Tcp全雙工 面向字節流四.自定義協議 保證報文的完整性1.Makefile2.Mutex.hpp3.Cond.hpp4.Log.hpp5.Thread.hpp6.ThreadPool.hpp7.Common.hpp8.…

二.使用ffmpeg對原始音頻數據重采樣并進行AAC編碼

重采樣&#xff1a;將音頻三元組【采樣率 采樣格式 通道數】之中的任何一個或者多個值改變。 一.為什么要進行重采樣&#xff1f; 1.原始音頻數據和編碼器的數據格式不一致 2.播放器要求的和獲取的數據不一致 3.方便運算 二.本次編碼流程 1.了解自己本機麥克風參數&#x…

器材借用管理系統詳細設計基于Spring Boot-SSM

? 目錄 ?摘要 一、系統概述? ?二、系統架構設計? 2?.1技術選型? ?2.2系統架構? ?三、需求分析 3.1用戶需求分析 3.2功能模塊設計? 3.3、性能需求分析 3.4、安全需求分析 ?四、數據庫設計? ?五、安全性設計? ?六、系統測試與維護? ?七、總結?…

麒麟V10 arm cpu aarch64 下編譯 RocketMQ-Client-CPP 2.2.0

國產自主可控服務器需要訪問RocketMQ消息隊列&#xff0c;最新的CSDK是2020年發布的 rocketmq-client-cpp-2.2.0 這個版本支持TLS模式。 用默認的版本安裝遇到一些問題&#xff0c;記錄一下。 下載Releases apache/rocketmq-client-cpp GitHubhttps://github.com/apache/roc…

C語言每日一練——day_12(最后一天)

引言 針對初學者&#xff0c;每日練習幾個題&#xff0c;快速上手C語言。第十二天。&#xff08;最后一天&#xff0c;完結散花啦&#xff09; 采用在線OJ的形式 什么是在線OJ&#xff1f; 在線判題系統&#xff08;英語&#xff1a;Online Judge&#xff0c;縮寫OJ&#xff0…

網絡安全應急入門到實戰

奇安信&#xff1a;95015網絡安全應急響應分析報告&#xff08;2022-2024年&#xff09;官網可以下載 https://github.com/Bypass007/Emergency-Response-Notes 應急響應實戰筆記 網絡安全應急響應技術實戰指南 .pdf 常見場景 第4章 勒索病毒網絡安全應急響應 第5章 挖礦木…

jvm中每個類的Class對象是唯一的嗎

jvm中每個類的Class對象是唯一的嗎 在 Java 中&#xff0c;同一個類的 Class 對象在由同一個類加載器加載時是唯一的。析&#xff1a; 1. 同一類加載器的唯一性 規則&#xff1a;若一個類被同一個類加載器加載&#xff0c;無論創建多少實例&#xff0c;其 Class 對象始終唯一…

Visual Studio里的調試(debugging)功能介紹

參考 1- Introduction to Debugging | Basic Visual Studio Debugging&#xff08;這是一位印度博主視頻&#xff0c;我下面做到筆記也主要參考她的視頻&#xff0c;但不得不說口音太重了&#xff0c;一股咖喱味&#xff09; 目錄 個人對調試淺顯的認識和對調試的介紹逐行調…

NLP高頻面試題(六)——decoder-only、encoder-only和encoder-decoder的區別與聯系

一、基本概念與代表模型 1. Encoder-only 架構 Encoder-only 架構最具代表性的模型是 BERT。BERT 使用 masked language modeling&#xff08;MLM&#xff09;進行預訓練&#xff0c;即隨機遮蔽部分輸入詞匯&#xff0c;讓模型預測被遮蔽的詞匯。由于這種架構能夠同時看到輸入…