Docker kafka

閱讀目錄

  • 一、下載鏡像
  • 二、先啟動zookeeper
  • 三、啟動kafka
  • 四、創建一個topic(使用代碼次步可省略)
  • 五、kafka設置分區數量
  • 六、python代碼


回到頂部

一、下載鏡像

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

回到頂部

二、先啟動zookeeper

#單機方式
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

回到頂部

三、啟動kafka

#單機方式
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=10.0.0.101:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.101:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

回到頂部

四、創建一個topic(使用代碼次步可省略)

#進入容器
docker exec -it ${CONTAINER ID} /bin/bash
cd opt/bin
#單機方式:創建一個主題
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka
#運行一個生產者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
#運行一個消費者
bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mykafka --from-beginning

回到頂部

五、kafka設置分區數量

#分區數量的作用:有多少分區就能負載多少個消費者,生產者會自動分配給分區數據,每個消費者只消費自己分區的數據,每個分區有自己獨立的offset
#進入kafka容器
vi opt/kafka/config/server.properties
修改run.partitions=2
#退出容器
ctrl+p+q
#重啟容器
docker restart kafka
?
#修改指定topic
./kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3 --topic topicname

回到頂部

六、python代碼

#生產者
from kafka import KafkaProducer
import json
import datetime
?
topic='test'
producer = KafkaProducer(bootstrap_servers='10.0.0.101:9092',value_serializer=lambda m:json.dumps(m).encode("utf-8"))  # 連接kafka
# 參數bootstrap_servers:指定kafka連接地址
# 參數value_serializer:指定序列化的方式,我們定義json來序列化數據,當字典傳入kafka時自動轉換成bytes
# 用戶密碼登入參數
# security_protocol="SASL_PLAINTEXT"
# sasl_mechanism="PLAIN"
# sasl_plain_username="maple"
# sasl_plain_password="maple"
?
for i in range(1000):data={"num":i,"ts":datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}producer.send(topic,data)
?
producer.close()
#消費者
from kafka import KafkaConsumer
import time
?
topic = 'test'
consumer = KafkaConsumer(topic, bootstrap_servers=['10.0.0.101:9092'], group_id="test", auto_offset_reset="earliest")
# 參數bootstrap_servers:指定kafka連接地址
# 參數group_id:如果2個程序的topic和group_id相同,那么他們讀取的數據不會重復,2個程序的topic相同,group_id不同,那么他們各自消費相同的數據,互不影響
# 參數auto_offset_reset:默認為latest表示offset設置為當前程序啟動時的數據位置,earliest表示offset設置為0,在你的group_id第一次運行時,還沒有offset的時候,給你設定初始offset。一旦group_id有了offset,那么此參數就不起作用了
?
?
for msg in consumer:recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)print(recv)# time.sleep(1)
#運行3個消費者結果
test:0:3212: key=None value=b'{"num": 981, "ts": "2021-02-23 16:38:14"}'
test:0:3213: key=None value=b'{"num": 982, "ts": "2021-02-23 16:38:14"}'
test:0:3214: key=None value=b'{"num": 987, "ts": "2021-02-23 16:38:14"}'
test:0:3215: key=None value=b'{"num": 997, "ts": "2021-02-23 16:38:14"}'
test:0:3216: key=None value=b'{"num": 998, "ts": "2021-02-23 16:38:14"}'
test:0:3217: key=None value=b'{"num": 999, "ts": "2021-02-23 16:38:14"}'
?
test:1:353: key=None value=b'{"num": 970, "ts": "2021-02-23 16:38:14"}'
test:1:354: key=None value=b'{"num": 977, "ts": "2021-02-23 16:38:14"}'
test:1:355: key=None value=b'{"num": 978, "ts": "2021-02-23 16:38:14"}'
test:1:356: key=None value=b'{"num": 979, "ts": "2021-02-23 16:38:14"}'
test:1:357: key=None value=b'{"num": 984, "ts": "2021-02-23 16:38:14"}'
test:1:358: key=None value=b'{"num": 985, "ts": "2021-02-23 16:38:14"}'
test:1:359: key=None value=b'{"num": 994, "ts": "2021-02-23 16:38:14"}'
?
test:2:317: key=None value=b'{"num": 989, "ts": "2021-02-23 16:38:14"}'
test:2:318: key=None value=b'{"num": 990, "ts": "2021-02-23 16:38:14"}'
test:2:319: key=None value=b'{"num": 991, "ts": "2021-02-23 16:38:14"}'
test:2:320: key=None value=b'{"num": 992, "ts": "2021-02-23 16:38:14"}'
test:2:321: key=None value=b'{"num": 993, "ts": "2021-02-23 16:38:14"}'
test:2:322: key=None value=b'{"num": 995, "ts": "2021-02-23 16:38:14"}'
test:2:323: key=None value=b'{"num": 996, "ts": "2021-02-23 16:38:14"}'

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/386429.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/386429.shtml
英文地址,請注明出處:http://en.pswp.cn/news/386429.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

