# Copyright 2011-present MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Test the pymongo common module.""" import sys import uuid sys.path[0:0] = [""] from bson.binary import UUIDLegacy, PYTHON_LEGACY, STANDARD from bson.code import Code from bson.codec_options import CodecOptions from bson.objectid import ObjectId from pymongo.errors import OperationFailure from pymongo.write_concern import WriteConcern from test import client_context, IntegrationTest from test.utils import connected, rs_or_single_client, single_client @client_context.require_connection def setUpModule(): pass class TestCommon(IntegrationTest): def test_uuid_representation(self): coll = self.db.uuid coll.drop() # Test property self.assertEqual(PYTHON_LEGACY, coll.codec_options.uuid_representation) # Test basic query uu = uuid.uuid4() # Insert as binary subtype 3 coll.insert_one({'uu': uu}) self.assertEqual(uu, coll.find_one({'uu': uu})['uu']) coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=STANDARD)) self.assertEqual(STANDARD, coll.codec_options.uuid_representation) self.assertEqual(None, coll.find_one({'uu': uu})) self.assertEqual(uu, coll.find_one({'uu': UUIDLegacy(uu)})['uu']) # Test Cursor.count self.assertEqual(0, coll.find({'uu': uu}).count()) coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY)) self.assertEqual(1, coll.find({'uu': uu}).count()) # Test delete coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=STANDARD)) coll.delete_one({'uu': uu}) self.assertEqual(1, coll.count()) coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY)) coll.delete_one({'uu': uu}) self.assertEqual(0, coll.count()) # Test update_one coll.insert_one({'_id': uu, 'i': 1}) coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=STANDARD)) coll.update_one({'_id': uu}, {'$set': {'i': 2}}) coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY)) self.assertEqual(1, coll.find_one({'_id': uu})['i']) coll.update_one({'_id': uu}, {'$set': {'i': 2}}) self.assertEqual(2, coll.find_one({'_id': uu})['i']) # Test Cursor.distinct self.assertEqual([2], coll.find({'_id': uu}).distinct('i')) coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=STANDARD)) self.assertEqual([], coll.find({'_id': uu}).distinct('i')) # Test findAndModify self.assertEqual(None, coll.find_one_and_update({'_id': uu}, {'$set': {'i': 5}})) coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY)) self.assertEqual(2, coll.find_one_and_update({'_id': uu}, {'$set': {'i': 5}})['i']) self.assertEqual(5, coll.find_one({'_id': uu})['i']) # Test command self.assertEqual(5, self.db.command('findAndModify', 'uuid', update={'$set': {'i': 6}}, query={'_id': uu})['value']['i']) self.assertEqual(6, self.db.command( 'findAndModify', 'uuid', update={'$set': {'i': 7}}, query={'_id': UUIDLegacy(uu)})['value']['i']) # Test (inline)_map_reduce coll.drop() coll.insert_one({"_id": uu, "x": 1, "tags": ["dog", "cat"]}) coll.insert_one({"_id": uuid.uuid4(), "x": 3, "tags": ["mouse", "cat", "dog"]}) map = Code("function () {" " this.tags.forEach(function(z) {" " emit(z, 1);" " });" "}") reduce = Code("function (key, values) {" " var total = 0;" " for (var i = 0; i < values.length; i++) {" " total += values[i];" " }" " return total;" "}") coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=STANDARD)) q = {"_id": uu} result = coll.inline_map_reduce(map, reduce, query=q) self.assertEqual([], result) result = coll.map_reduce(map, reduce, "results", query=q) self.assertEqual(0, self.db.results.count()) coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY)) q = {"_id": uu} result = coll.inline_map_reduce(map, reduce, query=q) self.assertEqual(2, len(result)) result = coll.map_reduce(map, reduce, "results", query=q) self.assertEqual(2, self.db.results.count()) self.db.drop_collection("result") coll.drop() def test_write_concern(self): c = rs_or_single_client(connect=False) self.assertEqual(WriteConcern(), c.write_concern) c = rs_or_single_client(connect=False, w=2, wtimeout=1000) wc = WriteConcern(w=2, wtimeout=1000) self.assertEqual(wc, c.write_concern) db = c.pymongo_test self.assertEqual(wc, db.write_concern) coll = db.test self.assertEqual(wc, coll.write_concern) cwc = WriteConcern(j=True) coll = db.get_collection('test', write_concern=cwc) self.assertEqual(cwc, coll.write_concern) self.assertEqual(wc, db.write_concern) def test_mongo_client(self): pair = client_context.pair m = rs_or_single_client(w=0) coll = m.pymongo_test.write_concern_test coll.drop() doc = {"_id": ObjectId()} coll.insert_one(doc) self.assertTrue(coll.insert_one(doc)) coll = coll.with_options(write_concern=WriteConcern(w=1)) self.assertRaises(OperationFailure, coll.insert_one, doc) m = rs_or_single_client() coll = m.pymongo_test.write_concern_test new_coll = coll.with_options(write_concern=WriteConcern(w=0)) self.assertTrue(new_coll.insert_one(doc)) self.assertRaises(OperationFailure, coll.insert_one, doc) m = rs_or_single_client("mongodb://%s/" % (pair,), replicaSet=client_context.replica_set_name) coll = m.pymongo_test.write_concern_test self.assertRaises(OperationFailure, coll.insert_one, doc) m = rs_or_single_client("mongodb://%s/?w=0" % (pair,), replicaSet=client_context.replica_set_name) coll = m.pymongo_test.write_concern_test coll.insert_one(doc) # Equality tests direct = connected(single_client(w=0)) direct2 = connected(single_client("mongodb://%s/?w=0" % (pair,), **self.credentials)) self.assertEqual(direct, direct2) self.assertFalse(direct != direct2) if __name__ == "__main__": unittest.main()