/* Copyright 2014-2021 Docker Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package spdystream import ( "errors" "fmt" "io" "net" "net/http" "sync" "time" "github.com/moby/spdystream/spdy" ) var ( ErrUnreadPartialData = errors.New("unread partial data") ) type Stream struct { streamId spdy.StreamId parent *Stream conn *Connection startChan chan error dataLock sync.RWMutex dataChan chan []byte unread []byte priority uint8 headers http.Header headerChan chan http.Header finishLock sync.Mutex finished bool replyCond *sync.Cond replied bool closeLock sync.Mutex closeChan chan bool } // WriteData writes data to stream, sending a dataframe per call func (s *Stream) WriteData(data []byte, fin bool) error { s.waitWriteReply() var flags spdy.DataFlags if fin { flags = spdy.DataFlagFin s.finishLock.Lock() if s.finished { s.finishLock.Unlock() return ErrWriteClosedStream } s.finished = true s.finishLock.Unlock() } dataFrame := &spdy.DataFrame{ StreamId: s.streamId, Flags: flags, Data: data, } debugMessage("(%p) (%d) Writing data frame", s, s.streamId) return s.conn.framer.WriteFrame(dataFrame) } // Write writes bytes to a stream, calling write data for each call. func (s *Stream) Write(data []byte) (n int, err error) { err = s.WriteData(data, false) if err == nil { n = len(data) } return } // Read reads bytes from a stream, a single read will never get more // than what is sent on a single data frame, but a multiple calls to // read may get data from the same data frame. func (s *Stream) Read(p []byte) (n int, err error) { if s.unread == nil { select { case <-s.closeChan: return 0, io.EOF case read, ok := <-s.dataChan: if !ok { return 0, io.EOF } s.unread = read } } n = copy(p, s.unread) if n < len(s.unread) { s.unread = s.unread[n:] } else { s.unread = nil } return } // ReadData reads an entire data frame and returns the byte array // from the data frame. If there is unread data from the result // of a Read call, this function will return an ErrUnreadPartialData. func (s *Stream) ReadData() ([]byte, error) { debugMessage("(%p) Reading data from %d", s, s.streamId) if s.unread != nil { return nil, ErrUnreadPartialData } select { case <-s.closeChan: return nil, io.EOF case read, ok := <-s.dataChan: if !ok { return nil, io.EOF } return read, nil } } func (s *Stream) waitWriteReply() { if s.replyCond != nil { s.replyCond.L.Lock() for !s.replied { s.replyCond.Wait() } s.replyCond.L.Unlock() } } // Wait waits for the stream to receive a reply. func (s *Stream) Wait() error { return s.WaitTimeout(time.Duration(0)) } // WaitTimeout waits for the stream to receive a reply or for timeout. // When the timeout is reached, ErrTimeout will be returned. func (s *Stream) WaitTimeout(timeout time.Duration) error { var timeoutChan <-chan time.Time if timeout > time.Duration(0) { timeoutChan = time.After(timeout) } select { case err := <-s.startChan: if err != nil { return err } break case <-timeoutChan: return ErrTimeout } return nil } // Close closes the stream by sending an empty data frame with the // finish flag set, indicating this side is finished with the stream. func (s *Stream) Close() error { select { case <-s.closeChan: // Stream is now fully closed s.conn.removeStream(s) default: break } return s.WriteData([]byte{}, true) } // Reset sends a reset frame, putting the stream into the fully closed state. func (s *Stream) Reset() error { s.conn.removeStream(s) return s.resetStream() } func (s *Stream) resetStream() error { // Always call closeRemoteChannels, even if s.finished is already true. // This makes it so that stream.Close() followed by stream.Reset() allows // stream.Read() to unblock. s.closeRemoteChannels() s.finishLock.Lock() if s.finished { s.finishLock.Unlock() return nil } s.finished = true s.finishLock.Unlock() resetFrame := &spdy.RstStreamFrame{ StreamId: s.streamId, Status: spdy.Cancel, } return s.conn.framer.WriteFrame(resetFrame) } // CreateSubStream creates a stream using the current as the parent func (s *Stream) CreateSubStream(headers http.Header, fin bool) (*Stream, error) { return s.conn.CreateStream(headers, s, fin) } // SetPriority sets the stream priority, does not affect the // remote priority of this stream after Open has been called. // Valid values are 0 through 7, 0 being the highest priority // and 7 the lowest. func (s *Stream) SetPriority(priority uint8) { s.priority = priority } // SendHeader sends a header frame across the stream func (s *Stream) SendHeader(headers http.Header, fin bool) error { return s.conn.sendHeaders(headers, s, fin) } // SendReply sends a reply on a stream, only valid to be called once // when handling a new stream func (s *Stream) SendReply(headers http.Header, fin bool) error { if s.replyCond == nil { return errors.New("cannot reply on initiated stream") } s.replyCond.L.Lock() defer s.replyCond.L.Unlock() if s.replied { return nil } err := s.conn.sendReply(headers, s, fin) if err != nil { return err } s.replied = true s.replyCond.Broadcast() return nil } // Refuse sends a reset frame with the status refuse, only // valid to be called once when handling a new stream. This // may be used to indicate that a stream is not allowed // when http status codes are not being used. func (s *Stream) Refuse() error { if s.replied { return nil } s.replied = true return s.conn.sendReset(spdy.RefusedStream, s) } // Cancel sends a reset frame with the status canceled. This // can be used at any time by the creator of the Stream to // indicate the stream is no longer needed. func (s *Stream) Cancel() error { return s.conn.sendReset(spdy.Cancel, s) } // ReceiveHeader receives a header sent on the other side // of the stream. This function will block until a header // is received or stream is closed. func (s *Stream) ReceiveHeader() (http.Header, error) { select { case <-s.closeChan: break case header, ok := <-s.headerChan: if !ok { return nil, fmt.Errorf("header chan closed") } return header, nil } return nil, fmt.Errorf("stream closed") } // Parent returns the parent stream func (s *Stream) Parent() *Stream { return s.parent } // Headers returns the headers used to create the stream func (s *Stream) Headers() http.Header { return s.headers } // String returns the string version of stream using the // streamId to uniquely identify the stream func (s *Stream) String() string { return fmt.Sprintf("stream:%d", s.streamId) } // Identifier returns a 32 bit identifier for the stream func (s *Stream) Identifier() uint32 { return uint32(s.streamId) } // IsFinished returns whether the stream has finished // sending data func (s *Stream) IsFinished() bool { return s.finished } // Implement net.Conn interface func (s *Stream) LocalAddr() net.Addr { return s.conn.conn.LocalAddr() } func (s *Stream) RemoteAddr() net.Addr { return s.conn.conn.RemoteAddr() } // TODO set per stream values instead of connection-wide func (s *Stream) SetDeadline(t time.Time) error { return s.conn.conn.SetDeadline(t) } func (s *Stream) SetReadDeadline(t time.Time) error { return s.conn.conn.SetReadDeadline(t) } func (s *Stream) SetWriteDeadline(t time.Time) error { return s.conn.conn.SetWriteDeadline(t) } func (s *Stream) closeRemoteChannels() { s.closeLock.Lock() defer s.closeLock.Unlock() select { case <-s.closeChan: default: close(s.closeChan) } }