簡介
???? python連接kafka的標準庫,kafka-python和pykafka。kafka-python使用的人多是比較成熟的庫,kafka-python并沒有zk的支持。pykafka是Samsa的升級版本,使用samsa連接zookeeper,生產者直接連接kafka服務器列表,消費者才用zookeeper。
安裝
# PyPI安裝
pip install kafka-python# conda安裝
conda install -c conda-forge kafka-python# anaconda自帶pip安裝
/root/anaconda3/bin/pip install kafka-python
官方鏈接
- 官網:https://kafka-python.readthedocs.io/en/master/index.html
- git:https://github.com/dpkp/kafka-python
注意:1.4.0 以上的 kafka-python 版本使用了獨立的心跳線程去上報心跳
生產者
API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html。
生產者代碼是線程安全的,支持多線程,而消費者則不然。
類 KafkaProducer
class kafka.KafkaProducer(**configs)
- bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'組成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單臺服務器)地址,默認值為 localhost, port默認值為9092,這里可以不用填寫所有broker的host和port,但必須保證至少有一個broker)
- key_serializer (可調用對象) –用于轉換用戶提供的key值為字節,必須返回字節數據。 如果為None,則等同調用f(key)。 默認值: None.
- value_serializer(可調用對象) – 用于轉換用戶提供的value消息值為字節,必須返回字節數據。 如果為None,則等同調用f(value)。 默認值: None.
方法
send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
函數返回FutureRecordMetadata類型的RecordMetadata數據
- topic(str) – 設置消息將要發布到的主題,即消息所屬主題
- value(可選) – 消息內容,必須為字節數據,或者通過value_serializer序列化后的字節數據。如果為None,則key必填,消息等同于“刪除”。( If value is None, key is required and message acts as a ‘delete’)
- partition (int, 可選) – 指定分區。如果未設置,則使用配置的partitioner
- key (可選) – 和消息對應的key,可用于決定消息發送到哪個分區。如果partition為None,則相同key的消息會被發布到相同分區(但是如果key為None,則隨機選取分區)(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必須為字節數據或者通過配置的key_serializer序列化后的字節數據.
- headers (可選) – 設置消息header,header-value鍵值對表示的list。list項為元組:格式 (str_header,bytes_value)
- timestamp_ms (int, 可選) –毫秒數 (從1970 1月1日 UTC算起) ,作為消息時間戳。默認為當前時間
flush(timeout=None)
發送所有可以立即獲取的緩沖消息(即時linger_ms大于0),線程block直到這些記錄發送完成。當一個線程等待flush調用完成而block時,其它線程可以繼續發送消息。
注意:flush調用不保證記錄發送成功
metrics(raw=False)
獲取生產者性能指標。
#-*- encoding:utf-8 -*-
from kafka import KafkaProducer
import jsonproducer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for i in range(0, 100):producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)# Block直到單條消息發送完或者超時
future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')
result = future.get(timeout=60)
print(result)
# future.get函數等待單條消息發送完成或超時,經測試,必須有這個函數,不然發送不出去,或用time.sleep代替,待驗證# Block直到所有阻塞的消息發送到網絡
# 注意: 該操作不保證傳輸或者消息發送成功,僅在配置了linger_ms的情況下有用。(It is really only useful if you configure internal batching using linger_ms# 序列化json數據
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('MY_TOPIC1', {'shouke':'kafka'})# 序列化字符串key
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)
producer.send('MY_TOPIC1', b'shouke', key='strKey')# 壓縮
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')
for i in range(2):producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))# 消息記錄攜帶header
producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])# 獲取性能數據(注意,實踐發現分區較多的情況下,該操作比較耗時
metrics = producer.metrics()
print(metrics)
producer.flush()
實踐中遇到錯誤: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解決方案如下:
進入到配置目錄(config),編輯server.properties文件,查找并設置listener,配置監聽端口,格式:listeners = listener_name://host_name:port,供kafka客戶端連接用的ip和端口,例中配置如下:
listeners=PLAINTEXT://127.0.0.1:9092
消費者
參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
消費者代碼不是線程安全的,最好不要用多線程
類KafkaConsumer
class kafka.KafkaConsumer(*topics, **configs)
*topics (str) – 可選,設置需要訂閱的topic,如果未設置,需要在消費記錄前調用subscribe或者assign。
- bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'組成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單臺服務器)地址,默認值為 localhost, port默認值為9092,這里可以不用填寫所有broker的host和port,但必須保證至少有一個broker)
- client_id (str) – 客戶端名稱,默認值: ‘kafka-python-{version}’
- group_id (str or None) – 消費組名稱。如果為None,則通過group coordinator auto-partition分區分配,offset提交被禁用。默認為None
- auto_offset_reset (str) – 重置offset策略: 'earliest'將移動到最老的可用消息, 'latest'將移動到最近消息。 設置為其它任何值將拋出異常。默認值:'latest'。
- enable_auto_commit (bool) – 如果為True,將自動定時提交消費者offset。默認為True。
- auto_commit_interval_ms (int) – 自動提交offset之間的間隔毫秒數。如果enable_auto_commit 為true,默認值為: 5000。
- value_deserializer(可調用對象) - 攜帶原始消息value并返回反序列化后的value
- consumer_timeout_ms – 毫秒數,若不指定 consumer_timeout_ms,默認一直循環等待接收,若指定,則超時返回,不再等待
- max_poll_interval_ms – 毫秒數,它表示最大的poll數據間隔,如果超過這個間隔沒有發起pool請求,但heartbeat仍舊在發,就認為該 consumer 處于 livelock 狀態,進行 reblancing
- session_timout_ms – 毫秒數,控制心跳超時時間。在分布式系統中,由于網絡問題你不清楚沒接收到心跳,是因為對方真正掛了還是只是因為負載過重沒來得及發生心跳或是網絡堵塞。所以一般會約定一個時間,超時即判定對方掛了
- heartbeat_interval_ms – 毫秒數,控制心跳發送頻率,頻率越高越不容易被誤判,但也會消耗更多資源。
- max_pool_record(int),kafka 每次 pool 拉取消息的最大數量
subscribe(topics=(), pattern=None, listener=None)
訂閱需要的主題
- topics (list) – 需要訂閱的主題列表
- pattern (str) – 用于匹配可用主題的模式,即正則表達式。注意:必須提供topics、pattern兩者參數之一,但不能同時提供兩者。
metrics(raw=False)
獲取消費者性能指標。
#-*- encoding:utf-8 -*-
from kafka import KafkaConsumer
from kafka import TopicPartition
import json
consumer = KafkaConsumer('MY_TOPIC1',bootstrap_servers=['127.0.0.1:9092'],auto_offset_reset='latest', # 消費 kafka 中最近的數據,如果設置為 earliest 則消費最早的未被消費的數據enable_auto_commit=True, # 自動提交消費者的 offsetauto_commit_interval_ms=3000, # 自動提交消費者 offset 的時間間隔group_id='MY_GROUP1',consumer_timeout_ms= 10000, # 如果 10 秒內 kafka 中沒有可供消費的數據,自動退出client_id='consumer-python3'
)for msg in consumer:print (msg)print('topic: ', msg.topic)print('partition: ', msg.partition)print('key: ', msg.key, 'value: ', msg.value)print('offset:', msg.offset)print('headers:', msg.headers)# Get consumer metrics
metrics = consumer.metrics()
print(metrics)# 通過assign、subscribe兩者之一為消費者設置消費的主題
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],auto_offset_reset='latest',enable_auto_commit=True, # 自動提交消費數據的 offsetconsumer_timeout_ms= 10000, # 如果 10 秒內 kafka 中沒有可供消費的數據,自動退出value_deserializer=lambda m: json.loads(m.decode('ascii')), #消費json 格式的消息client_id='consumer-python3'
)# consumer.assign([TopicPartition('MY_TOPIC1', 0)])
# msg = next(consumer)
# print(msg)
consumer.subscribe('MY_TOPIC1')
for msg in consumer:print (msg)
客戶端
- 參考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html
- 用于異步請求/響應網絡I / O的網絡客戶端。
- 這是一個內部類,用于實現面向用戶的生產者和消費者客戶端。
- 此類不是線程安全的!
- 參考API:?https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html?
- 管理Kafka集群
類?KafkaClient
class kafka.client.KafkaClient(**configs)
- bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'組成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單臺服務器)地址,默認值為 localhost, port默認值為9092,這里可以不用填寫所有broker的host和port,但必須保證至少有一個broker)
- client_id (str) – 客戶端名稱,默認值: ‘kafka-python-{version}’
- request_timeout_ms (int) – 客戶端請求超時時間,單位毫秒。默認值: 30000.
方法
brokers()
獲取所有broker元數據
available_partitions_for_topic(topic)
返回主題的所有分區
#-*- encoding:utf-8 -*-
from kafka.client import KafkaClientclient = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)# 獲取所有broker
brokers = client.cluster.brokers()
for broker in brokers:print('broker: ', broker) # broker: BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)print('broker nodeId: ', broker.nodeId) # broker nodeId: 0# 獲取主題的所有分區
topic = 'MY_TOPIC1'
partitions = client.cluster.available_partitions_for_topic(topic)
print(partitions) # {0}partition_dict = {}
partition_dict[topic] = [partition for partition in partitions]
print(partition_dict) # {'MY_TOPIC1': [0]}
?
類?KafkaAdminClient
class kafka.client.KafkaAdminClient(**configs)
- bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'組成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單臺服務器)地址,默認值為 localhost, port默認值為9092,這里可以不用填寫所有broker的host和port,但必須保證至少有一個broker)
- client_id (str) – 客戶端名稱,默認值: ‘kafka-python-{version}’
- request_timeout_ms (int) – 客戶端請求超時時間,單位毫秒。默認值: 30000.
方法
list_topics()
獲取所有的 topic
create_partitions(topic_partitions,timeout_ms = None,validate_only = False )
為現有主題創建其他分區。返回值:合適版本的CreatePartitionsResponse類。
- topic_partitions –主題名稱字符串到NewPartition對象的映射。
- timeout_ms –代理返回之前等待創建新分區的毫秒數。
- validate_only –如果為True,則實際上不創建新分區。默認值:False
create_topics(new_topics,timeout_ms = None,validate_only = False )
在集群中創建新主題。返回值:合適版本的CreateTopicResponse類。
- new_topics – NewTopic對象的列表。
- timeout_ms –代理返回之前等待創建新主題的毫秒。
- validate_only –如果為True,則實際上不創建新主題。并非所有版本都支持。默認值:False
delete_topics(主題,timeout_ms =無)
從集群中刪除主題。返回值:合適版本的DeleteTopicsResponse類。
- 主題-主題名稱的字符串列表。
- timeout_ms –代理返回之前等待刪除主題的毫秒數。
describe_consumer_groups(group_ids,group_coordinator_id = None,include_authorized_operations = False)
描述一組消費者group。返回值:組說明列表。目前,組描述是DescribeGroupsResponse的原始結果。
- group_ids –消費者組ID的列表。這些通常是作為字符串的組名。
- group_coordinator_id –組的協調器代理的node_id。如果設置為None,它將查詢群集中的每個組以找到該組的協調器。如果您已經知道組協調器,則明確指定此選項對于避免額外的網絡往返很有用。這僅在所有group_id具有相同的協調器時才有用,否則會出錯。默認值:無。
- include_authorized_operations –是否包括有關允許組執行的操作的信息。僅在API版本> = v3上受支持。默認值:False。
list_consumer_group_offsets(group_id,group_coordinator_id = None,partitions = None)
獲取單個消費者組的消費者offset。注意:這不會驗證group_id或分區在集群中是否實際存在。一旦遇到任何錯誤,就會立即報錯。? ?返回字典:具有TopicPartition鍵和OffsetAndMetada值的字典。省略未指定且group_id沒有記錄偏移的分區。偏移值-1表示group_id對于該TopicPartition沒有偏移。一個-1只能發生于顯式指定的分區。?
- group_id –要獲取其偏移量的消費者組ID名稱。
- group_coordinator_id –組的協調代理的node_id。如果設置為None,將查詢群集以查找組協調器。如果您已經知道組協調器,則明確指定此選項對于防止額外的網絡往返很有用。默認值:無。
- partitions –要獲取其偏移量的TopicPartitions列表。在> = 0.10.2上,可以將其設置為“無”以獲取使用者組的所有已知偏移量。默認值:無。
list_consumer_groups(broker_ids = None)
列出集群已知的所有消費者組。這將返回消費者組元組的列表。元組由使用者組名稱和使用者組協議類型組成。僅返回將偏移量存儲在Kafka中的消費者組。對于使用Kafka <0.9 API創建的群組,協議類型將為空字符串,因為盡管它們將偏移量存儲在Kafka中,但它們并不使用Kafka進行群組協調。對于使用Kafka> = 0.9創建的群組,協議類型通常為“消費者”。
- broker_ids –用于查詢使用者組的代理節點ID的列表。如果設置為None,將查詢集群中的所有代理。明確指定經紀人對于確定哪些消費者組由這些經紀人進行協調很有用。默認值:無
from kafka.admin import KafkaAdminClient, NewTopic
client = KafkaAdminClient(bootstrap_servers="localhost:9092")
topic_list = []# 創建自定義分區的topic 可以使用以下方法創建名稱為test,12個分區3份副本的topic
topic_list.append(NewTopic(name="test", num_partitions=12, replication_factor=3))
client.create_topics(new_topics=topic_list, validate_only=False)# 獲取所有的 topic
client.list_topics()# 刪除 topic
client.delete_topics(['test', 'ssl_test']) # 傳入要刪除的 topic 列表# list_consumer_groups()的返回值是一個元組(消費者組的名稱,消費組協議類型)組成的列表。
client.list_consumer_groups()
# [('xray', 'consumer'), ('awvs', 'consumer')]# 返回值是一個字典,字典的key是TopicPartition,值是OffsetAndMetada
client.list_consumer_group_offsets('awvs')
# {TopicPartition(topic='scan, partition=0): OffsetAndMetadata(offset=17, metadata='')
?