可以在可視化的工具通過點擊來操作kafka完成主題的創建,分區等操作
注意: 安裝完后桌面不會有快捷方式,需要去電腦上搜索,或者去自己選的安裝位置找到發送快捷方式到桌面!
?連接配置
創建主題
刪除主題
主題下的數據查看
數據顯示問題說明
修改工具的數據顯示類型
發送消息數據到kafka
Kafka的Python API的操作
模塊安裝
純Python的方式操作Kafka。
準備工作:在node1的節點上安裝一個python用于操作Kafka的庫
安裝kafka-python 模模塊 ,模塊中提供了操作kafka的方法
在線安裝
在node1上安裝就可以,需要保證服務器能夠連接網絡
?安裝命令: python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple
離線安裝
將kafka_python-2.0.2-py2.py3-none-any.whl安裝包上傳服務器software目錄下進行安裝
?安裝命令: pip install kafka_python-2.0.2-py2.py3-none-any.whl
模塊使用
API使用的參考文檔: Usage — kafka-python 2.0.2-dev documentation
模塊中封裝了兩個類,
一個是生成者類KafkaProducer,提供了向kafka寫數據的方法
另一個是消費者類KafkaConsumer,提供了讀取kafka數據的方法
完成生產者代碼
生成者類KafkaProducer,提供了向kafka寫數據的方法
?send(topic,valu)方法: 發送消息topic參數:指定向哪個主題發送消息value參數:指定發送的消息數據 ,數據類型要求是bytes類型
示例:
?# 導包from kafka import KafkaProducer?# 編寫代碼if __name__ == '__main__':# 創建生產者對象并指定對應服務器producer = KafkaProducer(bootstrap_servers=['node1:9092'])# 發送消息for i in range(1,101):future = producer.send('kafka', f'hi_kafka_{i}'.encode())# 獲取元數據record_metadata = future.get()# 從元數據中獲取主題,分區,偏移print(record_metadata.topic)print(record_metadata.partition)print(record_metadata.offset)
完成消費者代碼
消費者類KafkaConsumer,提供了讀取kafka數據的方法
?KafkaConsumer(topic,bootstrap_servers)第一個參數:指定消費者連接的主題,第二個參數:指定消費者連接的kafka服務器
示例:
?# 導包from kafka import KafkaConsumer?# 編寫代碼if __name__ == '__main__':?# 創建消費者對象consumer = KafkaConsumer('kafka',bootstrap_servers=['node1:9092'])# 遍歷對象for message in consumer:?# 格式化打印,設置相關參數# 因為value是二進制,需要decode解碼print ("主題:%s,分區:%d,偏移:%d : key=%s value=%s"% (message.topic, message.partition,message.offset, message.key, message.value.decode('utf8')))?
可能遇到的錯誤:
?原因: 服務器環境有問題。是因為服務器上既安裝了kafka-python的第三方依賴,同時還安裝kafka的第三方依賴。可以通過pip list | grep kafka進行確定解決辦法: 先將這兩個第三方依賴全部卸載,然后再重新執行如下命令python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple