安装
shellpip install kafka-python
生产者
pythonfrom kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='101.112.143.254:59092')
future = producer.send('topic_wx_notice', key=b'my_key', value='my_valuefdsfsdf啊沙发舒服'.encode(), partition=0)
result = future.get(timeout=10)
print(result)
消费者
给group_id后,同一个group里面的消费者去消费消息只能消费一次。
pythonfrom kafka import KafkaConsumer
consumer = KafkaConsumer('topic_wx_notice', bootstrap_servers='101.112.143.254:59092',group_id="weiyiid",
value_deserializer=lambda m: m.decode())
for msg in consumer:
print(msg)
本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!