flink廣播算子Broadcast

文章目錄

  • 一、Broadcast
  • 二、代碼示例
  • 三.或者第二種(只讀取一個csv文件到廣播內存中)


提示:以下是本篇文章正文內容,下面案例可供參考

一、Broadcast

為了關聯一個非廣播流(keyed 或者 non-keyed)與一個廣播流(BroadcastStream),我們可以調用非廣播流的方法 connect(),并將 BroadcastStream 當做參數傳入。 這個方法的返回參數是 BroadcastConnectedStream,具有類型方法 process(),傳入一個特殊的 CoProcessFunction 來書寫我們的模式識別邏輯。 具體傳入 process() 的是哪個類型取決于非廣播流的類型:

  • 如果流是一個 keyed 流,那就是 KeyedBroadcastProcessFunction 類型;
  • 如果流是一個 non-keyed 流,那就是 BroadcastProcessFunction 類型。

1).例如非keyby的要實現兩個方法

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {//主流 public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;//廣播操作public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}

2).keyby的

public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {//主流public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;//廣播public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;//只有keyby的可以onTimer。此方法可以不重寫public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

在處理廣播流元素這端,是具有讀寫權限的,而對于處理非廣播流元素這端是只讀的。 這樣做的原因是,Flink 中是不存在跨 task 通訊的。所以為了保證 broadcast state 在所有的并發實例中是一致的,我們在處理廣播流元素的時候給予寫權限,在所有的 task 中均可以看到這些元素,并且要求對這些元素處理是一致的, 那么最終所有 task 得到的 broadcast state 是一致的。
廣播算子是不使用 RocksDB state backend: broadcast state 在運行時保存在內存中,需要保證內存充足。這一特性同樣適用于所有其他 Operator State。

二、代碼示例

此處將本地csv文件加載到內存廣播中
CSV文件的內容是:
1.user_details.csv

1,Alice,30
2,Bob,25

2.user_details03.csv

3,Charlie,35
5,name,5

下面是代碼(下面是將兩個本地CSV文件放到廣播內存中案例)

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Map;
public <

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/75835.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/75835.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/75835.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Redis 和 MySQL雙寫一致性的更新策略有哪些?常見面試題深度解答。

目錄 一. 業務數據查詢&#xff0c;更新順序簡要分析 二. 更新數據庫、查詢數據庫、更新緩存、查詢緩存耗時對比 2.1 更新數據庫&#xff08;最慢&#xff09; 2.2 查詢數據庫&#xff08;較慢&#xff09; 2.3 更新緩存&#xff08;次快&#xff09; 2.4 查詢緩存&#…

SRT協議

SRT&#xff08;Secure Reliable Transport&#xff09;是一種開源的視頻傳輸協議&#xff0c;專為高丟包、高延遲網絡環境設計&#xff0c;結合了UDP的低延遲和TCP的可靠性&#xff0c;廣泛應用于直播、遠程制作、視頻會議等場景。 定位&#xff1a;SRT協議的官方C/C實現庫&am…

“征服HTML引號惡魔:“完全解析手冊”!!!(quot;表示雙引號)

&#x1f6a8;&#x1f4e2; "征服HTML引號惡魔&#xff1a;“完全解析手冊” &#x1f4e2;&#x1f6a8; &#x1f3af; 博客引言&#xff1a;當引號變成"惡魔" &#x1f631; 是否遇到過這種情況&#xff1a; 寫HTML時滿心歡喜輸入<div title"他…

npm install 卡在創建項目:sill idealTree buildDeps

參考&#xff1a; https://blog.csdn.net/PengXing_Huang/article/details/136460133 或者再執行 npm install -g cnpm --registryhttps://registry.npm.taobao.org 或者換梯子

c++中cpp文件從編譯到執行的過程

C 文件從編寫到執行的過程可以分為幾個主要階段&#xff1a;編寫代碼、預處理、編譯、匯編、鏈接和運行。以下是每個階段的詳細說明&#xff1a; 1. 編寫代碼 這是整個過程的起點。程序員使用文本編輯器&#xff08;如 VSCode、Sublime Text 或其他 IDE&#xff09;編寫 C 源…

PROE 與 STL 格式轉換:開啟 3D 打印及多元應用的大門

在 3D 設計與制造的復雜生態中&#xff0c;將 PROE 格式轉換為 STL 格式絕非無端之舉&#xff0c;而是有著深厚且多元的現實需求作為支撐。 一、文件格式介紹? &#xff08;一&#xff09;PROE 格式? PROE 作為一款參數化設計軟件&#xff0c;采用基于特征的參數化建模技術…

開發中后端返回下劃線數據,要不要統一轉駝峰?

先說結論。看情況&#xff01;&#xff01;&#xff01;&#xff01; 前端 主要用 JS/TS 建議后端返回 camelCase&#xff0c;減少前端轉換成本。后端 主要是 Python/Go 建議保持 snake_case&#xff0c;前端做轉換。但是團隊統一風格最重要&#xff01;如果統一返回駝峰就駝峰…

docker pull時報錯:https://registry-1.docker.io/v2/

原文&#xff1a;https://www.cnblogs.com/sdgtxuyong/p/18647915 https://www.cnblogs.com/OneSeting/p/18532166 docker 換源&#xff0c;解決連接不上的問題。 編輯以下文件&#xff0c;不存在則創建&#xff1a; vim /etc/docker/daemon.json {"registry-mirrors&qu…

Pytorch學習筆記(十二)Learning PyTorch - NLP from Scratch

這篇博客瞄準的是 pytorch 官方教程中 Learning PyTorch 章節的 NLP from Scratch 部分。 官網鏈接&#xff1a;https://pytorch.org/tutorials/intermediate/nlp_from_scratch_index.html 完整網盤鏈接: https://pan.baidu.com/s/1L9PVZ-KRDGVER-AJnXOvlQ?pwdaa2m 提取碼: …

基礎算法02——冒泡排序(Bubble Sort)

冒泡排序&#xff08;Bubble Sort&#xff09; 冒泡排序&#xff1a;是一種簡單的排序算法&#xff0c;其基本思想是通過重復遍歷要排序的列表&#xff0c;比較相鄰的元素&#xff0c;并在必要時&#xff08;即前面的數比后面的數大的時候&#xff09;交換它們的位置&#xff…

RestTemplate遠程調用接口方式

1.Post(body空參) 也就是說需要給一個空的json 代碼: String getDeviceUrl this.MOVABLE_URL "detected-data/getMachineLists"; // 遠程調用 RestTemplate restTemplate new RestTemplate(); restTemplate.getMessageConverters().set(1,new StringHttpMessageC…

ar頭顯和眼鏡圖像特效處理

使用一個線程從攝像頭或者其他設備循環讀取圖像數據寫入鏈表&#xff0c;另一個線程從鏈表循環讀取數據并做相應的特效處理&#xff0c;由于寫入的速度比讀取的快&#xff0c;最終必然會因為寫入過快導致線程讀寫一幀而引發沖突和數據幀正常數據幀被覆蓋。最好使用共享內存&…

mysql--socket報錯

錯誤原因分析 MySQL 服務未運行&#xff08;最常見原因&#xff09; 錯誤中的 (2) 表示 “No such file or directory”&#xff0c;即 /tmp/mysql.sock 不存在這通常意味著 MySQL 服務器根本沒有啟動 socket 文件路徑不匹配 客戶端嘗試連接 /tmp/mysql.sock但 MySQL 服務器可…

labview加載matlab數據時報錯提示:對象引用句柄無效。

1. labview報錯提示 labview加載mat數據時報錯提示&#xff1a;對象引用句柄無效。返回該引用句柄的節點可能遇到錯誤&#xff0c;并沒有返回有效的引用句柄。該引用句柄所指的存儲可能在執行調用之前已關閉。報錯提示如下&#xff1a; 這是由于labview缺少matlab MathWorks導…

面試計算機操作系統解析(一中)

判斷 1. 一般來說&#xff0c;先進先出頁面置換算法比最近最少使用頁面置換算法有較少的缺頁率。&#xff08;?&#xff09; 正確答案&#xff1a;錯誤解釋&#xff1a;FIFO&#xff08;先進先出&#xff09;頁面置換算法可能導致“Belady異常”&#xff0c;即頁面數增加反而…

如何防御TCP洪泛攻擊

TCP洪泛攻擊&#xff08;TCP Flood Attack&#xff09;是一種常見的分布式拒絕服務&#xff08;DDoS&#xff09;攻擊手段&#xff0c;以下是其原理、攻擊方式和危害的詳細介紹&#xff1a; 定義與原理 TCP洪泛攻擊利用了TCP協議的三次握手過程。在正常的TCP連接建立過程中&a…

20250330 Pyflink with Paimon

1. 數據湖 2. 本地安裝Pyflink和Paimon 必須安裝Python 3.11 Pip install python -m pip install apache-flink1.20.1 需要手動加入這兩個jar 測試代碼&#xff1a; import argparse import logging import sys import timefrom pyflink.common import Row from pyflink.tab…

-PHP 應用SQL 盲注布爾回顯延時判斷報錯處理增刪改查方式

#PHP-MYSQL-SQL 操作 - 增刪改查 1 、功能&#xff1a;數據查詢(對數據感興趣&#xff09; 查詢&#xff1a; SELECT * FROM news where id$id 2 、功能&#xff1a;新增用戶&#xff0c;添加新聞等&#xff08;對操作的結果感興趣&#xff09; 增加&#xff1a; INSERT INT…

【學習記錄】大模型微調之使用 LLaMA-Factory 微調 Qwen系列大模型,可以用自己的數據訓練

一、LoRA微調的基本原理 1、基本概念 LoRA&#xff08;Low-Rank Adaptation&#xff09;是一種用于大模型微調的技術&#xff0c;通過引入低秩矩陣來減少微調時的參數量。在預訓練的模型中&#xff0c;LoRA通過添加兩個小矩陣B和A來近似原始的大矩陣ΔW&#xff0c;從而減少需…

Vue 使用 xlsx 插件導出 excel 文件

安裝與引入 安裝 npm install xlsx npm install file-saver # 或者 yarn add xlsx yarn add file-saver 引入 import * as XLSX from xlsx; import FileSaver from file-saver 基本功能 讀取 Excel 文件 // 讀取文件內容 const workbook XLSX.readFile(path/to/file.xl…