"""Convert between frames and higher-level AMQP methods.""" # Copyright (C) 2007-2008 Barry Pederson from __future__ import absolute_import, unicode_literals from collections import defaultdict from . import spec from .basic_message import Message from .exceptions import UnexpectedFrame from .five import range from .platform import pack, unpack_from, pack_into from .utils import str_to_bytes __all__ = ['frame_handler', 'frame_writer'] #: Set of methods that require both a content frame and a body frame. _CONTENT_METHODS = frozenset([ spec.Basic.Return, spec.Basic.Deliver, spec.Basic.GetOk, ]) #: Number of bytes reserved for protocol in a content frame. #: We use this to calculate when a frame exceeeds the max frame size, #: and if it does not the message will fit into the preallocated buffer. FRAME_OVERHEAD = 40 def frame_handler(connection, callback, unpack_from=unpack_from, content_methods=_CONTENT_METHODS): """Create closure that reads frames.""" expected_types = defaultdict(lambda: 1) partial_messages = {} def on_frame(frame): frame_type, channel, buf = frame connection.bytes_recv += 1 if frame_type not in (expected_types[channel], 8): raise UnexpectedFrame( 'Received frame {0} while expecting type: {1}'.format( frame_type, expected_types[channel]), ) elif frame_type == 1: method_sig = unpack_from('>HH', buf, 0) if method_sig in content_methods: # Save what we've got so far and wait for the content-header partial_messages[channel] = Message( frame_method=method_sig, frame_args=buf, ) expected_types[channel] = 2 return False callback(channel, method_sig, buf, None) elif frame_type == 2: msg = partial_messages[channel] msg.inbound_header(buf) if not msg.ready: # wait for the content-body expected_types[channel] = 3 return False # bodyless message, we're done expected_types[channel] = 1 partial_messages.pop(channel, None) callback(channel, msg.frame_method, msg.frame_args, msg) elif frame_type == 3: msg = partial_messages[channel] msg.inbound_body(buf) if msg.ready: expected_types[channel] = 1 partial_messages.pop(channel, None) callback(channel, msg.frame_method, msg.frame_args, msg) elif frame_type == 8: # bytes_recv already updated pass return True return on_frame def frame_writer(connection, transport, pack=pack, pack_into=pack_into, range=range, len=len, bytes=bytes, str_to_bytes=str_to_bytes): """Create closure that writes frames.""" write = transport.write # memoryview first supported in Python 2.7 # Initial support was very shaky, so could be we have to # check for a bugfix release. buf = bytearray(connection.frame_max - 8) view = memoryview(buf) def write_frame(type_, channel, method_sig, args, content): chunk_size = connection.frame_max - 8 offset = 0 properties = None args = str_to_bytes(args) if content: properties = content._serialize_properties() body = content.body bodylen = len(body) framelen = ( len(args) + (len(properties) or 0) + bodylen + FRAME_OVERHEAD ) bigbody = framelen > chunk_size else: body, bodylen, bigbody = None, 0, 0 if bigbody: # ## SLOW: string copy and write for every frame frame = (b''.join([pack('>HH', *method_sig), args]) if type_ == 1 else b'') # encode method frame framelen = len(frame) write(pack('>BHI%dsB' % framelen, type_, channel, framelen, frame, 0xce)) if body: frame = b''.join([ pack('>HHQ', method_sig[0], 0, len(body)), properties, ]) framelen = len(frame) write(pack('>BHI%dsB' % framelen, 2, channel, framelen, frame, 0xce)) for i in range(0, bodylen, chunk_size): frame = body[i:i + chunk_size] framelen = len(frame) write(pack('>BHI%dsB' % framelen, 3, channel, framelen, str_to_bytes(frame), 0xce)) else: # ## FAST: pack into buffer and single write frame = (b''.join([pack('>HH', *method_sig), args]) if type_ == 1 else b'') framelen = len(frame) pack_into('>BHI%dsB' % framelen, buf, offset, type_, channel, framelen, frame, 0xce) offset += 8 + framelen if body is not None: frame = b''.join([ pack('>HHQ', method_sig[0], 0, len(body)), properties, ]) framelen = len(frame) pack_into('>BHI%dsB' % framelen, buf, offset, 2, channel, framelen, frame, 0xce) offset += 8 + framelen bodylen = len(body) if bodylen > 0: framelen = bodylen pack_into('>BHI%dsB' % framelen, buf, offset, 3, channel, framelen, str_to_bytes(body), 0xce) offset += 8 + framelen write(view[:offset]) connection.bytes_sent += 1 return write_frame