Monday, January 24, 2011

RabbitMQ and the delay function..

When you call delay() on your tasks, Celery will publish a message to RabbitMQ with this task info. All of these messages that get thrown into a default queue ('celery' by the original configuration, and Celery workers listening to these queues will start to process them. You need to have celeryd running in order for these workers to be running. By default, celeryd will also search for the celeryd queue.

In addition, Celery also publishes messages to record the result/statuses of these messages onto your vhost into a RabbitMQ exchange called 'celeryresults', and then creates a temporary queue with the task ID (without the dashes). If you set the CELERY_RESULT_PERSISENT to transient (by default, it's set this way ), then all of these messages will be stored in-memory into this latter queue. If you set it to persisent, then all of these messages get stored both in-memory and on disk. Neither scenario works great if you have a lot of tasks getting dispatched each night. Normally Celery is supposed to set 1-day expiration of these result queues (it can be decreased too by TASK_RESULT_EXPIRES), but I don't see anywhere in the code that actually sends periodic instructions to delete these queues unless explicitly directed to do so.

What ignore_result does is simply not publish these status messages into a temporary queue. Then no messages are created, and no additional memory gets consumed. Obviously you can't get back the results if ignore_result=True, and doing a get() on a task with this flag set will just make your code block forever. The GitHub code at shows that messages were not even being purged during apply_async() if use did a get(), so queues would not have been deleted even if we had attempted to get the result back from tasks that we dispatched.

If you are dispatching tasks within tasks and don't want result messages to be generated, you can either set CELERY_IGNORE_RESULT to be True. However, if you still want Celery results to be set here are things that you can do:

1. Add @task(ignore_result=True) to your task definition instead of @task.

2. Add an options={} to the task call function call as a keyword argument.

 def test_myfunction_task(self, options={}):
3. Most of the you tasks should be changed from .delay() to .apply_async() Your task will then be equipped to send to different queues.

 def test_group_task(self, options={}):
        """ Checks graph profile crawler """
        individual_task.apply_async(kwargs={'arg1' : arg1,
                                    'options' : options},
a. FYI - The delay() function is simply a wrapper for apply_async, but apply_async() has extra options but you aren't allowed to specify other options such as which queues to send (see

b. You need to wrap all keyword arguments into a kwargs={} dictionary. Celery will unwrap this dictionary and pass in **kwargs into your function.

c. You'll notice two parameters: options get passed into kwargs, and we also unwrap **options into the apply_async. If we weren't dispatching more tasks within tasks, then we wouldn't need to pass 'options' into kwargs.

Another FYI -- it turns out that Celery injects a bunch of other keyword arguments into each task call (i.e. task_name), so if you try to pass **kwargs into nested tasks, calling apply_async will also inject task_name again, so not only does 'task_name' live inside **kwargs, but also it gets passed as another keyword argument into the apply_async call too:

def test_group_task(arg1, options={}, **kwargs):
  individual_task.apply_async(**kwargs, **options) -> fail (equvalent to something like   apply_sync({'task_name' : 'orig_func}, task_name={'new func'})

No comments:

Post a Comment