閱讀目錄
- 一、下載鏡像
- 二、先啟動zookeeper
- 三、啟動kafka
- 四、創建一個topic(使用代碼次步可省略)
- 五、kafka設置分區數量
- 六、python代碼
一、下載鏡像
docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka
二、先啟動zookeeper
#單機方式 docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
三、啟動kafka
#單機方式 docker run -d --name kafka \ -p 9092:9092 \ -e KAFKA_BROKER_ID=0 \ -e KAFKA_ZOOKEEPER_CONNECT=10.0.0.101:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.101:9092 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
四、創建一個topic(使用代碼次步可省略)
#進入容器 docker exec -it ${CONTAINER ID} /bin/bash cd opt/bin #單機方式:創建一個主題 bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka #運行一個生產者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka #運行一個消費者 bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mykafka --from-beginning
五、kafka設置分區數量
#分區數量的作用:有多少分區就能負載多少個消費者,生產者會自動分配給分區數據,每個消費者只消費自己分區的數據,每個分區有自己獨立的offset #進入kafka容器 vi opt/kafka/config/server.properties 修改run.partitions=2 #退出容器 ctrl+p+q #重啟容器 docker restart kafka ? #修改指定topic ./kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3 --topic topicname
六、python代碼
#生產者 from kafka import KafkaProducer import json import datetime ? topic='test' producer = KafkaProducer(bootstrap_servers='10.0.0.101:9092',value_serializer=lambda m:json.dumps(m).encode("utf-8")) # 連接kafka # 參數bootstrap_servers:指定kafka連接地址 # 參數value_serializer:指定序列化的方式,我們定義json來序列化數據,當字典傳入kafka時自動轉換成bytes # 用戶密碼登入參數 # security_protocol="SASL_PLAINTEXT" # sasl_mechanism="PLAIN" # sasl_plain_username="maple" # sasl_plain_password="maple" ? for i in range(1000):data={"num":i,"ts":datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}producer.send(topic,data) ? producer.close()
#消費者 from kafka import KafkaConsumer import time ? topic = 'test' consumer = KafkaConsumer(topic, bootstrap_servers=['10.0.0.101:9092'], group_id="test", auto_offset_reset="earliest") # 參數bootstrap_servers:指定kafka連接地址 # 參數group_id:如果2個程序的topic和group_id相同,那么他們讀取的數據不會重復,2個程序的topic相同,group_id不同,那么他們各自消費相同的數據,互不影響 # 參數auto_offset_reset:默認為latest表示offset設置為當前程序啟動時的數據位置,earliest表示offset設置為0,在你的group_id第一次運行時,還沒有offset的時候,給你設定初始offset。一旦group_id有了offset,那么此參數就不起作用了 ? ? for msg in consumer:recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)print(recv)# time.sleep(1)
#運行3個消費者結果 test:0:3212: key=None value=b'{"num": 981, "ts": "2021-02-23 16:38:14"}' test:0:3213: key=None value=b'{"num": 982, "ts": "2021-02-23 16:38:14"}' test:0:3214: key=None value=b'{"num": 987, "ts": "2021-02-23 16:38:14"}' test:0:3215: key=None value=b'{"num": 997, "ts": "2021-02-23 16:38:14"}' test:0:3216: key=None value=b'{"num": 998, "ts": "2021-02-23 16:38:14"}' test:0:3217: key=None value=b'{"num": 999, "ts": "2021-02-23 16:38:14"}' ? test:1:353: key=None value=b'{"num": 970, "ts": "2021-02-23 16:38:14"}' test:1:354: key=None value=b'{"num": 977, "ts": "2021-02-23 16:38:14"}' test:1:355: key=None value=b'{"num": 978, "ts": "2021-02-23 16:38:14"}' test:1:356: key=None value=b'{"num": 979, "ts": "2021-02-23 16:38:14"}' test:1:357: key=None value=b'{"num": 984, "ts": "2021-02-23 16:38:14"}' test:1:358: key=None value=b'{"num": 985, "ts": "2021-02-23 16:38:14"}' test:1:359: key=None value=b'{"num": 994, "ts": "2021-02-23 16:38:14"}' ? test:2:317: key=None value=b'{"num": 989, "ts": "2021-02-23 16:38:14"}' test:2:318: key=None value=b'{"num": 990, "ts": "2021-02-23 16:38:14"}' test:2:319: key=None value=b'{"num": 991, "ts": "2021-02-23 16:38:14"}' test:2:320: key=None value=b'{"num": 992, "ts": "2021-02-23 16:38:14"}' test:2:321: key=None value=b'{"num": 993, "ts": "2021-02-23 16:38:14"}' test:2:322: key=None value=b'{"num": 995, "ts": "2021-02-23 16:38:14"}' test:2:323: key=None value=b'{"num": 996, "ts": "2021-02-23 16:38:14"}'