Table of Contents
Background
Code Demo
Pitfall Notes
1. Kafka connector and Kafka client jars not found
2. Java module system access restrictions
3. Demo job keeps timing out when connecting to the Kafka topic
Summary
Background
In real projects the source is often Kafka, and some topic has to be consumed in real time, so here is a small demo to learn from.
Environment: local Windows 10, Python 3.8.5, PyFlink 1.14.2, Kafka 2.12-2.0.1 (Scala 2.12, Kafka 2.0.1).
Code Demo
The code is simple: start a Flink job locally that consumes a topic on a remote Kafka cluster, reads the records, and prints them:
import os
# Open up JPMS-sealed packages before the JVM starts (see pitfall 2 below)
os.environ["_JAVA_OPTIONS"] = "--add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema

# Set up the execution environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # set parallelism

# Register the Kafka connector and Kafka client jars (see pitfall 1 below)
env.add_jars("file:///F:/learn/flinkdemo/flink-connector-kafka_2.12-1.14.2.jar",
             "file:///F:/learn/flinkdemo/kafka-clients-3.6.1.jar")

# Kafka consumer configuration
kafka_topic = 'zhubao'
kafka_bootstrap_servers = 'xxx:9092'  # Kafka broker address and port
consumer = FlinkKafkaConsumer(
    kafka_topic,
    SimpleStringSchema(),
    properties={
        'bootstrap.servers': kafka_bootstrap_servers,
        'group.id': 'zhubao-group',
        'auto.offset.reset': 'earliest',
        'session.timeout.ms': '120000',
        'request.timeout.ms': '120000',
        'max.poll.interval.ms': '600000',
    })

# Add the Kafka source to the execution environment
data_stream = env.add_source(consumer)

# Processing logic, e.g. print each record
data_stream.print()

# Run the job
env.execute("Flink Kafka Consumer Example")
Produce a few messages with a Kafka client, and the local job picks them up in real time.
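The producer side is not shown here; as a minimal sketch, assuming the kafka-python package (pip install kafka-python) rather than whatever client tool was actually used, test messages can be produced like this:

from kafka import KafkaProducer

# Hypothetical test producer; 'xxx:9092' is the same placeholder broker
# address used in the demo above.
producer = KafkaProducer(bootstrap_servers='xxx:9092')
for i in range(5):
    producer.send('zhubao', f'test message {i}'.encode('utf-8'))
producer.flush()  # make sure everything is actually sent before exiting
producer.close()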
Running the demo job in a local terminal then shows the consumed records.
Pitfall Notes
1. Kafka connector and Kafka client jars not found
In the demo above, two jars have to be registered with the program's runtime right at the start: the Kafka connector and the Kafka client.
env.add_jars("file:///F:/learn/flinkdemo/flink-connector-kafka_2.12-1.14.2.jar",
             "file:///F:/learn/flinkdemo/kafka-clients-3.6.1.jar")
Neither jar ships with PyFlink; both have to be downloaded separately:
Flink Kafka connector:
- Download: https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka_2.12/
Kafka client:
- Download: https://repo.maven.apache.org/maven2/org/apache/kafka/kafka-clients/
Make sure the versions match: my environment runs PyFlink 1.14.2, so I downloaded the 1.14.2 build of the connector (the _2.12 suffix is the Scala version); the client is version 3.6.1.
Also, add_jars requires file:/// URLs, preferably built from absolute paths; otherwise you get "jar not found" style errors. A small sketch for building such URLs is shown below.
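To avoid hand-writing those URLs, one option (a sketch, not from the original post) is to build them with pathlib, which yields absolute file:/// URIs on Windows as well:

from pathlib import Path

# Directory holding the two downloaded jars (same location as in the demo)
jar_dir = Path("F:/learn/flinkdemo")
jar_uris = [
    (jar_dir / name).resolve().as_uri()  # e.g. file:///F:/learn/flinkdemo/...
    for name in ("flink-connector-kafka_2.12-1.14.2.jar", "kafka-clients-3.6.1.jar")
]
env.add_jars(*jar_uris)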
2. Java module system access restrictions
py4j.protocol.Py4JJavaError: An error occurred while calling o10.addSource.
: java.lang.reflect.InaccessibleObjectException: Unable to make field private static final long java.util.Properties.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @32cf48b7
This error is caused by the access restrictions of the Java Platform Module System (JPMS, JDK 9+); the sealed modules have to be opened up with JVM flags.
Fix: set the JVM options at the very top of the program, before the Flink environment (and hence the JVM) is created, minding the exact format:
import os
os.environ["_JAVA_OPTIONS"] = "--add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"
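Note that --add-opens only exists on JDK 9 and later. If in doubt about which JDK the job will launch, a quick check (assuming the java on PATH is the one PyFlink picks up):

import subprocess

# Prints the JVM version banner (to stderr) of the java executable on PATH
subprocess.run(["java", "-version"])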
3. Demo job keeps timing out when connecting to the Kafka topic
Traceback (most recent call last):
  File ".\kafkademo.py", line 31, in <module>
    env.execute("Flink Kafka Consumer Example")
  File "F:\learn\flinkdemo\venv\lib\site-packages\pyflink\datastream\stream_execution_environment.py", line 691, in execute
    return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "F:\learn\flinkdemo\venv\lib\site-packages\py4j\java_gateway.py", line 1285, in __call__
    return_value = get_return_value(
  File "F:\learn\flinkdemo\venv\lib\site-packages\pyflink\util\exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "F:\learn\flinkdemo\venv\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    ...
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    ...
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 5 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition zhubao-0 could be determined
The stack trace is long; the key part is the final cause:
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition zhubao-0 could be determined
Fix: I first tried raising all the various timeouts in the program, with no effect. Further digging showed the real cause: the local hosts file had no mapping for the Kafka brokers' hostnames. The client reaches the bootstrap address by IP, but the broker metadata it gets back refers to brokers by hostnames the machine cannot resolve, so the partition position lookup times out.
Adding the corresponding hostname-to-IP entries to the local hosts file solved the problem; an example of the format follows.
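For reference (hypothetical addresses; use the hostnames your brokers actually advertise), a hosts entry is just an IP followed by the hostname, e.g. in C:\Windows\System32\drivers\etc\hosts:

# hypothetical mappings -- replace with your brokers' real IPs and hostnames
10.1.2.3    kafka-broker-1
10.1.2.4    kafka-broker-2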
Summary
The demo itself is simple: connect PyFlink to Kafka on Windows and consume data. But developing with PyFlink on Windows surfaced a few odd problems, so they are recorded here for future reference.