Here's a simple code that we can use to talk to our queue if we also have our Celery configuration settings defined too. In this example, we only use the Queue class to consume messages. We can bind the default queue and exchange to the channel and then register a callback that will dump the message to stdout.
from celery.conf import settings
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer
connection = BrokerConnection(settings.BROKER_HOST, settings.BROKER_USER, settings.BROKER_PASSWORD, settings.BROKER_VHOST)
# RabbitMQ connection
channel = connection.channel()
default_exchange = Exchange("default", "direct", durable=True)
default_queue = Queue("default", exchange=default_exchange, key="default")
bound_default_queue = default_queue(channel)
def process_msg(msg):
print "%s" % repr(msg)
bound_default_queue.consume(callback=process_msg)
while True:
connection.drain_events()
We can also do the same by declaring a Consumer class too and calling the consume() to register the Consumer:
from celery.conf import settings
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer
connection = BrokerConnection(settings.BROKER_HOST, settings.BROKER_USER, settings.BROKER_PASSWORD, settings.BROKER_VHOST)
# RabbitMQ connection
channel = connection.channel()
default_exchange = Exchange("default", "direct", durable=True)
default_queue = Queue("default", exchange=default_exchange, key="default")
def process_msg(body, msg):
print "body %s, msg %s" % (repr(body), repr(msg))
consumer = Consumer(channel, default_queue, callbacks=[process_msg])
consumer.consume()
while True:
connection.drain_events()
No comments:
Post a Comment