日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

mysql webhook_sql 数据定时发送webhook golang 服务

發布時間:2024/10/6 数据库 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 mysql webhook_sql 数据定时发送webhook golang 服务 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目的很簡單,主要是基于cron表達式定時獲取sql 數據庫數據(sql server,mysql,pg,clickhouse)同時通過webhook 發送到外部接口

幾個需求說明

應該基于配置管理,而不是代碼寫死的

支持多數據庫同時運行(減少運行的實例)

支持sql 數據的處理(對于不用webhook 的數據可能不一樣,我們需要處理下)

job 支持靈活的cron 表達式

應該輕量,簡單,容易使用

設計

簡單說明

參考了一個sql2slack 的服務,基于golang 編寫,使用hcl 進行配置管理,同時基于js 引擎處理數據,同時為了方便數據處理

提供了內置underscore ,對于cron 的處理基于golang 版本的cron表達式引擎 ,一些改進:基于hcl v2 版本,支持多js 引擎

(otto以及goja 基于配置指定),調整webhook 消息的發送,支持發送任意消息,同時調整cron支持秒的處理

job 格式說明

基于hcl ,Name 為label,實際上后邊可以調整下,將job 擴展為webhook以及db 模式的,

type Job struct {

Name ? ? ? ? ? ?string ? ? ? ?`hcl:",label"`

Driver ? ? ? ? ?string ? ? ? ?`hcl:"driver"`

DSN ? ? ? ? ? ? string ? ? ? ?`hcl:"dsn"`

Query ? ? ? ? ? string ? ? ? ?`hcl:"query"`

Webhook ? ? ? ? string ? ? ? ?`hcl:"webhook"`

Schedule ? ? ? ?string ? ? ? ?`hcl:"schedule"`

MessageString ? string ? ? ? ?`hcl:"message"`

MessageCompiled executor.JSVM `hcl:"-"`

Conn ? ? ? ? ? ?*sqlx.DB ? ? ?`hcl:"-"`

EngineName ? ? ?string ? ? ? ?`hcl:"jsengine"`

JSVM ? ? ? ? ? ?string ? ? ? ?`hcl:"-"`

Stmnt ? ? ? ? ? *sqlx.Stmt ? ?`hcl:"-"`

}

參考hcl配置

job tst {

webhook = "http://127.0.0.1:4195"

?

driver = "mysql"

?

dsn = "demo:demo@tcp(127.0.0.1:3306)/demo"

jsengine = "otto"

query = <

SELECT users.* FROM users

SQL

?

schedule = "* * * * * *"

?

message = <

if ( $rows.length < 1 ) {

return

}

log("this is a demo")

var msg = ?"";

_.chain($rows).pluck('name').each(function(name){

msg += name+"--------demo--from otto----";

})

var info = {

msgtype: "text",

text: {

content: msg

}

}

log(JSON.stringify(info))

send(JSON.stringify(info))

JS

}

代碼結構

├── Dockerfile

├── Makefile

├── README.md

├── cmd

│ ? ├── cli

│ ? │ ? ├── Dockerfile

│ ? │ ? ├── Makefile

│ ? │ ? ├── README.md

│ ? │ ? └── main.go

│ ? └── server

│ ? ? ? ├── Dockerfile

│ ? ? ? ├── Makefile

│ ? ? ? ├── README.md

│ ? ? ? └── main.go

├── demo.hcl

├── demo2.hcl

├── docker-compose.yaml

├── go.mod

├── go.sum

├── pkg

│ ? ├── agent

│ ? ├── buildinfo

│ ? │ ? └── version.go

│ ? ├── commands

│ ? │ ? ├── cli.go

│ ? │ ? └── server.go

│ ? ├── executor

│ ? │ ? └── jsengine.go

│ ? ├── jobs

│ ? │ ? └── job.go

│ ? ├── npm

│ ? │ ? └── bindata.go

│ ? ├── storage

│ ? └── webhooks

├── underscore-min.js

└── webhook.yaml

代碼說明

核心是 jsengine.go以及job.go,jsengine.go 包含了js 引擎的處理,job.go 主要是對于hcl 配置的解析以及cron 的處理

job.go

為了方便使用js engine 暴露了log $rows 以及send 發送,可以擴展,同時解析job

package jobs

?

import (

"encoding/json"

"errors"

"fmt"

"log"

"path/filepath"

?

"github.com/dop251/goja"

"github.com/go-resty/resty/v2"

"github.com/hashicorp/hcl/v2/hclsimple"

"github.com/jmoiron/sqlx"

"github.com/robertkrimen/otto"

"github.com/robfig/cron/v3"

"github.com/rongfengliang/sql-server-exporter/pkg/executor"

)

