
Message queues

Information presented on this page may be incomplete or outdated. You can find the most up-to-date version by reading my book Engineering of Big Data Processing

In this part we cover message queues: the reasons to use them, their basic components, typical usage examples, and the main standards and protocols (AMQP, STOMP, MQTT).


Introduction

The key question when working with a big, heterogeneous environment is: how do we connect all this stuff into one consistent system? Because nowadays almost everything seems to be software (even computers, networks and storage, thanks to virtualization), we can reformulate this question as follows: how can I integrate multiple applications to work together and share data?

We need something that can transfer packages of data

  • immediately,
  • reliably,
  • with high speed,
  • asynchronously,
  • using customizable formats.

It sounds like a utopia or a holy grail. Surprisingly, there is a "tool" matching this set of requirements: we can use messaging. Generally speaking, messaging might be the right choice whenever there is a

  • need to get data from point A to point B,
  • need to integrate different systems to work together,
  • need to scale,
  • need to be able to monitor data flows,
  • need for load balancing,
  • need for asynchronous processing,
  • need for queuing and buffering,
  • need to decouple producers (publishers) and consumers (subscribers).

In the next section we will discuss the reasons to use message queuing in more detail.


Messaging is not the only integration option; we could also use, for example, (simple) file transfer, a shared database, or remote procedure invocation.


Overview of concepts that are used in messaging

In computer science, message queues are software-engineering components used for inter-process communication (IPC), or for inter-thread communication within the same process. They use a queue for messaging, where messaging means passing of control or of content.

The message queue paradigm is an extended version of the publisher/subscriber pattern. In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, there may be. Similarly, subscribers express interest in one or more classes and only receive messages that are of interest, without knowledge of which publishers, if any, there are. Based on this pattern, message queues provide an asynchronous communications protocol, meaning that the sender and receiver of the message do not need to interact with the message queue at the same time. Messages placed onto the queue are stored until the recipient retrieves them. Message queues have implicit or explicit limits on the size of data that may be transmitted in a single message and the number of messages that may remain outstanding on the queue.


I will treat message queues, or messaging, as an extension (superset) of the publisher/subscriber pattern, because a publish-subscribe channel is only one of the messaging modes. Other authors treat these concepts as equivalent.


Reasons to use message queuing

In the Introduction we listed some premises, in terms of needs, for using messaging. Now we will point out what we gain if we use messaging.

Redundancy and transaction support
Before a message is completely removed from the queue, the process that reads it may be required to confirm that it completed the transaction, so that the removal is possible and safe. In case of any problems, the message is persisted to storage and won't be lost, which allows for later reprocessing. The confirmation requirement also helps to ensure that a transaction occurs only once.

Load sensitivity reduction
Cloud is one of the most popular terms nowadays. We cloudify almost everything -- all that remains is to wait for someone to transfer his or her life to the cloud. Clouds are promoted, among other things, as a panacea for sudden increases in demand for system resources. This need can be reduced considerably with the help of a message queue.
In many cases it's true that we can't predict application utilization -- it's highly dependent on users' actions. By queuing the data we can be assured that it will be persisted and eventually processed, even if that takes longer than usual due to a temporarily high demand for system resources.

Efficiency by batching
If you've ever worked with databases, you should know that it is much more efficient to insert 100 records into a database at a time than to insert 1 record at a time, 100 times. If you've ever worked with programming languages, you should know that it is much more efficient to write 100 bytes to a file at once than 1 byte at a time, 100 times. Collecting data/operations into one bigger batch helps us optimize performance by reducing the constant setup and teardown costs paid for every operation.
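To make this concrete, here is a minimal Python sketch comparing the two approaches with the standard sqlite3 module and an in-memory database (the table and column names are purely illustrative):

    import sqlite3
    import time

    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.execute("CREATE TABLE readings (value REAL)")
    rows = [(float(i),) for i in range(100_000)]

    # Variant 1: one INSERT per row -- the per-statement overhead is paid every time.
    start = time.perf_counter()
    for row in rows:
        cur.execute("INSERT INTO readings (value) VALUES (?)", row)
    conn.commit()
    print("one by one:", time.perf_counter() - start)

    cur.execute("DELETE FROM readings")

    # Variant 2: one batched INSERT -- the overhead is paid once for the whole batch.
    start = time.perf_counter()
    cur.executemany("INSERT INTO readings (value) VALUES (?)", rows)
    conn.commit()
    print("batched:   ", time.perf_counter() - start)

Queuing gives us the same opportunity at the system level: messages accumulate in the queue and a consumer can take them off and process them in batches.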

Asynchrony
Queues are an excellent way to implement an asynchronous programming pattern. The asynchronous pattern (also known as asynchronous method invocation (AMI) or asynchronous method calls) is a design pattern in which the call site is not blocked while waiting for the called code to finish. Instead, the calling site is notified when the results are ready to be delivered. This approach is great in scenarios where an application needs something done but doesn't need it done right now, or doesn't even care about the result.
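A minimal in-process illustration of this idea, using Python's standard queue and threading modules (the "slow work" is just a placeholder): the call site only enqueues a request and continues immediately, while a background worker processes the items.

    import queue
    import threading
    import time

    tasks = queue.Queue()

    def worker():
        # Runs in the background: picks up tasks whenever they appear in the queue.
        while True:
            item = tasks.get()
            time.sleep(0.1)              # pretend this is slow work (e.g. sending an e-mail)
            print("processed:", item)
            tasks.task_done()

    threading.Thread(target=worker, daemon=True).start()

    # The call site: it is not blocked by the slow work, it only enqueues requests.
    for i in range(5):
        tasks.put(f"task-{i}")
        print("enqueued:", f"task-{i}")

    tasks.join()                         # wait until all queued tasks have been processed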

Decoupling and coupling
With message queues we can reach two totally different and contradictory goals.

  • Decoupling In the case of big monolithic systems it can be difficult to integrate new ideas with existing solutions. Message queues provide a handy tool for implementing an abstraction layer. This way a monolithic system can be decoupled into a union of loosely connected modules. Each module can be replaced at any time by an equivalent solution, possibly implemented in a totally different technology. Decoupling helps to increase fault tolerance and makes system maintenance easier.
  • Coupling In the case of many systems which are supposed to function as one consistent whole, message queues provide a handy tool for implementing an intermediate layer that supports communication between technologically different elements.

Data flow monitoring
In some sense message queues limit the possible communication channels and thus allow us to monitor the total traffic easily. We can get information about how many items are in a queue, the rate of message processing, and other statistics.

Reduce race condition influence
A race condition or race hazard is the behavior of an electronic, software, or other system where the system's substantive behavior depends on the sequence or timing of other uncontrollable events.

Imagine that 1000 people are logged in to our website and working with some data, potentially not exclusively. It's highly probable that many threads are created to serve all the requests. In such a case problems with concurrency may arise and it may be difficult to ensure that the first request to arrive is the first one to finish. By queuing the requests up, we can guarantee their order and keep control over how they are processed (sequentially, concurrently, in batches, etc.).


Messaging system components

Messaging makes the messaging system responsible for transferring data from one application to another, so the applications can focus on what data they need to share rather than on how to share it. Like every technology, messaging involves certain basic concepts. Below, these basic concepts are discussed so we can make sense of the general idea and of their place in the whole system. Messaging system components and their places are depicted in the image below.

You can use a recycling queue analogy to make all these concepts less abstract:


Message
Any data that is to be transmitted via a messaging system must be converted into one or more messages that can be sent through messaging channels.

A message consists of two basic parts:

  • Header Information used by the messaging system that describes the data being transmitted, its origin, its destination, and so on.
  • Body The data being transmitted; generally ignored by the messaging system and simply transmitted as-is.

This concept is not unique to messaging. All network communication protocols, for example the well-known Ethernet, IP or TCP, are based on "packaging" user data into a "box". Despite their different names (Ethernet -- frame, IP -- datagram or packet, TCP -- segment), all "boxes" share a similar structure: the user data bytes (known as the body or payload) are preceded by the information necessary to efficiently transmit them from sender to receiver.

From the messaging system's point of view, all messages are the same: some body of data to be transmitted, as described by the header. However, to the programmer there are different types of messages, i.e., different application styles of use.

Message channel
If you have ever worked with computer networking, you should be familiar with an image like the one below,

where the Internet is depicted as a cloud. We don't know the details of that cloud and we don't care about them. In some sense, this is the idea of the Internet. If we want to send some data, we simply throw the information in and receive it at the destination point. In most cases the way the data travels doesn't matter.

A messaging system isn't like that. Simply speaking, it's a set of connections that enable applications to communicate by transmitting information in predetermined and predictable ways. In the case of a messaging system, when an application has data to share, it doesn't just put it into the messaging system. Instead, it adds the information to a particular, carefully chosen, message channel. On the other side, an application receiving information doesn't simply wait for data at a well-known address (like an IP address), because such an address doesn't exist. Instead, it retrieves the information from a particular, carefully chosen, message channel.

The application adding data doesn't necessarily know which particular application will end up retrieving it, which is the opposite of Internet communication, where precise source and destination addresses, and even applications, are known (both endpoints have a specific IP address and use a specific, well-known, predefined TCP/UDP port). The good news is that whatever application retrieves the data, it will be interested in it. This is because the messaging system has different message channels for different types of information the applications want to share. When an application sends data, it doesn't randomly add it to any available channel; it adds the data to a channel whose specific purpose is to share that sort of data. In the same way, an application that wants to receive particular data selects which channel to get data from based on what type of information it wants.

There are two different kinds of message channels: point-to-point and publish-subscribe.

  • point-to-point Communications protocol used to establish a direct connection between two nodes.
  • publish-subscribe Communications protocol where senders, called publishers, do not send data directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, there may be. Similarly, subscribers express interest in one or more classes and only receive messages that are of interest, without knowledge of which publishers, if any, there are.

Message router
The sender and the receiver of a message are separated by a message channel, which makes them anonymous to each other. This way multiple applications can publish messages to the same message channel. As a result, a message channel can contain messages from different sources that may have to be treated differently based on the type of the message or other criteria.

To make things simpler, there is nothing wrong with creating many separate, well-suited message channels for the different message types and binding each channel to the processing steps required for that message type. Unfortunately, solving the problem that way, we encounter new difficulties.

  • This would require the sender to be aware of the different processing steps applied to each message type, so that it can publish the message to the channel dedicated to this type of data.
  • Creating a new channel every time we want to send a new type of data can lead to uncontrollable growth in the number of message channels.
  • Channels are cheap, but they're not free. Each channel requires system resources: memory to represent the messages, and disk space in the case of persistent channels.
  • This could cause difficulties when the selection of a channel depends on the number of messages that have passed through the channel so far. A sender may have no information about this number.

This is where the message router comes in: it removes a message from one message channel and republishes it to one or more different message channels depending on a set of conditions. A key property of the message router is that it does not modify the message contents -- it only cares about the destination of the message.
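A minimal, framework-free sketch of this idea using Python's standard queue module (the channel names and the routing condition are purely illustrative): the router inspects only a header field and republishes the message, unchanged, to the matching output channel.

    import queue

    incoming = queue.Queue()
    # Output channels, selected by the value of the "type" header.
    channels = {"order": queue.Queue(), "invoice": queue.Queue()}
    dead_letter = queue.Queue()          # messages the router cannot classify

    def route(message):
        # The router reads the header only; the message itself is forwarded as-is.
        destination = channels.get(message["header"].get("type"), dead_letter)
        destination.put(message)

    incoming.put({"header": {"type": "order"}, "body": b"..."})
    incoming.put({"header": {"type": "invoice"}, "body": b"..."})

    while not incoming.empty():
        route(incoming.get())

    print(channels["order"].qsize(), channels["invoice"].qsize(), dead_letter.qsize())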

Message translator
A message translator translates one data format into another.

Message broker
A message broker module may consist of a number of specialized submodules, each intended to do one job. Among others, the aforementioned message router and message translator can be such submodules.
The primary purpose of a broker is to take incoming messages from applications and perform some actions on them:

  • validate them,
  • transform them,
  • translate a message from the formal messaging protocol of the sender to the formal messaging protocol of the receiver,
  • perform message aggregation and/or decomposition,
  • route them to the correct destination,
  • manage a workload for multiple receivers,
  • provide reliable storage,
  • guarantee message delivery.

Message endpoint
A message endpoint is an abstraction layer between an application and a messaging channel. It is the message endpoint's responsibility to take a command or data from the application, pack it into a message, and send it on a particular messaging channel. On the other side, it is the endpoint's responsibility to receive a message, extract its contents, and pass the correct data to the application so it can use it.


Message queue usage examples

Message queues can be used in various ways.


Message queue as a point-to-point connection

We can use a named queue to send data from one point directly to another point.
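As a sketch only (assuming a RabbitMQ broker on localhost and the Python pika 1.x client; the queue name task_queue is arbitrary), a producer and a consumer connected by a single named queue might look like this:

    import pika

    # --- producer ----------------------------------------------------------
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="task_queue")                  # declaring is idempotent
    channel.basic_publish(exchange="",                         # default exchange
                          routing_key="task_queue",            # i.e. the queue name
                          body=b"Hello, point-to-point!")
    connection.close()

    # --- consumer (typically a separate process) ---------------------------
    def on_message(ch, method, properties, body):
        print("received:", body)

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="task_queue")
    channel.basic_consume(queue="task_queue", on_message_callback=on_message, auto_ack=True)
    channel.start_consuming()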


Message queue as work queues

We can use a named queue to distribute time-consuming tasks among multiple workers (in such a case the message queue is called a work queue or task queue).
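Continuing the pika/RabbitMQ sketch from the previous example (an assumption, not the only possible setup), a worker typically combines a prefetch limit with manual acknowledgements, so a task is removed from the queue only after it has actually been processed and no worker receives more tasks than it can handle:

    import time
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="task_queue")

    def on_task(ch, method, properties, body):
        time.sleep(body.count(b"."))                           # simulate time-consuming work
        print("done:", body)
        ch.basic_ack(delivery_tag=method.delivery_tag)         # ack only after the task is finished

    channel.basic_qos(prefetch_count=1)   # no new task for a worker until it acknowledges the previous one
    channel.basic_consume(queue="task_queue", on_message_callback=on_task)
    channel.start_consuming()

Running several copies of this worker against the same queue spreads the tasks among them.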


Message queue as a broadcast publish-subscribe channel

We can use a message queue to deliver a message to multiple consumers with a broadcast (fanout) exchange. The fanout exchange is very simple -- it just broadcasts all the messages it receives to all the queues it knows.
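A sketch of the fanout case, again assuming RabbitMQ and pika (the exchange name logs is arbitrary): each subscriber gets its own temporary queue bound to the exchange, so every subscriber receives every message.

    import pika

    # --- publisher ----------------------------------------------------------
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.exchange_declare(exchange="logs", exchange_type="fanout")
    channel.basic_publish(exchange="logs", routing_key="", body=b"broadcast message")
    connection.close()

    # --- subscriber (each subscriber runs this code) ------------------------
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.exchange_declare(exchange="logs", exchange_type="fanout")
    queue_name = channel.queue_declare(queue="", exclusive=True).method.queue   # temporary, broker-named queue
    channel.queue_bind(exchange="logs", queue=queue_name)
    channel.basic_consume(queue=queue_name,
                          on_message_callback=lambda ch, m, p, body: print("got:", body),
                          auto_ack=True)
    channel.start_consuming()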


Message queue as a publish-subscribe channel with bindings

We can use a message queue to deliver a subset of messages to multiple consumers with a direct exchange. The routing algorithm behind a direct exchange is simple -- a message goes to the queues whose binding key exactly matches the routing key of the message.
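For example (again a pika/RabbitMQ sketch with made-up exchange and key names), a consumer interested only in "error" messages binds its queue with that key, while the publisher tags each message with a severity used as the routing key:

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.exchange_declare(exchange="severity", exchange_type="direct")

    # Consumer side: a temporary queue bound only to the "error" routing key.
    queue_name = channel.queue_declare(queue="", exclusive=True).method.queue
    channel.queue_bind(exchange="severity", queue=queue_name, routing_key="error")

    # Publisher side: the routing key carries the severity of each message.
    channel.basic_publish(exchange="severity", routing_key="error", body=b"disk failure")
    channel.basic_publish(exchange="severity", routing_key="info", body=b"user logged in")   # not delivered to this queue

    channel.basic_consume(queue=queue_name,
                          on_message_callback=lambda ch, m, p, body: print("received:", body),
                          auto_ack=True)
    channel.start_consuming()                                  # prints only the "error" message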


Message queue as a publish-subscribe channel with topics

We can use a message queue to deliver a subset of messages to multiple consumers with a topic exchange. The logic behind the topic exchange is similar to a direct one -- a message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key. The key difference is that with topics we can do routing based on multiple criteria. Messages sent to a topic exchange can't have an arbitrary routing_key -- it must be a list of words delimited by dots. The words can be anything, but usually they specify some hierarchically ordered (structured) features of the message. There are two important special cases for binding keys:

  • * (star) can substitute for exactly one word.
  • # (hash) can substitute for zero or more words.

For example, the messages may be sent with a routing key that consists of four words and three dots, of the general form <location>.<precise location>.<sensor type>.<parameter>, in particular a key such as home.kitchen.dht22.temperature.

Binding keys could then be, for example (a code sketch follows these examples):

  • *.*.dht22.* means: all DHT22 sensors regardless of their location and parameter type.
  • home.# means: all sensors located at home regardless of their type, precise location and parameter type.
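A corresponding pika/RabbitMQ sketch (still an assumption about the environment; the exchange name sensors and the concrete keys are illustrative) could look like this:

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.exchange_declare(exchange="sensors", exchange_type="topic")

    # Consumer: interested in every DHT22 sensor, wherever it is and whatever it measures.
    queue_name = channel.queue_declare(queue="", exclusive=True).method.queue
    channel.queue_bind(exchange="sensors", queue=queue_name, routing_key="*.*.dht22.*")

    # Publisher: the routing key encodes location, precise location, sensor type and parameter.
    channel.basic_publish(exchange="sensors",
                          routing_key="home.kitchen.dht22.temperature",
                          body=b"21.5")

    channel.basic_consume(queue=queue_name,
                          on_message_callback=lambda ch, m, p, body: print(m.routing_key, body),
                          auto_ack=True)
    channel.start_consuming()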


Message queue as a Remote Procedure Call

We can use a message queue to run a function on a remote computer and wait for the result -- we can make a so-called Remote Procedure Call (RPC).
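The usual pattern, sketched below with pika/RabbitMQ (the queue names and the "remote" function are made up; in a real deployment the server and the client run in separate processes, here they share one connection so the sketch is runnable as-is): the client sends a request with a reply_to queue and a correlation_id, and the server publishes the result back to that queue with the same correlation id.

    import uuid
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="rpc_queue")

    # --- server side --------------------------------------------------------
    def on_request(ch, method, props, body):
        result = str(int(body) * 2).encode()                   # the "remote procedure"
        ch.basic_publish(exchange="",
                         routing_key=props.reply_to,           # reply goes to the client's callback queue
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=result)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue="rpc_queue", on_message_callback=on_request)

    # --- client side --------------------------------------------------------
    callback_queue = channel.queue_declare(queue="", exclusive=True).method.queue
    corr_id = str(uuid.uuid4())
    response = None

    def on_response(ch, method, props, body):
        global response
        if props.correlation_id == corr_id:                    # ignore replies to other requests
            response = body

    channel.basic_consume(queue=callback_queue, on_message_callback=on_response, auto_ack=True)
    channel.basic_publish(exchange="",
                          routing_key="rpc_queue",
                          properties=pika.BasicProperties(reply_to=callback_queue,
                                                          correlation_id=corr_id),
                          body=b"21")

    while response is None:
        connection.process_data_events(time_limit=1)           # drive I/O until the reply arrives
    print("result:", response)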


Standards and protocols

Historically, message queuing has used proprietary, closed protocols, restricting the ability for different operating systems or programming languages to interact in a heterogeneous set of environments. Over the years three standards have emerged which are used in open source message queue implementations:

  • Advanced Message Queuing Protocol (AMQP) -- a feature-rich message queue protocol, approved as ISO/IEC 19464 in April 2014; AMQP v1.0 was approved as an OASIS standard on 31 October 2012;
  • Streaming Text Oriented Messaging Protocol (STOMP) -- a simple, text-oriented message protocol;
  • MQTT (formerly MQ Telemetry Transport) -- a lightweight message queue protocol, especially for embedded devices (MQTT Version 3.1.1 became an OASIS Standard on 2014-10-30; it is also an ISO standard, ISO/IEC 20922).

These protocols are at different stages of standardization and adoption. The first two operate at the same level as HTTP, MQTT at the level of TCP/IP.


AMQP

AMQP, which stands for Advanced Message Queuing Protocol, is a binary wire protocol which was designed for interoperability between different vendors. It was designed as an open replacement for existing proprietary messaging middleware. Two of the most important features of AMQP are reliability and interoperability. It provides a wide range of features related to messaging, including

  • flow controlled, message-oriented communication,
  • reliable and manageable queuing (e.g. restricting access to queues, managing their depth, etc.),
  • topic-based publish-and-subscribe messaging,
  • flexibility and versatility through the use of message properties, annotations and headers,
  • message-delivery guarantees such as at-most-once (where each message is delivered once or never), at-least-once (where each message is certain to be delivered, but may be delivered multiple times) and exactly-once (where the message will certainly arrive and do so only once),
  • flexible routing,
  • transactions,
  • security -- authentication and/or encryption based on SASL (Simple Authentication and Security Layer) and/or TLS (Transport Layer Security).

AMQP assumes an underlying reliable transport layer protocol such as Transmission Control Protocol (TCP).


In computer networking, a wire protocol refers to a way of getting data from point to point and is needed if more than one application has to interoperate. It generally refers to protocols higher than the physical layer. This term is used to describe a common way to represent information at the application level. A wire-level protocol is a description of the format of the data that is sent across the network as a stream of bytes.

Basically, if something's communicating with a remote machine (even conceptually) then there's some data going across the network connection (the wire). A wire-level protocol can be thought of as the complement of an API. Instead of defining functions and creating libraries, you define the conversational byte sequences that pass over a network to make things happen.


STOMP

STOMP, which stands for Simple (or Streaming) Text Oriented Messaging Protocol, formerly known as TTMP (???), is a simple text-based protocol designed for working with message-oriented middleware. The protocol provides a wire format that is very similar to HTTP, and works over TCP using the following short set of commands:

  • CONNECT (or STOMP)
  • SEND
  • SUBSCRIBE
  • UNSUBSCRIBE
  • BEGIN
  • COMMIT
  • ABORT
  • ACK
  • NACK
  • DISCONNECT

Communication from server to client is through MESSAGE, RECEIPT or ERROR frames.
The basic communication unit (command) is a "frame" consisting of a number of lines. The first line contains the command, followed by headers in the form [KEY]:[VALUE] (one per line), followed by a blank line and then the body content, ending with a null character. For example (see also the STOMP Protocol Specification, Version 1.2):
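The frame below is reconstructed after the example in the STOMP 1.2 specification (^@ denotes the terminating NULL octet):

    SEND
    destination:/queue/a
    content-type:text/plain

    hello queue a^@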

This sends a message to a destination named /queue/a. Note that STOMP destination naming does not imply any delivery semantics. See the Examples section for an example of a STOMP "session".


MQTT

MQTT, invented by Andy Stanford-Clark of IBM and Arlen Nipper of Arcom (now Eurotech) in 1999 and known as MQ Telemetry Transport, which stands for Message Queuing Telemetry Transport, is an extremely simple and lightweight publish/subscribe messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. This protocol is ideal for machine-to-machine connections, the Internet of Things (IoT), mobile devices, and wherever savings in bandwidth and energy are required. Basically, it can be used effectively in resource-constrained devices -- embedded systems. MQTT's strengths are simplicity and a low footprint -- the design principles are to minimise network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery.

A minimal MQTT control message can be as little as two bytes of data but when needed nearly 256 megabytes of data can be carried. There are fourteen defined message types used to connect and disconnect a client from a broker, to publish data, to acknowledge receipt of data, and to supervise the connection between client and server.

  • CONNECT – Client requests a connection to a Server
  • CONNACK – Acknowledge connection request
  • PUBLISH – Publish message
  • PUBACK – Publish acknowledgement. It is the response to a PUBLISH Packet with QoS level 1.
  • PUBREC – Publish received (QoS 2 publish received, part 1). It is the response to a PUBLISH Packet with QoS 2. It is the second packet of the QoS 2 protocol exchange.
  • PUBREL – Publish release (QoS 2 publish received, part 2). It is the response to a PUBREC Packet. It is the third packet of the QoS 2 protocol exchange.
  • PUBCOMP – Publish complete (QoS 2 publish received, part 3). It is the response to a PUBREL Packet. It is the fourth and final packet of the QoS 2 protocol exchange.
  • SUBSCRIBE - Subscribe to topics
  • SUBACK – Subscribe acknowledgement
  • UNSUBSCRIBE – Unsubscribe from topics
  • UNSUBACK – Unsubscribe acknowledgement
  • PINGREQ – PING request
  • PINGRESP – PING response
  • DISCONNECT – Disconnect notification


MQTT relies on the TCP protocol for data transmission. A variant, MQTT-SN, is used over other transports such as UDP or Bluetooth. TCP/IP port 1883 is reserved with IANA for use with MQTT, and port 8883 is also registered, for MQTT over SSL. The same ports are used for the UDP variant. MQTT sends connection credentials in plain text and does not itself include any measures for security or authentication. This can be provided by the underlying transport, using measures to protect the integrity of the transferred information from interception or duplication. In practice, to ensure secure communication, MQTT is encrypted using TLS/SSL.

Despite its simplicity, it offers three qualities of service:

  • At most once - the message is sent only once and the client and broker take no additional steps to acknowledge delivery (fire and forget / unreliable).
  • At least once - the message is re-tried by the sender multiple times until an acknowledgement is received (acknowledged delivery). To ensure it is received, the message is sent at least once (but might be sent more than once).
  • Exactly once - the sender and receiver engage in a two-level handshake to ensure only one copy of the message is received (assured delivery).

Because the UDP protocol does not guarantee packet delivery, one might suppose that MQTT over UDP is not reliable. If you think so, you are wrong. If we use MQTT for repeated updates, such as sensor data transfer, UDP is actually more reliable than TCP. If our network drops every second packet, MQTT/UDP will simply lose some of them without affecting the transfer of the others. MQTT/TCP will theoretically transfer all the data but will effectively be dead, attempting to resend again and again outdated packets which most likely are not needed anymore.


Examples


AMQP

AMQP examples are given in a separate tutorial -- see XYZ


STOMP

Below there is an example of a STOMP "session" (XYZ is used as the server).
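As a sketch of what such a session boils down to on the wire, the following Python snippet uses raw sockets and assumes a STOMP 1.2 broker listening on localhost:61613 (the port conventionally used for STOMP); the host header value and the receipt id are illustrative and broker-dependent.

    import socket

    NULL = b"\x00"

    def frame(command, headers=None, body=b""):
        # Build a STOMP frame: command line, header lines, blank line, body, NULL octet.
        head = "".join(f"{k}:{v}\n" for k, v in (headers or {}).items())
        return command.encode() + b"\n" + head.encode() + b"\n" + body + NULL

    sock = socket.create_connection(("localhost", 61613))

    sock.sendall(frame("CONNECT", {"accept-version": "1.2", "host": "/"}))
    print(sock.recv(4096).decode())      # expect a CONNECTED frame from the server

    sock.sendall(frame("SUBSCRIBE", {"id": "0", "destination": "/queue/a", "ack": "auto"}))
    sock.sendall(frame("SEND",
                       {"destination": "/queue/a", "content-type": "text/plain"},
                       b"hello queue a"))
    print(sock.recv(4096).decode())      # expect a MESSAGE frame carrying "hello queue a"

    sock.sendall(frame("DISCONNECT", {"receipt": "77"}))
    print(sock.recv(4096).decode())      # expect a RECEIPT frame
    sock.close()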


MQTT
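As a sketch only, using the paho-mqtt Python client (1.x callback API) and assuming an MQTT broker such as Mosquitto running on localhost:1883; the topic names reuse the sensor-style hierarchy from the topic-exchange example and are purely illustrative.

    import paho.mqtt.client as mqtt

    def on_connect(client, userdata, flags, rc):
        print("connected with result code", rc)
        # Subscribe to every sensor located at home (MQTT topics use / and #, not dots as in AMQP).
        client.subscribe("home/#", qos=1)

    def on_message(client, userdata, msg):
        print(msg.topic, msg.payload)

    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("localhost", 1883, keepalive=60)

    # Publish a reading; with qos=1 the broker acknowledges it with PUBACK (at-least-once delivery).
    client.publish("home/kitchen/dht22/temperature", payload="21.5", qos=1)

    client.loop_forever()                # handle network traffic and dispatch the callbacks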