日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

Go 学习笔记(58)— Go 第三方库之 etcd/clientv3(连接客户端、PUT、GET、Lease、Op、Txn、Watch 基础概念说明)

發布時間:2023/11/28 生活经验 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Go 学习笔记(58)— Go 第三方库之 etcd/clientv3(连接客户端、PUT、GET、Lease、Op、Txn、Watch 基础概念说明) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 安裝 Golang 的 Etcd 包

我們使用 v3 版本的 etcd client , 首先通過 go get 下載并編譯安裝 etcd clinet v3

go get -v github.com/coreos/etcd/clientv3

該命令會將包下載到 $GOPATH/src/github.com/coreos/etcd/clientv3 中,所有相關依賴包會自動下載編譯,包括protobufgrpc等。

我們主要梳理一下使用 etcd 時經常用到的主要 API 并進行演示。

2. 連接客戶端

用程序訪問 etcd 首先要創建 client ,它需要傳入一個 Config 配置,這里傳了 2 個選項:

  • Endpointsetcd 的多個節點服務地址;
  • DialTimeout :創建 client 的首次連接超時時間,這里傳了 5 秒,如果 5 秒都沒有連接成功就會返回 err ,一旦 client 創建成功,我們就不用再關心后續底層連接的狀態了, client 內部會重連;
cli, err := clientv3.New(clientv3.Config{Endpoints:   []string{"localhost:2379"},// Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}DialTimeout: 5 * time.Second,
})

返回的client,它的類型具體如下:

type Client struct {ClusterKVLeaseWatcherAuthMaintenance// Username is a user name for authentication.Username string// Password is a password for authentication.Password string// contains filtered or unexported fields
}

類型中的成員是 etcd 客戶端幾何核心功能模塊的具體實現,它們分別用于:

  • Cluster :向集群里增加 etcd 服務端節點之類,屬于管理員操作。
  • KV :我們主要使用的功能,即 K-V 鍵值庫的操作。
  • Lease :租約相關操作,比如申請一個 TTL=10 秒的租約(應用給 key 可以實現鍵值的自動過期)。
  • Watcher :觀察訂閱,從而監聽最新的數據變化。
  • Auth :管理 etcd 的用戶和權限,屬于管理員操作。
  • Maintenance :維護 etcd ,比如主動遷移 etcdleader 節點,屬于管理員操作。

我們需要使用什么功能,就去 client 里獲取對應的成員即可。

Client.KV 是一個 interface ,提供了關于 K-V 操作的所有方法:

type KV interface {Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)// Delete deletes a key, or optionally using WithRange(end), [key, end).Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)// Compact compacts etcd KV history before the given rev.Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)Do(ctx context.Context, op Op) (OpResponse, error)// Txn creates a transaction.Txn(ctx context.Context) Txn
}

我們通過方法clientv3.NewKV()來獲得 KV 接口的實現(實現中內置了錯誤重試機制):

kv := clientv3.NewKV(cli)

接下來,我們將通過kv操作 etcd 中的數據。

3. PUT 設置操作

putResp, err := kv.Put(context.TODO(),"/test/key1", "Hello etcd!")

第一個參數是 goroutine 的上下文 Context 。后面兩個參數分別是 keyvalue ,對于 etcd 來說, key=/test/key1 只是一個字符串而已,但是對我們而言卻可以模擬出目錄層級關系。

Put 函數的聲明如下:

// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

除了上面例子中的三個的參數,還支持一個變長參數,可以傳遞一些控制項來影響 Put 的行為,例如可以攜帶一個 lease ID 來支持 key 過期。

Put 操作返回的是 PutResponse ,不同的 KV 操作對應不同的 response 結構,所有 KV 操作返回的 response 結構如下:

type (CompactResponse pb.CompactionResponsePutResponse     pb.PutResponseGetResponse     pb.RangeResponseDeleteResponse  pb.DeleteRangeResponseTxnResponse     pb.TxnResponse
)

程序代碼里導入 clientv3 后在 VSCode 中可以很快定位到 PutResponse 的定義文件中, PutResponse 只是 pb.PutResponse 的類型別名,通過VSCode跳轉過去后可以看到 PutResponse 的詳細定義。

