日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

go使用kafka

發布時間:2024/4/11 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 go使用kafka 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

原文鏈接:https://www.cnblogs.com/angelyan/p/10800739.html

windows上kafka的安裝

1.安裝jdk

下載地址:https://www.oracle.com/technetwork/java/javase/downloads/jre8-downloads-2133155.html

下載需要注冊oracle

添加環境變量JAVA_HOME=C:\Program Files\Java\jre1.8.0_211

2.安裝Zookeeper

下載地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

①進入zookeeper的相關設置所在的文件目錄,例如本文的:D:\zookeeper-3.4.14\conf

②將"zoo_sample.cfg"重命名為"zoo.cfg"

③打開zoo.cfg 找到并編輯:

dataDir=/tmp/zookeeper to D:/zookeeper-3.4.14/data或 D:/zookeeper-3.4.14/data(路徑僅為示例,具體可根據需要配置)

④與配置jre類似,在系統環境變量中添加:

a.系統變量中添加ZOOKEEPER_HOME=D:\zookeeper-3.4.14

b.編輯系統變量中的path變量,增加%ZOOKEEPER_HOME%\bin

⑤在zoo.cfg文件中修改默認的Zookeeper端口(默認端口2181)

⑥打開cmd窗口,輸入zkserver,運行Zookeeper

3.安裝kafka

下載地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.0/kafka_2.11-2.2.0.tgz

①進入kafka配置文件所在目錄,D:\kafka_2.11-2.2.0\config

②編輯文件"server.properties",找到并編輯:

log.dirs=/tmp/kafka-logs to log.dirs=D:/kafka_2.11-2.2.0/kafka-logs

③在server.properties文件中,zookeeper.connect=localhost:2181代表kafka所連接的zookeeper所在的服務器IP以及端口,可根據需要更改

修改advertised.host.name=服務器ip

④進入kafka安裝目錄D:\kafka_2.11-2.2.0,打開cmd啟動

.\bin\windows\kafka-server-start.bat .\config\server.properties

補充知識點

topic

topic是存儲消息的邏輯概念,不同的topic下的數據是分開存儲的。不同的 topic 的消息是分開存儲的, 每個 topic 可以有多個生產者向它發送消息,也可以有多 個消費者去消費其中的消息。

partition

一個 topic 可以劃分多個分區partition(每個 Topic至少有一個分區partition),同一topic下的不同分區包含的消息是不同的。第i個分區分配在第?i mod n?個broker上。

每個消息在被添加到分區時,都會被分配一個offset(稱之為偏移量),它是消息在此分區中的唯一編號,kafka通過offset 保證消息在分區內的順序,offset的順序不跨分區,即kafka 只保證在同一個分區內的消息是有序的。

offset

每個消息在被添加到分區時,都會被分配一個 offset(稱之為偏移量),它是消息在此分區中的唯一編號,kafka 通過 offset 保證消息在分區內的順序。offset 的順序不跨分區,即 kafka 只保證在同一個分區內的消息是有序的; 對于應用層的消費來說,每次消費一個消息并且提交以后,會保存當前消費到的最近的一個 offset。

?

docker安裝

1、下載鏡像
這里使用了wurstmeister/kafka和wurstmeister/zookeeper這兩個版本的鏡像

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

2、啟動

啟動zookeeper容器 docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper 啟動kafka docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.168:2181,192.168.0.169:2181,192.168.0.170:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.170:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka這里面主要設置了4個參數KAFKA_BROKER_ID=0 KAFKA_ZOOKEEPER_CONNECT=192.168.0.168:2181,192.168.0.169:2181,192.168.0.170:2181 KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.170:9092 KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092KAFKA_ZOOKEEPER_CONNECT 配置的是zookeeper的地址,可以單節點配置,也可以配置zookeeper集群多節點,用逗號隔開中間兩個參數的192.168.0.170改為宿主機器的IP地址,如果不這么設置,可能會導致在別的機器上訪問不到kafka。

?

3、進入kafka容器

docker exec -it ${CONTAINER ID} /bin/bash進入kafka默認目錄 /opt/kafka_2.11-0.10.1.0

?

go運行代碼

producer

