Ultra Messaging offers a Request/Response messaging model. A regular UM source can use a dedicated API to send a request on its publishing topic. A receiver for that topic will receive the request similar to regular data messages, and can use a dedicated API to send a response back to the source.
When a request message is sent, the message is published on the same transport that the source is configured to use. Response messages on the other hand are sent directly to the source from the receiver via Unicast Immediate Messaging (UIM), which is a direct TCP connection.
An important aspect of UM's Request/Response model is that it allows the application to keep track of which request corresponds to a given response. Due to the asynchronous nature of UM requests, any number of requests can be outstanding, and as the responses come in, they can be matched to their corresponding requests.
In this demonstration of Request/Response, the application create a single context, then a source and receier on topic "test.topic". Once the source and receiver are created, the source publishes a single request message, then the main application thread wait until the receiver callback receives the request, and replies with a response. Once the source callback for the response message is executed, the application sets a flag for the main application thread to cleanly exit
There is one program source file:
The first phase of any UM program is initialization, where UM objects are created. The first object to be created is a UM context object
00016 LBMContext ctx = new LBMContext();
First, the application must register a LBMReceiverCallback object
00019 LBMReceiverCallback rcvCallback = new LBMReceiverCallback(onReceive);
The receiver callback class is defined as "ReceiverCallback" in this application, and looks like this:
00073 static protected int onReceive(object cbArg, LBMMessage msg) 00074 { 00075 switch (msg.type()) 00076 { 00077 case LBM.MSG_REQUEST: 00078 { 00079 System.Console.Out.Write("Request Received"); 00080 try 00081 { 00082 String msgData = "response"; 00083 byte[] bytes = new byte[msgData.Length * sizeof(char)]; 00084 System.Buffer.BlockCopy(msgData.ToCharArray(), 0, bytes, 0, bytes.Length); 00085 msg.respond(bytes, msgData.Length, LBM.SRC_NONBLOCK); 00086 } 00087 catch (LBMException ex) 00088 { 00089 System.Console.Out.Write("Error responding to request: " + ex.Message); 00090 } 00091 } 00092 break; 00093 } 00094 return 0; 00095 }/* ReceiverCallback */
This callback function will be executed when the receiver receives a message, including request messages. The receiver message handler is executed for every single message delivered to the application from the UM library, and is where the application typically processes the message data.
Next, the application initializes a receiver attribute object called rcv_attr, which is then used to create the topic. Attribute objects are used to configure particular settings for a receiver rather than using the default values or from an inherited configuration file. An LBMTopic object is create next, and this is required for all receivers as it defines which context, the name of the topic string the receiver will use, and the attribute object. Finally, the receiver object, LBMReceiver, is created using a reference to the previously initialized ReceiverCallback object and LBMTopic object.
00022 LBMReceiverAttributes rcv_attr = new LBMReceiverAttributes(); 00023 LBMTopic rtopic = new LBMTopic(ctx, "test.topic", rcv_attr); 00024 LBMReceiver rcv = new LBMReceiver(ctx, rtopic, rcvCallback, null, null);
After creating the receiver, the application then initializes the source topic and creates the source which will be used to send the request:
00027 LBMTopic stopic = new LBMTopic(ctx, "test.topic", new LBMSourceAttributes()); 00028 LBMSource src = new LBMSource(ctx, stopic);Once the source is successfully created, the request object needs to be initialized. The request object is responsible for sending the request and is also responsible for receiving responses. It is important to track the request within the application, and close it when all responses have been received. Once the request object is initialized with the payload for the request, a response handler must be added as a means of processing response messages back from the receiver(s). The response callback is similar to the receiver message callback.
00030 String msgData = "request"; 00031 byte[] bytes = new byte[msgData.Length * sizeof(char)]; 00032 System.Buffer.BlockCopy(msgData.ToCharArray(), 0, bytes, 0, bytes.Length); 00033 00034 LBMRequest req = new LBMRequest(bytes, bytes.Length); 00035 LBMResponseCallback myResponseCallback = new LBMResponseCallback(onResponse); 00036 req.addResponseCallback(myResponseCallback);
And the response handler class and function:
00098 public static int onResponse(object cbArg, LBMRequest req, LBMMessage msg) 00099 { 00100 switch (msg.type()) 00101 { 00102 case LBM.MSG_RESPONSE: 00103 System.Console.Out.WriteLine("Response Received"); 00104 run = 0; 00105 break; 00106 } 00107 return 0; 00108 } /* ResponseCallback */
Now the application is ready to send the request:
00050 src.send(req, LBM.MSG_FLUSH | LBM.SRC_BLOCK);
The main application thread will now wait for the response to be received before continuing. Once this sample application gets a response, it sets the "run" variable to 0 to instruct the main application thread to exit.
Inside the Message Callback function, which is where the receiver processes the request message that was just sent, the sample application sends the response using the msg.respond() API. Note the LBM.SRC_NONBLOCK flag as the last paramter - this is absolutely crucial since the application is attempting to send the message from within the receiver message callback, which executes on the UM context thread. Using a non-blocking send of any kind within a context thread callback is required calling a blocking call on the context thread can result in application deadlock.
00080 try 00081 { 00082 String msgData = "response"; 00083 byte[] bytes = new byte[msgData.Length * sizeof(char)]; 00084 System.Buffer.BlockCopy(msgData.ToCharArray(), 0, bytes, 0, bytes.Length); 00085 msg.respond(bytes, msgData.Length, LBM.SRC_NONBLOCK); 00086 } 00087 catch (LBMException ex) 00088 { 00089 System.Console.Out.Write("Error responding to request: " + ex.Message); 00090 }
Once the receiver and source are created, and the request message is sent, there is nothing else for the main application thread to do but wait for the response to be received. Once the response is received within the ResponseCallback function, the run variable will be set to 0 and the loop will break
00052 while (run == 1) 00053 { 00054 try 00055 { 00056 System.Threading.Thread.Sleep(1000); 00057 } 00058 catch (Exception ex) 00059 { 00060 System.Console.Error.WriteLine("Error System.Threading.Thread.Sleep() exception: " + ex.Message); 00061 System.Environment.Exit(1); 00062 } 00063 }
As previously stated, it is important to cleanup the request object. For applications that send thousands of requests, or more, the memory utilization for the uncleaned request objects will become apparent, and can even lead to longer latencies for request/response when not properly cleaned.
Once the request is deleted, the source, receiver and context can be deleted
00066 req.close(); 00067 src.close(); 00068 rcv.close(); 00069 ctx.close();