這些年Android面試的那些套路,社招面試心得

前言 說不焦慮其實是假的,因為無論是現在還是最近幾年,很早就有人察覺Android開發的野蠻生長時代已經過去。過去的優勢是市場需要,這個技術少有人有,所以在搶占市場的時候,基本上滿足需要就已經可以了。但是現在&…

flask第一章:項目環境搭建

windows環境pycharmpython3 1、命令提示窗口 1)創建項目目錄:mkdir myblog 2)cd myblog 3)創建虛擬環境:python -m venv myvenv 4)激活虛擬環境:venv\Scripts\activate 5)安裝flask&…

windows docker 空出C盤 遷移到其他盤

下面是操作方法: 首先關閉docker 關閉所有發行版:wsl --shutdown 將docker-desktop-data導出到D:\SoftwareData\wsl\docker-desktop-data\docker-desktop-data.tar(注意,原有的docker images不會一起導出)wsl --expo…

安卓開發入門到精通!免費Android高級工程師學習資源,系列篇

前言 2017年進大學開始接觸Android,從剛開始接觸就不斷地聽到Android市場飽和,工作難找等消息。雖然當時也非常迷茫,不過由于第一次深入接觸編程語言,再加上自己的一點興趣,就一直堅持下來了。 到現在要畢業了&#…

安卓開發基礎面試題,9次Android面試經驗總結,面試必備

前言 上回承諾過大家,一定會出 HTTP 的系列文章,今天終于整理完成了。作為一個 web 開發,HTTP 幾乎是天天要打交道的東西,但我發現大部分人對 HTTP 只是淺嘗輒止,對更多的細節及原理就了解不深了,在面試的…

基于TCP的在線聊天程序

在線聊天服務端 import tkinter import tkinter.font as tkFont import socket import threading import time import sys class ServerUI():local127.0.0.1port5505global serverSock;flagFalsedef __init__(self):self.roottkinter.Tk()self.root.title(在線聊天-服務端v1.0)…

Docker安裝Confluence

Docker安裝Confluence 參考鏈接: https://my.oschina.net/u/2289161/blog/1648587 https://hub.docker.com/r/cptactionhank/atlassian-confluence/dockerfile https://my.oschina.net/u/2289161/blog/1647061 https://my.oschina.net/u/2289161/blog/838218 https://hub.…

安卓開發基礎面試題,Android面試必備的集合源碼詳解,附小技巧

去年無疑是 Flutter 技術如火如荼發展的一年。 每一個移動開發者都在為 Flutter 帶來的“快速開發、富有表現力和靈活的 UI、原生性能”的特色和理念而癡狂,從超級 App 到獨立應用,從純 Flutter 到混合棧,開發者們在不同的場景下樂此不疲的探…

『算法』讀書筆記 1.4算法分析 Part1

Chapter 1 本章結構 1.1Java語法 1.2數據抽象 1.3集合類抽象數據類型:背包 (Bags) 、隊列 (Queues) 、棧 (Stacks) 1.4算法分析 1.5連通性問題-Case Study: Union - Find ADT 本節開篇使用了一個ThreeSum程序進行示例: ThreeSum所起到的作用…

JS調用MetaMask調用啟動轉賬

