

如何创建Kafka客户端:Avro Producer和Consumer Client

Published: 2024/4/17

1. Objective - Kafka Clients

In this Kafka Clients article, we will learn how to create Apache Kafka clients using the Kafka API. There are several ways to create a Kafka client, matching at-most-once, at-least-once, and exactly-once message processing needs. So, in this Kafka client tutorial, we will cover all three approaches in detail. In addition, we will look at how to use an Avro client.

So, let's start the Kafka client tutorial.


2. What is a Kafka Client?

  • Prerequisites for creating a Kafka client
  • Initially, for creating Kafka clients, we have to set up the Apache Kafka middleware on our local machine.
  • Moreover, before starting to create Kafka clients, a locally installed single-node Kafka instance must be running on our local machine, along with a running Zookeeper node and a running Kafka node.
    Furthermore, for the Kafka client, create a topic named normal-topic with two partitions, using the command:

    bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1


  • Further, execute the following command to check the status of the created topic:

    bin/kafka-topics --list --topic normal-topic --zookeeper localhost:2181


  • In addition, to increase the number of partitions whenever the topic needs to change, execute the following command:

    bin/kafka-topics.sh --alter --topic normal-topic --zookeeper localhost:2181 --partitions 2


3. Kafka Producer Client

Here is the code to implement a Kafka producer client. It sends text messages, and the loop can be adjusted to control how many messages are sent:

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ProducerExample ...");
        sendMessages();
    }

    private static void sendMessages() throws InterruptedException, IOException {
        Producer<String, String> producer = createProducer();
        sendMessages(producer);
        // Allow the producer to complete sending of the messages before program exit.
        Thread.sleep(20);
    }

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        // Controls how many bytes the sender would batch up before publishing to Kafka.
        props.put("batch.size", 10);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    private static void sendMessages(Producer<String, String> producer) {
        String topic = "normal-topic";
        int partition = 0;
        long record = 1;
        for (int i = 1; i <= 10; i++) {
            producer.send(new ProducerRecord<>(topic, partition, Long.toString(record), Long.toString(record++)));
        }
    }
}


4. Registering a Consumer with Kafka

First, let's learn the ways in which a Kafka consumer client can register with a Kafka broker. Specifically, there are two methods: using the subscribe method call, or using the assign method call. Let's look at both of these Kafka client methods in detail.

a. Using the subscribe method call

When the subscribe method call is used, Kafka automatically rebalances the available consumers whenever topics or partitions are added or removed, or when a consumer is added or removed.

b. Using the assign method call

However, when a consumer registers using the assign method call, the Kafka client does not offer automatic rebalancing of the consumers.
Either of the above registration options can be used by at-most-once, at-least-once, or exactly-once consumers.
i. At-Most-Once Kafka Consumer (zero or more deliveries)
Basically, this is the default behavior of a Kafka consumer.
To configure this type of consumer in the Kafka client, follow these steps:

• First, set 'enable.auto.commit' to true.
• Also, set 'auto.commit.interval.ms' to a lower timeframe.
• Make sure not to call consumer.commitSync() from the consumer. With this consumer configuration, Kafka will auto-commit the offsets at the specified interval.

However, a consumer configured this way may exhibit either at-most-once or at-least-once behavior. Still, let's declare this consumer as at-most-once, because at-most-once is the weaker delivery guarantee. Let's discuss both consumer behaviors in detail:

• At-most-once scenario

This scenario happens when the commit interval elapses and Kafka auto-commits the last consumed offset, but the consumer crashes between the commit and the processing of those messages. When the consumer restarts, it starts receiving messages from the last committed offset; in the meantime, the consumer may have lost some messages.

• At-least-once scenario

This scenario happens when the consumer processes a message and commits it into its persistent store, but then crashes before Kafka had a chance to commit the offset to the broker, because the commit interval had not yet elapsed. When the consumer restarts, it receives some of the older messages again from the last committed offset.
Kafka consumer code:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtMostOnceConsumer {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AtMostOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions in that topic. 'assign' could be used here
        // instead of 'subscribe' to subscribe to a specific partition.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Set this property, if auto commit should happen.
        props.put("enable.auto.commit", "true");
        // Auto commit interval; Kafka would commit offsets at this interval.
        props.put("auto.commit.interval.ms", "101");
        // This is how to control the amount of data being read in each poll.
        props.put("max.partition.fetch.bytes", "135");
        // Set this if you want to always read from the beginning.
        // props.put("auto.offset.reset", "earliest");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
        }
    }

    private static void process() throws InterruptedException {
        // Create some delay to simulate processing of the message.
        Thread.sleep(20);
    }
}
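Both failure scenarios above can be reproduced without a broker. The following self-contained sketch (plain Java, not the Kafka API; all names are illustrative) models a partition log plus a committed offset, and shows how committing before processing loses a message on a crash, while processing before committing redelivers it:

```java
import java.util.ArrayList;
import java.util.List;

public class DeliverySemanticsDemo {

    /** Commits BEFORE processing (at-most-once): a crash loses the in-flight message. */
    static List<String> atMostOnceAfterCrash(List<String> log, int crashAt) {
        List<String> processed = new ArrayList<>();
        long committed = 0;
        for (int i = 0; i < log.size(); i++) {
            committed = i + 1;            // offset auto-committed first
            if (i == crashAt) break;      // crash before processing this message
            processed.add(log.get(i));
        }
        // Restart: resume from the committed offset -> the message at crashAt is lost.
        for (int i = (int) committed; i < log.size(); i++) processed.add(log.get(i));
        return processed;
    }

    /** Processes BEFORE committing (at-least-once): a crash redelivers the message. */
    static List<String> atLeastOnceAfterCrash(List<String> log, int crashAt) {
        List<String> processed = new ArrayList<>();
        long committed = 0;
        for (int i = 0; i < log.size(); i++) {
            processed.add(log.get(i));    // process first
            if (i == crashAt) break;      // crash before committing this offset
            committed = i + 1;
        }
        // Restart: resume from the committed offset -> message at crashAt is processed twice.
        for (int i = (int) committed; i < log.size(); i++) processed.add(log.get(i));
        return processed;
    }
}
```

For the log [a, b, c] with a crash at index 1, the at-most-once path ends with [a, c] (b is lost), while the at-least-once path ends with [a, b, b, c] (b is duplicated).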


ii. At-Least-Once Kafka Consumer (one or more message deliveries, duplicates possible)
To configure this type of consumer, follow these steps:

• First, set 'enable.auto.commit' to false, or
• Alternatively, set 'enable.auto.commit' to true with 'auto.commit.interval.ms' set to a very high number.

The consumer should now take control of committing the message offsets to Kafka by calling consumer.commitSync().
Moreover, to avoid reprocessing duplicate messages, implement 'idempotent' behavior in the consumer, especially for this type of consumer, because duplicate message delivery can happen in the scenario above.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AtLeastOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions in that topic. 'assign' could be used here
        // instead of 'subscribe' to subscribe to a specific partition.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Set this property, if auto commit should happen.
        props.put("enable.auto.commit", "true");
        // Make the auto commit interval a big number so that auto commit does not happen;
        // we are going to control the offset commit via consumer.commitSync() after processing
        // the message.
        props.put("auto.commit.interval.ms", "999999999999");
        // This is how to control the number of messages being read in each poll.
        props.put("max.partition.fetch.bytes", "135");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
            // The call below is important to control the offset commit. Do this call after you
            // finish processing the business process.
            consumer.commitSync();
        }
    }

    private static void process() throws InterruptedException {
        // Create some delay to simulate processing of the record.
        Thread.sleep(20);
    }
}
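Since at-least-once delivery can hand the same record to the application twice, the text above recommends idempotent processing. Below is a minimal, broker-free sketch (all names are illustrative, not part of the Kafka API) that makes a side effect safe to repeat by remembering which offsets were already applied:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdempotentProcessorDemo {

    private final Set<Long> seenOffsets = new HashSet<>();
    private final List<String> applied = new ArrayList<>();

    /** Applies a record only if its offset has not been processed before. */
    public boolean processOnce(long offset, String value) {
        if (!seenOffsets.add(offset)) {
            return false;              // duplicate delivery: skip, already applied
        }
        applied.add(value);            // the real side effect would go here
        return true;
    }

    public List<String> applied() {
        return applied;
    }
}
```

With this guard in place, redelivering a record after a crash changes nothing: the second processOnce call for the same offset is a no-op.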


iii. Exactly-Once Kafka Dynamic Consumer via subscribe (one and only one message delivery)
Here, the consumer registers with Kafka via a 'subscribe' registration method call (option a above).
Make sure offsets are managed manually in this case. To set up the exactly-once scenario in the Kafka client, follow these steps:

• First, set enable.auto.commit = false.
• After processing the message, do not call consumer.commitSync().
• Moreover, register the consumer to the topic by making a 'subscribe' call.
• To start reading from a specific offset of that topic/partition, implement a ConsumerRebalanceListener. Also, perform consumer.seek(topicPartition, offset) within the listener.
• As a safety net, implement idempotence.

Code:

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceDynamicConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage2");

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting ExactlyOnceDynamicConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Manually control the offset, but register the consumer to the topic to get dynamically
        // assigned partitions. Inside MyConsumerRebalancerListener, use
        // consumer.seek(topicPartition, offset) to control which messages should be read.
        consumer.subscribe(Arrays.asList("normal-topic"), new MyConsumerRebalancerListener(consumer));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg3";
        props.put("group.id", consumeGroup);
        // To turn off auto commit, below is a key setting.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Control the maximum data on each poll; make sure this value is bigger than the maximum
        // single message size.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                // Save the processed offset in external storage.
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

public class MyConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {

    private OffsetManager offsetManager = new OffsetManager("storage2");
    private Consumer<String, String> consumer;

    public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
        }
    }
}

/**
 * The partition offsets are stored in an external storage, in this case on the local file system
 * where the program runs.
 */
public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrites the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - Offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {
        try {
            FileWriter writer = new FileWriter(storageName(topic, partition), false);
            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return The last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {
        try {
            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    private String storageName(String topic, int partition) {
        return storagePrefix + "-" + topic + "-" + partition;
    }
}
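The file-backed offset store above can be exercised without Kafka at all. A condensed, self-contained variant of the same save/read round-trip using java.nio (the class name and file naming scheme here are illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileOffsetStoreDemo {

    private final String prefix;

    public FileOffsetStoreDemo(String prefix) {
        this.prefix = prefix;
    }

    /** Overwrites the stored offset for a topic-partition. */
    public void save(String topic, int partition, long offset) throws IOException {
        Files.write(path(topic, partition), Long.toString(offset).getBytes());
    }

    /** Returns the last stored offset + 1 (the next offset to read), or 0 if nothing is stored. */
    public long read(String topic, int partition) throws IOException {
        Path p = path(topic, partition);
        if (!Files.exists(p)) return 0;
        return Long.parseLong(new String(Files.readAllBytes(p)).trim()) + 1;
    }

    private Path path(String topic, int partition) {
        return Paths.get(prefix + "-" + topic + "-" + partition);
    }
}
```

Note the "+ 1" in read: the store keeps the last processed offset, so a restarted consumer should seek to the next one, exactly as readOffsetFromExternalStore does above.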


iv. Exactly-Once Kafka Static Consumer via assign (one and only one message delivery)
Here, the consumer registers with the Kafka client via an 'assign' registration method call (option b above).
Make sure offsets are managed manually in this case. To set up the exactly-once Kafka static consumer via assign, follow these steps:

• First, set enable.auto.commit = false.
• Remember, after processing the message, do not call consumer.commitSync().
• Moreover, register the consumer to the specific partition by making an 'assign' call.
• On consumer startup, seek to the specific message offset by calling consumer.seek(topicPartition, offset).
• Also, as a safety net, implement idempotence.

Code:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceStaticConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage1");

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ExactlyOnceStaticConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException, IOException {
        KafkaConsumer<String, String> consumer = createConsumer();
        String topic = "normal-topic";
        int partition = 1;
        TopicPartition topicPartition = registerConsumerToSpecificPartition(consumer, topic, partition);
        // Read the offset for the topic and partition from external storage.
        long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
        // Use seek to go to the exact offset for that topic and partition.
        consumer.seek(topicPartition, offset);
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg2";
        props.put("group.id", consumeGroup);
        // To turn off auto commit, below is a key setting.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Control the maximum data on each poll; make sure this value is bigger than the maximum
        // single message size.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    /**
     * Manually listens to a specific topic partition. For an example of how to dynamically listen
     * to partitions while still manually controlling the offset, see ExactlyOnceDynamicConsumer.java.
     */
    private static TopicPartition registerConsumerToSpecificPartition(KafkaConsumer<String, String> consumer, String topic, int partition) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        List<TopicPartition> partitions = Arrays.asList(topicPartition);
        consumer.assign(partitions);
        return topicPartition;
    }

    /**
     * Processes data and stores the offset in an external store. Best practice is to do these
     * operations atomically.
     */
    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}


5. Avro Producer and Consumer

By definition, Avro is an open-source binary message exchange protocol. Basically, we use it to send optimized messages across the wire, which also reduces network overhead. Moreover, Avro can enforce a schema for messages, and the schema can be defined using JSON. Using these schemas, Avro can generate binding objects in various programming languages. Using Avro with Kafka is natively supported and highly recommended.
Below is a simple Avro consumer and producer.
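Note that the listing below calls an AvroSupport helper class that the original post never shows. For the code to work, AvroSupport.getSchema() would have to return a record schema containing the firstName string field that the consumer reads, for example (a hypothetical schema; the record name and namespace are assumptions):

```json
{
  "type": "record",
  "name": "Customer",
  "namespace": "com.example.avro",
  "fields": [
    { "name": "firstName", "type": "string" }
  ]
}
```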

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class AvroConsumerExample {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AutoOffsetAvroConsumerExample ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException {
        KafkaConsumer<String, byte[]> consumer = createConsumer();
        // Assign to a specific topic and partition.
        consumer.assign(Arrays.asList(new TopicPartition("avro-topic", 0)));
        processRecords(consumer);
    }

    private static void processRecords(KafkaConsumer<String, byte[]> consumer) {
        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, byte[]> record : records) {
                GenericRecord genericRecord = AvroSupport.byteArrayToData(AvroSupport.getSchema(), record.value());
                String firstName = AvroSupport.getValue(genericRecord, "firstName", String.class);
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), firstName);
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            consumer.commitSync();
        }
    }

    private static KafkaConsumer<String, byte[]> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "100");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return new KafkaConsumer<String, byte[]>(props);
    }
}

public class AvroProducerExample {

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ProducerAvroExample ...");
        sendMessages();
    }

    private static void sendMessages() throws InterruptedException, IOException {
        Producer<String, byte[]> producer = createProducer();
        sendRecords(producer);
    }

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<>(props);
    }

    private static void sendRecords(Producer<String, byte[]> producer) throws IOException {
        String topic = "avro-topic";
        int partition = 0;
        while (true) {
            for (int i = 1; i < 100; i++) {
                producer.send(new ProducerRecord<String, byte[]>(topic, partition, Integer.toString(0), record(i + "")));
            }
        }
    }

    private static byte[] record(String name) throws IOException {
        GenericRecord record = new GenericData.Record(AvroSupport.getSchema());
        record.put("firstName", name);
        return AvroSupport.dataToByteArray(AvroSupport.getSchema(), record);
    }
}


So, this was all about Kafka clients. Hope you like our explanation of how to create Kafka clients.

6. Conclusion

Hence, we have seen all the ways to create Kafka clients using the Kafka API. Moreover, in this Kafka Clients tutorial, we discussed the Kafka Producer client and the Kafka Consumer client. Along with that, we learned about the Avro Kafka producer and consumer clients. However, if any doubt occurs regarding Kafka clients, feel free to ask through the comment section.

Reposted from: https://www.cnblogs.com/a00ium/p/10852433.html

