Basics of messaging with RabbitMQ


In this part we cover the following topics:


Problem description

First, we should be familiar with the rail fence cipher.

Our task is to implement a distributed system that breaks the rail fence cipher. The encrypted text is in the hidden.txt file. In addition, the following files are provided:

  • dictionary.txt dictionary -- a set of words we can find in the plain text;
  • plain.txt plain text (only to verify that the result of the breaking process is correct);
  • fence_cipher.py script used to prepare the ciphered data -- for reference and to verify that the encryption process was correct. This script was also used to encrypt two basic messages.

This task should be solved based on either this tutorial or Task queues with Celery.
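
For orientation, the rail fence cipher itself can be sketched in a few lines of Python. This is not the fence_cipher.py script mentioned above, only a minimal illustration; the function names, the brute-force search over rail counts and the word-counting score are assumptions made for this sketch. A distributed solution could hand each candidate rail count to a separate worker.

```python
def rail_pattern(length, rails):
    """Return the rail index visited at each position of the zigzag."""
    if rails < 2:
        return [0] * length
    cycle = 2 * (rails - 1)
    pattern = []
    for i in range(length):
        pos = i % cycle
        pattern.append(pos if pos < rails else cycle - pos)
    return pattern


def encrypt(plain, rails):
    """Read the zigzag rail by rail to build the cipher text."""
    pattern = rail_pattern(len(plain), rails)
    return "".join(
        ch
        for rail in range(rails)
        for ch, r in zip(plain, pattern)
        if r == rail
    )


def decrypt(cipher, rails):
    """Put every cipher character back at its original position."""
    pattern = rail_pattern(len(cipher), rails)
    # Plain-text positions in the order in which encrypt() emitted them.
    order = sorted(range(len(cipher)), key=lambda i: (pattern[i], i))
    plain = [""] * len(cipher)
    for ch, position in zip(cipher, order):
        plain[position] = ch
    return "".join(plain)


def break_cipher(cipher, dictionary, max_rails=10):
    """Try each rail count and keep the text with the most dictionary words."""
    def score(text):
        return sum(1 for word in text.lower().split() if word in dictionary)

    best_rails = max(range(2, max_rails + 1),
                     key=lambda rails: score(decrypt(cipher, rails)))
    return best_rails, decrypt(cipher, best_rails)


print(encrypt("WEAREDISCOVEREDFLEEATONCE", 3))  # WECRLTEERDSOEEFEAOCAIVDEN
```

The scoring assumes that the plain text contains spaces, so that a correct decryption splits into dictionary words.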


Sources


Basic RabbitMQ's features

RabbitMQ is described as message broker software. A general discussion of message queues is presented in a separate Message queue tutorial. As we can read there, we can think of a message broker as one component of a messaging system. On the other hand, it is now quite common to use the term message broker for the whole messaging system. Very generally, a message broker is understood as a program module that translates a message from the formal messaging protocol of the sender to the formal messaging protocol of the receiver. The primary purpose of a broker is to take incoming messages from applications and perform some action on them, such as validating, transforming and routing.

Many of RabbitMQ’s features are a simple consequence of implementing the AMQP specification. Unlike many other protocols, such as HTTP and SMTP, AMQP defines not only a network protocol but also server-side services and behaviors. That is why we can refer to it in a much broader sense as the Advanced Message Queuing (AMQ) model. This model logically defines three abstract components in broker software that determine the routing behavior of messages: queue, exchange, and binding.


Queue

The queue is the simplest component of the AMQ model. It is a data structure responsible for storing received messages; it may also carry configuration information that defines what should be done with a message. A queue may hold messages in RAM only, or it may persist them to disk prior to delivering them in first-in, first-out (FIFO) order.


Exchange

The exchange is the component of the message broker that routes messages to queues. The core idea of the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Quite often the producer doesn't even know whether a message will be delivered to any queue at all. Instead, the producer can only send messages to an exchange. An exchange receives messages sent into RabbitMQ by producers and determines whether and how to push them to queues. Exchanges define the routing behaviors that are applied to messages, usually by examining data attributes passed along with the message or contained within the message’s properties. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it be discarded? The rules for that are defined by the exchange type.

There are a few exchange types available: default, direct, fanout, topic, and headers.

  • Default The default (nameless) exchange is identified by the empty string (""). In this case a message is routed to the queue whose name equals the routing_key, if such a queue exists.
  • Direct In this setup, a message goes to the queues whose binding key exactly matches the routing_key of the message. For example, let's assume that we have one exchange X with two queues bound to it. The first queue, Q1, has two bindings, with binding keys A and B, while the second, Q2, also has two bindings, with binding keys C and B.

    In such a setup a message published to the exchange with a routing key A will be routed to queue Q1. Messages with a routing key of C will go to Q2, while messages with a routing key of B will go to both Q1 and Q2. All other messages will be discarded. As we can see, it's perfectly legal to use the same binding key for more than one queue (the key is passed as routing_key when binding; RabbitMQ documentation commonly uses the term binding key for it).

  • Fanout The fanout exchange is very simple: it just mindlessly broadcasts every message it receives to all the queues it knows. Fanout exchanges simply ignore the routing_key value.
  • Topic Messages sent to a topic exchange can't have an arbitrary routing_key. Instead, it must be a list of words delimited by dots. The words can be anything, but usually they specify some hierarchically ordered (structured) features connected to the message. For example, sensor readings may be sent with a routing_key that consists of four words and three dots, naming the general location, the precise location, the sensor type and the parameter type (a hypothetical key of this form is home.kitchen.dht22.temperature).

    There can be as many words in the routing key as we like, up to the limit of 255 bytes.

    There are two important special cases for binding keys:

    • * (star) can substitute for exactly one word.
    • # (hash) can substitute for zero or more words.

    With these special cases:

    • *.*.dht22.* means: all DHT22 sensors regardless of their location and parameter type;
    • home.# means: all sensors located at home regardless of their type, precise location and parameter type.

    The topic exchange is the most versatile exchange and can behave like the exchanges described above.

    • When a queue is bound with # (hash) binding key -- it will receive all the messages, regardless of the routing key -- like in fanout exchange.
    • When special characters * (star) and # (hash) aren't used in bindings, the topic exchange will behave just like a direct one.
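  • Headers The headers exchange routes on message header attributes instead of the routing_key. A queue is bound with a set of header values and an x-match argument: all requires every listed header to match, any requires at least one.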
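
The routing rules of the direct, fanout and topic exchanges can be sketched as a small in-memory simulation. It does not talk to RabbitMQ; it only mimics the matching rules described above. The function names and the sensor routing keys used in the topic example are assumptions made for illustration.

```python
def topic_matches(binding_key, routing_key):
    """Return True if a topic binding key matches a routing key."""
    def match(pattern, words):
        if not pattern:
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # '#' may stand for zero or more words.
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == "*" or head == words[0]:
            # '*' stands for exactly one word.
            return match(rest, words[1:])
        return False

    return match(binding_key.split("."), routing_key.split("."))


def route(exchange_type, bindings, routing_key):
    """Return the sorted names of the queues that would receive a message.

    bindings is a list of (queue_name, binding_key) pairs.
    """
    if exchange_type == "fanout":
        selected = {queue for queue, _ in bindings}
    elif exchange_type == "direct":
        selected = {queue for queue, key in bindings if key == routing_key}
    elif exchange_type == "topic":
        selected = {queue for queue, key in bindings
                    if topic_matches(key, routing_key)}
    else:
        raise ValueError(f"unsupported exchange type: {exchange_type}")
    return sorted(selected)


# The direct setup from the text: Q1 bound with A and B, Q2 with C and B.
direct_bindings = [("Q1", "A"), ("Q1", "B"), ("Q2", "C"), ("Q2", "B")]
print(route("direct", direct_bindings, "A"))  # ['Q1']
print(route("direct", direct_bindings, "B"))  # ['Q1', 'Q2']
print(route("direct", direct_bindings, "D"))  # []

# Hypothetical topic bindings using the patterns from the text.
topic_bindings = [("Q_DHT22", "*.*.dht22.*"), ("Q_HOME", "home.#")]
print(route("topic", topic_bindings, "home.kitchen.dht22.temperature"))
```

Binding a queue with the single key # makes the topic branch select that queue for every routing key, which is the fanout-like behavior mentioned above.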


Binding

A binding is a relationship between an exchange and a queue: a rule that tells the exchange which queue the messages should be stored in. This can be simply read as: the queue is interested in messages from this exchange.
The meaning of a binding key depends on the exchange type. Fanout exchanges, for example, simply ignore its value.


Channels

A single AMQP connection can have multiple channels. A channel is a lightweight virtual connection that shares the TCP connection with the other channels.


Consumers

!!! todo !!!



Install

How to install RabbitMQ is described on many different web pages, but each of them does it in a (slightly) different way.

In this section I describe how I did it.

  1. Install Erlang.
    RabbitMQ is written in Erlang, so we have to install it first. To do this we have to use the following commands
  2. Install RabbitMQ.
    Having Erlang installed, we can proceed with the RabbitMQ installation.

    After installation, the RabbitMQ service is started and enabled to start on boot. To check the status, run:
  3. We can optionally enable the RabbitMQ Management web dashboard for easy management.

    Use the localhost:15672 URL to display the web interface's login screen

    Use guest as both login and password to get access to the web interface
  4. Install pika, the Python client recommended by the RabbitMQ team


Basic examples


Hello RabbitMQ's world with default exchange

  1. Activate the working environment
  2. Create the directory hierarchy
  3. In the exchange_default directory create the producer.py and consumer.py files

  4. Run the producer

    and verify in the web console that some data has been received.

    Verify the task queue status after clicking on its name
  5. Run the consumer

    and verify in the web console that the messages kept in the task queue have been consumed
  6. Instead of simple text we can send JSON structures as a message. To do this, use the following slightly modified producer.py and consumer.py code


    While testing, results similar to the following should be displayed
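
A minimal sketch of what a JSON producer and consumer for the default exchange could look like is given below. It assumes pika 1.x, a broker running on localhost and a queue named task; the function names and the payload are hypothetical and not taken from the original producer.py and consumer.py files.

```python
import json


def publish_json(channel, queue_name, payload):
    """Publish a JSON-serialisable payload through the default exchange."""
    channel.queue_declare(queue=queue_name)
    channel.basic_publish(
        exchange="",             # the default (nameless) exchange
        routing_key=queue_name,  # routed to the queue with this name
        body=json.dumps(payload).encode("utf-8"),
    )


def on_message(channel, method, properties, body):
    """Consumer callback: decode the JSON body, print it and acknowledge it."""
    payload = json.loads(body)
    print("Received:", payload)
    channel.basic_ack(delivery_tag=method.delivery_tag)
    return payload


def run_producer(queue_name="task"):
    """Connect to a local broker and send one example message."""
    import pika  # imported here so the helpers above work without a broker

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    publish_json(connection.channel(), queue_name, {"id": 1, "text": "Hello RabbitMQ"})
    connection.close()


def run_consumer(queue_name="task"):
    """Connect to a local broker and consume messages until interrupted."""
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)
    channel.basic_consume(queue=queue_name, on_message_callback=on_message)
    channel.start_consuming()
```

Calling run_producer() in one terminal and run_consumer() in another corresponds to steps 4 and 5 above. Declaring the queue on both sides is safe, because queue_declare only creates the queue if it does not exist yet.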


Fanout exchange

  1. Create the producer.py, queue.py and consumer.py files with the following contents


  2. We will make the following queue setup

    In Terminal 1 run queue.py script

    and consumer.py script.

  3. Do the same in Terminal 2: run queue.py script

    and consumer.py script.

  4. Use producer.py script to add messages to the queues -- run it in Terminal 3.

    and verify consumer's output:

    • in Terminal 1
    • in Terminal 2
  5. Again use producer.py script to add messages to the queues -- run it in Terminal 3.

    and verify consumer's output:

    • in Terminal 1
    • in Terminal 2
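
The three fanout scripts could be sketched roughly as follows. This assumes pika 1.x and a broker on localhost; the exchange and queue names are passed in by the caller, and the split into these particular functions is an assumption made for illustration, not the content of the original files.

```python
def open_channel(host="localhost"):
    """Open a connection to the broker and return (connection, channel)."""
    import pika  # imported here so the helpers below work without a broker

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    return connection, connection.channel()


def setup_queue(channel, exchange_name, queue_name):
    """queue.py: declare a fanout exchange and bind a named queue to it."""
    channel.exchange_declare(exchange=exchange_name, exchange_type="fanout")
    channel.queue_declare(queue=queue_name)
    # No binding key is given: a fanout exchange ignores it anyway.
    channel.queue_bind(queue=queue_name, exchange=exchange_name)


def broadcast(channel, exchange_name, messages):
    """producer.py: publish every message to the fanout exchange."""
    channel.exchange_declare(exchange=exchange_name, exchange_type="fanout")
    for message in messages:
        channel.basic_publish(
            exchange=exchange_name,
            routing_key="",  # required by basic_publish, ignored by fanout
            body=message.encode("utf-8"),
        )


def print_message(channel, method, properties, body):
    """Callback used by the consumer: print the decoded message body."""
    print("Received:", body.decode("utf-8"))


def consume_forever(channel, queue_name):
    """consumer.py: print every message arriving in the given queue."""
    channel.basic_consume(
        queue=queue_name, on_message_callback=print_message, auto_ack=True
    )
    channel.start_consuming()
```

With two queues set up through setup_queue on the same exchange, each call to broadcast delivers a copy of every message to both queues, so both consumers print the same lines.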


Direct exchange

  1. Create a new exchange_direct directory.
  2. Create the producer.py, queue.py and consumer.py files with the following contents


  3. We will make the following queue setup

    • Run the queue.py script

      and verify the results in the web console
    • Run the queue.py script again, but with different parameters
  4. Use the producer.py script to add messages to the Q1 and Q2 queues. To tell the truth, the producer has no idea to which queue, if any, the messages are delivered, and probably doesn't care. The routing_key is the only criterion. It is the responsibility of the exchange to pass every message to the correct queues.

  5. Run consumer (consumer.py) script to consume messages from Q1 queue
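
The direct exchange steps can be sketched with three small helpers. The sketch assumes pika 1.x; the helpers expect an already opened channel, and their names as well as the use of basic_get for draining a queue are assumptions made for illustration rather than the original scripts.

```python
def bind_queue(channel, exchange_name, queue_name, binding_keys):
    """queue.py: bind one queue to a direct exchange with several keys."""
    channel.exchange_declare(exchange=exchange_name, exchange_type="direct")
    channel.queue_declare(queue=queue_name)
    for key in binding_keys:
        channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=key)


def publish(channel, exchange_name, routing_key, text):
    """producer.py: the producer only names the exchange and a routing key."""
    channel.basic_publish(
        exchange=exchange_name,
        routing_key=routing_key,
        body=text.encode("utf-8"),
    )


def drain(channel, queue_name):
    """consumer.py: fetch and acknowledge every message waiting in a queue."""
    bodies = []
    while True:
        method, properties, body = channel.basic_get(queue=queue_name, auto_ack=False)
        if method is None:  # the queue is empty
            break
        channel.basic_ack(delivery_tag=method.delivery_tag)
        bodies.append(body)
    return bodies
```

For the setup described earlier, bind_queue(channel, "X", "Q1", ["A", "B"]) followed by bind_queue(channel, "X", "Q2", ["C", "B"]) creates both queues; a message published with routing key B then shows up in drain(channel, "Q1") and in drain(channel, "Q2").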


Topic exchange

Implementing a topic exchange is not much more difficult than implementing a direct exchange. To tell the truth, the source code needs only slight modifications and the consumer stays untouched.

  1. Create the producer.py, queue.py and consumer.py files with the following contents


  2. We will make the following queue setup

    Run queue.py script

    and verify results in web console

  3. Run queue.py again and add two more queues

  4. Use producer.py script to add messages to the queues.

    and verify state of queues in web console

  5. Run consumer (consumer.py) script to consume messages from the Q_ALL queue

  6. Run consumer (consumer.py) script to consume messages from the Q_R1 queue

  7. Run consumer (consumer.py) script to consume messages from the Q_ALL_O1_AND_O2 queue
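
A queue.py that accepts its parameters on the command line could be sketched as below. It assumes pika 1.x and a broker on localhost; the argument layout, the default exchange name topic_demo and the function names are hypothetical. The binding keys used for Q_ALL, Q_R1 and Q_ALL_O1_AND_O2 are not given in the text, so they are left to the caller.

```python
import argparse


def parse_args(argv=None):
    """Parse the queue name and one or more binding keys."""
    parser = argparse.ArgumentParser(description="Bind a queue to a topic exchange")
    parser.add_argument("queue", help="queue name, for example Q_ALL")
    parser.add_argument("binding_keys", nargs="+",
                        help="binding keys; * and # act as wildcards")
    parser.add_argument("--exchange", default="topic_demo",
                        help="exchange name (hypothetical default)")
    return parser.parse_args(argv)


def bind_topic_queue(channel, exchange_name, queue_name, binding_keys):
    """Declare a topic exchange and bind the queue once per binding key."""
    channel.exchange_declare(exchange=exchange_name, exchange_type="topic")
    channel.queue_declare(queue=queue_name)
    for key in binding_keys:
        channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=key)


def main(argv=None):
    """Entry point: connect to a local broker and create the bindings."""
    import pika  # imported here so the helpers above work without a broker

    args = parse_args(argv)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    bind_topic_queue(connection.channel(), args.exchange, args.queue, args.binding_keys)
    connection.close()
```

Calling main(["Q_ALL", "#"]) would bind Q_ALL so that it receives every message. On a shell command line the keys should be quoted, because characters such as * and # have a special meaning there.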


Headers exchange

Implementing a headers exchange is not much more difficult than implementing a topic exchange. Again, the consumer stays untouched.

  1. Create the producer.py, queue.py and consumer.py files with the following contents

    Notice that although we don't use a routing key for the headers exchange, it is required by the basic_publish method

    so we simply use an empty string for this.

  2. We will make the following queue setup

    Run queue.py script


    and verify results in web console
  3. Run queue.py again and add two more queues
  4. Use producer.py script to add messages to the queues.

    and verify state of queues in web console
  5. Run consumer (consumer.py) script to consume messages from the Q1 queue
  6. Run consumer (consumer.py) script to consume messages from the Q2 queue
  7. Run consumer (consumer.py) script to consume messages from the Q3 queue
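
How a headers exchange decides whether a binding matches can be sketched as follows, together with the pika calls that would create such a binding and publish a message with headers. The sketch assumes pika 1.x; the function names and the header names in the usage note are hypothetical, and headers_match is only a local simulation of the all/any rule, not broker code.

```python
def headers_match(binding_arguments, message_headers):
    """Simulate the x-match rule of a headers exchange."""
    mode = binding_arguments.get("x-match", "all")
    # Keys starting with "x-" are control arguments and are not compared.
    required = {k: v for k, v in binding_arguments.items() if not k.startswith("x-")}
    hits = [message_headers.get(k) == v for k, v in required.items()]
    return all(hits) if mode == "all" else any(hits)


def bind_with_headers(channel, exchange_name, queue_name, headers, match="all"):
    """queue.py: bind a queue using header values instead of a binding key."""
    channel.exchange_declare(exchange=exchange_name, exchange_type="headers")
    channel.queue_declare(queue=queue_name)
    arguments = {"x-match": match, **headers}
    channel.queue_bind(
        queue=queue_name,
        exchange=exchange_name,
        routing_key="",
        arguments=arguments,
    )


def publish_with_headers(channel, exchange_name, headers, text):
    """producer.py: attach headers to the message and use an empty routing key."""
    import pika  # imported here so the helpers above work without pika installed

    channel.basic_publish(
        exchange=exchange_name,
        routing_key="",  # required by basic_publish, unused by a headers exchange
        body=text.encode("utf-8"),
        properties=pika.BasicProperties(headers=headers),
    )
```

For example, a binding created with the headers {"type": "report", "format": "pdf"} and match="any" would accept a message whose headers contain only format set to pdf, while the same binding with match="all" would not.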