1 、代碼必須跑在nginx下,否則沒有eth對象。 2、可以下載ganache來單跑個私服,然后安裝谷歌metamask瀏覽器插件來實驗 3、賬戶1:0xFA387e41FA471172cC729167EBD4862aA7020D91 賬戶2:0x818DF62ff0bE3B28AE8be25e2e848E10138018B7 4、1000000000000000 …

安卓開發工程師面試題!春招我借這份PDF的復習思路,不吃透都對不起自己

寫在前面 身邊有不少去大廠面試的朋友,其中小金面試字節跳動的經歷很有意義,在這里分享給大家。小金是末流211計算機專業大三本科生,前幾天面試了字節跳動的廣州Android開發實習生。下面是他的面試經歷,還有一些他自己的經驗。 …

合算的日本料理

巨鹿路和那個茂名路路口的《和味》,有預訂的話才98一個人,味道不錯,樓上的桃子MM服務狠好,笑容狠甜。那里的東西味道還是狠正宗的,除了一個色拉不對。那里的清酒和梅酒都不錯,尤其梅酒。生牛肉雖然沒有大漁…

安卓開發必須會的技能!淺談Android消息機制原理,威力加強版

目錄 想要成為一名優秀的Android開發,你需要一份完備的知識體系,在這里,讓我們一起成長為自己所想的那樣。 PagerAdapter 介紹ViwePager 緩存策略ViewPager 布局處理ViewPager 事件處理相關內容 Android 基礎 1.Activity 1、 什么是 Activi…

NuGet 無法連接到遠程服務器-解決方法(轉)

原地址: http://www.lixin.me/blog/2012/03/01/29362 今天打開NuGet的Manage NuGet Packages,顯示“無法連接到遠程服務器”。打開Setting-》Package Manager-》Package Sources。看到里面有一個源:https://go.microsoft.com/fwlink/?LinkID…

安卓開發面試書籍,全世界都在問Android開發涼了嗎?建議收藏

前言 本想今年辭掉工作大干一場,沒想到碰到疫情,家里蹲了3個月…,還好字節能給一次機會。前陣子字節跳動的提前批開始了,看宣傳是說有海量HC,機會多多,本著漲漲面經的心理,然后就投遞了一下杭州…

杭州集訓Day5

下面是Day5的題目!(其實都咕了好幾天了 1007040210. T1 皇后 XY 的疑難 (1s 512MB) 1.1 題目描述有一個n*n的王國城堡地圖上,皇后XY喜歡看騎士之間的戰斗,于是他準備布置m個騎士,其中每一個騎士都可以向8個方向&#x…

安卓開發面試書籍,每個程序員都必須掌握的8種數據結構!面試必會

前言 本篇文章主要記錄分享我的面試準備過程。 很多朋友問我為什么離職 關于離職原因,馬云有一句經典的話“要么錢沒給到位,要么心委屈了”,想必大家耳熟能詳了,我這里再細說一下我個人離職原因: 工資倒掛&#xf…

使用thinkPHP做注冊程序的實例

登錄界面&#xff1a; 數據庫和數據表的結構 具體的操作步驟如下&#xff1a; 第一步&#xff1a;入口文件index.php內容 (此文件基本是屬于固定的格式&#xff09; <?phpdefine(THINK_PATH,./ThinkPHP/);define(APP_NAME,MyApp);define(APP_PAHT,./MyApp/);require_once T…

安卓開發面試技能介紹,來一份全面的面試寶典練練手,不吃透都對不起自己

前言 網上有很多對程序員簡歷的一些指導&#xff0c;這里就不重述&#xff0c;大家可以搜下網上其他大神的總結&#xff0c;結合自身情況修改下。我有幾點建議&#xff1a; 1.盡量不要花哨&#xff0c;程序員和設計師或者產品運營還不一樣&#xff0c;我們的簡歷成功與否決定…

上交所行情文件導入數據庫

事情的起因很簡單&#xff0c;需要將股票收盤行情導入數據庫&#xff0c;因為科創板交易時間延長&#xff0c;需要將原有的程序進行改造&#xff0c;眾所周知&#xff0c;程序員永遠是不夠用的&#xff0c;只能自己解決這個問題。 方式是用定時器調用shell腳本。 上交所的mktdt…