Java版Flink使用指南——定制RabbitMQ數據源的序列化器

大綱

  • 新建工程
    • 新增依賴
    • 數據對象
    • 序列化器
    • 接入數據源
  • 測試
    • 修改Slot個數
    • 打包、提交、運行
  • 工程代碼

在《Java版Flink使用指南——從RabbitMQ中隊列中接入消息流》一文中,我們從RabbitMQ隊列中讀取了字符串型數據。如果我們希望讀取的數據被自動化轉換為一個對象,則需要定制序列化器。本文我們就將講解數據源序列化器的定制方法。

新建工程

我們在IntelliJ中新建一個工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入與Flink的版本:1.19.1
在這里插入圖片描述

新增依賴

在pom.xml中新增RabbitMQ連接器

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

新增Json庫依賴

		<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.17.1</version></dependency>

新增lombok庫,主要是為了使用它的一些注解

        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.32</version><scope>provided</scope></dependency>

數據對象

我們新建一個簡單的數據對象SampleData
src/main/java/org/example/vo/SampleData.java

package org.example.vo;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {private Long id;private String name;private int age;private Boolean married;private Double salary;public String toJson() throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(this);}public static SampleData fromJson(String json) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.readValue(json, SampleData.class);}
}

這個方法包含兩個方法,一個是將SampleData 轉換成字符串,另一個是將字符串轉成SampleData 對象。

序列化器

我們定義的數據源序列化器要實現AbstractDeserializationSchema接口,主要是通過deserialize方法將二進制數組轉換成SampleData 對象。

src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java

package org.example.serializer;import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;import java.io.IOException;public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {@Overridepublic SampleData deserialize(byte[] message) throws IOException {return SampleData.fromJson(new String(message));}@Overridepublic boolean isEndOfStream(SampleData nextElement) {return false;}@Overridepublic TypeInformation<SampleData> getProducedType() {return TypeInformation.of(SampleData.class);}
}

接入數據源

我們在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq對了寫入了大量SampleData 數據。這次我們將其作為數據源來做測試
這次我們在創建RMQSource時傳入序列化器SampleDataRabbitMQSourceSerializer。它會將從RabbitMQ獲取的數據轉換成SampleData對象。
然后我們獲取所有“已婚”(filter.getMarried() == true)的數據,將其打印到日志中。

		String queueName = "data.to.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);

測試

修改Slot個數

由于我們要運行兩個流式計算任務,于是需要兩個Slot。

vim conf/config.yaml 

將numberOfTaskSlots的值改成2。

打包、提交、運行

