java上傳kafka的方法_哪種方法是將所有數據從Kafka主題復制到接收器(文件或Hive表)的最佳方法?...

我正在使用Kafka Consumer API將所有數據從Kafka主題復制到Hive表 . 為此,我使用HDFS作為中間步驟 . 我使用唯一的組ID并將偏移重置為“最早”,以便從頭開始獲取所有數據,并在執行后忽略提交 . 然后我遍歷Kafka主題中的記錄,并將每條記錄保存到HDFS中的臨時文件中 . 然后我使用Spark從HDFS讀取數據,然后使用日期作為文件名將其保存到Parquet文件中 . 然后,我在Hive表中創建一個帶日期的分區,最后在Parquet中將文件作為分區加載到Hive中 .

正如您在下面的代碼中看到的,我使用了幾個中間步驟,這使得我的代碼遠非最佳 . 這是從Kafka主題復制所有數據的最佳推薦方法嗎?我做了一些研究,到目前為止,這是我設法開始工作的變通方法,但是,隨著記錄數量每天增加,我的執行時間達到了可容忍的極限(從2分鐘變為6分鐘到6分鐘)周) .

代碼在這里:

def start( lowerDate: String, upperDate: String )={

// Configurations for kafka consumer

val conf = ConfigFactory.parseResources("properties.conf")

val brokersip = conf.getString("enrichment.brokers.value")

val topics_in = conf.getString("enrichment.topics_in.value")

// Crea la sesion de Spark

val spark = SparkSession

.builder()

.master("yarn")

.appName("ParaTiUserXY")

.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._

val properties = new Properties

properties.put("key.deserializer", classOf[StringDeserializer])

properties.put("value.deserializer", classOf[StringDeserializer])

properties.put("bootstrap.servers", brokersip)

properties.put("auto.offset.reset", "earliest")

properties.put("group.id", "ParaTiUserXYZZ12345")

//Schema para transformar los valores del topico de Kafka a JSON

val my_schema = new StructType()

.add("longitudCliente", StringType)

.add("latitudCliente", StringType)

.add("dni", StringType)

.add("alias", StringType)

.add("segmentoCliente", StringType)

.add("timestampCliente", StringType)

.add("dateCliente", StringType)

.add("timeCliente", StringType)

.add("tokenCliente", StringType)

.add("telefonoCliente", StringType)

val consumer = new KafkaConsumer[String, String](properties)

consumer.subscribe( util.Collections.singletonList("parati_rt_geoevents") )

val fs = {

val conf = new Configuration()

FileSystem.get(conf)

}

val temp_path:Path = new Path("hdfs:///tmp/s70956/tmpstgtopics")

if( fs.exists(temp_path)){

fs.delete(temp_path, true)

}

while(true)

{

val records=consumer.poll(100)

for (record

val data = record.value.toString

//println(data)

val dataos: FSDataOutputStream = fs.create(temp_path)

val bw: BufferedWriter = new BufferedWriter( new OutputStreamWriter(dataos, "UTF-8"))

bw.append(data)

bw.close

val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/s70956/tmpstgtopics")

val fechaCliente = data_schema.select("dateCliente").first.getString(0)

if( fechaCliente < upperDate && fechaCliente >= lowerDate){

data_schema.select("longitudCliente", "latitudCliente","dni", "alias",

"segmentoCliente", "timestampCliente", "dateCliente", "timeCliente",

"tokenCliente", "telefonoCliente")

.coalesce(1).write.mode(SaveMode.Append).parquet("/desa/landing/parati/xyuser/" + fechaCliente)

}

else if( fechaCliente < lowerDate){

//

}

else if( fechaCliente >= upperDate){

break;

}

}

}

consumer.close()

}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/542596.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/542596.shtml
英文地址,請注明出處:http://en.pswp.cn/news/542596.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

openstack nova-network 的小bug的排錯經歷

環境是 nova-network vmwareflatdhcp錯誤表現為 開出來的虛擬機有一定幾率獲取不到dhcp地址&#xff0c;手工賦予ip則正常&#xff0c;用flat模式注入的ip正常&#xff0c;下面是排錯過程1首先找網絡防火墻已經把 dnsmasq對應的端口已經打開抓包結果&#xff1a;可以看到虛擬機…

anaconda base環境_anaconda中安裝packages:pip還是conda install?

conda install我就不說了&#xff0c;這都不會別學了就。Using command:$ which -a pip, the terminal will return:This indicates two different pip path to install packages[1].在tf23環境中pip install在base環境中pip install在windows下powershell內&#xff0c;進入到…

Java ClassLoader setDefaultAssertionStatus()方法與示例

ClassLoader類setDefaultAssertionStatus()方法 (ClassLoader Class setDefaultAssertionStatus() method) setDefaultAssertionStatus() method is available in java.lang package. setDefaultAssertionStatus()方法在java.lang包中可用。 setDefaultAssertionStatus() metho…

【風馬一族_xml】xmlp之dtd1

什么是XML約束&#xff1f;在xml技術里&#xff0c;可以編寫一個文檔來約束一個xml文檔的寫法&#xff0c;這稱之為xml約束 2. 為什么要使用xml約束&#xff1f; 參看提示欄 3. xml約束的作用&#xff1f; 約束xml的寫法對xml進行校驗4. 常見的xml約束技術 xml dtdxml Schema…

java ssm框架 緩存_SSM框架之MyBatis3專題4:查詢緩存

查詢緩存的使用&#xff0c;主要是為了提高查詢訪問速度。將用戶對同一數據的重復查詢過程簡化&#xff0c;不再每次均從數據庫中查詢獲取結果數據&#xff0c;從而提高訪問速度。MyBatis的查詢緩存機制&#xff0c;根據緩存區的作用域(聲明周期)可劃分為兩種&#xff1a;一級查…