?

// Job is one type for sql data fetch

type Job struct {

Name ? ? ? ? ? ?string ? ? ? ?`hcl:",label"`

Driver ? ? ? ? ?string ? ? ? ?`hcl:"driver"`

DSN ? ? ? ? ? ? string ? ? ? ?`hcl:"dsn"`

Query ? ? ? ? ? string ? ? ? ?`hcl:"query"`

Webhook ? ? ? ? string ? ? ? ?`hcl:"webhook"`

Schedule ? ? ? ?string ? ? ? ?`hcl:"schedule"`

MessageString ? string ? ? ? ?`hcl:"message"`

MessageCompiled executor.JSVM `hcl:"-"`

Conn ? ? ? ? ? ?*sqlx.DB ? ? ?`hcl:"-"`

EngineName ? ? ?string ? ? ? ?`hcl:"jsengine"`

JSVM ? ? ? ? ? ?string ? ? ? ?`hcl:"-"`

Stmnt ? ? ? ? ? *sqlx.Stmt ? ?`hcl:"-"`

}

?

// ParseJobs parseJobs

func ParseJobs(jobsdir string) (map[string]*Job, *cron.Cron, error) {

var cronhub *cron.Cron = cron.New(cron.WithChain(

cron.SkipIfStillRunning(cron.DefaultLogger),

cron.Recover(cron.DefaultLogger),

), cron.WithParser(cron.NewParser(

cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,

)))

files, err := filepath.Glob(filepath.Join(jobsdir, "*.hcl"))

if err != nil {

return nil, nil, err

}

?

result := map[string]*Job{}

?

for _, filename := range files {

var fileJobs struct {

Jobs []*Job `hcl:"job,block"`

}

if err != nil {

return nil, nil, err

}

err := hclsimple.DecodeFile(filename, nil, &fileJobs)

if err != nil {

return nil, nil, errors.New("#hcl: " + err.Error())

}

for _, job := range fileJobs.Jobs {

job.MessageCompiled, err = NewJSVM(job.EngineName, job.Name, fmt.Sprintf("(function(){%s})()", job.MessageString))

if err != nil {

return nil, nil, errors.New("#javascript: " + err.Error())

}

?

job.Conn, err = sqlx.Connect(job.Driver, job.DSN)

if err != nil {

return nil, nil, errors.New("#sql:" + job.Name + ": " + err.Error())

}

?

job.Stmnt, err = job.Conn.Preparex(job.Query)

if err != nil {

return nil, nil, errors.New("#sql:" + job.Name + ": " + err.Error())

}

?

if job.Webhook == "" {

return nil, nil, errors.New("#webhook:" + job.Name + ": webhook is required")

}

?

if err := (func(job *Job) error {

_, err := cronhub.AddFunc(job.Schedule, func() {

if err := job.Exec(); err != nil {

panic(err)

}

})

return err

})(job); err != nil {

return nil, nil, errors.New("#cron:" + job.Name + ":" + err.Error())

}

?

result[job.Name] = job

}

}

?

return result, cronhub, nil

}

?

// NewJSVM NewJSVM

func NewJSVM(engine string, name, src string) (executor.JSVM, error) {

var jsjvm executor.JSVM

switch engine {

case "goja":

jsjvm = executor.NewGojaExecutor(src, goja.New())

?

case "otto":

vm := otto.New()

script, err := vm.Compile(name, src)

if err != nil {

return nil, err

}

jsjvm = executor.NewOttoExecutor(src, vm, script)

default:

return nil, errors.New("not supported js engine")

?

}

return jsjvm, nil

}

?

// Exec job

func (j *Job) Exec() error {

rows, err := j.Stmnt.Queryx()

if err != nil {

return err

}

defer rows.Close()

var res []map[string]interface{}

for rows.Next() {

o := map[string]interface{}{}

if err := rows.MapScan(o); err != nil {

return err

}

for k, v := range o {

if nil == v {

continue

}

?

switch v.(type) {

case []uint8:

v = []byte(v.([]uint8))

default:

v, _ = json.Marshal(v)

}

?

var d interface{}

if nil == json.Unmarshal(v.([]byte), &d) {

o[k] = d

} else {

o[k] = string(v.([]byte))

}

}

res = append(res, o)

}

msg := ""

ctx := map[string]interface{}{

"$rows": res,

"log": ? log.Println,

"send": func(in ...interface{}) {

msg += fmt.Sprint(in...) + "\n"

},

}

if err := j.MessageCompiled.Execute(ctx); err != nil {

return err

}

_, err = resty.New().R().SetDoNotParseResponse(true).SetHeader("content-type", "application/json").SetBody(msg).Post(j.Webhook)

return err

}

