java消息順序執行_Apache Flink:如何并行執行但保持消息順序?

請在下面找到使用側輸出和插槽組進行本地擴展的示例 .

package org.example

/*

* Licensed to the Apache Software Foundation (ASF) under one

* or more contributor license agreements. See the NOTICE file

* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file

* to you under the Apache License, Version 2.0 (the

* "License"); you may not use this file except in compliance

* with the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*/

import org.apache.flink.streaming.api.functions.ProcessFunction

import org.apache.flink.streaming.api.scala._

import org.apache.flink.util.Collector

/**

* This example shows an implementation of WordCount with data from a text socket.

* To run the example make sure that the service providing the text data is already up and running.

*

* To start an example socket text stream on your local machine run netcat from a command line,

* where the parameter specifies the port number:

*

* {{{

* nc -lk 9999

* }}}

*

* Usage:

* {{{

* SocketTextStreamWordCount

* }}}

*

* This example shows how to:

*

* - use StreamExecutionEnvironment.socketTextStream

* - write a simple Flink Streaming program in scala.

* - write and use user-defined functions.

*/

object SocketTextStreamWordCount {

def main(args: Array[String]) {

if (args.length != 2) {

System.err.println("USAGE:\nSocketTextStreamWordCount ")

return

}

val hostName = args(0)

val port = args(1).toInt

val outputTag1 = OutputTag[String]("side-1")

val outputTag2 = OutputTag[String]("side-2")

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.getConfig.enableObjectReuse()

//Create streams for names and ages by mapping the inputs to the corresponding objects

val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement")

val counts = text.flatMap {

_.toLowerCase.split("\\W+") filter {

_.nonEmpty

}

}

.process(new ProcessFunction[String, String] {

override def processElement(

value: String,

ctx: ProcessFunction[String, String]#Context,

out: Collector[String]): Unit = {

if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value))

else ctx.output(outputTag2, String.valueOf(value))

}

})

val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1)

val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2)

val output1 = sideOutputStream1.map {

(_, 1)

}.slotSharingGroup("map1")

.keyBy(0)

.sum(1)

val output2 = sideOutputStream2.map {

(_, 1)

}.slotSharingGroup("map2")

.keyBy(0)

.sum(1)

output1.print()

output2.print()

env.execute("Scala SocketTextStreamWordCount Example")

}

}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/260264.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/260264.shtml
英文地址,請注明出處:http://en.pswp.cn/news/260264.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

python的字符串定界符可以使用_使用Template格式化Python字符串的方法

對Python字符串&#xff0c;除了比較老舊的%&#xff0c;以及用來替換掉%的format&#xff0c;及在python 3.6中加入的f這三種格式化方法以外&#xff0c;還有可以使用Template對象來進行格式化。from string import Template&#xff0c;可以導入Template類。實例化Template類…

【ES實戰】ES6.7的tar包離線安裝幫助手冊

Elasticsearch6.7部署幫助手冊 校驗時間&#xff1a;2023年12月19日 文章目錄 Elasticsearch6.7部署幫助手冊安裝前準備安裝包安裝要求鎖定內存,修改最大文件描述符,最大線程數內核參數 部署規劃端口規劃用戶規劃目錄規劃 安裝步驟每個服務器配置JDK配置文件master角色node角色…

jenkins 部署文檔

Jenkins是一個非常出色的持續集成服務器&#xff0c;本文主要介紹在CentOS系統中Jenkins的基本安裝配置方法&#xff0c;供參考。一. 軟件包&#xff1a;1. 下載apache-maven-2.2.1-bin.tarhttp://www.apache.org/dyn/closer.cgi/maven/binaries/apache-maven-2.2.1-bin.tar.gz…

牛人,多看看他們寫的東西

計算機大師 Donald E. Knuth&#xff08;高德納&#xff09; 算法大師&#xff0c;我最崇拜的計算機科學家&#xff0c;沒有之一&#xff01;不認識高爺爺的人別說自己是學計算機的。《The Art of Computer Programming》絕對是計算機科學的圣經。對高爺爺的崇敬&#xff0c;對…

System.Math.Min(System.Threading.Interlocked.Increment(i), i - 1)

System.Math.Min(System.Threading.Interlocked.Increment(i), i - 1) 在vb里面 等價于ii-1 在C#里面 等價于i-- 是有C#自動轉VB時轉換的轉載于:https://www.cnblogs.com/YaDi/archive/2012/11/08/2759802.html

java快速查找中位數_用QuickSort快速查找中位數(median)

中位數(median)是一個排好序的元素中中間位置的元素&#xff0c;如果元素個數為偶數&#xff0c;則是中間兩個元素的平均值。例如(3,1,5)的中位數是3&#xff0c;而(2,1,3,5)的中位數是2.5。查找中位數屬于SelectionAlgorithms的一種。用快速排序可以做到每次divide之后&#x…

python安裝mysql數據庫_windows10安裝mysql-8.0.13(zip安裝)~Python安裝mysql

windows10安裝mysql-8.0.13(zip安裝)安裝環境說明系統版本&#xff1a;windows10mysql版本&#xff1a;mysql-8.0.13-winx64.zip下載地址&#xff1a;http://mirrors.163.com/mysql/Downloads/MySQL-8.0/mysql-8.0.13-winx64.zip解壓安裝包解壓路徑&#xff1a;D:\develop\soft…

