日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java判断读到末尾_Flink实战:自定义KafkaDeserializationSchema(Java/Scala)

發布時間:2023/12/15 java 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java判断读到末尾_Flink实战:自定义KafkaDeserializationSchema(Java/Scala) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

微信公眾號: 大數據開發運維架構

關注可了解更多大數據相關的資訊。問題或建議,請公眾號留言;

如果您覺得“大數據開發運維架構”對你有幫助,歡迎轉發朋友圈


kafka中的數據通常是鍵值對的,所以我們這里自定義反序列化類從kafka中消費鍵值對的消息,為方便大家學習,這里我實現了Java/Scala兩個版本,由于比較簡單這里直接上代碼:

一、Scala代碼:

1.自定義反序列化類:

package comhadoop.ljs.flink010.kafka

import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

/**
 * Deserializes Kafka key/value byte arrays into UTF-8 strings while keeping
 * the full [[ConsumerRecord]] metadata (topic, partition, offset) available
 * to downstream operators.
 *
 * @author lujisen (ChinaUnicom Software JiNan), 2020-04-25
 */
class MyKafkaDeserializationSchema extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {

  /**
   * Whether the stream ends at this record. Always `false`: this source is
   * unbounded (one could, for example, stop on a record whose key is "end",
   * but no such convention is used here).
   */
  override def isEndOfStream(t: ConsumerRecord[String, String]): Boolean = false

  /**
   * Converts the raw key/value bytes to UTF-8 strings.
   *
   * A `null` key or value (e.g. keyless messages, or tombstones on a
   * compacted topic) is mapped to `null` instead of throwing a
   * NullPointerException — with checkpointing enabled an NPE here would
   * otherwise cause endless job restarts.
   */
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
    // Null-safe byte[] -> String decoding.
    def decode(bytes: Array[Byte]): String =
      if (bytes == null) null else new String(bytes, "UTF-8")

    new ConsumerRecord(
      record.topic(),
      record.partition(),
      record.offset(),
      decode(record.key()),
      decode(record.value()))
  }

  /** Type information Flink needs to serialize the produced records. */
  override def getProducedType: TypeInformation[ConsumerRecord[String, String]] =
    TypeInformation.of(new TypeHint[ConsumerRecord[String, String]] {})
}

2.主函數類:

package comhadoop.ljs.flink010.kafka

import java.util.Properties

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

/**
 * Demo job: consumes key/value records from Kafka with the custom
 * MyKafkaDeserializationSchema and prints "key... value:..." lines.
 *
 * @author lujisen (ChinaUnicom Software JiNan), 2020-04-25
 */
object KafkaDeserializerSchemaTest {

  def main(args: Array[String]): Unit = {
    // Set up the streaming environment.
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()

    // Enable checkpointing. NOTE: message keys/values are not null-checked
    // here, so with checkpointing on, a deserialization error restarts the
    // job indefinitely.
    senv.enableCheckpointing(2000)

    // The topic is auto-created by Kafka if it does not exist
    // (single partition, partition id 0).
    val myConsumer = new FlinkKafkaConsumer[ConsumerRecord[String, String]](
      "topic3", new MyKafkaDeserializationSchema(), getKafkaConfig())

    // Start consuming from an explicit offset: partition 0 of topic3,
    // beginning at the first record.
    val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
    specificStartOffsets.put(new KafkaTopicPartition("topic3", 0), 0L)
    myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

    // Wire the Kafka source into the job graph.
    val source: DataStream[ConsumerRecord[String, String]] = senv.addSource(myConsumer)

    // Render each record as a readable string.
    val keyValue = source.map(new MapFunction[ConsumerRecord[String, String], String] {
      override def map(message: ConsumerRecord[String, String]): String = {
        "key" + message.key + " value:" + message.value
      }
    })

    // Print received records and run the job.
    keyValue.print()
    senv.execute()
  }

  /** Kafka consumer properties for the demo cluster. */
  def getKafkaConfig(): Properties = {
    val props: Properties = new Properties()
    props.setProperty("bootstrap.servers", "worker1.hadoop.ljs:6667,worker2.hadoop.ljs:6667")
    props.setProperty("group.id", "topic_1")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("auto.offset.reset", "latest")
    props
  }
}

二、Java代碼:
1.自定義反序列化類:

package com.hadoop.ljs.flink110.kafka;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

/**
 * Deserializes Kafka key/value byte arrays into UTF-8 strings while
 * preserving the full {@link ConsumerRecord} metadata (topic, partition,
 * offset, timestamp, ...). Generic parameters restored — the published
 * listing lost every {@code <...>} to HTML scraping.
 *
 * @author lujisen (ChinaUnicom Software JiNan), 2020-04-25
 */
