# Copyright 2015-present MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Test various legacy / deprecated API features.""" import itertools import sys import threading import time import uuid import warnings sys.path[0:0] = [""] from bson.binary import PYTHON_LEGACY, STANDARD from bson.code import Code from bson.codec_options import CodecOptions from bson.dbref import DBRef from bson.objectid import ObjectId from bson.py3compat import string_type from bson.son import SON from pymongo import ASCENDING, DESCENDING from pymongo.database import Database from pymongo.common import partition_node from pymongo.errors import (BulkWriteError, ConfigurationError, CursorNotFound, DocumentTooLarge, DuplicateKeyError, InvalidDocument, InvalidOperation, OperationFailure, WriteConcernError, WTimeoutError) from pymongo.message import _CursorAddress from pymongo.son_manipulator import (AutoReference, NamespaceInjector, ObjectIdShuffler, SONManipulator) from pymongo.write_concern import WriteConcern from test import client_context, qcheck, unittest, SkipTest from test.test_client import IntegrationTest from test.test_bulk import BulkTestBase, BulkAuthorizationTestBase from test.utils import (DeprecationFilter, joinall, oid_generated_on_client, remove_all_users, rs_or_single_client, rs_or_single_client_noauth, single_client, wait_until) class TestDeprecations(IntegrationTest): @classmethod def setUpClass(cls): super(TestDeprecations, cls).setUpClass() cls.deprecation_filter = DeprecationFilter("error") @classmethod def tearDownClass(cls): cls.deprecation_filter.stop() def test_save_deprecation(self): self.assertRaises( DeprecationWarning, lambda: self.db.test.save({})) def test_insert_deprecation(self): self.assertRaises( DeprecationWarning, lambda: self.db.test.insert({})) def test_update_deprecation(self): self.assertRaises( DeprecationWarning, lambda: self.db.test.update({}, {})) def test_remove_deprecation(self): self.assertRaises( DeprecationWarning, lambda: self.db.test.remove({})) def test_find_and_modify_deprecation(self): self.assertRaises( DeprecationWarning, lambda: self.db.test.find_and_modify({'i': 5}, {})) def test_add_son_manipulator_deprecation(self): db = self.client.pymongo_test self.assertRaises(DeprecationWarning, lambda: db.add_son_manipulator(AutoReference(db))) def test_ensure_index_deprecation(self): try: self.assertRaises( DeprecationWarning, lambda: self.db.test.ensure_index('i')) finally: self.db.test.drop() class TestLegacy(IntegrationTest): @classmethod def setUpClass(cls): super(TestLegacy, cls).setUpClass() cls.w = client_context.w cls.deprecation_filter = DeprecationFilter() @classmethod def tearDownClass(cls): cls.deprecation_filter.stop() def test_insert_find_one(self): # Tests legacy insert. db = self.db db.test.drop() self.assertEqual(0, len(list(db.test.find()))) doc = {"hello": u"world"} _id = db.test.insert(doc) self.assertEqual(1, len(list(db.test.find()))) self.assertEqual(doc, db.test.find_one()) self.assertEqual(doc["_id"], _id) self.assertTrue(isinstance(_id, ObjectId)) doc_class = dict # Work around http://bugs.jython.org/issue1728 if (sys.platform.startswith('java') and sys.version_info[:3] >= (2, 5, 2)): doc_class = SON db = self.client.get_database( db.name, codec_options=CodecOptions(document_class=doc_class)) def remove_insert_find_one(doc): db.test.remove({}) db.test.insert(doc) # SON equality is order sensitive. return db.test.find_one() == doc.to_dict() qcheck.check_unittest(self, remove_insert_find_one, qcheck.gen_mongo_dict(3)) def test_generator_insert(self): # Only legacy insert currently supports insert from a generator. db = self.db db.test.remove({}) self.assertEqual(db.test.find().count(), 0) db.test.insert(({'a': i} for i in range(5)), manipulate=False) self.assertEqual(5, db.test.count()) db.test.remove({}) db.test.insert(({'a': i} for i in range(5)), manipulate=True) self.assertEqual(5, db.test.count()) db.test.remove({}) def test_insert_multiple(self): # Tests legacy insert. db = self.db db.drop_collection("test") doc1 = {"hello": u"world"} doc2 = {"hello": u"mike"} self.assertEqual(db.test.find().count(), 0) ids = db.test.insert([doc1, doc2]) self.assertEqual(db.test.find().count(), 2) self.assertEqual(doc1, db.test.find_one({"hello": u"world"})) self.assertEqual(doc2, db.test.find_one({"hello": u"mike"})) self.assertEqual(2, len(ids)) self.assertEqual(doc1["_id"], ids[0]) self.assertEqual(doc2["_id"], ids[1]) ids = db.test.insert([{"hello": 1}]) self.assertTrue(isinstance(ids, list)) self.assertEqual(1, len(ids)) self.assertRaises(InvalidOperation, db.test.insert, []) # Generator that raises StopIteration on first call to next(). self.assertRaises(InvalidOperation, db.test.insert, (i for i in [])) def test_insert_multiple_with_duplicate(self): # Tests legacy insert. db = self.db db.drop_collection("test_insert_multiple_with_duplicate") collection = db.test_insert_multiple_with_duplicate collection.create_index([('i', ASCENDING)], unique=True) # No error collection.insert([{'i': i} for i in range(5, 10)], w=0) wait_until(lambda: 5 == collection.count(), 'insert 5 documents') db.drop_collection("test_insert_multiple_with_duplicate") collection.create_index([('i', ASCENDING)], unique=True) # No error collection.insert([{'i': 1}] * 2, w=0) wait_until(lambda: 1 == collection.count(), 'insert 1 document') self.assertRaises( DuplicateKeyError, lambda: collection.insert([{'i': 2}] * 2), ) db.drop_collection("test_insert_multiple_with_duplicate") db = self.client.get_database( db.name, write_concern=WriteConcern(w=0)) collection = db.test_insert_multiple_with_duplicate collection.create_index([('i', ASCENDING)], unique=True) # No error. collection.insert([{'i': 1}] * 2) wait_until(lambda: 1 == collection.count(), 'insert 1 document') # Implied acknowledged. self.assertRaises( DuplicateKeyError, lambda: collection.insert([{'i': 2}] * 2, fsync=True), ) # Explicit acknowledged. self.assertRaises( DuplicateKeyError, lambda: collection.insert([{'i': 2}] * 2, w=1)) db.drop_collection("test_insert_multiple_with_duplicate") @client_context.require_replica_set def test_insert_prefers_write_errors(self): # Tests legacy insert. collection = self.db.test_insert_prefers_write_errors self.db.drop_collection(collection.name) collection.insert_one({'_id': 1}) large = 's' * 1024 * 1024 * 15 with self.assertRaises(DuplicateKeyError): collection.insert( [{'_id': 1, 's': large}, {'_id': 2, 's': large}]) self.assertEqual(1, collection.count()) with self.assertRaises(DuplicateKeyError): collection.insert( [{'_id': 1, 's': large}, {'_id': 2, 's': large}], continue_on_error=True) self.assertEqual(2, collection.count()) collection.delete_one({'_id': 2}) # A writeError followed by a writeConcernError should prefer to raise # the writeError. with self.assertRaises(DuplicateKeyError): collection.insert( [{'_id': 1, 's': large}, {'_id': 2, 's': large}], continue_on_error=True, w=len(client_context.nodes) + 10, wtimeout=1) self.assertEqual(2, collection.count()) collection.delete_many({}) with self.assertRaises(WriteConcernError): collection.insert( [{'_id': 1, 's': large}, {'_id': 2, 's': large}], continue_on_error=True, w=len(client_context.nodes) + 10, wtimeout=1) self.assertEqual(2, collection.count()) def test_insert_iterables(self): # Tests legacy insert. db = self.db self.assertRaises(TypeError, db.test.insert, 4) self.assertRaises(TypeError, db.test.insert, None) self.assertRaises(TypeError, db.test.insert, True) db.drop_collection("test") self.assertEqual(db.test.find().count(), 0) db.test.insert(({"hello": u"world"}, {"hello": u"world"})) self.assertEqual(db.test.find().count(), 2) db.drop_collection("test") self.assertEqual(db.test.find().count(), 0) db.test.insert(map(lambda x: {"hello": "world"}, itertools.repeat(None, 10))) self.assertEqual(db.test.find().count(), 10) def test_insert_manipulate_false(self): # Test two aspects of legacy insert with manipulate=False: # 1. The return value is None or [None] as appropriate. # 2. _id is not set on the passed-in document object. collection = self.db.test_insert_manipulate_false collection.drop() oid = ObjectId() doc = {'a': oid} try: # The return value is None. self.assertTrue(collection.insert(doc, manipulate=False) is None) # insert() shouldn't set _id on the passed-in document object. self.assertEqual({'a': oid}, doc) # Bulk insert. The return value is a list of None. self.assertEqual([None], collection.insert([{}], manipulate=False)) docs = [{}, {}] ids = collection.insert(docs, manipulate=False) self.assertEqual([None, None], ids) self.assertEqual([{}, {}], docs) finally: collection.drop() def test_continue_on_error(self): # Tests legacy insert. db = self.db db.drop_collection("test_continue_on_error") collection = db.test_continue_on_error oid = collection.insert({"one": 1}) self.assertEqual(1, collection.count()) docs = [] docs.append({"_id": oid, "two": 2}) # Duplicate _id. docs.append({"three": 3}) docs.append({"four": 4}) docs.append({"five": 5}) with self.assertRaises(DuplicateKeyError): collection.insert(docs, manipulate=False) self.assertEqual(1, collection.count()) with self.assertRaises(DuplicateKeyError): collection.insert(docs, manipulate=False, continue_on_error=True) self.assertEqual(4, collection.count()) collection.remove({}, w=client_context.w) oid = collection.insert({"_id": oid, "one": 1}, w=0) wait_until(lambda: 1 == collection.count(), 'insert 1 document') docs[0].pop("_id") docs[2]["_id"] = oid with self.assertRaises(DuplicateKeyError): collection.insert(docs, manipulate=False) self.assertEqual(3, collection.count()) collection.insert(docs, manipulate=False, continue_on_error=True, w=0) wait_until(lambda: 6 == collection.count(), 'insert 3 documents') def test_acknowledged_insert(self): # Tests legacy insert. db = self.db db.drop_collection("test_acknowledged_insert") collection = db.test_acknowledged_insert a = {"hello": "world"} collection.insert(a) collection.insert(a, w=0) self.assertRaises(OperationFailure, collection.insert, a) def test_insert_adds_id(self): # Tests legacy insert. doc = {"hello": "world"} self.db.test.insert(doc) self.assertTrue("_id" in doc) docs = [{"hello": "world"}, {"hello": "world"}] self.db.test.insert(docs) for doc in docs: self.assertTrue("_id" in doc) def test_insert_large_batch(self): # Tests legacy insert. db = self.client.test_insert_large_batch self.addCleanup(self.client.drop_database, 'test_insert_large_batch') max_bson_size = self.client.max_bson_size # Write commands are limited to 16MB + 16k per batch big_string = 'x' * int(max_bson_size / 2) # Batch insert that requires 2 batches. successful_insert = [{'x': big_string}, {'x': big_string}, {'x': big_string}, {'x': big_string}] db.collection_0.insert(successful_insert, w=1) self.assertEqual(4, db.collection_0.count()) db.collection_0.drop() # Test that inserts fail after first error. insert_second_fails = [{'_id': 'id0', 'x': big_string}, {'_id': 'id0', 'x': big_string}, {'_id': 'id1', 'x': big_string}, {'_id': 'id2', 'x': big_string}] with self.assertRaises(DuplicateKeyError): db.collection_1.insert(insert_second_fails) self.assertEqual(1, db.collection_1.count()) db.collection_1.drop() # 2 batches, 2nd insert fails, don't continue on error. self.assertTrue(db.collection_2.insert(insert_second_fails, w=0)) wait_until(lambda: 1 == db.collection_2.count(), 'insert 1 document', timeout=60) db.collection_2.drop() # 2 batches, ids of docs 0 and 1 are dupes, ids of docs 2 and 3 are # dupes. Acknowledged, continue on error. insert_two_failures = [{'_id': 'id0', 'x': big_string}, {'_id': 'id0', 'x': big_string}, {'_id': 'id1', 'x': big_string}, {'_id': 'id1', 'x': big_string}] with self.assertRaises(OperationFailure) as context: db.collection_3.insert(insert_two_failures, continue_on_error=True, w=1) self.assertIn('id1', str(context.exception)) # Only the first and third documents should be inserted. self.assertEqual(2, db.collection_3.count()) db.collection_3.drop() # 2 batches, 2 errors, unacknowledged, continue on error. db.collection_4.insert(insert_two_failures, continue_on_error=True, w=0) # Only the first and third documents are inserted. wait_until(lambda: 2 == db.collection_4.count(), 'insert 2 documents', timeout=60) db.collection_4.drop() def test_bad_dbref(self): # Requires the legacy API to test. c = self.db.test c.drop() # Incomplete DBRefs. self.assertRaises( InvalidDocument, c.insert_one, {'ref': {'$ref': 'collection'}}) self.assertRaises( InvalidDocument, c.insert_one, {'ref': {'$id': ObjectId()}}) ref_only = {'ref': {'$ref': 'collection'}} id_only = {'ref': {'$id': ObjectId()}} def test_update(self): # Tests legacy update. db = self.db db.drop_collection("test") id1 = db.test.save({"x": 5}) db.test.update({}, {"$inc": {"x": 1}}) self.assertEqual(db.test.find_one(id1)["x"], 6) id2 = db.test.save({"x": 1}) db.test.update({"x": 6}, {"$inc": {"x": 1}}) self.assertEqual(db.test.find_one(id1)["x"], 7) self.assertEqual(db.test.find_one(id2)["x"], 1) def test_update_manipulate(self): # Tests legacy update. db = self.db db.drop_collection("test") db.test.insert({'_id': 1}) db.test.update({'_id': 1}, {'a': 1}, manipulate=True) self.assertEqual( {'_id': 1, 'a': 1}, db.test.find_one()) class AddField(SONManipulator): def transform_incoming(self, son, dummy): son['field'] = 'value' return son db.add_son_manipulator(AddField()) db.test.update({'_id': 1}, {'a': 2}, manipulate=False) self.assertEqual( {'_id': 1, 'a': 2}, db.test.find_one()) db.test.update({'_id': 1}, {'a': 3}, manipulate=True) self.assertEqual( {'_id': 1, 'a': 3, 'field': 'value'}, db.test.find_one()) def test_update_nmodified(self): # Tests legacy update. db = self.db db.drop_collection("test") ismaster = self.client.admin.command('ismaster') used_write_commands = (ismaster.get("maxWireVersion", 0) > 1) db.test.insert({'_id': 1}) result = db.test.update({'_id': 1}, {'$set': {'x': 1}}) if used_write_commands: self.assertEqual(1, result['nModified']) else: self.assertFalse('nModified' in result) # x is already 1. result = db.test.update({'_id': 1}, {'$set': {'x': 1}}) if used_write_commands: self.assertEqual(0, result['nModified']) else: self.assertFalse('nModified' in result) def test_multi_update(self): # Tests legacy update. db = self.db db.drop_collection("test") db.test.save({"x": 4, "y": 3}) db.test.save({"x": 5, "y": 5}) db.test.save({"x": 4, "y": 4}) db.test.update({"x": 4}, {"$set": {"y": 5}}, multi=True) self.assertEqual(3, db.test.count()) for doc in db.test.find(): self.assertEqual(5, doc["y"]) self.assertEqual(2, db.test.update({"x": 4}, {"$set": {"y": 6}}, multi=True)["n"]) def test_upsert(self): # Tests legacy update. db = self.db db.drop_collection("test") db.test.update({"page": "/"}, {"$inc": {"count": 1}}, upsert=True) db.test.update({"page": "/"}, {"$inc": {"count": 1}}, upsert=True) self.assertEqual(1, db.test.count()) self.assertEqual(2, db.test.find_one()["count"]) def test_acknowledged_update(self): # Tests legacy update. db = self.db db.drop_collection("test_acknowledged_update") collection = db.test_acknowledged_update collection.create_index("x", unique=True) collection.insert({"x": 5}) _id = collection.insert({"x": 4}) self.assertEqual( None, collection.update({"_id": _id}, {"$inc": {"x": 1}}, w=0)) self.assertRaises(DuplicateKeyError, collection.update, {"_id": _id}, {"$inc": {"x": 1}}) self.assertEqual(1, collection.update({"_id": _id}, {"$inc": {"x": 2}})["n"]) self.assertEqual(0, collection.update({"_id": "foo"}, {"$inc": {"x": 2}})["n"]) db.drop_collection("test_acknowledged_update") def test_update_backward_compat(self): # MongoDB versions >= 2.6.0 don't return the updatedExisting field # and return upsert _id in an array subdocument. This test should # pass regardless of server version or type (mongod/s). # Tests legacy update. c = self.db.test c.drop() oid = ObjectId() res = c.update({'_id': oid}, {'$set': {'a': 'a'}}, upsert=True) self.assertFalse(res.get('updatedExisting')) self.assertEqual(oid, res.get('upserted')) res = c.update({'_id': oid}, {'$set': {'b': 'b'}}) self.assertTrue(res.get('updatedExisting')) def test_save(self): # Tests legacy save. self.db.drop_collection("test_save") collection = self.db.test_save # Save a doc with autogenerated id _id = collection.save({"hello": "world"}) self.assertEqual(collection.find_one()["_id"], _id) self.assertTrue(isinstance(_id, ObjectId)) # Save a doc with explicit id collection.save({"_id": "explicit_id", "hello": "bar"}) doc = collection.find_one({"_id": "explicit_id"}) self.assertEqual(doc['_id'], 'explicit_id') self.assertEqual(doc['hello'], 'bar') # Save docs with _id field already present (shouldn't create new docs) self.assertEqual(2, collection.count()) collection.save({'_id': _id, 'hello': 'world'}) self.assertEqual(2, collection.count()) collection.save({'_id': 'explicit_id', 'hello': 'baz'}) self.assertEqual(2, collection.count()) self.assertEqual( 'baz', collection.find_one({'_id': 'explicit_id'})['hello'] ) # Acknowledged mode. collection.create_index("hello", unique=True) # No exception, even though we duplicate the first doc's "hello" value collection.save({'_id': 'explicit_id', 'hello': 'world'}, w=0) self.assertRaises( DuplicateKeyError, collection.save, {'_id': 'explicit_id', 'hello': 'world'}) self.db.drop_collection("test") def test_save_with_invalid_key(self): if client_context.version.at_least(3, 5, 8): raise SkipTest("MongoDB >= 3.5.8 allows dotted fields in updates") # Tests legacy save. self.db.drop_collection("test") self.assertTrue(self.db.test.insert({"hello": "world"})) doc = self.db.test.find_one() doc['a.b'] = 'c' self.assertRaises(OperationFailure, self.db.test.save, doc) def test_acknowledged_save(self): # Tests legacy save. db = self.db db.drop_collection("test_acknowledged_save") collection = db.test_acknowledged_save collection.create_index("hello", unique=True) collection.save({"hello": "world"}) collection.save({"hello": "world"}, w=0) self.assertRaises(DuplicateKeyError, collection.save, {"hello": "world"}) db.drop_collection("test_acknowledged_save") def test_save_adds_id(self): # Tests legacy save. doc = {"hello": "jesse"} self.db.test.save(doc) self.assertTrue("_id" in doc) def test_save_returns_id(self): doc = {"hello": "jesse"} _id = self.db.test.save(doc) self.assertTrue(isinstance(_id, ObjectId)) self.assertEqual(_id, doc["_id"]) doc["hi"] = "bernie" _id = self.db.test.save(doc) self.assertTrue(isinstance(_id, ObjectId)) self.assertEqual(_id, doc["_id"]) def test_remove_one(self): # Tests legacy remove. self.db.test.remove() self.assertEqual(0, self.db.test.count()) self.db.test.insert({"x": 1}) self.db.test.insert({"y": 1}) self.db.test.insert({"z": 1}) self.assertEqual(3, self.db.test.count()) self.db.test.remove(multi=False) self.assertEqual(2, self.db.test.count()) self.db.test.remove() self.assertEqual(0, self.db.test.count()) def test_remove_all(self): # Tests legacy remove. self.db.test.remove() self.assertEqual(0, self.db.test.count()) self.db.test.insert({"x": 1}) self.db.test.insert({"y": 1}) self.assertEqual(2, self.db.test.count()) self.db.test.remove() self.assertEqual(0, self.db.test.count()) def test_remove_non_objectid(self): # Tests legacy remove. db = self.db db.drop_collection("test") db.test.insert_one({"_id": 5}) self.assertEqual(1, db.test.count()) db.test.remove(5) self.assertEqual(0, db.test.count()) def test_write_large_document(self): # Tests legacy insert, save, and update. max_size = self.db.client.max_bson_size half_size = int(max_size / 2) self.assertEqual(max_size, 16777216) self.assertRaises(OperationFailure, self.db.test.insert, {"foo": "x" * max_size}) self.assertRaises(OperationFailure, self.db.test.save, {"foo": "x" * max_size}) self.assertRaises(OperationFailure, self.db.test.insert, [{"x": 1}, {"foo": "x" * max_size}]) self.db.test.insert([{"foo": "x" * half_size}, {"foo": "x" * half_size}]) self.db.test.insert({"bar": "x"}) # Use w=0 here to test legacy doc size checking in all server versions self.assertRaises(DocumentTooLarge, self.db.test.update, {"bar": "x"}, {"bar": "x" * (max_size - 14)}, w=0) # This will pass with OP_UPDATE or the update command. self.db.test.update({"bar": "x"}, {"bar": "x" * (max_size - 32)}) def test_last_error_options(self): # Tests legacy write methods. self.db.test.save({"x": 1}, w=1, wtimeout=1) self.db.test.insert({"x": 1}, w=1, wtimeout=1) self.db.test.remove({"x": 1}, w=1, wtimeout=1) self.db.test.update({"x": 1}, {"y": 2}, w=1, wtimeout=1) if client_context.replica_set_name: # client_context.w is the number of hosts in the replica set w = client_context.w + 1 # MongoDB 2.8+ raises error code 100, CannotSatisfyWriteConcern, # if w > number of members. Older versions just time out after 1 ms # as if they had enough secondaries but some are lagging. They # return an error with 'wtimeout': True and no code. def wtimeout_err(f, *args, **kwargs): try: f(*args, **kwargs) except WTimeoutError as exc: self.assertIsNotNone(exc.details) except OperationFailure as exc: self.assertIsNotNone(exc.details) self.assertEqual(100, exc.code, "Unexpected error: %r" % exc) else: self.fail("%s should have failed" % f) coll = self.db.test wtimeout_err(coll.save, {"x": 1}, w=w, wtimeout=1) wtimeout_err(coll.insert, {"x": 1}, w=w, wtimeout=1) wtimeout_err(coll.update, {"x": 1}, {"y": 2}, w=w, wtimeout=1) wtimeout_err(coll.remove, {"x": 1}, w=w, wtimeout=1) # can't use fsync and j options together self.assertRaises(ConfigurationError, self.db.test.insert, {"_id": 1}, j=True, fsync=True) def test_find_and_modify(self): c = self.db.test c.drop() c.insert({'_id': 1, 'i': 1}) # Test that we raise DuplicateKeyError when appropriate. c.ensure_index('i', unique=True) self.assertRaises(DuplicateKeyError, c.find_and_modify, query={'i': 1, 'j': 1}, update={'$set': {'k': 1}}, upsert=True) c.drop_indexes() # Test correct findAndModify self.assertEqual({'_id': 1, 'i': 1}, c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}})) self.assertEqual({'_id': 1, 'i': 3}, c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}, new=True)) self.assertEqual({'_id': 1, 'i': 3}, c.find_and_modify({'_id': 1}, remove=True)) self.assertEqual(None, c.find_one({'_id': 1})) self.assertEqual(None, c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}})) self.assertEqual(None, c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}, upsert=True)) self.assertEqual({'_id': 1, 'i': 2}, c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}, upsert=True, new=True)) self.assertEqual({'_id': 1, 'i': 2}, c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}, fields=['i'])) self.assertEqual({'_id': 1, 'i': 4}, c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}, new=True, fields={'i': 1})) # Test with full_response=True. result = c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}, new=True, upsert=True, full_response=True, fields={'i': 1}) self.assertEqual({'_id': 1, 'i': 5}, result["value"]) self.assertEqual(True, result["lastErrorObject"]["updatedExisting"]) result = c.find_and_modify({'_id': 2}, {'$inc': {'i': 1}}, new=True, upsert=True, full_response=True, fields={'i': 1}) self.assertEqual({'_id': 2, 'i': 1}, result["value"]) self.assertEqual(False, result["lastErrorObject"]["updatedExisting"]) class ExtendedDict(dict): pass result = c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}, new=True, fields={'i': 1}) self.assertFalse(isinstance(result, ExtendedDict)) c = self.db.get_collection( "test", codec_options=CodecOptions(document_class=ExtendedDict)) result = c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}, new=True, fields={'i': 1}) self.assertTrue(isinstance(result, ExtendedDict)) def test_find_and_modify_with_sort(self): c = self.db.test c.drop() for j in range(5): c.insert({'j': j, 'i': 0}) sort = {'j': DESCENDING} self.assertEqual(4, c.find_and_modify({}, {'$inc': {'i': 1}}, sort=sort)['j']) sort = {'j': ASCENDING} self.assertEqual(0, c.find_and_modify({}, {'$inc': {'i': 1}}, sort=sort)['j']) sort = [('j', DESCENDING)] self.assertEqual(4, c.find_and_modify({}, {'$inc': {'i': 1}}, sort=sort)['j']) sort = [('j', ASCENDING)] self.assertEqual(0, c.find_and_modify({}, {'$inc': {'i': 1}}, sort=sort)['j']) sort = SON([('j', DESCENDING)]) self.assertEqual(4, c.find_and_modify({}, {'$inc': {'i': 1}}, sort=sort)['j']) sort = SON([('j', ASCENDING)]) self.assertEqual(0, c.find_and_modify({}, {'$inc': {'i': 1}}, sort=sort)['j']) try: from collections import OrderedDict sort = OrderedDict([('j', DESCENDING)]) self.assertEqual(4, c.find_and_modify({}, {'$inc': {'i': 1}}, sort=sort)['j']) sort = OrderedDict([('j', ASCENDING)]) self.assertEqual(0, c.find_and_modify({}, {'$inc': {'i': 1}}, sort=sort)['j']) except ImportError: pass # Test that a standard dict with two keys is rejected. sort = {'j': DESCENDING, 'foo': DESCENDING} self.assertRaises(TypeError, c.find_and_modify, {}, {'$inc': {'i': 1}}, sort=sort) def test_find_and_modify_with_manipulator(self): class AddCollectionNameManipulator(SONManipulator): def will_copy(self): return True def transform_incoming(self, son, dummy): copy = SON(son) if 'collection' in copy: del copy['collection'] return copy def transform_outgoing(self, son, collection): copy = SON(son) copy['collection'] = collection.name return copy db = self.client.pymongo_test db.add_son_manipulator(AddCollectionNameManipulator()) c = db.test c.drop() c.insert({'_id': 1, 'i': 1}) # Test correct findAndModify # With manipulators self.assertEqual({'_id': 1, 'i': 1, 'collection': 'test'}, c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}, manipulate=True)) self.assertEqual({'_id': 1, 'i': 3, 'collection': 'test'}, c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}, new=True, manipulate=True)) # With out manipulators self.assertEqual({'_id': 1, 'i': 3}, c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}})) self.assertEqual({'_id': 1, 'i': 5}, c.find_and_modify({'_id': 1}, {'$inc': {'i': 1}}, new=True)) def test_group(self): db = self.db db.drop_collection("test") self.assertEqual([], db.test.group([], {}, {"count": 0}, "function (obj, prev) { prev.count++; }" )) db.test.insert_many([{"a": 2}, {"b": 5}, {"a": 1}]) self.assertEqual([{"count": 3}], db.test.group([], {}, {"count": 0}, "function (obj, prev) { prev.count++; }" )) self.assertEqual([{"count": 1}], db.test.group([], {"a": {"$gt": 1}}, {"count": 0}, "function (obj, prev) { prev.count++; }" )) db.test.insert_one({"a": 2, "b": 3}) self.assertEqual([{"a": 2, "count": 2}, {"a": None, "count": 1}, {"a": 1, "count": 1}], db.test.group(["a"], {}, {"count": 0}, "function (obj, prev) { prev.count++; }" )) # modifying finalize self.assertEqual([{"a": 2, "count": 3}, {"a": None, "count": 2}, {"a": 1, "count": 2}], db.test.group(["a"], {}, {"count": 0}, "function (obj, prev) " "{ prev.count++; }", "function (obj) { obj.count++; }")) # returning finalize self.assertEqual([2, 1, 1], db.test.group(["a"], {}, {"count": 0}, "function (obj, prev) " "{ prev.count++; }", "function (obj) { return obj.count; }")) # keyf self.assertEqual([2, 2], db.test.group("function (obj) { if (obj.a == 2) " "{ return {a: true} }; " "return {b: true}; }", {}, {"count": 0}, "function (obj, prev) " "{ prev.count++; }", "function (obj) { return obj.count; }")) # no key self.assertEqual([{"count": 4}], db.test.group(None, {}, {"count": 0}, "function (obj, prev) { prev.count++; }" )) self.assertRaises(OperationFailure, db.test.group, [], {}, {}, "5 ++ 5") def test_group_with_scope(self): db = self.db db.drop_collection("test") db.test.insert_many([{"a": 1}, {"b": 1}]) reduce_function = "function (obj, prev) { prev.count += inc_value; }" self.assertEqual(2, db.test.group([], {}, {"count": 0}, Code(reduce_function, {"inc_value": 1}))[0]['count']) self.assertEqual(4, db.test.group([], {}, {"count": 0}, Code(reduce_function, {"inc_value": 2}))[0]['count']) self.assertEqual(1, db.test.group([], {}, {"count": 0}, Code(reduce_function, {"inc_value": 0.5}))[0]['count']) self.assertEqual(2, db.test.group( [], {}, {"count": 0}, Code(reduce_function, {"inc_value": 1}))[0]['count']) self.assertEqual(4, db.test.group( [], {}, {"count": 0}, Code(reduce_function, {"inc_value": 2}))[0]['count']) self.assertEqual(1, db.test.group( [], {}, {"count": 0}, Code(reduce_function, {"inc_value": 0.5}))[0]['count']) def test_group_uuid_representation(self): db = self.db coll = db.uuid coll.drop() uu = uuid.uuid4() coll.insert_one({"_id": uu, "a": 2}) coll.insert_one({"_id": uuid.uuid4(), "a": 1}) reduce = "function (obj, prev) { prev.count++; }" coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=STANDARD)) self.assertEqual([], coll.group([], {"_id": uu}, {"count": 0}, reduce)) coll = self.db.get_collection( "uuid", CodecOptions(uuid_representation=PYTHON_LEGACY)) self.assertEqual([{"count": 1}], coll.group([], {"_id": uu}, {"count": 0}, reduce)) def test_last_status(self): # Tests many legacy API elements. # We must call getlasterror on same socket as the last operation. db = rs_or_single_client(maxPoolSize=1).pymongo_test collection = db.test_last_status collection.remove({}) collection.save({"i": 1}) collection.update({"i": 1}, {"$set": {"i": 2}}, w=0) self.assertTrue(db.last_status()["updatedExisting"]) collection.update({"i": 1}, {"$set": {"i": 500}}, w=0) self.assertFalse(db.last_status()["updatedExisting"]) def test_auto_ref_and_deref(self): # Legacy API. db = self.client.pymongo_test db.add_son_manipulator(AutoReference(db)) db.add_son_manipulator(NamespaceInjector()) db.test.a.remove({}) db.test.b.remove({}) db.test.c.remove({}) a = {"hello": u"world"} db.test.a.save(a) b = {"test": a} db.test.b.save(b) c = {"another test": b} db.test.c.save(c) a["hello"] = "mike" db.test.a.save(a) self.assertEqual(db.test.a.find_one(), a) self.assertEqual(db.test.b.find_one()["test"], a) self.assertEqual(db.test.c.find_one()["another test"]["test"], a) self.assertEqual(db.test.b.find_one(), b) self.assertEqual(db.test.c.find_one()["another test"], b) self.assertEqual(db.test.c.find_one(), c) def test_auto_ref_and_deref_list(self): # Legacy API. db = self.client.pymongo_test db.add_son_manipulator(AutoReference(db)) db.add_son_manipulator(NamespaceInjector()) db.drop_collection("users") db.drop_collection("messages") message_1 = {"title": "foo"} db.messages.save(message_1) message_2 = {"title": "bar"} db.messages.save(message_2) user = {"messages": [message_1, message_2]} db.users.save(user) db.messages.update(message_1, {"title": "buzz"}) self.assertEqual("buzz", db.users.find_one()["messages"][0]["title"]) self.assertEqual("bar", db.users.find_one()["messages"][1]["title"]) def test_object_to_dict_transformer(self): # PYTHON-709: Some users rely on their custom SONManipulators to run # before any other checks, so they can insert non-dict objects and # have them dictified before the _id is inserted or any other # processing. # Tests legacy API elements. class Thing(object): def __init__(self, value): self.value = value class ThingTransformer(SONManipulator): def transform_incoming(self, thing, dummy): return {'value': thing.value} db = self.client.foo db.add_son_manipulator(ThingTransformer()) t = Thing('value') db.test.remove() db.test.insert([t]) out = db.test.find_one() self.assertEqual('value', out.get('value')) def test_son_manipulator_outgoing(self): class Thing(object): def __init__(self, value): self.value = value class ThingTransformer(SONManipulator): def transform_outgoing(self, doc, collection): # We don't want this applied to the command return # value in pymongo.cursor.Cursor. if 'value' in doc: return Thing(doc['value']) return doc db = self.client.foo db.add_son_manipulator(ThingTransformer()) db.test.delete_many({}) db.test.insert_one({'value': 'value'}) out = db.test.find_one() self.assertTrue(isinstance(out, Thing)) self.assertEqual('value', out.value) out = next(db.test.aggregate([], cursor={})) self.assertTrue(isinstance(out, Thing)) self.assertEqual('value', out.value) def test_son_manipulator_inheritance(self): # Tests legacy API elements. class Thing(object): def __init__(self, value): self.value = value class ThingTransformer(SONManipulator): def transform_incoming(self, thing, dummy): return {'value': thing.value} def transform_outgoing(self, son, dummy): return Thing(son['value']) class Child(ThingTransformer): pass db = self.client.foo db.add_son_manipulator(Child()) t = Thing('value') db.test.remove() db.test.insert([t]) out = db.test.find_one() self.assertTrue(isinstance(out, Thing)) self.assertEqual('value', out.value) def test_disabling_manipulators(self): class IncByTwo(SONManipulator): def transform_outgoing(self, son, collection): if 'foo' in son: son['foo'] += 2 return son db = self.client.pymongo_test db.add_son_manipulator(IncByTwo()) c = db.test c.drop() c.insert({'foo': 0}) self.assertEqual(2, c.find_one()['foo']) self.assertEqual(0, c.find_one(manipulate=False)['foo']) self.assertEqual(2, c.find_one(manipulate=True)['foo']) c.drop() def test_manipulator_properties(self): db = self.client.foo self.assertEqual([], db.incoming_manipulators) self.assertEqual([], db.incoming_copying_manipulators) self.assertEqual([], db.outgoing_manipulators) self.assertEqual([], db.outgoing_copying_manipulators) db.add_son_manipulator(AutoReference(db)) db.add_son_manipulator(NamespaceInjector()) db.add_son_manipulator(ObjectIdShuffler()) self.assertEqual(1, len(db.incoming_manipulators)) self.assertEqual(db.incoming_manipulators, ['NamespaceInjector']) self.assertEqual(2, len(db.incoming_copying_manipulators)) for name in db.incoming_copying_manipulators: self.assertTrue(name in ('ObjectIdShuffler', 'AutoReference')) self.assertEqual([], db.outgoing_manipulators) self.assertEqual(['AutoReference'], db.outgoing_copying_manipulators) def test_ensure_index(self): db = self.db self.assertRaises(TypeError, db.test.ensure_index, {"hello": 1}) self.assertRaises(TypeError, db.test.ensure_index, {"hello": 1}, cache_for='foo') db.test.drop_indexes() self.assertEqual("goodbye_1", db.test.ensure_index("goodbye")) self.assertEqual(None, db.test.ensure_index("goodbye")) db.test.drop_indexes() self.assertEqual("foo", db.test.ensure_index("goodbye", name="foo")) self.assertEqual(None, db.test.ensure_index("goodbye", name="foo")) db.test.drop_indexes() self.assertEqual("goodbye_1", db.test.ensure_index("goodbye")) self.assertEqual(None, db.test.ensure_index("goodbye")) db.test.drop_index("goodbye_1") self.assertEqual("goodbye_1", db.test.ensure_index("goodbye")) self.assertEqual(None, db.test.ensure_index("goodbye")) db.drop_collection("test") self.assertEqual("goodbye_1", db.test.ensure_index("goodbye")) self.assertEqual(None, db.test.ensure_index("goodbye")) db.test.drop_index("goodbye_1") self.assertEqual("goodbye_1", db.test.ensure_index("goodbye")) self.assertEqual(None, db.test.ensure_index("goodbye")) db.test.drop_index("goodbye_1") self.assertEqual("goodbye_1", db.test.ensure_index("goodbye", cache_for=1)) time.sleep(1.2) self.assertEqual("goodbye_1", db.test.ensure_index("goodbye")) # Make sure the expiration time is updated. self.assertEqual(None, db.test.ensure_index("goodbye")) # Clean up indexes for later tests db.test.drop_indexes() def test_ensure_index_threaded(self): coll = self.db.threaded_index_creation index_docs = [] class Indexer(threading.Thread): def run(self): coll.ensure_index('foo0') coll.ensure_index('foo1') coll.ensure_index('foo2') index_docs.append(coll.index_information()) try: threads = [] for _ in range(10): t = Indexer() t.setDaemon(True) threads.append(t) for thread in threads: thread.start() joinall(threads) first = index_docs[0] for index_doc in index_docs[1:]: self.assertEqual(index_doc, first) finally: coll.drop() def test_ensure_purge_index_threaded(self): coll = self.db.threaded_index_creation class Indexer(threading.Thread): def run(self): coll.ensure_index('foo') try: coll.drop_index('foo') except OperationFailure: # The index may have already been dropped. pass coll.ensure_index('foo') coll.drop_indexes() coll.create_index('foo') try: threads = [] for _ in range(10): t = Indexer() t.setDaemon(True) threads.append(t) for thread in threads: thread.start() joinall(threads) self.assertTrue('foo_1' in coll.index_information()) finally: coll.drop() def test_ensure_unique_index_threaded(self): coll = self.db.test_unique_threaded coll.drop() coll.insert_many([{'foo': i} for i in range(10000)]) class Indexer(threading.Thread): def run(self): try: coll.ensure_index('foo', unique=True) coll.insert_one({'foo': 'bar'}) coll.insert_one({'foo': 'bar'}) except OperationFailure: pass threads = [] for _ in range(10): t = Indexer() t.setDaemon(True) threads.append(t) for i in range(10): threads[i].start() joinall(threads) self.assertEqual(10001, coll.count()) coll.drop() def test_kill_cursors_with_cursoraddress(self): coll = self.client.pymongo_test.test coll.drop() coll.insert_many([{'_id': i} for i in range(200)]) cursor = coll.find().batch_size(1) next(cursor) self.client.kill_cursors( [cursor.cursor_id], _CursorAddress(self.client.address, coll.full_name)) # Prevent killcursors from reaching the server while a getmore is in # progress -- the server logs "Assertion: 16089:Cannot kill active # cursor." time.sleep(2) def raises_cursor_not_found(): try: next(cursor) return False except CursorNotFound: return True wait_until(raises_cursor_not_found, 'close cursor') def test_kill_cursors_with_tuple(self): if (client_context.version[:2] == (3, 6) and client_context.auth_enabled): raise SkipTest("SERVER-33553") coll = self.client.pymongo_test.test coll.drop() coll.insert_many([{'_id': i} for i in range(200)]) cursor = coll.find().batch_size(1) next(cursor) self.client.kill_cursors( [cursor.cursor_id], self.client.address) # Prevent killcursors from reaching the server while a getmore is in # progress -- the server logs "Assertion: 16089:Cannot kill active # cursor." time.sleep(2) def raises_cursor_not_found(): try: next(cursor) return False except CursorNotFound: return True wait_until(raises_cursor_not_found, 'close cursor') def test_get_default_database(self): c = rs_or_single_client("mongodb://%s:%d/foo" % (client_context.host, client_context.port), connect=False) self.assertEqual(Database(c, 'foo'), c.get_default_database()) def test_get_default_database_error(self): # URI with no database. c = rs_or_single_client("mongodb://%s:%d/" % (client_context.host, client_context.port), connect=False) self.assertRaises(ConfigurationError, c.get_default_database) def test_get_default_database_with_authsource(self): # Ensure we distinguish database name from authSource. uri = "mongodb://%s:%d/foo?authSource=src" % ( client_context.host, client_context.port) c = rs_or_single_client(uri, connect=False) self.assertEqual(Database(c, 'foo'), c.get_default_database()) class TestLegacyBulk(BulkTestBase): @classmethod def setUpClass(cls): super(TestLegacyBulk, cls).setUpClass() cls.deprecation_filter = DeprecationFilter() @classmethod def tearDownClass(cls): cls.deprecation_filter.stop() def test_empty(self): bulk = self.coll.initialize_ordered_bulk_op() self.assertRaises(InvalidOperation, bulk.execute) def test_find(self): # find() requires a selector. bulk = self.coll.initialize_ordered_bulk_op() self.assertRaises(TypeError, bulk.find) self.assertRaises(TypeError, bulk.find, 'foo') # No error. bulk.find({}) @client_context.require_version_min(3, 1, 9, -1) def test_bypass_document_validation_bulk_op(self): # Test insert self.coll.insert_one({"z": 0}) self.db.command(SON([("collMod", "test"), ("validator", {"z": {"$gte": 0}})])) bulk = self.coll.initialize_ordered_bulk_op( bypass_document_validation=False) bulk.insert({"z": -1}) # error self.assertRaises(BulkWriteError, bulk.execute) self.assertEqual(0, self.coll.count({"z": -1})) bulk = self.coll.initialize_ordered_bulk_op( bypass_document_validation=True) bulk.insert({"z": -1}) bulk.execute() self.assertEqual(1, self.coll.count({"z": -1})) self.coll.insert_one({"z": 0}) self.db.command(SON([("collMod", "test"), ("validator", {"z": {"$gte": 0}})])) bulk = self.coll.initialize_unordered_bulk_op( bypass_document_validation=False) bulk.insert({"z": -1}) # error self.assertRaises(BulkWriteError, bulk.execute) self.assertEqual(1, self.coll.count({"z": -1})) bulk = self.coll.initialize_unordered_bulk_op( bypass_document_validation=True) bulk.insert({"z": -1}) bulk.execute() self.assertEqual(2, self.coll.count({"z": -1})) self.coll.drop() def test_insert(self): expected = { 'nMatched': 0, 'nModified': 0, 'nUpserted': 0, 'nInserted': 1, 'nRemoved': 0, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': [] } bulk = self.coll.initialize_ordered_bulk_op() self.assertRaises(TypeError, bulk.insert, 1) # find() before insert() is prohibited. self.assertRaises(AttributeError, lambda: bulk.find({}).insert({})) # We don't allow multiple documents per call. self.assertRaises(TypeError, bulk.insert, [{}, {}]) self.assertRaises(TypeError, bulk.insert, ({} for _ in range(2))) bulk.insert({}) result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual(1, self.coll.count()) doc = self.coll.find_one() self.assertTrue(oid_generated_on_client(doc['_id'])) bulk = self.coll.initialize_unordered_bulk_op() bulk.insert({}) result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual(2, self.coll.count()) def test_insert_check_keys(self): bulk = self.coll.initialize_ordered_bulk_op() bulk.insert({'$dollar': 1}) self.assertRaises(InvalidDocument, bulk.execute) bulk = self.coll.initialize_ordered_bulk_op() bulk.insert({'a.b': 1}) self.assertRaises(InvalidDocument, bulk.execute) def test_update(self): expected = { 'nMatched': 2, 'nModified': 2, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 0, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': [] } self.coll.insert_many([{}, {}]) bulk = self.coll.initialize_ordered_bulk_op() # update() requires find() first. self.assertRaises( AttributeError, lambda: bulk.update({'$set': {'x': 1}})) self.assertRaises(TypeError, bulk.find({}).update, 1) self.assertRaises(ValueError, bulk.find({}).update, {}) # All fields must be $-operators. self.assertRaises(ValueError, bulk.find({}).update, {'foo': 'bar'}) bulk.find({}).update({'$set': {'foo': 'bar'}}) result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 2) # All fields must be $-operators -- validated server-side. bulk = self.coll.initialize_ordered_bulk_op() updates = SON([('$set', {'x': 1}), ('y', 1)]) bulk.find({}).update(updates) self.assertRaises(BulkWriteError, bulk.execute) self.coll.delete_many({}) self.coll.insert_many([{}, {}]) bulk = self.coll.initialize_unordered_bulk_op() bulk.find({}).update({'$set': {'bim': 'baz'}}) result = bulk.execute() self.assertEqualResponse( {'nMatched': 2, 'nModified': 2, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 0, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': []}, result) self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 2) self.coll.insert_one({'x': 1}) bulk = self.coll.initialize_unordered_bulk_op() bulk.find({'x': 1}).update({'$set': {'x': 42}}) result = bulk.execute() self.assertEqualResponse( {'nMatched': 1, 'nModified': 1, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 0, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': []}, result) self.assertEqual(1, self.coll.find({'x': 42}).count()) # Second time, x is already 42 so nModified is 0. bulk = self.coll.initialize_unordered_bulk_op() bulk.find({'x': 42}).update({'$set': {'x': 42}}) result = bulk.execute() self.assertEqualResponse( {'nMatched': 1, 'nModified': 0, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 0, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': []}, result) def test_update_one(self): expected = { 'nMatched': 1, 'nModified': 1, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 0, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': [] } self.coll.insert_many([{}, {}]) bulk = self.coll.initialize_ordered_bulk_op() # update_one() requires find() first. self.assertRaises( AttributeError, lambda: bulk.update_one({'$set': {'x': 1}})) self.assertRaises(TypeError, bulk.find({}).update_one, 1) self.assertRaises(ValueError, bulk.find({}).update_one, {}) self.assertRaises(ValueError, bulk.find({}).update_one, {'foo': 'bar'}) bulk.find({}).update_one({'$set': {'foo': 'bar'}}) result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 1) self.coll.delete_many({}) self.coll.insert_many([{}, {}]) bulk = self.coll.initialize_unordered_bulk_op() bulk.find({}).update_one({'$set': {'bim': 'baz'}}) result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) # All fields must be $-operators -- validated server-side. bulk = self.coll.initialize_ordered_bulk_op() updates = SON([('$set', {'x': 1}), ('y', 1)]) bulk.find({}).update_one(updates) self.assertRaises(BulkWriteError, bulk.execute) def test_replace_one(self): expected = { 'nMatched': 1, 'nModified': 1, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 0, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': [] } self.coll.insert_many([{}, {}]) bulk = self.coll.initialize_ordered_bulk_op() self.assertRaises(TypeError, bulk.find({}).replace_one, 1) self.assertRaises(ValueError, bulk.find({}).replace_one, {'$set': {'foo': 'bar'}}) bulk.find({}).replace_one({'foo': 'bar'}) result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual(self.coll.find({'foo': 'bar'}).count(), 1) self.coll.delete_many({}) self.coll.insert_many([{}, {}]) bulk = self.coll.initialize_unordered_bulk_op() bulk.find({}).replace_one({'bim': 'baz'}) result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) def test_remove(self): # Test removing all documents, ordered. expected = { 'nMatched': 0, 'nModified': 0, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 2, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': [] } self.coll.insert_many([{}, {}]) bulk = self.coll.initialize_ordered_bulk_op() # remove() must be preceded by find(). self.assertRaises(AttributeError, lambda: bulk.remove()) bulk.find({}).remove() result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual(self.coll.count(), 0) # Test removing some documents, ordered. self.coll.insert_many([{}, {'x': 1}, {}, {'x': 1}]) bulk = self.coll.initialize_ordered_bulk_op() bulk.find({'x': 1}).remove() result = bulk.execute() self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 2, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': []}, result) self.assertEqual(self.coll.count(), 2) self.coll.delete_many({}) # Test removing all documents, unordered. self.coll.insert_many([{}, {}]) bulk = self.coll.initialize_unordered_bulk_op() bulk.find({}).remove() result = bulk.execute() self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 2, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': []}, result) # Test removing some documents, unordered. self.assertEqual(self.coll.count(), 0) self.coll.insert_many([{}, {'x': 1}, {}, {'x': 1}]) bulk = self.coll.initialize_unordered_bulk_op() bulk.find({'x': 1}).remove() result = bulk.execute() self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 2, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': []}, result) self.assertEqual(self.coll.count(), 2) self.coll.delete_many({}) def test_remove_one(self): bulk = self.coll.initialize_ordered_bulk_op() # remove_one() must be preceded by find(). self.assertRaises(AttributeError, lambda: bulk.remove_one()) # Test removing one document, empty selector. # First ordered, then unordered. self.coll.insert_many([{}, {}]) expected = { 'nMatched': 0, 'nModified': 0, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 1, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': [] } bulk.find({}).remove_one() result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual(self.coll.count(), 1) self.coll.insert_one({}) bulk = self.coll.initialize_unordered_bulk_op() bulk.find({}).remove_one() result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual(self.coll.count(), 1) # Test removing one document, with a selector. # First ordered, then unordered. self.coll.insert_one({'x': 1}) bulk = self.coll.initialize_ordered_bulk_op() bulk.find({'x': 1}).remove_one() result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual([{}], list(self.coll.find({}, {'_id': False}))) self.coll.insert_one({'x': 1}) bulk = self.coll.initialize_unordered_bulk_op() bulk.find({'x': 1}).remove_one() result = bulk.execute() self.assertEqualResponse(expected, result) self.assertEqual([{}], list(self.coll.find({}, {'_id': False}))) def test_upsert(self): bulk = self.coll.initialize_ordered_bulk_op() # upsert() requires find() first. self.assertRaises( AttributeError, lambda: bulk.upsert()) expected = { 'nMatched': 0, 'nModified': 0, 'nUpserted': 1, 'nInserted': 0, 'nRemoved': 0, 'upserted': [{'index': 0, '_id': '...'}] } bulk.find({}).upsert().replace_one({'foo': 'bar'}) result = bulk.execute() self.assertEqualResponse(expected, result) bulk = self.coll.initialize_ordered_bulk_op() bulk.find({}).upsert().update_one({'$set': {'bim': 'baz'}}) result = bulk.execute() self.assertEqualResponse( {'nMatched': 1, 'nModified': 1, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 0, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': []}, result) self.assertEqual(self.coll.find({'bim': 'baz'}).count(), 1) bulk = self.coll.initialize_ordered_bulk_op() bulk.find({}).upsert().update({'$set': {'bim': 'bop'}}) # Non-upsert, no matches. bulk.find({'x': 1}).update({'$set': {'x': 2}}) result = bulk.execute() self.assertEqualResponse( {'nMatched': 1, 'nModified': 1, 'nUpserted': 0, 'nInserted': 0, 'nRemoved': 0, 'upserted': [], 'writeErrors': [], 'writeConcernErrors': []}, result) self.assertEqual(self.coll.find({'bim': 'bop'}).count(), 1) self.assertEqual(self.coll.find({'x': 2}).count(), 0) def test_upsert_large(self): big = 'a' * (client_context.client.max_bson_size - 37) bulk = self.coll.initialize_ordered_bulk_op() bulk.find({'x': 1}).upsert().update({'$set': {'s': big}}) result = bulk.execute() self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, 'nUpserted': 1, 'nInserted': 0, 'nRemoved': 0, 'upserted': [{'index': 0, '_id': '...'}]}, result) self.assertEqual(1, self.coll.find({'x': 1}).count()) def test_client_generated_upsert_id(self): batch = self.coll.initialize_ordered_bulk_op() batch.find({'_id': 0}).upsert().update_one({'$set': {'a': 0}}) batch.find({'a': 1}).upsert().replace_one({'_id': 1}) # This is just here to make the counts right in all cases. batch.find({'_id': 2}).upsert().replace_one({'_id': 2}) result = batch.execute() self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, 'nUpserted': 3, 'nInserted': 0, 'nRemoved': 0, 'upserted': [{'index': 0, '_id': 0}, {'index': 1, '_id': 1}, {'index': 2, '_id': 2}]}, result) def test_single_ordered_batch(self): batch = self.coll.initialize_ordered_bulk_op() batch.insert({'a': 1}) batch.find({'a': 1}).update_one({'$set': {'b': 1}}) batch.find({'a': 2}).upsert().update_one({'$set': {'b': 2}}) batch.insert({'a': 3}) batch.find({'a': 3}).remove() result = batch.execute() self.assertEqualResponse( {'nMatched': 1, 'nModified': 1, 'nUpserted': 1, 'nInserted': 2, 'nRemoved': 1, 'upserted': [{'index': 2, '_id': '...'}]}, result) def test_single_error_ordered_batch(self): self.coll.create_index('a', unique=True) self.addCleanup(self.coll.drop_index, [('a', 1)]) batch = self.coll.initialize_ordered_bulk_op() batch.insert({'b': 1, 'a': 1}) batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) batch.insert({'b': 3, 'a': 2}) try: batch.execute() except BulkWriteError as exc: result = exc.details self.assertEqual(exc.code, 65) else: self.fail("Error not raised") self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, 'nUpserted': 0, 'nInserted': 1, 'nRemoved': 0, 'upserted': [], 'writeConcernErrors': [], 'writeErrors': [ {'index': 1, 'code': 11000, 'errmsg': '...', 'op': {'q': {'b': 2}, 'u': {'$set': {'a': 1}}, 'multi': False, 'upsert': True}}]}, result) def test_multiple_error_ordered_batch(self): self.coll.create_index('a', unique=True) self.addCleanup(self.coll.drop_index, [('a', 1)]) batch = self.coll.initialize_ordered_bulk_op() batch.insert({'b': 1, 'a': 1}) batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) batch.find({'b': 3}).upsert().update_one({'$set': {'a': 2}}) batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) batch.insert({'b': 4, 'a': 3}) batch.insert({'b': 5, 'a': 1}) try: batch.execute() except BulkWriteError as exc: result = exc.details self.assertEqual(exc.code, 65) else: self.fail("Error not raised") self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, 'nUpserted': 0, 'nInserted': 1, 'nRemoved': 0, 'upserted': [], 'writeConcernErrors': [], 'writeErrors': [ {'index': 1, 'code': 11000, 'errmsg': '...', 'op': {'q': {'b': 2}, 'u': {'$set': {'a': 1}}, 'multi': False, 'upsert': True}}]}, result) def test_single_unordered_batch(self): batch = self.coll.initialize_unordered_bulk_op() batch.insert({'a': 1}) batch.find({'a': 1}).update_one({'$set': {'b': 1}}) batch.find({'a': 2}).upsert().update_one({'$set': {'b': 2}}) batch.insert({'a': 3}) batch.find({'a': 3}).remove() result = batch.execute() self.assertEqualResponse( {'nMatched': 1, 'nModified': 1, 'nUpserted': 1, 'nInserted': 2, 'nRemoved': 1, 'upserted': [{'index': 2, '_id': '...'}], 'writeErrors': [], 'writeConcernErrors': []}, result) def test_single_error_unordered_batch(self): self.coll.create_index('a', unique=True) self.addCleanup(self.coll.drop_index, [('a', 1)]) batch = self.coll.initialize_unordered_bulk_op() batch.insert({'b': 1, 'a': 1}) batch.find({'b': 2}).upsert().update_one({'$set': {'a': 1}}) batch.insert({'b': 3, 'a': 2}) try: batch.execute() except BulkWriteError as exc: result = exc.details self.assertEqual(exc.code, 65) else: self.fail("Error not raised") self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, 'nUpserted': 0, 'nInserted': 2, 'nRemoved': 0, 'upserted': [], 'writeConcernErrors': [], 'writeErrors': [ {'index': 1, 'code': 11000, 'errmsg': '...', 'op': {'q': {'b': 2}, 'u': {'$set': {'a': 1}}, 'multi': False, 'upsert': True}}]}, result) def test_multiple_error_unordered_batch(self): self.coll.create_index('a', unique=True) self.addCleanup(self.coll.drop_index, [('a', 1)]) batch = self.coll.initialize_unordered_bulk_op() batch.insert({'b': 1, 'a': 1}) batch.find({'b': 2}).upsert().update_one({'$set': {'a': 3}}) batch.find({'b': 3}).upsert().update_one({'$set': {'a': 4}}) batch.find({'b': 4}).upsert().update_one({'$set': {'a': 3}}) batch.insert({'b': 5, 'a': 2}) batch.insert({'b': 6, 'a': 1}) try: batch.execute() except BulkWriteError as exc: result = exc.details self.assertEqual(exc.code, 65) else: self.fail("Error not raised") # Assume the update at index 1 runs before the update at index 3, # although the spec does not require it. Same for inserts. self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, 'nUpserted': 2, 'nInserted': 2, 'nRemoved': 0, 'upserted': [ {'index': 1, '_id': '...'}, {'index': 2, '_id': '...'}], 'writeConcernErrors': [], 'writeErrors': [ {'index': 3, 'code': 11000, 'errmsg': '...', 'op': {'q': {'b': 4}, 'u': {'$set': {'a': 3}}, 'multi': False, 'upsert': True}}, {'index': 5, 'code': 11000, 'errmsg': '...', 'op': {'_id': '...', 'b': 6, 'a': 1}}]}, result) def test_large_inserts_ordered(self): big = 'x' * self.coll.database.client.max_bson_size batch = self.coll.initialize_ordered_bulk_op() batch.insert({'b': 1, 'a': 1}) batch.insert({'big': big}) batch.insert({'b': 2, 'a': 2}) try: batch.execute() except BulkWriteError as exc: result = exc.details self.assertEqual(exc.code, 65) else: self.fail("Error not raised") self.assertEqual(1, result['nInserted']) self.coll.delete_many({}) big = 'x' * (1024 * 1024 * 4) batch = self.coll.initialize_ordered_bulk_op() batch.insert({'a': 1, 'big': big}) batch.insert({'a': 2, 'big': big}) batch.insert({'a': 3, 'big': big}) batch.insert({'a': 4, 'big': big}) batch.insert({'a': 5, 'big': big}) batch.insert({'a': 6, 'big': big}) result = batch.execute() self.assertEqual(6, result['nInserted']) self.assertEqual(6, self.coll.count()) def test_large_inserts_unordered(self): big = 'x' * self.coll.database.client.max_bson_size batch = self.coll.initialize_unordered_bulk_op() batch.insert({'b': 1, 'a': 1}) batch.insert({'big': big}) batch.insert({'b': 2, 'a': 2}) try: batch.execute() except BulkWriteError as exc: result = exc.details self.assertEqual(exc.code, 65) else: self.fail("Error not raised") self.assertEqual(2, result['nInserted']) self.coll.delete_many({}) big = 'x' * (1024 * 1024 * 4) batch = self.coll.initialize_ordered_bulk_op() batch.insert({'a': 1, 'big': big}) batch.insert({'a': 2, 'big': big}) batch.insert({'a': 3, 'big': big}) batch.insert({'a': 4, 'big': big}) batch.insert({'a': 5, 'big': big}) batch.insert({'a': 6, 'big': big}) result = batch.execute() self.assertEqual(6, result['nInserted']) self.assertEqual(6, self.coll.count()) def test_numerous_inserts(self): # Ensure we don't exceed server's 1000-document batch size limit. n_docs = 2100 batch = self.coll.initialize_unordered_bulk_op() for _ in range(n_docs): batch.insert({}) result = batch.execute() self.assertEqual(n_docs, result['nInserted']) self.assertEqual(n_docs, self.coll.count()) # Same with ordered bulk. self.coll.delete_many({}) batch = self.coll.initialize_ordered_bulk_op() for _ in range(n_docs): batch.insert({}) result = batch.execute() self.assertEqual(n_docs, result['nInserted']) self.assertEqual(n_docs, self.coll.count()) def test_multiple_execution(self): batch = self.coll.initialize_ordered_bulk_op() batch.insert({}) batch.execute() self.assertRaises(InvalidOperation, batch.execute) def test_generator_insert(self): def gen(): yield {'a': 1, 'b': 1} yield {'a': 1, 'b': 2} yield {'a': 2, 'b': 3} yield {'a': 3, 'b': 5} yield {'a': 5, 'b': 8} result = self.coll.insert_many(gen()) self.assertEqual(5, len(result.inserted_ids)) class TestLegacyBulkNoResults(BulkTestBase): @classmethod def setUpClass(cls): super(TestLegacyBulkNoResults, cls).setUpClass() cls.deprecation_filter = DeprecationFilter() @classmethod def tearDownClass(cls): cls.deprecation_filter.stop() def tearDown(self): self.coll.delete_many({}) def test_no_results_ordered_success(self): batch = self.coll.initialize_ordered_bulk_op() batch.insert({'_id': 1}) batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) batch.insert({'_id': 2}) batch.find({'_id': 1}).remove_one() self.assertTrue(batch.execute({'w': 0}) is None) wait_until(lambda: 2 == self.coll.count(), 'insert 2 documents') wait_until(lambda: self.coll.find_one({'_id': 1}) is None, 'removed {"_id": 1}') def test_no_results_ordered_failure(self): batch = self.coll.initialize_ordered_bulk_op() batch.insert({'_id': 1}) batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) batch.insert({'_id': 2}) # Fails with duplicate key error. batch.insert({'_id': 1}) # Should not be executed since the batch is ordered. batch.find({'_id': 1}).remove_one() self.assertTrue(batch.execute({'w': 0}) is None) wait_until(lambda: 3 == self.coll.count(), 'insert 3 documents') self.assertEqual({'_id': 1}, self.coll.find_one({'_id': 1})) def test_no_results_unordered_success(self): batch = self.coll.initialize_unordered_bulk_op() batch.insert({'_id': 1}) batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) batch.insert({'_id': 2}) batch.find({'_id': 1}).remove_one() self.assertTrue(batch.execute({'w': 0}) is None) wait_until(lambda: 2 == self.coll.count(), 'insert 2 documents') wait_until(lambda: self.coll.find_one({'_id': 1}) is None, 'removed {"_id": 1}') def test_no_results_unordered_failure(self): batch = self.coll.initialize_unordered_bulk_op() batch.insert({'_id': 1}) batch.find({'_id': 3}).upsert().update_one({'$set': {'b': 1}}) batch.insert({'_id': 2}) # Fails with duplicate key error. batch.insert({'_id': 1}) # Should be executed since the batch is unordered. batch.find({'_id': 1}).remove_one() self.assertTrue(batch.execute({'w': 0}) is None) wait_until(lambda: 2 == self.coll.count(), 'insert 2 documents') wait_until(lambda: self.coll.find_one({'_id': 1}) is None, 'removed {"_id": 1}') class TestLegacyBulkWriteConcern(BulkTestBase): @classmethod def setUpClass(cls): super(TestLegacyBulkWriteConcern, cls).setUpClass() cls.w = client_context.w cls.secondary = None if cls.w > 1: for member in client_context.ismaster['hosts']: if member != client_context.ismaster['primary']: cls.secondary = single_client(*partition_node(member)) break # We tested wtimeout errors by specifying a write concern greater than # the number of members, but in MongoDB 2.7.8+ this causes a different # sort of error, "Not enough data-bearing nodes". In recent servers we # use a failpoint to pause replication on a secondary. cls.need_replication_stopped = client_context.version.at_least(2, 7, 8) cls.deprecation_filter = DeprecationFilter() @classmethod def tearDownClass(cls): cls.deprecation_filter.stop() def cause_wtimeout(self, batch): if self.need_replication_stopped: if not client_context.test_commands_enabled: raise SkipTest("Test commands must be enabled.") self.secondary.admin.command('configureFailPoint', 'rsSyncApplyStop', mode='alwaysOn') try: return batch.execute({'w': self.w, 'wtimeout': 1}) finally: self.secondary.admin.command('configureFailPoint', 'rsSyncApplyStop', mode='off') else: return batch.execute({'w': self.w + 1, 'wtimeout': 1}) def test_fsync_and_j(self): batch = self.coll.initialize_ordered_bulk_op() batch.insert({'a': 1}) self.assertRaises( ConfigurationError, batch.execute, {'fsync': True, 'j': True}) @client_context.require_replica_set def test_write_concern_failure_ordered(self): # Ensure we don't raise on wnote. batch = self.coll.initialize_ordered_bulk_op() batch.find({"something": "that does no exist"}).remove() self.assertTrue(batch.execute({"w": self.w})) batch = self.coll.initialize_ordered_bulk_op() batch.insert({'a': 1}) batch.insert({'a': 2}) # Replication wtimeout is a 'soft' error. # It shouldn't stop batch processing. try: self.cause_wtimeout(batch) except BulkWriteError as exc: result = exc.details self.assertEqual(exc.code, 65) else: self.fail("Error not raised") self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, 'nUpserted': 0, 'nInserted': 2, 'nRemoved': 0, 'upserted': [], 'writeErrors': []}, result) # When talking to legacy servers there will be a # write concern error for each operation. self.assertTrue(len(result['writeConcernErrors']) > 0) failed = result['writeConcernErrors'][0] self.assertEqual(64, failed['code']) self.assertTrue(isinstance(failed['errmsg'], string_type)) self.coll.delete_many({}) self.coll.create_index('a', unique=True) self.addCleanup(self.coll.drop_index, [('a', 1)]) # Fail due to write concern support as well # as duplicate key error on ordered batch. batch = self.coll.initialize_ordered_bulk_op() batch.insert({'a': 1}) batch.find({'a': 3}).upsert().replace_one({'b': 1}) batch.insert({'a': 1}) batch.insert({'a': 2}) try: self.cause_wtimeout(batch) except BulkWriteError as exc: result = exc.details self.assertEqual(exc.code, 65) else: self.fail("Error not raised") self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, 'nUpserted': 1, 'nInserted': 1, 'nRemoved': 0, 'upserted': [{'index': 1, '_id': '...'}], 'writeErrors': [ {'index': 2, 'code': 11000, 'errmsg': '...', 'op': {'_id': '...', 'a': 1}}]}, result) self.assertTrue(len(result['writeConcernErrors']) > 1) failed = result['writeErrors'][0] self.assertTrue("duplicate" in failed['errmsg']) @client_context.require_replica_set def test_write_concern_failure_unordered(self): # Ensure we don't raise on wnote. batch = self.coll.initialize_unordered_bulk_op() batch.find({"something": "that does no exist"}).remove() self.assertTrue(batch.execute({"w": self.w})) batch = self.coll.initialize_unordered_bulk_op() batch.insert({'a': 1}) batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, 'b': 1}}) batch.insert({'a': 2}) # Replication wtimeout is a 'soft' error. # It shouldn't stop batch processing. try: self.cause_wtimeout(batch) except BulkWriteError as exc: result = exc.details self.assertEqual(exc.code, 65) else: self.fail("Error not raised") self.assertEqual(2, result['nInserted']) self.assertEqual(1, result['nUpserted']) self.assertEqual(0, len(result['writeErrors'])) # When talking to legacy servers there will be a # write concern error for each operation. self.assertTrue(len(result['writeConcernErrors']) > 1) self.coll.delete_many({}) self.coll.create_index('a', unique=True) self.addCleanup(self.coll.drop_index, [('a', 1)]) # Fail due to write concern support as well # as duplicate key error on unordered batch. batch = self.coll.initialize_unordered_bulk_op() batch.insert({'a': 1}) batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, 'b': 1}}) batch.insert({'a': 1}) batch.insert({'a': 2}) try: self.cause_wtimeout(batch) except BulkWriteError as exc: result = exc.details self.assertEqual(exc.code, 65) else: self.fail("Error not raised") self.assertEqual(2, result['nInserted']) self.assertEqual(1, result['nUpserted']) self.assertEqual(1, len(result['writeErrors'])) # When talking to legacy servers there will be a # write concern error for each operation. self.assertTrue(len(result['writeConcernErrors']) > 1) failed = result['writeErrors'][0] self.assertEqual(2, failed['index']) self.assertEqual(11000, failed['code']) self.assertTrue(isinstance(failed['errmsg'], string_type)) self.assertEqual(1, failed['op']['a']) failed = result['writeConcernErrors'][0] self.assertEqual(64, failed['code']) self.assertTrue(isinstance(failed['errmsg'], string_type)) upserts = result['upserted'] self.assertEqual(1, len(upserts)) self.assertEqual(1, upserts[0]['index']) self.assertTrue(upserts[0].get('_id')) class TestLegacyBulkAuthorization(BulkAuthorizationTestBase): @classmethod def setUpClass(cls): super(TestLegacyBulkAuthorization, cls).setUpClass() cls.deprecation_filter = DeprecationFilter() @classmethod def tearDownClass(cls): cls.deprecation_filter.stop() def test_readonly(self): # We test that an authorization failure aborts the batch and is raised # as OperationFailure. cli = rs_or_single_client_noauth() db = cli.pymongo_test coll = db.test db.authenticate('readonly', 'pw') bulk = coll.initialize_ordered_bulk_op() bulk.insert({'x': 1}) self.assertRaises(OperationFailure, bulk.execute) def test_no_remove(self): # We test that an authorization failure aborts the batch and is raised # as OperationFailure. cli = rs_or_single_client_noauth() db = cli.pymongo_test coll = db.test db.authenticate('noremove', 'pw') bulk = coll.initialize_ordered_bulk_op() bulk.insert({'x': 1}) bulk.find({'x': 2}).upsert().replace_one({'x': 2}) bulk.find({}).remove() # Prohibited. bulk.insert({'x': 3}) # Never attempted. self.assertRaises(OperationFailure, bulk.execute) self.assertEqual(set([1, 2]), set(self.coll.distinct('x'))) if __name__ == "__main__": unittest.main()