大綱
- 創建RabbitMQ隊列
- 新建工程
- 新增依賴
- 編碼
- 設置數據源配置
- 讀取、處理數據
- 完整代碼
- 打包、上傳和運行任務
- 測試
- 工程代碼
在《Java版Flink使用指南——安裝Flink和使用IntelliJ制作任務包》一文中,我們完成了第一個小型Demo的編寫。例子中的數據是代碼預先指定的。而現實中,數據往往來源于外部。本文我們將嘗試Flink從RabbitMQ中讀取數據,然后輸出到日志中。
關于RabbitMQ的知識可以參閱《RabbitMQ實踐》。
創建RabbitMQ隊列
我們創建一個Classic隊列data.from.rbtmq。注意要選擇Durable類型,這是后續用的默認連接器的限制。
具體方法見《RabbitMQ實踐——在管理后臺測試消息收發功能》。
后續我們將在后臺通過默認交換器,給這個隊列新增消息。
新建工程
我們在IntelliJ中新建一個工程DataFromRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java。
版本填入與Flink的版本:1.19.1
新增依賴
在pom.xml中新增RabbitMQ連接器
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>
編碼
設置數據源配置
String queueName = "data.from.rbtmq";
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;// create a RabbitMQ source
RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());
讀取、處理數據
下面代碼通過addSource添加RabbitMQ數據源。注意,不能使用fromSource方法,是因為RMQSource沒有實現SourceFunction方法。
final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);
完整代碼
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String queueName = "data.from.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);env.execute("Flink Java API Skeleton");}
}
打包、上傳和運行任務
測試
在RabbitMQ后臺的默認交換器中,發布一條消息到data.from.rbtmq
然后使用下面指令可以看到Flink讀取到消息并執行了print方法
tail log/flink-*-taskexecutor-*.out
==> flink-fangliang-taskexecutor-0-fangliang.out <==
data from http://172.21.112.140:15672/#/exchanges/%2F/amq.default
工程代碼
https://github.com/f304646673/FlinkDemo