Friday, March 23, 2012

Moving RabbitMQ machines with the Kombu framework

If you've ever contemplated moving a RabbitMQ master node from one machine to another, there are several approaches to consider. The first would be to zip up the mnesia database directory (often located in /var/lib/rabbitmq/mnesia) and transfer the files to the new machine. Another would be to set up RabbitMQ mirroring (available since RabbitMQ 2.6.0 for clustering support and high availability) so that one master node could be taken down. Here are some of the reasons why we chose not to go with those options and instead used another approach.

The first option may have issues, since the hostname is often tied directly into the database. The following explanation came from a Server Fault thread:
The database RabbitMQ uses is bound to the machine's hostname, so if you copied the database dir to another machine, it won't work. If this is the case, you have to set up a machine with the same hostname as before and transfer any outstanding messages to the new machine. If there's nothing important in rabbit, you could just clear everything by removing the RabbitMQ files in /var/lib/rabbitmq.

You could follow this approach, but in our case we had already spun up a separate machine and wanted to have both the old and new machines running at the same time without any further config changes. We also explored using the Erlang interpreter to modify the mnesia database directly, but we weren't sure whether the Mnesia database format had changed between the RabbitMQ versions we were using, so that option seemed a bit risky.

In terms of using RabbitMQ's high availability features, we were upgrading a host machine running RabbitMQ 2.3.1, which lacked the mirrored-queue support added in later versions. It also seemed that we would have to invest more time in learning how to implement mirroring and in verifying that the replication was happening correctly.
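For reference, even though we didn't go this route, mirroring in the RabbitMQ 2.6.x era was requested per-queue with the x-ha-policy argument at declaration time (later RabbitMQ versions moved this to server-side policies). A minimal sketch using Kombu, with "default" standing in for your queue name:

from kombu.messaging import Exchange, Queue

# Sketch only: ask RabbitMQ 2.6.x to mirror this queue across all nodes in
# the cluster by passing the x-ha-policy queue argument at declaration time.
default_exchange = Exchange("default", "direct", durable=True)
mirrored_queue = Queue("default",
                       exchange=default_exchange,
                       routing_key="default",
                       queue_arguments={"x-ha-policy": "all"})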

In addition, we thought it might be easier to point all new jobs at the new RabbitMQ host and let the Celery workers on the old RabbitMQ host drain the rest of the remaining queue. The problem with this approach is that we were using the Celery framework, which supports the eta/countdown parameter, which is extremely useful for setting up scheduled tasks that fire at relatively precise times. These scheduled tasks are implemented by keeping the messages unacknowledged until their scheduled date/time: the worker that picks up the job holds it in a separate scheduled queue and only moves the task into its ready queue when that time arrives. If any tasks needed to be cancelled, the new Celery workers using this new host would not know how to deal with these revoked tasks.
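For example, a scheduled task might be enqueued like this (mytask and myapp.tasks are hypothetical stand-ins; apply_async with countdown/eta is standard Celery):

from datetime import datetime, timedelta

from myapp.tasks import mytask  # hypothetical task module

# Fire roughly one hour from now; until then, the message sits unacknowledged
# in the scheduled queue of whichever worker picked it up.
mytask.apply_async(args=[42], countdown=3600)

# Equivalent form with an explicit eta:
mytask.apply_async(args=[42], eta=datetime.utcnow() + timedelta(hours=1))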

The reason is that cancelling a task relies on sending a revoke message to all Celery workers, instructing them not to move any task matching a specific task ID from the scheduled queue to the ready queue. Without this message existing on the new AMQP host, none of the Celery workers connected to it would know how to handle these cancelled tasks. Meanwhile, the old Celery workers connected to the old AMQP host would not be receiving these revoke messages and would fire the tasks off without realizing that they had actually been revoked.
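Revoking looks something like this (again using the hypothetical mytask; revoke comes from Celery's standard control API):

from celery.task.control import revoke

from myapp.tasks import mytask  # hypothetical task module

result = mytask.apply_async(args=[42], countdown=3600)

# Broadcast a revoke message to every connected worker; a worker holding the
# task in its scheduled queue will discard it instead of executing it.
revoke(result.task_id)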

Since we were using Celery, however, there was a third option for us. Celery is built on top of the Kombu framework, which provides an abstraction layer for communicating with AMQP hosts. We could create two AMQP connections, one to the old broker host and one to the new, drain every message from the old host, and republish each one to the new host without acknowledging it on the old side. If a failure occurred, the messages would still be intact on the old host, so we could use the camqadm utility to purge the new host's queue (e.g. camqadm queue.purge default) and restart.

The script below is an example of how you can move the messages stored in one AMQP host to another, assuming that all of the messages you enqueued were created with the Celery framework. The script only works if all Celery workers have been stopped, since some of the messages may be held in Celery's scheduler queue, and it assumes the queues you are using are all bound to direct AMQP exchanges, so you may need to tweak things if your settings are different. Finally, we specified the serializer as "pickle", which is the default mechanism Celery uses to store Python objects.

# NOTE: Before running this script, all Celery workers must be killed.
# This script will copy all messages from one RabbitMQ host to another.

from celery import conf
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer

import signal
import socket
import sys

OLD_BROKER_HOST = "oldhost.mydev.com"  # change
NEW_BROKER_HOST = "newhost.mydev.com"  # change

DEFAULT = "default"  # change to your queue name

old_connection = None
new_connection = None


def signal_handler(sig, frame):
    print 'You pressed Ctrl+C!'

    for connection in [old_connection, new_connection]:
        if connection:
            print "Closing broker connection %s" % (connection,)
            connection.release()

    sys.exit(0)


def process_msg(body, msg):
    task = body.get('task')
    eta = body.get('eta')
    kwargs = body.get('kwargs')

    # Intentionally NOT calling msg.ack() here: if this script dies midway,
    # the messages are still sitting on the old host, so you can purge the
    # new host's queue and start over.

    print "Enqueuing new task to publish to Producer (task=%s, eta=%s, kwargs=%s)" % (task, eta, kwargs)
    producer.publish(body)  # producer is created at module level below
    print "body %s, msg %s" % (repr(body), repr(msg))

# If the workers aren't stopped, some messages are still reserved by them
# and won't be drained by this script.
char = raw_input("Can you verify that all Celery workers have been stopped? (Y/N): ")

if char != 'Y':
    print "This script will not work if there are Celery workers that are still reserving RabbitMQ messages. Exiting..."
    sys.exit(0)

old_connection = BrokerConnection(OLD_BROKER_HOST, conf.BROKER_USER, conf.BROKER_PASSWORD, conf.BROKER_VHOST)
new_connection = BrokerConnection(NEW_BROKER_HOST, conf.BROKER_USER, conf.BROKER_PASSWORD, conf.BROKER_VHOST)

signal.signal(signal.SIGINT, signal_handler)  # Ctrl-C handler

# Consumer bound to the default queue on the old RabbitMQ host
old_channel = old_connection.channel()
old_default_exchange = Exchange(DEFAULT, "direct", durable=True)
old_default_queue = Queue(DEFAULT, exchange=old_default_exchange, routing_key=DEFAULT)

consumer = Consumer(old_channel, old_default_queue, callbacks=[process_msg])
consumer.consume()

# Producer publishing to the same exchange/routing key on the new host,
# using the pickle serializer (Celery's default)
new_channel = new_connection.channel()
new_default_exchange = Exchange(DEFAULT, "direct", durable=True)
producer = Producer(new_channel, exchange=new_default_exchange, serializer="pickle", routing_key=DEFAULT)

while True:
    try:
        old_connection.drain_events(timeout=10)  # wait up to 10 seconds for the next message
    except socket.timeout:
        print "No more events came down the pipeline after 10 seconds...exiting."
        old_connection.release()
        sys.exit(0)
    except socket.error:
        print "Socket error...exiting."
        old_connection.release()
        sys.exit(0)

You can verify that all messages have been transferred by running sudo rabbitmqctl -p <vhost> list_queues name messages on both machines and checking that the message counts on the old and new AMQP broker hosts match. If you really want to make sure the messages were copied successfully, you can tweak this same script to print all the messages on the new AMQP host (instead of initiating connections to both hosts), as well as run Celery against the new AMQP host to verify the messages can be executed as tasks.
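You can also compare the counts programmatically. Here's a minimal sketch, assuming the same broker settings and queue name as the script above; a passive queue_declare asks the broker for the queue's depth without modifying it:

from celery import conf
from kombu.connection import BrokerConnection

DEFAULT = "default"  # change to your queue name

for host in ["oldhost.mydev.com", "newhost.mydev.com"]:  # change
    connection = BrokerConnection(host, conf.BROKER_USER, conf.BROKER_PASSWORD, conf.BROKER_VHOST)
    channel = connection.channel()
    # Passive declare: errors out if the queue doesn't exist; otherwise returns
    # (queue_name, message_count, consumer_count) on an amqplib-backed channel.
    name, message_count, consumer_count = channel.queue_declare(queue=DEFAULT, passive=True)
    print "%s: %d messages in %s" % (host, message_count, name)
    connection.release()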

I'd recommend shutting down all your Celery workers before trying this approach. After the Celery tasks have all been transferred, you can shut down the old RabbitMQ server, point your Celery workers at the new RabbitMQ host, and start your Celery workers up again. Good luck!
