日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

mysql switch binlog_如何使用 Golang 处理 MySQL 的 binlog

發布時間:2025/3/19 数据库 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 mysql switch binlog_如何使用 Golang 处理 MySQL 的 binlog 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

大家好,我是 Artem,一名 Golang 開發。我們的團隊花費了大量時間訓練 MySQL binlog。這里整合一些簡單用法,不會放過任何隱藏的陷阱。示例代碼將在最后顯示。

每次從 數據庫 查詢的返回結果中拉取用戶信息時,主項目中會有高負載模塊。此時使用緩存是一個不錯的建議,但是什么時候重置緩存呢?這需要由數據來決定更新時間。

MySQL 的主從復制是一個很棒的設計。而我們的守護進程可以視為一個通過 binlog 獲取數據的 slave,binlog 設置成 row 格式。這樣就能使用所有的數據庫命令,但事務下的命令只有在提交后才會記錄。在達到內存的使用限制后(默認為 1GB),會開啟另一個文件,每個新文件的名稱后都會有一個增量。

本文將分為以下兩部分:

如何處理 binlog 中的新數據

如何設置和擴展

part 1. 快速運行

連接到一個新的 channel(chanal 是一個庫的標簽)。我們將使用 binlog 中的 row 格式https://mariadb.com/kb/en/library/binary-log-formats/

func binLogListener() {

c, err := getDefaultCanal()

if err == nil {

coords, err := c.GetMasterPos()

if err == nil {

c.SetEventHandler(&binlogHandler{})

c.RunFrom(coords)

}

}

}

func getDefaultCanal() (*canal.Canal, error) {

cfg := canal.NewDefaultConfig()

cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306)

cfg.User = "root"

cfg.Password = "root"

cfg.Flavor = "mysql"

cfg.Dump.ExecutionPath = ""

return canal.NewCanal(cfg)

}

現在來進行封裝

type binlogHandler struct {

canal.DummyEventHandler // Dummy handler from external lib

BinlogParser // Our custom helper

}

func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {return nil}

func (h *binlogHandler) String() string {return "binlogHandler"}

然后我們可以在 OnRow() 方法中添加一些代碼邏輯,讓他更好用

func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {

var n int //starting value

var k int // step

switch e.Action {

case canal.DeleteAction:

return nil // not covered in example

case canal.UpdateAction:

n = 1

k = 2

case canal.InsertAction:

n = 0

k = 1

}

for i := n; i < len(e.Rows); i += k {

key := e.Table.Schema + "." + e.Table.Name

switch key {

case User{}.SchemaName() + "." + User{}.TableName():

/*

Real data parsing

*/

}

}

return nil

}

這個包裝器的主要邏輯是解析接收到的數據。我們可以通過更新的兩個條件獲取數據(第一條包含初始數據,第二條則是更新數據),同時也支持多行插入和多行更新。在這種情況下,執行 UPDATE 操作時,每次都要使用第二個條件。而執行 INSERT 時,需要操作每一行,為此我們需要使用 n 和 k 變量。

從 binlog 中獲取一個模版,逐行加載數據,每個 column 都標明注釋:

type User struct {

Id int `gorm:"column:id"`

Name string `gorm:"column:name"`

Status string `gorm:"column:status"`

Created time.Time `gorm:"column:created"`

}

func (User) TableName() string {

return "User"

}

func (User) SchemaName() string {

return "Test"

}

MySQL 的表結構體

CREATE TABLE Test.User(

id INT AUTO_INCREMENT PRIMARY KEY,

name VARCHAR(40) NULL ,

status ENUM("active","deleted") DEFAULT "active",

created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP

)

ENGINE =InnoDB;

現在使用代碼的方式實現

user := User{}

h.GetBinLogData(&user, e, i)

最后,我們可以通過新增加的用戶變量獲取數據。打印出來讓它看起來更美觀。

if e.Action == canal.UpdateAction {

oldUser := User{}

h.GetBinLogData(&oldUser, e, i-1)

fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name, )

} else {

fmt.Printf("User %d is created with name %s\n", user.Id, user.Name, )

}

太好了,代碼即將實現, "Hello, binlog world":

func main() {

go binLogListener()

// placeholder for your handsome code

time.Sleep(2 * time.Minute)

fmt.Print("Thx for watching")

}

