Python Kafka Usage Examples
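I. Basic Concepts
- Topic: a named category that identifies a stream of messages.
- Producer: produces data and sends the resulting messages to a specified topic.
- Consumer: fetches data by consuming messages from a specified topic.
- Group: a consumer group. One group can contain several consumers, and each message is delivered to only one consumer within a group.
- Partition: to improve Kafka's throughput, a topic can be split into multiple partitions. Within a consumer group, each partition is consumed by at most one consumer.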
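To make the group and partition rules concrete, here is a minimal sketch in plain Python (no Kafka required). It models a round-robin style assignment of a topic's partitions to the consumers of one group. The function name `assign_partitions` and the consumer names are hypothetical, and Kafka's real assignors are more involved; the sketch only illustrates that each partition has a single owner per group.

```python
from collections import defaultdict


def assign_partitions(partitions, consumers):
    """Toy round-robin assignment of partitions to the consumers of ONE group.

    Illustration only; this is not Kafka's actual assignment algorithm.
    """
    if not consumers:
        return {}
    assignment = defaultdict(list)
    for consumer in consumers:
        assignment[consumer]  # touch the key so idle consumers are listed too
    for index, partition in enumerate(sorted(partitions)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return dict(assignment)


# A topic with 2 partitions and 3 consumers in the same group.
print(assign_partitions([0, 1], ["c1", "c2", "c3"]))
# {'c1': [0], 'c2': [1], 'c3': []}
```

With more consumers than partitions, the extra consumer (`c3` here) receives nothing, which is why the partition count caps the useful parallelism of a single group.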
II. Local Installation and Startup (with Docker)

2. Start ZooKeeper locally

    docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

3. Start Kafka locally

    docker run -d --name kafka --publish 9092:9092 --link zookeeper \
      --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      --env KAFKA_ADVERTISED_HOST_NAME=localhost \
      --env KAFKA_ADVERTISED_PORT=9092 \
      wurstmeister/kafka:latest

Note: the command above starts Kafka on port 9092.
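Before moving on, it can help to confirm that the published port is actually reachable from the host. The following is a small sketch using only the standard library; the function name `broker_reachable` is hypothetical. A successful TCP connection only shows that the port is open, not that the broker has finished starting.

```python
import socket


def broker_reachable(host="localhost", port=9092, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


if __name__ == "__main__":
    print("Kafka port open:", broker_reachable())
```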
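
4. Open a bash shell in the Kafka container

    docker exec -it kafka bash
    cd /opt/kafka/bin

5. Create a topic named 'kafka_demo' with 2 partitions

    kafka-topics.sh --create --zookeeper zookeeper:2181 \
      --replication-factor 1 --partitions 2 --topic kafka_demo

Note: on Kafka 3.0 and later, kafka-topics.sh no longer accepts --zookeeper; pass --bootstrap-server with the broker address instead.

6. List all current topics

    kafka-topics.sh --zookeeper zookeeper:2181 --list

7. Install kafka-python

    pip install kafka-python

III. Producer and Consumer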
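Kafka transports message values as bytes, so the producer and the consumer have to agree on an encoding. The demo below uses JSON for this. As a minimal sketch of that round trip without a broker, the hypothetical helpers `serialize` and `deserialize` do the same work as the `value_serializer` and `value_deserializer` callables passed to kafka-python. UTF-8 is assumed here; for the ASCII-only output that `json.dumps` produces by default, the bytes are identical to an ASCII encoding.

```python
import json


def serialize(value):
    """Turn a JSON-compatible Python object into bytes for Kafka."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw):
    """Turn bytes read from Kafka back into a Python object."""
    return json.loads(raw.decode("utf-8"))


payload = {"test": 1}
raw = serialize(payload)
print(raw)               # b'{"test": 1}'
print(deserialize(raw))  # {'test': 1}
```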
A personal wrapper
A simple demo of a producer and a consumer, shown together here:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import json
    import traceback

    from kafka import KafkaConsumer, KafkaProducer, TopicPartition


    class KProducer(object):
        """Kafka producer wrapper."""

        def __init__(self, bootstrap_servers):
            """
            :param bootstrap_servers: broker address(es), e.g. "host:9092"
            """
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                # Serialize every message value as JSON bytes.
                value_serializer=lambda m: json.dumps(m).encode("utf-8"),
                # compression_type="gzip",  # uncomment to compress messages
            )

        def sync_producer(self, topic, data):
            """
            Send one message and block until the broker acknowledges it.

            :param topic: topic name
            :param data: message value (must be JSON-serializable)
            """
            future = self.producer.send(topic, data)
            record_metadata = future.get(timeout=10)  # wait for the acknowledgement
            partition = record_metadata.partition  # partition the message landed in
            offset = record_metadata.offset  # position of the message in that partition
            print("save success, partition: {}, offset: {}".format(partition, offset))

        def asyn_producer(self, topic, data):
            """
            Send one message asynchronously.

            :param topic: topic name
            :param data: message value (must be JSON-serializable)
            """
            self.producer.send(topic, data)
            self.producer.flush()  # block until all buffered messages are sent

        def asyn_producer_callback(self, topic, data):
            """
            Send one message asynchronously and handle the send result.

            :param topic: topic name
            :param data: message value (must be JSON-serializable)
            """
            self.producer.send(topic, data) \
                .add_callback(self.send_success) \
                .add_errback(self.send_error)
            self.producer.flush()  # block until all buffered messages are sent

        def send_success(self, *args, **kwargs):
            """Callback invoked when an asynchronous send succeeds."""
            print("save success")

        def send_error(self, *args, **kwargs):
            """Callback invoked when an asynchronous send fails."""
            print("save error")

        def close_producer(self):
            try:
                self.producer.close()
            except Exception:
                pass


    class PConsumers(object):
        """Kafka consumer wrapper."""

        def __init__(self, bootstrap_servers, group_id):
            """
            :param bootstrap_servers: broker address(es)
            :param group_id: consumer group id
            """
            self.bootstrap_servers = bootstrap_servers
            self.group_id = group_id

        def get_message(self, topic, count=1):
            """
            Fetch a fixed number of messages. Remove the break to consume forever.
            Blocks until `count` messages have arrived.

            :param topic: topic name
            :param count: number of messages to fetch
            :return: list of message values, or None on error
            """
            counter = 0
            msg = []
            consumer = None
            try:
                consumer = KafkaConsumer(
                    topic,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=self.group_id,
                    # Decode JSON values; drop this option to receive raw bytes.
                    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
                    auto_offset_reset="earliest",
                )
                for message in consumer:
                    print("%s:%d:%d: key=%s value=%s header=%s" % (
                        message.topic, message.partition, message.offset,
                        message.key, message.value, message.headers))
                    msg.append(message.value)
                    counter += 1
                    if counter == count:
                        break
            except Exception as e:
                print("{0}, {1}".format(e, traceback.format_exc()))
                return None
            finally:
                if consumer is not None:
                    consumer.close()
            return msg

        def get_count(self, topic):
            """
            Count the messages this group has not consumed yet.

            :param topic: topic name
            :return: number of remaining messages, or None on error
            """
            consumer = None
            try:
                consumer = KafkaConsumer(
                    topic,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=self.group_id,
                )
                partitions = [TopicPartition(topic, p)
                              for p in consumer.partitions_for_topic(topic)]
                # Latest (end) offset of every partition.
                toff = consumer.end_offsets(partitions)
                toff_sum = sum(toff.values())
                # Offsets committed by this group; None if nothing is committed yet.
                coff = [consumer.committed(tp) for tp in partitions]
                cur_sum = sum(c for c in coff if c is not None)
                left_sum = toff_sum - cur_sum
            except Exception as e:
                print("{0}, {1}".format(e, traceback.format_exc()))
                return None
            finally:
                if consumer is not None:
                    consumer.close()
            return left_sum


    if __name__ == "__main__":
        send_data_li = {"test": 1}
        # Replace with your broker address, e.g. "localhost:9092" for the Docker setup above.
        kp = KProducer(bootstrap_servers="1.1.1.1:9092")

        # Synchronous send
        # kp.sync_producer(topic="test", data=send_data_li)

        # Asynchronous send
        # kp.asyn_producer(topic="test", data=send_data_li)

        # Asynchronous send with callbacks
        kp.asyn_producer_callback(topic="test", data=send_data_li)
        # kp.close_producer()

        cp = PConsumers(bootstrap_servers="1.1.1.1:9092", group_id="boxer")
        # print(cp.get_count(topic="test"))
        print(cp.get_message(topic="test"))
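The arithmetic inside `get_count` is easier to follow in isolation. The sketch below reproduces it on plain dictionaries, with no broker involved. The function name `remaining_messages` and the offset numbers are made up for illustration. As in the demo, a partition with no committed offset is counted from 0, which overestimates the backlog if older messages have already been removed by retention.

```python
def remaining_messages(end_offsets, committed_offsets):
    """Sum the per-partition backlog: end offset minus committed offset.

    Both arguments map a partition id to an offset. A missing or None
    committed offset is treated as 0 (an assumption, see the note above).
    """
    total = 0
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition)
        total += end - (committed or 0)
    return total


# Partition 0: 120 written, 100 committed. Partition 1: 80 written, none committed.
end_offsets = {0: 120, 1: 80}
committed_offsets = {0: 100, 1: None}
print(remaining_messages(end_offsets, committed_offsets))  # 100
```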