欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

java实现Kafka生产者示例

发布时间:2025/7/14 51 豆豆
生活随笔 收集整理的这篇文章主要介绍了 java实现Kafka生产者示例 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

使用java实现Kafka的生产者

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869package com.lisg.kafkatest;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.Partitioner;import kafka.producer.ProducerConfig;import kafka.serializer.StringEncoder;/** * Kafka生产者 * @author lisg * */public class KafkaProducer {    public static void main(String[] args) {                 Properties props = new Properties();        //根据这个配置获取metadata,不必是kafka集群上的所有broker,但最好至少有两个        props.put("metadata.broker.list", "vm1:9092,vm2:9092");        //消息传递到broker时的序列化方式        props.put("serializer.class", StringEncoder.class.getName());        //zk集群        props.put("zookeeper.connect", "vm1:2181");        //是否获取反馈        //0是不获取反馈(消息有可能传输失败)        //1是获取消息传递给leader后反馈(其他副本有可能接受消息失败)        //-1是所有in-sync replicas接受到消息时的反馈        props.put("request.required.acks", "1");//      props.put("partitioner.class", MyPartition.class.getName());                 //创建Kafka的生产者, key是消息的key的类型, value是消息的类型        Producer<Integer, String> producer = new Producer<Integer, String>(                new ProducerConfig(props));                 int count = 0;        while(true) {            String message = "message-" + ++count;            //消息主题是test            KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", message);            //message可以带key, 根据key来将消息分配到指定区, 如果没有key则随机分配到某个区//          KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message);            producer.send(keyedMessage);            System.out.println("send: " + message);            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }        }         //      producer.close();    } }/** * 自定义分区类 * */class MyPartition implements Partitioner {    public int partition(Object key, int numPartitions) {        return key.hashCode()%numPartitions;    }     }




来自为知笔记(Wiz)

附件列表

 

转载于:https://www.cnblogs.com/lishouguang/p/4560559.html

总结

以上是生活随笔为你收集整理的java实现Kafka生产者示例的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。