Here is a simple script that we can use to talk to our queue, provided our Celery configuration settings are also defined. In this example, we use only the Queue class to consume messages. We bind the default queue and exchange to the channel and then register a callback that dumps each message to stdout.
from celery.conf import settings
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer

# RabbitMQ connection, built from the broker values in the Celery settings.
connection = BrokerConnection(settings.BROKER_HOST,
                              settings.BROKER_USER,
                              settings.BROKER_PASSWORD,
                              settings.BROKER_VHOST)
channel = connection.channel()

default_exchange = Exchange("default", "direct", durable=True)
# Kombu's Queue names its binding-key parameter ``routing_key``; the
# previous ``key=`` keyword is not a Queue attribute.
default_queue = Queue("default", exchange=default_exchange,
                      routing_key="default")
# Calling the queue with a channel returns a copy bound to that channel.
bound_default_queue = default_queue(channel)


def process_msg(msg):
    """Dump the repr of a received message to stdout."""
    # Single-argument call form works on both Python 2 and Python 3.
    print("%s" % repr(msg))


# NOTE(review): consume() is called without declare(), so this presumably
# relies on the queue already existing on the broker -- confirm.
bound_default_queue.consume(callback=process_msg)

try:
    # Block forever, dispatching each incoming message to the callback.
    while True:
        connection.drain_events()
finally:
    # Close the broker connection when the loop is interrupted or fails.
    connection.close()
We can also do the same thing by creating a Consumer instance and calling its consume() method to register it:
from celery.conf import settings
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer

# RabbitMQ connection, built from the broker values in the Celery settings.
connection = BrokerConnection(settings.BROKER_HOST,
                              settings.BROKER_USER,
                              settings.BROKER_PASSWORD,
                              settings.BROKER_VHOST)
channel = connection.channel()

default_exchange = Exchange("default", "direct", durable=True)
# Kombu's Queue names its binding-key parameter ``routing_key``; the
# previous ``key=`` keyword is not a Queue attribute.
default_queue = Queue("default", exchange=default_exchange,
                      routing_key="default")


def process_msg(body, msg):
    """Dump the decoded body and the message object to stdout.

    Consumer callbacks are called with two arguments: the message body
    and the message object itself.
    """
    # Single-argument call form works on both Python 2 and Python 3.
    print("body %s, msg %s" % (repr(body), repr(msg)))


consumer = Consumer(channel, default_queue, callbacks=[process_msg])
consumer.consume()

try:
    # Block forever, dispatching each incoming message to the callback.
    while True:
        connection.drain_events()
finally:
    # Close the broker connection when the loop is interrupted or fails.
    connection.close()
No comments:
Post a Comment