Asynchronous Implementation with MSMQ

Pattern Catalog

Previous Previous   Next Next

Site HomePatterns HomeTable of Contents

This section describes how to implement the Loan Broker example (see Introduction to Composed Messaging Examples) using Microsoft .NET, C# and MSMQ. The Microsoft .NET framework includes the System.Messaging namespace that gives .NET programs access to the Microsoft Message Queuing Service that is included in the recent versions of the Windows operating system (Windows 2000, Windows XP, Windows Server 2003 etc). The example walks through many of the design decisions and shows the actual code that is required to make this solution work. I tried as much as possible to focus on the design aspects of the solution so that this example is of value even if you are not a hard-core C# developer. In fact, much of the application besides the actual interface into System.Messaging would look very similar if this implementation was done in Java and JMS.

Some of the functions demonstrated in this solution could likely be implemented with less effort by using an integration and orchestration tool such as Microsoft BizTalk Server. I intentionally avoided using such tools for two reasons. First, these tools are not free and would require you to acquire a license just to run this simple example. Second, I wanted to demonstrate the explicit implementation of all necessary functions.

I setup the solution as multiple executables so that the different components could be run distributed across multiple computers. For purpose of the example I use local, private message to keep the setup requirements simple and avoid having to install ActiveDirectory. As a result, the solution 'as is' needs to run on a single machine.

This implementation of the loan broker example uses asynchronous messaging over message queues. As described in the example overview, this allows us to process multiple quote requests concurrently, but also requires us to correlate messages as they flow though the system and ultimately produce a response message to the loan quote request. Many of our design decisions are driven by the need for asynchronous processing.

Loan Broker Ecosystem

It is a good idea to start understanding the loan broker design from the outside in. Let's start by examining all external interfaces that the loan broker has to support (see picture). Because message queues are unidirectional we need a pair of queues to establish a request-response communication with another component (see .NET Request/Reply Example for a simple case). As a result, the loan broker receives requests for loan quotes on the loanRequestQueue and replies to the test client on the loanReplyQueue. The interaction with the credit bureau happens over a similar pair of queues. I did not want to create a pair of queues for each bank, so I decided to have all banks reply to the same bankReplyQueue. The Recipient List sends the request message to each individual bank queue while the Aggregator selects the best quote from the reply messages arriving on the loanReplyQueue. Together, the Recipient List and the Aggregator act as a distribution-style Scatter-Gather. For simplicity's sake all banks in this example use the same message format so that a Normalizer is not required. But because the common bank message format is different from the format expected by the consumer, we still need to use one Message Translator to convert bank reply messages into loan broker reply messages. I decided to design the loan broker as a Process Manager. Rather than implementing the functions inside the loan broker as individual components separated by message queues, the loan broker is a single component that executes all functions internally. As a result, this approach eliminates the overhead that would be incurred by sending messages across queues between these functions, but requires the loan broker to maintain multiple, concurrent process instances.


Loan Broker with Message Queue Interfaces

Laying the Groundwork: A Messaging Gateway

I do not want this section to become an introduction into the System.Messaging namespace and MSMQ. Therefore, it makes sense to try to separate some of the MSMQ specific functions into separate classes so that the application code will not be littered with MSMQ-specific commands. Mike Rettig reminded me that the Gateway [EAA] is an excellent pattern to use for this. Mike created the JMS example for Martin's P of EAA book and has been instrumental in bringing the Messaging Gateway into this implementation. As Martin points out, the use of a Gateway has two key advantages. First, it abstracts the technical details of the communication from the application. Second, if we choose to separate the gateway interface from the gateway implementation we can replace the actual external service with a Service Stub [EAA] for testing.


Use of a Gateway helps keep MSMQ details out of the application and improves testability

In our case we define two interfaces into theMessaging Gateway: IMessageSender and IMessageReceiver. We kept these interfaces almost trivially simplistic. All the IMessageSender can do is send a message and all the IMessageReceiver can do is (surprise!) receive a message. Additionally, the receiver has a Begin method to tell it that it is OK to start receiving messages. Keeping the interfaces this simple makes it easy to define classes that implement the interface.

IMessageSender.cs

namespace MessageGateway
{
    using System.Messaging;
	
    public interface IMessageSender
    {
        void Send(Message mess);
    }
}
 

IMessageReceiver.cs

namespace MessageGateway
{
    using System.Messaging;

    public interface IMessageReceiver
    {
        OnMsgEvent OnMessage 
        { 
            get;
            set;
        }
		
        void Begin();

        MessageQueue GetQueue();
    }
}
 

The actual implementations reside in the MessageSenderGateway and MessageReceiverGateway classes. These classes take care of configuring the message queue properties such as MessageReadPropertyFilter or Formatter settings. MessageReceiverGateway uses a Template Method for the ReceiveCompleted event of the MSMQ message queue to take care of small, but important details such as calling the mq.BeginReceive method after processing the message. I am not going to dive into the details of these features right now but refer you to the on-line documentation on MSDN ([MSMQ01]).

Because we defined very narrow interfaces, it is also possible to provide an implementation that does not even use a message queue. MockQueue implements both interfaces without even referencing a message queue! When an application sends a message, MockQueue immediately triggers the OnMessageevent with that same message. This makes testing the application in a single address space much simpler without having to worry about the asynchronous aspects (more on testing below).

The line "OnMsgEvent OnMessage" in IMessageReceiver may require a little bit of explanation for readers who are new to C#. The .NET Framework provides language features for the Observer pattern, called delegates and events. OnMsgEvent is a delegate defined in the MessageReceiverGateway:

public delegate void OnMsgEvent(Message msg);

A delegate allows objects to register with a certain type of event. When the event is invoked, .NET calls all registered methods. A delegate can be invoked in a number of different ways but the simplest form is the direct invocation by using the name of the delegate:

OnMsgEvent receiver;
Message message;
...
receiver(message);

If this leaves you more interested in delegates have a look at a good .NET or C# book. If you want to know the dirty details on how the CLR implements them, have a look at [Box].

Base Classes for Common Functionality

When we look at the high-level design, we quickly realize that some of the components in the loan broker scenario have common functions. For example, both a bank and a credit bureau act as a service by receiving a request, processing the request and publishing the result to another channel. Sounds easy enough. But since we live in the world of asynchronous messaging, we have to do a little extra work to implement a simple request-reply scheme. First, we want the caller of the service to specify a Return Address for the reply. This allows different callers to use the same service but use different reply queues. The service should also support a Correlation Identifier so that the caller can reconcile incoming reply messages with request messages. Furthermore, if the service receives a message in an unrecognized format it would be good manners to route the message to an Invalid Message Channel instead of simply discarding it.

To eliminate code duplication (the deadly sin of object-oriented programming), I created the base class MQService. This class incorporates the support for Return Address and Correlation Identifier. Really, the server-side support for a Correlation Identifier consists of nothing more than copying the message ID of the incoming message to the correlation id of the reply message. In our example, I also copy the AppSpecific property because we will see later that sometimes we need to correlate by something else besides message ID. The MQService also makes sure to send the response to the specified Return Address. Because the requestor supplies the Return Address, the only initialization parameter for the MQService is the name of the request queue, i.e. the queue where new request messages come in. If the requestor forgets to supply a Return Address, the RequestReplyService sends the reply to the Invalid Message Channel. One may also consider sending the request message to the Invalid Message Channel because that's the one that is faulty. For now, we will keep our lives simple and not get into the details of error handling.

MQService.cs

public abstract class MQService
{
    static protected readonly String InvalidMessageQueueName = ".\\private$\\invalidMessageQueue";
    IMessageSender invalidQueue = new MessageSenderGateway(InvalidMessageQueueName);

    protected IMessageReceiver requestQueue;
    protected Type requestBodyType;
    protected IMessageGatewayFactory gatewayFactory;
	
    public MQService(IMessageReceiver receiver, IMessageGatewayFactory gatewayFactory)
    {
        this.gatewayFactory = gatewayFactory;
        this.requestQueue = receiver;
        Register(requestQueue);
        Console.WriteLine("Processing messages from " + requestQueue.GetQueue().QueueName);
    }
	
