package zk

import (
	"encoding/binary"
	"errors"
	"log"
	"reflect"
	"runtime"
	"strings"
	"time"
)

// Sentinel errors returned by the packet encoder/decoder in this file.
var (
	// ErrUnhandledFieldType is returned when a value's reflect.Kind has no
	// encoding or decoding rule.
	ErrUnhandledFieldType = errors.New("zk: unhandled field type")
	// ErrPtrExpected is returned by encodePacket/decodePacket when the
	// argument is not a non-nil pointer.
	ErrPtrExpected = errors.New("zk: encode/decode expect a non-nil pointer to struct")
	// ErrShortBuffer is returned when the supplied buffer is too small for
	// the value being encoded or decoded.
	ErrShortBuffer = errors.New("zk: buffer too small")
)

// defaultLogger is a logger whose Printf forwards to the standard library
// log package.
type defaultLogger struct{}

// Printf formats and logs the message through log.Printf.
func (defaultLogger) Printf(format string, a ...interface{}) {
	log.Printf(format, a...)
}

// ACL is a single ACL entry as carried in requests and responses such as
// CreateRequest, setAclRequest and getAclResponse. The field order is the
// order in which the reflective encoder writes the fields.
type ACL struct {
	Perms  int32
	Scheme string
	ID     string
}

// Stat is the znode metadata embedded in responses such as getDataResponse
// and statResponse. The field order is the order in which the reflective
// decoder reads the fields.
type Stat struct {
	Czxid          int64 // The zxid of the change that caused this znode to be created.
	Mzxid          int64 // The zxid of the change that last modified this znode.
	Ctime          int64 // The time in milliseconds from epoch when this znode was created.
	Mtime          int64 // The time in milliseconds from epoch when this znode was last modified.
	Version        int32 // The number of changes to the data of this znode.
	Cversion       int32 // The number of changes to the children of this znode.
	Aversion       int32 // The number of changes to the ACL of this znode.
	EphemeralOwner int64 // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
	DataLength     int32 // The length of the data field of this znode.
	NumChildren    int32 // The number of children of this znode.
	Pzxid          int64 // last modified children
}

// ServerClient is the information for a single Zookeeper client and its session.
// This is used to parse/extract the output of the `cons` command.
type ServerClient struct {
	Queued        int64
	Received      int64
	Sent          int64
	SessionID     int64
	Lcxid         int64
	Lzxid         int64
	Timeout       int32
	LastLatency   int32
	MinLatency    int32
	AvgLatency    int32
	MaxLatency    int32
	Established   time.Time
	LastResponse  time.Time
	Addr          string
	LastOperation string // maybe?

	Error error
}

