Spark-Streaming+Kafka+mysql實戰示例

文章目錄

  • 前言
  • 一、簡介
    • 1. Spark-Streaming簡介
    • 2. Kafka簡介
  • 二、實戰演練
    • 1. MySQL數據庫部分
    • 2. 導入依賴
    • 3. 編寫實體類代碼
    • 4. 編寫kafka主題管理代碼
    • 5. 編寫kafka生產者代碼
    • 6. 編寫Spark-Streaming代碼
  • 總結


前言

本文將介紹一個使用Spark Streaming和Kafka進行實時數據處理的示例。通過該示例,讀者將了解到如何使用Spark Streaming和Kafka處理實時數據流,以及如何將處理后的數據保存到MySQL數據庫中。示例涵蓋了從環境搭建到代碼實現的全過程,幫助讀者快速上手實時數據處理的開發。


一、簡介

1. Spark-Streaming簡介

Spark Streaming是Apache Spark的一個組件,用于實時流數據處理。它提供了高級別的API,可以使用類似于批處理的方式處理實時數據流。Spark Streaming可以與各種消息隊列系統集成,包括Kafka、RabbitMQ等。

2. Kafka簡介

Kafka是一個分布式流處理平臺,具有高吞吐量、可擴展性和可靠性。它提供了一種可持久化、分布式、分區的日志服務,用于處理實時數據流。Kafka使用發布-訂閱模型,消息被發布到一個或多個主題,然后由訂閱該主題的消費者進行消費。

二、實戰演練

1. MySQL數據庫部分

這部分代碼用于創建MySQL數據庫和數據表,以及將從Kafka獲取的數據保存到數據庫中。

create database kafkademo;

創建數據表:

CREATE TABLE kafka_tb
(`txid`      varchar(255) PRIMARY KEY,`version`   varchar(255),`connector` varchar(255),`name`      varchar(255),`ts_ms`     varchar(255),`snapshot`  varchar(255),`db`        varchar(255),`sequence`  varchar(255),`schema`    varchar(255),`table`     varchar(255),`lsn`       varchar(255),`xmin`      varchar(255)
);

2. 導入依賴

這部分代碼是Maven的依賴配置,用于引入所需的Spark、Kafka和MySQL相關的庫。

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>compile</scope>
</dependency>

3. 編寫實體類代碼

這部分代碼定義了一個Java類EntityMessage,用于將從Kafka獲取的JSON數據轉換為Java對象。

import lombok.Data;import java.io.Serializable;@Data
public class EntityMessage implements Serializable {private String op;private String ts_ms;private String transaction;private DataItem dataItem;@Datapublic static class DataItem {private String version;private String connector;private String name;private String ts_ms;private String snapshot;private String db;private String[] sequence;private String schema;private String table;private String txId;private String lsn;private String xmin;}
}

4. 編寫kafka主題管理代碼

