from __future__ import absolute_import import gc import sys import time from celery.utils.dispatch import Signal from celery.tests.case import Case if sys.platform.startswith('java'): def garbage_collect(): # Some JVM GCs will execute finalizers in a different thread, meaning # we need to wait for that to complete before we go on looking for the # effects of that. gc.collect() time.sleep(0.1) elif hasattr(sys, 'pypy_version_info'): def garbage_collect(): # noqa # Collecting weakreferences can take two collections on PyPy. gc.collect() gc.collect() else: def garbage_collect(): # noqa gc.collect() def receiver_1_arg(val, **kwargs): return val class Callable(object): def __call__(self, val, **kwargs): return val def a(self, val, **kwargs): return val a_signal = Signal(providing_args=['val']) class DispatcherTests(Case): """Test suite for dispatcher (barely started)""" def _testIsClean(self, signal): """Assert that everything has been cleaned up automatically""" self.assertEqual(signal.receivers, []) # force cleanup just in case signal.receivers = [] def test_exact(self): a_signal.connect(receiver_1_arg, sender=self) expected = [(receiver_1_arg, 'test')] result = a_signal.send(sender=self, val='test') self.assertEqual(result, expected) a_signal.disconnect(receiver_1_arg, sender=self) self._testIsClean(a_signal) def test_ignored_sender(self): a_signal.connect(receiver_1_arg) expected = [(receiver_1_arg, 'test')] result = a_signal.send(sender=self, val='test') self.assertEqual(result, expected) a_signal.disconnect(receiver_1_arg) self._testIsClean(a_signal) def test_garbage_collected(self): a = Callable() a_signal.connect(a.a, sender=self) expected = [] del a garbage_collect() result = a_signal.send(sender=self, val='test') self.assertEqual(result, expected) self._testIsClean(a_signal) def test_multiple_registration(self): a = Callable() a_signal.connect(a) a_signal.connect(a) a_signal.connect(a) a_signal.connect(a) a_signal.connect(a) a_signal.connect(a) result = a_signal.send(sender=self, val='test') self.assertEqual(len(result), 1) self.assertEqual(len(a_signal.receivers), 1) del a del result garbage_collect() self._testIsClean(a_signal) def test_uid_registration(self): def uid_based_receiver_1(**kwargs): pass def uid_based_receiver_2(**kwargs): pass a_signal.connect(uid_based_receiver_1, dispatch_uid='uid') a_signal.connect(uid_based_receiver_2, dispatch_uid='uid') self.assertEqual(len(a_signal.receivers), 1) a_signal.disconnect(dispatch_uid='uid') self._testIsClean(a_signal) def test_robust(self): def fails(val, **kwargs): raise ValueError('this') a_signal.connect(fails) result = a_signal.send_robust(sender=self, val='test') err = result[0][1] self.assertTrue(isinstance(err, ValueError)) self.assertEqual(err.args, ('this',)) a_signal.disconnect(fails) self._testIsClean(a_signal) def test_disconnection(self): receiver_1 = Callable() receiver_2 = Callable() receiver_3 = Callable() a_signal.connect(receiver_1) a_signal.connect(receiver_2) a_signal.connect(receiver_3) a_signal.disconnect(receiver_1) del receiver_2 garbage_collect() a_signal.disconnect(receiver_3) self._testIsClean(a_signal)