最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

etcd 租约、Watch功能、分布式锁的golang实践

来源:博客园
目录
  • 背景
  • etcd 租约
    • 不自动续约
    • 自动续约
    • 取消续约
  • Watch 机制
  • 使用txn实现分布式锁
  • null

背景

本文使用 Golang语言的SDK包 go.etcd.io/etcd/clientv3 实践etcd的租约、Watch等功能,并且实现分布式锁的业务场景。

etcd 租约

etcd过期时间可以通过设置ttl的方式, 通过租约可以控制一组key的过期时间,可以通过续租的方式保持key不过期

//etcd 租约与续约实践func LeaseTest(env string, ttl int64) (err error) {cli, err := getEtcdCli(env)  //封装的clientv3客户端,不用太关心if err != nil {return}lease := clientv3.NewLease(cli)    leaseGrant, err := lease.Grant(context.Background(), ttl)  //声明一个租约,并且设置ttlif err != nil { return}if _, err = cli.Put(context.Background(), "ping", "pong", clientv3.WithLease(leaseGrant.ID)); err != nil {  //设置key value 并且绑定租约return}/*保持长链接,每s续租一次 */keepRespChan, err := lease.KeepAlive(context.TODO(), leaseGrant.ID)if err != nil {fmt.Println(err)return}go func() {        //查看续期情况 非必需,帮助观察续租的过程for {select {case resp := <-keepRespChan:if resp == nil {fmt.Println("租约失效")return} else {fmt.Println("租约成功", resp)}}}}()for {            //持续检测key是否过期values, err := cli.Get(context.Background(), "ping")if err != nil {break}if values.Count == 0 {fmt.Println("已经过期")} else {                        fmt.Println("没过期", values.Kvs)                 }time.Sleep(time.Second * 1)}return}

不自动续约

把 lease.KeepAlive 去掉ttl时间之后,租约过期后key删除


(资料图片仅供参考)

自动续约

使lease.KeepAlive生效,以及打印测试可以看出自动续约就是没秒续约一次。

取消续约

两种形式第一种:

lease.Revoke(context.Background(), leaseGrant.ID)

测试:租约失效之后,租约的key会立马被删掉第二种:

ctx, cancelFunc := context.WithCancel(context.TODO())keepRespChan, err := lease.KeepAlive(ctx, leaseGrant.ID)···cancelFunc()  

测试:租约失效之后,key的ttl到之后删除key

Watch 机制

watch机制可以使客户端监听etcd的某个key的变化,可以实现配置推送,主动下发等业务场景

//etcd 的watch功能func WatchTest(env string) (err error) {ctx := context.Background()cli, err := getEtcdCli(env)if err != nil {return err}go func() {for {  //模拟key的变化cli.Put(ctx, "ping", "pong")cli.Delete(ctx, "ping")time.Sleep(time.Second)}}()pingVal, err := cli.Get(ctx, "ping")if err != nil || len(pingVal.Kvs) == 0 {return err}watchStartRevision := pingVal.Header.Revision + 1  //获取revision,观察这个revision之后的变化fmt.Println(watchStartRevision)watcher := clientv3.NewWatcher(cli)ctx, cancelFunc := context.WithCancel(context.TODO())time.AfterFunc(5*time.Second, func() {cancelFunc()})watchRespChan := watcher.Watch(ctx, "ping", clientv3.WithRev(watchStartRevision))for watchResp := range watchRespChan {for _, event := range watchResp.Events {switch event.Type {case mvccpb.PUT:fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)case mvccpb.DELETE:fmt.Println("删除了", "Revision:", event.Kv.ModRevision)}}}return}

可以看出Watch返回是一个chan,可以持续的监听测试:

使用txn实现分布式锁

//锁的简单封装type Lock struct {lease      clientv3.LeaseleaseId    clientv3.LeaseIDctx        context.ContextcancelFunc context.CancelFunc}func (l *Lock) Lock() (lock bool, err error) {cli, err := getEtcdCli("open")if err != nil {return false, err}l.lease = clientv3.NewLease(cli)l.ctx, l.cancelFunc = context.WithCancel(context.TODO())leaseGrant, err := l.lease.Grant(context.TODO(), 5)if err != nil {return false, err}l.leaseId = leaseGrant.IDkv := clientv3.NewKV(cli)txn := kv.Txn(l.ctx)          txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)).Then(clientv3.OpPut("lock", "g", clientv3.WithLease(l.leaseId)))txnResp, err := txn.Commit()if err != nil {return false, err}if !txnResp.Succeeded {return false, nil}//自动续约keepRespChan, err := l.lease.KeepAlive(l.ctx, l.leaseId)_ = keepRespChanreturn true, nil}func (l *Lock) Unlock() {//l.cancelFunc()l.lease.Revoke(l.ctx, l.leaseId)}

txn通过简单的"If-Then-Else"实现了原子操作,这里我们租期过期之后需要立刻将key删除,所以使用Revoke。测试:

func LockTest() {go Node("node1", 5)go Node("node2", 3)select {}}func Node(node string, t time.Duration) {l := Lock{}for {getLock, err := l.Lock()if err != nil || !getLock {continue}fmt.Println("i get the lock: ", node)time.Sleep(time.Second * t)l.Unlock()fmt.Println("i release the lock: ", node)time.Sleep(time.Second)}}

本次代码:https://github.com/zhaoshoucheng/hodgepodge/tree/main/etcd

关键词: