"""Worker Task Consumer Bootstep.""" from __future__ import absolute_import, unicode_literals from kombu.common import QoS, ignore_errors from celery import bootsteps from celery.utils.log import get_logger from .mingle import Mingle __all__ = ['Tasks'] logger = get_logger(__name__) debug = logger.debug class Tasks(bootsteps.StartStopStep): """Bootstep starting the task message consumer.""" requires = (Mingle,) def __init__(self, c, **kwargs): c.task_consumer = c.qos = None super(Tasks, self).__init__(c, **kwargs) def start(self, c): """Start task consumer.""" c.update_strategies() # - RabbitMQ 3.3 completely redefines how basic_qos works.. # This will detect if the new qos smenatics is in effect, # and if so make sure the 'apply_global' flag is set on qos updates. qos_global = not c.connection.qos_semantics_matches_spec # set initial prefetch count c.connection.default_channel.basic_qos( 0, c.initial_prefetch_count, qos_global, ) c.task_consumer = c.app.amqp.TaskConsumer( c.connection, on_decode_error=c.on_decode_error, ) def set_prefetch_count(prefetch_count): return c.task_consumer.qos( prefetch_count=prefetch_count, apply_global=qos_global, ) c.qos = QoS(set_prefetch_count, c.initial_prefetch_count) def stop(self, c): """Stop task consumer.""" if c.task_consumer: debug('Canceling task consumer...') ignore_errors(c, c.task_consumer.cancel) def shutdown(self, c): """Shutdown task consumer.""" if c.task_consumer: self.stop(c) debug('Closing consumer channel...') ignore_errors(c, c.task_consumer.close) c.task_consumer = None def info(self, c): """Return task consumer info.""" return {'prefetch_count': c.qos.value if c.qos else 'N/A'}