Kafka使用Java客户端进行访问
生活随笔
收集整理的這篇文章主要介紹了
Kafka使用Java客户端进行访问
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
本文環境如下:
操作系統:CentOS 6 32位
JDK版本:1.8.0_77 32位
Kafka版本:0.9.0.1(Scala 2.11)
1. maven依賴包
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.9.0.1</version> </dependency>2. 生產者代碼
package com.lnho.example.kafka;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "master:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for(int i = 0; i < 100; i++)producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));producer.close();} }3. 消費者代碼
package com.lnho.example.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "master:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic1"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());}} }4. 執行程序
lib底下需要有:kafka-clients-0.9.0.1.jar log4j-1.2.17.jar slf4j-api-1.7.6.jar slf4j-log4j12-1.7.6.jar
生產者:
消費者:
java -classpath kafka-example-1.0-SNAPSHOT.jar:lib/* com.lnho.example.kafka.KafkaConsumerExample總結
以上是生活随笔為你收集整理的Kafka使用Java客户端进行访问的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka单机、集群模式安装详解(二)
- 下一篇: 记录一次大对象导致的Java堆内存溢出问