package mainimport ("fmt""time""github.com/Shopify/sarama" )func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = trueclient, err := sarama.NewSyncProducer([]string{"192.168.3.118:9092"}, config)if err != nil {fmt.Println("producer close, err:", err)return}defer client.Close()for {msg := &sarama.ProducerMessage{}msg.Topic = "nginx_log"msg.Value = sarama.StringEncoder("this is a good test, my message is good")pid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println("send message failed,", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)time.Sleep(10 * time.Second)} }

?

?

?consumer

package mainimport ("fmt""strings""sync""github.com/Shopify/sarama""time" )var (wg sync.WaitGroup )func main() {consumer, err := sarama.NewConsumer(strings.Split("127.0.0.1:9092", ","), nil)if err != nil {fmt.Println("Failed to start consumer: %s", err)return}partitionList, err := consumer.Partitions("nginx_log")if err != nil {fmt.Println("Failed to get the list of partitions: ", err)return}fmt.Println(partitionList)for partition := range partitionList {pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)return}defer pc.AsyncClose()go func(pc sarama.PartitionConsumer) {wg.Add(1)for msg := range pc.Messages() {fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))fmt.Println()}wg.Done()}(pc)}time.Sleep(10*time.Second)wg.Wait()consumer.Close() }

?

?

超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

總結

以上是生活随笔為你收集整理的go使用kafka的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 欧美成人精品二区三区99精品 | 亚洲欧美综合精品久久成人 | 亚洲精品久久久久久久久 | av色片| 亚洲日本成人 | 涩涩视频在线观看免费 | 日韩人妻精品无码一区二区三区 | 欧美日韩国产综合在线 | 欧美性网址| 伊人中文字幕 | 欧美大尺度做爰啪啪床戏明星 | 日本国产精品一区 | 午夜网站在线观看 | 色呦呦网站在线观看 | 欧美一区二区三区大屁股撅起来 | 日本久久网站 | 国产一区二区三区高清视频 | 久久久国产免费 | 黄色资源在线播放 | 88av.com | 日韩综合一区二区三区 | 少妇熟女一区二区 | 黄色avv | 久久久久久网站 | 天天玩夜夜操 | 在线看中文字幕 | 不卡日韩 | 男人天堂资源 | 久久aaaa片一区二区 | 日本在线播放一区 | 日本无遮羞调教打屁股网站 | 一级国产特黄bbbbb | 噜噜噜久久 | 中文字幕av无码一区二区三区 | 国产精品三区在线观看 | 精品日韩一区二区三区 | 国产日韩视频 | 国产乱码精品一区二区三区不卡 | 天天做日日干 | 天天躁日日躁狠狠躁av麻豆 | 午夜激情亚洲 | 亚洲石原莉奈一区二区在线观看 | 毛片a片免费看 | 亚洲av无码一区二区乱子伦 | 亚洲涩网 | 亚洲免费网站 | av在线资源站 | 亚洲欧美色图视频 | 国产二级一片内射视频播放 | 超碰偷拍| 俺啪也| 久久久久久久国产精品美女 | 97免费视频观看 | 爱爱视频网站 | 在线观看亚洲区 | 日本视频一区二区 | 亚洲欧美成人一区二区三区 | 国产人成在线 | 日韩一区二区在线免费观看 | 乱码一区二区三区 | 国产99久久久国产精品 | 神马老子午夜 | 蜜桃久久精品成人无码av | 免费看国产片在线观看 | 国产无遮挡又黄又爽 | 天堂视频在线观看免费 | 伊人98| 免费黄色网址在线 | 18视频网站在线观看 | 国产精品区在线观看 | 污片网站在线观看 | 国产女人和拘做受视频免费 | 九一亚色| 黄在线观看免费 | 国产911 | 久久无码人妻丰满熟妇区毛片 | av资源网在线 | 欧美国产日本在线 | 好吊操视频这里只有精品 | 午夜中文字幕 | 丁香婷婷亚洲 | 国产日韩免费 | 亚洲性激情 | 精品久久久久久久免费人妻 | 污站在线观看 | 精品毛片一区二区三区 | 狠狠人妻久久久久久综合 | 久久久视频在线 | 美女作爱网站 | 欧美18免费视频 | 日韩免费福利视频 | 99精品综合| 国产又色又爽又黄又免费 | 99精品国产成人一区二区 | 欧美日韩在线一区二区 | 妹子干综合 | 热99这里只有精品 | 国产精品一线二线 | 99热久久这里只有精品 |