type PutResponse struct {Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`// if prev_kv is set in the request, the previous key-value pair will be returned.PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}

Header 里保存的主要是本次更新的 revision 信息,而 PrevKv 可以返回 Put 覆蓋之前的 value 是什么(目前是 nil ,后面會說原因),把返回的 PutResponse 打印出來看一下:

fmt.Printf("PutResponse: %v, err: %v", putResp, err)
// output
// PutResponse: &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:3 raft_term:7  <nil>}, err: <nil>%

我們需要判斷 err 來確定操作是否成功。

我們再 Put 其他 2 個 key ,用于后續演示:

kv.Put(context.TODO(),"/test/key2", "Hello World!")
// 再寫一個同前綴的干擾項
kv.Put(context.TODO(), "/testspam", "spam")

現在 /test 目錄下有兩個鍵: key1key2 , 而 /testspam 并不歸屬于 /test 目錄。

代碼示例:

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"127.0.0.1:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 用于讀寫etcd的鍵值對kv := clientv3.NewKV(client)// clientv3.WithPrevKV() 是一個可選控制項,用于獲取在設置當前鍵值對之前的該鍵的鍵值對// 有了該控制項后,putResp 才有 PrevKv 的屬性,即獲取之前的鍵值對。// context.TODO() 表示當前還不知道用哪個 context 控制該操作,先用該字段占位putResp, err := kv.Put(context.TODO(), "/demo/A/B", "hello", clientv3.WithPrevKV())if err != nil {fmt.Println(err)}fmt.Println("putResp is ", putResp)fmt.Println("Revision:", putResp.Header.Revision)if putResp.PrevKv != nil {fmt.Println("PrevValue:", string(putResp.PrevKv.Value))}}

4. GET 獲取操作

使用 KVGet 方法來讀取給定鍵的值:

getResp, err := kv.Get(context.TODO(), "/test/key1")

其函數聲明如下:

// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

Put 類似,函數注釋里提示我們可以傳遞一些控制參數來影響 Get 的行為,比如: WithFromKey 表示讀取從參數 key 開始遞增的所有 key ,而不是讀取單個 key

在上面的例子中,我沒有傳遞 opOption ,所以就是獲取 key=/test/key1 的最新版本數據。這里 err 并不能反饋出 key 是否存在(只能反饋出本次操作因為各種原因異常了),我們需要通過 GetResponse (實際上是 pb.RangeResponse )判斷 key 是否存在:

type RangeResponse struct {Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`// kvs is the list of key-value pairs matched by the range request.// kvs is empty when count is requested.Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`// more indicates if there are more keys to return in the requested range.More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`// count is set to the number of keys within the range when requested.Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}

Kvs 字段,保存了本次 Get 查詢到的所有 k-v 對,因為上述例子只 Get 了一個單 key ,所以只需要判斷一下 len(Kvs) 是否等于 1 即可知道 key 是否存在。

RangeResponse.MoreCount,當我們使用withLimit()等選項進行Get時會發揮作用,相當于翻頁查詢。
接下來,我們通過給 Get 查詢增加 WithPrefix 選項,獲取 /test 目錄下的所有子元素:

rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())

WithPrefix()是指查找以/test/為前綴的所有 key ,因此可以模擬出查找子目錄的效果。

etcd是一個有序的 k-v 存儲,因此 /test/ 為前綴的 key 總是順序排列在一起。

withPrefix()實際上會轉化為范圍查詢,它根據前綴/test/生成了一個前閉后開的key range:[“/test/”, “/test0”),為什么呢?因為比/大的字符是0,所以以/test0作為范圍的末尾,就可以掃描到所有以/test/為前綴的 key 了。

在之前,我們 Put 了一個/testspam鍵值,因為不符合/test/前綴(注意末尾的 / ),所以就不會被這次Get獲取到。但是,如果查詢的前綴是/test,那么/testspam就會被返回,使用時一定要特別注意。

打印 rangeResp.Kvs 可以看到獲得了兩個鍵值:

[key:"/test/key1" create_revision:2 mod_revision:13 version:6 value:"Hello etcd!"  
key:"/test/key2" create_revision:5 mod_revision:14 version:4 value:"Hello World!" ]