centos 下使用sublime

CentOS 之 Sublime text3 安裝及配置&#xff08;不支持中文輸入&#xff09; sublime text 的界面友好&#xff0c;自動補全功能也不錯。 &#xff08;本來用vimphp_function.txt的形式進行補全的&#xff0c;但是配置后的補全不太滿意&#xff0c;放棄了。 具體參見&#xff…

20121108團隊博客(蘇若)

PS&#xff1a;這本是屬于昨晚的帖子&#xff0c;對不住忠仔。現在補上。 忠仔&#xff0c;終于交給了我一個實實在在的任務&#xff0c;很是欣喜&#xff0c;也很是忐忑&#xff0c;生怕自己不能及時完成任務。 好了&#xff0c;廢話不多說&#xff0c;步入正題。 接下任務【畫…

python 倒排索引 性能_python 實現倒排索引的方法

代碼如下&#xff1a;#encoding:utf-8fin open(1.txt, r)建立正向索引:“文檔1”的ID > 單詞1&#xff1a;出現位置列表&#xff1b;單詞2&#xff1a;出現位置列表&#xff1b;…………“文檔2”的ID > 此文檔出現的關鍵詞列表。forward_index {}for line in fin:line…

pythonnet下載_Python for .NET

Python for .NET 是一個可以讓 Python 程序員近乎無縫的集成 .NET 通用語言環境 CLR 和以及為 .NET 開發者提供一個強大的應用腳本工具。通過這個項目你可在 .NET 中完全使用 Python 來編寫整個應用&#xff0c;使用 .NET 服務和組件。這個包并沒有用 CLR 語言實現一個 Python&…

webService詳解

什么是webService WebService&#xff0c;顧名思義就是基于Web的服務。它使用Web(HTTP)方式&#xff0c;接收和響應外部系統的某種請求。從而實現遠程調用. 1:從WebService的工作模式上理解的話&#xff0c;它跟普通的Web程序&#xff08;比如ASP、JSP等&#xff09;并沒有本…

讀《有人負責,才有質量:寫給在集市中迷失的一代》總結與感想

在大伙都在吹捧“市集”開發軟件的方式的大浪潮下&#xff0c;作者看到了其中的不當之處&#xff0c;發現其中有許多的問題&#xff0c;因此寫下這篇文章給予吹捧“市集”的人一個提醒&#xff0c;甚至警告。 在該文章里&#xff0c;作者認為“市集”里的“農民”不可能建造出和…

php 判斷是否文件,利用PHP判斷文件是否為圖片的方法總結

前言在網頁設計中&#xff0c;如果需要圖片&#xff0c;我們通常拿到的是一個圖片的文件名。僅僅通過文件名是無法判斷該文件是否是一個圖片文件的。或許有的人以為通過后綴名就可以判斷&#xff0c;別忘了文件的后綴名是可以隨便改動的。更何況&#xff0c;在 Linux 系統下是不…

textedit怎么插入數據_還在手動插入Excel交叉空白行?這個小技巧10秒搞定

導讀&#xff1a;前幾天有同學在后臺提問&#xff0c;怎么快速在Excel中隔行插入一行或者多行空白行&#xff0c;其實在早期我們分享的小視頻中有利用過類似的小技巧來制作工資條&#xff0c;今天我們用它來插入空白行。文/ 芒種學院指北針Hello&#xff0c;大家好&#xff0c;…

python制作安裝包(setup.py)

1.制作setup.py from distutils.core import setupsetup(nameMyblog,version1.0,descriptionMy Blog Distribution Utilities,authorlujianxing,author_emaillujianxinglujianxing.com,urlhttp://blog.lujianxing.com,py_modules[foo] ) py_modules 定義 需要打包的模塊名 2.創…

[Ruby]$: 是什么意思?

ruby comes with a set of predefined variables$: default search path (array of paths)其他Ruby特殊變量&#xff1a; $! 最近一次的錯誤信息 $ 錯誤產生的位置 $_ gets最近讀的字符串 $. 解釋器最近讀的行數(line number) $& 最近一次與正則表達式匹配的字符串 $~ 作為…

rocketmq 啟動_016【windows版Rocketmq】小白學習Rocketmq單機部署

以前都是聽說MQ&#xff0c;或者在別人搭建好的基礎上去使用&#xff0c;沒有自己動手搭建過&#xff0c;就沒有更深入去理解。現在機會來啦.啦啦.啦啦啦......引用自己的CSDN文章href"https://blog.csdn.net/chenzhong2010/article/details/106699590或點擊左下角“閱讀原…

WPF WebBrowser 加載 html ,出現安全警告, 運行 腳本和 activeX 控件,

對于你的問題&#xff0c;只需要在你的HTML首行添加如下代碼即可隱藏安全提示條&#xff1a; <!-- saved from url(0014)about:internet --> 還有一個可選方案是使用Winform的WebBrowser控件&#xff0c;不需要更改HTML代碼&#xff0c;也不會出現安全提示&#xff0c;需…

資料下載資源網站

腳本之家&#xff1a;www.jb51.net 轉載于:https://www.cnblogs.com/dreammyle/p/3850250.html