jsengine.go

js 引擎的處理使用了JSVM 接口,同時實現了otto 以及goja 的擴展,都包含了underscore 庫

package executor

?

import (

"github.com/dop251/goja"

"github.com/dop251/goja_nodejs/require"

"github.com/robertkrimen/otto"

"github.com/rongfengliang/sql-server-exporter/pkg/npm"

)

?

// JSVM js Engine define

type JSVM interface {

// Execute job command

Execute(map[string]interface{}) error

}

?

// GojaExecutor goja js executor engine

type GojaExecutor struct {

Src string

VM ?*goja.Runtime

}

?

// Execute goja execute command

func (goja *GojaExecutor) Execute(context map[string]interface{}) error {

for k, v := range context {

goja.VM.Set(k, v)

}

_, err := goja.VM.RunString(goja.Src)

return err

}

?

// NewGojaExecutor GojaExecutor

func NewGojaExecutor(src string, vm *goja.Runtime) JSVM {

registry := require.NewRegistryWithLoader(func(path string) ([]byte, error) {

return npm.Asset(path)

})

m, _ := registry.Enable(vm).Require("underscore-min.js")

vm.Set("_", m)

return &GojaExecutor{

Src: src,

VM: ?vm,

}

}

?

// OttoExecutor Otto js executor engine

type OttoExecutor struct {

Src ? ?string

VM ? ? *otto.Otto

Script *otto.Script

}

?

// Execute goja execute command

func (otto *OttoExecutor) Execute(context map[string]interface{}) error {

for k, v := range context {

if err := otto.VM.Set(k, v); err != nil {

return err

}

}

_, err := otto.VM.Run(otto.Script)

return err

}

?

// Execute js exec script method with vm

func Execute(jsvm JSVM, context map[string]interface{}) error {

return jsvm.Execute(context)

}

?

// NewOttoExecutor OttoExecutor

func NewOttoExecutor(src string, vm *otto.Otto, script *otto.Script) JSVM {

return &OttoExecutor{

Src: ? ?src,

VM: ? ? vm,

Script: script,

}

}

server.go

主要是server 端啟動的,包含參數的解析以及加載依賴的job 基于urfave/cli/ 提供cli 的處理

package commands

?

import (

"fmt"

"log"

"os"

?

"github.com/rongfengliang/sql-server-exporter/pkg/buildinfo"

"github.com/rongfengliang/sql-server-exporter/pkg/jobs"

"github.com/urfave/cli/v2"

)

?

// Server server

type Server struct {

}

?

// NewServer return one Server Instance

func NewServer() *Server {

return &Server{}

}

?

// Run run

func (s *Server) Run() {

// TODos

// load jobs create scheduler info

app := cli.NewApp()

app.Usage = "basic sql server data fetch service"

app.Flags = []cli.Flag{

&cli.StringFlag{

Name: ?"jobs-dir",

Usage: "set job dirs",

Value: ".",

},

}

app.Commands = []*cli.Command{{

Name: ? ?"version",

Aliases: []string{"v"},

Usage: ? "print application version",

Action: func(c *cli.Context) error {

fmt.Println(buildinfo.Version)

return nil

},

}, {

Name: ?"start",

Usage: "start service",

Action: func(c *cli.Context) error {

fmt.Println(c.String("jobs-dir"))

jobdir := c.String("jobs-dir")

if jobdir != "" {

loadJobs, cronhub, err := jobs.ParseJobs(jobdir)

if err != nil {

log.Fatal(err.Error())

}

for _, v := range loadJobs {

log.Println(v.EngineName)

}

cronhub.Run()

}

return nil

},

}}

err := app.Run(os.Args)

if err != nil {

log.Fatal(err)

}

}

server 啟動入口

主要是提供了加載sql driver 以及調用server.go 的解析

package main

?

import (

_ "github.com/ClickHouse/clickhouse-go"

_ "github.com/denisenkom/go-mssqldb"

_ "github.com/go-sql-driver/mysql"

_ "github.com/lib/pq"

_ "github.com/robertkrimen/otto/underscore"

"github.com/rongfengliang/sql-server-exporter/pkg/commands"

)

?