public class MyKafkaDeserializationSchema
        implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    /**
     * Whether the stream ends at this record. Always {@code false}: the
     * source is unbounded.
     */
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    /**
     * Converts the raw key/value bytes to UTF-8 strings.
     *
     * <p>A {@code null} key or value (keyless messages, compacted-topic
     * tombstones) is mapped to {@code null} instead of throwing a
     * NullPointerException — with checkpointing enabled an NPE here would
     * otherwise restart the job indefinitely.
     *
     * @param record the raw record from the Kafka consumer
     * @return the same record with key/value decoded as strings
     */
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record)
            throws Exception {
        return new ConsumerRecord<>(
                record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(), // deprecated in newer Kafka clients; kept for this constructor overload
                record.serializedKeySize(),
                record.serializedValueSize(),
                decode(record.key()),
                decode(record.value()));
    }

    /** Null-safe byte[] -> String decoding using UTF-8. */
    private static String decode(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    /** Type information Flink needs to serialize the produced records. */
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});
    }
}

2.主函數類:

package com.hadoop.ljs.flink110.kafka;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Demo job: consumes key/value records from Kafka with the custom
 * MyKafkaDeserializationSchema and prints "key... value:..." lines.
 * Generic parameters restored — the published listing lost every
 * {@code <...>} to HTML scraping.
 *
 * @author lujisen (ChinaUnicom Software JiNan), 2020-04-25
 */
public class KafkaDeserializerSchemaTest {

    public static void main(String[] args) throws Exception {
        // Set up the streaming environment.
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing. NOTE: message keys/values are not
        // null-checked in the original schema, so with checkpointing on, a
        // deserialization error restarts the job indefinitely.
        senv.enableCheckpointing(2000);

        // The topic is auto-created by Kafka if it does not exist
        // (single partition, partition id 0).
        FlinkKafkaConsumer<ConsumerRecord<String, String>> myConsumer =
                new FlinkKafkaConsumer<>("topic3", new MyKafkaDeserializationSchema(), getKafkaConfig());

        // Start consuming from an explicit offset: partition 0 of topic3,
        // beginning at the first record.
        Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        specificStartOffsets.put(new KafkaTopicPartition("topic3", 0), 0L);
        myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

        // Wire the Kafka source into the job graph.
        DataStream<ConsumerRecord<String, String>> source = senv.addSource(myConsumer);

        // Render each record as a readable string.
        DataStream<String> keyValue = source.map(
                new MapFunction<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String map(ConsumerRecord<String, String> message) throws Exception {
                        return "key" + message.key() + " value:" + message.value();
                    }
                });

        // Print received records and run the job.
        keyValue.print();
        senv.execute();
    }

    /** Kafka consumer properties for the demo cluster. */
    public static Properties getKafkaConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "worker1.hadoop.ljs:6667,worker2.hadoop.ljs:6667");
        props.setProperty("group.id", "topic_group2");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }
}

三、函數測試

1.KafkaProducer發送測試數據類:

package com.hadoop.ljs.kafka220;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Date;
import java.util.Properties;

/**
 * Test-data generator: sends {@code count} small JSON messages with string
 * keys ("key1".."keyN") to the demo topic, logging each ack's offset and
 * partition via the send callback.
 *
 * <p>The JSON literal's escape backslashes were stripped by HTML scraping in
 * the published listing; they are restored here so the class compiles.
 */
public class KafkaPartitionProducer extends Thread {

    /** Number of messages to produce per run. */
    private static long count = 10;
    private static String topic = "topic3";
    private static String brokerList = "worker1.hadoop.ljs:6667,worker2.hadoop.ljs:6667";

    public static void main(String[] args) {
        KafkaPartitionProducer jproducer = new KafkaPartitionProducer();
        jproducer.start();
    }

    @Override
    public void run() {
        producer();
    }

    /** Produces {@code count} keyed JSON records and waits for acks via callback. */
    private void producer() {
        Properties props = config();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        System.out.println("kafka生產數據條數:" + count);
        for (int i = 1; i <= count; i++) {
            // NOTE(review): escapes reconstructed from the garbled listing —
            // intended payload looks like {"id":1,"ip":"192.168.0.1","date":...}.
            String json = "{\"id\":" + i + ",\"ip\":\"192.168.0." + i + "\",\"date\":"
                    + new Date().toString() + "}";
            String key = "key" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, json);
            producer.send(record, (metadata, e) -> {
                // Callback: log failures and, on success, where the record landed.
                if (null != e) {
                    e.printStackTrace();
                }
                if (null != metadata) {
                    System.out.println(String.format(
                            "offset: %s, partition:%s, topic:%s timestamp:%s",
                            metadata.offset(), metadata.partition(),
                            metadata.topic(), metadata.timestamp()));
                }
            });
        }
        producer.close();
    }

    /** Producer properties: single-ack, 3 retries, modest batching. */
    private Properties config() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /* Custom partitioner hook (disabled): */
        /* props.put("partitioner.class", PartitionUtil.class.getName()); */
        return props;
    }
}

2.測試結果

如果覺得我的文章能幫到您,請關注微信公眾號“大數據開發運維架構”,并轉發朋友圈,謝謝支持!!!

總結

以上是生活随笔為你收集整理的java判断读到末尾_Flink实战:自定义KafkaDeserializationSchema(Java/Scala)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。