欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

Kafka消息序列化和反序列化(下)

发布时间:2024/4/11 46 豆豆
生活随笔 收集整理的这篇文章主要介绍了 Kafka消息序列化和反序列化(下) 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-message-serialize-and-deserialize-2/


接上一篇:Kafka消息序列化和反序列化(上)。

有序列化就会有反序列化,反序列化的操作是在Kafka Consumer中完成的,使用起来只需要配置一下key.deserializer和value.deseriaizer。对应上面自定义的Company类型的Deserializer就需要实现org.apache.kafka.common.serialization.Deserializer接口,这个接口同样有三个方法:

  • public void configure(Map<String, ?> configs, boolean isKey):用来配置当前类。
  • public byte[] serialize(String topic, T data):用来执行反序列化。如果data为null建议处理的时候直接返回null而不是抛出一个异常。
  • public void close():用来关闭当前序列化器。
  • 下面就来看一下DemoSerializer对应的反序列化的DemoDeserializer,详细代码如下:

    public class DemoDeserializer implements Deserializer<Company> {public void configure(Map<String, ?> configs, boolean isKey) {}public Company deserialize(String topic, byte[] data) {if (data == null) {return null;}if (data.length < 8) {throw new SerializationException("Size of data received by DemoDeserializer is shorter than expected!");}ByteBuffer buffer = ByteBuffer.wrap(data);int nameLen, addressLen;String name, address;nameLen = buffer.getInt();byte[] nameBytes = new byte[nameLen];buffer.get(nameBytes);addressLen = buffer.getInt();byte[] addressBytes = new byte[addressLen];buffer.get(addressBytes);try {name = new String(nameBytes, "UTF-8");address = new String(addressBytes, "UTF-8");} catch (UnsupportedEncodingException e) {throw new SerializationException("Error occur when deserializing!");}return new Company(name,address);}public void close() {} }

    有些读者可能对新版的Consumer不是很熟悉,这里顺带着举一个完整的消费示例,并以DemoDeserializer作为消息Value的反序列化器。

    Properties properties = new Properties(); properties.put("bootstrap.servers", brokerList); properties.put("group.id", consumerGroup); properties.put("session.timeout.ms", 10000); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "com.hidden.client.DemoDeserializer"); properties.put("client.id", "hidden-consumer-client-id-zzh-2"); KafkaConsumer<String, Company> consumer = new KafkaConsumer<String, Company>(properties); consumer.subscribe(Arrays.asList(topic)); try {while (true) {ConsumerRecords<String, Company> records = consumer.poll(100);for (ConsumerRecord<String, Company> record : records) {String info = String.format("topic=%s, partition=%s, offset=%d, consumer=%s, country=%s",record.topic(), record.partition(), record.offset(), record.key(), record.value());System.out.println(info);}consumer.commitAsync(new OffsetCommitCallback() {public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {String error = String.format("Commit failed for offsets {}", offsets, exception);System.out.println(error);}}});} } finally {consumer.close(); }

    有些时候自定义的类型还可以和Avro、ProtoBuf等联合使用,而且这样更加的方便快捷,比如我们将前面Company的Serializer和Deserializer用Protostuff包装一下,由于篇幅限制,笔者这里只罗列出对应的serialize和deserialize方法,详细参考如下:

    public byte[] serialize(String topic, Company data) {if (data == null) {return null;}Schema schema = (Schema) RuntimeSchema.getSchema(data.getClass());LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);byte[] protostuff = null;try {protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}return protostuff; }public Company deserialize(String topic, byte[] data) {if (data == null) {return null;}Schema schema = RuntimeSchema.getSchema(Company.class);Company ans = new Company();ProtostuffIOUtil.mergeFrom(data, ans, schema);return ans; }

    如果Company的字段很多,我们使用Protostuff进一步封装一下的方式就显得简洁很多。不过这个不是最主要的,而最主要的是经过Protostuff包装之后,这个Serializer和Deserializer可以向前兼容(新加字段采用默认值)和向后兼容(忽略新加字段),这个特性Avro和Protobuf也都具备。

    自定义的类型有一个不得不面对的问题就是Kafka Producer和Kafka Consumer之间的序列化和反序列化的兼容性,试想对于StringSerializer来说,Kafka Consumer可以顺其自然的采用StringDeserializer,不过对于Company这种专用类型,某个服务使用DemoSerializer进行了序列化之后,那么下游的消费者服务必须也要实现对应的DemoDeserializer。再者,如果上游的Company类型改变,下游也需要跟着重新实现一个新的DemoSerializer,这个后面所面临的难题可想而知。所以,如无特殊需要,笔者不建议使用自定义的序列化和反序列化器;如有业务需要,也要使用通用的Avro、Protobuf、Protostuff等序列化工具包装,尽可能的实现得更加通用且向前后兼容。

    题外话,对于Kafka的“深耕者”Confluent来说,还有其自身的一套序列化和反序列化解决方案(io.confluent.kafka.serializer.KafkaAvroSerializer),GitHub上有相关资料,读者如有兴趣可以自行扩展学习。


    参考资料

  • protostuff序列化/反序列化
  • 欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-message-serialize-and-deserialize-2/


    欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


    超强干货来袭 云风专访:近40年码龄,通宵达旦的技术人生

    总结

    以上是生活随笔为你收集整理的Kafka消息序列化和反序列化(下)的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。