新增和更新用戶:

INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack");

UPDATE Test.User SET name="Jonh" WHERE id=1;

結果展示:

User 1 is created with name Jack

User 1 name changed from Jack to Jonh

這段代碼通過 binlog 來解析新增的 row,并通過數據表獲取我們需要的數據,在結構體中解析數據并輸出結果。我沒有介紹所有的數據解析器(BinlogParser),這其中還隱藏了一些 hydration 邏輯模型。

part 2. 正如 cobb 所說,我們需要更加深入了解

解析器的隱藏部分是基于反射,可以使用下面這種方式來進行 hydration 模型。

h.GetBinLogData(&user, e, i)

使用一些簡單的數據類型來處理

bool

int

float64

string

time.Time

也可以通過 JSON 來解析結構體。

如果你需要更多的數據類型 , 或者你只是想知道 binlog 是如何進行解析工作的 , 最好的辦法是自己擴展解析類型。

下面是一個int

類型的實例 :

type User struct {

Id int `gorm:"column:id"`

}

我們可以通過反射來獲取類型名稱。parseTagSetting 方法可以使注釋更便于使用:

element := User{} //In common cases we have interface, but here we will start with model

v := reflect.ValueOf(element)

s := reflect.Indirect(v)

t := s.Type()

num := t.NumField()

parsedTag := parseTagSetting(t.Field(k).Tag)

if columnName, ok = parsedTag["COLUMN"]; !ok || columnName == "COLUMN" {

continue

}

for k := 0; k < num; k++ {

name := s.Field(k).Type().Name()

switch name {

case "int":

// here we deal with an incoming row

}

}

獲取了類型名稱的同時也可以通過反射來設置它的值

func (v Value) SetInt(x int64) {//...

解析注釋幫助器(從 Gorm 庫獲取)

func parseTagSetting(tags reflect.StructTag) map[string]string {

setting := map[string]string{}

for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} {

tags := strings.Split(str, ";")

for _, value := range tags {

v := strings.Split(value, ":")

k := strings.TrimSpace(strings.ToUpper(v[0]))

if len(v) >= 2 {

setting[k] = strings.Join(v[1:], ":")

} else {

setting[k] = k

}

}

}

return setting

}

解析器中有 int64 類型,我們可以創建一個將 int64 轉換為 row 類型的方法:

func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 {

columnId := m.getBinlogIdByName(e, columnName)

if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER {

return 0

}

switch e.Rows[n][columnId].(type) {

case int8:

return int64(e.Rows[n][columnId].(int8))

case int32:

return int64(e.Rows[n][columnId].(int32))

case int64:

return e.Rows[n][columnId].(int64)

case int:

return int64(e.Rows[n][columnId].(int))

case uint8:

return int64(e.Rows[n][columnId].(uint8))

case uint16:

return int64(e.Rows[n][columnId].(uint16))

case uint32:

return int64(e.Rows[n][columnId].(uint32))

case uint64:

return int64(e.Rows[n][columnId].(uint64))

case uint:

return int64(e.Rows[n][columnId].(uint))

}

return 0

}

除了 getBinlogIdByName() 方法,所有東西看起來都是合理的。

需要使用 trivial 幫助器來處理 column 名而不是它的 id,這樣可以:

使用 Gorm 注釋來處理字段名:

在開頭和中間添加字段名時不需要額外修改:

使用字段名處理比 column3 更方便。

最后,我們加入以下處理:

s.Field(k).SetInt(m.intHelper(e, n, columnName))

還有兩個例子

ENUM: 我們將獲取的值作為索引——所以“ active ”狀態會被設置為 0。同樣的,我們也需要用 enum 字符串表示,而不是 id,這些可以從字段介紹中獲取。重要提示,值中的 1 描述的是 0 值索引字段,數組的值是從 0 開始的。

Enum 的解析如下:

func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string {

columnId := m.getBinlogIdByName(e, columnName)

if e.Table.Columns[columnId].Type == schema.TYPE_ENUM {

values := e.Table.Columns[columnId].EnumValues //fields value

if len(values) == 0 || e.Rows[n][columnId] == nil {

return ""

}

return values[e.Rows[n][columnId].(int64)-1] // first id in result is zero one in values

}

}

存儲 JSON