func main() {

// create Server instance

s := commands.NewServer()

s.Run()

}

測試

構建

以及make,可以參考源碼

make

運行環境準備

docker-compose.yaml

version: "3"

services:

webhook:

image: jeffail/benthos

volumes:

- "./webhook.yaml:/benthos.yaml"

ports:

- "4195:4195"

mysql:

image: mysql:5.7.16

ports:

- 3306:3306

command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci

environment:

MYSQL_ROOT_PASSWORD: demo

MYSQL_DATABASE: demo

MYSQL_USER: demo

MYSQL_PASSWORD: demo

TZ: Asia/Shanghai

webhook.yaml

input:

type: broker

broker:

inputs:

- type: http_server

http_server:

path: /

processors:

- type: text

text:

operator: prepend

value: "get message: "

output:

type: stdout

數據準備

CREATE TABLE `users` (

`name` varchar(100) ,

`status` varchar(100)

) ENGINE=InnoDB

?

INSERT INTO demo.users

(name, status)

VALUES('dalong', '0');

INSERT INTO demo.users

(name, status)

VALUES('demo', '1');

INSERT INTO demo.users

(name, status)

VALUES('rong', '1');

運行效果

./bin/exporter-server start

說明

以上是一個簡單的介紹,詳細的可以參考github 代碼

參考資料

總結

以上是生活随笔為你收集整理的mysql webhook_sql 数据定时发送webhook golang 服务的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 91欧美精品| 中文字幕日韩三级 | 黄色三级网络 | 1024金沙人妻一区二区三区 | 午夜激情电影在线观看 | 粉嫩av懂色av蜜臀av分享 | 日韩一区二区不卡视频 | 黑森林av| 亚洲精品久久久久久久久久久 | 中文字幕有码在线视频 | 日本黄色片段 | 在线成人一区 | 久久久久欧美 | 一区二区三区激情视频 | 熟妇熟女乱妇乱女网站 | 一级特黄妇女高潮2 | 巨胸喷奶水www久久久免费动漫 | 美女屁股眼视频免费 | 中文字幕一区二区三区人妻电影 | 黄色99视频 | 夜夜操免费视频 | 成年网站免费在线观看 | 精品免费一区二区三区 | 日韩精品一区二区三区在线视频 | 日韩一级影视 | 99在线精品观看 | 成人动漫av| 亚洲精品国产精品国自产网站 | 狠狠热免费视频 | 密桃成熟时在线观看 | 日韩精品在线视频 | 720url在线观看免费版 | 国产裸体无遮挡 | 天天操天天干天天干 | 亚洲人在线 | 毛片无遮挡| 在线观看视频日韩 | 99欧美精品| 日韩欧美黄色片 | 奇米在线777 | 午夜精品福利影院 | 欧美一级片免费 | 五月婷婷丁香综合 | 日本精品在线观看视频 | 伊久久 | 亚洲成在线观看 | 亚洲国产精品无码观看久久 | 精品无人区无码乱码毛片国产 | 精品成人一区二区三区久久精品 | 日韩视频 中文字幕 | 日本在线一级片 | 久久九九免费视频 | 天堂一二三区 | 日本草草影院 | 91干视频 | 日韩高清久久 | 亚色视频在线 | 成年人在线免费 | 国产精品99精品无码视亚 | 97av在线播放| 久在线视频 | 天天干天| 日本黄色xxx | 日本黄色成人 | 亚洲剧情av | 日韩欧美视频在线 | 精品成人一区二区三区久久精品 | av久草 | 亚洲日日操 | 91亚洲国产成人精品性色 | 波多野结衣a级片 | 免费观看污视频 | 神马伦理视频 | 国产亚洲精品久久久久丝瓜 | 男女无遮挡猛进猛出 | 亚洲黄色免费在线观看 | 欧美日韩免费看 | 免费在线一区二区 | 荫蒂被男人添免费视频 | 伊人视频| 精品韩国一区二区三区 | 高h1v| 中文字幕一区二区三区乱码不卡 | 精品人人妻人人澡人人爽牛牛 | 欧美性猛交xxxx乱大交 | 欧美日韩国产传媒 | 欧美一区二区三区四区在线观看 | 毛片自拍 | 欧美偷拍精品 | 国产初高中真实精品视频 | 精品女同一区二区三区 | 日韩国产片 | 韩国三级在线播放 | 超碰人人爱| 一本一道久久a久久精品蜜桃 | www.久久99 | 一级大片免费 | 福利一区在线观看 | 国产重口老太伦 |