import re import sys __all__ = ["ReceiveBuffer"] # Operations we want to support: # - find next \r\n or \r\n\r\n (\n or \n\n are also acceptable), # or wait until there is one # - read at-most-N bytes # Goals: # - on average, do this fast # - worst case, do this in O(n) where n is the number of bytes processed # Plan: # - store bytearray, offset, how far we've searched for a separator token # - use the how-far-we've-searched data to avoid rescanning # - while doing a stream of uninterrupted processing, advance offset instead # of constantly copying # WARNING: # - I haven't benchmarked or profiled any of this yet. # # Note that starting in Python 3.4, deleting the initial n bytes from a # bytearray is amortized O(n), thanks to some excellent work by Antoine # Martin: # # https://bugs.python.org/issue19087 # # This means that if we only supported 3.4+, we could get rid of the code here # involving self._start and self.compress, because it's doing exactly the same # thing that bytearray now does internally. # # BUT unfortunately, we still support 2.7, and reading short segments out of a # long buffer MUST be O(bytes read) to avoid DoS issues, so we can't actually # delete this code. Yet: # # https://pythonclock.org/ # # (Two things to double-check first though: make sure PyPy also has the # optimization, and benchmark to make sure it's a win, since we do have a # slightly clever thing where we delay calling compress() until we've # processed a whole event, which could in theory be slightly more efficient # than the internal bytearray support.) blank_line_regex = re.compile(b"\n\r?\n", re.MULTILINE) class ReceiveBuffer: def __init__(self): self._data = bytearray() self._next_line_search = 0 self._multiple_lines_search = 0 def __iadd__(self, byteslike): self._data += byteslike return self def __bool__(self): return bool(len(self)) def __len__(self): return len(self._data) # for @property unprocessed_data def __bytes__(self): return bytes(self._data) def _extract(self, count): # extracting an initial slice of the data buffer and return it out = self._data[:count] del self._data[:count] self._next_line_search = 0 self._multiple_lines_search = 0 return out def maybe_extract_at_most(self, count): """ Extract a fixed number of bytes from the buffer. """ out = self._data[:count] if not out: return None return self._extract(count) def maybe_extract_next_line(self): """ Extract the first line, if it is completed in the buffer. """ # Only search in buffer space that we've not already looked at. search_start_index = max(0, self._next_line_search - 1) partial_idx = self._data.find(b"\r\n", search_start_index) if partial_idx == -1: self._next_line_search = len(self._data) return None # + 2 is to compensate len(b"\r\n") idx = partial_idx + 2 return self._extract(idx) def maybe_extract_lines(self): """ Extract everything up to the first blank line, and return a list of lines. """ # Handle the case where we have an immediate empty line. if self._data[:1] == b"\n": self._extract(1) return [] if self._data[:2] == b"\r\n": self._extract(2) return [] # Only search in buffer space that we've not already looked at. match = blank_line_regex.search(self._data, self._multiple_lines_search) if match is None: self._multiple_lines_search = max(0, len(self._data) - 2) return None # Truncate the buffer and return it. idx = match.span(0)[-1] out = self._extract(idx) lines = out.split(b"\n") for line in lines: if line.endswith(b"\r"): del line[-1] assert lines[-2] == lines[-1] == b"" del lines[-2:] return lines # In theory we should wait until `\r\n` before starting to validate # incoming data. However it's interesting to detect (very) invalid data # early given they might not even contain `\r\n` at all (hence only # timeout will get rid of them). # This is not a 100% effective detection but more of a cheap sanity check # allowing for early abort in some useful cases. # This is especially interesting when peer is messing up with HTTPS and # sent us a TLS stream where we were expecting plain HTTP given all # versions of TLS so far start handshake with a 0x16 message type code. def is_next_line_obviously_invalid_request_line(self): try: # HTTP header line must not contain non-printable characters # and should not start with a space return self._data[0] < 0x21 except IndexError: return False