使用pyflink進行kafka實時數據消費

目錄

背景

代碼demo

?踩坑記錄

1、kafka連接器,kafka客戶端jar包找不到

2、java模塊系統訪問限制

3、執行demo任務,一直報錯連接kafka topic超時

總結?


背景

? ? ? ? 實際項目中經常遇到source是kafka,需要實時消費kafka某個topic中的數據,在此寫個demo學習了解。

? ? ? ? 環境:本地windows10,python版本3.8.5,pyflink版本1.14.2,kafka版本2.12_2.0.1

代碼demo

? ? ? ? 代碼很簡單,在本地啟動一個flink任務,消費對端一個kafka的topic,從中取出數據并進行打印,代碼如下:

import os
os.environ["_JAVA_OPTIONS"] = "--add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema# 設置執行環境
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 設置并行度
env.add_jars("file:///F:/learn/flinkdemo/flink-connector-kafka_2.12-1.14.2.jar","file:///F:/learn/flinkdemo/kafka-clients-3.6.1.jar")# Kafka 消費者配置
kafka_topic = 'zhubao'
kafka_bootstrap_servers = 'xxx:9092'  # Kafka 服務器地址和端口
consumer = FlinkKafkaConsumer(kafka_topic, SimpleStringSchema(), properties={'bootstrap.servers': kafka_bootstrap_servers,'group.id': 'zhubao-group','auto.offset.reset': 'earliest','session.timeout.ms': '120000','request.timeout.ms': '120000','max.poll.interval.ms': '600000'
})# 將 Kafka 數據源添加到執行環境中
data_stream = env.add_source(consumer)# 數據處理邏輯,例如打印數據
data_stream.print()# 啟動執行環境
env.execute("Flink Kafka Consumer Example")

使用kafka客戶端生產一些數據,在本地則能實時消費到數據?

在本地終端執行上述flink demo任務,看到相關消費結果?

?踩坑記錄

1、kafka連接器,kafka客戶端jar包找不到

? ? ? ? 上述demo代碼中,開頭需要在程序運行環境中引入兩個jar包,分別是kafka連接器和kafka客戶端

env.add_jars("file:///F:/learn/flinkdemo/flink-connector-kafka_2.12-1.14.2.jar","file:///F:/learn/flinkdemo/kafka-clients-3.6.1.jar")

這兩個包默認在本地是沒有的,需要進行額外的下載,下載地址:

Flink Kafka連接器?:

  • 下載地址:https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka_2.12/

?Kafka客戶端?:

  • 下載地址:https://repo.maven.apache.org/maven2/org/apache/kafka/kafka-clients/

注意選擇對應的版本,我的本地環境是pyflink1.14.2,因此下載的jiar包是對應版本,客戶端是3.6.1版本

此外,在add_jars上,需要使用file:///方式,并且最好是絕對路徑,否則會報jar包找不到等類似錯誤

2、java模塊系統訪問限制

py4j.protocol.Py4JJavaError: An error occurred while calling o10.addSource.
: java.lang.reflect.InaccessibleObjectException: Unable to make field private static final long java.util.Properties.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @32cf48b7

? ? ? ? 該報錯是因為在windows環境Java模塊系統(JPMS)的訪問限制導致,需通過JVM參數解除模塊封裝限制。

? ? ? ? 解決方法:在程序啟動開頭,增加JVM啟動參數,注意格式,如下:

import os
os.environ["_JAVA_OPTIONS"] = "--add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"

3、執行demo任務,一直報錯連接kafka topic超時

Traceback (most recent call last):File ".\kafkademo.py", line 31, in <module>env.execute("Flink Kafka Consumer Example")File "F:\learn\flinkdemo\venv\lib\site-packages\pyflink\datastream\stream_execution_environment.py", line 691, in executereturn JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))File "F:\learn\flinkdemo\venv\lib\site-packages\py4j\java_gateway.py", line 1285, in __call__return_value = get_return_value(File "F:\learn\flinkdemo\venv\lib\site-packages\pyflink\util\exceptions.py", line 146, in decoreturn f(*a, **kw)File "F:\learn\flinkdemo\venv\lib\site-packages\py4j\protocol.py", line 326, in get_return_valueraise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed....at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
...
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at akka.actor.Actor.aroundReceive(Actor.scala:537)at akka.actor.Actor.aroundReceive$(Actor.scala:535)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)at akka.actor.ActorCell.invoke(ActorCell.scala:548)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)at akka.dispatch.Mailbox.run(Mailbox.scala:231)at akka.dispatch.Mailbox.exec(Mailbox.scala:243)... 5 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition zhubao-0 could be determined