代碼示例:

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 用于讀寫etcd的鍵值對kv := clientv3.NewKV(client)kv.Put(context.TODO(), "/demo/A/B", "BBB", clientv3.WithPrevKV())kv.Put(context.TODO(), "/demo/A/C", "CCC", clientv3.WithPrevKV())// 	讀取/demo/A/為前綴的所有key// clientv3.WithPrefix() , clientv3.WithCountOnly() 可以有多個并以 逗號分隔即可getResp, err := kv.Get(context.TODO(), "/demo/A/", clientv3.WithPrefix() /*,clientv3.WithCountOnly()*/)if err != nil {fmt.Println(err)}fmt.Println(getResp.Kvs, getResp.Count)for _, resp := range getResp.Kvs {fmt.Printf("key: %s, value:%s\n", string(resp.Key), string(resp.Value))}
}

輸出結果為:

[key:"/demo/A/B" create_revision:6 mod_revision:22 version:6 value:"BBB"  
key:"/demo/A/C" create_revision:7 mod_revision:23 version:12 value:"CCC" ] 2
key: /demo/A/B, value:BBB
key: /demo/A/C, value:CCC

5. Delete 操作

示例代碼:

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 用于讀寫etcd的鍵值對kv := clientv3.NewKV(client)kv.Put(context.TODO(), "/demo/A/B1", "BBB", clientv3.WithPrevKV())kv.Put(context.TODO(), "/demo/A/B2", "CCC", clientv3.WithPrevKV())kv.Put(context.TODO(), "/demo/A/B3", "DDD", clientv3.WithPrevKV())/*clientv3.WithFromKey() 表示針對的key操作是大于等于當前給定的keyclientv3.WithPrevKV() 表示返回的 response 中含有之前刪除的值,否則下面的 delResp.PrevKvs 為空*/delResp, err := kv.Delete(context.TODO(), "/demo/A/B",clientv3.WithFromKey(), clientv3.WithPrevKV())if err != nil {fmt.Println(err)}// 查看被刪除的 key 和 value 是什么if delResp.PrevKvs != nil {// if len(delResp.PrevKvs) != 0 {for _, kvpair := range delResp.PrevKvs {fmt.Println("已刪除:", string(kvpair.Key), string(kvpair.Value))}}
}

輸出結果:

已刪除: /demo/A/B1 BBB
已刪除: /demo/A/B2 CCC
已刪除: /demo/A/B3 DDD

6. Lease 租約操作

etcd 客戶端的 Lease 對象可以通過以下的代碼獲取到

lease := clientv3.NewLease(cli)

lease 對象是 Lease 接口的實現, Lease 接口的聲明如下:

type Lease interface {// Grant 創建一個新租約Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)// Revoke 銷毀給定租約ID的租約Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)// TimeToLive retrieves the lease information of the given lease ID.TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)// Leases retrieves all leases.Leases(ctx context.Context) (*LeaseLeasesResponse, error)// KeepAlive keeps the given lease alive forever.KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive// should be used instead of KeepAliveOnce.KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)// Close releases all resources Lease keeps for efficient communication// with the etcd server.Close() error
}

Lease 提供了以下功能:

  • Grant :分配一個租約;
  • Revoke :釋放一個租約;
  • TimeToLive :獲取剩余TTL時間;
  • Leases :列舉所有etcd中的租約;
  • KeepAlive :自動定時的續約某個租約;
  • KeepAliveOnce :為某個租約續約一次;
  • Close :釋放當前客戶端建立的所有租約;

要想實現 key 自動過期,首先得創建一個租約,下面的代碼創建一個 TTL 為 10 秒的租約:

grantResp, err := lease.Grant(context.TODO(), 10)

返回的 grantResponse 的結構體聲明如下:

// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {*pb.ResponseHeaderID    LeaseIDTTL   int64Error string
}

在應用程序代碼中主要使用到的是租約 ID

接下來我們用這個 Leaseetcd 中存儲一個 10 秒過期的 key

kv.Put(context.TODO(), "/test/vanish", "vanish in 10s", clientv3.WithLease(grantResp.ID))

這里特別需要注意,有一種情況是在 Put 之前 Lease 已經過期了,那么這個 Put 操作會返回 error ,此時你需要重新分配 Lease

