Showing posts with label RabbitMQ/Celery. Show all posts

Monday, January 24, 2011

celeryd-multi

The celeryd documentation includes only a few examples of how celeryd-multi should be invoked, so here
are some more:

1. You can use this init script as a template:
https://github.com/ask/celery/blob/master/contrib/generic-init.d/celeryd

Or you can invoke celeryd-multi the old-fashioned way:

a. You need a /var/run/celery directory. The pidfiles depend on it.
b. You also need to create a /var/log/celeryd directory.

2. If you type:
celeryd-multi show 2 -Q:1-1 celery,default -Q:2-2 nightly --loglevel=INFO --pidfile=/var/run/celery/${USER}%n.pid --logfile=/var/log/celeryd.${USER}%n.log
> Starting nodes...
celeryd -Q celery,default --pidfile=/var/run/celery/me1.pid -n celery1.myhost-dev-0 --loglevel=INFO --logfile=/var/log/celeryd.me1.log
celeryd -Q nightly --pidfile=/var/run/celery/me2.pid -n celery2.myhost-dev-0 --loglevel=INFO --logfile=/var/log/celeryd.me2.log

...you can see how things will get invoked.
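
To make the expansion above concrete, here is a rough Python sketch of how those command lines can be derived from the arguments. It is not celeryd-multi's actual implementation: build_commands and its parameters are made up for illustration, and it only mimics the %n substitution and node naming visible in the output above.

```python
def build_commands(queues_per_node, pidfile, logfile, hostname, loglevel="INFO"):
    """Return one celeryd command line per node (illustrative only)."""
    commands = []
    for number, queues in enumerate(queues_per_node, start=1):
        # %n is replaced with the node number, as in the output shown above.
        node_pidfile = pidfile.replace("%n", str(number))
        node_logfile = logfile.replace("%n", str(number))
        commands.append(
            "celeryd -Q %s --pidfile=%s -n celery%d.%s --loglevel=%s --logfile=%s"
            % (queues, node_pidfile, number, hostname, loglevel, node_logfile)
        )
    return commands


commands = build_commands(
    ["celery,default", "nightly"],
    pidfile="/var/run/celery/me%n.pid",
    logfile="/var/log/celeryd.me%n.log",
    hostname="myhost-dev-0",
)
for line in commands:
    print(line)
```

Each entry in the list corresponds to one -Q:N-N group, so adding a third node would just mean adding a third queue string.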

To start, you would do:
celeryd-multi start 2 -Q:1-1 celery,default -Q:2-2 nightly --loglevel=INFO --pidfile=/var/run/celery/${USER}%n.pid --logfile=/var/log/celeryd.${USER}%n.log

To stop:

This should start up two separate workers, one for the celery and default queues and one for nightly. Each celeryd daemon forks up to as many processes as there are CPUs, so if you have 4 CPUs per machine, you could have a total of 8 Celery worker processes running.

...in both cases, CELERY_CONFIG_MODULE needs to be set.

Then you set up the queues in your Celery config similar to the following:

CELERY_QUEUES = {
    "default" : {
        "exchange" : "default",
        "binding_key" : "default" },
    "nightly" : {
        "exchange" : "nightly",
        "binding_key" : "nightly",
        "exchange_type" : "direct"
        }
    }

CELERY_DEFAULT_QUEUE = "default"
CELERY_DEFAULT_EXCHANGE = "default"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERY_DEFAULT_ROUTING_KEY = "default"
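
As a rough illustration of what this configuration means, the sketch below mimics how a direct exchange delivers a message: a queue receives it only when the exchange matches and the message's routing key equals the queue's binding key. This is a simplified stand-in, not Celery or RabbitMQ code, and queues_for is a made-up helper.

```python
CELERY_QUEUES = {
    "default": {"exchange": "default", "binding_key": "default"},
    "nightly": {
        "exchange": "nightly",
        "binding_key": "nightly",
        "exchange_type": "direct",
    },
}


def queues_for(exchange, routing_key, queues=CELERY_QUEUES):
    """Return the names of queues a direct exchange would deliver to."""
    return sorted(
        name
        for name, opts in queues.items()
        if opts["exchange"] == exchange and opts["binding_key"] == routing_key
    )


print(queues_for("default", "default"))  # ['default']
print(queues_for("nightly", "nightly"))  # ['nightly']
print(queues_for("nightly", "default"))  # []
```

The last call shows why the routing key has to match: a message sent to the nightly exchange with the wrong key reaches no queue at all.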

RabbitMQ and the delay function...

When you call delay() on your tasks, Celery publishes a message containing the task info to RabbitMQ. All of these messages go into a default queue ('celery' in the original configuration), and Celery workers listening on these queues start to process them. You need to have celeryd running for these workers to exist. By default, celeryd also consumes from the 'celery' queue.

In addition, Celery publishes messages recording the result/status of each task to a RabbitMQ exchange called 'celeryresults' on your vhost, and creates a temporary queue named after the task ID (without the dashes). If CELERY_RESULT_PERSISTENT is False (the default, i.e. transient), these messages are stored in memory in that queue. If you set it to True (persistent), they are stored both in memory and on disk. Neither scenario works well if you have a lot of tasks getting dispatched each night. Normally Celery is supposed to set a 1-day expiration on these result queues (it can be decreased via CELERY_TASK_RESULT_EXPIRES), but I don't see anywhere in the code that actually sends periodic instructions to delete these queues unless explicitly directed to do so.

What ignore_result does is simply skip publishing these status messages into a temporary queue. No messages are created, so no additional memory gets consumed. Obviously you can't get the results back if ignore_result=True, and calling get() on a task with this flag set will just make your code block forever. The GitHub commit at https://github.com/ask/celery/commit/06fe87af3eb56b91f61ad00ada700a4a01d15c6a shows that messages were not even being purged during apply_async() if the user did a get(), so queues would not have been deleted even if we had attempted to get the results back from the tasks that we dispatched.
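
The behaviour described above can be pictured with a small in-memory model. This is only a sketch under stated assumptions: FakeBroker and run_task are invented names, nothing here talks to RabbitMQ, and the only details taken from the text are the queue naming (task ID without dashes) and the fact that ignore_result suppresses the result message.

```python
import uuid


class FakeBroker(object):
    """In-memory stand-in for the broker: queue name -> list of messages."""

    def __init__(self):
        self.queues = {}

    def publish(self, queue, message):
        self.queues.setdefault(queue, []).append(message)


def run_task(broker, func, args=(), ignore_result=False):
    """Run func and publish its result unless ignore_result is set."""
    task_id = str(uuid.uuid4())
    result = func(*args)
    if not ignore_result:
        # The result queue is named after the task ID with the dashes removed.
        queue_name = task_id.replace("-", "")
        broker.publish(queue_name, {"status": "SUCCESS", "result": result})
    return task_id


broker = FakeBroker()
first_id = run_task(broker, pow, (2, 3))
second_id = run_task(broker, pow, (2, 3), ignore_result=True)

# Only the first call left a result queue behind.
print(len(broker.queues))
```

In this model every task without ignore_result adds one more queue that nothing ever removes, which is the memory growth described above.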

If you are dispatching tasks within tasks and don't want result messages to be generated, you can set CELERY_IGNORE_RESULT to True. However, if you still want results stored for some tasks, here are things that you can do:

1. Add @task(ignore_result=True) to your task definition instead of @task.

2. Add options={} as a keyword argument to the task function's signature.

e.g.:
@task(ignore_result=True)
def test_myfunction_task(self, options={}):
    pass  # task body goes here

3. Most of your task calls should be changed from .delay() to .apply_async(). Your tasks will then be able to send to different queues.

e.g.:
@task(ignore_result=True)
def test_group_task(arg1, options={}):
    """ Checks graph profile crawler """
    # Pass options along as a task argument and also unpack it as send options.
    individual_task.apply_async(kwargs={'arg1': arg1,
                                        'options': options},
                                **options)

a. FYI - The delay() function is simply a wrapper around apply_async(). With delay() you can't specify extra options such as which queue to send to; apply_async() accepts them (see http://celeryproject.org/docs/userguide/executing.html).

b. You need to wrap all keyword arguments into a kwargs={} dictionary. Celery will unwrap this dictionary and pass it as **kwargs to your function.

c. You'll notice that options appears twice: it gets passed inside kwargs, and we also unpack **options into the apply_async() call. If we weren't dispatching more tasks within tasks, we wouldn't need to pass 'options' inside kwargs.
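
To see points a through c side by side, here is a minimal stand-in that records what each call style would send. FakeTask is a made-up class for illustration, not Celery's Task, and the "queue" option name is an assumption used only to show where send options end up.

```python
class FakeTask(object):
    """Stand-in for a Celery task; records calls instead of publishing them."""

    def __init__(self, name):
        self.name = name
        self.sent = []

    def apply_async(self, args=None, kwargs=None, **options):
        # Task arguments travel in args/kwargs; anything else is a send option.
        message = {
            "task": self.name,
            "args": tuple(args or ()),
            "kwargs": dict(kwargs or {}),
            "options": options,
        }
        self.sent.append(message)
        return message

    def delay(self, *args, **kwargs):
        # Shortcut: everything is a task argument, so no options can be given.
        return self.apply_async(args, kwargs)


individual_task = FakeTask("individual_task")
options = {"queue": "nightly"}  # hypothetical send option

individual_task.delay(arg1=1)
individual_task.apply_async(kwargs={"arg1": 1, "options": options}, **options)

for message in individual_task.sent:
    print(message["kwargs"], message["options"])
```

The second call carries options twice on purpose: once as a send option for this message, and once inside kwargs so the receiving task can reuse it when it dispatches further tasks.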

Another FYI: it turns out that Celery injects a bunch of other keyword arguments into each task call (e.g. task_name). If you try to pass **kwargs into nested tasks, apply_async will inject task_name again, so 'task_name' not only lives inside **kwargs but also gets passed as another keyword argument:

def test_group_task(arg1, options={}, **kwargs):
    # Fails: equivalent to something like
    #   apply_async(kwargs={'task_name': 'orig_func'}, task_name='new_func')
    individual_task.apply_async(kwargs=kwargs, **options)
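
The failure can be reproduced in plain Python without Celery. In this sketch call_with_injection is an invented helper that plays the role of the worker injecting task_name, and the filtering step at the end is one possible workaround, not something taken from the Celery docs.

```python
def individual_task(arg1, task_name=None, **kwargs):
    return "%s ran with arg1=%r" % (task_name, arg1)


def call_with_injection(func, kwargs):
    """Mimic a worker that always injects its own task_name keyword."""
    return func(task_name="individual_task", **kwargs)


# kwargs as received by an outer task: already contains the injected key.
outer_kwargs = {"arg1": 1, "task_name": "test_group_task"}

error = None
try:
    call_with_injection(individual_task, outer_kwargs)
except TypeError as exc:
    # The same keyword arrives twice, so Python refuses the call.
    error = str(exc)
    print("fails: " + error)

# One possible workaround: forward only the arguments you own.
INJECTED = ("task_name",)
clean = dict((k, v) for k, v in outer_kwargs.items() if k not in INJECTED)
clean_result = call_with_injection(individual_task, clean)
print(clean_result)
```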

Sunday, January 23, 2011

More bug fixes in RabbitMQ

http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2010-November/010172.html

- queue.declare and exchange.declare raise precondition_failed rather
  than not_allowed when attempting to redeclare a queue or exchange
  with parameters different than those currently known to the broker

Bug in camqadm -- too many values to unpack

Apparently this GitHub issue in camqadm prevents you from declaring exchanges with anything other than the defaults of durable=False, auto_delete=False, internal=False.

http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol

passive: the exchange will not get declared but an error will be thrown if it does not exist.
durable: the exchange will survive a broker restart.
auto-delete: the exchange will get deleted as soon as there are no more queues bound to it. Exchanges to which queues have never been bound will never get auto deleted.

camqadm queue.declare myqueue no yes no no

http://www.amqp.org/confluence/download/attachments/720900/amqp-xml-doc0-9.pdf?version=1&modificationDate=1183135733000

1.7.2.1.7. Parameter exchange.declare.internal (bit)
Ordinal: 7
Domain: bit
Label: create internal exchange
If set, the exchange may not be used directly by publishers, but only when bound to other exchanges.
Internal exchanges are used to construct wiring that is not visible to applications.

Where AMQP queues & exchanges are defined...

/usr/local/lib/python2.6/dist-packages/carrot/messaging.py(258)declare()
    def declare(self):
        """Declares the queue, the exchange and binds the queue to
        the exchange."""
        arguments = None
        routing_key = self.routing_key
        if self.exchange_type == "headers":
            arguments, routing_key = routing_key, ""

        if self.queue:
            self.backend.queue_declare(queue=self.queue, durable=self.durable,
                                       exclusive=self.exclusive,
                                       auto_delete=self.auto_delete,
                                       arguments=self.queue_arguments,
                                       warn_if_exists=self.warn_if_exists)
        if self.exchange:
            self.backend.exchange_declare(exchange=self.exchange,
                                          type=self.exchange_type,
                                          durable=self.durable,
                                          auto_delete=self.auto_delete)
        if self.queue:
            self.backend.queue_bind(queue=self.queue,
                                    exchange=self.exchange,
                                    routing_key=routing_key,
                                    arguments=arguments)
        self._closed = False
        return self
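
The order of operations in the listing above is easier to see with a backend that only records calls. This is a simplified sketch, not carrot code: RecordingBackend and this standalone declare() are stand-ins written for illustration, and they drop most of the real parameters.

```python
class RecordingBackend(object):
    """Stand-in backend that records each AMQP call instead of sending it."""

    def __init__(self):
        self.calls = []

    def queue_declare(self, **kw):
        self.calls.append(("queue_declare", kw))

    def exchange_declare(self, **kw):
        self.calls.append(("exchange_declare", kw))

    def queue_bind(self, **kw):
        self.calls.append(("queue_bind", kw))


def declare(backend, queue, exchange, routing_key, exchange_type="direct"):
    """Simplified flow: declare the queue, then the exchange, then bind them."""
    arguments = None
    if exchange_type == "headers":
        # For headers exchanges the key is passed as binding arguments instead.
        arguments, routing_key = routing_key, ""
    backend.queue_declare(queue=queue)
    backend.exchange_declare(exchange=exchange, type=exchange_type)
    backend.queue_bind(queue=queue, exchange=exchange,
                       routing_key=routing_key, arguments=arguments)


backend = RecordingBackend()
declare(backend, "nightly", "nightly", "nightly")
print([name for name, _ in backend.calls])
```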

Friday, January 21, 2011

RabbitMQ talks

Four valuable links for understanding RabbitMQ and Celery:

http://www.rabbitmq.com/resources/google-tech-talk-final/alexis-google-rabbitmq-talk.pdf
http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
http://celeryproject.org/docs/userguide/routing.html#amqp-primer
http://celeryproject.org/docs/userguide/monitoring.html#guide-monitoring

In Celery the routing_key is the key used to send the message, while binding_key is the key the queue is bound with. In the AMQP API they are both referred to as the routing key.

Celery automatically creates the entities necessary for the queues in CELERY_QUEUES to work (except if the queue’s auto_declare setting is set to False). This implies that Celery does all the work of talking to the RabbitMQ server to create the exchanges and queues you define.

Useful commands:
camqadm queue.purge <queue>
celeryctl inspect active
rabbitmqctl list_queues name memory
rabbitmqctl -p <vhost> list_queues
rabbitmqctl -p <vhost> list_exchanges
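
When there are many result queues, it can help to sort the output of rabbitmqctl list_queues name memory by size. The sketch below assumes the output is one tab-separated "name, memory" row per queue, possibly wrapped in banner lines; parse_list_queues is a made-up helper and the sample text is invented for illustration.

```python
def parse_list_queues(output):
    """Parse 'rabbitmqctl list_queues name memory' text into (name, bytes) pairs."""
    rows = []
    for line in output.splitlines():
        parts = line.split("\t")
        # Skip banner/footer lines; keep only "<name>\t<integer>" rows.
        if len(parts) == 2 and parts[1].strip().isdigit():
            rows.append((parts[0], int(parts[1])))
    # Largest memory consumers first.
    return sorted(rows, key=lambda row: row[1], reverse=True)


# Made-up sample output, for illustration only.
sample = (
    "Listing queues ...\n"
    "celery\t14520\n"
    "0a1b2c3d4e5f60718293a4b5c6d7e8f9\t8840\n"
    "nightly\t22016\n"
    "...done.\n"
)
parsed = parse_list_queues(sample)
for name, memory in parsed:
    print(name, memory)
```

In practice the text would come from running the command itself, for example through subprocess, rather than from a hard-coded string.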
http://ask.github.com/celery/userguide/routing.html#changing-the-name-of-the-default-queue

The non-AMQP backends like ghettoq do not support exchanges, so they require the exchange to have the same name as the queue. Using this design ensures it will work for them as well.

http://celeryproject.org/docs/userguide/tasks.html#task-options

Task.ignore_result
Don’t store task state. Note that this means you can’t use AsyncResult to check if the task is ready, or get its return value.

Thursday, January 20, 2011

Remote debugging Celery tasks...

Used to import pdb; pdb.set_trace()? If you've ever tried to debug Celery task queue processes, apparently there's also an rdb module you can import that lets you debug processes as they run:

http://ask.github.com/celery/tutorials/debugging.html