# -*- coding: utf-8 -*- """Scheduler for Python functions. .. note:: This is used for the thread-based worker only, not for amqp/redis/sqs/qpid where :mod:`kombu.async.timer` is used. """ from __future__ import absolute_import, print_function, unicode_literals import os import sys import threading from itertools import count from time import sleep from celery.five import THREAD_TIMEOUT_MAX from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger TIMER_DEBUG = os.environ.get('TIMER_DEBUG') __all__ = ['Entry', 'Schedule', 'Timer', 'to_timestamp'] class Timer(threading.Thread): """Timer thread. Note: This is only used for transports not supporting AsyncIO. """ Entry = Entry Schedule = Schedule running = False on_tick = None _timer_count = count(1) if TIMER_DEBUG: # pragma: no cover def start(self, *args, **kwargs): import traceback print('- Timer starting') traceback.print_stack() super(Timer, self).start(*args, **kwargs) def __init__(self, schedule=None, on_error=None, on_tick=None, on_start=None, max_interval=None, **kwargs): self.schedule = schedule or self.Schedule(on_error=on_error, max_interval=max_interval) self.on_start = on_start self.on_tick = on_tick or self.on_tick threading.Thread.__init__(self) self._is_shutdown = threading.Event() self._is_stopped = threading.Event() self.mutex = threading.Lock() self.not_empty = threading.Condition(self.mutex) self.daemon = True self.name = 'Timer-{0}'.format(next(self._timer_count)) def _next_entry(self): with self.not_empty: delay, entry = next(self.scheduler) if entry is None: if delay is None: self.not_empty.wait(1.0) return delay return self.schedule.apply_entry(entry) __next__ = next = _next_entry # for 2to3 def run(self): try: self.running = True self.scheduler = iter(self.schedule) while not self._is_shutdown.isSet(): delay = self._next_entry() if delay: if self.on_tick: self.on_tick(delay) if sleep is None: # pragma: no cover break sleep(delay) try: self._is_stopped.set() except TypeError: # pragma: no cover # we lost the race at interpreter shutdown, # so gc collected built-in modules. pass except Exception as exc: logger.error('Thread Timer crashed: %r', exc, exc_info=True) os._exit(1) def stop(self): self._is_shutdown.set() if self.running: self._is_stopped.wait() self.join(THREAD_TIMEOUT_MAX) self.running = False def ensure_started(self): if not self.running and not self.isAlive(): if self.on_start: self.on_start(self) self.start() def _do_enter(self, meth, *args, **kwargs): self.ensure_started() with self.mutex: entry = getattr(self.schedule, meth)(*args, **kwargs) self.not_empty.notify() return entry def enter(self, entry, eta, priority=None): return self._do_enter('enter_at', entry, eta, priority=priority) def call_at(self, *args, **kwargs): return self._do_enter('call_at', *args, **kwargs) def enter_after(self, *args, **kwargs): return self._do_enter('enter_after', *args, **kwargs) def call_after(self, *args, **kwargs): return self._do_enter('call_after', *args, **kwargs) def call_repeatedly(self, *args, **kwargs): return self._do_enter('call_repeatedly', *args, **kwargs) def exit_after(self, secs, priority=10): self.call_after(secs, sys.exit, priority) def cancel(self, tref): tref.cancel() def clear(self): self.schedule.clear() def empty(self): return not len(self) def __len__(self): return len(self.schedule) def __bool__(self): """``bool(timer)``.""" return True __nonzero__ = __bool__ @property def queue(self): return self.schedule.queue