? ? ? ? 報錯信息很長,重點關注最后的

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition zhubao-0 could be determined

????????解決方法:嘗試對程序增加各種延長超時時間,但都沒有啥效果,最后排查尋找發現,是由于本地hosts未配置kafka的域名映射,需要在本地hosts里增加對應映射關系

添加后,問題得以解決

總結?

? ? ? ? 該demo很簡單,完成在windows環境上使用pyflink進行連接kafka并進行數據消費,主要在windows上并且使用pyflink做開發,遇到一些奇奇怪怪的問題,在此做個記錄,方便后續查閱。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/87123.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/87123.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/87123.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

軟件測試理論框架與發展:分類、原則與質量保障策略

第一章 一、計算機軟件的發展分類 早期軟件開發的特點&#xff1a; 軟件規模小、復雜程度低、開發過程不規范 測試的情況&#xff1a; 測試等同于調試 目的糾正軟件的已經知道的故障 投入少&#xff0c;介入晚 成為一種發現軟件的活動&#xff08;1957&#xff09; 測試不等于…

未知威脅攻擊原理和架構

大家讀完覺得有幫助記得關注和點贊&#xff01;&#xff01;&#xff01; 未知威脅&#xff08;Unknown Threats&#xff09;指利用零日漏洞、合法工具濫用、高級逃逸技術等**繞過傳統特征檢測**的攻擊&#xff0c;其核心在于**動態對抗防御體系的認知盲區**。以下從攻擊原理、…

基于Netty-WebSocket構建高性能實時通信服務

引言&#xff1a;WebSocket在現代應用中的重要性 在當今實時交互應用盛行的時代&#xff0c;WebSocket協議已成為實現雙向通信的核心技術。相比傳統的HTTP輪詢&#xff0c;WebSocket提供了&#xff1a; 真正的全雙工通信極低的延遲&#xff08;毫秒級&#xff09;高效的連接管…

咸蝦米項目總結1--const用法

在 UniApp&#xff08;或 Vue 3&#xff09;中&#xff0c;聲明一個空對象可使用下面這2種寫法&#xff1a; // 寫法1 const a ref(null);// 寫法2 const a ref({}); 在UniApp中&#xff0c;const a ref()用法概述&#xff1a; 用途&#xff1a; 創建一個響應式引用&#x…

在mac下手動編譯遷移的android版webrtc組件

我原先使用的android版webrtc是在linux下編譯的&#xff0c;現在因為某些原因需要把整個庫遷移到mac下編譯。 把代碼遷移完后&#xff0c;正常是需要通過gclient sync 重新構建編譯環境&#xff0c;但是由于網絡限制等方面原因&#xff0c;會導致完成的比較慢。 在摸索一陣后…

Linux 命令:mkdir

Linux mkdir 命令詳細教程 一、mkdir 命令的基本功能 mkdir&#xff08;Make Directory&#xff09;是 Linux 系統中用于創建新目錄&#xff08;文件夾&#xff09;的基礎命令。它支持一次性創建單個或多個目錄&#xff0c;以及遞歸創建多層目錄結構&#xff0c;是文件系統操…

Django 數據遷移全解析:makemigrations migrate 常見錯誤與解決方案

1. 遷移機制與底層原理 在 Django 中&#xff0c;ORM&#xff08;Object-Relational Mapping&#xff09;是連接模型&#xff08;Model&#xff09;和數據庫結構的橋梁。Django 鼓勵開發者通過編寫 Python 類&#xff08;模型&#xff09;來定義業務數據結構&#xff0c;而不是…

SuperGlue:使用圖神經網絡學習特征匹配

摘要 本文提出了 SuperGlue&#xff0c;一種神經網絡&#xff0c;用于通過聯合尋找對應關系并排除不可匹配點來匹配兩組局部特征。匹配結果通過求解一個可微的最優傳輸問題來估計&#xff0c;該問題的代價由一個圖神經網絡預測。我們引入了一種基于注意力的靈活上下文聚合機制…

ssh -T git@github.com失敗后解決方案

這個錯誤表示你的 SSH 連接無法到達 GitHub 服務器。以下是詳細解決方案&#xff0c;按照優先級排序&#xff1a; 首選解決方案&#xff1a;使用 SSH over HTTPS&#xff08;端口 443&#xff09; 這是最有效的解決方案&#xff0c;因為許多網絡會阻止 22 端口&#xff1a; …

從蘋果事件看 ARM PC市場的未來走向

