spark-stream 訪問 Redis

最近在spark-stream上寫了一些流計算處理程序,程序架構如下

clipboard.png

程序運行在Spark-stream上,我的目標是kafka、Redis的參數都支持在啟動時指定。

在寫代碼時參考了這篇文章 https://www.iteblog.com/archi...,該文講的比較清楚,但是有兩個問題:

  1. 用scala實現的

  2. Redis服務器的地址是寫死的,我的程序要挪個位置,要重新改代碼編譯。

當時倒騰了一些時間,現在寫出來和大家分享,提高后來者的效率。

clipboard.png

如上圖Spark是分布式引擎,Driver中創建的Redis Pool,在Worker上又得重新創建,參考文章中是定義一個Redis連接池管理類,Redis Pool是類的靜態變量,類加載時由JVM自動創建。這個和我的預期有差距。

在Driver中創建Redis管理對象,然后將該對象廣播,然后在Worker上獲取該廣播對象,從而實現參數可變,但是Redis管理對象在每個Worker上又只實例化了一次。

Driver

Driver 指定序列化方式,Spark支持兩種序列化方式,Java 和 Kryo,Kryo更高效。

資料上說Kryo方式需要注冊類,但是我沒有注冊也能成功運行。

public static void main(String[] args) {if (args.length < 3) {System.err.println("Usage: kafka_spark_redis <brokers> <topics> <redisServer>\n" +"  <brokers> Kafka broker列表\n" +"  <topics> 要消費的topic列表\n" +" <redisServer> redis 服務器地址 \n\n");System.exit(1);}/* 解析參數 */String brokers = args[0];String topics = args[1];String redisServer = args[2];// 創建stream context,兩秒鐘的數據算一批SparkConf sparkConf = new SparkConf().setAppName("kafka_spark_redis");
//        sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");//java的序列號速度沒有Kryo速度快sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//        sparkConf.set("spark.kryo.registrator", "MyRegistrator");JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));JavaSparkContext sc = jssc.sparkContext();HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));HashMap<String, String> kafkaParams = new HashMap<String, String>();kafkaParams.put("metadata.broker.list", brokers);kafkaParams.put("group.id","kakou-test");//Redis連接池管理類RedisClient redisClient = new RedisClient(redisServer);//創建redis連接池管理類//廣播Reids連接池管理對象final Broadcast<RedisClient> broadcastRedis = sc.broadcast(redisClient);// 創建流處理對象JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc,String.class,               /* kafka key class */String.class,               /* kafka value class */StringDecoder.class,        /* key 解碼類 */StringDecoder.class,        /* value 解碼類 */kafkaParams,                /* kafka 參數,如設置kafka broker */topicsSet                   /* 待消費的topic名稱 */);// 將行分拆為單詞JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {//@Override// kafka傳來key-value對public String call(Tuple2<String, String> tuple2) {// 取value值return tuple2._2();}});/* 大量省略 */........}

RedisClient

RedisClient 是自己實現的類,在類中重載write/read這兩個序列化和反序列化函數,需要注意的是如果是Java Serializer 需要實現其它的接口。

在Driver廣播時會觸發調用write序列化函數。

public class RedisClient implements KryoSerializable {public static JedisPool jedisPool;public String host;public RedisClient(){Runtime.getRuntime().addShutdownHook(new CleanWorkThread());}public RedisClient(String host){this.host=host;Runtime.getRuntime().addShutdownHook(new CleanWorkThread());jedisPool = new JedisPool(new GenericObjectPoolConfig(), host);}static class CleanWorkThread extends Thread{@Overridepublic void run() {System.out.println("Destroy jedis pool");if (null != jedisPool){jedisPool.destroy();jedisPool = null;}}}public Jedis getResource(){return jedisPool.getResource();}public void returnResource(Jedis jedis){jedisPool.returnResource(jedis);}public void write(Kryo kryo, Output output) {kryo.writeObject(output, host);}public void read(Kryo kryo, Input input) {host=kryo.readObject(input, String.class);this.jedisPool =new JedisPool(new GenericObjectPoolConfig(), host) ;}
}

Worker

在foreachRDD中獲取廣播變量,由廣播變量觸發先調用RedisClient的無參反序列化函數,然后再調用反序列化函數,我們的做法是在反序列化函數中創建Redis Pool。

        //標準輸出,對車輛的車牌和黑名單進行匹配,對與匹配成功的,保存到redis上。paircar.foreachRDD(new Function2<JavaRDD<HashMap<String, String>>, Time, Void>() {public Void call(JavaRDD<HashMap<String, String>> rdd, Time time) throws Exception {Date now=new Date();rdd.foreachPartition(new VoidFunction<Iterator<HashMap<String, String>>>() {public void call(Iterator<HashMap<String, String>> it) throws Exception {String tmp1;String tmp2;Date now=new Date();RedisClient redisClient=broadcastRedis.getValue();Jedis jedis=redisClient.getResource();......redisClient.returnResource(jedis);}});

結語

Spark對分布式計算做了封裝,但很多場景下還是要了解它的工作機制,很多問題和性能優化都和Spark的工作機制緊密相關。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/397871.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/397871.shtml
英文地址,請注明出處:http://en.pswp.cn/news/397871.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

c語言打印空心等腰梯形樂學,C語言做激光發射

/*C語言做激光發射器游戲&#xff0c;按上下左右箭頭鍵移動發射器&#xff0c;按空格鍵發射激光&#xff0c;按Esc鍵結束游戲*/#include #include #include void main(){int i,j;//定義循環變量int x15,y10;//定義橫坐標與縱坐標初始值char in;int fire0;int kill0;int nx10;//…

東軟睿云用戶認證_【硬件資訊】塵埃落定!11代酷睿規格曝光!i7、i9難分差距,退回8核16線程!...

新聞①&#xff1a;Intel第11代酷睿處理器規格曝光&#xff0c;旗艦i9-11900K與i7-11700K同為8核16線程Intel的代號為Rocket Lake-S的第11代酷睿臺式機CPU陣容將于明年推出&#xff0c;其中四個型號的規格現已曝光。擁有8個Cypress Cove核心、5.3GHz、PL2功耗限制250W的酷睿i9-…

環上的游戲

環上的游戲&#xff08;cycle&#xff09;有一個取數的游戲。初始時&#xff0c;給出一個環&#xff0c;環上的每條邊上都有一個非負整數。這些整數中至少有一個0。然后&#xff0c;將一枚硬幣放在環上的一個節點上。兩個玩家就是以這個放硬幣的節點為起點開始這個游戲&#xf…

python基礎課程_2學習筆記3:圖形用戶界面

圖形用戶界面 豐富的平臺 寫作Python GUI程序前&#xff0c;須要決定使用哪個GUI平臺。 簡單來說&#xff0c;平臺是圖形組件的一個特定集合。能夠通過叫做GUI工具包的給定Python模塊進行訪問。工具包 描寫敘述 Tkinter 使用Tk平臺。非常easy得到。半標準。 wxpython 基于…

vim編輯器之按鍵說明

viim編輯器文本:純文本,ASCII test;文本編輯種類:行編輯器:sed全屏編輯器:nano,vi其他編輯器:gedit 一個簡單的圖形編輯器gvim 一個vim編輯器的圖形版本 一.打開文件幾種方法命令格式:vim [options] [files]常用選項:#:打開文件后,直接讓光標處于第#行的行首/PATTERN:打開…

idea ssm打war包_IDEA下從零開始搭建SpringBoot工程

SpringBoot的具體介紹可以參看其他網上介紹&#xff0c;這里就不多說了&#xff0c;就這幾天的學習&#xff0c;個人理解&#xff0c;簡而言之&#xff1a;如果想學習Java工程化、高性能及分布式、深入淺出。微服務、Spring&#xff0c;MyBatis&#xff0c;Netty源碼分析的朋友…

c語言一維數組轉化為二維矩陣,js將一維數組轉化為二維數組

遇到的問題&#xff1a;后端返回的是一組一維數組&#xff0c;但是需要展示的格式是二維數組&#xff0c;常見的場景舉例&#xff1a;后臺返回10個長度的數組&#xff0c;需要分成3個一組展示在banner上。例&#xff1a;[1,2,3,4,5,6,7,8,9,10] > [[1,2,3], [4,5,6], [7,8…

nano使用說明

Main nano help text The nano editor is designed to emulate 仿真、模擬 the functionality and ease-of-use of the UW Pico text editor. There are four main sections of the editor. The top line shows the program version, the current filename being edited, and w…

dataframe 眾數的方法_學習數據分析數據方法論 [描述性統計分析]

數理統計&#xff1a;數理統計是以概率論為基礎&#xff0c;研究社會和自然界中大量隨機現象數量變化基本規律的一種方法。分為&#xff1a;描述統計(描述統計的任務是搜集資料&#xff0c;進行整理、分組&#xff0c;編制次數分配表&#xff0c;繪制次數分配曲線&#xff0c;計…

c語言高級語言期中測試答案,上海理工大學C語言2011期中試題和答案

C語言2010/2011學年 第二學期 期中測試高級語言程序設計(C)試卷 A □BA1. 輸入一行字符&#xff0c;統計其中的英文字母個數。#include void main(){ char ch;int n0;printf(“Input a string:\n”);while(1){ chgetchar();if (ch \n ) break;if (ch> a && ch< z…

前端基礎進階(十):面向對象實戰之封裝拖拽對象

https://segmentfault.com/a/1190000012646488 https://yangbo5207.github.io/wutongluo/ 說明&#xff1a;此處只是記錄閱讀前端基礎進階的理解和總結&#xff0c;如有需要請閱讀上面的鏈接 1.如何讓元素動起來 要讓元素動起來就要修改元素的top、left 、translate 屬性。因為…

iOS - LocalCache 本地數據緩存

1、自定義方式本地數據緩存 1.1 自定義緩存 1 沙盒路徑下的 Library/Caches 用來存放緩存文件&#xff0c;保存從網絡下載的請求數據&#xff0c;后續仍然需要繼續使用的文件&#xff0c;例如網絡下載的離線數據&#xff0c;圖片&#xff0c;視頻文件等。該目錄中的文件系統不會…

如何構建ASP.NET MVC4JQueryAJaxJSon示例

背景&#xff1a; 博客中將構建一個小示例&#xff0c;用于演示在ASP.NET MVC4項目中&#xff0c;如何使用JQuery Ajax。 直接查看JSon部分 步驟&#xff1a; 1&#xff0c;添加控制器(HomeController)和動作方法(Index),并為Index動作方法添加視圖(Index.cshtml),視圖中HTML如…

echarts 有引導線和內部文字_點、線、面構圖的異同以及相互轉化

點、線、面構圖既有相似性&#xff0c;又有差異性。相似的是都有對齊、強調、群組、重復、突出層次的作用&#xff0c;不同的是點的特點是聚焦、線的特點是運動和方向性&#xff0c;面的特性是體量感、穩定性。點的情感最弱&#xff0c;線、面的情感要比點豐富。一、點、線、面…

c語言上機報告之水仙花數,C語言上機報告之水仙花數..doc

C語言上機報告之水仙花數.C語言程序設計上機報告課題名稱&#xff1a;水仙花數的算法院 (系)&#xff1a;工程學院專業班 級&#xff1a; 052126學生姓名&#xff1a; 喻培學 號&#xff1a; 20121004040指導教師&#xff1a; 熊慕舟2013年11月24日C語言上機報告之水仙花數上機…

《Python黑帽子:黑客與滲透測試編程之道》 Web攻擊

Web的套接字函數庫&#xff1a;urllib2 一開始以urllib2.py命名腳本&#xff0c;在Sublime Text中運行會出錯&#xff0c;糾錯后發現是重名了&#xff0c;改過來就好&#xff1a; #!/usr/bin/python #codingutf-8 import urllib2url "http://www.baidu.com"headers …

vCenter Converter Standalone使用文檔

文檔目的能夠使用vCenter Converter Standalone 將物理機操作系統遷移到虛擬機操作系統基礎知識vCenter Converter Standalone 能將物理機上的操作系統、VMware虛擬機上的操作系統或者Hype-V 上的虛擬機操作系統遷移到VMware上。系統環境操作系統&#xff1a;Windows Server 20…

1093芯片做正弦波逆變器_正弦波逆變器中的SPWM調制(鐘任生)

歡迎加入技術交流QQ群(2000人)&#xff1a;電力電子技術與新能源 905723370高可靠新能源行業頂尖自媒體在這里有電力電子、新能源干貨、行業發展趨勢分析、最新產品介紹、眾多技術達人與您分享經驗&#xff0c;歡迎關注我們&#xff0c;搜索微信公眾號&#xff1a;電力電子技術…

android 手機短信恢復,安卓手機短信刪除了怎么恢復?簡單恢復的方法

原標題&#xff1a;安卓手機短信刪除了怎么恢復&#xff1f;簡單恢復的方法安卓手機短信刪除了怎么恢復&#xff1f;手機短信是生活中不經常使用到&#xff0c;但是依然是十分重要的存在&#xff0c;因為我們現在比較喜歡用社交軟件與別人進行交流&#xff0c;但是在一些相對重…

Oracle-查看oracle是否有表被鎖

問題現象&#xff1a; 查看oracle是否有表被鎖 解決方法&#xff1a; select sid,serial#,program,terminal,username,b.object_id,c.object_name from v$session a, v$locked_object b, dba_objects c where a.sid b.session_id and b.object_id c.object_id;轉載于:http…