日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

5、kafka的操作

發布時間:2025/5/22 编程问答 83 豆豆
生活随笔 收集整理的這篇文章主要介紹了 5、kafka的操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

#1、通過shell腳本

  • 查看當前服務器中的所有topic

    bin/kafka-topics.sh --list --zookeeper zk01:2181

  • 創建topic

    bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test

  • 刪除topic

    bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test

    需要server.properties中設置delete.topic.enable=true否則只是標記刪除或者直接重啟。

  • 生產數據

    kafka-console-producer.sh --broker-list kafka01:9092 --topic itheima

    演變操作:(不需要任何的數據采集工具)tail -F /root/log.log | kafka-console-producer.sh --broker-list hadoop1:9092 --topic accesslogs
  • 消費消息

    sh bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test1

  • 查看消費位置

    sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup

  • 查看某個Topic的詳情

    sh kafka-topics.sh --topic test --describe --zookeeper zk01:2181

  • 對分區數進行修改

    kafka-topics.sh --zookeeper zk01 --alter --partitions 15 --topic utopic

#2、通過Java的api操作:

生產者API

public class KafkaProducerSimple {public static void main(String[] args) throws InterruptedException {String TOPIC = "test9";Properties props = new Properties();props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("metadata.broker.list", "kafka01:9092");props.put("request.required.acks", "1");//定義一個partition分區器props.put("partitioner.class", "cn.itcast.storm.kafka.producer.MyLogPartitioner");Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));int messageNo = 0;while (true){String messageStr = new String("produce數據");KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(TOPIC, messageNo + "", messageStr);producer.send(keyedMessage);messageNo +=1;}} }

消費者API

public class KafkaConsumerSimple implements Runnable {public String title;public KafkaStream<byte[], byte[]> stream;public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {this.title = title;this.stream = stream;}public void run() {System.out.println("開始運行 " + title);ConsumerIterator<byte[], byte[]> it = stream.iterator();/*** 不停地從stream讀取新到來的消息,在等待新的消息時,hasNext()會阻塞* 如果調用 `ConsumerConnector#shutdown`,那么`hasNext`會返回false* */while (it.hasNext()) {MessageAndMetadata<byte[], byte[]> data = it.next();String topic = data.topic();int partition = data.partition();long offset = data.offset();String msg = new String(data.message());System.out.println(String.format("Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]",title, topic, partition, offset, msg));}System.err.println(String.format("Consumer: [%s] exiting ...", title));}public static void main(String[] args) throws Exception{// main方法Properties props = new Properties();props.put("group.id", "testGroup");props.put("zookeeper.connect", "zk01:2181,zk02:2181,zk03:2181");props.put("auto.offset.reset", "largest");props.put("auto.commit.interval.ms", "1000");props.put("partition.assignment.strategy", "roundrobin");ConsumerConfig config = new ConsumerConfig(props);String topic = "test9";//只要ConsumerConnector還在的話,consumer會一直等待新消息,不會自己退出ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);//創建topicCountMapMap<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic,9);Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);//取出 `kafkaTest` 對應的 streamsList<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);//創建一個容量為20的線程池ExecutorService executor = Executors.newFixedThreadPool(9);//創建20個consumer threadsfor (int i = 0; i < streams.size(); i++)executor.execute(new KafkaConsumerSimple("消費者" + (i + 1), streams.get(i)));} }

轉載于:https://my.oschina.net/liufukin/blog/800434

總結

以上是生活随笔為你收集整理的5、kafka的操作的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。