Enterprise Integration Patterns
Messaging Patterns
Content Enricher

When sending messages from one system to another it is common for the target system to require more information than the source system can provide. For example, incoming Address messages may just contain the ZIP code because the designers felt that storing a redundant state code would be superfluous. Likely, another system is going to want to specify both a state code and a ZIP code field. Yet another system may not actually use state codes, but spell the state name out because it uses free-form addresses in order to support international addresses. Likewise, one system may provide us with a customer ID, but the receiving system actually requires the customer name and address. An order message sent by the order management system may just contain an order number, but we need to find the customer ID associated with that order, so we can pass it to the customer management system. The scenarios are plentiful.

How do we communicate with another system if the message originator does not have all the required data items available?

Use a specialized transformer, a Content Enricher, to access an external data source in order to augment a message with missing information.

The Content Enricher uses information inside the incoming message (e.g. key fields) to retrieve data from an external source. After the Content Enricher retrieves the required data from the resource, it appends the data to the message. The original information from the incoming message may be carried over into the resulting message or may no longer be needed, depending on the specific needs of the receiving application.
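
As a minimal sketch of the idea, using the ZIP code scenario from above: the enricher reads a key field from the incoming message, looks up the missing data, and returns an augmented copy. The `ZIP_TO_STATE` dictionary and the function name `enrich_address` are hypothetical stand-ins; a real enricher would query a database or a service instead.

```python
# Hypothetical in-memory stand-in for the external data source.
ZIP_TO_STATE = {"94105": "CA", "10001": "NY"}

def enrich_address(message, lookup=ZIP_TO_STATE):
    """Return a copy of the message with a 'state' field added, if known."""
    enriched = dict(message)                 # leave the incoming message untouched
    state = lookup.get(message.get("zip"))   # key field drives the lookup
    if state is not None:
        enriched["state"] = state
    return enriched

print(enrich_address({"street": "1 Main St", "zip": "94105"}))
# {'street': '1 Main St', 'zip': '94105', 'state': 'CA'}
```

Here the original fields are carried over into the result; an enricher could just as well drop fields the receiver does not need.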

...

Example: Content Enricher with Amazon EventBridge Pipes

For cloud-based messaging systems, Amazon EventBridge Pipes can implement a Content Enricher through the "Enrichment" step:


[Figure: Enriching messages with Amazon EventBridge Pipes]

The step's name is a slight misnomer, as the step can implement any Message Translator or even a Message Filter for batched sources. The pipe simply passes on the message(s) that are returned by the Enrichment step. To build an actual Content Enricher, you can use a Lambda function that fetches additional data from a DynamoDB table based on a key from the original message. You can find the source code for this example implementation in the EIP Code Repository.
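
To illustrate the Message Filter case, here is a rough sketch of an Enrichment function that returns only a subset of the batch it receives. It assumes the batch arrives as a list of records whose `body` field is a JSON string, which is the shape the SQS-based example below relies on; the filter rule itself (keep messages with a non-empty `id`) is made up for illustration.

```python
import json

def lambda_handler(event, context):
    """Act as a Message Filter: return only the messages worth passing on."""
    kept = []
    for record in event:
        body = json.loads(record["body"])
        if body.get("id"):        # hypothetical rule: drop messages without an id
            kept.append(body)
    return kept                   # the pipe forwards whatever is returned
```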

You start out by creating two Message Channels in the form of Amazon SQS queues (the example uses the AWS CLI to keep things simple; for real projects an automation script is highly recommended):

$ aws sqs create-queue --queue-name EntityIDs --attributes='{"MessageRetentionPeriod": "300"}'
$ aws sqs create-queue --queue-name EntityUpdates --attributes='{"MessageRetentionPeriod": "300"}'
$ aws sqs list-queues

The last command prints out the URLs of the created queues, which you will need later.

The Content Enricher, implemented with EventBridge Pipes, Lambda, and DynamoDB, will fetch messages containing just id fields from the EntityIDs queue and publish enriched events to the EntityUpdates queue. To build the Content Enricher, you first create a table to hold the Entity data and load it up with sample data, which is most easily done with a JSON file (available in the EIP Code Repository):

$ aws dynamodb create-table --table-name Entities \
      --attribute-definitions AttributeName=id,AttributeType=S \
      --key-schema AttributeName=id,KeyType=HASH \
      --billing-mode PAY_PER_REQUEST

$ aws dynamodb batch-write-item --request-items file://entities.json
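
If you don't have the repository file at hand, the following sketch prints a request file in the shape that batch-write-item expects: a map from table name to a list of PutRequest entries, with every attribute wrapped in a DynamoDB type tag. The two sample entities are taken from the outputs shown on this page; treating Amount as a number and everything else as a string is an assumption, and the actual file in the repository may differ.

```python
import json

# Sample entities as they appear in the outputs on this page.
entities = [
    {"id": "123", "Amount": "12.34", "Type": "Order", "Customer": "Pete's Pizza"},
    {"id": "456", "Amount": "22.22", "Type": "Order", "Customer": "Mary's Muffins"},
]

def to_put_request(entity):
    """Wrap each attribute in a DynamoDB type tag: N for Amount, S otherwise."""
    item = {}
    for name, value in entity.items():
        type_tag = "N" if name == "Amount" else "S"   # assumption, see above
        item[name] = {type_tag: value}
    return {"PutRequest": {"Item": item}}

request_items = {"Entities": [to_put_request(e) for e in entities]}
print(json.dumps(request_items, indent=2))
```

Redirecting the output to entities.json gives you a file you can pass to the command above.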

To check that the table is loaded (and to get accustomed to DynamoDB's syntax), you can fetch an entity:

$ aws dynamodb get-item --table-name Entities --key '{"id": {"S": "123"}}'
{
    "Item": {
        "id": { "S": "123" },
        "Amount": { "N": "12.34" },
        "Type": { "S": "Order" },
        "Customer": { "S": "Pete's Pizza" }
    }
}
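
The type tags (S for string, N for number) are what make this syntax look unusual. As a small sketch, the following hypothetical helpers `unwrap` and `unwrap_item` turn such a typed item into plain Python values. They cover only a handful of type tags; boto3 ships a more complete TypeDeserializer in boto3.dynamodb.types, which is not used here so that the example runs without dependencies.

```python
from decimal import Decimal

def unwrap(attribute):
    """Convert one typed value such as {"S": "abc"} into a Python value."""
    (type_tag, value), = attribute.items()    # exactly one tag per attribute
    if type_tag == "S":
        return value
    if type_tag == "N":
        return Decimal(value)                 # numbers travel as strings
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unwrap(v) for v in value]
    if type_tag == "M":
        return {k: unwrap(v) for k, v in value.items()}
    raise ValueError(f"unsupported type tag: {type_tag}")

def unwrap_item(item):
    """Unwrap every attribute of an item as returned under "Item"."""
    return {name: unwrap(attr) for name, attr in item.items()}

item = {
    "id": {"S": "123"},
    "Amount": {"N": "12.34"},
    "Type": {"S": "Order"},
    "Customer": {"S": "Pete's Pizza"},
}
print(unwrap_item(item))
```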

Before you create the Pipe, you need to create a security role with the required permissions (the trust policy file is supplied in the EIP repository) so that your pipe can read messages from and write messages to SQS and invoke a Lambda function. It's also handy to get your account ID and store it in a variable (if your command line doesn't support command substitution, you can execute the command by hand and set the variable explicitly or just replace $id with the number on each command line):

$ id=`aws sts get-caller-identity --query 'Account' --output text`
$ echo $id
1234567890

$ aws iam create-role --role-name EventBridge_Pipes_Enricher --assume-role-policy-document file://trust_policy_pipes.json

$ aws iam create-policy --policy-name pipes_sqs_lambda_sqs --policy-document file://pipes_role_policy.json \
    --query Policy.Arn --output text

$ aws iam attach-role-policy --role-name EventBridge_Pipes_Enricher --policy-arn arn:aws:iam::$id:policy/pipes_sqs_lambda_sqs

Additionally, you need to grant write permission to the EntityUpdates queue with a so-called resource policy that's attached to a resource instead of a role:

$ aws sqs add-permission --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityUpdates \
  --aws-account-ids=$id \
  --actions SendMessage ReceiveMessage DeleteMessage ChangeMessageVisibility \
  --label SendEntities

You can now create a Pipe and give it the required permissions through the role you just created (notice the use of $id for the account ID):

$ aws pipes create-pipe --name ContentEnricher \
  --source arn:aws:sqs:ap-southeast-1:$id:EntityIDs \
  --target arn:aws:sqs:ap-southeast-1:$id:EntityUpdates \
  --enrichment arn:aws:lambda:ap-southeast-1:$id:function:EnrichFromDynamoDB  \
  --role-arn arn:aws:iam::$id:role/EventBridge_Pipes_Enricher

You also need to deploy the Lambda function that looks up the entities from the database and returns the enriched messages. EventBridge Pipes can work in batches, meaning it can fetch more than one event from the event source at one time. Correspondingly, your Lambda function has to be prepared to receive a list of IDs. In my implementation, it uses the batch_get_item method to look up all IDs at once (invalid IDs simply don't return an item):

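import json
import boto3

# The resource-level API returns plain Python values instead of DynamoDB's typed JSON.
dynamo = boto3.resource('dynamodb')
table_name = 'Entities'

def lambda_handler(event, context):
    # The pipe delivers a batch: a list of SQS records whose 'body' is a JSON string.
    keys = []
    for evt in event:
        body = json.loads(evt['body'])
        keys.append({'id': body['id']})
    # Look up all IDs in a single call; IDs without a matching item are simply absent.
    data = dynamo.batch_get_item(RequestItems={table_name: {'Keys': keys}})
    return data['Responses'][table_name]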
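
The function above keeps things short. A sturdier variant might account for three BatchGetItem behaviors: the call rejects duplicate keys, accepts at most 100 keys per request, and may hand back some keys under UnprocessedKeys instead of answering them. The following is a sketch under those assumptions, not the code from the repository; the helper names `extract_keys` and `fetch_all` are made up, and the lookup call is passed in as a parameter so the logic can be tried without AWS access.

```python
import json

MAX_KEYS_PER_CALL = 100   # BatchGetItem limit per request

def extract_keys(event):
    """Collect unique {'id': ...} keys from a batch of SQS-style records."""
    seen = set()
    keys = []
    for record in event:
        entity_id = json.loads(record["body"])["id"]
        if entity_id not in seen:          # duplicate keys would be rejected
            seen.add(entity_id)
            keys.append({"id": entity_id})
    return keys

def fetch_all(batch_get_item, table_name, keys):
    """Fetch items in chunks, re-requesting keys reported as unprocessed."""
    items = []
    for start in range(0, len(keys), MAX_KEYS_PER_CALL):
        chunk = keys[start:start + MAX_KEYS_PER_CALL]
        request = {table_name: {"Keys": chunk}}
        while request:
            response = batch_get_item(RequestItems=request)
            items += response.get("Responses", {}).get(table_name, [])
            # UnprocessedKeys has the same shape as RequestItems.
            request = response.get("UnprocessedKeys") or {}
    return items
```

Inside the handler this could be called as fetch_all(dynamo.batch_get_item, table_name, extract_keys(event)). The retry loop has no delay or attempt limit; a production version would likely add a backoff.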

As you might have guessed, the Lambda function also needs a role that allows it to read from DynamoDB and write logs:

$ aws iam create-role --role-name Lambda_Enricher --assume-role-policy-document file://trust_policy_lambda.json

$ aws iam attach-role-policy --role-name Lambda_Enricher --policy-arn arn:aws:iam::aws:policy/AmazonDynamoDBReadOnlyAccess

$ aws iam attach-role-policy --role-name Lambda_Enricher --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
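
Both create-role commands on this page read a trust policy file, which states which AWS service may assume the role. The actual files are in the EIP repository and are not reproduced here; as an assumption about their likely shape, the sketch below prints a standard trust policy for a given service principal. The helper name `trust_policy` is hypothetical.

```python
import json

def trust_policy(service_principal):
    """Build a trust policy that lets one AWS service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service_principal},
                "Action": "sts:AssumeRole",
            }
        ],
    }

# EventBridge Pipes and Lambda use these service principals.
print(json.dumps(trust_policy("pipes.amazonaws.com"), indent=2))
print(json.dumps(trust_policy("lambda.amazonaws.com"), indent=2))
```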

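To deploy the function directly, you first need to zip it up (using any Zip utility) to code.zip and deploy it with the following command line (again, notice the $id being referenced). The handler setting DynamoDBEnricher.lambda_handler expects the source file inside the archive to be named DynamoDBEnricher.py: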

$ aws lambda create-function --function-name EnrichFromDynamoDB \
     --role arn:aws:iam::$id:role/Lambda_Enricher \
     --runtime python3.9 --zip-file fileb://code.zip --handler DynamoDBEnricher.lambda_handler

Show time! You can now send a message to the inbound queue for the Content Enricher and poll the result queue for enriched messages (the query and output parameters just filter down the output to the message body; feel free to omit them for the full experience):

$ aws sqs send-message --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityIDs --message-body '{"id": "123"}'
$ aws sqs send-message --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityIDs --message-body '{"id": "456"}'

$ aws sqs receive-message --max-number-of-messages 5 \
   --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityUpdates \
   --query 'Messages[*].Body' --output text
{"id":"123","Amount":"12.34","Type":"Order","Customer":"Pete's Pizza"}
{"id":"456","Amount":"22.22","Type":"Order","Customer":"Mary's Muffins"}

Related patterns: Content Filter, Event Message, Message Filter, Message Channel, Message Translator, Claim Check



Find the full description of this pattern in:
Enterprise Integration Patterns
Gregor Hohpe and Bobby Woolf
ISBN 0321200683
650 pages
Addison-Wesley


Creative Commons Attribution License

Parts of this page are made available under the Creative Commons Attribution license. You can reuse the pattern icon, the pattern name, the problem and solution statements (in bold), and the sketch under this license. Other portions of the text, such as text chapters or the full pattern text, are protected by copyright.