    public MQService(String requestQueueName, IMessageGatewayFactory gatewayFactory) 
    {
        this.gatewayFactory = gatewayFactory;
        MessageReceiverGateway receiver = gatewayFactory.GetReceiverInstance(requestQueueName, GetFormatter());
        this.requestQueue = receiver;
        Register(requestQueue);
        Console.WriteLine("Processing messages from " + requestQueue.GetQueue().QueueName);
    }
			
    protected virtual IMessageFormatter GetFormatter()
    {
        return new XmlMessageFormatter(new Type[] { GetRequestBodyType() });
    }

    protected abstract Type GetRequestBodyType();

    protected Object GetTypedMessageBody(Message msg)
    {
        try 
        {
            if (msg.Body.GetType().Equals(GetRequestBodyType())) 
            {
                return msg.Body;
            }
            else
            { 
                Console.WriteLine("Illegal message format.");
                return null;
            }
        }
        catch (Exception e) 
        {
            Console.WriteLine("Illegal message format" + e.Message);    
            return null;
        }
    }


    public void Register(IMessageReceiver rec)
    {
        OnMsgEvent ev = new OnMsgEvent(OnMessage);
        rec.OnMessage += ev;
    }
	
    public void Run()
    {
        requestQueue.Begin();
    }

    
    public void SendReply(Object outObj, Message inMsg)
    {
        Message outMsg = new Message(outObj);
        outMsg.CorrelationId = inMsg.Id;
        outMsg.AppSpecific = inMsg.AppSpecific;

        if (inMsg.ResponseQueue != null) 
        {
            IMessageSender replyQueue = gatewayFactory.GetSenderInstance(inMsg.ResponseQueue);
            replyQueue.Send(outMsg);
        }
        else
        {
            invalidQueue.Send(outMsg);
        }
    }

    protected abstract void OnMessage(Message inMsg);

}

The class is abstract because it does not provide an implementation for the GetTypedMessageBody and OnMessage methods. We want our classes to deal as much as possible with strongly typed business objects as opposed to Message data types. Therefore, we want would like the MQService to verify the type of the message body and cast it to the correct type. The problem is that this abstract base class does not know which type to cast it to because it can be used by many different service implementations each of which is likely to use a different message type. To perform as much work in the base class we created the GetTypedMessageBody and the abstract method GetRequestBodyType. Each subclass has to implement the method GetRequestBodyType to specify the type of the messages that it expects to receive. MQServer use the type to initialize the XML formatter and to perform type checking. After these checks the subclass can safely cast the incoming message body to the desired type without being afraid of exceptions. The exception handling inside GetTypedMessageBodyis admittedly primitive at this point -- all it does is print a message to the console. If this weren't a simple demo app, we would definitely use some more sophisticated approach to logging or better yet a comprehensive Control Bus.

The OnMessage method is left to be implemented by the subclasses of MQService. We provide two implementations, a synchronous one and an asynchronous one. The synchronous implementation (RequestReplyService) calls the virtual method ProcessMessage which is expected to return a reply message right away. The synchronous implementation of OnMessage calls SendReply right away. The asynchronous implementation (AsyncRequestReplyService) in contrast, defines the virtual ProcessMessage method without any return parameter. The inheriting subclasses are responsible for calling SendReply.

public class RequestReplyService : MQService
{
    public RequestReplyService(IMessageReceiver receiver, IMessageGatewayFactory gatewayFactory) : base(receiver, gatewayFactory) {}		
    public RequestReplyService(String requestQueueName, IMessageGatewayFactory gatewayFactory) : base (requestQueueName, gatewayFactory) {}

    protected override Type GetRequestBodyType()
    {
        return typeof(System.String);
    }

    protected virtual Object ProcessMessage(Object o)
    {
        String body = (String)o;
        Console.WriteLine("Received Message: " + body);
        return body;
    }

    protected override void OnMessage(Message inMsg)
    {
        inMsg.Formatter =  GetFormatter();
        Object inBody = GetTypedMessageBody(inMsg);
        if (inBody != null) 
        {
            Object outBody = ProcessMessage(inBody);

            if (outBody != null) 
            {
                SendReply(outBody, inMsg);
            }
        }
    }	
}

public class AsyncRequestReplyService : MQService
{
    public AsyncRequestReplyService(IMessageReceiver receiver, IMessageGatewayFactory gatewayFactory) : base(receiver, gatewayFactory) {}		
    public AsyncRequestReplyService(String requestQueueName, IMessageGatewayFactory gatewayFactory) : base (requestQueueName, gatewayFactory) {}


    protected override Type GetRequestBodyType()
    {
        return typeof(System.String);
    }

    protected virtual void ProcessMessage(Object o, Message msg)
    {
        String body = (String)o;
        Console.WriteLine("Received Message: " + body);
    }
    
    protected override void OnMessage(Message inMsg)
    {
        inMsg.Formatter =  GetFormatter();
        Object inBody = GetTypedMessageBody(inMsg);
        if (inBody != null) 
        {
            ProcessMessage(inBody, inMsg);
        }

    }	
}
 

Both classes provide a default implementation of the GetRequestBodyType and ProcessMessage methods. GetRequestBodyType specifies that the message expects a simple string and ProcessMessage prints that string to the console. Technically speaking we could have omitted the default implementations of these methods from RequestReplyService and AsyncRequestReplyService so that they remain abstract themselves. This would allow the compiler to detect any subclass of one of these classes that forgot to implement one of the abstract methods. On the other hand, it is nice to have a default implementation of a service available for testing and debugging purposes. So I decided to go ahead and let these classes be concrete so that they can be instantiated as is.

In summary, the class diagram for the base classes looks as follows (we will discuss the bank, credit bureau and loan broker classes shortly):


Base Classes for Message Services

Designing the Bank

Now that we have created a set of base classes and utility functions it is time to start implementing the application logic. An easy way to start creating the solution is to build the application components by reverse order of dependency. This means, we first create components that do not depend on anything else. This allows us to run and test these components independently. The bank is certainly one of those components. The loan broker depends on the banks, but the banks themselves are self-contained. Conveniently enough, a bank is a prime example of a request-reply-service. So implementing a bank should be as simple as inheriting from RequestReplyService and filling in some business logic.

Before we start working on the internals of the bank, though, we should define the external interface. We need to define the message types for loan quote requests and replies. For our simple implementation, we defined a common message format for all banks. That lets us use a common class for all five bank instances. C# supports structs, so we use those as message types:

Message Types for Bank

public struct BankQuoteRequest 
{
    public int SSN;
    public int CreditScore;
    public int HistoryLength;
    public int LoanAmount;
    public int LoanTerm;
}

public struct BankQuoteReply
{
    public double InterestRate;
    public String QuoteID;
    public int ErrorCode;
}

Because we want to use a single class for all bank instances we need to parameterize the banks for different behavior. Our banks are very simple institutions, so the only parameters are BankName, RatePremium, and MaxLoanTerm. The RatePremium determines the number of interest rate points that the bank charges above the prime rate, basically the bank's profit margin. The MaxLoanTerm specifies the longest loan term (in months) that the bank is willing to extend. If a loan request is for a longer duration than specified, the bank will thankfully decline. After plugging in the appropriate convenience constructors and accessors, we can build the ProcessMessage method of the bank:

Bank.cs

protected override Type GetRequestBodyType()
{
    return typeof(BankQuoteRequest);
}

protected  BankQuoteReply ComputeBankReply(BankQuoteRequest requestStruct)
{
    BankQuoteReply replyStruct = new BankQuoteReply();

    if (requestStruct.LoanTerm <= MaxLoanTerm) 
    {
        replyStruct.InterestRate = PrimeRate + RatePremium 
                                 + (double)(requestStruct.LoanTerm / 12)/10 
                                 + (double)random.Next(10) / 10;
        replyStruct.ErrorCode = 0;
    }
    else
    {
        replyStruct.InterestRate = 0.0;
        replyStruct.ErrorCode = 1;
    }
    replyStruct.QuoteID = String.Format("{0}-{1:00000}", BankName, quoteCounter);
    quoteCounter++;
    return replyStruct;
}