最近&#xff0c;蘋果宣布部分搭載 Intel 處理器的 Mac 不再支持最新的 macOS 系統更新&#xff0c;這一消息猶如一顆石子投入平靜湖面&#xff0c;激起層層漣漪。它不僅讓 Intel 芯片在 Mac 產品線上徹底成為歷史&#xff0c;也促使我們重新審視 PC 行業的發展脈絡&#xff0c…

vue + element ui 實現超出寬度展示..,鼠標移入顯示完整內容

vue element ui 實現超出寬度展示…&#xff0c;鼠標移入顯示完整內容 代碼理念&#xff1a; 當高度大于對應行數的高度 則說明需要展示"…" 子組件 <template><div class"tooltip"><div ref"tooltipRef" :class"[tooltip…

HarmonyOSNext應用無響應全解析:從機制到實戰的卡死問題排查

HarmonyOSNext應用無響應全解析&#xff1a;從機制到實戰的卡死問題排查 ##Harmony OS Next ##Ark Ts ##教育 本文適用于教育科普行業進行學習&#xff0c;有錯誤之處請指出我會修改。 喂喂喂&#xff01;應用卡成PPT了&#xff1f;點啥都沒反應&#xff1f;別慌&#xff01…

git 遷移之獲取原庫所有分支

以下是一個安全的 Bash 腳本&#xff0c;用于將遠程 Git 倉庫的所有分支檢出到本地&#xff08;自動跳過已存在的分支&#xff09;&#xff1a; #!/bin/bash# 獲取所有遠程分支&#xff08;排除 HEAD&#xff09; remote_branches$(git branch -r | grep -v HEAD\|->)# 循環…

設計模式 | 適配器模式

適配器模式&#xff08;Adapter Pattern&#xff09; 是結構型設計模式中的連接器大師&#xff0c;它允許不兼容接口的類能夠協同工作。本文將深入探索適配器模式的核心思想、實現技巧以及在C中的高效實踐&#xff0c;解決現實開發中的接口兼容性問題。 為什么需要適配器模式 …

RTL 級機器人電機控制器的 FPGA 設計

借助Verilog&#xff0c;在FPGA中實現了帶編碼器的兩臺電機的電機控制系統的RTL級設計。 介紹 借助硬件描述語言 (HDL) Verilog 和 AMD Vivado 設計套件&#xff0c;在 AMD Spartan-7 FPGA 中實現帶編碼器的兩個電機的控制器系統的 RTL 設計。 在這個項目中&#xff0c;使用了搭…

4_Flink CEP

Flink CEP 1、何為CEP&#xff1f; CEP&#xff0c;全稱為復雜事件處理&#xff08;Complex Event Processing&#xff09;&#xff0c;是一種用于實時監測和分析數據流的技術。 CEP詳細講解&#xff1a; CEP是基于動態環境的事件流的分析技術&#xff0c;事件是狀態變化&am…

容器基礎知識2-K8s 和 Docker 的關系與管理邏輯詳解

K8s 和 Docker 的關系與管理邏輯詳解 一、先搞懂&#xff1a;Docker 和 K8s 分別是做什么的&#xff1f; Docker&#xff08;容器工具&#xff09;&#xff1a;好比「集裝箱工廠」&#xff0c;負責把應用和依賴打包成標準化容器&#xff08;類似集裝箱&#xff09;&#xff0…

QT MaintenanceTool 登錄無法找到 QtAccount 憑據

親測有效&#xff1a;QT6 Maintenance Tool 登錄問題_qt6 maintenancetool-CSDN博客 將ini這個配置文件移出文件夾后&#xff0c;在切換自己賬戶登錄即可

華為云Flexus+DeepSeek征文|利用華為云一鍵部署 Dify 平臺并接入 DeepSeek 大模型,構建長篇文章生成助手

目錄 前言 1 華為云一鍵部署 Dify 平臺 1.1 華為云 Dify 平臺介紹 1.2 部署過程介紹 1.3 登錄 Dify 平臺 2 接入華為云 ModelArts Studio 中的 DeepSeek 大模型 3 構建長篇文章生成助手 3.1 簡要介紹長篇文章生成助手 3.2 開始節點 3.3 生成標題和大綱&#xff08;LL…

js的一些基礎概念總結

1.變量聲明 首先js變量聲明有三種&#xff0c;var&#xff0c;const&#xff0c;let&#xff0c;這三種變量聲明中我們第一優先使用const&#xff0c;需要改變這個值的時候我們用ley&#xff0c;var是盡量不去使用。 那么我們現在來總結一下三種聲明變量的區別。首先是var let …