matplotlib畫圖_漂亮,超詳細的matplotlib畫圖基礎

來自 | 逐夢erhttps://zhumenger.blog.csdn.net/article/details/106530281本文僅作技術交流&#xff0c;如有侵權&#xff0c;請聯系后臺刪除。數據可視化非常重要&#xff0c;因為錯誤或不充分的數據表示方法可能會毀掉原本很出色的數據分析工作。matplotlib 庫是專門用于開發…

c# 2維數組 取一維_C#| 不同類型的一維數組聲明

c# 2維數組 取一維In the below example, we are declaring an integer array (one dimensional) with following styles: 在下面的示例中&#xff0c;我們聲明具有以下樣式的整數數組(一維) &#xff1a; 1) One dimensional Array declaration with initialization (without…

Java編程經典10道_Java經典編程題50道之十二

企業發放的獎金根據利潤提成&#xff1a;利潤(I)低于或等于10萬元時&#xff0c;獎金可提10%&#xff1b;利潤高于10萬元&#xff0c;低于20萬元時&#xff0c;低于10萬元的部分按10%提成&#xff0c; 高于10萬元的部分 &#xff0c;可提成7.5%&#xff1b;20萬到40萬之間時&am…

RHEL7 單獨安裝圖形 X11

RHEL7 默認是最小化安裝&#xff08;Minimal Install&#xff09;&#xff0c;沒有圖形界面&#xff0c; 我們應該選擇Server with GUI。若已錯過此步驟&#xff0c;我們采用以下方式補充安裝GUI界面。 先配置yum源可以參考我的這篇文章http://blog.itpub.net/27771627/viewspa…

android recycleview長按多選_UI設計中Android和IOS設計差異總結

由于設計師、產品經理使用的移動設備大部分是iPhone&#xff0c;所以在做設計時&#xff0c;容易忽略Android和iOS的差異&#xff0c;按照iOS的規范進行設計&#xff0c;兩端只做一套。只做一套的會存在兩個問題&#xff1a;1、安卓用戶的使用習慣不太適應iOS的設計&#xff0c…

Kotlin程序用于打印JVM版本的Kotlin(打印Java屬性)

Here, we will create a Kotlin program to print Kotlin, JVM version (printing Java properties). As Kotlin can be seen as an upgrade of Java, so we will get all versions of java (JVM) using Kotlin also. 在這里&#xff0c;我們將創建一個Kotlin程序以打印JVM版本…

自定義動畫屬性java_創建酷炫動畫效果的10個JavaScript庫

原標題&#xff1a;創建酷炫動畫效果的10個JavaScript庫1) Dynamics.jsDynamics.js是設計基于物理規律的動畫的重要Java庫。它可以賦予生命給所有包含CSS 和SVG屬性的DOM(文本對象模型)元素&#xff0c;換句話說&#xff0c;Dynamics.js適用于所有Java對象以及一系列其它的元素…

php xlsx里插入圖片_常見的 PHP 面試題和答案分享

如何直接將輸出顯示給瀏覽器&#xff1f;將輸出直接顯示給瀏覽器&#xff0c;我們必須使用特殊標記 <&#xff1f;and&#xff1f;>。PHP 是否支持多重繼承&#xff1f;PHP 只支持單繼承。PHP 的類使用關鍵字 extends 繼承另一個類獲取圖片屬性&#xff08;size, width, …

java調用構造函數中某一個值_Java如何在枚舉的構造函數中調用另一個枚舉值

Java中的枚舉(enum)是一種存儲一組常量值的數據類型。您可以使用枚舉來存儲固定值&#xff0c;例如一周中的天&#xff0c;一年中的月等。您可以使用關鍵字 enum定義枚舉&#xff0c;后跟枚舉的名稱為-enum Days {SUNDAY, MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATUR…

python 示例_Python日歷類| yeardatescalendar()方法與示例

python 示例Python Calendar.yeardatescalendar()方法 (Python Calendar.yeardatescalendar() Method) Calendar.yeardatescalendar() method is an inbuilt method of the Calendar class of calendar module in Python. It uses an instance of this class and returns a lis…

js:插入節點appendChild insertBefore使用方法

首先 從定義來理解 這兩個方法&#xff1a; appendChild() 方法&#xff1a;可向節點的子節點列表的末尾添加新的子節點。語法&#xff1a;appendChild(newchild) insertBefore() 方法&#xff1a;可在已有的子節點前插入一個新的子節點。語法 &#xff1a;insertBefore(newchi…

pandas concat_pandas-數據合并-concat(最全參數解釋,含代碼和實例)

pandas中的concat的功能&#xff1a;假設你現在需要將多個數據合并&#xff0c;前提是&#xff1a;這幾個文件列名都一致&#xff0c;也就是說這幾個文件格式完全一樣&#xff0c;只是數據不太一樣&#xff0c;類似于合并多個文件這種&#xff0c;實際數據分析中也會遇到這種情…

java中的de是什么_【轉】java中main函數解析

源地址&#xff1a;http://www.cnblogs.com/xwdreamer/archive/2012/04/09/2438845.html從寫java至今&#xff0c;寫的最多的可能就是主函數public static void main(String[] args) {}但是以前一直都沒有問自己&#xff0c;為什么要這么寫&#xff0c;因為在c語言中就沒有這樣…

JAVA多線程(一)線程安全問題產生的原因

JAVA線程內存與主存間映射示意圖Java內存模型中規定了所有的變量都存儲在主內存中&#xff0c;每條線程還有自己的工作內存&#xff0c;線程的工作內存中保存了該線程使用的變量到主內存副本拷貝&#xff0c;線程對變量的所有操作&#xff08;讀取、賦值&#xff09;都必須在工…