kafka直連方式消費多個topic

一個消費者組可以消費多個topic,以前寫過一篇一個消費者消費一個topic的,這次的是一個消費者組通過直連方式消費多個topic,做了小測試,結果是正確的,通過查看zookeeper的客戶端,zookeeper記錄了偏移量

package day04

/*
消費多個topic
*/
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import scala.collection.mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

object OrderDemoYY1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("yy").setMaster("local[*]")
val ssc = new StreamingContext(conf,Duration(5000))
//消費3個topic
val topic1 = "wc"
val topic2 ="wc1"
val topic3 ="wc2"
//組名
val groupid ="GPMMVV"
//zookeeper地址
val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
//brokerList
val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
//把消費的分區放到Set集合中,可以在第一次讀取時作為參數傳入
val topics = Set(topic1,topic2,topic3)
//ListBuffer時有序的,按下標有序
val topicsList = ListBuffer[String](topic1,topic2,topic3)
//設置kafka的參數
val kafkaParams = Map(
"metadata.broker.list"->brokerList,
"groupid"->groupid,
"auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString
//默認時從頭開始讀的
)

//new ListBuffer用來存放ZKGroupTopicDirs, 用來保存偏移量的地址
//因為有多個topic,對應的也就有多個ZKGroupTopicDirs
var zkGTList:ListBuffer[ZKGroupTopicDirs] =new ListBuffer[ZKGroupTopicDirs]()
//根據topicList 新建 ZKGroupTopicDirs 添加到zkGTList
for(tp <- topicsList){
val topicDirs = new ZKGroupTopicDirs(groupid,tp)
zkGTList += topicDirs
}
//新建zkClient,用來獲取偏移量和更新偏移量
val zkClient = new ZkClient(zkQuorum)
//新建一個InputDStream,要是var,因為有兩種情況,消費過? 沒有消費過? 根據情況賦值
var kafkaDStream :InputDStream[(String,String)] = null
//創建一個Map,(key,value)-》( 對應的時Topic和分區 ,偏移量)
var fromOffset = Map[TopicAndPartition,Long]()

//獲取每個topic是否被消費過
var childrens:ListBuffer[Int] =new ListBuffer[Int]()
var flag = false //有topic被消費過則為true
for (topicDir <- zkGTList){ //循環存放偏移量的
//通過zkClient.countChidren來獲取每個topic對應的分區中的偏移量ZKGroupTopicDirs的對象
val child: Int = zkClient.countChildren(topicDir.consumerOffsetDir)
childrens +www.mhylpt.com= child
if(child>0){
flag = true
}
}


if(flag){//消費過
for(z <- 0 until topics.size){ //根據topicsList的的下表獲取相應的child和ZKGroupTopicDirs
val child = childrens(z)
val gpDirs = zkGTList(z)
val topicn = topicsList(z)
for(i <- 0 until child)www.mcyllpt.com/{
//循環child, 根據使用zkClient.readData方法,u獲取topic的每個分區的偏移量
val offset = zkClient.readData[String](gpDirs.consumerOffsetDir+"/"+i)
val tp = new TopicAndPartition(www.michenggw.com/ topicn,i)
fromOffset += tp -> offset.toLong
}
}
//返回的而結果是 kafka的key,默認是null, value是kafka中的值
val messageHandler =www.gcyl159.com/?(mmd:MessageAndMetadata[String,String])=www.gcyl152.com>{
(mmd.key(),mmd.message())
}
//創建kafkaDStream
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](
ssc,kafkaParams,fromOffset,messageHandler
)
}else{//以前沒有讀取過
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
ssc,kafkaParams,topics
)
}

/*val children1 = zkClient.countChildren(zKGroupTopicDirs1.consumerOffsetDir)
val children2 = zkClient.countChildren(zKGroupTopicDirs2.consumerOffsetDir)
if(children1>0 || children2>0){
if(children1>0){
for (i <- 0 until children1){
val offset = zkClient.readData[String](zKGroupTopicDirs1.consumerOffsetDir+"/"+i)
val tp = new TopicAndPartition(topic1,i)
fromOffset += tp ->offset.toLong
}
}
if(children2>0){
for (i <- 0 until children1){
val offset = zkClient.readData[String](zKGroupTopicDirs2.consumerOffsetDir+"/"+i)
val tp = new TopicAndPartition(topic2,i)
fromOffset += tp ->offset.toLong
}
}
val messageHandler =(mmd:MessageAndMetadata[String,String])=>{
(mmd.key(),mmd.message())
}
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,
kafkaParams,fromOffset,messageHandler)
}else{
kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
}*/


