日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Golang 连接Kafka

發(fā)布時(shí)間:2024/4/17 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Golang 连接Kafka 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Kafka介紹

Kafka是Apache軟件基金會開發(fā)的一個開源流處理平臺,由Java和Scala編寫;Kafka是一種高吞吐、分布式、基于訂閱發(fā)布的消息系統(tǒng)。

?

Kafka名稱解釋

  • Producer:生產(chǎn)者
  • Consumer:消費(fèi)者
  • Topic:消息主題,每一類的消息稱之為一個主題
  • Broker:Kafka以集群的方式運(yùn)行,可以由一個或多個服務(wù)器組成,每個服務(wù)器叫做一個broker
  • Partition:物理概念上的分區(qū),為了提供系統(tǒng)吞吐量,在物理上每個Topic會分為一個或多個Partition

?

Kafka架構(gòu)圖

一個典型的Kafka集群中包含若干Producer,若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群。

Kafka通過Zookeeper管理集群配置及服務(wù)協(xié)同,Producer使用push模式將消息發(fā)布到broker,Consumer通過監(jiān)聽使用pull模式從broker訂閱并消費(fèi)消息。

圖上有個細(xì)節(jié)需要注意,producer給broker的過程是push,也就是有數(shù)據(jù)就推送給broker,而consumer給broker的過程是pull,是通過consumer主動去拉數(shù)據(jù)的,而不是broker把數(shù)據(jù)主動發(fā)送給consumer端的。

?

Kafka與RabbitMQ比較

  • Kafka比RabbitMQ性能要高
  • RabbitMQ比Kafka可靠性要高
  • 因此在金融支付領(lǐng)域使用RabbitMQ居多,而在日志處理、大數(shù)據(jù)等方面Kafka使用居多。

?

Kafka安裝

第一步 下載Kafka:

  地址 http://kafka.apache.org/downloads

第二步 解壓Kafka:

  tar -zxvf kafka.tgz -C? /usr/local/kafka

第三步 運(yùn)行Zookeeper:

? ?以后臺方式運(yùn)行 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &? zookeeper端口 2181

第四步 運(yùn)行Kafka:

? ? ?以后臺方式運(yùn)行?/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties? kafka端口 9092

?

Kafka圖形管理工具

http://www.kafkatool.com/download.html?

?

Go語言中使用Kafka

Sarama is an MIT-licensed Go client library for?Apache Kafka?version 0.8 (and later).

安裝sarama

  go get github.com/Shopify/sarama

Producer

package mainimport ("fmt""github.com/Shopify/sarama" )func main() {// 新建一個arama配置實(shí)例config := sarama.NewConfig()// WaitForAll waits for all in-sync replicas to commit before responding.config.Producer.RequiredAcks = sarama.WaitForAll// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.config.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = true// 新建一個同步生產(chǎn)者client, err := sarama.NewSyncProducer([]string{"172.16.65.210:9092"}, config)if err != nil {fmt.Println("producer close, err:", err)return}defer client.Close()// 定義一個生產(chǎn)消息,包括Topic、消息內(nèi)容、msg := &sarama.ProducerMessage{}msg.Topic = "revolution"msg.Key = sarama.StringEncoder("miles")msg.Value = sarama.StringEncoder("hello world...")// 發(fā)送消息pid, offset, err := client.SendMessage(msg)msg2 := &sarama.ProducerMessage{}msg2.Topic = "revolution"msg2.Key = sarama.StringEncoder("monroe")msg2.Value = sarama.StringEncoder("hello world2...")pid2, offset2, err := client.SendMessage(msg2)if err != nil {fmt.Println("send message failed,", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)fmt.Printf("pid2:%v offset2:%v\n", pid2, offset2) }

Consumer

package mainimport ("sync""github.com/Shopify/sarama""fmt" )var wg sync.WaitGroupfunc main() {consumer, err := sarama.NewConsumer([]string{"172.16.65.210:9092"}, nil)if err != nil {fmt.Println("consumer connect error:", err)return}fmt.Println("connnect success...")defer consumer.Close()partitions, err := consumer.Partitions("revolution")if err != nil {fmt.Println("geet partitions failed, err:", err)return}for _, p := range partitions {partitionConsumer, err := consumer.ConsumePartition("revolution", p, sarama.OffsetOldest)if err != nil {fmt.Println("partitionConsumer err:", err)continue}wg.Add(1)go func(){for m := range partitionConsumer.Messages() {fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)}wg.Done()}()}wg.Wait() }

?

?

轉(zhuǎn)載于:https://www.cnblogs.com/vincenshen/p/9824486.html

總結(jié)

以上是生活随笔為你收集整理的Golang 连接Kafka的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。