当前位置:
首页 >
kafka的简单使用
发布时间:2025/7/14
45
豆豆
生活随笔
收集整理的这篇文章主要介绍了
kafka的简单使用
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
为什么80%的码农都做不了架构师?>>>
在 Eclipse 中新建名为 kafka-demo 的 Maven 项目,pom.xml 依赖如下(原文的依赖片段在转载时丢失;示例代码使用的是 Kafka 0.8.x 的旧版客户端 API,需自行在 pom.xml 中引入对应的 Kafka 依赖):
ProducerDemo.java
package com.leech.kafka.demo;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Minimal producer example for the Kafka 0.8 producer API.
 *
 * <p>Sends a fixed number of keyed string messages to the {@code page_visits}
 * topic and prints the elapsed time in milliseconds.
 */
public class ProducerDemo {

    /**
     * Configures a producer, sends the demo messages and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int events = 100;

        // Producer configuration.
        Properties props = new Properties();
        // Broker list; several brokers can be given as a comma-separated
        // "host:port,host:port" list.
        props.put("metadata.broker.list", "192.168.1.82:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class.
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Optional: when not set, the default partitioner is used.
        props.put("partitioner.class", "com.leech.kafka.demo.PartitionerDemo");
        // Enables the acknowledgement mechanism; without it the producer is
        // fire-and-forget and data may be lost. Valid values are 0, 1 and -1,
        // see http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        // Create the producer.
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            // Generate and send the messages.
            long start = System.currentTimeMillis();
            for (int i = 0; i < events; i++) {
                long runtime = new Date().getTime();
                String ip = "192.168.2." + i;
                String msg = runtime + ",www.example.com," + ip;
                // If the topic does not exist it is created automatically
                // (when the broker allows auto-creation) using the broker
                // defaults: replication factor 1 and the broker's
                // num.partitions value (default 1, not 0 as the original
                // note stated).
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("page_visits", ip, msg);
                producer.send(data);
            }
            System.out.println("耗时:" + (System.currentTimeMillis() - start));
        } finally {
            // Close the producer even when sending fails.
            producer.close();
        }
    }
}

ConsumerDemo.java
package com.leech.kafka.demo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * High-level consumer example that reads one topic with a pool of threads.
 *
 * <p>For details see:
 * https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 *
 * @author Fung
 */
public class ConsumerDemo {

    /** Values used by {@link #main} when fewer than four arguments are given. */
    private static final String[] DEFAULT_ARGS = {
        "192.168.1.82:2181", "group-1", "page_visits", "12"
    };

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    /**
     * Creates the consumer connector for the given ZooKeeper address and group.
     *
     * @param a_zookeeper value for the {@code zookeeper.connect} setting
     * @param a_groupId   value for the {@code group.id} setting
     * @param a_topic     topic to consume
     */
    public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    /** Shuts down the consumer connector and, if started, the thread pool. */
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
        }
    }

    /**
     * Requests {@code numThreads} streams for the topic and submits one
     * {@link ConsumerMsgTask} per returned stream to a fixed thread pool.
     *
     * @param numThreads number of streams to request and size of the pool
     */
    public void run(int numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Now launch all the threads.
        executor = Executors.newFixedThreadPool(numThreads);

        // Now create an object to consume the messages of each stream.
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerMsgTask(stream, threadNumber));
            threadNumber++;
        }
    }

    /**
     * Builds the consumer configuration.
     *
     * @param a_zookeeper value for {@code zookeeper.connect}
     * @param a_groupId   value for {@code group.id}
     * @return the configuration wrapping these properties
     */
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    /**
     * Runs the demo for ten seconds and then shuts it down.
     *
     * @param arg optional {@code zookeeper groupId topic threads}; when fewer
     *            than four values are supplied the built-in demo defaults are
     *            used instead
     */
    public static void main(String[] arg) {
        // Previously the command-line arguments were always ignored because a
        // hard-coded array shadowed them; they are honoured now when complete.
        String[] args = arg.length >= 4 ? arg : DEFAULT_ARGS;
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
        demo.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
            // Keep the interrupt status visible and fall through to shutdown.
            Thread.currentThread().interrupt();
        }
        demo.shutdown();
    }
}

ConsumerMsgTask.java
package com.leech.kafka.demo;

import java.nio.charset.Charset;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

/**
 * Task that prints every message of one Kafka stream to standard output.
 */
public class ConsumerMsgTask implements Runnable {

    // Decode explicitly instead of relying on the platform default charset.
    // NOTE(review): UTF-8 is assumed to match the producer's StringEncoder
    // encoding — confirm if "serializer.encoding" is overridden.
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final KafkaStream<byte[], byte[]> stream;
    private final int threadNumber;

    /**
     * @param stream       stream whose messages are printed
     * @param threadNumber number used to label the output lines
     */
    public ConsumerMsgTask(KafkaStream<byte[], byte[]> stream, int threadNumber) {
        this.threadNumber = threadNumber;
        this.stream = stream;
    }

    /**
     * Prints each message until the stream's iterator reports no more
     * elements, then prints a shutdown line.
     */
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + threadNumber + ": "
                    + new String(it.next().message(), UTF_8));
        }
        System.out.println("Shutting down Thread: " + threadNumber);
    }
}

PartitionerDemo.java
package com.leech.kafka.demo;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Partitioner that derives the partition from the key.
 *
 * <p>For a {@code String} key containing a {@code '.'} after its first
 * character, the number following the last dot (for example the last octet of
 * an IP address) modulo the partition count is used. Other keys are
 * partitioned by the length of their string form.
 */
public class PartitionerDemo implements Partitioner {

    /**
     * @param props unused; presumably this constructor shape is what Kafka
     *              expects when it instantiates the configured partitioner —
     *              confirm against the Kafka version in use
     */
    public PartitionerDemo(VerifiableProperties props) {
    }

    /**
     * Chooses the partition for a message key.
     *
     * @param obj           message key; {@code null} maps to partition 0
     * @param numPartitions number of partitions, expected to be positive
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int partition(Object obj, int numPartitions) {
        if (obj == null) {
            // Previously a null key caused a NullPointerException.
            return 0;
        }
        if (!(obj instanceof String)) {
            return obj.toString().length() % numPartitions;
        }

        String key = (String) obj;
        int offset = key.lastIndexOf('.');
        if (offset <= 0) {
            // No dot, or only a leading dot: same result as the original code.
            return 0;
        }
        try {
            int suffix = Integer.parseInt(key.substring(offset + 1));
            // A negative suffix would otherwise give a negative index.
            return Math.abs(suffix % numPartitions);
        } catch (NumberFormatException e) {
            // Non-numeric or empty suffix (e.g. "www.example.com"): fall back
            // to the length rule used for non-String keys instead of failing.
            return key.length() % numPartitions;
        }
    }
}

转载于:https://my.oschina.net/chaun/blog/408511
总结
以上是生活随笔为你收集整理的kafka的简单使用的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 关于wcf三大工具的使用(wsdl.ex
- 下一篇: Firefox 修改User Agent