protected override Object ProcessMessage(Object o)
{
    BankQuoteRequest requestStruct;
    BankQuoteReply replyStruct;

    requestStruct = (BankQuoteRequest)o;
    replyStruct = ComputeBankReply(requestStruct);

    Console.WriteLine("Received request for SSN {0} for {1:c} / {2} months", 
                      requestStruct.SSN, requestStruct.LoanAmount, requestStruct.LoanTerm);
    Thread.Sleep(random.Next(10) * 100);
    Console.WriteLine("  Quote: {0} {1} {2}", 
                      replyStruct.ErrorCode, replyStruct.InterestRate, replyStruct.QuoteID); 

    return replyStruct;
}

We can see that the concrete service has to implement only the GetRequestBodyType and ProcessMessage methods. The service can safely cast the object passed in by ProcessMessage because the base class has already verified the correct type. As we can see the remaining implementation has rather little to do with messaging -- all the details are taken care if in the base classes. The MQService and RequestReplyService classes act as a Service Activator, keeping the application from having to dig into messaging system details.

The method ComputeBankReply contains the complete business logic for a bank. If life were only so simple! Well, this is not an introduction into macro-economics but an example of messaging so I took some liberties here to simplify things. The logic is the same as in the previous implementation. The computed interest rate is the sum of the prime rate, the configured rate premium, the loan term and a sprinkle of randomness. If the requested loan term is longer than the bank is comfortable with, it returns an error code. Each quote that the bank issues receives a unique quote ID so the customer may refer back to it later. In the current implementation I use a simple incrementing counter to create these IDs.

The ProcessMessage method incorporates a small delay (between 1/10th and 1 second) to make the bank transaction a bit more realistic. The ProcessMessage also logs some activities to the console so we can see what is going on when we run it inside a simple console application.

To start a bank, we simply instantiate it with the appropriate parameters and call the Run method that it inherits from RequestReplyService. Since the processing happens through events, the Run method returns right away. Therefore, we need to be careful not to terminate the program right after it starts. For my simple tests, I simple inserted a Console.ReadLine() statement after the call to Run.

Designing the Credit Bureau

The credit bureau implementation is very analogous to the bank. The only difference is in the message types and the business logic. The credit bureau can handle the following message types:

Message Types for Credit Bureau

public class CreditBureauRequest 
{
    public int SSN;
}

public class CreditBureauReply
{
    public int SSN;
    public int CreditScore;  
    public int HistoryLength;
}

The ProcessMessage method is nearly identical to the bank code, except it deals with different data structures and invokes different application logic. The credit bureau also has a built-in delay.

CreditBureau.cs

private int getCreditScore(int ssn) 
{
    return (int)(random.Next(600) + 300);
}

private int getCreditHistoryLength(int ssn)
{
    return (int)(random.Next(19) + 1);
}

Designing the Loan Broker

Now that we have a functioning credit bureau and a bank class that lets us instantiate multiple incarnations of a bank, we are ready to work in the internal design of the loan broker. The routing and transformation patterns from this book help us segment the functions that the loan broker needs to provide. We can group the internal functions of the loan broker into three main portions (see picture): the request-reply interface that accepts requests from clients, the credit bureau interface, and the bank interface.


Internal structure of the Loan Broker

In similar fashion to building the whole solution in reverse order of dependency, let's start building the pieces that only depend on what's already there. Because we just built a bank and a credit bureau service it makes sense to create the interface from the loan broker to these external components. The credit bureau interface definitely seems simpler, so let's start there.

Credit Bureau Gateway

The loan broker needs to make requests to the credit bureau to obtain the customer's credit rating, which is required by the bank. This implies sending a message to the external loan broker component and receiving reply messages. Wrapping the details of sending a generic message inside a MessageGateway allowed us to hide many MSMQ details from the rest of the application. Following the same reasoning we should encapsulate sending and receiving messages to the credit bureau inside a credit bureau gateway. This credit bureau gateway performs the important function of semantic enrichment, allowing the loan broker to call methods such as GetCreditScore as opposed to SendMessage. This makes the loan broker code more readable and provides a strong encapsulation of the communication between the loan broker and the credit bureau. The following diagram illustrates the levels of abstraction achieved by "chaining" the two gateways.


The Loan Broker Provides an Additional Level of Abstraction From the Messaging Infrastructure

In order to request a credit score, the gateway needs to create an instance of a CreditBureauRequest struct as specified by credit bureau. Likewise, the interface will receive the results inside a CreditBureauReply struct. In the beginning we stated that the solution is built from separate executables to that the credit bureau can run on a different computer than the loan broker. This means, though, that the loan broker may not have access to types defined in the credit bureau's assembly. And really we would not want the loan broker to make any references to the credit bureau internals because that would eliminate the benefits of loose coupling over message queues. The loan broker is supposed to be completely unaware of who services the credit score requests. The loan broker needs, however, access to the structs that define the message formats. Luckily, the Microsoft .NET Framework SKD contains a tool that lets us do just that, the XML Schema Definition Tool (xsd.exe). This tool can create XML schemas from an assembly and also create C# source code from XML schemas. The following picture describes the process:


Creating Class Stubs from Another Assembly

xsd.exe extracts public type definitions and creates an XML schema file based on the type definition and optional attributes that control serialization. In our case, xsd.exe created the following schema:

<?xml version="1.0" encoding="utf-8"?>
<xs:schema elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="CreditBureauRequest" type="CreditBureauRequest" />
  <xs:complexType name="CreditBureauRequest">
    <xs:sequence>
      <xs:element minOccurs="1" maxOccurs="1" name="SSN" type="xs:int" />
    </xs:sequence>
  </xs:complexType>
  <xs:element name="CreditBureauReply" type="CreditBureauReply" />
  <xs:complexType name="CreditBureauReply">
    <xs:sequence>
      <xs:element minOccurs="1" maxOccurs="1" name="SSN" type="xs:int" />
      <xs:element minOccurs="1" maxOccurs="1" name="CreditScore" type="xs:int" />
      <xs:element minOccurs="1" maxOccurs="1" name="HistoryLength" type="xs:int" />
    </xs:sequence>
  </xs:complexType>
  <xs:element name="Run" nillable="true" type="Run" />
  <xs:complexType name="Run" />
</xs:schema>

Usually a service would publish this schema definition to potential callers. This allows the caller the option to produce the required message format in a number of different ways. First, the caller could construct the xsd-compliant explicitly. Alternatively, the caller could use the .NET built-in serialization. Since the .NET CLR is programming language independent, the client would still have a choice of programming languages.

We decide to use .NET's built-in serialization. Therefore, we run xsd.exe again to create source files to be used by the service consumer, and get a file that looks like this:

// 
// This source code was auto-generated by xsd, Version=1.1.4322.573.
// 
namespace CreditBureau {
    using System.Xml.Serialization;
    
    
    /// <remarks/>
    [System.Xml.Serialization.XmlRootAttribute(Namespace="", IsNullable=false)]
    public class CreditBureauRequest {
        
