日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java 高并发mqtt服务器_Boomer 实战压测 mqtt,2w 并发轻松实现

發布時間:2024/9/27 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 高并发mqtt服务器_Boomer 实战压测 mqtt,2w 并发轻松实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

// main.go

// 代碼僅供參考,無法直接運行.

package main

import (

"bytes"

"encoding/csv"

"fmt"

MQTT "github.com/eclipse/paho.mqtt.golang"

"github.com/myzhan/boomer"

"io"

"io/ioutil"

"log"

"os"

"strconv"

"strings"

"sync"

"time"

)

var rows [][]string // 讀取csv文件保存到這里

var clientTopic []map[string]MQTT.Client

var conn = 0 // 調試用

var failCount = 0 // 初始化失敗數量

var i = 0 // 控制并發

var j = 1 // 記錄消息發送成功

var f = 1 // 記錄消息發送失敗

var nowStr = strconv.Itoa(int(time.Now().Unix())) // 當前時間戳,用來做后續查詢的消息的標識符

func newConn(c MQTT.Client, clientId string, group *sync.WaitGroup) {

defer func() {

group.Add(-1)

err := recover()

if err != nil {

failCount++

fmt.Println("login fail clientId: ", clientId)

}

}()

token := c.Connect()

if token.Wait() && token.Error() != nil {

panic(token.Error())

}

// 組裝topic

topic := fmt.Sprintf("msg/%s/supply", clientId)

temp := make(map[string]MQTT.Client)

temp[topic] = c

clientTopic = append(clientTopic, temp)

conn++ // 調試用

}

func initClients() {

var wg sync.WaitGroup

server := "server_ip:1883"

for i := 0; i < len(rows); i++ {

wg.Add(1)

clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]

opts := MQTT.NewClientOptions().AddBroker(server)

opts.SetUsername(userName)

opts.SetPassword(passWord)

opts.SetClientID(clientId)

opts.SetKeepAlive(300 * time.Second)

c := MQTT.NewClient(opts)

go newConn(c, clientId, &wg)

}

wg.Wait() // 等到所有協程執行完成

fmt.Printf("init finish, clients len is %d \n", len(clientTopic))

fmt.Printf("conn: %d \n", conn)

fmt.Printf("failCount: %d \n", failCount)

}

func initCsvData() {

pwd, _ := os.Getwd()

b, err := ioutil.ReadFile(pwd + "/clients.csv")

fs := bytes.NewBuffer(b)

if err != nil {

log.Fatalf("can not open the file, err is %+v", err)

}

r := csv.NewReader(fs)

//針對大文件,一行一行的讀取文件

for {

row, err := r.Read()

if err != nil && err != io.EOF {

log.Fatalf("can not read, err is %+v", err)

}

if err == io.EOF {

break

}

rows = append(rows, row)

}

}

func login() {

server := "server_ip:port"

clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]

start := time.Now()

opts := MQTT.NewClientOptions().AddBroker(server)

opts.SetUsername(userName)

opts.SetPassword(passWord)

opts.SetClientID(clientId)

c := MQTT.NewClient(opts)

token := c.Connect()

elapsed := time.Since(start)

if token.Error() == nil {

log.Println("success" + strconv.Itoa(j))

boomer.RecordSuccess("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), int64(10))

} else {

log.Println(token.Error())

boomer.RecordFailure("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), clientId)

}

c.Disconnect(5)

// avoid out of array

if i < len(clientTopic)-1 {

i++

} else {

i = 0

}

j++

}

func sendMsg() {

start := time.Now()

msgId := "msg" + strconv.Itoa(i)

var clientId string

var topic string

var c MQTT.Client

for k, v := range clientTopic[i] {

clientId = k[6:19]

topic = k

c = v // v就是一個connected的client

}

deviceTime := nowStr

str := []string{msgId, clientId, deviceTime}

msgPayload := strings.Join(str, "|")

if c.IsConnected() == true {

token := c.Publish(topic, 1, false, msgPayload)

token.Wait() 等待消息發送完成,雖然會拉低并發,但必須要這么做,確保消息發送成功

elapsed := time.Since(start)

if token.Error() == nil {

fmt.Printf("this topic name is: %s \n", topic)

fmt.Printf("this topic payload is: %s \n", msgPayload)

fmt.Printf("success msg index: %v elapsed: %v \n", j, elapsed)

j++ // 消息發送成功, 記錄一條,并且也給locust記錄一條,方便后續校對數據量

boomer.RecordSuccess("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), int64(j))

// 避免數組越界

if i < len(clientTopic)-1 {

i++

} else {

i = 0

}

} else {

boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)

fmt.Printf("發送失敗, fail msg index: %v \n", f)

}

} else {

if token := c.Connect(); token.Wait() && token.Error() != nil {

elapsed := time.Since(start)

fmt.Printf("fail msg index: %v \n", f)

f++

boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)

}

}

}

func main() {

initCsvData()

initClients()

task1 := &boomer.Task{

Name: "myTask",

Weight: 1,

Fn: sendMsg,

}

//task2 := &boomer.Task{

// Name: "login",

// Weight: 1,

// Fn: login,

//}

boomer.Run(task1)

}

總結

以上是生活随笔為你收集整理的java 高并发mqtt服务器_Boomer 实战压测 mqtt,2w 并发轻松实现的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。