這難道不是個好主意嗎? JSON 是 MySQL 引擎側的字符串,我們可以將序列化的數據指向解析器。為此,可以添加一個自定義 Gorm 注釋——“ fromJson ”,以下是不同數據之間的例子:

type JsonData struct {

Int int `gorm:"column:int"`

StructData TestData `gorm:"column:struct_data;fromJson"`

MapData map[string]string `gorm:"column:map_data;fromJson"`

SliceData []int `gorm:"column:slice_data;fromJson"`

}

type TestData struct {

Test string `json:"test"`

Int int `json:"int"`

}

雖然可以創造很多條件來實現,但是新增字段會損壞它。上 Stack Overflow 尋找答案的結果可能是,“如何從未知的 JSON 結構體解析 ? ” “ 不知道你為什么需要這樣,但你可以試試 ...”

將結構體轉換為接口可以實現:

if _, ok := parsedTag["FROMJSON"]; ok {

newObject := reflect.New(s.Field(k).Type()).Interface()

json := m.stringHelper(e, n, columnName)

jsoniter.Unmarshal([]byte(json), &newObject)

s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type()))

}

總結

以上是生活随笔為你收集整理的mysql switch binlog_如何使用 Golang 处理 MySQL 的 binlog的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 美女搞黄视频网站 | 欧美性生交大片免费看app麻豆 | 欧洲成人午夜精品无码区久久 | 蜜乳av一区 | 午夜亚洲一区 | 天天操天天操天天干 | 法国空姐在线观看免费 | 激情五月婷婷综合网 | 久久久精品影院 | 久久伊人成人网 | 日韩精品久久久久久 | 中国黄色片子 | 俺去久久| 成人区视频 | 一区免费 | 久久国产香蕉视频 | 搞黄视频在线观看 | 久久午夜神器 | 久久综合久久久久 | 精品国产一区三区 | 蜜臀久久精品久久久久 | 91娇羞白丝 | 午夜精品福利在线观看 | 欧美一区免费观看 | 99精品一区二区三区无码吞精 | 国产精品社区 | 国产在线观看www | 久久伊人成人网 | 亚洲福利一区二区三区 | 特种兵之深入敌后高清全集免费观看 | 丝袜一区二区三区四区 | 素人一区二区 | 一卡二卡三卡在线视频 | 伊人91| 日本少妇裸体 | 国产精品1页 | 美女黄视频网站 | av男人的天堂在线观看 | 97视频在线免费观看 | 高清一区二区在线 | 亚州av一区二区 | 亚洲AV永久无码国产精品国产 | 色狠狠一区二区三区 | 中文精品在线 | www国产成人 | 午夜影院0606 | 日韩在线视频免费 | 亚洲成人午夜影院 | 狠狠躁日日躁夜夜躁 | 成人精品 | 国产精品高潮呻吟久久aⅴ码 | 亚洲区免费 | xxxxxx欧美| 国产精品剧情一区 | 亚洲日本一区二区 | 91口爆一区二区三区在线 | 成年精品 | 亚洲精品第五页 | 色www| 7788色淫网站小说 | 亚洲热热 | 四虎影视成人 | 视频在线免费观看 | 国产日韩一级片 | 俺去草| 激情 小说 亚洲 图片 伦 | 久久精品黄 | 欧美日韩国产一级片 | 日韩一区不卡视频 | 久久精品首页 | 国产精品午夜电影 | 粉嫩av一区二区三区四区五区 | 中文字幕+乱码+中文乱码www | 天天色综合天天 | 五月天啪啪 | 美女色综合| 午夜影院免费体验区 | 白嫩少妇激情无码 | 国产精品97 | 日韩一级欧美 | 精品久久蜜桃 | 天天射av| 国产只有精品 | 免费91看片 | 国产伦精品一区二区三区妓女下载 | 教练含着她的乳奶揉搓揉捏动态图 | 妞干网这里只有精品 | 国产亚洲第一页 | 我们的生活第五季在线观看免费 | 男女高h视频 | 日本a级大片 | 国产九色在线 | 在线免费你懂的 | 搡国产老太xxx网站 高h喷汁呻吟3p | 韩国黄色大片 | 国产又爽又黄免费视频 | 午夜激情小视频 | 婷婷久久精品 | 国产日产精品一区二区三区四区 |