package etcdv3 import ( "context" "crypto/tls" "errors" "time" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/pkg/transport" ) var ( // ErrNoKey indicates a client method needs a key but receives none. ErrNoKey = errors.New("no key provided") // ErrNoValue indicates a client method needs a value but receives none. ErrNoValue = errors.New("no value provided") ) // Client is a wrapper around the etcd client. type Client interface { // GetEntries queries the given prefix in etcd and returns a slice // containing the values of all keys found, recursively, underneath that // prefix. GetEntries(prefix string) ([]string, error) // WatchPrefix watches the given prefix in etcd for changes. When a change // is detected, it will signal on the passed channel. Clients are expected // to call GetEntries to update themselves with the latest set of complete // values. WatchPrefix will always send an initial sentinel value on the // channel after establishing the watch, to ensure that clients always // receive the latest set of values. WatchPrefix will block until the // context passed to the NewClient constructor is terminated. WatchPrefix(prefix string, ch chan struct{}) // Register a service with etcd. Register(s Service) error // Deregister a service with etcd. Deregister(s Service) error // LeaseID returns the lease id created for this service instance LeaseID() int64 } type client struct { cli *clientv3.Client ctx context.Context kv clientv3.KV // Watcher interface instance, used to leverage Watcher.Close() watcher clientv3.Watcher // watcher context wctx context.Context // watcher cancel func wcf context.CancelFunc // leaseID will be 0 (clientv3.NoLease) if a lease was not created leaseID clientv3.LeaseID hbch <-chan *clientv3.LeaseKeepAliveResponse // Lease interface instance, used to leverage Lease.Close() leaser clientv3.Lease } // ClientOptions defines options for the etcd client. All values are optional. // If any duration is not specified, a default of 3 seconds will be used. type ClientOptions struct { Cert string Key string CACert string DialTimeout time.Duration DialKeepAlive time.Duration Username string Password string } // NewClient returns Client with a connection to the named machines. It will // return an error if a connection to the cluster cannot be made. func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) { if options.DialTimeout == 0 { options.DialTimeout = 3 * time.Second } if options.DialKeepAlive == 0 { options.DialKeepAlive = 3 * time.Second } var err error var tlscfg *tls.Config if options.Cert != "" && options.Key != "" { tlsInfo := transport.TLSInfo{ CertFile: options.Cert, KeyFile: options.Key, TrustedCAFile: options.CACert, } tlscfg, err = tlsInfo.ClientConfig() if err != nil { return nil, err } } cli, err := clientv3.New(clientv3.Config{ Context: ctx, Endpoints: machines, DialTimeout: options.DialTimeout, DialKeepAliveTime: options.DialKeepAlive, TLS: tlscfg, Username: options.Username, Password: options.Password, }) if err != nil { return nil, err } return &client{ cli: cli, ctx: ctx, kv: clientv3.NewKV(cli), }, nil } func (c *client) LeaseID() int64 { return int64(c.leaseID) } // GetEntries implements the etcd Client interface. func (c *client) GetEntries(key string) ([]string, error) { resp, err := c.kv.Get(c.ctx, key, clientv3.WithPrefix()) if err != nil { return nil, err } entries := make([]string, len(resp.Kvs)) for i, kv := range resp.Kvs { entries[i] = string(kv.Value) } return entries, nil } // WatchPrefix implements the etcd Client interface. func (c *client) WatchPrefix(prefix string, ch chan struct{}) { c.wctx, c.wcf = context.WithCancel(c.ctx) c.watcher = clientv3.NewWatcher(c.cli) wch := c.watcher.Watch(c.wctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(0)) ch <- struct{}{} for wr := range wch { if wr.Canceled { return } ch <- struct{}{} } } func (c *client) Register(s Service) error { var err error if s.Key == "" { return ErrNoKey } if s.Value == "" { return ErrNoValue } if c.leaser != nil { c.leaser.Close() } c.leaser = clientv3.NewLease(c.cli) if c.watcher != nil { c.watcher.Close() } c.watcher = clientv3.NewWatcher(c.cli) if c.kv == nil { c.kv = clientv3.NewKV(c.cli) } if s.TTL == nil { s.TTL = NewTTLOption(time.Second*3, time.Second*10) } grantResp, err := c.leaser.Grant(c.ctx, int64(s.TTL.ttl.Seconds())) if err != nil { return err } c.leaseID = grantResp.ID _, err = c.kv.Put( c.ctx, s.Key, s.Value, clientv3.WithLease(c.leaseID), ) if err != nil { return err } // this will keep the key alive 'forever' or until we revoke it or // the context is canceled c.hbch, err = c.leaser.KeepAlive(c.ctx, c.leaseID) if err != nil { return err } // discard the keepalive response, make etcd library not to complain // fix bug #799 go func() { for { select { case r := <-c.hbch: // avoid dead loop when channel was closed if r == nil { return } case <-c.ctx.Done(): return } } }() return nil } func (c *client) Deregister(s Service) error { defer c.close() if s.Key == "" { return ErrNoKey } if _, err := c.cli.Delete(c.ctx, s.Key, clientv3.WithIgnoreLease()); err != nil { return err } return nil } // close will close any open clients and call // the watcher cancel func func (c *client) close() { if c.leaser != nil { c.leaser.Close() } if c.watcher != nil { c.watcher.Close() } if c.wcf != nil { c.wcf() } }