日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

java 高并发mqtt服务器_Boomer 实战压测 mqtt,2w 并发轻松实现

發(fā)布時(shí)間:2024/9/27 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 高并发mqtt服务器_Boomer 实战压测 mqtt,2w 并发轻松实现 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

// main.go

// 代碼僅供參考,無(wú)法直接運(yùn)行.

package main

import (

"bytes"

"encoding/csv"

"fmt"

MQTT "github.com/eclipse/paho.mqtt.golang"

"github.com/myzhan/boomer"

"io"

"io/ioutil"

"log"

"os"

"strconv"

"strings"

"sync"

"time"

)

var rows [][]string // 讀取csv文件保存到這里

var clientTopic []map[string]MQTT.Client

var conn = 0 // 調(diào)試用

var failCount = 0 // 初始化失敗數(shù)量

var i = 0 // 控制并發(fā)

var j = 1 // 記錄消息發(fā)送成功

var f = 1 // 記錄消息發(fā)送失敗

var nowStr = strconv.Itoa(int(time.Now().Unix())) // 當(dāng)前時(shí)間戳,用來(lái)做后續(xù)查詢的消息的標(biāo)識(shí)符

func newConn(c MQTT.Client, clientId string, group *sync.WaitGroup) {

defer func() {

group.Add(-1)

err := recover()

if err != nil {

failCount++

fmt.Println("login fail clientId: ", clientId)

}

}()

token := c.Connect()

if token.Wait() && token.Error() != nil {

panic(token.Error())

}

// 組裝topic

topic := fmt.Sprintf("msg/%s/supply", clientId)

temp := make(map[string]MQTT.Client)

temp[topic] = c

clientTopic = append(clientTopic, temp)

conn++ // 調(diào)試用

}

func initClients() {

var wg sync.WaitGroup

server := "server_ip:1883"

for i := 0; i < len(rows); i++ {

wg.Add(1)

clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]

opts := MQTT.NewClientOptions().AddBroker(server)

opts.SetUsername(userName)

opts.SetPassword(passWord)

opts.SetClientID(clientId)

opts.SetKeepAlive(300 * time.Second)

c := MQTT.NewClient(opts)

go newConn(c, clientId, &wg)

}

wg.Wait() // 等到所有協(xié)程執(zhí)行完成

fmt.Printf("init finish, clients len is %d \n", len(clientTopic))

fmt.Printf("conn: %d \n", conn)

fmt.Printf("failCount: %d \n", failCount)

}

func initCsvData() {

pwd, _ := os.Getwd()

b, err := ioutil.ReadFile(pwd + "/clients.csv")

fs := bytes.NewBuffer(b)

if err != nil {

log.Fatalf("can not open the file, err is %+v", err)

}

r := csv.NewReader(fs)

//針對(duì)大文件,一行一行的讀取文件

for {

row, err := r.Read()

if err != nil && err != io.EOF {

log.Fatalf("can not read, err is %+v", err)

}

if err == io.EOF {

break

}

rows = append(rows, row)

}

}

func login() {

server := "server_ip:port"

clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]

start := time.Now()

opts := MQTT.NewClientOptions().AddBroker(server)

opts.SetUsername(userName)

opts.SetPassword(passWord)

opts.SetClientID(clientId)

c := MQTT.NewClient(opts)

token := c.Connect()

elapsed := time.Since(start)

if token.Error() == nil {

log.Println("success" + strconv.Itoa(j))

boomer.RecordSuccess("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), int64(10))

} else {

log.Println(token.Error())

boomer.RecordFailure("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), clientId)

}

c.Disconnect(5)

// avoid out of array

if i < len(clientTopic)-1 {

i++

} else {

i = 0

}

j++

}

func sendMsg() {

start := time.Now()

msgId := "msg" + strconv.Itoa(i)

var clientId string

var topic string

var c MQTT.Client

for k, v := range clientTopic[i] {

clientId = k[6:19]

topic = k

c = v // v就是一個(gè)connected的client

}

deviceTime := nowStr

str := []string{msgId, clientId, deviceTime}

msgPayload := strings.Join(str, "|")

if c.IsConnected() == true {

token := c.Publish(topic, 1, false, msgPayload)

token.Wait() 等待消息發(fā)送完成,雖然會(huì)拉低并發(fā),但必須要這么做,確保消息發(fā)送成功

elapsed := time.Since(start)

if token.Error() == nil {

fmt.Printf("this topic name is: %s \n", topic)

fmt.Printf("this topic payload is: %s \n", msgPayload)

fmt.Printf("success msg index: %v elapsed: %v \n", j, elapsed)

j++ // 消息發(fā)送成功, 記錄一條,并且也給locust記錄一條,方便后續(xù)校對(duì)數(shù)據(jù)量

boomer.RecordSuccess("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), int64(j))

// 避免數(shù)組越界

if i < len(clientTopic)-1 {

i++

} else {

i = 0

}

} else {

boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)

fmt.Printf("發(fā)送失敗, fail msg index: %v \n", f)

}

} else {

if token := c.Connect(); token.Wait() && token.Error() != nil {

elapsed := time.Since(start)

fmt.Printf("fail msg index: %v \n", f)

f++

boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)

}

}

}

func main() {

initCsvData()

initClients()

task1 := &boomer.Task{

Name: "myTask",

Weight: 1,

Fn: sendMsg,

}

//task2 := &boomer.Task{

// Name: "login",

// Weight: 1,

// Fn: login,

//}

boomer.Run(task1)

}

總結(jié)

以上是生活随笔為你收集整理的java 高并发mqtt服务器_Boomer 实战压测 mqtt,2w 并发轻松实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。