http://ask.github.com/celery/internals/worker.html
http://groups.google.com/group/celery-users/browse_thread/thread/b85c0f8386ffd4ce
I'll try to describe it in brief... The worker.scheduler module is actually long gone now, most of the functionality there is replaced by timer2: https://github.com/ask/timer2 But timer2 is just the internal mechanism that applies a timed function, I'm assuming what you're really interested in is how the messaging part works. From the comments in celery.worker.consumer: http://ask.github.com/celery/internals/reference/celery.worker.consum... " * If the task has an ETA/countdown, the task is moved to the `eta_schedule` so the :class:`timer2.Timer` can schedule it at its deadline. Tasks without an eta are moved immediately to the `ready_queue`, so they can be picked up by the :class:`~celery.worker.controllers.Mediator` to be sent to the pool. * When a task with an ETA is received the QoS prefetch count is also incremented, so another message can be reserved. When the ETA is met the prefetch count is decremented again, though this cannot happen immediately because amqplib doesn't support doing broker requests across threads. Instead the current prefetch count is kept as a shared counter, so as soon as :meth:`~Consumer.consume_messages` detects that the value has changed it will send out the actual QoS event to the broker. " So pretty simple, whenever a task with an eta is received we increment the prefetch_count, when the task is processed we decrement it again. We can keep the eta tasks in memory for as long as we like since the message will just be redelivered if the connection is lost and the message is not acked. Hope this helps, --
No comments:
Post a Comment