// Copyright 2015 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package rafthttp import ( "context" "fmt" "io" "io/ioutil" "net/http" "path" "strings" "sync" "time" "golang.org/x/time/rate" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/httputil" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/version" "github.com/coreos/go-semver/semver" ) const ( streamTypeMessage streamType = "message" streamTypeMsgAppV2 streamType = "msgappv2" streamBufSize = 4096 ) var ( errUnsupportedStreamType = fmt.Errorf("unsupported stream type") // the key is in string format "major.minor.patch" supportedStream = map[string][]streamType{ "2.0.0": {}, "2.1.0": {streamTypeMsgAppV2, streamTypeMessage}, "2.2.0": {streamTypeMsgAppV2, streamTypeMessage}, "2.3.0": {streamTypeMsgAppV2, streamTypeMessage}, "3.0.0": {streamTypeMsgAppV2, streamTypeMessage}, "3.1.0": {streamTypeMsgAppV2, streamTypeMessage}, "3.2.0": {streamTypeMsgAppV2, streamTypeMessage}, "3.3.0": {streamTypeMsgAppV2, streamTypeMessage}, } ) type streamType string func (t streamType) endpoint() string { switch t { case streamTypeMsgAppV2: return path.Join(RaftStreamPrefix, "msgapp") case streamTypeMessage: return path.Join(RaftStreamPrefix, "message") default: plog.Panicf("unhandled stream type %v", t) return "" } } func (t streamType) String() string { switch t { case streamTypeMsgAppV2: return "stream MsgApp v2" case streamTypeMessage: return "stream Message" default: return "unknown stream" } } var ( // linkHeartbeatMessage is a special message used as heartbeat message in // link layer. It never conflicts with messages from raft because raft // doesn't send out messages without From and To fields. linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat} ) func isLinkHeartbeatMessage(m *raftpb.Message) bool { return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0 } type outgoingConn struct { t streamType io.Writer http.Flusher io.Closer } // streamWriter writes messages to the attached outgoingConn. type streamWriter struct { peerID types.ID status *peerStatus fs *stats.FollowerStats r Raft mu sync.Mutex // guard field working and closer closer io.Closer working bool msgc chan raftpb.Message connc chan *outgoingConn stopc chan struct{} done chan struct{} } // startStreamWriter creates a streamWrite and starts a long running go-routine that accepts // messages and writes to the attached outgoing connection. func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter { w := &streamWriter{ peerID: id, status: status, fs: fs, r: r, msgc: make(chan raftpb.Message, streamBufSize), connc: make(chan *outgoingConn), stopc: make(chan struct{}), done: make(chan struct{}), } go w.run() return w } func (cw *streamWriter) run() { var ( msgc chan raftpb.Message heartbeatc <-chan time.Time t streamType enc encoder flusher http.Flusher batched int ) tickc := time.NewTicker(ConnReadTimeout / 3) defer tickc.Stop() unflushed := 0 plog.Infof("started streaming with peer %s (writer)", cw.peerID) for { select { case <-heartbeatc: err := enc.encode(&linkHeartbeatMessage) unflushed += linkHeartbeatMessage.Size() if err == nil { flusher.Flush() batched = 0 sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed)) unflushed = 0 continue } cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) sentFailures.WithLabelValues(cw.peerID.String()).Inc() cw.close() plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) heartbeatc, msgc = nil, nil case m := <-msgc: err := enc.encode(&m) if err == nil { unflushed += m.Size() if len(msgc) == 0 || batched > streamBufSize/2 { flusher.Flush() sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed)) unflushed = 0 batched = 0 } else { batched++ } continue } cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) cw.close() plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) heartbeatc, msgc = nil, nil cw.r.ReportUnreachable(m.To) sentFailures.WithLabelValues(cw.peerID.String()).Inc() case conn := <-cw.connc: cw.mu.Lock() closed := cw.closeUnlocked() t = conn.t switch conn.t { case streamTypeMsgAppV2: enc = newMsgAppV2Encoder(conn.Writer, cw.fs) case streamTypeMessage: enc = &messageEncoder{w: conn.Writer} default: plog.Panicf("unhandled stream type %s", conn.t) } flusher = conn.Flusher unflushed = 0 cw.status.activate() cw.closer = conn.Closer cw.working = true cw.mu.Unlock() if closed { plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t) } plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t) heartbeatc, msgc = tickc.C, cw.msgc case <-cw.stopc: if cw.close() { plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) } plog.Infof("stopped streaming with peer %s (writer)", cw.peerID) close(cw.done) return } } } func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) { cw.mu.Lock() defer cw.mu.Unlock() return cw.msgc, cw.working } func (cw *streamWriter) close() bool { cw.mu.Lock() defer cw.mu.Unlock() return cw.closeUnlocked() } func (cw *streamWriter) closeUnlocked() bool { if !cw.working { return false } if err := cw.closer.Close(); err != nil { plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err) } if len(cw.msgc) > 0 { cw.r.ReportUnreachable(uint64(cw.peerID)) } cw.msgc = make(chan raftpb.Message, streamBufSize) cw.working = false return true } func (cw *streamWriter) attach(conn *outgoingConn) bool { select { case cw.connc <- conn: return true case <-cw.done: return false } } func (cw *streamWriter) stop() { close(cw.stopc) <-cw.done } // streamReader is a long-running go-routine that dials to the remote stream // endpoint and reads messages from the response body returned. type streamReader struct { peerID types.ID typ streamType tr *Transport picker *urlPicker status *peerStatus recvc chan<- raftpb.Message propc chan<- raftpb.Message rl *rate.Limiter // alters the frequency of dial retrial attempts errorc chan<- error mu sync.Mutex paused bool closer io.Closer ctx context.Context cancel context.CancelFunc done chan struct{} } func (cr *streamReader) start() { cr.done = make(chan struct{}) if cr.errorc == nil { cr.errorc = cr.tr.ErrorC } if cr.ctx == nil { cr.ctx, cr.cancel = context.WithCancel(context.Background()) } go cr.run() } func (cr *streamReader) run() { t := cr.typ plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t) for { rc, err := cr.dial(t) if err != nil { if err != errUnsupportedStreamType { cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error()) } } else { cr.status.activate() plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) err = cr.decodeLoop(rc, t) plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) switch { // all data is read out case err == io.EOF: // connection is closed by the remote case transport.IsClosedConnError(err): default: cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error()) } } // Wait for a while before new dial attempt err = cr.rl.Wait(cr.ctx) if cr.ctx.Err() != nil { plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t) close(cr.done) return } if err != nil { plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err) } } } func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { var dec decoder cr.mu.Lock() switch t { case streamTypeMsgAppV2: dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID) case streamTypeMessage: dec = &messageDecoder{r: rc} default: plog.Panicf("unhandled stream type %s", t) } select { case <-cr.ctx.Done(): cr.mu.Unlock() if err := rc.Close(); err != nil { return err } return io.EOF default: cr.closer = rc } cr.mu.Unlock() for { m, err := dec.decode() if err != nil { cr.mu.Lock() cr.close() cr.mu.Unlock() return err } receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size())) cr.mu.Lock() paused := cr.paused cr.mu.Unlock() if paused { continue } if isLinkHeartbeatMessage(&m) { // raft is not interested in link layer // heartbeat message, so we should ignore // it. continue } recvc := cr.recvc if m.Type == raftpb.MsgProp { recvc = cr.propc } select { case recvc <- m: default: if cr.status.isActive() { plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From)) } plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) recvFailures.WithLabelValues(types.ID(m.From).String()).Inc() } } } func (cr *streamReader) stop() { cr.mu.Lock() cr.cancel() cr.close() cr.mu.Unlock() <-cr.done } func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { u := cr.picker.pick() uu := u uu.Path = path.Join(t.endpoint(), cr.tr.ID.String()) req, err := http.NewRequest("GET", uu.String(), nil) if err != nil { cr.picker.unreachable(u) return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err) } req.Header.Set("X-Server-From", cr.tr.ID.String()) req.Header.Set("X-Server-Version", version.Version) req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion) req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String()) req.Header.Set("X-Raft-To", cr.peerID.String()) setPeerURLsHeader(req, cr.tr.URLs) req = req.WithContext(cr.ctx) cr.mu.Lock() select { case <-cr.ctx.Done(): cr.mu.Unlock() return nil, fmt.Errorf("stream reader is stopped") default: } cr.mu.Unlock() resp, err := cr.tr.streamRt.RoundTrip(req) if err != nil { cr.picker.unreachable(u) return nil, err } rv := serverVersion(resp.Header) lv := semver.Must(semver.NewVersion(version.Version)) if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) { httputil.GracefulClose(resp) cr.picker.unreachable(u) return nil, errUnsupportedStreamType } switch resp.StatusCode { case http.StatusGone: httputil.GracefulClose(resp) cr.picker.unreachable(u) reportCriticalError(errMemberRemoved, cr.errorc) return nil, errMemberRemoved case http.StatusOK: return resp.Body, nil case http.StatusNotFound: httputil.GracefulClose(resp) cr.picker.unreachable(u) return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID) case http.StatusPreconditionFailed: b, err := ioutil.ReadAll(resp.Body) if err != nil { cr.picker.unreachable(u) return nil, err } httputil.GracefulClose(resp) cr.picker.unreachable(u) switch strings.TrimSuffix(string(b), "\n") { case errIncompatibleVersion.Error(): plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID) return nil, errIncompatibleVersion case errClusterIDMismatch.Error(): plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)", cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID) return nil, errClusterIDMismatch default: return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b)) } default: httputil.GracefulClose(resp) cr.picker.unreachable(u) return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode) } } func (cr *streamReader) close() { if cr.closer != nil { if err := cr.closer.Close(); err != nil { plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err) } } cr.closer = nil } func (cr *streamReader) pause() { cr.mu.Lock() defer cr.mu.Unlock() cr.paused = true } func (cr *streamReader) resume() { cr.mu.Lock() defer cr.mu.Unlock() cr.paused = false } // checkStreamSupport checks whether the stream type is supported in the // given version. func checkStreamSupport(v *semver.Version, t streamType) bool { nv := &semver.Version{Major: v.Major, Minor: v.Minor} for _, s := range supportedStream[nv.String()] { if s == t { return true } } return false }