Sunday, March 23, 2014

How RabbitMQ computes the name of its message queue directories...

One popular open source implementation of the AMQP messaging standard is RabbitMQ, which is currently supported by Pivotal/VMware.  The code itself is written in Erlang, a programming language developed internally by Ericsson and released as open source in 1998.  One of the best introductions to the language is a paper written by its initial creator, Joe Armstrong.

RabbitMQ relies on the Mnesia database system, a distributed store written in Erlang, to persist information about virtual hosts, message queues, and exchanges. We can use the rabbitmqctl command to query this information, but I wanted to understand how queue names map to the directory names listed under /var/lib/rabbitmq/mnesia:
$ ls -al /var/lib/rabbitmq/mnesia/rabbit@hostname/queues

drwxr-xr-x 2 rabbitmq rabbitmq 4096 Mar 17 04:27 3RG15Y3PJT7OHGG08CCU0Y7Z6
drwxr-xr-x 2 rabbitmq rabbitmq 4096 Mar 17 04:27 8LSP3194PK9RGC9PQTVOKKMQW
drwxr-xr-x 2 rabbitmq rabbitmq 4096 Mar 17 04:27 8XEM9YWU4AWY8YNC9KIW62NJW
Doing so required learning a bit about how Erlang works.  Through trial and error, reading through several Erlang books and whatever information I could find online, I was able to understand how the language works at a basic level.  The documentation inside the RabbitMQ source code was incredibly valuable, and using the Erlang shell allowed me to experiment and understand how RabbitMQ is implemented.

One of the big advantages of Erlang is its concurrency model.  When a distributed Erlang node starts up for the first time, it creates a file called ~/.erlang.cookie if one does not already exist.  The contents of this file act as a shared secret: Erlang nodes can exchange messages with each other so long as they share the same cookie value.  When starting up a new Erlang node, we can pass the -setcookie argument with a value that matches the cookie of the node we want to reach.  In addition, we need to give the node a short name using the -sname parameter (using the same name as another node on the host will generate a conflict error).
erl -mnesia dir '"/tmp/tst"' -setcookie [COOKIE_CONTENTS] -sname tst
(An alternative is to simply copy the .erlang.cookie created by another process to your own home directory.  The RabbitMQ Ubuntu PPA package also sets up a user named 'rabbitmq', and any Erlang commands run as this user will use that user's cookie file.  The rabbitmqctl program switches to the rabbitmq user, which allows it to communicate with other Erlang processes running under that username.)
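Before going further, it can help to confirm that the local shell really can reach the broker. The following is a minimal sketch to paste into the erl shell started above; 'rabbit@hostname' is the placeholder node name used throughout this post, so substitute whatever your RabbitMQ node is actually called.

```erlang
%% Run inside the erl shell started with -setcookie and -sname.
%% 'rabbit@hostname' is a placeholder; use your broker's real node name.
erlang:get_cookie().               % the cookie this node is using, as an atom
net_adm:ping('rabbit@hostname').   % pong if the node is reachable and the cookies match, pang otherwise
nodes().                           % nodes this shell is connected to after a successful ping
```

If the ping comes back as pang, the usual suspects are a mismatched cookie or a node name that does not match the broker's short host name.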

We also want to make use of the functions that are available from RabbitMQ. The compiled Erlang modules are located in /usr/lib/rabbitmq/lib/rabbitmq_server-3.2.4/ebin, so we can make them available by adding this directory to the code path using the -pa argument.
erl -mnesia dir '"/tmp/tst"' -setcookie [COOKIE_CONTENTS] -sname tst -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.2.4/ebin
Once we've started up the process, we can communicate with the RabbitMQ process.  Inside our local Erlang shell, we first need to load the data structures declared by RabbitMQ using the 'rr' command (read records).  These records are the equivalent of typedef struct declarations in C and are declared with "-record" in the .hrl include files.
Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [kernel-poll:false]

Eshell V5.8.5  (abort with ^G)
1> rr ("/usr/lib/rabbitmq/lib/rabbitmq_server-3.2.4/include/rabbit_msg_store.hrl").
[amqp_error,amqqueue,basic_message,binding,content,delivery,
 event,exchange,exchange_serial,internal_user,listener,
 message_properties,msg_location,permission,plugin,resource,
 reverse_binding,reverse_route,route,runtime_parameters,
 ssl_socket,topic_trie_binding,topic_trie_edge,
 topic_trie_node,trie_binding,trie_edge,trie_node,user,
 user_permission|...]
2>
The results returned are all the various data structures available to use.  If we want to see what records have been loaded, we can use the rl() function:
2> rl().
-record(amqp_error,{name,explanation = "",method = none}).
-record(amqqueue,{name,
                  durable,
                  auto_delete,
                  exclusive_owner = none,
                  arguments,
                  pid,
                  slave_pids,
                  sync_slave_pids,
                  policy,
                  gm_pids,
                  decorators}).
-record(basic_message,{exchange_name,
                       routing_keys = [],
                       content,
                       id,
                       is_persistent}).
-record(binding,{source,key,destination,args = []}).
-record(content,{class_id,
                 properties,
                 properties_bin,
                 protocol,
                 payload_fragments_rev}).
-record(delivery,{mandatory,sender,message,msg_seq_no}).
-record(event,{type,props,timestamp}).
.
.
.
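Since records come up constantly from here on, a small standalone sketch of what a record actually is may be useful. The module name record_demo is made up for illustration, and the record below is a local copy that assumes the field order virtual_host, kind, name (the order in which the fields are printed in the query results later in this post); it is not loaded from RabbitMQ's headers.

```erlang
-module(record_demo).
-export([demo/0]).

%% Local, illustrative copy of a record with the same fields as RabbitMQ's
%% #resource{} (assumed field order: virtual_host, kind, name).
-record(resource, {virtual_host, kind, name}).

demo() ->
    R = #resource{virtual_host = <<"/">>, kind = queue, name = <<"demo">>},
    %% A record is a tagged tuple: the record name first, then the fields
    %% in declaration order.
    {resource, <<"/">>, queue, <<"demo">>} = R,
    %% Accessing a field by name reads the corresponding tuple element.
    <<"demo">> = R#resource.name,
    <<"demo">> = element(4, R),
    %% Pattern matching can pick out just the fields we care about.
    #resource{kind = Kind} = R,
    Kind.
```

Compiling this with c(record_demo). and calling record_demo:demo(). runs the matches above and returns the atom queue. The tuple layout matters later, because it is the tuple that gets serialized.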
These record definitions are the data structures used by RabbitMQ.  We can use this information to make RPC calls to the RabbitMQ node (rabbit@hostname should match the node name the RabbitMQ process uses).  The function call below is the equivalent of calling mnesia:schema(rabbit_durable_queue) locally on that node:
rpc:call( 'rabbit@hostname', mnesia, schema, [rabbit_durable_queue] ).
To query the rabbit_durable_queue table, we would use the mnesia:table() RPC call:
rpc:call( 'rabbit@hostname', mnesia, table, [rabbit_durable_queue] ). 
However, mnesia:table() returns a query handle meant to be used with the qlc library rather than the rows themselves.  We declare a lambda function that queries the table and generates a list.  The statement below amounts to a query "Q such that Q is an amqqueue record from the rabbit_durable_queue table, with its name and pid fields bound to Name and Pid."  We then evaluate this query with the qlc:e() function, which collects the results into a list, and pass the function to mnesia:transaction through the RPC call.
Fun = fun() ->
          qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
                                          pid  = Pid}
                                <- mnesia:table(rabbit_durable_queue)]))
      end,
%% The node name must be an atom (single quotes), not a string.
rpc:call('rabbit@hostname', mnesia, transaction, [Fun]).
The result is a list of amqqueue records returned from this query (in Erlang, #amqqueue{...} denotes a record of type amqqueue, with its field values listed inside the {} block):
#amqqueue{name = #resource{virtual_host = <<"myvhost_rhu">>,
                           kind = queue,name = <<"archive">>},
          durable = true,auto_delete = false,exclusive_owner = none,
          arguments = [],pid = <5827.978.0>,slave_pids = [],...},
We can use this information to figure out how the queue directory names are generated in /var/lib/rabbitmq/, which at first glance seem to be random 25-character strings.  A look at the source code turns up a function called queue_name_to_dir_name in rabbit_queue_index.erl, which takes as input a resource record of kind 'queue':
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
    <<Num:128>> = erlang:md5(term_to_binary(Name)),
    rabbit_misc:format("~.36B", [Num]).
Note the use of the term_to_binary() call here.  It serializes the data structure using the Erlang external term format.  The MD5 digest of that binary is then read as a 128-bit integer and formatted in base 36.  There is a helper function in rabbit_misc.erl that will generate a resource record:
r(VHostPath, Kind, Name) ->
    #resource{virtual_host = VHostPath, kind = Kind, name = Name}.
What should we pass for these 3 parameters?  We only need to look at the #resource fields in the results from the previous query.  We can use this information to compute the name of the queue directory from the MD5 hash:
1> <<Num:128>> = erlang:md5(term_to_binary(rabbit_misc:r(<<"myvhost_rhu">>, queue, <<"archive">>))).
2> rabbit_misc:format("~.36B", [Num]).
"EUVCFMQ3KCK9L8KMFN5Q0WBQR"
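To see what each step contributes, the same computation can be traced one expression at a time in a fresh erl shell, without RabbitMQ on the code path. This is only a sketch: the tuple below stands in for a #resource record (assuming the field order virtual_host, kind, name), the vhost and queue name are made up, and io_lib:format plus lists:flatten is used in place of rabbit_misc:format.

```erlang
%% Paste into a fresh erl shell; no RabbitMQ modules are needed.
Term = {resource, <<"/">>, queue, <<"demo">>}.   % hypothetical queue resource, written as a plain tuple
Bin = term_to_binary(Term).                       % external term format
<<131, _/binary>> = Bin.                          % every encoded term starts with version byte 131
Digest = erlang:md5(Bin).                         % MD5 digest as a binary
16 = byte_size(Digest).                           % always 16 bytes (128 bits)
<<Num:128>> = Digest.                             % read the digest as one unsigned 128-bit integer
Dir = lists:flatten(io_lib:format("~.36B", [Num])).
true = length(Dir) =< 25.                         % 36^25 > 2^128, so never more than 25 digits
Dir =:= integer_to_list(Num, 36).                 % same digits as the standard base-36 conversion
```

The length check explains why the directory names look like 25-character strings: a 128-bit number needs at most 25 base-36 digits, and leading zeros are not padded, so a shorter name is possible now and then.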
Assuming messages have been sent to this queue, we should be able to find the matching directory inside the queues directory.  Inside each directory are .idx files that make up the queue index, which records the order of the messages on disk.  We can use this information to study the internals of RabbitMQ's queue index defined in rabbit_queue_index.erl, which will be a subject for a later posting.
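For repeated lookups, the computation can be wrapped in a small module of its own. The sketch below is not RabbitMQ code: the module name queue_dir and both function names are made up, the record is a local copy that assumes the field order virtual_host, kind, name, and io_lib:format plus lists:flatten stands in for rabbit_misc:format. The serialized form produced by term_to_binary may differ between Erlang/OTP releases, so I would only expect matching names when this runs on the same release as the broker.

```erlang
-module(queue_dir).
-export([dir_name/2, dir_exists/3]).

%% Local copy of the #resource{} record (assumed field order).
-record(resource, {virtual_host, kind, name}).

%% Compute the directory name for a queue, following the same steps as
%% queue_name_to_dir_name/1: serialize, MD5, format as base 36.
dir_name(VHost, QueueName) when is_binary(VHost), is_binary(QueueName) ->
    Resource = #resource{virtual_host = VHost, kind = queue, name = QueueName},
    <<Num:128>> = erlang:md5(term_to_binary(Resource)),
    lists:flatten(io_lib:format("~.36B", [Num])).

%% Check whether the computed directory is present under QueuesDir,
%% e.g. "/var/lib/rabbitmq/mnesia/rabbit@hostname/queues".
%% Returns false if the directory is missing or cannot be read.
dir_exists(QueuesDir, VHost, QueueName) ->
    filelib:is_dir(filename:join(QueuesDir, dir_name(VHost, QueueName))).
```

After c(queue_dir). in the shell, queue_dir:dir_name(<<"myvhost_rhu">>, <<"archive">>). gives the name to look for. Note that the queues directory is owned by the rabbitmq user, so dir_exists/3 may need to run as that user to see anything.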
