Sunday, August 7, 2011

How scheduled tasks in Celery and Kombu interface with RabbitMQ

A really good overview of how PBS Education Technology uses Celery:

http://www.slideshare.net/tarequeh/life-in-a-queue-using-message-queue-with-django

If you want to learn the AMQP interface and try building consumers, producers, exchanges, and queues, the Python code at the link below is extremely useful for learning how the basic standard works with amqplib, the client library on which Celery is built. Celery handles a lot of the higher-level functionality of building task queues, but the underlying plumbing is provided by the Kombu framework (originally Carrot, but completely rewritten):

http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/

The internals of how scheduled tasks are handled are outlined in the Celery documentation:

http://ask.github.com/celery/internals/worker.html

When you create a scheduled task in Celery (using either the eta= or countdown= parameter), a message is created and pickled (assuming CELERY_TASK_SERIALIZER is left at its default of pickle), and its body includes an 'eta' key:

(Pdb) print pickle.loads(msg.body).keys()
['retries', 'task', 'args', 'expires', 'eta', 'kwargs', 'id']
(Pdb) print pickle.loads(msg.body)['eta']
2012-08-02T17:03:00.593141
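
To make the shape of that message concrete, here is a minimal sketch that uses only the standard library (Python 3). It is not Celery's own code: build_task_message and its arguments are hypothetical names, and the dictionary only mirrors the keys printed above. It shows how a countdown could be turned into an ISO-8601 eta string before the body is pickled.

```python
import pickle
import uuid
from datetime import datetime, timedelta, timezone


def build_task_message(task, args=(), kwargs=None, countdown=None,
                       eta=None, now=None):
    """Hypothetical helper: build a pickled body with the keys shown above."""
    if now is None:
        # Naive UTC timestamp, used only when the caller gives no clock.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    if countdown is not None:
        # A countdown is just a relative way of writing an eta.
        eta = now + timedelta(seconds=countdown)
    body = {
        "id": str(uuid.uuid4()),
        "task": task,
        "args": tuple(args),
        "kwargs": kwargs or {},
        "retries": 0,
        "expires": None,
        # Assumption: the eta travels as an ISO-8601 string, as in the
        # debugger output above.
        "eta": eta.isoformat() if eta is not None else None,
    }
    return pickle.dumps(body)


raw = build_task_message("tasks.add", args=(2, 3), countdown=60,
                         now=datetime(2012, 8, 2, 17, 2, 0, 593141))
decoded = pickle.loads(raw)
print(sorted(decoded.keys()))
# ['args', 'eta', 'expires', 'id', 'kwargs', 'retries', 'task']
print(decoded["eta"])
# 2012-08-02T17:03:00.593141
```

With a real Celery task you would not build the body yourself; you would pass eta= or countdown= to the task's apply_async() call and let Celery produce the message.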

When you start up a Celery worker, it creates a Consumer that begins receiving messages from the RabbitMQ broker. If it sees a message with an eta parameter, the task is moved into an ETA scheduler queue instead of the ready queue. The message is not acknowledged until the task is executed, but because it has already been delivered to this Consumer, other workers will not receive the same task. If the Consumer disconnects or loses its connection to the RabbitMQ broker, RabbitMQ will attempt to redeliver the message. The message is not deleted until an acknowledgement is sent back to the broker:

kombu/transport.py:

def ack(self):
    """Acknowledge this message as being processed.
    This will remove the message from the queue.

    :raises MessageStateError: If the message has already been
        acknowledged/requeued/rejected.

    """
    if self.channel.no_ack_consumers is not None:
        consumer_tag = self.delivery_info["consumer_tag"]
        if consumer_tag in self.channel.no_ack_consumers:
            return
    if self.acknowledged:
        raise self.MessageStateError(
            "Message already acknowledged with state: %s" % self._state)
    self.channel.basic_ack(self.delivery_tag)
    self._state = "ACK"
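
The flow described above (eta tasks held back, the ack sent only when the task finally runs) can be sketched with a toy worker. This is a simplified illustration, not Celery's or Kombu's implementation: FakeMessage, ToyWorker and their methods are hypothetical names, and the "broker" is reduced to a flag on the message.

```python
import heapq
from datetime import datetime


class FakeMessage:
    """Stand-in for a broker message; only tracks whether it was acked."""

    def __init__(self, body):
        self.body = body
        self.acknowledged = False

    def ack(self):
        if self.acknowledged:
            raise RuntimeError("Message already acknowledged")
        self.acknowledged = True


class ToyWorker:
    def __init__(self):
        self.ready = []         # tasks that can run immediately
        self.eta_schedule = []  # heap of (eta, sequence, message)
        self._seq = 0           # tie-breaker so messages are never compared

    def receive(self, message):
        eta = message.body.get("eta")
        if eta is None:
            self.ready.append(message)
        else:
            # Held locally, still unacknowledged from the broker's view.
            self._seq += 1
            heapq.heappush(self.eta_schedule, (eta, self._seq, message))

    def tick(self, now):
        """Move every task whose eta has passed onto the ready list."""
        while self.eta_schedule and self.eta_schedule[0][0] <= now:
            _, _, message = heapq.heappop(self.eta_schedule)
            self.ready.append(message)

    def run_ready(self):
        executed = []
        while self.ready:
            message = self.ready.pop(0)
            executed.append(message.body["task"])  # "execute" the task
            message.ack()  # acknowledge only once the task is run
        return executed


worker = ToyWorker()
later = FakeMessage({"task": "tasks.later", "eta": datetime(2012, 8, 2, 17, 3)})
soon = FakeMessage({"task": "tasks.now", "eta": None})
worker.receive(later)
worker.receive(soon)

first = worker.run_ready()               # only the task without an eta runs
acked_before_eta = later.acknowledged    # the eta task is still unacked
worker.tick(datetime(2012, 8, 2, 17, 4)) # the clock passes the eta
second = worker.run_ready()
```

Until tick() moves the eta task over, its message stays unacknowledged, which is the window in which a real broker would redeliver it if the consumer went away.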


When a message is received by Celery, it calls TaskRequest.from_message(), and the resulting task is handed to on_task() to be inserted into the queue. Notice that the 'ack' function, which is used to acknowledge the message once it has been processed, is passed along to this routine.

celery/worker/listener.py:

task = TaskRequest.from_message(message, message_data, ack,
                                logger=self.logger,
                                hostname=self.hostname,

The ready queue itself (assuming rate limits are disabled) is a basic Python queue that hands each task to the process_task function, which in turn results in a call to self.acknowledge(), invoking the ack function that was passed in when the TaskRequest was created by from_message().

celery/worker/__init__.py:

if disable_rate_limits:
    self.ready_queue = FastQueue()
    self.ready_queue.put = self.process_task
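
The trick of replacing the queue's put method, together with the ack callback carried by the request, can be sketched as follows. This is an illustration under assumptions rather than Celery's code: ToyRequest and DirectWorker are hypothetical names, and a standard queue.Queue stands in for FastQueue.

```python
import queue


class ToyRequest:
    """Hypothetical request object that carries the ack callback."""

    def __init__(self, name, ack):
        self.name = name
        self._ack = ack
        self.acknowledged = False

    @classmethod
    def from_message(cls, message, ack):
        # The ack callable travels with the request from the moment
        # the message is turned into a task.
        return cls(message["task"], ack)

    def acknowledge(self):
        if not self.acknowledged:
            self._ack()
            self.acknowledged = True


class DirectWorker:
    def __init__(self, disable_rate_limits=True):
        self.ready_queue = queue.Queue()
        self.processed = []
        if disable_rate_limits:
            # Shadow put() on this one instance so that tasks skip the
            # queue entirely and go straight to processing.
            self.ready_queue.put = self.process_task

    def process_task(self, request):
        request.acknowledge()
        self.processed.append(request.name)


acked = []
worker = DirectWorker()
request = ToyRequest.from_message({"task": "tasks.add"},
                                  ack=lambda: acked.append("tasks.add"))
worker.ready_queue.put(request)  # actually calls worker.process_task
```

Because put is reassigned on the instance, nothing is ever stored in the queue; the call is simply a direct route into process_task.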

Also, it appears that revoked tasks are not persistent unless you set CELERYD_STATE_DB (which defaults to None). Celery appears to keep the IDs of revoked tasks in memory and skips any task whose ID is in that list. Without this setting, all revoked tasks are forgotten when you restart Celery.
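
The difference between in-memory and persisted revocations can be sketched with the standard shelve module. RevokedTasks, its methods and the file name are hypothetical, and using shelve here is an assumption made for the sake of the example, not a statement about how Celery stores its state.

```python
import os
import shelve
import tempfile


class RevokedTasks:
    """Hypothetical store of revoked task ids, optionally persisted."""

    def __init__(self, state_db=None):
        self.state_db = state_db
        self.ids = set()
        if state_db is not None:
            # Reload whatever an earlier process wrote to the state file.
            with shelve.open(state_db) as db:
                self.ids = set(db.get("revoked", ()))

    def revoke(self, task_id):
        self.ids.add(task_id)
        if self.state_db is not None:
            with shelve.open(self.state_db) as db:
                db["revoked"] = sorted(self.ids)

    def should_skip(self, task_id):
        return task_id in self.ids


# No state file: a "restart" (a new object) forgets the revocation.
RevokedTasks().revoke("abc")
forgotten = RevokedTasks().should_skip("abc")

# With a state file: the revocation survives the "restart".
path = os.path.join(tempfile.mkdtemp(), "worker.state")
RevokedTasks(path).revoke("abc")
remembered = RevokedTasks(path).should_skip("abc")
```

In a real deployment the equivalent step is simply pointing CELERYD_STATE_DB at a writable file path.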