var offsetRanges = Array[OffsetRange]www.hjpt521.com() //用來記錄更新的每個topic的分區偏移量

kafkaDStream.foreachRDD(kafkaRDD=>{
//kafkaRDD是一個KafkaRDD,可以轉換成HasOffsetRanges對象,從而獲取offsetRanges
offsetRanges= kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
kafkaRDD.foreach(println)www.365soke.com //打印

for(o <- offsetRanges){
val topicNN: String = o.topic //獲取topic
val offset: Long = o.untilOffset //獲取偏移量
val partition: Int = o.partition //獲取分區
val i = topicsList.indexOf(topicNN) //通過topicList查找topic的下標,找到與之對應的ZKGroupTopicDirs
val gpDir = zkGTList(i)
//通過ZkUtils更新偏移量
ZkUtils.updatePersistentPath(zkClient,gpDir.consumerOffsetDir+"/"+partition,offset.toString)
/*if(topicNN.equals(topic1)){
ZkUtils.updatePersistentPath(zkClient,zKGroupTopicDirs1.consumerOffsetDir+"/"+partition,offset.toString)
}else if(topicNN.equals(topic2)){
ZkUtils.updatePersistentPath(zkClient,zKGroupTopicDirs2.consumerOffsetDir+"/"+partition,offset.toString)
}*/
}
})

ssc.start()
ssc.awaitTermination(www.dfgjyl.cn)

可以通過zookeeper的客戶端,在/consumers中查看偏移量,
我的3個topic中,其中wc和wc1只有1個分區,可以通過下圖可看出wc1的0分區偏移量13

轉載于:https://www.cnblogs.com/qwangxiao/p/9971006.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/450682.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/450682.shtml
英文地址,請注明出處:http://en.pswp.cn/news/450682.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

100個經典的C語言算法

100個經典的C算法 C語言的學習要從基礎開始&#xff0c;這里是100個經典的算法 題目&#xff1a;古典問題&#xff1a;有一對兔子&#xff0c;從出生后第3個月起每個月都生一對兔子&#xff0c;小兔 子長到第三個月后每個月又生一對兔子&#xff0c;假如兔子都不死&#xff0c;…

MySQL常見面試題目詳解

文章目錄1. SQL1.1 介紹一下數據庫分頁1.2 介紹一下SQL中的聚合函數1.3 表跟表是怎么關聯的&#xff1f;1.4 說一說你對外連接的了解1.5 說一說數據庫的左連接和右連接1.6 SQL中怎么將行轉成列&#xff1f;1.7 談談你對SQL注入的理解1.8 將一張表的部分數據更新到另一張表&…

[轉]windows系統激活

原文鏈接主題&#xff1a;使用kms激活&#xff0c;可以直接使用命令來完成。 方法&#xff1a;在win10桌面狀態下&#xff0c;右擊windows徽標或按快捷鍵windowsx&#xff0c;點擊命令提示符&#xff08;管理員&#xff09; 用到的命令是slmgr&#xff0c;手動kms激活命令如下&…

jackson annotations注解詳解

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 官方WIKI&#xff1a;https://github.com/FasterXML/jackson-databind/wiki jackson 1.x和2.x版本的注解是放置在不同的包下的 1.x是在…

JS-for的衍生對象

在js中一般使用方法&#xff1a; 1.常規的for(var i0;i<length;i) 2.for-in:for(var item in list) 3.for of 描述&#xff1a;對應于一個對象的每個屬性&#xff0c;或一個數組的每個元素&#xff0c;執行一個或多個語句。 語法&#xff1a;for (variable in [object | ar…

浮點數在計算機中存儲方式

C語言和C#語言中&#xff0c;對于浮點類型的數據采用單精度類型&#xff08;float&#xff09;和雙精度類型(double)來存儲&#xff0c;float數據占用32bit,double數據占用64bit,我們在聲明一個變量float f 2.25f的時候&#xff0c;是如何分配內存的呢&#xff1f;如果胡亂分配…

操作系統面試題目詳解

文章目錄1.13 什么是協程&#xff1f;1.14 為什么協程比線程切換的開銷小&#xff1f;1.15 線程和進程的區別&#xff1f;1.16 進程切換為什么比線程更消耗資源&#xff1f;1.17 介紹一下進程之間的通信。1.18 介紹一下信號量。1.19 說說僵尸進程和孤兒進程。1.20 請介紹進程之…

(項目)在線教育平臺(六)

八、授課機構功能 1、模板繼承 如果幾個頁面的大體結構相同&#xff0c;可以使用繼承的方式來實現母版的重用性&#xff0c;也就是子版繼承母版的內容&#xff0c;既可以使用模板的內容&#xff0c;也可以重寫需要改變的地地方。 首先完成授課機構的頁面&#xff0c;通過頁面顯…

