Java版Flink使用指南——從RabbitMQ中隊列中接入消息流

大綱

  • 創建RabbitMQ隊列
  • 新建工程
    • 新增依賴
    • 編碼
      • 設置數據源配置
      • 讀取、處理數據
      • 完整代碼
    • 打包、上傳和運行任務
    • 測試
  • 工程代碼

在《Java版Flink使用指南——安裝Flink和使用IntelliJ制作任務包》一文中,我們完成了第一個小型Demo的編寫。例子中的數據是代碼預先指定的。而現實中,數據往往來源于外部。本文我們將嘗試Flink從RabbitMQ中讀取數據,然后輸出到日志中。
關于RabbitMQ的知識可以參閱《RabbitMQ實踐》。

創建RabbitMQ隊列

我們創建一個Classic隊列data.from.rbtmq。注意要選擇Durable類型,這是后續用的默認連接器的限制。
具體方法見《RabbitMQ實踐——在管理后臺測試消息收發功能》。
在這里插入圖片描述

后續我們將在后臺通過默認交換器,給這個隊列新增消息。

新建工程

我們在IntelliJ中新建一個工程DataFromRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java。
版本填入與Flink的版本:1.19.1
在這里插入圖片描述

新增依賴

在pom.xml中新增RabbitMQ連接器

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>

編碼

設置數據源配置

String queueName = "data.from.rbtmq";
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;// create a RabbitMQ source
RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());

讀取、處理數據

下面代碼通過addSource添加RabbitMQ數據源。注意,不能使用fromSource方法,是因為RMQSource沒有實現SourceFunction方法。

final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);

完整代碼

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String queueName = "data.from.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);env.execute("Flink Java API Skeleton");}
}

打包、上傳和運行任務

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

測試

在RabbitMQ后臺的默認交換器中,發布一條消息到data.from.rbtmq
在這里插入圖片描述
然后使用下面指令可以看到Flink讀取到消息并執行了print方法

tail log/flink-*-taskexecutor-*.out

==> flink-fangliang-taskexecutor-0-fangliang.out <==
data from http://172.21.112.140:15672/#/exchanges/%2F/amq.default

工程代碼

https://github.com/f304646673/FlinkDemo

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/42355.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/42355.shtml
英文地址,請注明出處:http://en.pswp.cn/web/42355.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

判斷對象能否回收的兩種方法,以及JVM引用

判斷對象能否回收的兩種方法&#xff1a;引用計數算法&#xff0c;可達性分析算法 引用計數算法&#xff1a;給對象添加一個引用計數器&#xff0c;當該對象被其它對象引用時計數加一&#xff0c;引用失效時計數減一&#xff0c;計數為0時&#xff0c;可以回收。 特點&#xf…

自動駕駛SLAM又一開源巔峰之作!深挖時間一致性,精準構建超清地圖

論文標題&#xff1a; DTCLMapper: Dual Temporal Consistent Learning for Vectorized HD Map Construction 論文作者&#xff1a; Siyu Li, Jiacheng Lin, Hao Shi, Jiaming Zhang, Song Wang, You Yao, Zhiyong Li, Kailun Yang 導讀&#xff1a; 本文介紹了一種用于自動…

突發!馬斯克3140億參數Grok開源!Grok原理大公開!

BIG NEWS: 全球最大開源大模型&#xff01;馬斯克Grok-1參數量3410億&#xff0c;正式開源!!! 說到做到&#xff0c;馬斯克xAI的Grok&#xff0c;果然如期開源了&#xff01; 就在剛剛&#xff0c;馬斯克的AI創企xAI正式發布了此前備受期待大模型Grok-1&#xff0c;其參數量達…

硅紀元視角 | 虛擬神經科學的突破:AI「賽博老鼠」誕生

在數字化浪潮的推動下&#xff0c;人工智能&#xff08;AI&#xff09;正成為塑造未來的關鍵力量。硅紀元視角欄目緊跟AI科技的最新發展&#xff0c;捕捉行業動態&#xff1b;提供深入的新聞解讀&#xff0c;助您洞悉技術背后的邏輯&#xff1b;匯聚行業專家的見解&#xff0c;…

企業需要什么樣的MES?

MES&#xff08;英文全稱&#xff1a;Manufacturing Execution System&#xff09;&#xff0c;即制造執行系統&#xff0c;是面向車間生產的管理系統。它位于上層計劃管理系統&#xff08;如ERP&#xff09;與底層工業控制&#xff08;如PCS層&#xff09;之間&#xff0c;是制…

【Linux】:服務器用戶的登陸、刪除、密碼修改

用Xshell登錄云服務器。 1.登錄云服務器 先打開Xshell。彈出的界面點。 在終端上輸入命令ssh usernameip_address&#xff0c;其中username為要登錄的用戶名&#xff0c;ip_address為Linux系統的IP地址或主機名。 然后輸入密碼進行登錄。 具體如下&#xff1a; 找到新建會話…

Windows與time.windows.com同步time出錯(手把手操作)

今天我來針對Windows講解Time同步 時間問題 計算機的時間不同&#xff0c;過快或者過慢。&#xff08;可以和自己的手機時間進行對比&#xff0c;手機的時間進行同步的頻率會比計算機更快&#xff0c;因此更精準&#xff09;計算機time過快和過慢&#xff0c;會導致使用過程中…

想實現隨時隨地遠程訪問?解析可道云teamOS內網穿透功能