當我們實現服務注冊時,需要主動給 Lease 進行續約,通常是以小于 TTL 的間隔循環調用 LeaseKeepAliveOnce() 方法對租約進行續期,一旦某個服務節點出錯無法完成租約的續期,等 key 過期后客戶端即無法在查詢服務時獲得對應節點的服務,這樣就通過租約到期實現了服務的錯誤隔離。

keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)

或者使用KeepAlive()方法,其會返回<-chan *LeaseKeepAliveResponse只讀通道,每次自動續租成功后會向通道中發送信號。

一般都用KeepAlive()方法, KeepAlivePut 一樣,如果在執行之前 Lease 就已經過期了,那么需要重新分配 Leaseetcd 并沒有提供 API 來實現原子的 Put with Lease ,需要我們自己判斷 err 重新分配 Lease

示例代碼

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 創建一個lease(租約)對象lease := clientv3.NewLease(client)// 申請一個10秒的租約leaseGrantResp, err := lease.Grant(context.TODO(), 10)if err != nil {fmt.Println(err)return}// 拿到租約的IDleaseId := leaseGrantResp.ID// 自動永久續租keepRespChan, err := lease.KeepAlive(context.TODO(), leaseId)if err != nil {fmt.Println(err)return}// 處理續約應答的協程go func() {for {select {case keepResp := <-keepRespChan:if keepResp == nil {fmt.Println("租約已經失效了")goto END} else { // 每秒會續租一次, 所以就會受到一次應答fmt.Println("收到自動續租應答:", keepResp.ID)}}}END:}()// 獲得kv API子集kv := clientv3.NewKV(client)// Put一個KV, 讓它與租約關聯起來, 從而實現10秒后自動過期putResp, err := kv.Put(context.TODO(), "/demo/A/B1", "hello", clientv3.WithLease(leaseId))if err != nil {fmt.Println(err)return}fmt.Println("寫入成功:", putResp.Header.Revision)// 定時的看一下key過期了沒有for {getResp, err := kv.Get(context.TODO(), "/demo/A/B1")if err != nil {fmt.Println(err)return}if getResp.Count == 0 {fmt.Println("kv過期了")break}fmt.Println("還沒過期:", getResp.Kvs)time.Sleep(2 * time.Second)}
}

輸出結果:

收到自動續租應答: 8488292048996991588
寫入成功: 80
還沒過期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
還沒過期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
收到自動續租應答: 8488292048996991588
還沒過期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
還沒過期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
收到自動續租應答: 8488292048996991588

7. Op 獲取設置聯合操作

Op 字面意思就是”操作”, GetPut 都屬于 Op ,只是為了簡化用戶開發而開放的特殊 API

KV 對象有一個 Do 方法接受一個 Op

// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)

其參數 Op 是一個抽象的操作,可以是 Put/Get/Delete… ;而 OpResponse 是一個抽象的結果,可以是 PutResponse/GetResponse…

可以通過 Client 中定義的一些方法來創建 Op

  • func OpDelete(key string, opts …OpOption) Op
  • func OpGet(key string, opts …OpOption) Op
  • func OpPut(key, val string, opts …OpOption) Op
  • func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op

其實和直接調用 KV.PutKV.GET 沒什么區別。下面是一個例子:

cli, err := clientv3.New(clientv3.Config{Endpoints:   endpoints,DialTimeout: dialTimeout,
})
if err != nil {log.Fatal(err)
}
defer cli.Close()
ops := []clientv3.Op{clientv3.OpPut("put-key", "123"),clientv3.OpGet("put-key"),clientv3.OpPut("put-key", "456")}
for _, op := range ops {if _, err := cli.Do(context.TODO(), op); err != nil {log.Fatal(err)}
}

Op 交給 Do 方法執行,返回的 opResp 結構如下:

type OpResponse struct {put *PutResponseget *GetResponsedel *DeleteResponsetxn *TxnResponse
}

你的操作是什么類型,你就用哪個指針來訪問對應的結果。

示例代碼:

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 獲得kv API子集kv := clientv3.NewKV(client)// 創建Op: operationputOp := clientv3.OpPut("/demo/A/B1", "BBBBB")// 執行OP 	// kv.Do(op)opResp, err := kv.Do(context.TODO(), putOp)if err != nil {fmt.Println(err)return}fmt.Println("寫入Revision:", opResp.Put().Header.Revision)// 創建OpgetOp := clientv3.OpGet("/demo/A/B1")// 執行OPopResp, err = kv.Do(context.TODO(), getOp)if err != nil {fmt.Println(err)return}// 打印 create rev == mod revfmt.Println("數據Revision:", opResp.Get().Kvs[0].ModRevision) fmt.Println("數據value:", string(opResp.Get().Kvs[0].Value))
}