這部分代碼用于創建、刪除和修改Kafka主題的一些操作。

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;import java.util.*;
import java.util.concurrent.ExecutionException;public class KafkaTopicManager {private static final String BOOTSTRAP_SERVERS = "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092";public void createTopic(String topicName, int numPartitions, short replicationFactor) throws ExecutionException, InterruptedException {Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/211572.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/211572.shtml
英文地址,請注明出處:http://en.pswp.cn/news/211572.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

實戰1-python爬取安全客新聞

一般步驟&#xff1a;確定網站--搭建關系--發送請求--接受響應--篩選數據--保存本地 1.拿到網站首先要查看我們要爬取的目錄是否被允許 一般網站都會議/robots.txt目錄&#xff0c;告訴你哪些地址可爬&#xff0c;哪些不可爬&#xff0c;以安全客為例子 2. 首先測試在不登錄的…

Docker Network(網絡)——8

目錄&#xff1a; Docker 為什么需要網絡管理Docker 網絡架構簡介 CNMLibnetwork驅動常見網絡類型 bridge 網絡host 網絡container 網絡none 網絡overlay 網絡docker 網絡管理命令 docker network createdocker network inspectdocker network connectdocker network disconne…

class072 最長遞增子序列問題與擴展【算法】

class072 最長遞增子序列問題與擴展【算法】 code1 300. 最長遞增子序列 // 最長遞增子序列和最長不下降子序列 // 給定一個整數數組nums // 找到其中最長嚴格遞增子序列長度、最長不下降子序列長度 // 測試鏈接 : https://leetcode.cn/problems/longest-increasing-subsequen…

830. 單調棧

?????? ??????830. 單調棧 - AcWing題庫 給定一個長度為 N 的整數數列&#xff0c;輸出每個數左邊第一個比它小的數&#xff0c;如果不存在則輸出 ?1?1。 輸入格式 第一行包含整數 N&#xff0c;表示數列長度。 第二行包含 N個整數&#xff0c;表示整數數列…

你知道MySQL中 group by 怎么優化嗎

更好的閱讀體驗&#xff0c;請點擊 YinKai s Blog。 ? 在 MySQL 中 group by 用于按照一個或多個列對結果集進行分組。在討論 group by 怎么優化之前&#xff0c;我們先來看看 group by 的執行流程&#xff0c;這樣我們才能對癥下藥。 group by 執行流程 ? 我們先用下面的 …

Ubuntu 18.04使用Qemu和GDB搭建運行內核的環境

安裝busybox 參考博客&#xff1a; 使用GDBQEMU調試Linux內核環境搭建 一文教你如何使用GDBQemu調試Linux內核 ubuntu22.04搭建qemu環境測試內核 交叉編譯busybox 編譯busybox出現Library m is needed, can’t exclude it (yet)的解釋 S3C2440 制作最新busybox文件系統 https:…

block-recurrent-transformer-pytorch 學習筆記

目錄 有依賴項1&#xff1a; 沒有依賴項&#xff0c;沒有使用例子 沒有依賴項2&#xff1a; 有依賴項1&#xff1a; GitHub - dashstander/block-recurrent-transformer: Pytorch implementation of "Block Recurrent Transformers" (Hutchins & Schlag et a…

gd32和stm32的區別

gd32和stm32的區別 現在的市場上有很多種不同類型的微控制器&#xff0c;其中比較常見的有兩種&#xff0c;即gd32和stm32。兩種微控制器都是中國和歐洲的兩個公司分別推出的&#xff0c;但是它們之間有很多區別&#xff0c;本文將會深入探討這些區別。 1.起源和歷史 gd32是…

2024年網絡安全競賽-Web安全應用

Web安全應用 (一)拓撲圖 任務環境說明: 1.獲取PHP的版本號作為Flag值提交;(例如:5.2.14) 2.獲取MySQL數據庫的版本號作為Flag值提交;(例如:5.0.22) 3.獲取系統的內核版本號作為Flag值提交;(例如:2.6.18) 4.獲取網站后臺管理員admin用戶的密碼作為Flag值提交…

udp多播組播

import socket ,struct,time# 組播地址和端口號 MCAST_GRP 239.0.0.1 MCAST_PORT 8888 # 創建UDP socket對象 sock socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # 綁定socket對象到本地端口號 # sock.bind((MCAST_GRP, MCAST_PORT)) …

【4】PyQt輸入框

1. 單行文本輸入框 QLineEdit控件可以輸入單行文本 from PyQt5.QtWidgets import QApplication, QWidget, QLineEdit, QVBoxLayout from PyQt5.QtCore import * from PyQt5.QtGui import QIcon import sysdef init_widget(w: QWidget):# 修改窗口標題w.setWindowTitle(單行輸…

前端面試——CSS面經(持續更新)

1. CSS選擇器及其優先級 !important > 行內樣式 > id選擇器 > 類/偽類/屬性選擇器 > 標簽/偽元素選擇器 > 子/后臺選擇器 > *通配符 2. 重排和重繪是什么&#xff1f;瀏覽器的渲染機制是什么&#xff1f; 重排(回流)&#xff1a;當增加或刪除dom節點&…

【面試經典150 | 二叉樹】從中序與后序遍歷序列構造二叉樹

文章目錄 寫在前面Tag題目來源題目解讀解題思路方法一&#xff1a;遞歸 寫在最后 寫在前面 本專欄專注于分析與講解【面試經典150】算法&#xff0c;兩到三天更新一篇文章&#xff0c;歡迎催更…… 專欄內容以分析題目為主&#xff0c;并附帶一些對于本題涉及到的數據結構等內容…

Android : Room 數據庫的基本用法 —簡單應用

1.Room介紹&#xff1a; Android Room 是 Android 官方提供的一個持久性庫&#xff0c;用于在 Android 應用程序中管理數據庫。它提供了一個簡單的 API 層&#xff0c;使得使用 SQLite 數據庫變得更加容易和方便。 以下是 Android Room 的主要特點&#xff1a; 對象關系映射…

9.MySQL 索引

目錄 ???????概述 概念&#xff1a; 單列索引 普通索引 創建索引 查看索引 刪除索引 唯一索引 創建唯一索引 刪除唯一索引 主鍵索引 組合索引 創建索引 全文索引 概述 使用全文索引 空間索引 內部原理 相關算法&#xff1a; hash算法 二叉樹算法 …

Spring基于XML文件配置AOP

AOP AOP&#xff0c;面向切面編程&#xff0c;是對面向對象編程OOP的升華。OOP是縱向對一個事物的抽象&#xff0c;一個對象包括靜態的屬性信息&#xff0c;包括動態的方法信息等。而AOP是橫向的對不同事物的抽象&#xff0c;屬性與屬性、方法與方法、對象與對象都可以組成一個…

12.10多種編碼方式,編碼方案選擇策略(遞歸級聯),PDE,RLE代碼

作者如何選擇和設計編碼方案&#xff0c;以實現高效的解壓縮和高壓縮比&#xff1f;BtrBlocks是否適用于所有類型的數據&#xff1f; 選擇和設計編碼方案&#xff1a; 結合多種高效編碼方案&#xff1a;BtrBlocks 通過選擇一組針對不同數據分布的高效編碼方案&#xff0c;實現…

js判斷是否對象自身為空

文章目錄 一、前言二、JSON.stringify三、for in 配合 hasOwnProperty四、Object.keys五、Object.getOwnPropertyNames六、Object.getOwnPropertyNames 結合 Object.getOwnPropertySymbols七、Reflect.ownKeys八、最后 一、前言 如何判斷一個對象為空&#xff1f; 先上結論&a…

MySql復習筆記03(小滴課堂) 事務,視圖,觸發器,存儲過程

mysql 必備核心知識之事務的詳細解析&#xff1a; 創建一個數據庫表&#xff1a; 添加數據并開啟事務。 添加數據并查詢。 登錄另一臺服務器發現查不到這個表中的數據。 這是因為事務開啟了&#xff0c;但是沒有提交&#xff0c;只是把數據存到了內存中&#xff0c;還沒有寫入…