kafka整合flume與DStream轉換

一、Kafka整合flume

cd /opt/software/flume/conf/

vi flume-kafka.conf

a1.sources=r1

a1.sinks=k1

a1.channels=c1

a1.sources.r1.type=spooldirt

a1.sources.r1.spoolDir=/root/flume-kafka

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic=testTopice

a1.sinks.k1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

a1.sinks.k1.kafka.flumeBatchSize=20

a1.sinks.k1.kafka.producer.acks=1

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

cd /root/

mkdir flume-kafka

ll

drwxr-xr-x?? 2 root root 4096 11月? 8 22:02 agent3

-rw-------.? 1 root root? 955 9月?? 6 16:41 anaconda-ks.cfg

-rw-r--r--?? 1 root root??? 1 10月 25 18:30 exec-logger.conf

drwxr-xr-x?? 2 root root?? 27 11月 15 18:00 flume-hive

drwxr-xr-x?? 2 root root??? 6 12月? 3 03:59 flume-kafka

-rw-r--r--?? 1 root root?? 63 11月? 8 23:01 flume-position.json

drwxr-xr-x? 22 root root 4096 12月? 3 03:59 kafkadata

drwxr-xr-x?? 3 root root?? 21 10月 11 18:32 opt

drwxr-xr-x?? 3 root root 4096 11月? 8 18:17 testDir

drwxr-xr-x?? 2 root root?? 38 11月? 8 19:01 testdir2

-rw-r--r--?? 1 root root? 108 11月 15 17:09 test.log

drwxr-xr-x?? 2 root root 4096 11月? 8 18:49 testSink

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 2

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1

kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node:9092,node03:9092 --from-beginning

cd /root/flume-kafka/

echo "hello" >>test3.txt

echo "hello flume" >>test2.txt

cd /opt/software/flume/conf/

vi kafka-flume.conf

kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node:9092,node03:9092 --from-beginning

Hello

?hello kafka

hello flume

flume-ng agent -c conf/ -f conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console

二、kafka架構深入

分區策略:輪詢(RoundRobin)、按 Key 哈希(Hash)、自定義分區。

數據可靠性:通過 ACK 機制(0、1、-1)和 ISR(同步副本集合)保證,acks=-1時需等待 Leader 和 Follower 全部落盤。

事務與冪等性:0.11 版本引入冪等性(enable.idompotence=true),結合 At Least Once 實現 Exactly Once 語義。

三、Spark-Streaming核心編程

1.DStream 轉換

DStream 是 Spark-Streaming 處理實時數據的基本單位,可以理解為 “實時數據流”。

轉換操作就是對這個數據流進行加工處理,比如過濾、拆分、統計等,就像工廠流水線對原材料進行加工一樣。

操作分為兩類:

無狀態轉換:只處理當前批次的數據,不關心歷史數據(比如統計當前 3 秒內的單詞數)。

有狀態轉換:會記住歷史數據(比如統計從程序啟動到現在的總單詞數),文檔里沒詳細講,重點在無狀態部分。

2.無狀態轉換的常見操作

無狀態轉換就像 “即處理即丟棄”,每次只處理當前批次的數據,不保留之前的結果。

常見函數舉例

3.Transform轉換

Transform是一個 “萬能轉換” 函數,可以對每個批次的 RDD(DStream 內部由多個 RDD 組成)執行任意自定義操作,甚至可以使用 Spark 原生的 RDD 函數(即使 DStream 沒有直接提供)

4.Join轉換

join用于合并兩個數據流中相同鍵的數據,就像拼拼圖一樣,只有鍵匹配的部分才能拼在一起。

適用于合并兩個來源的單詞數據

最后運行結果

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/903536.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/903536.shtml
英文地址,請注明出處:http://en.pswp.cn/news/903536.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

網絡通訊【QTcpServer、QTcpSocket、QAbstractSocket】

目錄 QTcpServer class簡單描述成員函數和信號 QTcpSocket Class詳細描述成員函數和信號 QAbstractSocket Class詳細描述成員函數和信號成員函數說明文檔 QT實現服務器和客戶端通訊服務器端:通訊流程原代碼 客戶端通信流程原代碼 QTcpServer class header: #includ…

大模型在腎癌診療全流程中的應用研究報告

目錄 一、引言 1.1 研究背景與意義 1.2 研究目的與方法 1.3 國內外研究現狀 二、大模型預測腎癌術前情況 2.1 基于影像組學的腎癌良惡性及分級預測 2.1.1 MRI 影像組學模型預測腎透明細胞癌分級 2.1.2 CT 影像深度學習模型鑒別腎腫物良惡性及侵襲性 2.2 大模型對手術風…

網絡原理 - 11(HTTP/HTTPS - 2 - 請求)

目錄 HTTP 請求(Request) 認識 URL URL 基本格式 關于 URL encode 認識方法(method) 1. GET 方法 2. POST 方法 認識請求“報頭”(header) Host Content-Length Content-Type User-Agent&…

實現MySQL高可用性:從原理到實踐

目錄 一、概述 1.什么是MySQL高可用 2.方案組成 3.優勢 二、資源清單 三、案例實施 1.修改主機名 2.安裝MySQL數據庫(Master1、Master2) 3.配置mysql雙主復制 4.安裝haproxy(keepalived1、keepalived2) 5.安裝keepaliv…

CSS學習筆記8——表格

