日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka环境安装及简单使用(单机版)

發(fā)布時(shí)間:2025/7/25 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka环境安装及简单使用(单机版) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一個(gè)分布式發(fā)布-訂閱消息傳遞系統(tǒng)

特點(diǎn):

??? 高吞吐量、低延遲

使用場(chǎng)景(舉例):

??? 日志收集:用kafka收集各種服務(wù)產(chǎn)生的log,通過kafka以統(tǒng)一的接口服務(wù)的方式開放給各種consumer,如hadoop,hbase等

?

下載安裝:

??? 1.下載地址? ? 選擇一個(gè)版本的kafka進(jìn)行下載

??? 2.解壓

tar -zxvf kafka_2.11-0.9.0.1.tgz mv kafka_2.11-0.9.0.1 /opt/

??? 3.配置環(huán)境變量(可選步驟)

?

上手使用:

??? 1.config目錄配置文件(zookeeper.properties,service.properties,producer.properties,consumer.properties)

  我們暫時(shí)先不管這些配置文件,遵守初始的配置

??? 2.先啟動(dòng)zookeeper - kafka依賴與zookeeper 實(shí)現(xiàn)分布式一致性

  我們下載的kafka安裝包,就自帶了zookeeepr,zookeeper.properties就是自帶的zk的配置文件

nohup bin/zookeeper-server-start.sh config/zookeeepr.properties& nohup &是實(shí)現(xiàn)在后臺(tái)啟動(dòng)

???

??? 3.再啟動(dòng)kafka服務(wù)

bin/kafka-server-start.sh config/server.properties

???

??? 4.創(chuàng)建一個(gè)Topic

bin/kafka-topics.sh --create --topic test1 --zookeeper localehost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

?

??? 4.再啟動(dòng)kafka生產(chǎn)端

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1

?

??? 5.在新窗口再啟動(dòng)kafka消費(fèi)端

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning

?

??? 6.在生產(chǎn)窗口輸入任意字符,觀察在消費(fèi)端是否能夠收到相應(yīng)字符

  

如果無法收到正確字符,或者報(bào)錯(cuò),嘗試從以下方面排查:

??? 1.服務(wù)是否都按順序正常啟動(dòng)

??? 2.命令中開啟的服務(wù)端口是否和相應(yīng)的配置文件中的配置對(duì)應(yīng)

??????? 注:生產(chǎn)端訪問的端口不是? zookeeper的localhost:2181, 而是producer.properties中配置的broker的端口,默認(rèn)為9092

??????? 注:這個(gè)broker的端口是需要在 server中有相應(yīng)的配置才可以

?

簡(jiǎn)單介紹一下上面提到了config目錄下面的配置,以及kafka集群的搭建

server.properties:一個(gè)server.properties文件代表了一個(gè)kafka服務(wù),也就是一個(gè)Broker

所以說,如果我們想搭建一個(gè)kafka集群,需要有不同的 server.properties文件,來啟動(dòng)多個(gè)broker,多個(gè)borker組成kafka cluster

??? 注:每個(gè)server.properties配置文件中的 broker.id(服務(wù)器唯一標(biāo)識(shí))不能一樣

???????? port(服務(wù)器監(jiān)聽端口號(hào))不能一樣

???????? zookeeper.connect(zookeeper的連接ip及端口),需和zookeeper.properties保持一致

?

kafka在Java程序的簡(jiǎn)單示例:

? 生產(chǎn):