在數字化時代&#xff0c;無論是個人還是企業&#xff0c;都面臨著數據共享與遠程訪問的迫切需求。 比如我有時會需要在家中加班&#xff0c;急需訪問公司內網中的某個關鍵文件。 然而&#xff0c;由于公網與內網的天然隔閡&#xff0c;這些需求往往難以實現。這時&#xff0c…

代碼隨想錄 鏈表章節總結

移除鏈表元素 && 設計鏈表 學會設置虛擬頭結點 翻轉鏈表 leetcode 206 https://leetcode.cn/problems/reverse-linked-list/description/ 方法一&#xff1a;非遞歸新開鏈表 頭插法&#xff1a;創建一個新的鏈表&#xff0c;遍歷舊鏈表&#xff0c;按順序在新鏈表使…

AIGC | 在機器學習工作站安裝NVIDIA CUDA? 并行計算平臺和編程模型

[ 知識是人生的燈塔&#xff0c;只有不斷學習&#xff0c;才能照亮前行的道路 ] 0x02.初識與安裝 CUDA 并行計算平臺和編程模型 什么是 CUDA? CUDA&#xff08;Compute Unified Device Architecture&#xff09;是英偉達&#xff08;NVIDIA&#xff09;推出的并行計算平臺和編…

idea提交代碼或更新代碼一直提示token然后登陸失敗無法提交或者更新代碼

最近因為換了電腦需要對開發環境做配置&#xff0c; 遇到了這個問題&#xff0c; 應該是因為我們用到了gitlab&#xff0c;默認的最新的idea會有gitlab插件 強制錄入gitlab的token&#xff0c;如果gitlab不支持token的驗證那么問題就來了 &#xff0c; 不管怎么操作都無法提交或…

FPGA之術語

FPGA之術語 IOSTANDARDDIFF_SSTL12:LVCMOS33:sys_clk_p/n:rst_n:UART時鐘JTAG:GPIOONFIPCIe IOSTANDARD 在電子工程領域&#xff0c;DIFF_SSTL12和LVCMOS33是兩種不同的電氣標準&#xff0c;用于定義信號的電壓級別和特性。 IOSTANDARD是一個在FPGA&#xff08;現場可編程門陣…

Spring MVC深入理解之源碼實現

1、SpringMVC的理解 1&#xff09;談談對Spring MVC的了解 MVC 是模型(Model)、視圖(View)、控制器(Controller)的簡寫&#xff0c;其核心思想是通過將業務邏輯、數據、顯示分離來組織代碼。 Model&#xff1a;數據模型&#xff0c;JavaBean的類&#xff0c;用來進行數據封裝…

【cocos2dx】【iOS工程】如何保存用戶在游戲內的繪畫數據,并將數據以圖像形式展示在預覽界面

【cocos2dx】【iOS工程】如何保存用戶在應用內的操作數據&#xff0c;并將數據以圖像形式展示在預覽界面 設備/引擎&#xff1a;Mac&#xff08;11.6&#xff09;/Mac Mini 開發工具&#xff1a;Xcode&#xff08;15.0.1&#xff09; 開發需求&#xff1a;如何保存用戶在應用…

富格林:抓住正規穩健出金思路

富格林指出&#xff0c;凡事要學會抓住正規思路避繁就簡&#xff0c;才會順利達到終點。在現貨黃金市場中&#xff0c;投資者必須學會抓對正規趨勢&#xff0c;才是走向盈利出金的根本保障。以下是富格林投資總結的幾個觀點和建議&#xff0c;希望能幫助投資者實現穩健出金。 …

算法基礎之分治法

算法原理 對于一個規模為 n n n 的子問題&#xff0c;若該問題可以容易地解決則直接解決&#xff0c;否則將其分解為 k k k 個規模較小的子問題&#xff0c;這些子問題相互獨立且與原問題形式相同。遞歸地解決這些子問題&#xff0c;然后將各子問題的解合并得到原問題的解&a…

單鏈表詳解(2)

三、函數定義 查找節點 //查找結點 SLTNode* SLTNodeFind(SLTNode* phead, SLTDataType x) {assert(phead);SLTNode* pcur phead;while (pcur){if (pcur->data x){return pcur;}pcur pcur->next;}return NULL; } 查找節點我們是通過看數據域來查找的&#xff0c;查…

Arm64 基礎指令集介紹

按照字母排序順序&#xff1a; ● ADC&#xff1a;帶進位加法。 ● ADCS&#xff1a;帶進位加法&#xff0c;設置標志位。 ● ADD (extended register)&#xff1a;擴展寄存器加法。 ● ADD (immediate)&#xff1a;立即數加法。 ● ADD (shifted register)&#xff1a;移位寄存…

【MySQL05】【 undo 日志】

文章目錄 一、前言二、undo 日志&#xff08;回滾日志&#xff09;1. 事務 id2. undo 日志格式2.1 INSERT 對應的 undo 日志2.2 DELETE 對應的 undo 日志2.3 UPDATE 對應的 undo 日志2.3.1 不更新主鍵2.3.2 更新主鍵 2.3 增刪改操作對二級索引的影響2.4 roll_pointer 3. FIL_PA…

Windows 網絡重置

netsh int ip reset 命令是用于重置 Windows 操作系統中的網絡設置和配置的命令。 在網絡故障排除、修復網絡連接問題以及清除可能存在的網絡配置沖突時非常有用。 命令詳解&#xff1a; netsh: 用于配置各種網絡設置 int: 用于管理網絡接口 ip: 用于管理網絡接口的 IP 配…