輸出結果:

寫入Revision: 105
數據Revision: 105
數據value: BBBBB

8. Txn 事務操作

etcd 中事務是原子執行的,只支持 if … then … else … 這種表達。首先來看一下 Txn 中定義的方法:

type Txn interface {// If takes a list of comparison. If all comparisons passed in succeed,// the operations passed into Then() will be executed. Or the operations// passed into Else() will be executed.If(cs ...Cmp) Txn// Then takes a list of operations. The Ops list will be executed, if the// comparisons passed in If() succeed.Then(ops ...Op) Txn// Else takes a list of operations. The Ops list will be executed, if the// comparisons passed in If() fail.Else(ops ...Op) Txn// Commit tries to commit the transaction.Commit() (*TxnResponse, error)
}

Txn 必須是這樣使用的:If(滿足條件) Then(執行若干Op) Else(執行若干Op)。

If 中支持傳入多個 Cmp 比較條件,如果所有條件滿足,則執行 Then 中的 Op (上一節介紹過Op),否則執行 Else中Op

首先,我們需要開啟一個事務,這是通過 KV 對象的方法實現的:

txn := kv.Txn(context.TODO())

下面的測試程序,判斷如果 k1 的值大于 v1 并且 k1 的版本號是 2,則 Put 鍵值 k2k3 ,否則 Put 鍵值 k4k5

kv.Txn(context.TODO()).If(clientv3.Compare(clientv3.Value(k1), ">", v1),clientv3.Compare(clientv3.Version(k1), "=", 2)
).Then(clientv3.OpPut(k2,v2), clentv3.OpPut(k3,v3)
).Else(clientv3.OpPut(k4,v4), clientv3.OpPut(k5,v5)
).Commit()

類似于 clientv3.Value() 用于指定 key 屬性的,有這么幾個方法:

  • func CreateRevision(key string) Cmp:key=xxx的創建版本必須滿足…
  • func LeaseValue(key string) Cmp:key=xxx的Lease ID必須滿足…
  • func ModRevision(key string) Cmp:key=xxx的最后修改版本必須滿足…
  • func Value(key string) Cmp:key=xxx的創建值必須滿足…
  • func Version(key string) Cmp:key=xxx的累計更新次數必須滿足…
package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// lease實現鎖自動過期:// op操作// txn事務: if else then// 1, 上鎖 (創建租約, 自動續租, 拿著租約去搶占一個key)lease := clientv3.NewLease(client)// 申請一個5秒的租約leaseGrantResp, err := lease.Grant(context.TODO(), 5)if err != nil {fmt.Println(err)return}// 拿到租約的IDleaseId := leaseGrantResp.ID// 準備一個用于取消自動續租的contextctx, cancelFunc := context.WithCancel(context.TODO())// 確保函數退出后, 自動續租會停止defer cancelFunc()defer lease.Revoke(context.TODO(), leaseId)// 5秒后會取消自動續租keepRespChan, err := lease.KeepAlive(ctx, leaseId)if err != nil {fmt.Println(err)return}// 處理續約應答的協程go func() {for {select {case keepResp := <-keepRespChan:if keepResp == nil {fmt.Println("租約已經失效了")goto END} else { // 每秒會續租一次, 所以就會受到一次應答fmt.Println("收到自動續租應答:", keepResp.ID)}}}END:}()//  if 不存在key, then 設置它, else 搶鎖失敗kv := clientv3.NewKV(client)// 創建事務txn := kv.Txn(context.TODO())// 定義事務// 如果key不存在txn.If(clientv3.Compare(clientv3.CreateRevision("/demo/A/B1"), "=", 0)).Then(clientv3.OpPut("/demo/A/B1", "xxx", clientv3.WithLease(leaseId))).Else(clientv3.OpGet("/demo/A/B1")) // 否則搶鎖失敗// 提交事務txnResp, err := txn.Commit()if err != nil {fmt.Println(err)return // 沒有問題}// 判斷是否搶到了鎖if !txnResp.Succeeded {fmt.Println("鎖被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))return}// 2, 處理業務fmt.Println("處理任務")time.Sleep(5 * time.Second)// 3, 釋放鎖(取消自動續租, 釋放租約)// defer 會把租約釋放掉, 關聯的KV就被刪除了
}

