Kafka命令行的使用/Spark-Streaming核心編程(二)

Kafka命令行的使用

創建topic

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic test1 --partitions 3 --replication-factor 3

分區數量,副本數量,都是必須的。

數據的形式:

主題名稱-分區編號。

在Kafka的數據目錄下查看。

設定副本數量,不能大于broker的數量。

2.2查看所有的topic

kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181

2.3查看某個topic的詳細信息

kafka-topics.sh --describe --zookeeper node01:2181,node02:2181,node03:2181?--topic test1

ISR:?In-Sync Replicas ??可以提供服務的副本。

AR = ISR + OSR

2.4刪除topic

kafka-topics.sh --delete --zookeeper node01:2181,node02:2181,node03:2181?--topic test1

2.5生產數據

kafka-console-producer.sh:

指定broker

指定topic

寫數據的命令:

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test1

Spark-Streaming核心編程(二)

  1. 需求:通過 SparkStreaming 從 Kafka 讀取數據,并將讀取過來的數據做簡單計算,最終打印到控制臺。
  2. 導入依賴

<dependency>
????<groupId>org.apache.spark</groupId>
????<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
????<version>3.0.0</version>
</dependency>

  1. 編寫代碼

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object DirectAPI {
??def main(args: Array[String]): Unit = {
????val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")

????val ssc = new StreamingContext(sparkConf, Seconds(3))

????// 定義?Kafka 相關參數
????val kafkaPara: Map[String, Object] = Map[String, Object](
??????ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092,node02:9092,node03:9092",
??????ConsumerConfig.GROUP_ID_CONFIG -> "kafka",
??????"key.deserializer" -> classOf[StringDeserializer],
??????"value.deserializer" -> classOf[StringDeserializer]
????)

????// 通過讀取?Kafka 數據,創建?DStream
????val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
??????ssc,
??????LocationStrategies.PreferConsistent,
??????ConsumerStrategies.Subscribe[String, String](Set("kafka"), kafkaPara)
????)

????// 提取出數據中的?value 部分
????val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())

????// wordCount 計算邏輯
????valueDStream.flatMap(_.split(" "))
??????.map((_, 1))
??????.reduceByKey(_ + _)
??????.print()

????ssc.start()
????ssc.awaitTermination()
??}
}

  1. 開啟Kafka集群

  1. 開啟Kafka生產者,產生數據

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092?--topic kafka

  1. 運行程序,接收Kafka生產的數據并進行相應處理

8)查看消費進度

kafka-consumer-groups.sh --describe --bootstrap-server node01:9092,node02:9092,node03:9092 --group kafka

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/78640.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/78640.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/78640.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Python3:Jupyterlab 安裝和配置

Python3:Jupyterlab 安裝和配置 Jupyter源于Ipython Notebook項目&#xff0c;是使用Python&#xff08;也有R、Julia、Node等其他語言的內核&#xff09;進行代碼演示、數據分析、機器學習、可視化、教學的非常好的工具。 最新的基于web的交互式開發環境&#xff0c;適用于n…

快速排序及其在Unity游戲開發中的應用

一、快速排序(Quick Sort) 快速排序是一種**分治法(Divide and Conquer)**思想的排序算法,它的基本步驟是: 選一個基準元素(pivot):通常選第一個元素、最后一個元素,或者隨機一個。分區(Partition):把數組分成兩部分,小于等于 pivot 的放左邊,大于 pivot 的放右…

【硬核干貨】SonarQube安全功能

原文鏈接&#xff1a;【硬核干貨】SonarQube安全功能 關于曉數神州 曉數神州堅持以“客戶為中心”的宗旨&#xff0c;為客戶提供專業的解決方案和技術服務&#xff0c;構建多引擎數字化體系。 核心業務1&#xff1a;聚焦DevOps全棧產品&#xff0c;打造需求管理、項目管理、開…

修改el-select背景顏色

