日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Kafka使用Java客户端进行访问

發布時間:2024/2/28 java 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka使用Java客户端进行访问 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文環境如下:
操作系統:CentOS 6 32位
JDK版本:1.8.0_77 32位
Kafka版本:0.9.0.1(Scala 2.11)

1. maven依賴包

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.9.0.1</version> </dependency>

2. 生產者代碼

package com.lnho.example.kafka;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "master:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for(int i = 0; i < 100; i++)producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));producer.close();} }

3. 消費者代碼

package com.lnho.example.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "master:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic1"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());}} }

4. 執行程序

lib底下需要有:kafka-clients-0.9.0.1.jar log4j-1.2.17.jar slf4j-api-1.7.6.jar slf4j-log4j12-1.7.6.jar
生產者:

java -classpath kafka-example-1.0-SNAPSHOT.jar:lib/* com.lnho.example.kafka.KafkaProducerExample

消費者:

java -classpath kafka-example-1.0-SNAPSHOT.jar:lib/* com.lnho.example.kafka.KafkaConsumerExample

總結

以上是生活随笔為你收集整理的Kafka使用Java客户端进行访问的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。