Kafka集成Flume/Spark/Flink(大數據)/SpringBoot

Kafka集成Flume

在這里插入圖片描述

Flume生產者

在這里插入圖片描述
③、安裝Flume,上傳apache-flume的壓縮包.tar.gz到Linux系統的software,并解壓到/opt/module目錄下,并修改其名稱為flume
在這里插入圖片描述
在這里插入圖片描述

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

Flume消費者

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

Kafka集成Spark

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

生產者

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

object SparkKafkaProducer{def main(args:Array[String]):Unit = {//配置信息val properties  = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])//創建一個生產者var producer = new KafkaProducer[String,String](properties)//發送數據for(i <- 1 to 5){producer.send(new ProducerRecord[String,String]("first","atguigu"+i))}//關閉資源producer.close()}
}

在這里插入圖片描述

消費者
在這里插入圖片描述

Object SparkKafkaConsumer{def main(args:Array[String]):Unit = {//初始化上下文環境val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")val ssc = new StreamingContext(conf,Seconds(3))//消費數據val kafkapara = Map[String,Object](ConsumerConfig.BOOT_STRAP_SERVERS_CONFIG->"hadoop102:9092,hadoop103:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG->"test")val kafkaDStream = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreFerConsistent,ConsumerStrategies.Subscribe[String,String](Set("first"),kafkapara))val valueDStream = kafkaDStream.map(record=>record.value())valueDStream.print()//執行代碼,并阻塞ssc.start()ssc.awaitTermination()}
}

Kafka集成Flink

在這里插入圖片描述

創建maven項目,導入以下依賴
在這里插入圖片描述
resources里面添加log4j.properties文件,可以更改打印日志的級別為error
在這里插入圖片描述

Flink生產者

public class FlinkafkaProducer1{public static void main(String[] args){//獲取環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//準備數據源ArrayList<String> wordList = new ArrayList<>();wordList.add("hello");wordList.add("atguigu");DataStreamSource<String> stream = env.fromCollection();//創建一個kafka生產者Properties properteis = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first",new SimpleStringSchema(),properties);//添加數據源Kafka生產者stream.addSink(kafkaProducer);//執行env.execute();}
}

在這里插入圖片描述

Flink消費者

public class FlinkafkaConsumer1{public static void main(String[] args){//獲取環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//創建一個消費者Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first",new SimpleSStringSchema(),properties);//關聯消費者和flink流env.addSource(kafkaConsumer).print();//執行env.execute();}
}

Kafka集成SpringBoot

在這里插入圖片描述
在這里插入圖片描述

生產者
在這里插入圖片描述
在這里插入圖片描述
通過瀏覽器發送
在這里插入圖片描述
在這里插入圖片描述

消費者

在這里插入圖片描述

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/85531.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/85531.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/85531.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

debian12.9或ubuntu,vagrant離線安裝插件vagrant-libvirt,20250601

系統盤: https://mirror.lzu.edu.cn/debian-cd/12.9.0/amd64/iso-dvd/debian-12.9.0-amd64-DVD-1.iso 需要的依賴包,無需安裝ruby( sudo apt install -y ruby-full ruby-dev rubygems,后來發現不安裝會有編譯警告,還是安裝吧 ) ,無需安裝 zlib1g-dev liblzma-dev libxml2-de…

2025年軟件測試面試八股文(含答案+文檔)

&#x1f345; 點擊文末小卡片&#xff0c;免費獲取軟件測試全套資料&#xff0c;資料在手&#xff0c;漲薪更快 Part1 1、你的測試職業發展是什么&#xff1f; 測試經驗越多&#xff0c;測試能力越高。所以我的職業發展是需要時間積累的&#xff0c;一步步向著高級測試工程師…

[CSS3]響應式布局

導讀 響應式就是一套代碼, 兼容大中小不同的屏幕, 即網頁內容不變, 網頁布局隨屏幕切換而改變 媒體查詢 響應式布局的核心技術是媒體查詢 媒體查詢可以檢測屏幕尺寸, 設置差異化的css 開發中的常用寫法 使用范圍屬性, 劃定屏幕范圍 max-width 最大寬度min-width 最小寬度 …

在 Windows安裝 make 的幾種方式

在 Windows 上使用 make&#xff08;通常用于自動化構建 C/C 項目等&#xff09;有幾種方法。以下是最常見的幾種安裝和使用方法&#xff1a; 文章目錄 ? 方法一&#xff1a;使用 Chocolatey 安裝 GNU Make&#xff08;推薦&#xff09;? 方法二&#xff1a;使用 WSL&#xf…

深度學習筆記25-RNN心臟病預測(Pytorch)

&#x1f368; 本文為&#x1f517;365天深度學習訓練營中的學習記錄博客&#x1f356; 原作者&#xff1a;K同學啊 一、前期準備 1.數據處理 import torch.nn.functional as F import numpy as np import pandas as pd import torch from torch import nn dfpd.read_csv(r&…

Pytorch知識點2

Pytorch知識點 1、官方教程2、張量&#x1f9f1; 0、數組概念&#x1f9f1; 1. 創建張量&#x1f4d0; 2. 張量形狀與維度&#x1f522; 3. 張量數據類型? 4. 張量的數學與邏輯操作&#x1f504; 5. 張量的就地操作&#x1f4e6; 6. 復制張量&#x1f680; 7. 將張量移動到加速…

池中錦鯉的自我修養,聊聊蓄水池算法

面試如泡池&#xff0c;蓄水似人生 起初你滿懷期待跳進大廠池子&#xff0c;以為自己是天選之子&#xff0c;結果發現池子里早擠滿了和你一樣的“錦鯉候選人”。HR的漁網一撒&#xff0c;撈誰全看概率——這不就是蓄水池算法的精髓嗎&#xff1f; 初入池&#xff08;i≤k&…

Linux應用開發之網絡套接字編程

套接字&#xff08;Socket&#xff09;是計算機網絡數據通信的基本概念和編程接口&#xff0c;允許不同主機上的進程&#xff08;運行中的程序&#xff09;通過網絡進行數據交換。它為應用層軟件提供了發送和接收數據的能力&#xff0c;使得開發者可以在不用深入了解底層網絡細…

小白的進階之路系列之六----人工智能從初步到精通pytorch數據集與數據加載器

本文將介紹以下內容: 數據集與數據加載器 數據遷移 如何建立神經網絡 數據集與數據加載器 處理數據樣本的代碼可能會變得混亂且難以維護;理想情況下,我們希望我們的數據集代碼與模型訓練代碼解耦,以獲得更好的可讀性和模塊化。PyTorch提供了兩個數據原語:torch.utils…

深入理解設計模式之中介者模式

深入理解設計模式之&#xff1a;中介者模式&#xff08;Mediator Pattern&#xff09; 一、什么是中介者模式&#xff1f; 中介者模式&#xff08;Mediator Pattern&#xff09;是一種行為型設計模式。它通過引入一個中介對象&#xff0c;來封裝一組對象之間的交互&#xff0…

基于通義千問的兒童陪伴學習和成長的智能應用架構。

1.整體架構概覽 我們的兒童聊天助手將采用典型的語音交互系統架構,結合大模型能力和外部知識庫: 2. 技術方案分解 2.1. 前端應用/設備 選擇: 移動App(iOS/Android)、Web應用,或者集成到智能音箱/平板等硬件設備中。技術棧: 移動App: React Native / Flutter (跨平臺…

Python Day40

Task&#xff1a; 1.彩色和灰度圖片測試和訓練的規范寫法&#xff1a;封裝在函數中 2.展平操作&#xff1a;除第一個維度batchsize外全部展平 3.dropout操作&#xff1a;訓練階段隨機丟棄神經元&#xff0c;測試階段eval模式關閉dropout 作業&#xff1a;仔細學習下測試和訓練代…

WordPress_suretriggers 權限繞過漏洞復現(CVE-2025-3102)

免責申明: 本文所描述的漏洞及其復現步驟僅供網絡安全研究與教育目的使用。任何人不得將本文提供的信息用于非法目的或未經授權的系統測試。作者不對任何由于使用本文信息而導致的直接或間接損害承擔責任。如涉及侵權,請及時與我們聯系,我們將盡快處理并刪除相關內容。 前…

基于Spring Boot 電商書城平臺系統設計與實現(源碼+文檔+部署講解)

技術范圍&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬蟲、數據可視化、小程序、安卓app、大數據、物聯網、機器學習等設計與開發。 主要內容&#xff1a;免費功能設計、開題報告、任務書、中期檢查PPT、系統功能實現、代碼編寫、論文編寫和輔導、論文…

LeetCode 39.組合總和:回溯法與剪枝優化的完美結合

一、問題本質與形式化定義 1.1 題目形式化描述 輸入&#xff1a;無重復整數數組candidates、目標值target輸出&#xff1a;所有和為target的組合集合&#xff0c;滿足&#xff1a; 元素可重復使用組合內元素非降序&#xff08;避免重復解&#xff09;解集無重復組合 1.2 問…

windows11安裝編譯QtMvvm

windows11安裝編譯QtMvvm 1 從github下載代碼2 官方的Download/Installtion3 自行構建編譯QtMvvm遇到的問題3.1 `qmake`問題執行命令報錯原因分析qmake報錯:找不到編譯器 cl解決方案3.2 `make qmake_all`問題執行命令報錯原因分析make命令未識別解決方案3.3 缺少`perl`問題執行…

unix/linux source 命令,其歷史爭議、兼容性、生態、未來展望

現在把目光投向unix/linux source命令的歷史爭議、兼容性、生態和未來展望,這能讓我們更全面地理解一個技術點在更廣闊的圖景中所處的位置。 一、歷史爭議與設計權衡 雖然 source (或 .) 命令功能強大且不可或缺,但在其發展和使用過程中,也存在一些微妙的爭議或設計上的權衡…

開發時如何通過Service暴露應用?ClusterIP、NodePort和LoadBalancer類型的使用場景分別是什么?

一、Service核心概念 Service通過標簽選擇器&#xff08;Label Selector&#xff09;關聯Pod&#xff0c;為動態變化的Pod集合提供穩定的虛擬IP和DNS名稱&#xff0c;主要解決&#xff1a; 服務發現負載均衡流量路由 二、Service類型詳解 1. ClusterIP&#xff08;默認類型…

從線性代數到線性回歸——機器學習視角

真正不懂數學就能理解機器學習其實是個神話。我認為&#xff0c;AI 在商業世界可以不懂數學甚至不懂編程也能應用&#xff0c;但對于技術人員來說&#xff0c;一些基礎數學是必須的。本文收集了我認為理解學習本質所必需的數學基礎&#xff0c;至少在概念層面要掌握。畢竟&…

華為IP(7)

端口隔離技術 產生的背景 1.以太交換網絡中為了實現報文之間的二層隔離&#xff0c;用戶通常將不同的端口加入不同的VLAN&#xff0c;實現二層廣播域的隔離。 2.大型網絡中&#xff0c;業務需求種類繁多&#xff0c;只通過VLAN實現二層隔離&#xff0c;會浪費有限的VLAN資源…