Sunday, 31 July 2016

RabbitMQ Clustering / High Availability (HA) / Fault Tolerance (FT)

Agenda:

This post covers RabbitMQ Clustering, High Availability (HA) and Fault Tolerance (FT). It is not a beginner tutorial, though I will cover some RabbitMQ basics as a starting point for Clustering/HA.


Topics Covered:

  • RabbitMQ Basics (limited)
  • RabbitMQ Clustering/HA/FT
  • RabbitMQ System Design Factors



RabbitMQ Basics:

RabbitMQ is an open source Message Broker (MB) / Message Oriented Middleware (MOM) that provides robust messaging for applications (it accepts, routes, stores and forwards messages).
It supports various operating systems, developer platforms & messaging protocols:

                https://www.rabbitmq.com/download.html
                https://www.rabbitmq.com/devtools.html
                https://www.rabbitmq.com/protocols.html


Basic Flow:




Jargon:

  • Producer: A program that sends messages (to an Exchange).
  • Exchange: Routes messages from the Producer to Queues.
  • Queue: Stores the messages.
  • Consumer: A program that receives messages (from a Queue).
  • Binding: An Exchange & a Queue are bound together using a binding key/headers.
  • Connection: A physical socket (TCP) connection.
  • Channel: A virtual/logical tunnel inside a connection (we can create multiple channels on a single connection).
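To make these terms concrete, here is a minimal in-memory sketch of how a producer, an exchange, bindings, queues and a consumer relate. It is a toy model only and does not use any RabbitMQ client library; the class name DirectExchange and the routing keys are made up for illustration.

```python
from collections import defaultdict, deque


class DirectExchange:
    """Toy direct exchange: copies a message to every queue bound with a matching key."""

    def __init__(self):
        self.bindings = defaultdict(list)  # binding key -> list of bound queues

    def bind(self, queue, binding_key):
        self.bindings[binding_key].append(queue)

    def publish(self, routing_key, body):
        # The producer only talks to the exchange; it never names a queue.
        for queue in self.bindings.get(routing_key, []):
            queue.append(body)


orders = deque()  # a queue simply stores messages in FIFO order
audit = deque()

exchange = DirectExchange()
exchange.bind(orders, "order.created")
exchange.bind(audit, "order.created")
exchange.bind(audit, "order.cancelled")

exchange.publish("order.created", "order #1")      # lands in both queues
exchange.publish("order.cancelled", "order #2")    # lands in audit only
exchange.publish("order.unknown", "not routed")    # no binding matches, so no queue gets it

# The consumer reads from a queue, oldest message first.
first_for_orders = orders.popleft()
```

In a real broker the queues live on the server and the consumer receives messages over a channel, but the routing decision follows the same idea.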


Examples:



RabbitMQ Use Cases:

  • Supports Synchronous & Asynchronous Messaging across applications / Components of applications
  • Decoupling applications/components by separating sending & receiving data
  • Supports Publish-Subscribe mechanism
  • Supports Push mechanism
  • Supports Routing mechanism
  • Supports Broadcasting
  • Asynchronous Remote Procedure Call (RPC) using Callback Queues

 

RabbitMQ Clustering / High Availability(HA) & Fault Tolerant Systems (FT):


RabbitMQ Clustering:

  • A single logical RabbitMQ broker comprised of multiple nodes, each running RabbitMQ application & sharing resources/metadata like vHosts/users/exchanges etc.
  • All data/state required for the operation of a RabbitMQ broker (vHosts, exchanges etc.) is replicated across all nodes through clustering, so it survives if any node goes down. Queues & their messages (business data) are the exception: they live on a single node, although they are visible and reachable from the other nodes of the cluster. This means that if the node holding a queue goes down, that queue and its messages are not accessible through the other nodes. (Note: to replicate queues & messages across cluster nodes, you need to implement "High Availability".)
  • Clustering is recommended on a LAN. If you want a WAN friendly mechanism, explore Federation/Shovel.

  Cluster Formation Pre-requisite

  • RabbitMQ nodes inside a cluster authenticate to each other using a shared secret, the Erlang cookie. So when you form a cluster, all the nodes must have the same cookie.
  • RabbitMQ nodes must be able to resolve the hostnames of the other member nodes, using DNS or local hosts files.
  • All nodes in a cluster must run the same version of Erlang/RabbitMQ (the patch version can differ).
  • It is recommended to implement Clustering in LAN

 Cluster Setup Steps:

  • Set up the first node (run the RabbitMQ application)
  • Copy the cookie to all nodes that are supposed to form the cluster. Note: If you are testing the cluster on a single machine, you don't need to copy the cookie.
  • Set up all other nodes (run the RabbitMQ application)
  • Join all other nodes to the first node using the RabbitMQ "join_cluster" command or the config file
  • Verify the cluster (you can enable the RabbitMQ Management Plug-in on all nodes & verify cluster behavior in the browser)
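The join step above can be sketched as a small helper that only builds the rabbitmqctl command lines (it does not run them). The node names rabbit@node1 and rabbit@node2 are assumptions for illustration; stop_app, join_cluster, start_app and cluster_status are standard rabbitmqctl commands, and -n selects the node to operate on.

```python
def join_cluster_commands(first_node, joining_node):
    """Return the rabbitmqctl invocations that join `joining_node` to the cluster of `first_node`."""
    ctl = ["rabbitmqctl", "-n", joining_node]
    return [
        ctl + ["stop_app"],                  # stop the RabbitMQ application, keep the Erlang VM running
        ctl + ["join_cluster", first_node],  # the joining node is reset as part of this step
        ctl + ["start_app"],
        ctl + ["cluster_status"],            # verify: the output should list both nodes
    ]


for argv in join_cluster_commands("rabbit@node1", "rabbit@node2"):
    print(" ".join(argv))
```

You would run these on (or against) each node that joins; the first node needs no join step.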

Cluster Behaviour:


Message Behaviour in Cluster:

  • As explained before, messages reside in queues, and Clustering alone does not replicate queues (that requires HA). A client can connect to any node, but whenever it sends or receives messages, the request goes, directly or indirectly, to the specific node that holds the queue. So if the client is connected to a node that does not hold the queue, there is an extra hop inside the cluster.

 Cluster Composition Change:

  • Any node can be started or stopped at any time. This does not affect Cluster availability: the Cluster survives on the remaining nodes. Message availability may be affected, though: as explained before, Clustering alone does not replicate queues (that requires HA), so if the node holding a queue goes down, that queue's messages are unavailable.
  • Any node can be added to or removed from the cluster at any time (the added/removed node is reset in the process).

Upgrading Cluster:

  • When upgrading from one major or minor version of RabbitMQ to another (i.e. from 3.0.x to 3.1.x, or from 2.x.x to 3.x.x), or when upgrading Erlang, the whole cluster must be taken down for the upgrade (since clusters cannot run mixed versions like this). This will not be the case when upgrading from one patch version to another (i.e. from 3.0.x to 3.0.y); these versions can be mixed in a cluster (with the exception that 3.0.0 cannot be mixed with later versions from the 3.0.x series).
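The version rule above can be written down as a small check. This is only a sketch of the rule exactly as stated in this post (it ignores the Erlang version and does not query a real cluster); the function name can_mix is made up.

```python
def can_mix(version_a, version_b):
    """Apply the rule above: only patch releases of the same major.minor may share a cluster."""
    a = tuple(int(part) for part in version_a.split("."))
    b = tuple(int(part) for part in version_b.split("."))
    if a[:2] != b[:2]:
        return False  # different major or minor version: the whole cluster must be taken down
    if a != b and (3, 0, 0) in (a, b):
        return False  # stated exception: 3.0.0 cannot be mixed with later 3.0.x releases
    return True


rolling_upgrade_ok = can_mix("3.0.1", "3.0.4")   # same major.minor, patch differs
needs_full_shutdown = not can_mix("3.0.4", "3.1.0")
```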





High Availability (HA):

Active-Passive Configuration:
This can be achieved using Pacemaker & DRBD/SAN. Here, persistent messages are written to disk on the Active Node & can be recovered by the Passive Node if the Active Node fails. The Passive Node may take a little while to take over, as it reads the messages off the disk. Pacemaker is used for Resource Management & Monitoring.
More Details: https://www.rabbitmq.com/pacemaker.html

Active-Active Configuration (Preferred):
This can be achieved using Mirrored Queues.


High Availability (HA) Behavior using Mirrored Queue:

  • RabbitMQ Clustering replicates metadata such as Exchanges to all Cluster nodes, but it does not replicate Queues & Messages (business data). By implementing High Availability using Mirrored Queues, we can replicate Queues & Message data to other cluster nodes.
  • Queue Mirroring is configured through HA Policies. Policies can be changed at any time.
  • Each Mirrored Queue consists of one master & one or more slaves, each on a different node.
  • If the master disappears for any reason, the oldest slave is promoted to master. When the old master comes back, it rejoins as a slave.

Note: The Master-Slave relationship is specific to Queues (not Nodes). Suppose you have a 3 node cluster with HA policies in place, and 3 Queues. Then it is possible that, at the same time:
  • for First Queue --> Master is Node1 & Slaves are Node2 & Node3
  • for Second Queue --> Master is Node2 & Slaves are Node1 & Node3
  • for Third Queue --> Master is Node3 & Slaves are Node1 & Node2
  • Producers/Consumers can connect to any node in the cluster, but all requests are still served by the queue's master. This is required to maintain Queue behavior (FIFO). A client can connect to any node, but whenever it sends or receives messages, the request goes through the master of the queue, directly or indirectly. So if the client is connected to a node that holds only a slave of the queue, there is an extra hop inside the cluster.
  • Messages published to the queue are replicated to all slaves asynchronously, which means the client doesn't have to wait for message replication to complete (unless it uses publisher confirms: a confirm is sent only once all mirrors have accepted the message).
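The promotion rule and the extra hop can be played through with a small toy model. This is a sketch of the behaviour described above, not RabbitMQ code; the class MirroredQueue and the node names are made up.

```python
class MirroredQueue:
    """Toy model of one mirrored queue: a master plus slaves ordered oldest first."""

    def __init__(self, master, slaves):
        self.master = master
        self.slaves = list(slaves)  # index 0 is the oldest slave

    def node_down(self, node):
        if node == self.master:
            # The oldest slave takes over as master (None if no slave is left).
            self.master = self.slaves.pop(0) if self.slaves else None
        elif node in self.slaves:
            self.slaves.remove(node)

    def node_up(self, node):
        # A returning node (even the old master) rejoins as the newest slave.
        self.slaves.append(node)

    def extra_hops(self, connected_node):
        """Intra-cluster hops for a client connected to `connected_node`."""
        return 0 if connected_node == self.master else 1


first_queue = MirroredQueue("node1", ["node2", "node3"])
first_queue.node_down("node1")  # node2 is the oldest slave, so it becomes master
first_queue.node_up("node1")    # the old master comes back as a slave
```

After these two events, a client still connected to node1 pays one extra hop, while a client connected to node2 talks to the master directly.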


HA Policies:



Slave Node Synchronisation:

Implicit Synchronisation:

  • A node may join a cluster at any time. At this point, the new slave will be empty: it will not contain any existing contents of the queue. Such a slave will receive new messages published to the queue, and thus over time will accurately represent the tail of the mirrored queue. As messages are drained from the mirrored queue, the size of the head of the queue for which the new slave is missing messages, will shrink until eventually the slave's contents precisely match the master's contents. At this point, the slave can be considered fully synchronised, but it is important to note that this has occurred because of actions of clients in terms of draining the preexisting head of the queue.
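The catching-up process can be simulated in a few lines. This is a toy sketch that assumes every message is unique and that the slave sees exactly the messages published after it joined; the function name is made up.

```python
from collections import deque


def drains_until_synchronised(existing, new_messages):
    """Count consumer deliveries needed before an initially empty slave matches the master."""
    master = deque(existing)
    slave = deque()                   # a newly joined slave starts empty
    for message in new_messages:      # new publishes reach master and slave alike
        master.append(message)
        slave.append(message)
    drains = 0
    while master != slave:
        delivered = master.popleft()  # a consumer drains the head of the queue
        if slave and slave[0] == delivered:
            slave.popleft()           # the slave drops messages it also holds
        drains += 1
    return drains


needed = drains_until_synchronised(["m1", "m2", "m3"], ["m4", "m5"])
```

Here the slave is missing the three messages that existed before it joined, so it counts as synchronised only after consumers have drained those three from the head of the queue.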


Explicit Synchronisation:

  • Explicit synchronisation can be triggered in two ways: automatically or manually.

If a queue is set to synchronise automatically, it will synchronise whenever a new slave joins (the queue will be unresponsive until synchronisation has finished). This is configured through a policy:
  • ha-sync-mode:automatic
RabbitMQ 3.6.0 introduced a new policy key for mirrored queues: ha-sync-batch-size. Synchronising messages in batches can speed up the synchronisation process considerably.
Queues can also be synchronised manually, by setting the policy:
  • ha-sync-mode:manual
If ha-sync-mode is not set, manual is assumed. In manual mode, a queue is synchronised with the command:
  • rabbitmqctl sync_queue name
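As a sketch, the policy keys above can be assembled into a rabbitmqctl set_policy command line. The helper below only builds the argument list and does not run anything. The policy name ha-two and the queue pattern ^ha\. are assumptions for illustration; ha-mode "exactly" with ha-params as the replica count (master included) comes from RabbitMQ's mirrored queue policy keys, not from anything configured in this post.

```python
import json


def ha_policy_command(name, queue_pattern, replicas, sync_mode="manual", batch_size=None):
    """Build a `rabbitmqctl set_policy` invocation for a mirrored-queue policy."""
    definition = {
        "ha-mode": "exactly",     # keep exactly `replicas` copies of each matching queue
        "ha-params": replicas,
        "ha-sync-mode": sync_mode,
    }
    if batch_size is not None:
        definition["ha-sync-batch-size"] = batch_size  # available from RabbitMQ 3.6.0
    return ["rabbitmqctl", "set_policy", name, queue_pattern, json.dumps(definition)]


command = ha_policy_command("ha-two", r"^ha\.", 2, sync_mode="automatic", batch_size=500)
print(command)
```

The queue pattern is a regular expression matched against queue names, so in this example only queues whose names start with "ha." would be mirrored.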





Load Balancers:

Think about a 3 node RabbitMQ cluster where every node has a different IP. A client can connect to any node, but if that node goes down, you have to change the node details in the client program to connect to one of the remaining nodes (which is definitely not a good idea).
To avoid this problem, you can put a load balancer such as HAProxy or Nginx in front of the RabbitMQ Cluster.

If you are using Spring AMQP, it provides built-in cluster support: the connection factory accepts a list of addresses. In this case you can do without a load balancer.
  • <rabbit:connection-factory  id="connectionFactory" addresses="host1:5672,host2:5672"/>
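If your client library has no such address list, the same idea can be sketched by hand: try each node in turn and use the first one that answers. This is a minimal sketch, not any library's API. The connect argument is an assumed callable standing in for your real client's connect function, and fake_connect is a stand-in that pretends host1 is down.

```python
def parse_addresses(spec):
    """Parse 'host1:5672,host2:5672' into [('host1', 5672), ('host2', 5672)]."""
    pairs = (item.strip().rsplit(":", 1) for item in spec.split(","))
    return [(host, int(port)) for host, port in pairs]


def connect_with_failover(addresses, connect):
    """Try each broker address in order and return the first connection that succeeds."""
    errors = []
    for host, port in addresses:
        try:
            return connect(host, port)
        except OSError as exc:  # e.g. connection refused, host unreachable
            errors.append(f"{host}:{port} -> {exc}")
    raise ConnectionError("no broker reachable: " + "; ".join(errors))


def fake_connect(host, port):
    # Stand-in for a real client connect call; pretends host1 is down.
    if host == "host1":
        raise ConnectionRefusedError(f"{host}:{port} refused")
    return f"connected to {host}:{port}"


connection = connect_with_failover(parse_addresses("host1:5672,host2:5672"), fake_connect)
```

This only covers the initial connection. A client still needs to reconnect (and re-run this loop) when the node it is connected to goes down later.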





 


Migration from Single RabbitMQ Node to RabbitMQ Cluster with HA:


Assume you already have a single RabbitMQ node that currently serves all your messaging needs. You decide to implement Clustering/HA, so you add two more nodes, form a cluster & apply an HA policy. It works well.

But all queues are still on the first node, which means the first node is the master for every queue. Every message (from Producers and to Consumers) passes through the first node. As the number of queues & messages grows, this becomes hard to scale.

To solve this problem, you can change the master node of a specific queue (without stopping any node or the cluster).

check out this link:



Designing RabbitMQ Message Broker System:


So far we have covered RabbitMQ Basics, Clustering & HA concepts.

But an important question is still open: what is the best configuration, one that balances Broker performance & High Availability (HA) and also fits my requirements?

The answer may be different for each application. I have tried to summarize below the points that will help you find the best configuration. You can create a performance matrix, play around with all the scenarios & see which configuration suits you best.

RabbitMQ Deployment Architecture:

  • LAN : Clustering
  • WAN : Explore Federation & Shovel

RabbitMQ Routing Topology:

  • Exchanges can be of different types (Direct/Fanout/Topic/Headers), which serve different requirements such as Broadcasting, Publish-Subscribe etc.
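Of these types, Topic has the least obvious matching rule: routing keys are words separated by dots, "*" in a binding pattern matches exactly one word and "#" matches zero or more words. The function below is a small sketch of that rule for experimenting with patterns; it is not taken from RabbitMQ's implementation, and the routing keys are made up.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key."""

    def match(p, k):
        if not p:
            return not k  # pattern used up: match only if the key is used up too
        if p[0] == "#":
            # '#' matches zero words (skip it) or one more word (consume a key word)
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        if p[0] == "*" or p[0] == k[0]:
            return match(p[1:], k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


one_word = topic_matches("order.*", "order.created")        # '*' covers exactly one word
any_depth = topic_matches("order.#", "order.created.eu")    # '#' covers any number of words
```

A Direct exchange corresponds to an exact string comparison of the two keys, and a Fanout exchange ignores the routing key altogether.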

Broker's Reliability/Availability:

  • Clustering
  • Meta-Data Reliability : All Metadata like Exchanges/Queues can be marked as Durable or Transient
  • Disk & RAM nodes (I have not explored the use cases of RAM nodes.)

 Message Reliability/Availability:

  • Messages can be marked as Persistent.
(Durability of a queue does not make the messages routed to that queue durable. If the broker is taken down and then brought back up, a durable queue will be re-declared during broker startup; however, only persistent messages will be recovered.)
  • Message Acknowledgement:
Consumer Prefetch is an important point to note. If you are processing a huge number of message payloads & using AUTO acknowledgement, you may get an Out of Memory error on the Consumer side. To avoid this, you can use Consumer Prefetch with Manual Acknowledgement.
  • High Availability (HA) through Mirrored Queues
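To see why prefetch with manual acknowledgement protects the consumer, here is a toy simulation of how many messages pile up on the consumer side. It is a sketch under simple assumptions (the broker pushes as fast as it is allowed to, the consumer finishes a fixed batch per round); the function and parameter names are made up and no client library is involved.

```python
from collections import deque


def peak_buffered(total_messages, prefetch, batch):
    """Return the largest number of messages held by the consumer at once.

    prefetch=None models AUTO acknowledgement with no limit: the broker pushes
    everything it has. With a prefetch count, the broker stops delivering once
    that many messages are still unacknowledged.
    """
    if batch < 1:
        raise ValueError("batch must be at least 1")
    broker = deque(range(total_messages))
    in_flight = deque()  # delivered to the consumer, not yet processed
    peak = 0
    while broker or in_flight:
        # Broker pushes until the prefetch window is full (or the queue is empty).
        while broker and (prefetch is None or len(in_flight) < prefetch):
            in_flight.append(broker.popleft())
        peak = max(peak, len(in_flight))
        # Consumer finishes some work and acknowledges it manually.
        for _ in range(min(batch, len(in_flight))):
            in_flight.popleft()
    return peak


without_limit = peak_buffered(10000, prefetch=None, batch=10)
with_prefetch = peak_buffered(10000, prefetch=50, batch=10)
```

Without a limit the consumer ends up holding the entire backlog in memory, while with a prefetch of 50 it never holds more than 50 messages, whatever the queue depth.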


 Separation & Multi-Tenancy:

  • vHosts
  • Users

 Performance & Scaling:

  • Cluster Size: How many nodes will be in the cluster (an odd number of nodes, like 3, 5 and so on, is recommended)
  • Replication Factor of Queues: Not every Queue has to be replicated to every node of the cluster. You can choose the replication factor & implement the corresponding HA Policy
  • Average Message Size (approx.)
  • Message Processing Throughput (expected/actual)
  • Auto Sync, Batch Synchronisation
  • Multi Producers/Consumers Scenario


Others:

  • Handling Network Failures
  • Handling Policies (HA Policies explained before are just subset of RabbitMQ Policies)


Sample Source Code (based on Windows):






Motivation: