spark3 streaming 讀kafka寫es

1. 代碼

package data_import
import org.apache.spark.sql.{DataFrame, Row, SparkSession, SaveMode}
import org.apache.spark.sql.types.{ArrayType, DoubleType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.commons.lang3.exception.ExceptionUtils
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneId
import org.apache.spark._
import org.apache.spark.streaming._
import alarm.Alarm
import org.elasticsearch.spark.sql._/** 用戶特征記錄*/
/**
 * User-feature ingestion job: Spark Streaming reads JSON events from Kafka
 * and upserts them into an Elasticsearch 7.x index, keyed by session_id.
 */
object KafkaImport {

  /**
   * Command-line parameters.
   * NOTE(review): "Parram" is a typo for "Param"; the name is kept because
   * utils.parseParam[Parram] and existing launch scripts reference it.
   *
   * @param env            deployment environment tag, used for alarming
   * @param dst_table      destination table name (not read by this job — TODO confirm)
   * @param kafka_address  Kafka bootstrap servers
   * @param kafka_topic    topic to subscribe to
   * @param kafka_group_id consumer group id
   * @param trigger_time   micro-batch interval in seconds (parsed with toInt)
   */
  case class Parram(
      env: String,
      dst_table: String,
      kafka_address: String,
      kafka_topic: String,
      kafka_group_id: String,
      trigger_time: String)

  def main(args: Array[String]): Unit = {
    val param: Parram = utils.parseParam[Parram](args)
    println("args:" + param.toString())
    try {
      processTable(param)
      Alarm.alarm(env = param.env, level = Alarm.LevelInfo, content = "UserFeature Success")
    } catch {
      case e: Exception =>
        val msg =
          s"UserFeature handle failed,Error message:${e.getClass()}:${e.getMessage()}===>${ExceptionUtils.getStackTrace(e)}==>argsMap:${param.toString()}"
        println(msg)
        Alarm.alarm(env = param.env, level = Alarm.LevelWarning, content = msg)
    }
  }

  /**
   * Builds the streaming pipeline and blocks until termination.
   *
   * Flow: Kafka -> extract fields from the JSON `value` column -> upsert to ES.
   * Offsets are committed manually (enable.auto.commit = false) only after a
   * batch has been written, giving at-least-once delivery; the upsert keyed
   * on `id` makes replayed batches idempotent.
   */
  def processTable(param: Parram): Unit = {
    val conf = new SparkConf().setAppName("appName").setMaster("yarn")
    val ssc = new StreamingContext(conf, Seconds(param.trigger_time.toInt))
    val ss = SparkSession.builder
      .appName("KafkaImport")
      .config("spark.sql.mergeSmallFiles.enabled", "true")
      // BUG FIX: these two options are byte sizes, not flags — the original set
      // both to the string "true". TODO(review): confirm the intended sizes.
      .config("spark.sql.mergeSmallFiles.threshold.avgSize", "16MB")
      .config("spark.sql.mergeSmallFiles.maxSizePerTask", "128MB")
      // SECURITY NOTE(review): ES credentials are hard-coded (and published
      // with this article) — move them to job parameters or a secret store.
      .config("es.net.http.auth.user", "elastic")
      .config("es.net.http.auth.pass", "YX2021@greendog")
      .getOrCreate()

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> param.kafka_address,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> param.kafka_group_id,
      "auto.offset.reset" -> "earliest",
      // Offsets are committed manually after each successful batch write.
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val topics = Array(param.kafka_topic)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    // Shape of a raw Kafka record before JSON extraction.
    val schema: StructType = StructType(List(
      StructField("key", StringType, true),
      StructField("value", StringType, true)))

    stream.foreachRDD { rdd =>
      // FIX: capture offset ranges FIRST, on the unmodified KafkaRDD, as the
      // spark-streaming-kafka integration guide requires (the cast fails once
      // the RDD has been transformed).
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      if (!rdd.isEmpty()) { // skip empty micro-batches entirely
        val rows = rdd.map { record => Row(record.key, record.value) }
        val userData = ss.createDataFrame(rows, schema)
          // `id` doubles as the ES document id (see es.mapping.id below).
          .withColumn("id", get_json_object(col("value"), "$.ctx.session_id").cast(StringType))
          .withColumn("app_id", get_json_object(col("value"), "$.ctx.app_id").cast(StringType))
          .withColumn("user_id", get_json_object(col("value"), "$.ctx.user_id").cast(StringType))
          .withColumn("session_id", get_json_object(col("value"), "$.ctx.session_id").cast(StringType))
          .withColumn("time", get_json_object(col("value"), "$.time").cast(LongType))
          .withColumn("datetime", getDayHourTime(col("time")).cast(TimestampType))
        userData.show()

        // PERF FIX: the original re-read and count()-ed the WHOLE target index
        // on every micro-batch just to "test the connection"; that full scan
        // has been removed — a failing write surfaces connectivity errors.

        userData
          .select(col("id"), col("session_id"), col("value"), col("app_id"), col("datetime"))
          // Drop records without a usable document id.
          .filter(col("id").isNotNull && col("id") =!= "")
          .write
          .format("org.elasticsearch.spark.sql")
          .option("es.nodes", "192.168.145.43")
          .option("es.nodes.wan.only", "true")
          .option("es.port", "9200")
          .option("es.mapping.id", "id")           // document-id field
          .option("es.write.operation", "upsert")  // idempotent on replay
          .option("es.net.http.auth.user", "elastic")
          .option("es.net.http.auth.pass", "YX2021@greendog")
          .mode("append")
          // ES 7.x: index name only — no "/type" suffix (see notes below).
          .save("test_saylo_user_feature_30033")
      }

      // Commit only after the batch is durably written (at-least-once).
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    // Block the driver until the streaming context is stopped.
    ssc.awaitTermination()
  }

  // Converts an epoch timestamp to a day/hour value via the project utils.
  private val getDayHourTime = udf((timestamp: Long) => {
    utils.getDayTime(timestamp)
  })
}

2. 依賴

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.xverse.saylo.rec</groupId>
  <artifactId>saylo_rec_data_offline_v2</artifactId>
  <version>1.0.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.2.2</spark.version>
    <jackson.version>2.14.0</jackson.version>
    <shade.jar.name>${project.artifactId}-${project.version}-jar-with-dependencies.jar</shade.jar.name>
  </properties>

  <dependencies>
    <!-- Spark core / streaming / SQL (FIX: spark-sql was declared twice; deduplicated) -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- Elasticsearch Spark connector for Spark 3.x / ES 7.x -->
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-30_2.12</artifactId>
      <version>7.12.0</version>
    </dependency>
    <!-- Required at runtime by the ES connector; see the troubleshooting notes below -->
    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.10.1</version>
    </dependency>
    <dependency>
      <groupId>com.qcloud</groupId>
      <artifactId>cos_api</artifactId>
      <version>5.6.227</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.play</groupId>
      <artifactId>play-json_2.12</artifactId>
      <version>2.9.2</version>
    </dependency>
    <!-- Jackson pinned to one version to avoid mixed-version conflicts -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${spark.version}</version>
    </dependency>
    -->
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java-util</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>org.json4s</groupId>
      <artifactId>json4s-core_2.12</artifactId>
      <version>3.6.6</version>
    </dependency>
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.6.3</version>
    </dependency>
    <dependency>
      <groupId>com.tencentcloudapi</groupId>
      <artifactId>tencentcloud-sdk-java-cls</artifactId>
      <version>3.1.1174</version>
    </dependency>
    <dependency>
      <groupId>com.tencentcloudapi.cls</groupId>
      <artifactId>tencentcloud-cls-sdk-java</artifactId>
      <version>1.0.15</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.33</version>
    </dependency>
  </dependencies>

  <build>
    <extensions>
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.6.0</version>
      </extension>
    </extensions>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>utf-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${shade.jar.name}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <relocations>
                <!-- Relocating org.apache.commons here caused the
                     com.acme.shaded.apachecommons ClassNotFoundException
                     described in the notes; keep it disabled. -->
                <relocation>
                  <pattern>com.google.protobuf</pattern>
                  <shadedPattern>my.project.shaded.protobuf</shadedPattern>
                </relocation>
              </relocations>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <!-- FIX: the placeholder my.Application pointed at a class that
                       does not exist; use the job's actual entry point. -->
                  <mainClass>data_import.KafkaImport</mainClass>
                  <manifestEntries>
                    <!-- FIX: ${version} is not a defined property; use ${project.version} -->
                    <Implementation-Version>${project.version}</Implementation-Version>
                    <Main-Class>data_import.KafkaImport</Main-Class>
                  </manifestEntries>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

3. 注意事項

  1. ElasticSearch 7.x 默認不再支持指定索引類型
    如果es版本是7.+
    需注意
    save應寫為
    .save("test_saylo_user_feature_30033")
    而不是
    .save("test_saylo_user_feature_30033/docs")
    (注意:代碼中必須使用半角直引號 ",不能使用全角引號)
    否則會報類型轉換錯誤。例如
    [user_id] cannot be changed from type [keyword] to [text]

  2. 依賴沖突
    運行時找不到類(注意類名前綴 com.acme.shaded.apachecommons 正是 shade 插件 relocation 示例中的前綴,說明該類在打包重定位後缺失對應的 commons-httpclient 依賴)

    Caused by: java.lang.ClassNotFoundException: com.acme.shaded.apachecommons.httpclient.HttpConnectionManager
    

在pom文件中手動加入該依賴,具體參見上面的pom文件

    <dependency><groupId>commons-httpclient</groupId><artifactId>commons-httpclient</artifactId><version>3.1</version>   </dependency>

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/90719.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/90719.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/90719.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【跟著PMP學習項目管理】每日一練 - 3

1、你是一個建筑項目的項目經理。電工已經開始鋪設路線,此時客戶帶著一個變更請求來找你。他需要增加插座,你認為這會增加相關工作的成本。你要做的第一件事? A、拒絕做出變更,因為這會增加項目的成本并超出預算 B、參考項目管理計劃,查看是否應當處理這個變更 C、查閱…

CentOS 安裝 JDK+ NGINX+ Tomcat + Redis + MySQL搭建項目環境

目錄第一步&#xff1a;安裝JDK 1.8方法 1&#xff1a;安裝 Oracle JDK 1.8方法 2&#xff1a;安裝 OpenJDK 1.8第二步&#xff1a;使用yum安裝NGINX第三步&#xff1a;安裝Tomcat第四步&#xff1a;安裝Redis第五步&#xff1a;安裝MySQL第六步&#xff1a;MySQL版本兼容性問題…

如何設計一個登錄管理系統:單點登錄系統架構設計

關鍵詞&#xff1a;如何設計一個登錄管理系統、登錄系統架構、用戶認證、系統安全設計 &#x1f4cb; 目錄 開篇&#xff1a;為什么登錄系統這么重要&#xff1f;整體架構設計核心功能模塊安全設計要點技術實現細節性能優化策略總結與展望 開篇&#xff1a;為什么登錄系統這么…

論跡不論心

2025年7月11日&#xff0c;16~26℃&#xff0c;陰 緊急不緊急重要 備考ing 備課不重要 遇見&#xff1a;免費人格測試 | 16Personalities&#xff0c;下面是我的結果 INFJ分析與優化建議 User: Anonymous (隱藏) Created: 2025/7/11 23:38 Updated: 2025/7/11 23:43 Exported:…

【面板數據】省級泰爾指數及城鄉收入差距測算(1990-2024年)

對中國各地區1990-2024年的泰爾指數、城鄉收入差距進行測算。本文參考龍海明等&#xff08;2015&#xff09;&#xff0c;程名望、張家平&#xff08;2019&#xff09;的做法&#xff0c;采用泰爾指數測算城鄉收入差距。參考陳斌開、林毅夫&#xff08;2013&#xff09;的做法&…

http get和http post的區別

HTTP GET 和 HTTP POST 是兩種最常用的 HTTP 請求方法&#xff0c;它們在用途、數據傳輸方式、安全性等方面存在顯著差異。以下是它們的主要區別&#xff1a;1. 用途GET&#xff1a;主要用于請求從服務器獲取資源&#xff0c;比如獲取網頁內容、查詢數據庫等。GET 請求不應該用…

I2C集成電路總線

&#xff08;摘要&#xff1a;空閑時&#xff0c;時鐘線數據線都是高電平&#xff0c;主機發送數據前&#xff0c;要在時鐘為高電平時&#xff0c;把數據線從高電平拉低&#xff0c;數據發送采取高位先行&#xff0c;時鐘線低電平時可以修改數據線&#xff0c;時鐘線高電平時要…

為了安全應該使用非root用戶啟動nginx

nginx基線安全&#xff0c;修復步驟。主要是由于使用了root用戶啟動nginx。為了安全應該使用非root用戶啟動nginx一、檢查項和問題檢查項分類檢查項名稱身份鑒別檢查是否配置Nginx賬號鎖定策略。服務配置檢查Nginx進程啟動賬號。服務配置Nginx后端服務指定的Header隱藏狀態服務…

論文解析篇 | YOLOv12:以注意力機制為核心的實時目標檢測算法

前言&#xff1a;Hello大家好&#xff0c;我是小哥談。長期以來&#xff0c;改進YOLO框架的網絡架構一直至關重要&#xff0c;但盡管注意力機制在建模能力方面已被證明具有優越性&#xff0c;相關改進仍主要集中在基于卷積神經網絡&#xff08;CNN&#xff09;的方法上。這是因…

學習C++、QT---20(C++的常用的4種信號與槽、自定義信號與槽的講解)

每日一言相信自己&#xff0c;你比想象中更接近成功&#xff0c;繼續勇往直前吧&#xff01;那么我們開始用這4種方法進行信號與槽的通信第一種信號與槽的綁定方式我們將按鍵右鍵后轉到槽會自動跳轉到這個widget.h文件里面并自動生成了定義&#xff0c;我們要記住我們這個按鈕叫…

Anolis OS 23 架構支持家族新成員:Anolis OS 23.3 版本及 RISC-V 預覽版發布

自 Anolis OS 23 版本發布之始&#xff0c;龍蜥社區就一直致力于探索同源異構的發行版能力&#xff0c;從 Anolis OS 23.1 版本支持龍芯架構同源異構開始&#xff0c;社區就在持續不斷地尋找更多的異構可能性。 RISC-V 作為開放、模塊化、可擴展的指令集架構&#xff0c;正成為…

4萬億英偉達,憑什么?

CUDA正是英偉達所有神話的起點。它不是一個產品&#xff0c;而是一個生態系統。當越多的開發者使用CUDA&#xff0c;就會催生越多的基于CUDA的應用程序和框架&#xff1b;這些殺手級應用又會吸引更多的用戶和開發者投身于CUDA生態。這個正向飛輪一旦轉動起來&#xff0c;其產生…

Unity3D iOS閃退問題解決方案

前言 在Unity3D開發中解決iOS閃退問題需要系統性排查&#xff0c;以下是關鍵步驟和解決方案&#xff1a; 對惹&#xff0c;這里有一個游戲開發交流小組&#xff0c;希望大家可以點擊進來一起交流一下開發經驗呀&#xff01; 1. 獲取崩潰日志&#xff08;關鍵第一步&#xff…

嵌入式八股文之 GPIO

1. GPIO 的基本概念(1) 什么是 GPIO&#xff1f;GPIO 的中文意思是通用輸入輸出端口&#xff08;General Purpose Input/Output&#xff09;&#xff0c;是嵌入式系統中可編程控制的通用引腳&#xff0c;可通過軟件配置為輸入或輸出模式。&#xff08;背誦&#xff09;(2) 它的…

Umi-OCR 的 Docker安裝(win制作鏡像,Linux(Ubuntu Server 22.04)離線部署)

前置博客&#xff1a;Ubuntu-Server 22.04.4 詳細安裝圖文教程 wget命令在windows終端下不能使用的原因及解決辦法 在 Ubuntu 22.04 LTS 上離線安裝 Docker 手把手教你在Win11下安裝docker Umi-OCR 安裝docker時報錯&#xff1a;workstation服務啟動報錯。錯誤1075&#…

力扣242.有效的字母異位詞

給定兩個字符串 s 和 t &#xff0c;編寫一個函數來判斷 t 是否是 s 的 字母異位詞。示例 1:輸入: s "anagram", t "nagaram" 輸出: true示例 2:輸入: s "rat", t "car" 輸出: false提示:1 < s.length, t.length < 5 * 104s…

基于Springboot+UniApp+Ai實現模擬面試小工具二:后端項目搭建

本節介紹本項目后端項目的開發工具及基礎項目的搭建&#xff0c;包括開發工具介紹及后端項目的創建和依賴框架的引入及對應配置。 源碼下載&#xff1a; 點擊下載 講解視頻&#xff1a; UniappSpringbootKimi實現模擬面試小程序-Springboot項目創建一&#xff0e;開發工具 1.…

Linux711 Mysql

模版 root192.168.235.130s password:┌──────────────────────────────────────────────────────────────────────┐│ ? MobaXterm Personal Edition v23.2 ? ││…

QT 秘鑰生成工具

該項目是注冊機和驗證機項目&#xff0c;分別是密鑰生成工具&#xff0c;和密鑰驗證demo,可以識別電腦唯一標識碼。#include "frmmain.h" #include "ui_frmmain.h" #include "qmessagebox.h" #include "qfile.h" #pragma execution_ch…

PyTorch神經網絡訓練全流程詳解:從線性層到參數優化

目錄 一、神經網絡訓練的核心組件 二、代碼逐行解析與知識點 三、核心組件詳解 3.1 線性層(nn.Linear) 3.2 損失函數(nn.MSELoss) 3.3 優化器(optim.SGD) 四、訓練流程詳解 五、實際應用建議 六、完整訓練循環示例 七、總結 在深度學習實踐中&#xff0c;理解神經網絡…