kafka之操作示例

一、常用shell命令

#1、創建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replications 1 --topic test#2、查看創建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181#3、生產者發布消息命令
(執行完此命令后在控制臺輸入要發送的消息,回車即可)
bin/kafka-console-producer.sh --broker-list 192.168.91.231:9092,192.168.91.231:9093,192.168.91.231:9094 --topic test#4、消費者接受消息命令
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning#5、kafka啟動
首先啟動zookeeper zkServer.sh start(相當于一個server,kafka會連接這個server)
bin/kafka-server-start.sh config/server.properties # 啟動kafka#6、查看kafka節點數目
在zookeeper中查看,登錄客戶端bin/zkCli.sh 執行ls /brokers/ids 查看節點數目及節點ID,[0,1,2]#7、kafka中的概念
生產者 Producer、代理Broker、消費者Consumer、主題Topic、分區 Partition、消費者組 Consumer Group#8、查看主顆信息
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 [加其他選項]eg:
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 --describe
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test#9、為主題創建分區
一共創建八個分區,編號分別為0~7
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 --alter -partitions 8 -topic test#10、查看kafka進程
ps -eflgrep server.properties
ps -eflgrep server-1.properties
ps -eflgrep server-2.properties#11、kafka宕機重啟后,消息不會丟失#12、kafka其中一個broker宕機后,對消費者和生產者影響很小(命令行下測試)
消費者會嘗試連接,連接不到,返回java.net.ConnectException:Connection refused異常 生產者可能會在發送消息的時候報異常,但會很快連接到其他broker,繼續正常使用#13.查看kafka消息隊列的積壓情況
bin/kafka-consumer-groups.sh --zookeeper 192.168.91.231:2181 --describe --group console-consumer-37289#14.kafka 中查看所有的group列表信息
bin/kafka-consumer-groups.sh --zookeeper 192.168.91.231:2181 --list

二、python操作kafka

本地安裝與啟動(基于Docker)

#1、下載zookeeper鏡像與kafka鏡像:
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1

#2、本地啟動zookeeper
docker run -d --name zookeeper -p 2181:2181 -t registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6

#3、本地啟動kafka(注意下述代碼,將kafka啟動在9092端口)
docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--enV KAFKA ZO0KEEPER CONNECT=zookeeper:2181 \
--enV KAFKA ADVERTISED HOST NAME=192.168.71.113 \
--enV KAFKA ADVERTISED PORT=9092 \
registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1

上面寫的localhost沒有影響,查看端口如下
# netstat -tuanlp | grep 9092
tcp 0 0 0.0.0.0:9092 0.0.0.0:*LISTEN 102483/docker-proxy
tcp6 00:::9092 :::* LISTEN 102487/docker-proxy

#4、進入kafka bash
docker exec it kafka bash
cd /opt/kafka/bin

#5、創建Topic,分區為2,Topic name為'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo

kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic egon

數據存在哪里
[root@web02 ~]# docker exec -it kafka bash
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# ls /kafka/
kafka-logs-f33383f9c414
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# 1s /kafka/kafka-logs-f33383f9c414/
kafka_demo-0 kafka_demo-1
egon-0 egon-1
.........
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# ls /kafka/kafka-logs-f33383f9c414/egon-0
00000000000000000000.index0000000000000000000.timeindex
00000000800000080000.1og leader-epoch-checkpoint

#6、查看當前所有topic
kafka-topics.sh --zookeeper zookeeper:2181 --list

#7、命令行操作
$docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-producer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic
然后一行行輸入,回車即發送一條消息
>111
>222
>333

另外一個終端
$ docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-consumer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic --from-beginning可以收到消息
111
222
333

#8、安裝kafka-python
pip install kafka-python

代碼示例:

# pip3 install kafka-python  # 版本是2.0.2
from kafka import KafkaProducer, KafkaConsumer
import json
import threading
import time# Kafka broker address
bootstrap_servers = '192.168.71.113:9092'# Topic name
topic = 'test_topic'# Producer function
def kafka_producer():producer = KafkaProducer(bootstrap_servers=bootstrap_servers,value_serializer=lambda v: json.dumps(v).encode('utf-8'))try:for i in range(10):message = {'message': f'Hello Kafka! Message {i}'}producer.send(topic, value=message)print(f"Sent: {message}")time.sleep(1)else:print("發送完成")except Exception as ex:print(f"Exception occurred: {ex}")finally:producer.close()# Consumer function
def kafka_consumer():consumer = KafkaConsumer(topic,bootstrap_servers=bootstrap_servers,auto_offset_reset='earliest',consumer_timeout_ms=5000)  # 設置超時時間為1秒try:for message in consumer:print(f"Received: {message.value}")else:print("消費完畢,等5000毫秒超時即可結束,執行finally內的代碼")except Exception as ex:print(f"Exception occurred: {ex}")finally:print("消費者結束")consumer.close()# Create threads for producer and consumer
producer_thread = threading.Thread(target=kafka_producer)
consumer_thread = threading.Thread(target=kafka_consumer)# Start both threads
producer_thread.start()
consumer_thread.start()# Wait for threads to complete
producer_thread.join()
consumer_thread.join()print("Kafka producer and consumer threads have finished.")

執行結果:

? ? ? ? ? ? ? ? ??

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/82174.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/82174.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/82174.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

網絡安全基礎--第七課

路由表 路由器的轉發原理:當一個數據包進入路由器,路由器將基于數據包中的目標IP地址,查詢本地 路由表,若表中存在記錄,則將無條件按記錄轉發,若沒有記錄,路由器不能泛洪,因為路由器…

Java SpringBoot 扣子CozeAI SseEmitter流式對話完整實戰 打字機效果

書接上回:springBoot 整合 扣子cozeAI 智能體 對話https://blog.csdn.net/weixin_44548582/article/details/147457236 上文實現的是一次性等待并得到完整的AI回復內容,但隨著問題和AI的邏輯日趨復雜,會明顯增加這個等待時間,這對…

《AVL樹完全解析:平衡之道與C++實現》

目錄 AVL樹的核心概念數據結構與節點定義插入操作與平衡因子更新旋轉操作:從理論到代碼雙旋場景深度剖析平衡檢測與測試策略性能分析與工程實踐總結 0.前置知識:BS樹 代碼實現部分對和BS樹相似的部分會省略。 1. AVL樹的核心概念 1.1 平衡二叉搜索樹…

跨平臺游戲引擎 Axmol-2.6.0 發布

Axmol 2.6.0 版本是一個以錯誤修復和功能改進為主的次要LTS長期支持版本 🙏感謝所有貢獻者及財務贊助者:scorewarrior、peterkharitonov、duong、thienphuoc、bingsoo、asnagni、paulocoutinhox、DelinWorks 相對于2.5.0版本的重要變更: 通…

【Django Serializer】一篇文章詳解 Django 序列化器

