Enterprise Integration Patterns
Content Enricher

When sending messages from one system to another it is common for the target system to require more information than the source system can provide. For example, incoming Address messages may just contain the ZIP code because the designers felt that storing a redundant state code would be superfluous. Likely, another system is going to want to specify both a state code and a ZIP code field. Yet another system may not actually use state codes, but spell the state name out because it uses free-form addresses in order to support international addresses. Likewise, one system may provide us with a customer ID, but the receiving system actually requires the customer name and address. An order message sent by the order management system may just contain an order number, but we need to find the customer ID associated with that order, so we can pass it to the customer management system. The scenarios are plentiful.

How do we communicate with another system if the message originator does not have all the required data items available?

Use a specialized transformer, a Content Enricher, to access an external data source in order to augment a message with missing information.

The Content Enricher uses information inside the incoming message (e.g. key fields) to retrieve data from an external source. After the Content Enricher retrieves the required data from the resource, it appends the data to the message. The original information from the incoming message may be carried over into the resulting message or may no longer be needed, depending on the specific needs of the receiving application.
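As a minimal illustration (a sketch, not code from the book), the pattern boils down to a function along these lines, with a hypothetical resource.lookup() call standing in for the query against the external data source:

def enrich(message, resource):
    # use a key field from the incoming message to query the external resource
    additional = resource.lookup(message['customer_id'])   # hypothetical lookup call
    # append the retrieved data; here the original fields are carried over as well
    enriched = dict(message)
    enriched.update(additional)
    return enriched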

The additional information injected by the Content Enricher has to be available somewhere in the system. The most common sources for the new data are:

... Read the entire pattern in the book Enterprise Integration Patterns

Example: Content Enricher with Amazon EventBridge Pipes

For cloud-based messaging systems, Amazon EventBridge Pipes can implement a Content Enricher through the "Enrichment" step:


Enriching messages with Amazon EventBridge Pipes

The name of this step is a slight misnomer, as it can implement any Message Translator or even a Message Filter for batched sources. The pipe simply passes on the message(s) returned by the Enrichment step. To build an actual Content Enricher, you can use a Lambda function that fetches additional data from a DynamoDB table based on a key from the original message. You can find the source code for this example implementation in the EIP Code Repository.

You start out by creating two Message Channels in the form of Amazon SQS queues (the example uses the AWS CLI to keep things simple; for real projects an automation script is highly recommended):

$ aws sqs create-queue --queue-name EntityIDs --attributes='{"MessageRetentionPeriod": "300"}'
$ aws sqs create-queue --queue-name EntityUpdates --attributes='{"MessageRetentionPeriod": "300"}'
$ aws sqs list-queues

The last command prints out the URLs of the created queues, which you will need later.
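With the region used throughout this example (ap-southeast-1), the output should look roughly like this, with <account-id> replaced by your AWS account number:

{
    "QueueUrls": [
        "https://sqs.ap-southeast-1.amazonaws.com/<account-id>/EntityIDs",
        "https://sqs.ap-southeast-1.amazonaws.com/<account-id>/EntityUpdates"
    ]
}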

The Content Enricher, implemented with EventBridge Pipes, Lambda, and DynamoDB, will fetch messages containing just id fields from the EntityIDs queue and publish enriched events to the EntityUpdates queue. To build the Content Enricher, you first create a table to hold the entity data and load it up with sample data, which is most easily done with a JSON file (available in the EIP Code Repository):

$ aws dynamodb create-table --table-name Entities \
      --attribute-definitions AttributeName=id,AttributeType=S \
      --key-schema AttributeName=id,KeyType=HASH \
      --billing-mode PAY_PER_REQUEST

$ aws dynamodb batch-write-item --request-items file://entities.json
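The entities.json file follows the batch-write-item request format; a minimal version containing the two sample entities used later in this example could look like this (the file in the repository may contain more items):

{
    "Entities": [
        { "PutRequest": { "Item": {
            "id": { "S": "123" }, "Type": { "S": "Order" },
            "Amount": { "N": "12.34" }, "Customer": { "S": "Pete's Pizza" } } } },
        { "PutRequest": { "Item": {
            "id": { "S": "456" }, "Type": { "S": "Order" },
            "Amount": { "N": "22.22" }, "Customer": { "S": "Mary's Muffins" } } } }
    ]
}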

To check that the table is loaded (and to get accustomed to DynamoDB's syntax), you can fetch an entity:

$ aws dynamodb get-item --table-name Entities --key '{"id": {"S": "123"}}'
{
    "Item": {
        "id": { "S": "123" },
        "Amount": { "N": "12.34" },
        "Type": { "S": "Order" },
        "Customer": { "S": "Pete's Pizza" }
    }
}

Before you create the Pipe, you need to create an IAM role with the required permissions (the trust policy file is supplied in the EIP repository) so that your pipe can read messages from SQS, invoke a Lambda function, and write messages to SQS. It's also handy to get your account ID and store it in a variable (if your command line doesn't support evaluations, you can execute the command by hand and set the variable explicitly, or just replace $id with the number on each command line):

$ id=`aws sts get-caller-identity --query 'Account' --output text`
$ echo $id
1234567890

$ aws iam create-role --role-name EventBridge_Pipes_Enricher --assume-role-policy-document file://trust_policy_pipes.json
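In essence, trust_policy_pipes.json allows the EventBridge Pipes service to assume the role; a minimal version looks like this (the file in the repository may add further conditions):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": { "Service": "pipes.amazonaws.com" },
            "Action": "sts:AssumeRole"
        }
    ]
}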

$ aws iam create-policy --policy-name pipes_sqs_lambda_sqs --policy-document file://pipes_role_policy.json \
    --query Policy.Arn --output text
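The pipes_role_policy.json document has to let the pipe consume from the source queue, invoke the enrichment function, and send to the target queue. A sketch along those lines (with <account-id> as a placeholder; the file in the repository is authoritative):

{
    "Version": "2012-10-17",
    "Statement": [
        { "Effect": "Allow",
          "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes"],
          "Resource": "arn:aws:sqs:ap-southeast-1:<account-id>:EntityIDs" },
        { "Effect": "Allow",
          "Action": "lambda:InvokeFunction",
          "Resource": "arn:aws:lambda:ap-southeast-1:<account-id>:function:EnrichFromDynamoDB" },
        { "Effect": "Allow",
          "Action": "sqs:SendMessage",
          "Resource": "arn:aws:sqs:ap-southeast-1:<account-id>:EntityUpdates" }
    ]
}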

$ aws iam attach-role-policy --role-name EventBridge_Pipes_Enricher --policy-arn arn:aws:iam::$id:policy/pipes_sqs_lambda_sqs

Additionally, you need to grant write permission to the EntityUpdates queue with a so-called resource policy that's attached to a resource instead of a role:

$ aws sqs add-permission --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityUpdates \
  --aws-account-ids=$id \
  --actions SendMessage ReceiveMessage DeleteMessage ChangeMessageVisibility \
  --label SendEntities

You can now create a Pipe and give it the required permissions through the role you just created (notice the use of $id for the account ID):

$ aws pipes create-pipe --name ContentEnricher \
  --source arn:aws:sqs:ap-southeast-1:$id:EntityIDs \
  --target arn:aws:sqs:ap-southeast-1:$id:EntityUpdates \
  --enrichment arn:aws:lambda:ap-southeast-1:$id:function:EnrichFromDynamoDB  \
  --role-arn arn:aws:iam::$id:role/EventBridge_Pipes_Enricher

You also need to deploy the Lambda function that looks up the entities from the database and returns the enriched messages. EventBridge Pipes can work in batches, meaning the pipe can fetch more than one event from the event source at a time. Correspondingly, your Lambda function has to be prepared to receive a list of IDs. In my implementation, it uses the batch_get_item method to look up all IDs at once (invalid IDs simply don't return an item):

import json
import boto3

dynamo = boto3.resource('dynamodb')
table_name = 'Entities'

def lambda_handler(event, context):
    # the pipe delivers a batch: a list of SQS message records
    keys = []
    for evt in event:
        body = json.loads(evt['body'])       # each message body carries an entity ID
        keys.append({'id': body['id']})
    # fetch all requested entities in a single call; unknown IDs simply return no item
    data = dynamo.batch_get_item(RequestItems={table_name: {'Keys': keys}})
    # whatever is returned here is passed on by the pipe to the target queue
    return data['Responses'][table_name]
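For reference, the event passed to the function by the pipe is a list with one entry per polled SQS message; abridged to the fields the function actually uses (additional SQS metadata fields omitted), it looks roughly like this:

[
    { "messageId": "...", "body": "{\"id\": \"123\"}" },
    { "messageId": "...", "body": "{\"id\": \"456\"}" }
]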

As you might have guessed, the Lambda function also needs a role that allows it to read from DynamoDB and write logs:

$ aws iam create-role --role-name Lambda_Enricher --assume-role-policy-document file://trust_policy_lambda.json
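trust_policy_lambda.json is analogous to the pipes trust policy, just with the Lambda service as the trusted principal (again a sketch; the repository file is authoritative):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": { "Service": "lambda.amazonaws.com" },
            "Action": "sts:AssumeRole"
        }
    ]
}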

$ aws iam attach-role-policy --role-name Lambda_Enricher --policy-arn arn:aws:iam::aws:policy/AmazonDynamoDBReadOnlyAccess

$ aws iam attach-role-policy --role-name Lambda_Enricher --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

To deploy the function directly, you need to first zip it up to code.zip (using any Zip utility) and then deploy it with the aws lambda create-function command below (again, notice the $id being referenced).
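Assuming the handler code above is saved as DynamoDBEnricher.py (to match the --handler setting), the zip step can be as simple as:

$ zip code.zip DynamoDBEnricher.py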

$ aws lambda create-function --function-name EnrichFromDynamoDB \
     --role arn:aws:iam::$id:role/Lambda_Enricher \
     --runtime python3.9 --zip-file fileb://code.zip --handler DynamoDBEnricher.lambda_handler

Show time! You can now send a message to the inbound queue for the Content Enricher and poll the result queue for enriched messages (the query and output parameters just filter down the output to the message body; feel free to omit them for the full experience):

$ aws sqs send-message --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityIDs --message-body '{"id": "123"}'
$ aws sqs send-message --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityIDs --message-body '{"id": "456"}'

$ aws sqs receive-message --max-number-of-messages 5 \
   --queue-url https://sqs.ap-southeast-1.amazonaws.com/$id/EntityUpdates \
   --query 'Messages[*].Body' --output text
{"id":"123","Amount":"12.34","Type":"Order","Customer":"Pete's Pizza"}
{"id":"456","Amount":"22.22","Type":"Order","Customer":"Mary's Muffins"}

Related patterns:

Content Filter, Event Message, Message Filter, Message Channel, Message Translator, Claim Check


Creative Commons Attribution License

You can reuse the following elements under the Creative Commons Attribution license: pattern icon, pattern name, problem and solution statements (in bold), and the sketch. Other portions are protected by copyright.