輸出結果:

收到自動續租應答: 8488292048996991680
鎖被占用: BBBBB

9. Watch 監聽操作

Watch 用于監聽某個鍵的變化, Watch調用后返回一個WatchChan,它的類型聲明如下:

type WatchChan <-chan WatchResponse
type WatchResponse struct {Header pb.ResponseHeaderEvents []*EventCompactRevision int64Canceled boolCreated bool
}

當監聽的 key 有變化后會向WatchChan發送WatchResponse

Watch 的典型應用場景是應用于系統配置的熱加載,我們可以在系統讀取到存儲在 etcd key 中的配置后,用 Watch 監聽 key 的變化。在單獨的 goroutine 中接收 WatchChan 發送過來的數據,并將更新應用到系統設置的配置變量中,比如像下面這樣在 goroutine 中更新變量 appConfig ,這樣系統就實現了配置變量的熱加載。

type AppConfig struct {config1 stringconfig2 string
}var appConfig Appconfigfunc watchConfig(clt *clientv3.Client, key string, ss interface{}) {watchCh := clt.Watch(context.TODO(), key)go func() {for res := range watchCh {value := res.Events[0].Kv.Valueif err := json.Unmarshal(value, ss); err != nil {fmt.Println("now", time.Now(), "watchConfig err", err)continue}fmt.Println("now", time.Now(), "watchConfig", ss)}}()
}
watchConfig(client, "config_key", &appConfig)

完整示例代碼:

package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3""github.com/coreos/etcd/mvcc/mvccpb"
)func main() {config := clientv3.Config{Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 獲得kv API子集kv := clientv3.NewKV(client)// 模擬etcd中KV的變化go func() {for {kv.Put(context.TODO(), "/demo/A/B1", "i am B1")kv.Delete(context.TODO(), "/demo/A/B1")time.Sleep(1 * time.Second)}}()// 先GET到當前的值,并監聽后續變化getResp, err := kv.Get(context.TODO(), "/demo/A/B1")if err != nil {fmt.Println(err)return}// 現在key是存在的if len(getResp.Kvs) != 0 {fmt.Println("當前值:", string(getResp.Kvs[0].Value))}// 當前etcd集群事務ID, 單調遞增的watchStartRevision := getResp.Header.Revision + 1// 創建一個watcherwatcher := clientv3.NewWatcher(client)// 啟動監聽fmt.Println("從該版本向后監聽:", watchStartRevision)// 創建一個 5s 后取消的上下文ctx, cancelFunc := context.WithCancel(context.TODO())time.AfterFunc(5*time.Second, func() {cancelFunc()})// 該監聽動作在 5s 后取消watchRespChan := watcher.Watch(ctx, "/demo/A/B1", clientv3.WithRev(watchStartRevision))// 處理kv變化事件for watchResp := range watchRespChan {for _, event := range watchResp.Events {switch event.Type {case mvccpb.PUT:fmt.Println("修改為:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)case mvccpb.DELETE:fmt.Println("刪除了", "Revision:", event.Kv.ModRevision)}}}}

輸出結果:

從該版本向后監聽: 94
修改為: i am B1 Revision: 94 94
刪除了 Revision: 95
修改為: i am B1 Revision: 96 96
刪除了 Revision: 97
修改為: i am B1 Revision: 98 98
刪除了 Revision: 99
修改為: i am B1 Revision: 100 100
刪除了 Revision: 101
修改為: i am B1 Revision: 102 102
刪除了 Revision: 103

8. 參考資料

https://segmentfault.com/a/1190000020868242?utm_source=tag-newest
https://godoc.org/github.com/coreos/etcd/clientv3
https://pkg.go.dev/go.etcd.io/etcd/clientv3?tab=doc

總結

以上是生活随笔為你收集整理的Go 学习笔记(58)— Go 第三方库之 etcd/clientv3(连接客户端、PUT、GET、Lease、Op、Txn、Watch 基础概念说明)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。