C語言 socket 編程學習

對于SOCKET在這里我不想究其歷史,我只想說其時它是一種進程通訊的方式,簡言之就是調用這個網絡庫的一些API函數就能實現分布在不同主機的相關進程之間的數據交換. SOCKET中首先我們要理解如下幾個定義概念: 一是IP地址:IP Address我想很容易理解,就是依照TCP/IP協議分配…

dependency 中的 classifier屬性

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 classifier元素用來幫助定義構件輸出的一些附屬構件。附屬構件與主構件對應&#xff0c;比如主構件是 kimi-app-2.0.0.jar 該項目可能還…

PHP超全局變量$_SERVER

$_SERVER 是一個包含了諸如頭信息(header)、路徑(path)、以及腳本位置(script locations)等等信息的數組。這個數組中的項目由 Web 服務器創建。不能保證每個服務器都提供全部項目&#xff1b;服務器可能會忽略一些&#xff0c;或者提供一些沒有在這里列舉出來的項目。 $_SERVE…

VC讀寫XML文件

1、安裝MSXML 4.0 SP2。在VC6中建立一個基于Dialog的工程。如圖&#xff1a; 在界面上放置3個編輯框、1個按鈕控件。其中屬性設置如下。 編輯框&#xff1a; IDCategoryVariable TypeVariable NameIDC_IDValueCStringm_strIdIDC_AUTHORValueCStringm_strAuthorIDC_TITLEValueCS…

XCode10 swift4.2 適配遇到的坑

以下是2018年10月23日更新 經過大約一個月的時間的適配&#xff0c;項目正式使用XCode10(以下簡稱為10 or XC10)大部分庫都升級為Swift4.2&#xff08;以下簡稱為 4.2 or S4.2&#xff09;&#xff0c;下面是適配過程中遇到的一些坑。 1. Swift4、Swift4.2混編 如果你對項目是小…

學生管理系統Java版

簡單的學生管理系統 主界面編寫&#xff1a; 1.用輸出語句完成主界面的編寫 2.用Scanner語句實現鍵盤的錄入 3.用swich語句完成操作的選擇 4.用循環完成再次回到主界面 代碼實現&#xff1a; while (true) {//1.用輸出語句完成主界面的編寫System.out.println("--------…

dubbo 配置文件詳解

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 一、dubbo常用配置 <dubbo:service/> 服務配置&#xff0c;用于暴露一個服務&#xff0c;定義服務的元信息&#xff0c;一個服務可…

ASP.NET Core 實戰:Linux 小白的 .NET Core 部署之路

一、前言 最近一段時間自己主要的學習計劃還是按照畢業后設定的計劃&#xff0c;自己一步步的搭建一個前后端分離的 ASP.NET Core 項目&#xff0c;目前也還在繼續學習 Vue 中&#xff0c;雖然中間斷了很長時間&#xff0c;好歹還是堅持下來了&#xff0c;嗯&#xff0c;看了看…

學以致用十三-----Centos7.2+python3+YouCompleteMe成功歷程

歷經幾天的摸索&#xff0c;趟過幾趟坑之后&#xff0c;終于完成YouCompleteMe的安裝配置。 今天同樣是個不能忘記的日子&#xff0c;國恥日&#xff0c;勿忘國恥。&#xff08;9.18&#xff09; 服務器安裝好&#xff0c;基本配置配置好后&#xff0c;開始安裝。 一、檢查服務…

VC畫圖用到的主要方法

1。鼠標落下&#xff0c;記錄鼠標的起始位置 void CMyEasyDrawView::OnLButtonDown(UINT nFlags, CPoint point) { // TODO: 在此添加消息處理程序代碼和/或調用默認值 //graph->m_nTypedlg-> m_bStartDraw true; m_PtPress m_PtLast point; CView::OnLButtonDown…

【最新版】Java學習路線(含B站口碑推薦視頻鏈接)

文章目錄關于如何自學一、計算機網絡二、數據結構與算法三、操作系統四、計算機組成原理五、編譯原理六、設計模式七、MySQL八、實操工具九、JAVA并發與JVM十、Redis十一、Linux十二、Java路線學習尚硅谷黑馬程序員動力節點狂神說十三、Java基礎十四、JavaWeb十五、框架十六、微…

記錄no static method cannot be reference

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 報錯如題&#xff1a; no static method cannot be reference 我一直以為是在靜態方法中調用了非靜態方法&#xff0c;實際上只是我在注…