Kafka基本使用
Kafka基本使用
官網地址 ?http://kafka.apache.org/ ? 一切應以官網文檔為準。
?
安裝
download里下載要安裝的版本。或者直接wget該網址。如wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
quickstart里有安裝方法:
tar -zxvf kafka_2.11-0.8.2.1.tgz
config/server.properties下有各種各種配置
先要保證zookeeper以啟動,然后啟動kafka ? ?bin/kafka-server-start.sh?config/server.properties?
?
?
測試
可以先測試一下:這些命令都以官網最新的為準: ?建topic(指定zookeeper factor?partitions ?topic名) ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 看topic (指定zookeeper) ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?bin/kafka-topics.sh --list --zookeeper ?localhost:2181
啟動consumer ?(指定zookeeper ?topic ? from開始) 啟動后不要關 ? ? ? ? ??bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
再啟一個窗口 ?啟動producer ?(指定broker topic) 輸入內容回車,去consumer觀察 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
?
報錯
注意版本問題,如果javaApi producer版本高,想在客戶端consumer啟動低版本驗證,會不停的報錯:
Closing socket connection to/127,0,0,1.(kafka.network.Processor) ?無法識別客戶端消息。
由于需要定時啟動Kafka?consumer拉取數據,第一次啟動后,沒有關掉線程。
但Kafka?consumer是非線程安全的,第二次消費數據時會報錯:
java.util.ConcurrentModificationException:?KafkaConsumer?is?not?safe?for?multi-threaded?access ?
轉載于:https://www.cnblogs.com/tree1123/p/6760688.html
總結
- 上一篇: 关于checkbox
- 下一篇: 构建之法 第三次心得