        /// <remarks/>
        public int SSN;
    }
    ...
 

It is worth noting that the .NET XML serialization and deserialization allows loose coupling. So technically speaking, the request message that we send to the credit bureau does not have to be of the exact same type that is used inside the credit bureau implementation as long as the XML representation contains the required elements. For example, this would allow a requestor to send a message whose XML representation contains additional elements without disturbing the communication. For our example, we assume that the loan broker is willing to conform to the credit bureau's specified format and use the same types on both ends of the communication.

We are now ready to send messages in the correct format to the credit bureau. We need to keep in mind, though, that this communication is asynchronous with a separate, asynchronous request message and reply message. We could design the credit bureau gateway so that after sending a request the gateway code waits for the response to come back. This approach has one significant drawback: the application will just sit and wait while the credit bureau is processing a message. This type of pseudo-synchronous processing can quickly result in a performance bottleneck. If we make each step of the process pseudo-synchronous it means that the loan broker can only process one request process at a time. For example, it would not be able to request the credit score for a new request while it is still waiting for bank replies for the previous quote request. To visualize the difference let's consider that the loan broker has to perform two main steps: get the credit score and get the best quote from the banks. If we assume the loan broker runs only a single, sequential execution, the execution will look like the top half of the following picture:


Pipeline Processing Can Provide Significantly Higher Throughput

Because the bulk of the actual work is executed by external components the loan broker component basically sits around and waits for results -- not an efficient use of computing resources. If we design the whole loan broker process as an Event-Driven Consumer, we can start processing multiple requests in parallel and process the results as they come in. We call this mode 'pipeline' processing. The scalability of the system now only depends on the processing capacity of the external components and not on the loan broker. If we run only a single instance of the credit bureau process, the difference may not be as pronounced because the bureau request queue will queue up the requests anyway. However, if we decide to run multiple credit bureau instances in parallel we will see immediate performance gains (more on performance below).

There are two primary ways to make the loan broker process event-driven. For once, we can create a sequential process but create a new thread for each incoming message. Alternatively, we can let the messaging system notify the loan broker whenever an event is pending. This way we let the messaging system control the threads of execution. Each approach has pros and cons. Coding a sequential process can make the code easier to understand, however if our component is primarily a broker component that brokers messages between external entities it would result in a potentially large number of threads that are waiting for incoming messages. These threads could consume a large number of system resources and accomplish little. Therefore, we may be better off to let the messaging system drive the execution. Whenever a message is ready to be consumed, the system will invoke the broker execution. This lets us maintain a single thread of execution and not worry about thread management. However, we need to deal with the fact that the execution path is not a single sequential method, but multiple code segments that are executed as messages arrive.

You might have guessed that the way to make things event-driven in .NET is the use of delegates. As expected, the credit bureau gateway defines a new delegate.

public delegate void OnCreditReplyEvent(CreditBureauReply creditReply, Object ACT);

This delegate allows other code segments to tell the credit bureau gateway which method to call when the result come in. The credit bureau gateway passes a properly typed CreditBureauReply struct back to the caller. It also passes something we call ACT -- an Asynchronous Completion Token [POSA]. This token allows the caller to pass in data to the gateway and receive the data back when the corresponding reply message comes in. Basically, the credit bureau gateway performs correlation for the request and reply messages so that the caller does not have to.

What's left is a method to request a credit score and the method that handles an incoming message, correlating the proper ACT and invoking the properly typed delegate.

CreditBureauGateway.cs

internal struct CreditRequestProcess
{
    public int CorrelationID;
    public Object ACT;
    public OnCreditReplyEvent callback;
}

internal class CreditBureauGateway 
{
    protected IMessageSender creditRequestQueue;
    protected IMessageReceiver creditReplyQueue;

    protected IDictionary activeProcesses = (IDictionary)(new Hashtable());

    protected Random random = new Random();

    public void Listen()
    {
        creditReplyQueue.Begin();
    }

    public void GetCreditScore(CreditBureauRequest quoteRequest, OnCreditReplyEvent OnCreditResponse, Object ACT)
    {
        Message requestMessage = new Message(quoteRequest);
        requestMessage.ResponseQueue = creditReplyQueue.GetQueue();
        requestMessage.AppSpecific = random.Next();

        CreditRequestProcess processInstance = new CreditRequestProcess();
        processInstance.ACT = ACT;
        processInstance.callback = OnCreditResponse;
        processInstance.CorrelationID = requestMessage.AppSpecific;

        creditRequestQueue.Send(requestMessage);

        activeProcesses.Add(processInstance.CorrelationID, processInstance);
    }

