
5. Kafka operations

Published: 2025/5/22 by 豆豆


#1. Operating Kafka with shell scripts

  • List all topics on the current cluster

    bin/kafka-topics.sh --list --zookeeper zk01:2181

  • Create a topic

    bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test

  • Delete a topic

    bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test

    This requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion (or the broker has to be restarted for the delete to take effect).

  • Produce data

    kafka-console-producer.sh --broker-list kafka01:9092 --topic itheima

    A variation (no separate data-collection tool needed): tail -F /root/log.log | kafka-console-producer.sh --broker-list hadoop1:9092 --topic accesslogs

  • Consume messages

    sh bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test1

  • Check consumer offsets

    sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup

  • Describe a topic

    sh kafka-topics.sh --topic test --describe --zookeeper zk01:2181

  • Change the number of partitions

    kafka-topics.sh --zookeeper zk01:2181 --alter --partitions 15 --topic utopic
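The ConsumerOffsetChecker above reports, per partition, the broker's log-end offset, the group's committed offset, and the lag between them. The arithmetic behind the Lag column is just a difference; a minimal sketch with made-up offsets:

```java
// The "Lag" reported per partition is the log-end offset minus the
// consumer group's committed offset.
public class LagDemo {
    public static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        // hypothetical offsets for one partition
        System.out.println(lag(1500L, 1200L)); // prints 300
    }
}
```

A lag that keeps growing means the consumer group is not keeping up with the producers.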

#2. Operating Kafka with the Java API

Producer API

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducerSimple {
    public static void main(String[] args) throws InterruptedException {
        String TOPIC = "test9";
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "kafka01:9092");
        props.put("request.required.acks", "1");
        // custom partitioner that decides which partition each message goes to
        props.put("partitioner.class", "cn.itcast.storm.kafka.producer.MyLogPartitioner");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        int messageNo = 0;
        while (true) {
            String messageStr = "produce数据";
            KeyedMessage<String, String> keyedMessage =
                    new KeyedMessage<String, String>(TOPIC, messageNo + "", messageStr);
            producer.send(keyedMessage);
            messageNo += 1;
        }
    }
}
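The producer configures a custom partitioner, cn.itcast.storm.kafka.producer.MyLogPartitioner, whose source is not shown in the article. A common way to write one is to hash the message key modulo the partition count; the class below is a hypothetical sketch of that logic only (the real class would implement kafka.producer.Partitioner and take the key and partition count from the broker):

```java
// Hypothetical sketch of a hash-based partitioner. The actual
// MyLogPartitioner is not shown in the article; this only illustrates
// the usual key -> partition mapping.
public class MyLogPartitionerSketch {
    // Maps a message key to a partition id in [0, numPartitions).
    public static int partition(Object key, int numPartitions) {
        // mask the sign bit so the result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // the same key always lands on the same partition
        System.out.println(partition("42", 9) == partition("42", 9)); // prints true
    }
}
```

Because the mapping is deterministic, all messages with the same key end up in the same partition, which preserves per-key ordering.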

Consumer API

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class KafkaConsumerSimple implements Runnable {
    public String title;
    public KafkaStream<byte[], byte[]> stream;

    public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
        this.title = title;
        this.stream = stream;
    }

    public void run() {
        System.out.println("Starting " + title);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        /*
         * Keep reading newly arriving messages from the stream. While waiting
         * for new messages, hasNext() blocks; after ConsumerConnector#shutdown
         * is called, hasNext() returns false.
         */
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            String topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            String msg = new String(data.message());
            System.out.println(String.format(
                    "Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));
        }
        System.err.println(String.format("Consumer: [%s] exiting ...", title));
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("group.id", "testGroup");
        props.put("zookeeper.connect", "zk01:2181,zk02:2181,zk03:2181");
        props.put("auto.offset.reset", "largest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("partition.assignment.strategy", "roundrobin");
        ConsumerConfig config = new ConsumerConfig(props);
        String topic = "test9";
        // As long as the ConsumerConnector is alive, the consumer keeps
        // waiting for new messages instead of exiting on its own.
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
        // Request 9 streams for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 9);
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap =
                consumerConn.createMessageStreams(topicCountMap);
        // Take the streams for this topic.
        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);
        // Create a thread pool of 9 threads, one consumer thread per stream.
        ExecutorService executor = Executors.newFixedThreadPool(9);
        for (int i = 0; i < streams.size(); i++)
            executor.execute(new KafkaConsumerSimple("Consumer " + (i + 1), streams.get(i)));
    }
}
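One caveat in the consumer above: new String(data.message()) decodes the payload with the platform default charset, which can mangle the non-ASCII text the producer sends. Passing an explicit charset avoids that; a self-contained sketch of the safer decode:

```java
import java.nio.charset.StandardCharsets;

// Decoding message bytes with an explicit charset instead of relying on
// the JVM's platform default, which varies between machines.
public class DecodeDemo {
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // round-trip the kind of mixed ASCII/Chinese payload the producer sends
        byte[] payload = "produce数据".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(payload)); // prints produce数据
    }
}
```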

Reposted from: https://my.oschina.net/liufukin/blog/800434