public class JavaKafkaProducer {private Logger logger = Logger.getLogger(JavaKafkaProducer.class);public static final String TOPIC_NAME = "test1";public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();public static final int chartsLength = charts.length;public static void main(String[] args) {String brokerList = "127.0.0.1:9092";Properties props = new Properties();props.put("metadata.broker.list", brokerList);/*** 0表示不等待結(jié)果返回<br/>* 1表示等待至少有一個(gè)服務(wù)器返回?cái)?shù)據(jù)接收標(biāo)識(shí)<br/>* -1表示必須接收到所有的服務(wù)器返回標(biāo)識(shí),及同步寫入<br/>* */props.put("request.required.acks", "0");/*** 內(nèi)部發(fā)送數(shù)據(jù)是異步還是同步* sync:同步, 默認(rèn)* async:異步*/props.put("producer.type", "async");/*** 設(shè)置序列化的類* 可選:kafka.serializer.StringEncoder* 默認(rèn):kafka.serializer.DefaultEncoder*/props.put("serializer.class", "kafka.serializer.StringEncoder");/*** 設(shè)置分區(qū)類* 根據(jù)key進(jìn)行數(shù)據(jù)分區(qū)* 默認(rèn)是:kafka.producer.DefaultPartitioner ==> 安裝key的hash進(jìn)行分區(qū)* 可選:kafka.serializer.ByteArrayPartitioner ==> 轉(zhuǎn)換為字節(jié)數(shù)組后進(jìn)行hash分區(qū)*/props.put("partitioner.class", "com.kafka.JavaKafkaProducerPartitioner");// 重試次數(shù)props.put("message.send.max.retries", "3");// 異步提交的時(shí)候(async),并發(fā)提交的記錄數(shù)props.put("batch.num.messages", "200");// 設(shè)置緩沖區(qū)大小,默認(rèn)10KBprops.put("send.buffer.bytes", "102400");// 2. 構(gòu)建Kafka Producer Configuration上下文ProducerConfig config = new ProducerConfig(props);// 3. 構(gòu)建Producer對(duì)象final Producer<String, String> producer = new Producer<String, String>(config);// 4. 發(fā)送數(shù)據(jù)到服務(wù)器,并發(fā)線程發(fā)送final AtomicBoolean flag = new AtomicBoolean(true);int numThreads = 50;ExecutorService pool = Executors.newFixedThreadPool(numThreads);for (int i = 0; i < 5; i++) {pool.submit(new Thread(new Runnable() {@Overridepublic void run() {while (flag.get()) {// 發(fā)送數(shù)據(jù)KeyedMessage message = generateKeyedMessage();producer.send(message);System.out.println("發(fā)送數(shù)據(jù):" + message);// 休眠一下try {int least = 10;int bound = 100;Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(Thread.currentThread().getName() + " shutdown....");}}, "Thread-" + i));}// 5. 等待執(zhí)行完成long sleepMillis = 600000;try {Thread.sleep(sleepMillis);} catch (InterruptedException e) {e.printStackTrace();}flag.set(false);// 6. 關(guān)閉資源 pool.shutdown();try {pool.awaitTermination(6, TimeUnit.SECONDS);} catch (InterruptedException e) {} finally {producer.close(); // 最后之后調(diào)用 }}/*** 產(chǎn)生一個(gè)消息** @return*/private static KeyedMessage<String, String> generateKeyedMessage() {String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);StringBuilder sb = new StringBuilder();int num = ThreadLocalRandom.current().nextInt(1, 5);for (int i = 0; i < num; i++) {sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");}String message = sb.toString().trim();return new KeyedMessage(TOPIC_NAME, key, message);}/*** 產(chǎn)生一個(gè)給定長(zhǎng)度的字符串** @param numItems* @return*/private static String generateStringMessage(int numItems) {StringBuilder sb = new StringBuilder();for (int i = 0; i < numItems; i++) {sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);}return sb.toString();} }

?

? 消費(fèi):

public class JavaKafkaConsumerHighAPITest {public static void main(String[] args) {String zookeeper = "127.0.0.1";String groupId = "test-consumer-group";String topic = "test1";int threads = 1;JavaKafkaConsumerHighAPI example = new JavaKafkaConsumerHighAPI(topic, threads, zookeeper, groupId);new Thread(example).start();// 執(zhí)行10秒后結(jié)束int sleepMillis = 600000;try {Thread.sleep(sleepMillis);} catch (InterruptedException e) {e.printStackTrace();}// 關(guān)閉 example.shutdown();} }

?

?

kafka各組件說明:

??? 1.Broker -- 每個(gè)kafka server稱為一個(gè)Broker,多個(gè)borker組成kafka cluster。

??? 2.Topic? --? Topic 就是消息類別名,一個(gè)topic中通常放置一類消息。每個(gè)topic都有一個(gè)或者多個(gè)訂閱者,也就是消息的消費(fèi)者consumer。

??????? Producer將消息推送到topic,由訂閱該topic的consumer從topic中拉取消息。

??????? 一個(gè)Broker上可以創(chuàng)建一個(gè)或者多個(gè)Topic。同一個(gè)topic可以在同一集群下的多個(gè)Broker中分布。

??? ....

?

?

參考博文:http://www.cnblogs.com/liuming1992/tag/Kafka/

?

轉(zhuǎn)載于:https://www.cnblogs.com/xuzekun/p/8986540.html

總結(jié)

以上是生活随笔為你收集整理的kafka环境安装及简单使用(单机版)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。