Enterprise Integration Patterns: Messaging Patterns

Message Filter


Continuing with the order processing example, let's assume that company management publishes price changes and promotions to large customers. Whenever a price for an item changes, we send a message notifying the customer. We do the same if we are running a special promotion, e.g. all widgets are 10% off in the month of November. Some customers may be interested in receiving price updates or promotions only related to specific items. If I purchase primarily gadgets, I may not be interested in knowing whether widgets are on sale or not.

How can a component avoid receiving uninteresting messages?

Use a special kind of Message Router, a Message Filter, to eliminate undesired messages from a channel based on a set of criteria.

The Message Filter has only a single output channel. If the message content matches the criteria specified by the Message Filter, the message is routed to the output channel. If the message content does not match the criteria, the message is discarded.
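The behavior described above can be illustrated with a minimal in-memory sketch. It is not tied to any messaging product: the class `MessageFilterSketch`, its nested `MessageFilter`, and the string-prefix criteria are hypothetical names chosen for illustration, and the "channel" is just a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory sketch; class and method names are illustrative only.
final class MessageFilterSketch {

    // Passes messages that match the criteria to the single output channel
    // and silently drops everything else.
    static final class MessageFilter<T> {
        private final Predicate<T> criteria;
        private final List<T> outputChannel = new ArrayList<>();

        MessageFilter(Predicate<T> criteria) {
            this.criteria = criteria;
        }

        void onMessage(T message) {
            if (criteria.test(message)) {
                outputChannel.add(message);
            }
            // No else branch: a non-matching message is discarded.
        }

        List<T> output() {
            return outputChannel;
        }
    }

    public static void main(String[] args) {
        // A customer who only cares about gadgets (assumed message format: "<item>: <text>").
        MessageFilter<String> gadgetsOnly = new MessageFilter<>(m -> m.startsWith("Gadget"));
        gadgetsOnly.onMessage("Widget: 10% off in November");
        gadgetsOnly.onMessage("Gadget: new price 9.99");
        System.out.println(gadgetsOnly.output());
    }
}
```

Note that, unlike a Content-Based Router, the filter has no alternative channel: a message either continues on the one output channel or disappears.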


Example: RabbitMQ Bindings

RabbitMQ allows a straightforward implementation of a Publish-Subscribe Channel with Message Filters via a direct exchange. An exchange is a concept distinct from a queue. Generally, message producers publish messages to an exchange, which controls how each message is propagated to queues. For example, a "fanout" exchange simply propagates a message to all bound queues, while a "direct" exchange allows each bound queue to express a filter condition for the messages it wants to receive.

A direct exchange places an incoming message into a bound queue when the message's routing key equals the queue's binding key. Multiple binding keys are allowed per queue, and multiple queues can use the same binding key, which results in each of them receiving a copy of the message.
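The routing rule can be made concrete with a small in-memory model. This is only a sketch of the matching logic, not the RabbitMQ client API: `DirectExchangeSketch`, its `Queue` class, and the `bind` and `publish` methods are hypothetical names, and dropping unmatched messages is a simplification of this model.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of direct-exchange routing; this is not the RabbitMQ API.
final class DirectExchangeSketch {

    // A queue is just a named list of delivered messages in this model.
    static final class Queue {
        final String name;
        final List<String> messages = new ArrayList<>();

        Queue(String name) {
            this.name = name;
        }
    }

    // binding key -> queues bound with that key
    private final Map<String, Set<Queue>> bindings = new HashMap<>();

    void bind(Queue queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    // Copies the message into every queue whose binding key equals the routing key.
    // In this model, a message whose routing key matches no binding is dropped.
    void publish(String routingKey, String message) {
        for (Queue queue : bindings.getOrDefault(routingKey, Collections.<Queue>emptySet())) {
            queue.messages.add(message);
        }
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue widgets = new Queue("widgets");
        Queue everything = new Queue("everything");

        exchange.bind(widgets, "Widget");
        exchange.bind(everything, "Widget");   // same binding key as another queue
        exchange.bind(everything, "Gadget");   // second binding key on one queue

        exchange.publish("Widget", "Widgets 10% off");
        exchange.publish("Gadget", "Gadget price change");
        exchange.publish("Gizmo", "Nobody is bound to this key");

        System.out.println(widgets.name + ": " + widgets.messages);
        System.out.println(everything.name + ": " + everything.messages);
    }
}
```

In this run the `widgets` queue ends up with only the widget message, while `everything` receives both the widget and the gadget message. Each binding key therefore acts as a per-queue Message Filter on a shared Publish-Subscribe Channel.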

Mapping RabbitMQ Concepts to Enterprise Integration Patterns

The Widgets and Gadgets example from above can be implemented quite easily with a single direct exchange and two queues bound to this exchange, each with its respective filter. A helper method creates a new queue, binds it to the exchange with the filter, and sets up an Event-Driven Consumer that simply prints out messages.

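import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;

static void filteredReceive(final Channel channel, final String filter, String exchangeName) throws IOException {

  System.out.println(filter + " waiting for messages. To exit press CTRL+C");

  // Event-Driven Consumer: the client library invokes handleDelivery for each message
  Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException {
      String message = new String(body, "UTF-8");
      System.out.println(filter + " Received '" + message + "'");
    }
  };

  // Declare a server-named private queue and bind it with the filter as binding key
  String queueName = channel.queueDeclare().getQueue();
  channel.queueBind(queueName, exchangeName, filter);

  boolean autoAck = true;
  channel.basicConsume(queueName, autoAck, consumer);
}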

The consumer is a simple Event-Driven Consumer that is called every time a message is received. The lines after the consumer definition declare a new private queue and bind it to the exchange, using the filter string as the binding key. Lastly, the consumer is attached to the queue.

We can now create the direct exchange and start several of these consumers, each with its own binding key:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

filteredReceive(channel, "Widget", EXCHANGE_NAME);
filteredReceive(channel, "Gadget", EXCHANGE_NAME);

Lastly, we publish some messages to the exchange with different routing keys:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

sendMessage(channel, EXCHANGE_NAME, "Widget", "Hello EIP Widget!");
sendMessage(channel, EXCHANGE_NAME, "Gadget", "Hello EIP Gadget!");

sendMessage is a simple helper method:

static void sendMessage(Channel channel, String exchange, String key, String message) throws IOException {
  // Publish to the exchange; the key is the routing key matched against binding keys
  channel.basicPublish(exchange, key, null, message.getBytes("UTF-8"));
  System.out.println(" [x] Sent '" + message + "'");
}

You can run this code and observe that RabbitMQ filters the messages according to their key:

Widget waiting for messages. To exit press CTRL+C
Gadget waiting for messages. To exit press CTRL+C
Widget Received 'Hello EIP Widget!'
Gadget Received 'Hello EIP Gadget!'

The complete source code for this example can be found on GitHub.

Related patterns: Content-Based Router, Event-Driven Consumer, Message Router, Selective Consumer, Publish-Subscribe Channel, Recipient List


Find the full description of this pattern in:
Enterprise Integration Patterns
Gregor Hohpe and Bobby Woolf
ISBN 0321200683
650 pages


Parts of this page are made available under the Creative Commons Attribution license. You can reuse the pattern icon, the pattern name, the problem and solution statements (in bold), and the sketch under this license. Other portions of the text, such as text chapters or the full pattern text, are protected by copyright.
