Messaging Patterns : Flow, SAGA, Messaging Gateway and Observability With RabbitMQ Exchange to Exchange Bindings

Jean-Philippe Dutrève
6 min read · Jul 23, 2021

Agenda

This article describes a Java implementation of the concepts from the previous article, Integrated Microservices. It also adds other concepts: Message Flows, the SAGA Pattern, RabbitMQ Exchange to Exchange bindings, the CloudEvents message standard, and the Messaging Gateway Pattern.

Flow versus SAGA

  • Flows and Sagas are 2 ways of grouping a set of messages. Both have a name and an identifier by which messages can be grouped.
  • The difference between them lies in their scope and their intent.
  • A Flow has a broader scope than a SAGA and encompasses all messages issued from a single initial business request (e.g. a CreateOrder command). Its intent is to track all of them.
  • A SAGA is managed by a single microservice and encompasses all messages used in the orchestration of some other microservices (Commands and resulting Events, including compensating or fallback messages). Its intent is to manage a Distributed Business Transaction: either all Commands succeed or all are aborted. That helps to maintain data consistency across the set of called microservices.
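The all-or-nothing intent of a SAGA can be sketched as a list of steps, each paired with a compensating action. This is only a minimal, synchronous, in-memory sketch and not the code from the repository: a real SAGA waits for asynchronous reply messages. The names SagaStep, SimpleSaga and the step names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical step: an action (e.g. send a Command) and its compensation.
final class SagaStep {
    final String name;
    final Runnable action;
    final Runnable compensation;

    SagaStep(String name, Runnable action, Runnable compensation) {
        this.name = name;
        this.action = action;
        this.compensation = compensation;
    }
}

final class SimpleSaga {
    private final List<SagaStep> steps = new ArrayList<>();

    SimpleSaga step(String name, Runnable action, Runnable compensation) {
        steps.add(new SagaStep(name, action, compensation));
        return this;
    }

    // Runs the steps in order. On the first failure, the steps that already
    // completed are compensated in reverse order and the saga is aborted.
    // Returns true when every step succeeded, false when the saga was aborted.
    boolean execute() {
        Deque<SagaStep> completed = new ArrayDeque<>();
        for (SagaStep step : steps) {
            try {
                step.action.run();
                completed.push(step);
            } catch (RuntimeException failure) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();
                }
                return false;
            }
        }
        return true;
    }
}

final class SimpleSagaDemo {
    public static void main(String[] args) {
        boolean committed = new SimpleSaga()
            .step("createCustomer",
                  () -> System.out.println("send CreateCustomerCommand"),
                  () -> System.out.println("send compensating command for createCustomer"))
            .step("scoreCustomer",
                  () -> { throw new IllegalStateException("SCORING unavailable"); },
                  () -> System.out.println("not called: this step never completed"))
            .execute();
        System.out.println("committed = " + committed);
    }
}
```

Only the steps that completed are compensated, in reverse order, which is what keeps the called microservices consistent when one Command fails.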

From an Observability standpoint, we may want to define:

  • A dashboard containing all messages of a single Flow instance, with all its SAGAs, to visualize the history of all messages sent during the execution starting from this initial business request.
  • A dashboard containing all messages of a single SAGA instance.
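The two dashboards above boil down to grouping messages by flowId and filtering them by sagaId. Here is a minimal sketch of that grouping in plain Java; in the article the grouping is done in Kibana, so TrackedMessage and MessageGrouping are hypothetical names used only for illustration, and flowId is assumed to be always set.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, trimmed-down view of a message: only the fields used for grouping.
final class TrackedMessage {
    final String type;
    final String flowId; // assumed non-null
    final String sagaId; // may be null when the message belongs to no Saga

    TrackedMessage(String type, String flowId, String sagaId) {
        this.type = type;
        this.flowId = flowId;
        this.sagaId = sagaId;
    }
}

final class MessageGrouping {
    // All messages of each Flow instance, keeping the arrival order.
    static Map<String, List<TrackedMessage>> byFlow(List<TrackedMessage> messages) {
        return messages.stream().collect(
            Collectors.groupingBy(m -> m.flowId, LinkedHashMap::new, Collectors.toList()));
    }

    // All messages of a single Saga instance.
    static List<TrackedMessage> ofSaga(List<TrackedMessage> messages, String sagaId) {
        return messages.stream()
            .filter(m -> sagaId.equals(m.sagaId))
            .collect(Collectors.toList());
    }
}

final class MessageGroupingDemo {
    public static void main(String[] args) {
        List<TrackedMessage> messages = Arrays.asList(
            new TrackedMessage("CreateCustomerCommand", "flow-1", "saga-A"),
            new TrackedMessage("CustomerScoredEvent", "flow-1", "saga-A"),
            new TrackedMessage("CustomerScoredEvent", "flow-2", "saga-B"));
        System.out.println("flows: " + MessageGrouping.byFlow(messages).keySet());
        System.out.println("saga-A size: " + MessageGrouping.ofSaga(messages, "saga-A").size());
    }
}
```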

Messaging Gateway

A Messaging Gateway is a class that wraps messaging-specific method calls (send, publish, subscribe, consume) and exposes domain-specific methods to the application.
  • The Messaging Gateway is a Pattern from the Enterprise Integration Patterns web site.
  • In this implementation, the application behind a gateway is either a Proxy or a Backend.
  • This pattern is used extensively in this implementation, in order to create Service classes without any messaging infrastructure.
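As an illustration of the pattern, here is a minimal sketch of a gateway that hides the messaging call behind a domain method. It is not the code from the repository: MessageSender, CustomerGateway and RecordingSender are hypothetical names, and the in-memory sender stands in for a real broker client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical messaging port: the only type that knows about the broker.
interface MessageSender {
    void send(String destination, String type, String payload);
}

// Domain-facing gateway: a Service calls createCustomer(...) and never sees messaging code.
final class CustomerGateway {
    private final MessageSender sender;

    CustomerGateway(MessageSender sender) {
        this.sender = sender;
    }

    void createCustomer(String customerId, String name) {
        // Naive JSON building, without escaping, to keep the sketch dependency-free.
        String payload = "{\"id\":\"" + customerId + "\",\"name\":\"" + name + "\"}";
        sender.send("CUSTOMER", "CreateCustomerCommand", payload);
    }
}

// In-memory sender used here in place of a real broker client.
final class RecordingSender implements MessageSender {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(String destination, String type, String payload) {
        sent.add(destination + ":" + type + ":" + payload);
    }
}

final class CustomerGatewayDemo {
    public static void main(String[] args) {
        RecordingSender sender = new RecordingSender();
        new CustomerGateway(sender).createCustomer("70635875785", "Ada");
        System.out.println(sender.sent);
    }
}
```

Because the Service depends only on the gateway, the sender can be swapped (in-memory for tests, broker-backed at runtime) without touching any business code.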

Use Case

A FLOW encompasses all the messages starting from an initial request (Start) and going through any service (FRONTEND, SCORING, CUSTOMER). It may be useful to track the propagation of a business request. A FLOW may contain several Sagas, one per service.
  • The use case is deliberately simple: it creates or updates a customer with a credit rating score.
  • It defines 3 microservices (FRONTEND, CUSTOMER, SCORING) and a CLIENT that publishes a starting Event and subscribes to a resulting Event (aka Choreography).
  • The communication between these 3 microservices follows the principles defined in the previous article, Integrated Microservices.
  • There are 2 FLOWs and 3 SAGAs (2 from FRONTEND, 1 from CUSTOMER).
  • Below are 2 SAGAs from FRONTEND & CUSTOMER Microservices:
A SAGA is owned by a single service and its state is persisted in a database. It is used to correlate requests & replies and to manage a (distributed) business transaction. SCORING has no SAGA.
  • Below are the messages from the CreateCustomerSaga SAGA
This is the happy path of the CreateCustomerSaga Saga. This FRONTEND Saga first calls CUSTOMER, which in turn calls SCORING, before publishing its own resulting Event (CustomerScoredEvent).
  • Below are the messages from the UpdateCustomerSaga SAGA
This is the happy path of the UpdateCustomerSaga Saga. This FRONTEND Saga calls SCORING, before publishing its resulting Event (CustomerScoredEvent).
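The correlation of requests and replies inside a SAGA can be sketched as a small lookup table keyed by the request id. This is an assumption about how such a correlation could work, not the repository's code; SagaCorrelator is a hypothetical name and the state is kept in memory.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class SagaCorrelator {
    // id of the request message -> id of the Saga instance waiting for its reply
    private final Map<String, String> pendingRequests = new HashMap<>();

    // Called when a Saga sends a request (a Command).
    void requestSent(String requestId, String sagaId) {
        pendingRequests.put(requestId, sagaId);
    }

    // Called when a reply arrives: its correlationId is the id of the request.
    // The request is forgotten, so a duplicate or unknown reply yields an empty result.
    Optional<String> replyReceived(String correlationId) {
        return Optional.ofNullable(pendingRequests.remove(correlationId));
    }
}

final class SagaCorrelatorDemo {
    public static void main(String[] args) {
        SagaCorrelator correlator = new SagaCorrelator();
        correlator.requestSent("msg-1", "saga-A");
        System.out.println("first reply: " + correlator.replyReceived("msg-1"));
        System.out.println("duplicate reply: " + correlator.replyReceived("msg-1"));
    }
}
```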

Java Implementation

  • All Java code is stored in this GitHub Repository.
  • It uses Spring Boot extensively, and the ELK stack for Observability.
  • The message broker used here is RabbitMQ due to its great routing and filtering mechanisms, and delivery guarantees.
  • The messaging framework used here is Spring Cloud Stream. It encapsulates the access to the message broker with a messaging abstraction.
  • This is NOT production code; it is intended for educational purposes only. It is an example of how Integrated Microservices could be built.
  • Every state is stored in-memory only.
  • The filesystem layout is an autonomous package per Microservice, and a common package that is shared by all Microservices.

Message Structure

  • Each concrete message (either a Command or an Event) inherits from this CloudEvents Java structure:
public abstract class CloudEvent<T extends Data> {
    String specversion; // The version of CloudEvents Specification this message uses
    String id; // Unique message ID
    String type; // The type/name of this message ex: CustomerCreated, CreateCustomer
    String time; // The time this message occurred, ex: 2019-10-12T07:20:50.52Z (RFC 3339)
    String expirationDate; // Date after which this message is obsolete, and should NOT be taken into account, ex: 2019-10-12T07:20:50.52Z (RFC 3339)

    // FROM
    String server; // The application name managing this message
    String source; // The service name producing this message

    // TO
    String destination; // The service name where the message should go, optional

    // From WHICH service/request
    String replyTo; // The service name the reply of a request should be sent to
    String correlationId; // The Request id when this message is its Reply

    // On behalf of WHOM
    String userId; // The user that fired this message ex: system-msg-user@BA-FR

    // For WHAT purpose
    String flowName; // The global Business Flow this message is participating in
    String flowId; // Used in Dashboard (i.e. Kibana) to group messages of the same Flow

    // For WHICH Business Transaction
    String sagaName; // The current Saga (Business Transaction) this message is participating in
    String sagaId; // Used to group messages of the same Saga instance

    // On WHICH resource
    String entity; // The aggregate root name underlying this message ex: Customer
    String subject; // The Entity ID ex: 70635875785

    // Custom business data
    String datacontenttype = "application/json;charset=utf-8"; // Optional. The content type of data. RFC2046
    String dataschema; // Optional. Identifies the schema that data adheres to (URI).
    T data; // Optional. The payload of this message.
}
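To show how these fields could work together, here is a minimal sketch that builds a reply from a request and checks the expiration date. Envelope is a hypothetical, trimmed-down stand-in for the CloudEvent class, and the rules applied (reply routed to replyTo, Flow and Saga identifiers copied from the request) are assumptions drawn from the field comments, not confirmed behavior of the repository.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.UUID;

// Hypothetical stand-in for the CloudEvent class: only the fields used in this sketch.
final class Envelope {
    String id;
    String type;
    String time;
    String expirationDate; // optional, RFC 3339
    String source;
    String destination;
    String replyTo;
    String correlationId;
    String flowName;
    String flowId;
    String sagaName;
    String sagaId;
}

final class Envelopes {
    // Builds a reply: new id, correlationId set to the request id, destination set to
    // the request's replyTo, Flow and Saga identifiers copied so messages stay groupable.
    static Envelope replyTo(Envelope request, String type, String source, Instant now) {
        Envelope reply = new Envelope();
        reply.id = UUID.randomUUID().toString();
        reply.type = type;
        reply.time = now.toString();
        reply.source = source;
        reply.destination = request.replyTo;
        reply.correlationId = request.id;
        reply.flowName = request.flowName;
        reply.flowId = request.flowId;
        reply.sagaName = request.sagaName;
        reply.sagaId = request.sagaId;
        return reply;
    }

    // A message without expirationDate never expires.
    static boolean isExpired(Envelope message, Instant now) {
        if (message.expirationDate == null) {
            return false;
        }
        Instant limit = OffsetDateTime.parse(message.expirationDate).toInstant();
        return now.isAfter(limit);
    }
}

final class EnvelopesDemo {
    public static void main(String[] args) {
        Envelope request = new Envelope();
        request.id = "req-1";
        request.replyTo = "FRONTEND";
        request.flowId = "flow-1";
        request.sagaId = "saga-A";
        Envelope reply = Envelopes.replyTo(request, "CustomerCreated", "CUSTOMER", Instant.now());
        System.out.println(reply.destination + " <- " + reply.type + " (correlationId=" + reply.correlationId + ")");
    }
}
```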

Versatile Deployment

  • This implementation provides neither a Docker image nor a Kubernetes deployment script. Adding them is straightforward and is left to the reader.
  • Instead, each container is simulated by a Spring Application and each POD is simulated by a Java process embedding some Microservices (Proxy or Backend).
  • As such, the deployment is very versatile as we can start a Java Process (POD) with one or several Microservices (Container), depending on the organizational topology (one or several teams, dev vs production).
  • Here, as an example, we imagine 2 teams, each owning one or several Microservices. So, there are 3 processes (PODs): the Client, Team A, and Team B.
The Ambassador Architecture embeds service dependencies as Proxy sidecar containers. FRONTEND can access SCORING without Proxy because it is owned by the same team (implementation tactic). There is no blocking remote communication between PODs except with Broker & Database. Every REST API access is done on the same POD, on the localhost.
3 different Processes, simulating 3 different PODs

RabbitMQ Exchange to Exchange bindings

  • RabbitMQ Exchange to Exchange bindings are a powerful feature that lets each Microservice read and write only on its own Exchanges (Inbound & Outbound).
  • They are used for Event subscriptions only, by binding an Inbound Exchange (Event consumer) to an Outbound Exchange (Event producer).
  • These bindings are done in the Java code, inside Spring Configuration.
  • Inbound Exchanges have Headers type, so that we can filter subscribed Events into internal Queues. Filtering is done on Message Header properties.
  • Outbound Exchanges have Fanout type, so that we can duplicate outgoing messages into any interested Inbound Exchange.
RabbitMQ bindings between Exchanges: Outbound Exchanges have Fanout type (to duplicate messages into bound Exchanges); Inbound Exchanges have Headers type (to filter messages against message Header properties); Commands are sent directly to a specific Exchange (ex: CreateCustomerCommand to the Inbound CUSTOMER Exchange).
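The routing described above can be modeled in a few lines of plain Java. This is only an in-memory model of the semantics (a Fanout Exchange copies to every binding, a Headers Exchange filters on header values), not RabbitMQ or Spring code; the class names and the "type" header key are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Anything a message can be routed to: a Queue or another Exchange.
interface Destination {
    void accept(Map<String, String> headers, String body);
}

final class InMemoryQueue implements Destination {
    final List<String> messages = new ArrayList<>();

    @Override
    public void accept(Map<String, String> headers, String body) {
        messages.add(body);
    }
}

// Fanout model: copies every message to every bound destination.
final class FanoutModel implements Destination {
    private final List<Destination> bound = new ArrayList<>();

    void bind(Destination destination) {
        bound.add(destination);
    }

    @Override
    public void accept(Map<String, String> headers, String body) {
        for (Destination destination : bound) {
            destination.accept(headers, body);
        }
    }
}

// Headers model: routes to a binding only when all its required headers match.
final class HeadersModel implements Destination {
    private final List<Destination> targets = new ArrayList<>();
    private final List<Map<String, String>> filters = new ArrayList<>();

    void bind(Destination target, Map<String, String> requiredHeaders) {
        targets.add(target);
        filters.add(requiredHeaders);
    }

    @Override
    public void accept(Map<String, String> headers, String body) {
        for (int i = 0; i < targets.size(); i++) {
            boolean allMatch = true;
            for (Map.Entry<String, String> required : filters.get(i).entrySet()) {
                if (!required.getValue().equals(headers.get(required.getKey()))) {
                    allMatch = false;
                }
            }
            if (allMatch) {
                targets.get(i).accept(headers, body);
            }
        }
    }
}

final class ExchangeBindingDemo {
    public static void main(String[] args) {
        InMemoryQueue clientQueue = new InMemoryQueue();
        HeadersModel clientInbound = new HeadersModel();
        clientInbound.bind(clientQueue, Collections.singletonMap("type", "CustomerScoredEvent"));
        FanoutModel frontendOutbound = new FanoutModel();
        frontendOutbound.bind(clientInbound); // the Exchange to Exchange binding
        frontendOutbound.accept(Collections.singletonMap("type", "CustomerScoredEvent"), "customer scored");
        System.out.println(clientQueue.messages);
    }
}
```

In this model FRONTEND only writes to its own Outbound Exchange and the CLIENT only reads from a Queue behind its own Inbound Exchange; the subscription is the single binding between the two Exchanges.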

Observability

  • We also use Queue to Exchange bindings to collect all messages from the whole system (Commands & Events) into an ELK stack (Elasticsearch/Logstash/Kibana).
  • The goal is to have a Kibana Dashboard per Flow instance, and see clearly what happened after an initial Event from the CLIENT.
RabbitMQ bindings between Exchanges and Log Queue: Outbound Exchanges emit published Events only; Inbound Exchanges emit Commands (& subscribed Events), so some bindings have a filter on message header properties (isCommand: true).
A Kibana Dashboard showing all messages issued after an initial Client Event.

Summary

  • This implementation demonstrates the PROS & CONS of the Ambassador Architecture.
  • It is much more complex than just using synchronous calls between Microservices, but it provides much more resiliency and availability thanks to the Message Broker.
  • As a bonus, overall Observability comes almost for free, thanks to the fanout of every message into the ELK stack and the use of a common CloudEvents structure, in which messages can be grouped by the flowId property.
  • As improvements, we could use a Transactional Database and the Inbox/Outbox Patterns to synchronize the change of an Entity and the sending/receiving of a message.