我們將本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交運行
在這里插入圖片描述
然后在日志中可以看到“已婚”的數據都在輸出

 tail -f log/*

在這里插入圖片描述

工程代碼

https://github.com/f304646673/FlinkDemo

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/43286.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/43286.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/43286.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux C++ 043-機房預約系統

Linux C 043-機房預約系統 本節關鍵字&#xff1a;Linux、C、機房預約系統 相關庫函數&#xff1a;for_each、open、close、write 系統簡介 學校現在有幾個規格不同的機房&#xff0c;由于使用經常出現撞車現象&#xff0c;現開發一套機房預約系統&#xff0c;解決這一問題。…

Java進階---抽象方法abstract

抽象方法 案例引入: 在某個寵物店的寵物資源管理系統中有&#xff1a; 狗類&#xff1a;屬性&#xff08;姓名&#xff09;&#xff0c;行為&#xff08;吃飯&#xff09; 貓類&#xff1a;屬性&#xff08;姓名&#xff09;&#xff0c;行為&#xff08;吃飯&#xff09;利用…

智慧科技照亮水利未來:深入剖析智慧水利解決方案如何助力水利行業實現高效、精準、可持續的管理

目錄 一、智慧水利的概念與內涵 二、智慧水利解決方案的核心要素 1. 物聯網技術&#xff1a;構建全面感知網絡 2. 大數據與云計算&#xff1a;實現數據高效處理與存儲 3. GIS與三維可視化&#xff1a;提升決策支持能力 4. 人工智能與機器學習&#xff1a;驅動決策智能化 …

LibreOffice的國內鏡像安裝地址和node.js國內快速下載網站

文章目錄 1、LibreOffice1.1、LibreOffice在application-conf.yml中的配置2、node.js 1、LibreOffice 國內鏡像包網址&#xff1a;https://mirrors.cloud.tencent.com/libreoffice/libreoffice/ 1.1、LibreOffice在application-conf.yml中的配置 jodconverter:local:enable…

Java面試八股之MySQL中int(10)和bigint(10)能存儲讀的數據大小一樣嗎

MySQL中int(10)和bigint(10)能存儲讀的數據大小一樣嗎 在MySQL中&#xff0c;int(10)和bigint(10)的數據存儲能力并不相同&#xff0c;盡管括號內的數字&#xff08;如10&#xff09;看起來似乎暗示著某種關聯&#xff0c;但實際上這個數字代表的是顯示寬度&#xff0c;而不是…

vue學習day03-指令修飾符、v-bind對于樣式控制的增強、v-model應用于其他表單元素

7、指令修飾符 &#xff08;1&#xff09;概念&#xff1a; 通過“.”指明一些指令后綴&#xff0c;不同后綴封裝了不同的處理操作->簡化代碼 &#xff08;2&#xff09;按鍵修飾符 keyup.enter->鍵盤回車監聽 &#xff08;3&#xff09;v-model修飾符 v-model.tri…

vue + element ui 實現側邊欄導航欄折疊收起

首頁布局如下 要求點擊按鈕,將側邊欄收縮, 通過 row 和 col 組件&#xff0c;并通過 col 組件的 span 屬性我們就可以自由地組合布局。 折疊前 折疊后 <template><div class"app-layout" :class"{ collapse: app.isFold }"><div class&…

Onekey正版steam分流下載工具

今天給大家介紹的是一款下載steam游戲的工具。Onekey工具&#xff0c;是一款游戲下載器&#xff0c;可以下載steam正版分流游戲。下載正版分流的網站很多&#xff0c;但是都是網盤或者迅雷下載&#xff0c;或者游戲盒子下載&#xff0c;速度都很慢。這款軟件是用steam下載的&am…

Flask項目搭建及部署 —— Python

flask搭建及部署 pip 19.2.3 python 3.7.5 Flask 1.1.1 Flask-SQLAlchemy 2.4.1 Pika 1.1.0 Redis 3.3.11 flask-wtf 0.14.2 1、創建flask項目&#xff1a; 創建完成后整個項目結構樹&#xff1a; app.py: 項?管理?件&#xff0c;通過它管理項?。 static: 存放靜態…

自定義控件視圖篇(一)測量與布局

在自定義控件的開發過程中&#xff0c;"視圖篇"的測量與布局是非常關鍵的步驟&#xff0c;這直接決定了控件的尺寸、位置以及子視圖的排列方式。下面我將詳細介紹測量和布局的過程&#xff0c;以及如何在自定義控件中正確實現這些步驟。 視圖的測量 (onMeasure) 在…

2021版本的idea熱部署的詳細步驟

背景&#xff1a;我是自己用的是2021版本的idea,然后發現跟2023版本的熱部署不太一樣&#xff0c;所以&#xff0c;今天自己出一期這樣的文章吧&#xff01;&#xff01;&#xff01;其他人配置的時候根據自己的情況&#xff0c;來閱讀吧&#xff01; 第一步&#xff1a;方式一…

MyBatis是如何分頁的及原理

MyBatis 是一種持久層框架&#xff0c;支持通過配置文件和注解將 SQL 映射為 Java 對象。在實際開發中&#xff0c;查詢數據時經常需要進行分頁處理。 MyBatis 也提供了支持分頁的方案&#xff0c;其主要思路是使用 Limit 偏移量和限制個數&#xff0c;來獲取指定數量的數據。下…

音視頻入門基礎:H.264專題(10)——FFmpeg源碼中,存放SPS屬性的結構體和解碼SPS的函數分析

一、引言 FFmpeg源碼對AnnexB包裝的H.264碼流解碼過程中&#xff0c;通過ff_h2645_extract_rbsp函數拿到該H.264碼流中的某個NALU的NALU Header RBSP后&#xff08;具體可以參考&#xff1a;《FFmpeg源碼&#xff1a;ff_h2645_extract_rbsp函數分析》&#xff09;&#xff0c…

【沐風老師】3DMAX建筑體塊生成插件BuildingBlocks使用方法詳解

BuildingBlocks建筑體塊生成插件使用方法詳解 聽說你還在手動建配景樓&#xff1f;有了BuildingBlocks這個插件&#xff0c;一分鐘搞定喔&#xff01; 3DMAX建筑體塊生成插件BuildingBlocks&#xff0c;用于快速自定義街道及生成配景樓區塊。 【適用版本】 3dMax2019及更高版…

空間分析在3D應用中的革命:提升投資回報與業務價值

在3D應用的浪潮中&#xff0c;空間分析技術正成為提升用戶體驗、優化業務決策和解決復雜問題的關鍵工具。本文將深入探討空間分析如何通過提供深度用戶行為洞察和數據可視化&#xff0c;增強3D應用的實際效益和市場競爭力。 一、空間分析的概念與背景 Tony Bevilacqua&#x…

分布式I/O從站的認知

為什么需要分布式I/O從站&#xff1f; 當PLC與控制機構距離過遠時&#xff0c;遠距離會帶來信號干擾&#xff0c;分布式I/O從站只需要一個網絡線纜連接。 ET200分布式I/O從站家族 體積緊湊、功能強大。 ET200SP ET200M ET200S ET200iSP ET200 AL ET200pro ET200 eco PN 通訊協議…

DSSM雙塔特征交互

傳統的DSSM雙塔無法在早期進行user和item側的特征交互&#xff0c;這在一定程度上降低了模型性能。我們想要對雙塔模型進行細粒度的特征交互&#xff0c;同時又不失雙塔模型離線建向量索引的解耦性。下面介紹兩篇這方面的工作。 美團-Dual Augmented Two-tower 在user和item的特…

1. CSS Grid 網格布局教程

CSS Grid 網格布局教程 一、概述 網格布局&#xff08;Grid&#xff09;是最強大的 CSS 布局方案。 它將網頁劃分成一個個網格&#xff0c;可以任意組合不同的網格&#xff0c;做出各種各樣的布局。以前&#xff0c;只能通過復雜的 CSS 框架達到的效果&#xff0c;現在瀏覽器…

linux工具應用_VERDI

verdi 1. 基礎知識1.1 verdi介紹1.2 fsdb文件2. fsdb dump2.1 1st step-設置環境變量LD_LIBRARY_PATH2.2 2nd step-xrun仿真命令2.3 3rd step-仿真過程中調用fsdb函數dump波形2.3.1 在testbench、top.sv中調用fsdb函數2.3.2 在tcl腳本中用xrun的dump指令(同樣要調用fsdb函數)2.…

Scrapy crawl spider 停止工作

Scrapy是一個用于爬取網站數據的流行框架&#xff0c;有時爬蟲可能會停止工作&#xff0c;這通常是由多種原因引起的。以下是一些常見問題及其解決方法&#xff1a; 1、問題背景 用戶在使用 Scrapy 0.16.2 版本進行網絡爬取時遇到問題&#xff0c;具體表現為爬蟲在運行一段時間…