一、表格 1-1、創建表格 在Word文檔中,如果要創建表格,只需插入表格,然后設定相應的行數和列數即可。然而在HTML網頁中,所有的元素都是通過標簽定義的,要想創建表格,就需要使用與表格相關的標簽。使用標簽…

爬蟲學習筆記(一)

目的 通過編寫程序爬取互聯網上的優質資源 爬蟲必須要使用python嗎 非也~ 編程語言知識工具,抓取到數據才是目的,而大多數爬蟲采用python語言編寫的原因是python的語法比較簡單,python寫爬蟲比較簡單!好用!而且pyt…

大廠面試:MySQL篇

前言 本章內容來自B站黑馬程序員java大廠面試題和小林coding 博主學習筆記,如果有不對的地方,海涵。 如果這篇文章對你有幫助,可以點點關注,點點贊,謝謝你! 1.MySQL優化 1.1 定位慢查詢 定位 一個SQL…

C++_數據結構_詳解紅黑樹

?? 歡迎大家來到小傘的大講堂?? 🎈🎈養成好習慣,先贊后看哦~🎈🎈 所屬專欄:C學習 小傘的主頁:xiaosan_blog 制作不易!點個贊吧!!謝謝喵!&…

DNA復制過程3D動畫教學工具

DNA復制過程3D動畫教學工具 訪問工具頁面: DNA復制動畫演示 工具介紹 我開發了一個交互式的DNA復制過程3D動畫演示工具,用于分子生物學教學。這個工具直觀展示了: DNA雙螺旋結構的解旋過程堿基互補配對原理半保留復制機制完整的復制周期動畫 主要特點…

使用阿里云 CDN 保護網站真實 IP:完整配置指南

使用阿里云 CDN 保護網站真實 IP:完整配置指南 一、寶塔面板準備工作1. 確認網站部署狀態2. 寶塔中檢查網站配置 二、配置阿里云 CDN1. 添加域名到 CDN2. 配置 DNS 解析3. 配置成功確認 三、寶塔面板安全加固(隱藏 IP 的關鍵步驟)1. 禁止通過…

PHP經驗筆記

isset — 檢測變量是否設置,并且不是NULL; 若變量存在且值不為NULL,則返回 TURE 若變量存在且其值為NULL或變量不存在,則返回 FALSE 結論 1. 當變量為空字符串、數值0和布爾值false時,isset全部返回true 2. 當變量不存在和變量存在且值為NULL…

Linux——安裝NVM

1. 安裝命令 官方地址:https://github.com/nvm-sh/nvm curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.3/install.sh | bash2. 安裝完成后執行命令 source ~/.bashrc3. 驗證 nvm -v

CentOS 7 磁盤陣列搭建與管理全攻略

CentOS 7 磁盤陣列搭建與管理全攻略 在數據存儲需求日益增長的今天,磁盤陣列(RAID)憑借其卓越的性能、數據安全性和可靠性,成為企業級服務器和數據中心的核心存儲解決方案。CentOS 7 作為一款穩定且功能強大的 Linux 操作系統&am…

C++每日訓練 Day 18:構建響應式表單與數據驗證(初學者友好)

📘 本篇目標:在前幾日協程與事件驅動機制基礎上,構建一個響應式表單系統,實現用戶輸入的異步驗證與反饋。通過協程掛起/恢復機制,簡化異步邏輯,提升代碼可讀性。 🔁 回顧 Day 17:響應…

Vue初步總結-摘自 黑馬程序員

本文摘自 bilibili 前端最新Vue2Vue3基礎入門到實戰項目全套教程,自學前端vue就選黑馬程序員,一套全通關! 更多詳情可參考: https://www.yuque.com/u26161316/pic6n4/heyv8nv8ubfk3fhe?singleDoc# 《Vue》

【基于Qt的QQ音樂播放器開發實戰:從0到1打造全功能音樂播放應用】

🌹 作者: 云小逸 🤟 個人主頁: 云小逸的主頁 🤟 motto: 要敢于一個人默默的面對自己,強大自己才是核心。不要等到什么都沒有了,才下定決心去做。種一顆樹,最好的時間是十年前,其次就是現在&…

線程池(二):深入剖析synchronized關鍵字的底層原理

線程池(二):深入剖析synchronized關鍵字的底層原理 線程池(二):深入剖析synchronized關鍵字的底層原理一、基本使用1.1 修飾實例方法1.2 修飾靜態方法1.3 修飾代碼塊 二、Monitor2.1 Monitor的概念2.2 Moni…

Linux CentOS 7 安裝Apache 部署html頁面

*、使用yum包管理器安裝Apache。運行以下命令: sudo yum install httpd *、啟動Apache服務 sudo systemctl start httpd *、設置Apache服務開機自啟 # 啟用開機自啟動 sudo systemctl enable httpd# 禁用開機自啟動 sudo systemctl disable httpd *、驗證Apac…

前端設置三行文本省略號,失效為什么?

實際效果:第三行出現省略號,但是第四行依舊顯示了部分文字 這個問題通常是由于 CSS 多行文本截斷(-webkit-line-clamp)的計算方式或布局沖突導致的。以下是完整解決方案,確保三行文本截斷正確顯示省略號,并…

git學習之git常用命令

1. 初始化倉庫 git init初始化一個新的 Git 倉庫。 2. 克隆遠程倉庫 git clone <repository-url>從遠程服務器克隆一個已有倉庫到本地。 3. 配置用戶名和郵箱 git config --global user.name "Your Name" git config --global user.email "youexampl…