為了向Kafka集群生產和消費消息,我們可以使用confluent-kafka庫,它是Confluent為Python提供的官方Kafka客戶端。以下是一個簡化的示例,展示如何將Kafka的生產者和消費者操作封裝到一個類中:
首先,確保你已經安裝了所需的庫:
pip install confluent-kafka
然後,你可以使用以下代碼:
from confluent_kafka import Producer, Consumer, KafkaError


class KafkaManager:
    """Small convenience wrapper around confluent-kafka produce/consume calls.

    The instance only remembers the broker address list; a fresh client is
    created inside every ``produce`` / ``consume`` call.
    """

    def __init__(self, bootstrap_servers):
        """Remember the broker list used for every client created later.

        Args:
            bootstrap_servers: Value passed verbatim as the
                ``bootstrap.servers`` client setting (e.g. ``'localhost:9092'``).
        """
        self.bootstrap_servers = bootstrap_servers

    def produce(self, topic, key, value):
        """Produce one message to Kafka and wait for the producer to flush.

        Args:
            topic: Name of the destination topic.
            key: Message key handed to ``Producer.produce``.
            value: Message payload handed to ``Producer.produce``.
        """
        producer = Producer({'bootstrap.servers': self.bootstrap_servers})

        def delivery_report(err, msg):
            """Called once for each message produced to indicate delivery result."""
            if err is not None:
                print('Message delivery failed: {}'.format(err))
            else:
                print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

        producer.produce(topic, key=key, value=value, callback=delivery_report)
        # flush() drains the local queue so the delivery callback gets a
        # chance to run before this method returns.
        producer.flush()

    def consume(self, topic, group_id, timeout=1.0):
        """Consume messages from Kafka and print them until interrupted.

        The loop runs forever; it only ends when an exception (including
        ``KeyboardInterrupt``) propagates out of it. The consumer is always
        closed on the way out.

        Args:
            topic: Name of the topic to subscribe to.
            group_id: Consumer group id (``group.id`` setting).
            timeout: Value passed to each ``Consumer.poll`` call.
        """
        consumer = Consumer({
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
        })
        try:
            consumer.subscribe([topic])
            while True:
                msg = consumer.poll(timeout)
                if msg is None:
                    # Nothing arrived within the poll timeout; poll again.
                    continue

                error = msg.error()
                if error:
                    if error.code() == KafkaError._PARTITION_EOF:
                        print('Reached end of partition')
                    else:
                        print('Error while consuming message: {}'.format(error))
                    continue

                payload = msg.value()
                # A message may carry no payload (e.g. a tombstone record);
                # decoding None would raise AttributeError.
                text = payload.decode('utf-8') if payload is not None else None
                print('Received message: {}'.format(text))
        finally:
            # Previously close() sat after an infinite loop and could never
            # run; finally guarantees the consumer is closed on any exit path.
            consumer.close()


# Usage example
if __name__ == "__main__":
    # Build a manager that talks to a local broker.
    manager = KafkaManager("localhost:9092")

    # Produce a message.
    manager.produce("test_topic", "key1", "value1")

    # Consume messages.
    manager.consume("test_topic", "test_group")
除了confluent-kafka之外,也可以使用kafka-python庫實現相同的封裝。首先安裝該庫:
pip install kafka-python
然後,你可以使用以下代碼:
from kafka import KafkaProducer, KafkaConsumer


class KafkaManager:
    """Small convenience wrapper around kafka-python produce/consume calls.

    The instance only remembers the broker address list; a fresh client is
    created inside every ``produce`` / ``consume`` call.
    """

    def __init__(self, bootstrap_servers):
        """Remember the broker list used for every client created later.

        Args:
            bootstrap_servers: Value passed as ``bootstrap_servers`` to
                ``KafkaProducer`` / ``KafkaConsumer``.
        """
        self.bootstrap_servers = bootstrap_servers

    @staticmethod
    def _encode(text):
        """Encode a ``str`` to bytes, passing ``None`` through unchanged."""
        return None if text is None else str.encode(text)

    @staticmethod
    def _decode(raw):
        """Decode bytes to ``str``, passing ``None`` through unchanged."""
        return None if raw is None else bytes.decode(raw)

    def produce(self, topic, key, value):
        """Produce one message to Kafka, flush it, and close the producer.

        Args:
            topic: Name of the destination topic.
            key: Message key as ``str`` (or ``None`` for no key).
            value: Message payload as ``str`` (or ``None``).
        """
        producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            # None-safe serializers: plain str.encode raises on a None key.
            key_serializer=self._encode,
            value_serializer=self._encode,
        )
        try:
            producer.send(topic, key=key, value=value)
            producer.flush()
        finally:
            # Release the producer even if send/flush raises.
            producer.close()

    def consume(self, topic, group_id, timeout=10):
        """Consume messages from Kafka and print each value.

        Args:
            topic: Name of the topic to subscribe to.
            group_id: Consumer group id.
            timeout: Idle time (assumed to be seconds) after which iteration
                stops when no message arrives. Pass ``None`` to block forever.
        """
        options = {}
        if timeout is not None:
            # The parameter used to be accepted but ignored; map it onto
            # kafka-python's consumer_timeout_ms so iteration can end.
            options['consumer_timeout_ms'] = int(timeout * 1000)

        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            # None-safe deserializers: plain bytes.decode raises on records
            # without a key or value.
            key_deserializer=self._decode,
            value_deserializer=self._decode,
            **options,
        )
        try:
            for message in consumer:
                print(f"Received message: {message.value}")
        finally:
            # Close the consumer on every exit path, including exceptions.
            consumer.close()


# Usage example
if __name__ == "__main__":
    # Build a manager that talks to a local broker.
    manager = KafkaManager("localhost:9092")

    # Produce a message.
    manager.produce("test_topic", "key1", "value1")

    # Consume messages.
    manager.consume("test_topic", "test_group")