// ServerClients is a struct for the FLWCons() function. It's used to provide
// the list of Clients.
//
// This is needed because FLWCons() takes multiple servers.
type ServerClients struct { Clients []*ServerClient Error error } // ServerStats is the information pulled from the Zookeeper `stat` command. type ServerStats struct { Sent int64 Received int64 NodeCount int64 MinLatency int64 AvgLatency int64 MaxLatency int64 Connections int64 Outstanding int64 Epoch int32 Counter int32 BuildTime time.Time Mode Mode Version string Error error } type requestHeader struct { Xid int32 Opcode int32 } type responseHeader struct { Xid int32 Zxid int64 Err ErrCode } type multiHeader struct { Type int32 Done bool Err ErrCode } type auth struct { Type int32 Scheme string Auth []byte } // Generic request structs type pathRequest struct { Path string } type PathVersionRequest struct { Path string Version int32 } type pathWatchRequest struct { Path string Watch bool } type pathResponse struct { Path string } type statResponse struct { Stat Stat } // type CheckVersionRequest PathVersionRequest type closeRequest struct{} type closeResponse struct{} type connectRequest struct { ProtocolVersion int32 LastZxidSeen int64 TimeOut int32 SessionID int64 Passwd []byte } type connectResponse struct { ProtocolVersion int32 TimeOut int32 SessionID int64 Passwd []byte } type CreateRequest struct { Path string Data []byte Acl []ACL Flags int32 } type createResponse pathResponse type DeleteRequest PathVersionRequest type deleteResponse struct{} type errorResponse struct { Err int32 } type existsRequest pathWatchRequest type existsResponse statResponse type getAclRequest pathRequest type getAclResponse struct { Acl []ACL Stat Stat } type getChildrenRequest pathRequest type getChildrenResponse struct { Children []string } type getChildren2Request pathWatchRequest type getChildren2Response struct { Children []string Stat Stat } type getDataRequest pathWatchRequest type getDataResponse struct { Data []byte Stat Stat } type getMaxChildrenRequest pathRequest type getMaxChildrenResponse struct { Max int32 } type getSaslRequest struct { Token []byte } type 
pingRequest struct{} type pingResponse struct{} type setAclRequest struct { Path string Acl []ACL Version int32 } type setAclResponse statResponse type SetDataRequest struct { Path string Data []byte Version int32 } type setDataResponse statResponse type setMaxChildren struct { Path string Max int32 } type setSaslRequest struct { Token string } type setSaslResponse struct { Token string } type setWatchesRequest struct { RelativeZxid int64 DataWatches []string ExistWatches []string ChildWatches []string } type setWatchesResponse struct{} type syncRequest pathRequest type syncResponse pathResponse type setAuthRequest auth type setAuthResponse struct{} type multiRequestOp struct { Header multiHeader Op interface{} } type multiRequest struct { Ops []multiRequestOp DoneHeader multiHeader } type multiResponseOp struct { Header multiHeader String string Stat *Stat Err ErrCode } type multiResponse struct { Ops []multiResponseOp DoneHeader multiHeader } // zk version 3.5 reconfig API type reconfigRequest struct { JoiningServers []byte LeavingServers []byte NewMembers []byte // curConfigId version of the current configuration // optional - causes reconfiguration to return an error if configuration is no longer current CurConfigId int64 } type reconfigReponse getDataResponse func (r *multiRequest) Encode(buf []byte) (int, error) { total := 0 for _, op := range r.Ops { op.Header.Done = false n, err := encodePacketValue(buf[total:], reflect.ValueOf(op)) if err != nil { return total, err } total += n } r.DoneHeader.Done = true n, err := encodePacketValue(buf[total:], reflect.ValueOf(r.DoneHeader)) if err != nil { return total, err } total += n return total, nil } func (r *multiRequest) Decode(buf []byte) (int, error) { r.Ops = make([]multiRequestOp, 0) r.DoneHeader = multiHeader{-1, true, -1} total := 0 for { header := &multiHeader{} n, err := decodePacketValue(buf[total:], reflect.ValueOf(header)) if err != nil { return total, err } total += n if header.Done { r.DoneHeader = 
*header break } req := requestStructForOp(header.Type) if req == nil { return total, ErrAPIError } n, err = decodePacketValue(buf[total:], reflect.ValueOf(req)) if err != nil { return total, err } total += n r.Ops = append(r.Ops, multiRequestOp{*header, req}) } return total, nil } func (r *multiResponse) Decode(buf []byte) (int, error) { var multiErr error r.Ops = make([]multiResponseOp, 0) r.DoneHeader = multiHeader{-1, true, -1} total := 0 for { header := &multiHeader{} n, err := decodePacketValue(buf[total:], reflect.ValueOf(header)) if err != nil { return total, err } total += n if header.Done { r.DoneHeader = *header break } res := multiResponseOp{Header: *header} var w reflect.Value switch header.Type { default: return total, ErrAPIError case opError: w = reflect.ValueOf(&res.Err) case opCreate: w = reflect.ValueOf(&res.String) case opSetData: res.Stat = new(Stat) w = reflect.ValueOf(res.Stat) case opCheck, opDelete: } if w.IsValid() { n, err := decodePacketValue(buf[total:], w) if err != nil { return total, err } total += n } r.Ops = append(r.Ops, res) if multiErr == nil && res.Err != errOk { // Use the first error as the error returned from Multi(). 
multiErr = res.Err.toError() } } return total, multiErr } type watcherEvent struct { Type EventType State State Path string } type decoder interface { Decode(buf []byte) (int, error) } type encoder interface { Encode(buf []byte) (int, error) } func decodePacket(buf []byte, st interface{}) (n int, err error) { defer func() { if r := recover(); r != nil { if e, ok := r.(runtime.Error); ok && strings.HasPrefix(e.Error(), "runtime error: slice bounds out of range") { err = ErrShortBuffer } else { panic(r) } } }() v := reflect.ValueOf(st) if v.Kind() != reflect.Ptr || v.IsNil() { return 0, ErrPtrExpected } return decodePacketValue(buf, v) } func decodePacketValue(buf []byte, v reflect.Value) (int, error) { rv := v kind := v.Kind() if kind == reflect.Ptr { if v.IsNil() { v.Set(reflect.New(v.Type().Elem())) } v = v.Elem() kind = v.Kind() } n := 0 switch kind { default: return n, ErrUnhandledFieldType case reflect.Struct: if de, ok := rv.Interface().(decoder); ok { return de.Decode(buf) } else if de, ok := v.Interface().(decoder); ok { return de.Decode(buf) } else { for i := 0; i < v.NumField(); i++ { field := v.Field(i) n2, err := decodePacketValue(buf[n:], field) n += n2 if err != nil { return n, err } } } case reflect.Bool: v.SetBool(buf[n] != 0) n++ case reflect.Int32: v.SetInt(int64(binary.BigEndian.Uint32(buf[n : n+4]))) n += 4 case reflect.Int64: v.SetInt(int64(binary.BigEndian.Uint64(buf[n : n+8]))) n += 8 case reflect.String: ln := int(binary.BigEndian.Uint32(buf[n : n+4])) v.SetString(string(buf[n+4 : n+4+ln])) n += 4 + ln case reflect.Slice: switch v.Type().Elem().Kind() { default: count := int(binary.BigEndian.Uint32(buf[n : n+4])) n += 4 values := reflect.MakeSlice(v.Type(), count, count) v.Set(values) for i := 0; i < count; i++ { n2, err := decodePacketValue(buf[n:], values.Index(i)) n += n2 if err != nil { return n, err } } case reflect.Uint8: ln := int(int32(binary.BigEndian.Uint32(buf[n : n+4]))) if ln < 0 { n += 4 v.SetBytes(nil) } else { bytes := 
make([]byte, ln) copy(bytes, buf[n+4:n+4+ln]) v.SetBytes(bytes) n += 4 + ln } } } return n, nil } func encodePacket(buf []byte, st interface{}) (n int, err error) { defer func() { if r := recover(); r != nil { if e, ok := r.(runtime.Error); ok && strings.HasPrefix(e.Error(), "runtime error: slice bounds out of range") { err = ErrShortBuffer } else { panic(r) } } }() v := reflect.ValueOf(st) if v.Kind() != reflect.Ptr || v.IsNil() { return 0, ErrPtrExpected } return encodePacketValue(buf, v) } func encodePacketValue(buf []byte, v reflect.Value) (int, error) { rv := v for v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface { v = v.Elem() } n := 0 switch v.Kind() { default: return n, ErrUnhandledFieldType case reflect.Struct: if en, ok := rv.Interface().(encoder); ok { return en.Encode(buf) } else if en, ok := v.Interface().(encoder); ok { return en.Encode(buf) } else { for i := 0; i < v.NumField(); i++ { field := v.Field(i) n2, err := encodePacketValue(buf[n:], field) n += n2 if err != nil { return n, err } } } case reflect.Bool: if v.Bool() { buf[n] = 1 } else { buf[n] = 0 } n++ case reflect.Int32: binary.BigEndian.PutUint32(buf[n:n+4], uint32(v.Int())) n += 4 case reflect.Int64: binary.BigEndian.PutUint64(buf[n:n+8], uint64(v.Int())) n += 8 case reflect.String: str := v.String() binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(str))) copy(buf[n+4:n+4+len(str)], []byte(str)) n += 4 + len(str) case reflect.Slice: switch v.Type().Elem().Kind() { default: count := v.Len() startN := n n += 4 for i := 0; i < count; i++ { n2, err := encodePacketValue(buf[n:], v.Index(i)) n += n2 if err != nil { return n, err } } binary.BigEndian.PutUint32(buf[startN:startN+4], uint32(count)) case reflect.Uint8: if v.IsNil() { binary.BigEndian.PutUint32(buf[n:n+4], uint32(0xffffffff)) n += 4 } else { bytes := v.Bytes() binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(bytes))) copy(buf[n+4:n+4+len(bytes)], bytes) n += 4 + len(bytes) } } } return n, nil } func requestStructForOp(op 
int32) interface{} { switch op { case opClose: return &closeRequest{} case opCreate: return &CreateRequest{} case opDelete: return &DeleteRequest{} case opExists: return &existsRequest{} case opGetAcl: return &getAclRequest{} case opGetChildren: return &getChildrenRequest{} case opGetChildren2: return &getChildren2Request{} case opGetData: return &getDataRequest{} case opPing: return &pingRequest{} case opSetAcl: return &setAclRequest{} case opSetData: return &SetDataRequest{} case opSetWatches: return &setWatchesRequest{} case opSync: return &syncRequest{} case opSetAuth: return &setAuthRequest{} case opCheck: return &CheckVersionRequest{} case opMulti: return &multiRequest{} case opReconfig: return &reconfigRequest{} } return nil }