influxdb InfluxDB 是一個(gè)開源分布式時(shí)序、事件和指標(biāo)數(shù)據(jù)庫。使用 Go 語言編寫,無需外部依賴。其設(shè)計(jì)目標(biāo)是實(shí)現(xiàn)分布式和水平伸縮擴(kuò)展。
influxDB啟動(dòng)流程:
?1? 用docker下拉influxdb的鏡像
docker pull tutum/influxdb docekr
2 Docker環(huán)境下運(yùn)行influxdb
docker run -d -p 8083:8083 -p8086:8086 --expose 8090 --expose 8099 --name influxsrv tutum/influxdb
各個(gè)參數(shù)含義:
-d:容器在后臺(tái)運(yùn)行
-p:將容器內(nèi)端口映射到宿主機(jī)端口,格式為 宿主機(jī)端口:容器內(nèi)端口;
8083是influxdb的web管理工具端口
8086是influxdb的HTTP API端口
--expose:可以讓容器接受外部傳入的數(shù)據(jù)
--name:容器名稱 最后是鏡像名稱+tag,鏡像為tutum/influxdb,tag的值0.8.8指定了要運(yùn)行的版本,默認(rèn)是latest。
3 啟動(dòng)influxdb后,influxdb會(huì)啟動(dòng)一個(gè)內(nèi)部的HTTP server管理工具,用戶可以通過接入該web服務(wù)器來操作influxdb。
當(dāng)然,也可以通過CLI即命令行的方式訪問influxdb。
打開瀏覽器,輸入http://127.0.0.1:8083,訪問管理工具的主頁
4 Influxdb客戶端 可以參考里面例子
https://github.com/influxdata/influxdb/tree/master/client
PS. Influxdb原理詳解
https://www.linuxdaxue.com/influxdb-principle.html
Grafana Grafana 是一個(gè)開源的時(shí)序性統(tǒng)計(jì)和監(jiān)控平臺(tái),支持例如 elasticsearch、graphite、influxdb 等眾多的數(shù)據(jù)源,并以功能強(qiáng)大的界面編輯器著稱。
官網(wǎng):https://grafana.com/
grafana啟動(dòng)流程:
1 docker 拉取鏡像
docker run -d --name=grafana -p 3000:3000 grafana/grafana
2 訪問管理工具的主頁
瀏覽器127.0.0.1:3000 ,? 登錄 grafana的默認(rèn)端口是3000,用戶名和密碼為 admin / admin,配置文件/etc/grafana/grafana.ini,更改配置文件后需要重啟grafana。
3. 創(chuàng)建數(shù)據(jù)庫,綁定influxdb
4. 創(chuàng)建一個(gè)新的面板
home —> New Dashboard —> Graph —> 點(diǎn)擊,Edit
5 Edit中的Metrics就是構(gòu)造一個(gè)SQL的查詢語句
Golang打點(diǎn) 監(jiān)控日志程序通過 influxdb 將需要的內(nèi)容打點(diǎn)到influxdb?
1.導(dǎo)入 github.com/influxdata/influxdb/client/v2
2.創(chuàng)建influxdb client
// Create a new HTTPClientc, err := client.NewHTTPClient(client.HTTPConfig{Addr: addr,Username: username,Password: password,})
if err != nil {log.Fatal(err)}defer c.Close()
復(fù)制代碼
3.創(chuàng)建需要打的點(diǎn)的格式,類型
// Create a new point batchbp, err := client.NewBatchPoints(client.BatchPointsConfig{Database: database,Precision: precision,})
if err != nil {log.Fatal(err)}
復(fù)制代碼
4.創(chuàng)建點(diǎn),將點(diǎn)添加進(jìn)influxdb數(shù)據(jù)庫
// Create a point and add to batch//Tags:Path,Method,Scheme,Statustags := map[string]string{
"Path" : v.Path,
"Method" : v.Method,
"Scheme" : v.Scheme,
"Status" : v.Status,}fields := map[string]interface{}{
"UpstreamTime" : v.UpstreamTime,
"RequestTime" : v.RequestTime,
"BytesSent" : v.BytesSent,}pt, err := client.NewPoint(
"nginx_log" , tags, fields, v.TimeLocal)
if err != nil {log.Fatal(err)}bp.AddPoint(pt)// Write the batch
if err := c.Write(bp); err != nil {log.Fatal(err)}
復(fù)制代碼 Golang 完整代碼 imooc.log日志格式如下:
172.0.0.12 - - [02/May/2018:17:17:35 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854
172.0.0.12 - - [02/May/2018:17:17:36 +0000] http "POST /bar?query=t HTTP/1.0" 300 2133 "-" "KeepAliveClient" "-" 1.025 1.854
代碼邏輯主要是? 通過讀取模塊讀取imooc.log日志文件中日志,然后通過正則表達(dá)式,一行一行解析獲取數(shù)據(jù),并通過寫入模塊將數(shù)據(jù)通過influxdb客戶端打點(diǎn),最后通過grafana去顯示數(shù)據(jù)圖形.
package mainimport (
"bufio" "fmt" "github.com/influxdata/influxdb/client/v2" "io" "net/url" "os" "regexp" "strconv" "strings" "time" "flag" "log" "net/http" "encoding/json"
)const (TypeHandleLine = 0TypeErrNum = 1TpsIntervalTime = 5
)var TypeMonitorChan = make(chan int,200)
type Message struct {TimeLocal time.TimeBytesSent intPath, Method, Scheme, Status stringUpstreamTime, RequestTime
float 64
}//系統(tǒng)狀態(tài)監(jiān)控
type SystemInfo struct {HandleLine int `json:
"handleLine" ` //總處理日志行數(shù)Tps
float 64 `json:
"tps" ` //系統(tǒng)吞吐量ReadChanLen int `json:
"readChanLen" ` //
read channel 長度WriterChanLen int `json:
"writeChanLen" ` //write channel 長度RunTime string `json:
"ruanTime" ` //運(yùn)行總時(shí)間ErrNum int `json:
"errNum" ` //錯(cuò)誤數(shù)
}
type Monitor struct {startTime time.Timedata SystemInfotpsSli []inttps
float 64
}func (m *Monitor)start(lp *LogProcess) {go
func () {
for n := range TypeMonitorChan {switch n {
case TypeErrNum:m.data.ErrNum += 1
case TypeHandleLine:m.data.HandleLine += 1}}}()ticker := time.NewTicker(time.Second *TpsIntervalTime)go
func () {
for {<-ticker.Cm.tpsSli = append(m.tpsSli,m.data.HandleLine)
if len(m.tpsSli) > 2 {m.tpsSli = m.tpsSli[1:]m.tps =
float 64(m.tpsSli[1] - m.tpsSli[0])/TpsIntervalTime}}}()http.HandleFunc(
"/monitor" , func(writer http.ResponseWriter, request *http.Request) {m.data.RunTime = time.Now().Sub(m.startTime).String()m.data.ReadChanLen = len(lp.rc)m.data.WriterChanLen = len(lp.wc)m.data.Tps = m.tpsret ,_ := json.MarshalIndent(m.data,
"" ,
"\t" )io.WriteString(writer,string(ret))})http.ListenAndServe(
":9193" ,nil)
}
type Reader interface {Read(rc chan []byte)
}
type Writer interface {Writer(wc chan *Message)
}
type LogProcess struct {rc chan []bytewc chan *Message
read Readerwrite Writer
}
type ReadFromFile struct {path string //讀取文件的路徑
}//讀取模塊
func (r *ReadFromFile) Read(rc chan []byte) {//打開文件f, err := os.Open(r.path)fmt.Println(r.path)
if err != nil {panic(fmt.S
printf (
"open file err :" , err.Error()))}//從文件末尾開始逐行讀取文件內(nèi)容f.Seek(0, 2) //2,代表將指正移動(dòng)到末尾rd := bufio.NewReader(f)
for {line, err := rd.ReadBytes(
'\n' ) //連續(xù)讀取內(nèi)容知道需要
'\n' 結(jié)束
if err == io.EOF {time.Sleep(5000 * time.Microsecond)
continue }
else if err != nil {panic(fmt.S
printf (
"ReadBytes err :" , err.Error()))}TypeMonitorChan <- TypeHandleLinerc <- line[:len(line)-1]}}
type WriteToinfluxDB struct {influxDBDsn string //influx data
source
}//寫入模塊
/**1.初始化influxdb client2. 從Write Channel中讀取監(jiān)控?cái)?shù)據(jù)3. 構(gòu)造數(shù)據(jù)并寫入influxdb
*/
func (w *WriteToinfluxDB) Writer(wc chan *Message) {infSli := strings.Split(w.influxDBDsn,
"@" )addr := infSli[0]username := infSli[1]password := infSli[2]database := infSli[3]precision := infSli[4]// Create a new HTTPClientc, err := client.NewHTTPClient(client.HTTPConfig{Addr: addr,Username: username,Password: password,})
if err != nil {log.Fatal(err)}defer c.Close()
for v := range wc {// Create a new point batchbp, err := client.NewBatchPoints(client.BatchPointsConfig{Database: database,Precision: precision,})
if err != nil {log.Fatal(err)}// Create a point and add to batch//Tags:Path,Method,Scheme,Statustags := map[string]string{
"Path" : v.Path,
"Method" : v.Method,
"Scheme" : v.Scheme,
"Status" : v.Status,}fields := map[string]interface{}{
"UpstreamTime" : v.UpstreamTime,
"RequestTime" : v.RequestTime,
"BytesSent" : v.BytesSent,}fmt.Println(
"taps:" ,tags)fmt.Println(
"fields:" ,fields)pt, err := client.NewPoint(
"nginx_log" , tags, fields, v.TimeLocal)
if err != nil {log.Fatal(err)}bp.AddPoint(pt)// Write the batch
if err := c.Write(bp); err != nil {log.Fatal(err)}// Close client resources
if err := c.Close(); err != nil {log.Fatal(err)}log.Println(
"write success" )}}//解析模塊
func (l *LogProcess)
Process () {/**172.0.012 - - [04/Mar/2018:13:49:52 +0000] http
"GET /foo?query=t HTTP/1.0" 200 2133
"-" "KeepAliveClient" "-" 1.005 1.854([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\
"([^" ]+)\
"\s+(\d{3})\s+(\d+)\s+\"([^" ]+)\
"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)*/r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^" ]+)\
"\s+(\d{3})\s+(\d+)\s+\"([^" ]+)\
"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)for v := range l.rc {ret := r.FindStringSubmatch(string(v))if len(ret) != 14 {TypeMonitorChan <- TypeErrNumfmt.Println(" FindStringSubmatch fail:
", string(v))fmt.Println(len(ret))continue}message := &Message{}//時(shí)間: [04/Mar/2018:13:49:52 +0000]loc, _ := time.LoadLocation(" Asia/Shanghai
")t, err := time.ParseInLocation(" 02/Jan/2006:15:04:05 +0000
", ret[4], loc)if err != nil {TypeMonitorChan <- TypeErrNumfmt.Println(" ParseInLocation fail:
", err.Error(), ret[4])}message.TimeLocal = t//字符串長度: 2133byteSent, _ := strconv.Atoi(ret[8])message.BytesSent = byteSent//" GET /foo?query=t HTTP/1.0
"reqSli := strings.Split(ret[6], " ")if len(reqSli) != 3 {TypeMonitorChan <- TypeErrNumfmt.Println(" strings.Split fail:
", ret[6])continue}message.Method = reqSli[0]u, err := url.Parse(reqSli[1])if err != nil {TypeMonitorChan <- TypeErrNumfmt.Println(" url parse fail:
", err)continue}message.Path = u.Path//httpmessage.Scheme = ret[5]//code: 200message.Status = ret[7]//1.005upstreamTime, _ := strconv.ParseFloat(ret[12], 64)message.UpstreamTime = upstreamTime//1.854requestTime, _ := strconv.ParseFloat(ret[13], 64)message.RequestTime = requestTime//fmt.Println(message)l.wc <- message}
}/**
分析監(jiān)控需求:某個(gè)協(xié)議下的某個(gè)請求在某個(gè)請求方法的 QPS&響應(yīng)時(shí)間&流量*/
func main() {var path, influDsn stringflag.StringVar(&path, " path
", " ./imooc.log
", " read file path
")flag.StringVar(&influDsn, " influxDsn
", " http://127.0.01:8086@imooc@imoocpass@imooc@s
", " influx data
source ")flag.Parse()r := &ReadFromFile{path: path,}w := &WriteToinfluxDB{influxDBDsn: influDsn,}lp := &LogProcess{rc: make(chan []byte,200),wc: make(chan *Message),read: r,write: w,}go lp.read.Read(lp.rc)for i:=1;i<2 ; i++ {go lp.Process()}for i:=1;i<4 ; i++ {go lp.write.Writer(lp.wc)}fmt.Println(" begin !!!
")m:= &Monitor{startTime:time.Now(),data:SystemInfo{},}m.start(lp)
}
復(fù)制代碼
總結(jié)
以上是生活随笔 為你收集整理的influxDB+grafana 日志监控平台(Golang) 的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔 推薦給好友。