Wednesday, February 23, 2011

How Celery implements eta/countdown...

Internals of the Celery worker:

I'll try to describe it in brief. The worker.scheduler module is actually long gone now;
most of its functionality has been replaced by timer2.
But timer2 is just the internal mechanism that applies a timed function, and I'm assuming what
you're really interested in is how the messaging part works.
From the comments in celery.worker.consumer: 
* If the task has an ETA/countdown, the task is moved to the `eta_schedule` 
  so the :class:`timer2.Timer` can schedule it at its 
  deadline. Tasks without an eta are moved immediately to the `ready_queue`, 
  so they can be picked up by the :class:`~celery.worker.controllers.Mediator` 
  to be sent to the pool. 
* When a task with an ETA is received the QoS prefetch count is also 
  incremented, so another message can be reserved. When the ETA is met 
  the prefetch count is decremented again, though this cannot happen 
  immediately because amqplib doesn't support doing broker requests 
  across threads. Instead the current prefetch count is kept as a 
  shared counter, so as soon as :meth:`~Consumer.consume_messages`
  detects that the value has changed it will send out the actual 
  QoS event to the broker. 
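
The routing rule in the first bullet can be sketched as a toy model. This is not Celery's code: `EtaRouter`, `on_task` and `tick` are hypothetical names made up for illustration, a heap stands in for the timer, and a plain deque stands in for the `ready_queue`.

```python
import heapq
import itertools
from collections import deque


class EtaRouter:
    """Toy model of the dispatch rule quoted above (hypothetical, not Celery's API)."""

    def __init__(self):
        self.ready_queue = deque()      # tasks that can go to the pool right away
        self.eta_schedule = []          # min-heap of (eta, seq, task)
        self._seq = itertools.count()   # tie-breaker for tasks with equal ETAs

    def on_task(self, task, eta=None):
        # Tasks without an ETA are ready immediately; the rest wait for their deadline.
        if eta is None:
            self.ready_queue.append(task)
        else:
            heapq.heappush(self.eta_schedule, (eta, next(self._seq), task))

    def tick(self, now):
        # Move every task whose ETA has passed onto the ready queue.
        while self.eta_schedule and self.eta_schedule[0][0] <= now:
            _, _, task = heapq.heappop(self.eta_schedule)
            self.ready_queue.append(task)


router = EtaRouter()
router.on_task("send_email")           # no ETA: ready at once
router.on_task("cleanup", eta=30.0)    # due at t=30
router.on_task("report", eta=10.0)     # due at t=10
router.tick(now=15.0)                  # only "report" is due by t=15
print(list(router.ready_queue))        # ['send_email', 'report']
```

In the real worker the timer fires on its own thread instead of being polled with a `tick` call; the polling here only keeps the sketch deterministic.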
So it's pretty simple: whenever a task with an eta is received we increment the
prefetch_count, and when the task is processed we decrement it again. We
can keep the eta tasks in memory for as long as we like, since the message
will just be redelivered if the connection is lost and the message is not acked.
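
To make the shared-counter idea concrete, here is a minimal sketch under stated assumptions. `SharedPrefetchCount`, `flush` and `send_qos` are hypothetical names, not Celery's or amqplib's API. Any thread may change the wanted value, but only the consumer thread calls `flush`, which is the only place the broker is contacted.

```python
import threading


class SharedPrefetchCount:
    """Hypothetical shared counter; only the consumer thread talks to the broker."""

    def __init__(self, initial):
        self._lock = threading.Lock()
        self.value = initial   # prefetch count the worker currently wants
        self.sent = initial    # prefetch count last sent to the broker

    def increment(self):
        with self._lock:
            self.value += 1

    def decrement(self):
        with self._lock:
            self.value -= 1

    def flush(self, send_qos):
        # Called from the consumer loop only: send a QoS update if the value changed.
        with self._lock:
            wanted = self.value
        if wanted != self.sent:
            send_qos(wanted)
            self.sent = wanted


sent_to_broker = []                      # stands in for the real QoS requests
qos = SharedPrefetchCount(initial=4)

qos.increment()                          # a task with an ETA was received
qos.flush(sent_to_broker.append)         # consumer loop notices the change: sends 5

timer = threading.Thread(target=qos.decrement)   # another thread: the ETA was met
timer.start()
timer.join()

qos.flush(sent_to_broker.append)         # consumer loop notices again: sends 4
qos.flush(sent_to_broker.append)         # nothing changed, nothing is sent
print(sent_to_broker)                    # [5, 4]
```

The point of the design is that the timer thread never touches the connection; it only updates a number, and the consumer loop picks the change up on its next pass.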
Hope this helps, 
