日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka消息序列化和反序列化(下)

發布時間:2024/4/11 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka消息序列化和反序列化(下) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。

歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-message-serialize-and-deserialize-2/


接上一篇:Kafka消息序列化和反序列化(上)。

有序列化就會有反序列化,反序列化的操作是在Kafka Consumer中完成的,使用起來只需要配置一下key.deserializer和value.deseriaizer。對應上面自定義的Company類型的Deserializer就需要實現org.apache.kafka.common.serialization.Deserializer接口,這個接口同樣有三個方法:

  • public void configure(Map<String, ?> configs, boolean isKey):用來配置當前類。
  • public byte[] serialize(String topic, T data):用來執行反序列化。如果data為null建議處理的時候直接返回null而不是拋出一個異常。
  • public void close():用來關閉當前序列化器。
  • 下面就來看一下DemoSerializer對應的反序列化的DemoDeserializer,詳細代碼如下:

    public class DemoDeserializer implements Deserializer<Company> {public void configure(Map<String, ?> configs, boolean isKey) {}public Company deserialize(String topic, byte[] data) {if (data == null) {return null;}if (data.length < 8) {throw new SerializationException("Size of data received by DemoDeserializer is shorter than expected!");}ByteBuffer buffer = ByteBuffer.wrap(data);int nameLen, addressLen;String name, address;nameLen = buffer.getInt();byte[] nameBytes = new byte[nameLen];buffer.get(nameBytes);addressLen = buffer.getInt();byte[] addressBytes = new byte[addressLen];buffer.get(addressBytes);try {name = new String(nameBytes, "UTF-8");address = new String(addressBytes, "UTF-8");} catch (UnsupportedEncodingException e) {throw new SerializationException("Error occur when deserializing!");}return new Company(name,address);}public void close() {} }

    有些讀者可能對新版的Consumer不是很熟悉,這里順帶著舉一個完整的消費示例,并以DemoDeserializer作為消息Value的反序列化器。

    Properties properties = new Properties(); properties.put("bootstrap.servers", brokerList); properties.put("group.id", consumerGroup); properties.put("session.timeout.ms", 10000); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "com.hidden.client.DemoDeserializer"); properties.put("client.id", "hidden-consumer-client-id-zzh-2"); KafkaConsumer<String, Company> consumer = new KafkaConsumer<String, Company>(properties); consumer.subscribe(Arrays.asList(topic)); try {while (true) {ConsumerRecords<String, Company> records = consumer.poll(100);for (ConsumerRecord<String, Company> record : records) {String info = String.format("topic=%s, partition=%s, offset=%d, consumer=%s, country=%s",record.topic(), record.partition(), record.offset(), record.key(), record.value());System.out.println(info);}consumer.commitAsync(new OffsetCommitCallback() {public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {String error = String.format("Commit failed for offsets {}", offsets, exception);System.out.println(error);}}});} } finally {consumer.close(); }

    有些時候自定義的類型還可以和Avro、ProtoBuf等聯合使用,而且這樣更加的方便快捷,比如我們將前面Company的Serializer和Deserializer用Protostuff包裝一下,由于篇幅限制,筆者這里只羅列出對應的serialize和deserialize方法,詳細參考如下:

    public byte[] serialize(String topic, Company data) {if (data == null) {return null;}Schema schema = (Schema) RuntimeSchema.getSchema(data.getClass());LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);byte[] protostuff = null;try {protostuff = ProtostuffIOUtil.toByteArray(data, schema, buffer);} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}return protostuff; }public Company deserialize(String topic, byte[] data) {if (data == null) {return null;}Schema schema = RuntimeSchema.getSchema(Company.class);Company ans = new Company();ProtostuffIOUtil.mergeFrom(data, ans, schema);return ans; }

    如果Company的字段很多,我們使用Protostuff進一步封裝一下的方式就顯得簡潔很多。不過這個不是最主要的,而最主要的是經過Protostuff包裝之后,這個Serializer和Deserializer可以向前兼容(新加字段采用默認值)和向后兼容(忽略新加字段),這個特性Avro和Protobuf也都具備。

    自定義的類型有一個不得不面對的問題就是Kafka Producer和Kafka Consumer之間的序列化和反序列化的兼容性,試想對于StringSerializer來說,Kafka Consumer可以順其自然的采用StringDeserializer,不過對于Company這種專用類型,某個服務使用DemoSerializer進行了序列化之后,那么下游的消費者服務必須也要實現對應的DemoDeserializer。再者,如果上游的Company類型改變,下游也需要跟著重新實現一個新的DemoSerializer,這個后面所面臨的難題可想而知。所以,如無特殊需要,筆者不建議使用自定義的序列化和反序列化器;如有業務需要,也要使用通用的Avro、Protobuf、Protostuff等序列化工具包裝,盡可能的實現得更加通用且向前后兼容。

    題外話,對于Kafka的“深耕者”Confluent來說,還有其自身的一套序列化和反序列化解決方案(io.confluent.kafka.serializer.KafkaAvroSerializer),GitHub上有相關資料,讀者如有興趣可以自行擴展學習。


    參考資料

  • protostuff序列化/反序列化
  • 歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-message-serialize-and-deserialize-2/


    歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


    超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

    總結

    以上是生活随笔為你收集整理的Kafka消息序列化和反序列化(下)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。