修改el-select背景顏色 /* 修改el-select樣式--直接覆蓋默認樣式&#xff08;推薦&#xff09; */ ::v-deep .el-select .el-input__inner {background-color: #1d2b72 !important; /* 修改輸入框背景色 */color: #fff; } ::v-deep .el-select .el-input__wrapper {background-…

Unity-粒子系統:螢火蟲粒子特效效果及參數

螢火蟲特效由兩部分組成。螢火蟲粒子底色粒子面片。螢火蟲的旋轉飛動主要由 Noise參數和Color over Lifetime模塊控制。 貼圖&#xff1a;中間實周邊虛的圓&#xff0c;可隨意自行制作 Shader&#xff1a;Universal Render Pipeline/2D/Sprite-Lit-Default 以下是粒子詳細參…

K8S Service 原理、圖例——深度好文

一、理論介紹 1.1、3W 法則 1、是什么&#xff1f; Service 是一種為一組功能相同的 pod 提供單一不變的接入點的資源。當 Service 存在時&#xff0c;它的IP地址和端口不會改變。客戶端通過IP地址和端口號與 Service 建立連接&#xff0c;這些連接會被路由到提供該 Service 的…

Alibaba Cloud Linux 3.2104 LTS 64位 容器優化版安裝docker docker compose記錄

整個安裝過程耗時4小時。&#xff08;包含以下檢查內容:&#xff09; 檢查該linux版本信息&#xff08;并通過監控指標檢查運行狀態/cpu占用/內存占用/磁盤讀取寫入IOPS /同時連接數&#xff09; 1&#xff1a;根據當前的系統進行yum與dnf的升級&#xff0c;保持穩定修復的版本…

STM32N6570-DK ISP調試

STM32N6570-DK之ISP調試應用 準備工作-下載安裝軟件包:一、使用STM32CubeProgrammer給板子燒入STM32N6_ISP_IQTune_App_revC01-v1.1.0-trusted.bin。二、打開STM32 ISP IQTune.exe ,出現可連接端口:三、根據教程進行相應調試:準備工作-下載安裝軟件包: https://www.st.co…

12.thinkphp驗證

一&#xff0e;驗證器定義 1. 驗證器的使用&#xff0c;我們必須先定義它&#xff0c;系統提供了一條命令直接生成想要的類&#xff1b; php think make:validate User 2. 這條命令會自動在應用目錄下生成一個validate文件夾&#xff0c;并生成User.php類&#xff1b; class…

OpenWrt 與 Docker:打造輕量級容器化應用平臺技術分享

文章目錄 前言一、OpenWrt 與 Docker 的集成前提1.1 硬件與內核要求1.2 軟件依賴 二、Docker 環境部署與驗證2.1 基礎服務配置2.2 存儲驅動適配 三、容器化應用部署實踐3.1 資源限制策略3.2 Docker Compose 適配 四、性能優化與監控4.1 容器資源監控4.2 鏡像精簡策略 五、典型問…

EasyRTC音視頻實時通話嵌入式SDK,打造社交娛樂低延遲實時互動的新體驗

一、方案背景 在數字化時代&#xff0c;社交娛樂已經成為人們生活中不可或缺的一部分。隨著移動互聯網和智能設備的普及&#xff0c;用戶對實時互動的需求越來越高。EasyRTC作為一款基于WebRTC技術的實時音視頻通信解決方案&#xff0c;憑借其低延遲、高穩定性和跨平臺兼容性&…

軟件編程命名規范

編程命名規范是保證代碼可讀性、可維護性和團隊協作效率的重要基礎。以下是涵蓋主流編程語言的通用命名規范&#xff0c;結合行業最佳實踐和常見規范&#xff08;如Google、Microsoft、Airbnb等風格指南&#xff09;&#xff1a; 一、通用命名原則 清晰優先&#xff1a;名稱應…

換張電話卡能改變IP屬地嗎?一文解讀

在互聯網時代&#xff0c;IP屬地&#xff08;即網絡定位信息&#xff09;的顯示引發了許多用戶的關注。有人好奇&#xff1a;更換電話卡&#xff08;SIM卡&#xff09;是否能改變自己的IP屬地&#xff1f;本文將解析IP屬地的定義、電話卡的作用&#xff0c;并深入探討兩者之間的…

前端:純HTML、CSS和JS菜單樣式

實現了一個多級折疊菜單系統,使用純HTML、CSS和JavaScript(無任何框架) 一、二級菜單展開 1、實現效果 初始狀態-展示全部一級菜單 選中共狀態,一級標題選中共為藍色背景色,二級標題選中共為藍色文字,展開右側圖標為-,后縮狀態右側圖標為+ 2、實現 ??HTML結構?? …

Centos8 安裝 Docker

yum 更換國內源 1. 備份原 yum 配置 cd /etc/yum.repos.d/ mkdir backup mv *.repo backup/2. 下載新 yum 配置&#xff08;阿里源&#xff09; wget -O /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-8.repo3. 替換源中的系統版本變量 sed -…

AI測試工具Testim——告別自動化測試維護難題

隨著人工智能技術的快速發展&#xff0c;AI測試工具正在成為提升軟件研發效能的關鍵。每款AI的特性各有差異&#xff0c;今天&#xff0c;我們就給大家介紹一款專注于Web和移動應用的端到端的AI測試工具--Testim。 Testim的簡介 官網地址&#xff1a;https://www.testim.io/ 簡…

【默子AI】萬字長文:MCP與A2A協議詳解

【默子AI】萬字長文&#xff1a;MCP與A2A協議詳解 引言&#xff1a; 讓一個大模型憑空解決所有問題&#xff0c;就像讓一個書呆子不借助工具就去修汽車 即便他腦子里裝滿了理論知識&#xff0c;也缺少實踐的“手腳”。 長期以來&#xff0c;AI助手&#xff08;尤其是LLM&#x…

LeNet5 神經網絡的參數解析和圖片尺寸解析

1.LeNet-5 神經網絡 以下是針對 LeNet-5 神經網絡的詳細參數解析和圖片尺寸變化分析&#xff0c;和原始論文設計&#xff0c;通過分步計算說明各層的張量變換過程。 經典的 LeNet-5架構簡化版&#xff08;原始論文輸入為 32x32&#xff0c;MNIST 常用 28x28 需調整&#xff09…

第二節:文件系統

理論知識 文件系統的基本概念&#xff1a;文件系統是操作系統中負責管理持久數據的子系統&#xff0c;它將數據組織成文件和目錄的形式&#xff0c;方便用戶存儲和訪問數據。Linux文件系統的類型&#xff1a;常見的 Linux 文件系統類型有 Ext2、Ext3、Ext4、XFS、Btrfs 等。Ex…

Python數據結構與算法(5)——動態規劃

Python數據結構與算法(5)——動態規劃 0. 學習目標1. 動態規劃的基本概念1.1 什么是動態規劃1.2 動態規劃的核心思想1.3 動態規劃的適用條件2. 動態規劃的實現思路2.1 自頂向下:備忘錄法 (Memoization)2.2 自底向上:表格法(Tabulation)3. 0/1 背包問題4. 最長公共子序列5…