# Copyright 2016-present MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Run the BSON corpus specification tests.""" import binascii import codecs import functools import glob import os import sys from decimal import DecimalException if sys.version_info[:2] == (2, 6): try: import simplejson as json except ImportError: import json else: import json sys.path[0:0] = [""] from bson import BSON, json_util from bson.binary import STANDARD from bson.codec_options import CodecOptions from bson.decimal128 import Decimal128 from bson.dbref import DBRef from bson.errors import InvalidBSON, InvalidId from bson.json_util import JSONMode from bson.py3compat import text_type, b from bson.son import SON from test import unittest _TEST_PATH = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'bson_corpus') _TESTS_TO_SKIP = set([ # Python cannot decode dates after year 9999. 'Y10K', ]) _NON_PARSE_ERRORS = set([ # {"$date": } is our legacy format which we still need to parse. 'Bad $date (number, not string or hash)', # This variant of $numberLong may have been generated by an old version # of mongoexport. 'Bad $numberLong (number, not string)', ]) _DEPRECATED_BSON_TYPES = { # Symbol '0x0E': text_type, # Undefined '0x06': type(None), # DBPointer '0x0C': DBRef } # Need to set tz_aware=True in order to use "strict" dates in extended JSON. codec_options = CodecOptions(tz_aware=True, document_class=SON) # We normally encode UUID as binary subtype 0x03, # but we'll need to encode to subtype 0x04 for one of the tests. codec_options_uuid_04 = codec_options._replace(uuid_representation=STANDARD) json_options_uuid_04 = json_util.JSONOptions(json_mode=JSONMode.CANONICAL, uuid_representation=STANDARD) json_options_iso8601 = json_util.JSONOptions( datetime_representation=json_util.DatetimeRepresentation.ISO8601) to_extjson = functools.partial(json_util.dumps, json_options=json_util.CANONICAL_JSON_OPTIONS) to_extjson_uuid_04 = functools.partial(json_util.dumps, json_options=json_options_uuid_04) to_extjson_iso8601 = functools.partial(json_util.dumps, json_options=json_options_iso8601) to_relaxed_extjson = functools.partial( json_util.dumps, json_options=json_util.RELAXED_JSON_OPTIONS) to_bson_uuid_04 = functools.partial(BSON.encode, codec_options=codec_options_uuid_04) to_bson = functools.partial(BSON.encode, codec_options=codec_options) decode_bson = lambda bbytes: BSON(bbytes).decode(codec_options=codec_options) if json_util._HAS_OBJECT_PAIRS_HOOK: decode_extjson = functools.partial( json_util.loads, json_options=json_util.JSONOptions(json_mode=JSONMode.CANONICAL, document_class=SON)) loads = functools.partial(json.loads, object_pairs_hook=SON) else: decode_extjson = functools.partial( json_util.loads, json_options=json_util.CANONICAL_JSON_OPTIONS) loads = json.loads class TestBSONCorpus(unittest.TestCase): def assertJsonEqual(self, first, second, msg=None): """Fail if the two json strings are unequal. Normalize json by parsing it with the built-in json library. This accounts for discrepancies in spacing. """ self.assertEqual(loads(first), loads(second), msg=msg) def create_test(case_spec): bson_type = case_spec['bson_type'] # Test key is absent when testing top-level documents. test_key = case_spec.get('test_key') deprecated = case_spec.get('deprecated') def run_test(self): for valid_case in case_spec.get('valid', []): description = valid_case['description'] if description in _TESTS_TO_SKIP: continue # Special case for testing encoding UUID as binary subtype 0x04. if description == 'subtype 0x04': encode_extjson = to_extjson_uuid_04 encode_bson = to_bson_uuid_04 else: encode_extjson = to_extjson encode_bson = to_bson cB = binascii.unhexlify(b(valid_case['canonical_bson'])) cEJ = valid_case['canonical_extjson'] rEJ = valid_case.get('relaxed_extjson') dEJ = valid_case.get('degenerate_extjson') lossy = valid_case.get('lossy') decoded_bson = decode_bson(cB) if not lossy: # Make sure we can parse the legacy (default) JSON format. legacy_json = json_util.dumps( decoded_bson, json_options=json_util.LEGACY_JSON_OPTIONS) self.assertEqual(decode_extjson(legacy_json), decoded_bson) if deprecated: if 'converted_bson' in valid_case: converted_bson = binascii.unhexlify( b(valid_case['converted_bson'])) self.assertEqual(encode_bson(decoded_bson), converted_bson) self.assertJsonEqual( encode_extjson(decode_bson(converted_bson)), valid_case['converted_extjson']) # Make sure we can decode the type. self.assertEqual(decoded_bson, decode_extjson(cEJ)) if test_key is not None: self.assertIsInstance(decoded_bson[test_key], _DEPRECATED_BSON_TYPES[bson_type]) continue # PyPy3 and Jython can't handle NaN with a payload from # struct.(un)pack if endianness is specified in the format string. if not ((('PyPy' in sys.version and sys.version_info[:2] < (3, 3)) or sys.platform.startswith("java")) and description == 'NaN with payload'): # Test round-tripping canonical bson. self.assertEqual(encode_bson(decoded_bson), cB) self.assertJsonEqual(encode_extjson(decoded_bson), cEJ) # Test round-tripping canonical extended json. decoded_json = decode_extjson(cEJ) self.assertJsonEqual(encode_extjson(decoded_json), cEJ) if not lossy and json_util._HAS_OBJECT_PAIRS_HOOK: self.assertEqual(encode_bson(decoded_json), cB) # Test round-tripping degenerate bson. if 'degenerate_bson' in valid_case: dB = binascii.unhexlify(b(valid_case['degenerate_bson'])) self.assertEqual(encode_bson(decode_bson(dB)), cB) # Test round-tripping degenerate extended json. if dEJ is not None: decoded_json = decode_extjson(dEJ) self.assertJsonEqual(encode_extjson(decoded_json), cEJ) if not lossy: # We don't need to check json_util._HAS_OBJECT_PAIRS_HOOK # because degenerate_extjson is always a single key so # the order cannot be changed. self.assertEqual(encode_bson(decoded_json), cB) # Test round-tripping relaxed extended json. if rEJ is not None: self.assertJsonEqual(to_relaxed_extjson(decoded_bson), rEJ) decoded_json = decode_extjson(rEJ) self.assertJsonEqual(to_relaxed_extjson(decoded_json), rEJ) for decode_error_case in case_spec.get('decodeErrors', []): with self.assertRaises(InvalidBSON): decode_bson( binascii.unhexlify(b(decode_error_case['bson']))) for parse_error_case in case_spec.get('parseErrors', []): if bson_type == '0x13': self.assertRaises( DecimalException, Decimal128, parse_error_case['string']) elif bson_type == '0x00': description = parse_error_case['description'] if description in _NON_PARSE_ERRORS: decode_extjson(parse_error_case['string']) else: try: decode_extjson(parse_error_case['string']) raise AssertionError('exception not raised for test ' 'case: ' + description) except (ValueError, KeyError, TypeError, InvalidId): pass else: raise AssertionError('cannot test parseErrors for type ' + bson_type) return run_test def create_tests(): for filename in glob.glob(os.path.join(_TEST_PATH, '*.json')): test_suffix, _ = os.path.splitext(os.path.basename(filename)) with codecs.open(filename, encoding='utf-8') as bson_test_file: test_method = create_test(json.load(bson_test_file)) setattr(TestBSONCorpus, 'test_' + test_suffix, test_method) create_tests() if __name__ == '__main__': unittest.main()