Kafka Windows client with Linux server --- repost

Published: 2025/4/5

Original: http://blog.csdn.net/jingshuigg/article/details/25001979

I. For the server-side setup, refer to the previous article: Kafka standalone environment setup and testing

Server IP: 10.0.30.221

The directory layout of the runtime environment is as follows (the original screenshot was not preserved in this repost):

Change the following two properties in server.properties under the config folder:

Change zookeeper.connect=localhost:2181 to zookeeper.connect=00.00.00.01 (the server's IP address):2181

as well as the property that is commented out by default:

Change #host.name=localhost to host.name=00.00.00.01 (the server's IP address)
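Putting both changes together, the edited part of config/server.properties would look like this (with 10.0.30.221, the server IP used in this article, substituted for the placeholder):

```properties
# ZooKeeper connection string: point at the server's IP instead of localhost
zookeeper.connect=10.0.30.221:2181

# Uncomment and set host.name so the broker advertises an address
# that remote clients can actually reach
host.name=10.0.30.221
```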

If host.name is not changed, the client reports the following error:

Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at its.kafka.Producer.run(Producer.java:46)

Once the steps above are complete, start zookeeper-server and kafka-server as described in "Kafka standalone environment setup and testing".


II. Client setup

The client runs on Windows 7 and connects to the server from Eclipse.

1. Create a new project kafka_producer in Eclipse; the directory layout is as follows (screenshot not preserved):

Note: place the log4j.properties file from the config folder directly under src; only then does it take effect and let you see the log output.
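The article does not show the contents of log4j.properties. A minimal console-only configuration (a sketch using standard log4j 1.x appender classes; the pattern string is an assumption) would be:

```properties
# Root logger at INFO, writing to the console appender below
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
```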

The producer code is as follows:

import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer extends Thread {
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final String name;
    private final int numsOfMessage;
    private final Properties props = new Properties();

    public Producer(String name, String topic, int numsOfMessage) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "10.0.30.221:9092");
        // Asynchronous sending:
        // props.put("producer.type", "async");
        // How many messages to send per batch:
        // props.put("batch.num.messages", "100");
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
        this.name = name;
        this.numsOfMessage = numsOfMessage;
    }

    public void run() {
        int messageNo = 1;
        while (messageNo <= numsOfMessage) { // number of messages each producer sends
            String message = name + "'s Message_" + messageNo + "******";
            KeyedMessage<Integer, String> messageForSend = new KeyedMessage<Integer, String>(topic, message);
            producer.send(messageForSend);
            messageNo++;
        }
        producer.close();
    }
}

The code that starts the producer:


public class KafkaProducerDemo implements KafkaProperties {
    public static void main(String[] args) {
        StartThread(1, "testTopic", 10);
    }

    /**
     * @param numsOfProducer number of producers
     * @param topic          message topic
     * @param numsOfMessage  number of messages each producer sends
     */
    public static void StartThread(int numsOfProducer, String topic, int numsOfMessage) {
        for (int i = 1; i <= numsOfProducer; i++) {
            String name = "Producer" + i;
            new Producer(name, topic, numsOfMessage).start();
        }
    }
}
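Stripped of the Kafka calls, the two classes above boil down to a thread-per-producer pattern: each thread formats numbered messages and hands them off. The pattern can be sketched in plain Java with no Kafka dependency (the names mirror the code above; a shared list stands in for producer.send):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ProducerPatternSketch {
    // Stand-in for the broker: threads append here instead of calling producer.send()
    static final List<String> sent = new CopyOnWriteArrayList<String>();

    static Thread makeProducer(final String name, final int numsOfMessage) {
        return new Thread() {
            public void run() {
                for (int messageNo = 1; messageNo <= numsOfMessage; messageNo++) {
                    // Same message format as the Producer class above
                    sent.add(name + "'s Message_" + messageNo + "******");
                }
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        // Mirrors StartThread(2, "testTopic", 3): two producers, three messages each
        Thread p1 = makeProducer("Producer1", 3);
        Thread p2 = makeProducer("Producer2", 3);
        p1.start();
        p2.start();
        p1.join();
        p2.join();
        System.out.println(sent.size()); // 6 messages in total
    }
}
```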


2. Create a new project kafka_consumer in Eclipse; the directory layout is as follows (screenshot not preserved):


The consumer code is as follows:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    private final String name;

    public Consumer(String name, String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
        this.name = name;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // Minimum bytes to receive per fetch, default 1:
        // props.put("fetch.min.bytes", "1024");
        // Maximum wait time per fetch, default 100 ms:
        // props.put("fetch.wait.max.ms", "600000");
        return new ConsumerConfig(props);
    }

    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("************" + name + " gets " + new String(it.next().message()));
        }
    }
}

The code that starts the consumer:


public class KafkaConsumerDemo implements KafkaProperties {
    public static void main(String[] args) {
        // Consumer1
        Consumer consumerThread1 = new Consumer("Consumer1", KafkaProperties.topic);
        consumerThread1.start();
    }
}

The KafkaProperties code (used to pass configuration values around; an XML file could of course supply them instead):

public interface KafkaProperties {
    final static String zkConnect = "10.0.30.221:2181";
    final static String groupId = "group1";
    final static String topic = "testTopic";
    final static String kafkaServerURL = "10.0.30.221";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}


3. Start the consumer first, then the producer; the received messages appear in the Eclipse Console window (screenshot not preserved).
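Although the console screenshot did not survive the repost, the expected lines can be reconstructed from the code above: for each message, the consumer prints "************" + its name + " gets " + the message the producer built. A minimal sketch (plain Java, no Kafka dependency; names taken from the demo classes):

```java
public class ExpectedConsoleLine {
    // Rebuilds the line Consumer.run() prints for one received message
    static String consoleLine(String consumerName, String producerName, int messageNo) {
        String message = producerName + "'s Message_" + messageNo + "******";
        return "************" + consumerName + " gets " + message;
    }

    public static void main(String[] args) {
        // The demo runs one producer ("Producer1") sending 10 messages to "Consumer1"
        for (int i = 1; i <= 10; i++) {
            System.out.println(consoleLine("Consumer1", "Producer1", i));
        }
    }
}
```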


Reposted from: https://www.cnblogs.com/davidwang456/p/4201875.html
