# Copyright 2015-present MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Internal network layer helper methods.""" import datetime import errno import socket import struct from bson import _decode_all_selective from bson.py3compat import PY3 from pymongo import helpers, message from pymongo.common import MAX_MESSAGE_SIZE from pymongo.compression_support import decompress, _NO_COMPRESSION from pymongo.errors import (AutoReconnect, NotMasterError, OperationFailure, ProtocolError, NetworkTimeout, _OperationCancelled) from pymongo.message import _UNPACK_REPLY, _OpMsg from pymongo.monotonic import time from pymongo.socket_checker import _errno_from_exception _UNPACK_HEADER = struct.Struct(" max_bson_size): message._raise_document_too_large(name, size, max_bson_size) else: request_id, msg, size = message.query( flags, ns, 0, -1, spec, None, codec_options, check_keys, compression_ctx) if (max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD): message._raise_document_too_large( name, size, max_bson_size + message._COMMAND_OVERHEAD) if publish: encoding_duration = datetime.datetime.now() - start listeners.publish_command_start(orig, dbname, request_id, address) start = datetime.datetime.now() try: sock_info.sock.sendall(msg) if use_op_msg and unacknowledged: # Unacknowledged, fake a successful command response. reply = None response_doc = {"ok": 1} else: reply = receive_message(sock_info, request_id) sock_info.more_to_come = reply.more_to_come unpacked_docs = reply.unpack_response( codec_options=codec_options, user_fields=user_fields) response_doc = unpacked_docs[0] if client: client._process_response(response_doc, session) if check: helpers._check_command_response( response_doc, sock_info.max_wire_version, allowable_errors, parse_write_concern_error=parse_write_concern_error) except Exception as exc: if publish: duration = (datetime.datetime.now() - start) + encoding_duration if isinstance(exc, (NotMasterError, OperationFailure)): failure = exc.details else: failure = message._convert_exception(exc) listeners.publish_command_failure( duration, failure, name, request_id, address) raise if publish: duration = (datetime.datetime.now() - start) + encoding_duration listeners.publish_command_success( duration, response_doc, name, request_id, address) if client and client._encrypter and reply: decrypted = client._encrypter.decrypt(reply.raw_command_response()) response_doc = _decode_all_selective(decrypted, codec_options, user_fields)[0] return response_doc _UNPACK_COMPRESSION_HEADER = struct.Struct(" max_message_size: raise ProtocolError("Message length (%r) is larger than server max " "message size (%r)" % (length, max_message_size)) if op_code == 2012: op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER( _receive_data_on_socket(sock_info, 9, deadline)) data = decompress( _receive_data_on_socket(sock_info, length - 25, deadline), compressor_id) else: data = _receive_data_on_socket(sock_info, length - 16, deadline) try: unpack_reply = _UNPACK_REPLY[op_code] except KeyError: raise ProtocolError("Got opcode %r but expected " "%r" % (op_code, _UNPACK_REPLY.keys())) return unpack_reply(data) _POLL_TIMEOUT = 0.5 def wait_for_read(sock_info, deadline): """Block until at least one byte is read, or a timeout, or a cancel.""" context = sock_info.cancel_context # Only Monitor connections can be cancelled. if context: sock = sock_info.sock while True: # SSLSocket can have buffered data which won't be caught by select. if hasattr(sock, 'pending') and sock.pending() > 0: readable = True else: # Wait up to 500ms for the socket to become readable and then # check for cancellation. if deadline: timeout = max(min(deadline - time(), _POLL_TIMEOUT), 0.001) else: timeout = _POLL_TIMEOUT readable = sock_info.socket_checker.select( sock, read=True, timeout=timeout) if context.cancelled: raise _OperationCancelled('isMaster cancelled') if readable: return if deadline and time() > deadline: raise socket.timeout("timed out") # memoryview was introduced in Python 2.7 but we only use it on Python 3 # because before 2.7.4 the struct module did not support memoryview: # https://bugs.python.org/issue10212. # In Jython, using slice assignment on a memoryview results in a # NullPointerException. if not PY3: def _receive_data_on_socket(sock_info, length, deadline): buf = bytearray(length) i = 0 while length: try: wait_for_read(sock_info, deadline) chunk = sock_info.sock.recv(length) except (IOError, OSError) as exc: if _errno_from_exception(exc) == errno.EINTR: continue raise if chunk == b"": raise AutoReconnect("connection closed") buf[i:i + len(chunk)] = chunk i += len(chunk) length -= len(chunk) return bytes(buf) else: def _receive_data_on_socket(sock_info, length, deadline): buf = bytearray(length) mv = memoryview(buf) bytes_read = 0 while bytes_read < length: try: wait_for_read(sock_info, deadline) chunk_length = sock_info.sock.recv_into(mv[bytes_read:]) except (IOError, OSError) as exc: if _errno_from_exception(exc) == errno.EINTR: continue raise if chunk_length == 0: raise AutoReconnect("connection closed") bytes_read += chunk_length return mv