第一章 Django 序列化器概述 1.1 序列化器的定義 1.1.1 序列化與反序列化的概念 1. 序列化 想象你有一個裝滿各種物品(數據對象)的大箱子(數據庫),但是你要把這些物品通過一個狹窄的管道(網絡&#xff…

關于spring @Bean里調用其他產生bean的方法

背景 常常見到如下代碼 Bean public TestBean testBean() {TestBean t new TestBean();System.out.println("testBean:" t);return t; }Bean public FooBean fooBean() {TestBean t testBean();System.out.println("這里看似是自己new的,但因為…

Level1.7列表

1.7_1列表(索引切片) #1.列表 students[Bob,Alice,Jim,Mike,Judy] print(students)#2.在列表(添加不同數據類型,查看列表是否可以運行?是否為列表類型?) students[Bob,Alice,Jim,Mike,Judy,123…

Python爬蟲實戰:研究Cola框架相關技術

一、Cola 框架概述 Cola 是一款基于 Python 的異步爬蟲框架,專為高效抓取和處理大規模數據設計。它結合了 Scrapy 的強大功能和 asyncio 的異步性能優勢,特別適合需要高并發處理的爬蟲任務。 1.1 核心特性 異步 IO 支持:基于 asyncio 實現非阻塞 IO,大幅提高并發性能模塊…

vue2中el-table 實現前端分頁

一些接口不分頁的數據列表,一次性返回大量數據會導致前端渲染卡頓,接口不做分頁的情況下前端可以截取數據來做分頁 以下是一個例子,被截取的列表和全量數據在同一個棧內存空間,所以如果有表格內的表單編輯,新的值也會事…

Python + moviepy:根據圖片或數據高效生成視頻全流程詳解

前言 在數據可視化、自媒體內容生產、學術匯報等領域,我們常常需要將一組圖片或一段變動的數據,自動合成為視頻文件。這樣不僅能提升內容表現力,也極大節省了人工操作時間。Python作為數據處理和自動化領域的王者,其`moviepy`庫為我們提供了靈活高效的視頻生成方案。本文將…

科技賦能,開啟現代健康養生新潮流

在科技與生活深度融合的當下,健康養生也迎來了全新的打開方式。無需傳統醫學的介入,借助現代科學與智能設備,我們能以更高效、精準的方式守護健康。? 飲食管理步入精準化時代。利用手機上的營養計算 APP,錄入每日飲食&#xff0…

Ubuntu24.04 LTS安裝java8、mysql8.0

在 Ubuntu 24.04 上安裝 OpenJDK OpenJDK 包在 Ubuntu 24.04 的默認存儲庫中隨時可用。 打開終端并運行以下 apt 命令: sudo apt update查看是否已經安裝java java --version如果未安裝會有提示,直接復制命令安裝即可,默認版本: sudo apt in…

深度學習框架顯存泄漏診斷手冊(基于PyTorch的Memory Snapshot對比分析方法)

點擊 “AladdinEdu,同學們用得起的【H卡】算力平臺”,H卡級別算力,按量計費,靈活彈性,頂級配置,學生專屬優惠。 一、顯存泄漏:深度學習開發者的"隱形殺手" 在深度學習模型的訓練與推…

Pytorch分布式訓練,數據并行,單機多卡,多機多卡

分布式訓練 所有代碼可以見我github 倉庫:https://github.com/xiejialong/ddp_learning.git 數據并行(Data Parallelism,DP) 跨多個gpu訓練模型的最簡單方法是使用 torch.nn.DataParallel. 在這種方法中,模型被復制…

【論文閱讀】——D^3-Human: Dynamic Disentangled Digital Human from Monocular Vi

文章目錄 摘要1 引言2 相關工作3 方法3.1 HmSDF 表示3.2 區域聚合3.3. 變形場3.4. 遮擋感知可微分渲染3.5 訓練3.5.1 訓練策略3.5.2 重建損失3.5.3 正則化限制 4. 實驗4.1 定量評估4.2 定性評價4.3 消融研究4.4 應用程序 5 結論 摘要 我們介紹 D 3 D^{3} D3人,一種…

docker commit除了提交容器成鏡像,還能搞什么之修改cmd命令

要讓新鏡像默認啟動時執行 /usr/sbin/sshd -D,需在提交鏡像時 ??顯式指定新的啟動命令??。 方法一:提交時通過 --change 覆蓋 CMD docker commit --changeCMD ["/usr/sbin/sshd", "-D"] v2 project:v2 方法二:重…

為什么我輸入對了密碼,還是不能用 su 切換到 root?

“為什么我輸入對了密碼,還是不能用 su 切換到 root?” 其實這背后可能不是“密碼錯了”,而是系統不允許你用 su 切 root,即使密碼對了。 👇 以下是最常見的幾個真正原因: ? 1. Root 用戶沒有設置密碼&…

轉移dp簡單數學數論

1.轉移dp問題 昨天的練習賽上有一個很好玩的起終點問題,第一時間給出bfs的寫法。 但是寫到后面發現不行,還得是的dp轉移的寫法才能完美的解決這道題目。 每個格子可以經過可以不經過,因此它的狀態空間是2^(n*m)&…

IP查詢基礎介紹

IP 查詢原理 IP 地址是網絡設備唯一標識,IP 查詢通過解析 IP 地址獲取地理位置、運營商等信息。目前主流的 IPv4(32 位)與 IPv6(128 位)協議,前者理論提供約 43 億地址,后者地址空間近乎無限。…

Linux命令簡介

1 Linux系統的命令概述 在 Linux 操作系統中,凡是在字符操作界面中輸入能夠完成特定操作和任務的字符串都可以稱為命令。嚴格來說,命令通常只代表實現某一類功能的指令或程序的名稱。 1.1 Shell Linux 命令的執行必須依賴于 Shell 命令解釋器。Shell …