    private void OnCreditResponse(Message msg)
    {
        msg.Formatter =  GetFormatter();

        CreditBureauReply replyStruct;
        try 
        {
            if (msg.Body is CreditBureauReply) 
            {
                replyStruct = (CreditBureauReply)msg.Body;
                int CorrelationID = msg.AppSpecific;

                if (activeProcesses.Contains(CorrelationID))
                {
                    CreditRequestProcess processInstance = (CreditRequestProcess)(activeProcesses[CorrelationID]);
                    processInstance.callback(replyStruct, processInstance.ACT);
                    activeProcesses.Remove(CorrelationID);
                }
                else { Console.WriteLine("Incoming credit response does not match any request"); }
            }
            else
            { Console.WriteLine("Illegal reply."); }
        }
        catch (Exception e) 
        {
            Console.WriteLine("Exception: {0}", e.ToString());    
        }
    }
}

When a caller requests a credit score via the GetCreditScoremethod, the credit bureau gateway allocates a new instance of the CreditRequestProcess structure. The collection activeProcesses contains one instance of CreditRequestProcess for each outstanding request, keyed by the Correlation Identifier of the message. The structure also holds the delegate for the OnCreditReplyEvent event. Storing the delegate for each message allows each caller to specify a different callback location for each request. As we will see later this allows the caller to use delegates to manage conversation state.

it is important to note that we do not use the message's built-in Id field to correlate. Instead, we assign a random integer number to the AppSpecific field and correlate incoming messages by the value of that field (remember that we designed the RequestReplyService to copy both the Id field and the AppSpecific field to the reply message). Why would we want to correlate by something else but the message ID? The advantage of the message ID is that it is unique for each message in the system. But that also limits our flexibility. Requiring a reply message to correlate to the message ID of the request message does not allow us to insert intermediate steps (for example, a router) into the message flow. Because any intermediate step would consume the request message and publish a new message to the service, the reply message's CorrelationId would match the message the service received but not the message that the Loan Broker originally sent (see picture).


Intermediaries Hinder Correlation with System-Generated Message IDs

There are two solutions to this problem. First, any intermediate would be required to intercept both request and reply messages and make sure to equip reply messages with the correct CorrelationId value (for an example of this approach see the Smart Proxy. Alternatively, we can use a separate field for correlation purposes so that all related messages that flow through the intermediary and the service carry the same ID. In this example we chose the second approach to make it easier for an intermediary component to intercept request messages between the Loan Broker and the Credit Bureau (we will take advantage of this in Loan Broker System Management. How should we pick a value for the AppSpecific property? We could use sequential values, but then we have to be careful that two concurrent instances do not use use the same starting value. We could also use a central ID generation module (e.g. a Database) that guarantees system-wide uniqueness. This seemed a little too much trouble for this simple example, so we chose a random number. .NET generates random numbers as signed 32-bit integers so that the odds of a duplicate are 1 in 2 billion -- a risk we are willing to take.

The credit bureau gateway now provides the clean abstraction from the Windows message queuing infrastructure that we were aiming for. The only public interface into the credit bureau gateway (besides constructors) are a delegate and two methods:

delegate void OnCreditReplyEvent(CreditBureauReply creditReply, Object ACT);
void Listen() {...}
void GetCreditScore(CreditBureauRequest quoteRequest, OnCreditReplyEvent OnCreditResponse, Object ACT) {...}

Neither construct makes any reference to a message or a message queue. This provides a number of benefits. First, we can easily implement a stubbed out version of the credit bureau gateway that does not rely on message queues at all (similar to the MockQueue). Second, we can replace the implementation of the credit bureau gateway if we decide to use a transport different from MSMQ. For example, if we were going to use a Web services interface using SOAP and http instead of MSMQ the methods exposed by the gateway would most likely not have to change at all.

Bank Gateway

The design of the bank gateway follows the same design principles as the design of the credit bureau gateway. We use the same process as before to declare stubs for the request and reply message types specified by the bank. The external portion of the bank gateway is very similar to the credit bureau gateway:

delegate void OnBestQuoteEvent(BankQuoteReply bestQuote, Object ACT);
class BankGateway {
    void Listen() {...}
    void GetBestQuote(BankQuoteRequest quoteRequest, OnBestQuoteEvent onBestQuoteEvent, Object ACT) {...}
    ...
}

The internal workings are slightly more complex because the Scatter-Gather style of interaction routes a single BankQuoteRequest to multiple banks. Likewise, a single BankQuoteReply is usually the result of multiple bank quote reply messages. The former part is handled by a Recipient List while the latter part is handled by an Aggregator. Let's start with the Recipient List.

The Recipient List has to implement three main functions:

As described in the design overview (Introduction to Composed Messaging Examples), this implementation uses the distribution style of Scatter-Gather, actively determining which banks to route the request to. This approach makes business sense if the banks charge the broker for each quote or the bank and the broker have an agreement that requires the broker to prequalify leads he or she generates . The loan broker makes the routing decision based on the customer's credit score, the amount of the loan and the length of the credit history. We encapsulate each connection to a bank inside a class that inherits from the abstract BankConnection. This class contains a reference to the properly addressed message queue and a method CanHandleLoanRequest that determines whether the quote request should be forwarded to this bank. The BankConnectionManager simply iterates through the list of all bank connections and compiles a list of those that match the criteria of the loan quote. If the list of banks was longer we could consider implementing a configurable rules engine. We prefer the current approach because it is simple and explicit.

internal class BankConnectionManager
{
    static protected BankConnection[] banks = {new Bank1(), new Bank2(), new Bank3(), new Bank4(), new Bank5() };

    public IMessageSender[] GetEligibleBankQueues(int CreditScore, int HistoryLength, int LoanAmount)
    {
        ArrayList lenders = new ArrayList();

        for (int index = 0; index < banks.Length; index++) 
        {
            if (banks[index].CanHandleLoanRequest(CreditScore, HistoryLength, LoanAmount))
                lenders.Add(banks[index].Queue);
        }
        IMessageSender[] lenderArray = (IMessageSender [])Array.CreateInstance(typeof(IMessageSender), lenders.Count);
        lenders.CopyTo(lenderArray);
        return lenderArray;
    }
}

internal abstract class BankConnection
{
    protected MessageSenderGateway queue;
    protected String bankName = "";
    public MessageSenderGateway Queue
    {
        get { return queue; }
    }
    public String BankName 
    {
        get { return bankName; }
    }

    public BankConnection (MessageSenderGateway queue) 
    {
        this.queue = queue; 
    }

    public abstract bool CanHandleLoanRequest(int CreditScore, int HistoryLength, int LoanAmount);
}

internal class Bank1 : BankConnection
{  
    protected String bankname = "Exclusive Country Club Bankers";
    
    public Bank1 (IMessageGatewayFactory gatewayFactory) : base (gatewayFactory.GetSenderInstance(".\\private$\\bank1Queue")) {}
    public override bool CanHandleLoanRequest(int CreditScore, int HistoryLength, int LoanAmount)
    {
        return LoanAmount >= 75000 && CreditScore >= 600 && HistoryLength >= 8;
    }
}

...

Once the list of relevant banks is compiled, sending the message is a simple matter of iterating over the list. In a production application, this iteration should occur inside a single transaction to avoid error conditions where a message may be sent to some banks but not to others. Once again, we chose to let simplicity prevail for this example.

internal class MessageRouter
{
    public static void SendToRecipientList (Message msg, IMessageSender[] recipientList)
    {   
        IEnumerator e = recipientList.GetEnumerator();
        while (e.MoveNext()) 
        {
            ((IMessageSender)e.Current).Send(msg);
        }
    }
}

Now that request message are on their way to the banks we need to initialize the Aggregator to expect incoming bank quotes. Due to the event driven nature of the loan broker the aggregator needs to be prepared to work on more than one aggregate concurrently -- maintaining one active aggregate for each quote request that is pending. This means that incoming messages need to be uniquely correlated to a specific aggregate. Unfortunately, we cannot use the message ID as correlation identifier because the Recipient List needs to send individual messages to each of the banks. As a result, if three banks participate in a quote request, the Recipient List needs to send 3 unique messages, one to each bank. Each of these messages will have a unique message ID. If the banks correlate by message ID the three responses will have different correlation IDs even though they belong to the same aggregate. We could have the aggregate store the message ID for each message and thus correlate the incoming message's correlation ID back to the aggregate. However, this would seem more complicated than we need it to be. Instead, we just generate our own correlation IDs -- one for each aggregate as opposed to one for each message. We store this (numeric) ID in the AppSpecific property of the outgoing request messages. The banks inherit from RequestReplyService which already transfers the incoming message's AppSpecific property to the reply messages. When a quote message comes in from a bank, the BankGateway can easily correlate the incoming message by the AppSpecific property of the message (see picture).


The BankGateway uses the AppSpecific Message Property to Correlate Response Messages to Aggregates

The bank gateway initializes an aggregate with the aggregate ID (generated by a simple counter) and the number of expected messages. In addition, the caller needs to supply a delegate and can optionally specify an object reference to an ACT in the same way the credit bureau gateway functioned. The aggregation strategy is simple. The aggregate is considered complete when all selected banks responded with a reply message. The Recipient List initializes the aggregate with the number of expected messages. Remember that banks have the option of declining to provide a quote. So we can know when an aggregate is complete we require the banks to provide a reply message with the error code set if they do not want to provide a quote. We could easily modify the aggregation strategy, for example to cut off the bidding after 1 second and take the best response up to that point.

internal class BankQuoteAggregate
{
    protected int ID;
    protected int expectedMessages;
    protected Object ACT;
    protected OnBestQuoteEvent callback;

    protected double bestRate = 0.0;

    protected ArrayList receivedMessages = new ArrayList();
    protected BankQuoteReply bestReply = null;

    public BankQuoteAggregate(int ID, int expectedMessages, OnBestQuoteEvent callback, Object ACT)
    {
        this.ID = ID;
        this.expectedMessages = expectedMessages;
        this.callback = callback;
        this.ACT = ACT;
    }

    public void AddMessage(BankQuoteReply reply) 
    {
        if (reply.ErrorCode == 0) 
        {
            if (bestReply == null)
            {
                bestReply = reply;
            }
            else 
            {
                if (reply.InterestRate < bestReply.InterestRate)
                {
                    bestReply = reply;
                }
            }
        }
        receivedMessages.Add(reply);
    }

    public bool IsComplete()
    {
        return receivedMessages.Count == expectedMessages;
    }

    public BankQuoteReply getBestResult()
    {
        return bestReply;
    }

    public void NotifyBestResult()
    {
        if (callback != null)
        {
            callback(bestReply, ACT);
        }
    }
}

Armed with the bank connection manager, the recipient list, and the aggregate the implementation of the BankGateway's functions becomes relatively simple:

internal class BankGateway
{
    protected IMessageReceiver bankReplyQueue;
    protected BankConnectionManager connectionManager;

    protected IDictionary aggregateBuffer = (IDictionary)(new Hashtable());
    protected int aggregationCorrelationID;

    public void Listen()
    {
        bankReplyQueue.Begin();
    }

    public void GetBestQuote(BankQuoteRequest quoteRequest, OnBestQuoteEvent onBestQuoteEvent, Object ACT)
    {

        Message requestMessage = new Message(quoteRequest);
        requestMessage.AppSpecific = aggregationCorrelationID;
        requestMessage.ResponseQueue = bankReplyQueue.GetQueue();
        IMessageSender[] eligibleBanks = 
            connectionManager.GetEligibleBankQueues(quoteRequest.CreditScore, quoteRequest.HistoryLength, 
                                                    quoteRequest.LoanAmount);
        
        aggregateBuffer.Add(aggregationCorrelationID, 
            new BankQuoteAggregate(aggregationCorrelationID, eligibleBanks.Length, onBestQuoteEvent, ACT));
        aggregationCorrelationID++;

        MessageRouter.SendToRecipientList(requestMessage, eligibleBanks);
    }


    private void OnBankMessage(Message msg)
    {
        msg.Formatter =  GetFormatter();

        BankQuoteReply replyStruct;
        try 
        {
            if (msg.Body is BankQuoteReply) 
            {
                replyStruct = (BankQuoteReply)msg.Body;
                int aggregationCorrelationID = msg.AppSpecific;
                Console.WriteLine("Quote {0:0.00}% {1} {2}", 
                                  replyStruct.InterestRate, replyStruct.QuoteID, replyStruct.ErrorCode);
                if (aggregateBuffer.Contains(aggregationCorrelationID))
                {
                    BankQuoteAggregate aggregate = 
                        (BankQuoteAggregate)(aggregateBuffer[aggregationCorrelationID]);
                    aggregate.AddMessage(replyStruct);
                    if (aggregate.IsComplete()) 
                    {
                        aggregate.NotifyBestResult();
                        aggregateBuffer.Remove(aggregationCorrelationID);
                    }
                }
                else 
                { Console.WriteLine("Incoming bank response does not match any aggregate"); }
            }
            else
            { Console.WriteLine("Illegal request."); }
        }
        catch (Exception e) 
        {
            Console.WriteLine("Exception: {0}", e.ToString());    
        }
    }
}

When the bank gateway receives a quote reply from a bank the OnBankMessage method executes. The method converts the incoming message to the correct type and goes on to locate the related aggregate via the AppSpecific property. It adds the new bid to the aggregate. Once the aggregate is complete (as defined in the BankQuoteAggregate class, the BankGateway invokes the delegate supplied by the caller.

Accepting Requests

Now that we have a well encapsulated credit bureau gateway and bank gateway we are ready to have the loan broker accept requests. In the earlier sections we discussed the design of the MQService and AsyncRequestReplyService base class. The LoanBroker class inherits from AsyncRequestReplyService because it cannot send the results back to the reply queue right away, but only after several asynchronous operations (obtaining the credit store and communicating with the banks) complete.

The first step in implementing the LoanBroker is to define the message types the loan broker handles:

public struct LoanQuoteRequest 
{
    public int SSN;
    public double LoanAmount;
    public int LoanTerm;
}

public struct LoanQuoteReply
{
    public int SSN;
    public double LoanAmount;  
    public double InterestRate;
    public string QuoteID;
}

Next, we need to create a class that inherits from AsyncRequestReplyService and override the ProcessMessage method.

The "Process"

The loan broker is different from the previous classes because the process that is triggered by an incoming message is not contained in any single method. Instead, the completion of the process depends on a sequence of external events. The loan broker can receive three types of events:

Since the logic for the loan broker is spread across multiple event handlers we need to keep the state of the broker across these functions. That's where the asynchronous completion tokens come in! Remember that the credit bureau gateway and the bank gateway allow the caller (the loan broker) to pass a reference to an object instance when sending a request. The gateway passes the object reference back when the reply message is received. To take advantage of this functionality, we declare an ACT in the loan broker as follows:

internal class ACT 
{
    public LoanQuoteRequest loanRequest;
    public Message message;

    public ACT(LoanQuoteRequest loanRequest, Message message)
    {
        this.loanRequest = loanRequest;
        this.message = message;
    }
}

The ACT contains a copy of the original request message (which contains the message ID and the reply address required to create the reply message) and the request data structure (needed to copy the SSN and the loan amount into the reply message). Technically speaking, the ACT stores duplicate information because we could extract the content of the request structure from the request message. However, the convenience of accessing a strongly typed structure is worth the few extra bytes.

The remainder of the loan broker is implemented as follows:

internal class LoanBroker : AsyncRequestReplyService
{
    protected ICreditBureauGateway creditBureauInterface;
    protected BankGateway bankInterface;

    public LoanBroker(String requestQueueName,
                        String creditRequestQueueName, String creditReplyQueueName, 
                        String bankReplyQueueName, BankConnectionManager connectionManager): base(requestQueueName)
    {

        creditBureauInterface = (ICreditBureauGateway)
            (new CreditBureauGatewayImp(creditRequestQueueName, creditReplyQueueName));
        creditBureauInterface.Listen();

        bankInterface = new BankGateway(bankReplyQueueName, connectionManager);
        bankInterface.Listen();
    }

    protected override Type GetRequestBodyType()
    {
        return typeof(LoanQuoteRequest);
    }

    protected override void ProcessMessage(Object o, Message msg)
    {
        LoanQuoteRequest quoteRequest;
        quoteRequest = (LoanQuoteRequest)o;
        
        CreditBureauRequest creditRequest = LoanBrokerTranslator.GetCreditBureaurequest(quoteRequest);

        ACT act = new ACT(quoteRequest, msg);

        creditBureauInterface.GetCreditScore(creditRequest, new OnCreditReplyEvent(OnCreditReply), act);
    }
	
    private void OnCreditReply(CreditBureauReply creditReply, Object act)
    {
        ACT myAct = (ACT)act;

        Console.WriteLine("Received Credit Score -- SSN {0} Score {1} Length {2}", 
                          creditReply.SSN, creditReply.CreditScore, creditReply.HistoryLength);
        BankQuoteRequest bankRequest = 
            LoanBrokerTranslator.GetBankQuoteRequest(myAct.loanRequest ,creditReply);
        bankInterface.GetBestQuote(bankRequest, new OnBestQuoteEvent(OnBestQuote), act);
    }
        
    private void OnBestQuote(BankQuoteReply bestQuote, Object act)
    {
        ACT myAct = (ACT)act;

        LoanQuoteReply quoteReply = LoanBrokerTranslator.GetLoanQuoteReply(myAct.loanRequest, bestQuote);
        Console.WriteLine("Best quote {0} {1}", quoteReply.InterestRate, quoteReply.QuoteID);
        SendReply(quoteReply, myAct.message);
    }
}

LoanBroker inherits from AsyncRequestReplyService which provides support for receiving requests and sending correlated replies. LoanBroker overrides the method ProcessMessage to deal with incoming request messages. ProcessMessage creates a new instance of the ACT and calls the credit bureau gateway to request a credit score. Interestingly, the method ends there. Processing continues when the credit bureau gateway invokes the delegate specified by LoanBroker: OnCreditReply. This method uses the ACT and the reply to create a bank quote request and calls the bank gateway to send the request messages. This time it specifies the method OnBestQuote as the callback delegate. Once the bank gateway received all bank quote replies it invokes this method via the delegate and passes back the instance of the ACT. OnBestQuote uses the bank quote and the ACT to create a reply to the customer and sends it off using the base class implementation of SendReply.

One class you probably noticed in the source code is the LoanBrokerTranslator. This class provides a handful of static methods that help convert between the different message formats.

The LoanBroker class demonstrates the trade-off we made in our design. The code is free of references to messaging or thread-related concepts (except for the inheritance from AsyncRequestReplyService), which makes the code very easy to read. However, the execution of the main function is spread across three methods that make no direct reference to each other besides the delegates. This can make the flow of execution hard to understand without considering the total solution including all external components.

Refactoring the Loan Broker

When we look at the way the loan broker functions we realize that we are separating data and functionality. We have one instance of the LoanBrokerclass that emulates multiple instances by means of the ACT collection. While ACT's are very useful they seem to go against the spirit of object-oriented programming by separating data and functionality -- the two parts that make up an object. However, we can refactor the LoanBroker class to avoid the repeated lookup of the ACT if we use the delegates in a better way. Delegates are essentially type-safe function pointers. As such they point to a specific object instance. So rather than supplying the credit bureau and bank gateway with a reference to a method in the sole LoanBroker instance we can use the delegates to point to a specific instance of a 'process object' that maintains the current state like an ACT but also contains the logic of the loan broker process'. To do this we turn the ACT into new class called LoanBrokerProcess and move the message handler functions into this class:

internal class LoanBrokerProcess 
{
    protected LoanBrokerPM broker;
    protected String processID;
    protected LoanQuoteRequest loanRequest;
    protected Message message;

    protected CreditBureauGateway creditBureauGateway;
    protected BankGateway bankInterface;

    public LoanBrokerProcess(LoanBrokerPM broker, String processID, 
                             CreditBureauGateway creditBureauGateway,
                             BankGateway bankGateway,
                             LoanQuoteRequest loanRequest, Message msg) 
    {
        this.broker = broker;
        this.creditBureauGateway = creditBureauGateway;
        this.bankInterface = bankGateway;
        this.processID = processID;
        this.loanRequest = loanRequest;
        this.message = msg;

        CreditBureauRequest creditRequest = LoanBrokerTranslator.GetCreditBureaurequest(loanRequest);
        creditBureauGateway.GetCreditScore(creditRequest, new OnCreditReplyEvent(OnCreditReply), null);
    }

    private void OnCreditReply(CreditBureauReply creditReply, Object act)
    {
        Console.WriteLine("Received Credit Score -- SSN {0} Score {1} Length {2}", creditReply.SSN, creditReply.CreditScore, creditReply.HistoryLength);
        BankQuoteRequest bankRequest = LoanBrokerTranslator.GetBankQuoteRequest(loanRequest, creditReply);
        bankInterface.GetBestQuote(bankRequest, new OnBestQuoteEvent(OnBestQuote), null);
    }
        
    private void OnBestQuote(BankQuoteReply bestQuote, Object act)
    {
        LoanQuoteReply quoteReply = LoanBrokerTranslator.GetLoanQuoteReply(loanRequest, bestQuote);
        Console.WriteLine("Best quote {0} {1}", quoteReply.InterestRate, quoteReply.QuoteID);
        broker.SendReply(quoteReply, message);
        broker.OnProcessComplete(processID);
    }
}

The methods no longer reference the ACT parameter provided by the credit bureau gateway and the bank gateway because all necessary information is stored in the instance of the LoanBrokerProcess. Once the process completes, it sends the reply message using the SendReply method that the LoanBrokerPM inherits from the AsyncRequestReplyService. Next, it notifies the LoanBrokerPM of the completion of the process.

internal class LoanBrokerPM : AsyncRequestReplyService
{
    protected CreditBureauGateway creditBureauGateway;
    protected BankGateway bankInterface;
    protected IDictionary activeProcesses = (IDictionary)(new Hashtable());
    
    public LoanBrokerPM(String requestQueueName,
                        String creditRequestQueueName, String creditReplyQueueName, 
                        String bankReplyQueueName, BankConnectionManager connectionManager): base(requestQueueName)
    {
        creditBureauGateway = new CreditBureauGateway(creditRequestQueueName, creditReplyQueueName);
        creditBureauGateway.Listen();

        bankInterface = new BankGateway(bankReplyQueueName, connectionManager);
        bankInterface.Listen();
    }

    protected override Type GetRequestBodyType()
    {
        return typeof(LoanQuoteRequest);
    }

    protected override void ProcessMessage(Object o, Message message)
    {
        LoanQuoteRequest quoteRequest;
        quoteRequest = (LoanQuoteRequest)o;

        String processID = message.Id;
        LoanBrokerProcess newProcess = 
            new LoanBrokerProcess(this, processID, creditBureauGateway,
                                  bankInterface, quoteRequest, message);
        activeProcesses.Add(processID, newProcess);
    }

    public void OnProcessComplete(String processID)
    {
        activeProcesses.Remove(processID);
    }
}

This LoanBrokerPM is basically a generic implementation of a Process Manager. It creates a new process instance when a new message arrives. When a process completes, the process manager removes the process instance from the list of active processes. The process manager uses the message ID as the unique process ID assigned to each process instance. We can now change the behavior of the loan broker just by editing the LoanBrokerProcess class, which has no references to messaging besides passing the message object around. It looks like paying attention to proper encapsulation and refactoring paid off.

The following class diagram summarizes the internal structure of the loan broker:


Loan Broker Class Diagram

Putting it All Together

The only remaining piece is the test client. The test client design is similar to that of the credit bureau gateway. The test client can make a specified number of repeated requests and correlate incoming responses to outstanding requests. Once we start all processes (banks, credit bureau and the loan broker), we can execute the example. We use a number of simple Main classes to start the respective components as console applications. We see a flurry of activity on the screen indicating the flow of messages through the system (see picture).


Running the MSMQ Example

Improving Performance

Now that we have the complete solution running we can gather some performance metrics to compare the throughput of the asynchronous solution to the synchronous solution. Using the test data generator we send 50 randomly generated requests to the loan broker. The test data generator reports that is took a total of 27 seconds to receive the 50 reply messages.


Sending 50 Quote Requests

It would be tempting to think that each request took 27 / 50 = 0.5 seconds. Wrong! The throughput of the loan broker is 50 requests in 27 seconds, but some of the requests took 26 seconds to complete! Why is the system so slow? Let's look at a snapshot of the message queue during the test run:


39 Messages Are Queued up in the Credit Request Queue

39 messages are queued up in the credit bureau request queue! So apparently the credit bureau is our bottleneck because all quote requests have to go through the credit bureau first. Now we can reap some of the rewards of loose coupling and start two additional instances of the credit bureau. Now we have three parallel instances of the credit bureau service running. This should fix our bottleneck, right? Let's see:


Sending 50 Quote Requests, Using 3 credit Bureau Instances

The total time to process all 50 messages is reduced to 21 seconds with the longest request waiting for a response for less than 15 seconds. On average, the client had to wait for a reply to the loan request for 8 seconds, half of the original version. It looks like we eliminated the bottleneck. The message throughput did not increase as dramatically as we might have hoped but we need to remember that for this simple example we are running all processes on a single CPU so that all processes compete for the same resources. Let's look at the new queue statistics to verify that the credit bureau bottleneck is in fact corrected :


Now Bank 5 Appears to be a Bottleneck

Well, it looks like we eliminated one bottleneck just to find a new one -- Bank 5. Why Bank 5? Bank 5 is the Pawn Shop that offers loans to everybody. So Bank 5 is part of almost every quote request. We could now go on to start multiple instances of Bank 5 but it's not realistic to expect the pawnshop to run multiple instances just to improve our throughput. Our other option is to change the routing logic for the bank requests. Since the Pawn Shop charges a substantial premium over the other banks the Pawn Shop quote tends to be the lowest quote only in those cases where no other bank provided a quote. Taking this observation into account we can improve the efficiency of the system by not routing requests to the Pawn Shop if the quote can also be serviced by another bank without affecting the overall behavior of the system.

We change the BankConnectionManager to include Bank 5 only for those quote requests that cannot be serviced by any other bank. The modified BankConnectionManager looks like this:

internal class BankConnectionManager
{
    IMessageGatewayFactory gatewayFactory;

    protected BankConnection[] banks;
    protected BankConnection catchAll;

    public BankConnectionManager(IMessageGatewayFactory gatewayFactory)
    {
        this.gatewayFactory = gatewayFactory;
        banks = new BankConnection[4];
        banks[0] = new Bank1(gatewayFactory);
        banks[1] = new Bank2(gatewayFactory);
        banks[2] = new Bank3(gatewayFactory);
        banks[3] = new Bank4(gatewayFactory);
        catchAll = new Bank5(gatewayFactory); 
    }

    public IMessageSender[] GetEligibleBankQueues(int CreditScore, int HistoryLength, int LoanAmount)
    {
        ArrayList lenders = new ArrayList();

        for (int index = 0; index < banks.Length; index++) 
        {
            if (banks[index].CanHandleLoanRequest(CreditScore, HistoryLength, LoanAmount))
                lenders.Add(banks[index].Queue);
        }
        if (lenders.Count == 0)
            lenders.Add(catchAll.Queue);
        IMessageSender[] lenderArray = (IMessageSender [])Array.CreateInstance(typeof(IMessageSender), lenders.Count);
        lenders.CopyTo(lenderArray);
        return lenderArray;
    }
}

Running with the modified code produces the following results:


Sending 50 Quote Requests, Using 3 credit Bureau Instances and a modified BankConnectionManager

The test results now show that all 50 requests were serviced in 12 seconds, half of the original time. More importantly, the average time to service a loan quote request is now under 4 seconds, a 4-fold improvement over the initial version. This example demonstrates the advantage of predictive routing by using a Recipient List. Because the loan broker has control over the routing we can decide how much 'intelligence' we can build into the routing logic without requiring any changes to the external parties. The trade-off is that the loan broker becomes more and more dependent on knowledge about the internal parties. For example, while the original BankConnectionManager treated all banks as equal, the modified version relies on the fact that Bank5 is a catch-all provider that should only be contacted if there are not other options. If Bank5 starts to offer better rates, the clients may no longer get the best possible deal.

The screen clip also demonstrates that response messages do not necessarily arrive in the order in which the requests were made. We can see that the test client received the response to request number 48 right after the response to request number 43. Because we are not missing any responses, this means that the test client received responses 44 through 47 before response 43. How did these requests 'pass' number 43? It looks like the request number 43 was routed to the General Retail Bank (Bank 3). After the Pawn Shop, this bank has the next least restrictive selection criteria and is more likely to be backed up with requests than the other banks. If requests number 44 through 47 did not match the General Retail Bank's criteria, the bank gateway would have received all responses for these requests while the quote request for request number 43 was still sitting in the bank3Queue. Because our loan broker is truly event driven it will reply to a loan request as soon as it receives all bank quotes. As a result, if the bank quotes for request number 44 arrive before the bank quotes for number 43 the loan broker will send the reply message for request number 44 first. This scenario also highlights the importance of the Correlation Identifier in the messages so that the test client can match responses to requests even if they arrive out of order.

Tuning asynchronous, message-based systems can be a very complex task. Our example showed some of the most basic techniques of identifying and resolving bottlenecks. But even our simple example made it clear that correcting one problem (the credit bureau bottleneck) can cause another problem (the Bank 5 bottleneck) to surface. We can also clearly see the advantages of asynchronous messaging and event-driven consumers. We were able to process 50 quote requests in 12 seconds -- a synchronous solution would have taken 8 or 10 times as long!

A Few Words on Testing

The loan broker example demonstrates how a simple application can become reasonably complex once it becomes distributed, asynchronous and event-driven. We now have a dozen classes and use delegates throughout to deal with the event-driven nature of asynchronous message processing. The increased complexity also means increased risk of defects. The asynchronous nature means risk of defects that are hard to reproduce or trouble-shoot because they depend on specific temporal conditions. Because of these additional risks, messaging solutions require a very thorough approach to testing. While we could probably write a whole book on testing messaging solutions I do want to include some simple, actionable advice on testing in this example. Let me summarize the testing advice in the following there rules:

Isolate the Application From the Messaging Implementation

Testing a single application is much easier than testing multiple, distributed applications connected by messaging channels. A single application allows us to trace through the complete execution path, we do not need a complex start-up procedure to fire up all components, and there is no need to purge channels between tests (see Channel Purger. Sometimes it is useful to "stub out" some external functions while testing others. For example, while we are testing the bank gateway we might as well stub out the credit bureau gateway instead of actually sending messages to an external credit bureau process.

How can we achieve some of the benefits of testing inside a single application with a minimal impact on the application code? We can separate the implementation of a messaging gateway from the interface definition. That allows us to provide multiple implementations of the interface.


Separating Credit Bureau Interface from Implementation

Because we encapsulated all messaging-specific logic inside the credit bureau gateway we can define a very simple interface:

public interface ICreditBureauGateway
{
    void GetCreditScore(CreditBureauRequest quoteRequest, OnCreditReplyEvent OnCreditResponse, Object ACT);
    void Listen();
}

For example, we can create a mock credit bureau gateway implementation that does not actually connect to any message queue but rather invokes the specified delegate right inside the GetCreditScore method. This mock implementation contains the same logic as the actual credit bureau so the remainder of the loan broker is completely unaware of this switcheroo.

public class MockCreditBureauGatewayImp : ICreditBureauGateway
{   

    private Random random = new Random();

    public MockCreditBureauGatewayImp() 
    { }

    public void GetCreditScore(CreditBureauRequest quoteRequest, OnCreditReplyEvent OnCreditResponse, Object ACT)
    {
        CreditBureauReply  reply = new CreditBureauReply();
        reply.CreditScore =  (int)(random.Next(600) + 300);
        reply.HistoryLength = (int)(random.Next(19) + 1);
        reply.SSN = quoteRequest.SSN;
        OnCreditResponse(reply, ACT);
    }

    public void Listen()
    { }
}

Test the Business Logic with Unit Test Cases

The implementation of the CreditBureau class demonstrated a clean separation of messaging-related functions (encapsulated in the base class) and the business logic (reduced to a randomizer in our simple example). In a real-life scenario the business logic would (hopefully) be somewhat more complex. In that case it pays off to move the getCreditScore and getCreditHistoryLength methods into a separate class together that does not have any dependency on the messaging layer (even though it is less visible, inheritance still carries dependencies from the subclass to the base class and related classes). We can the use a unit test tool such as nUnit (www.nunit.org) to write test cases without having to worry about messaging.

Provide a 'mock' Implementation of the Messaging Layer

The mock implementation of the ICreditBureauGateway is simple and effective. But it also replaces all credit bureau gateway-related code so that the class CreditBureauGatewayImp has to be tested separately. If we want to eliminate the dependency (and the associated performance hit) on message queues but still execute the code inside the CreditBureauGatewayImp class we can use a mock implementation of the IMessageReceiver and IMessageSender interfaces. A simple mock implementation could look like this:

public class MockQueue: IMessageSender, IMessageReceiver
{
	private OnMsgEvent onMsg = new OnMsgEvent(DoNothing);
	
	public void Send(Message msg){
		onMsg(msg);
	}
	
	private static void DoNothing(Message msg){
		
	}
	
	public OnMsgEvent OnMessage 
	{ 
		get { return onMsg; } 
		set { onMsg = value; }
	}
	
	public void Begin()
	{
		
	}

    public MessageQueue GetQueue()
    {
        return null;
    }
	
}

We can see that the Send triggers the onMsg delegate immediately without using any message queue. To use this mock queue for the credit bureau gateway we would have to make sure to reply with a message of the correct type. We would not be able to simply pass the request message back to the reply message. This implementation is not shown here but can be simple if, for example, we use a canned reply message.

Limitations of This Example

This section reminded us that even a simple messaging system (the loan broker really has to execute only two steps -- get the credit score and get the best bank quote) can get fairly complex due to the asynchronous nature and loose coupling between components. However, I still took a number of shortcuts to make the example fit inside the book. Specifically, the example does not address the following topics:

The example has no managed mechanism to handle errors. At this point, components simple spit out messages into the various console windows -- not a suitable solution for a production system. For a real implementation error messages should be routed to a central console so they can notify an operator in a unified way. The systems management patterns in the following chapter (e.g., the Control Bus address these requirements.

This example does not use transactional queues. For example, if the MessageRouter crashes after sending 2 out of 4 quote request messages to the banks, some banks will process a quite request while others will not. Likewise, if the loan broker crashes after it receives all bank quote replies but before it sends a reply to the client the client will never receive a reply. In a real-life system, actions like this need to be encapsulated inside transactions so that incoming messages are not consumed until the corresponding outbound message had been sent.

The loan broker implementation executes only in a single thread and does not worry about thread safety. For example, the BeginReceive method on an inbound message queue (hidden away in MessageReceiverGateway) is not called until the processing of the previous message has been completed. This is just fine for an example application (and a lot faster than the synchronous implementation) but for a high-throughput environment we would want to use a Message Dispatcher that manages multiple performer threads.

Summary

This chapter walked us through the implementation of the loan broker application using asynchronous message queues and MSMQ. I intentionally did not shy away from showing implementation details in order to bring the real issues inherent in building asynchronous messaging applications to light. I tried to focus on the design trade-offs more so than the vendor-specific messaging API so that the example is also valuable for non-C# developers.

This example reminds us of the complexities of implementing even a simple messaging application. Many things that can be taken for granted in a monolithic application (e.g. invoking a method) can require a significant amount of coding effort when using asynchronous messaging. Luckily, the design patterns provide us with a language to describe some of the design trade-offs without having to descend too deeply into the vendor jargon.


HomePatternsTable of ContentsPrevious Previous   Next Next