Ultra Messaging offers a Request/Response messaging model. A regular UM source can use a dedicated API to send a request on its publishing topic. A receiver for that topic will receive the request similar to regular data messages, and can use a dedicated API to send a response back to the source.
When a request message is sent, the message is published on the same transport that the source is configured to use. Response messages on the other hand are sent directly to the source from the receiver via Unicast Immediate Messaging (UIM), which is a direct TCP connection.
An important aspect of UM's Request/Response model is that it allows the application to keep track of which request corresponds to a given response. Due to the asynchronous nature of UM requests, any number of requests can be outstanding, and as the responses come in, they can be matched to their corresponding requests.
In this demonstration of Request/Response, the application create a single context, then a source and receier on topic "test.topic". Once the source and receiver are created, the source publishes a single request message, then the main application thread wait until the receiver callback receives the request, and replies with a response. Once the source callback for the response message is executed, the application sets a flag for the main application thread to cleanly exit
There is one program source file:
Here the application is declaring all the variables it will use to setup this test case. Note the single context object (ctx), separate topic objects for the source and receiver (rtopic and stopic), source and receiver object (src and rcv), context and source attribute objects (cattr and tattr), the request object (req), and finally an integer for tracking the return codes from API calls
00081 lbm_context_t *ctx; /* Context object */ 00082 lbm_topic_t *stopic; /* Source Topic object */ 00083 lbm_src_t *src; /* Source object */ 00084 lbm_src_topic_attr_t * tattr; /* Source topic attribute object */ 00085 lbm_rcv_t *rcv; /* Receive object: for subscribing to messages. */ 00086 lbm_topic_t *rtopic; /* Receiver Topic object */ 00087 lbm_request_t *req; /* Request object */ 00088 int err;
First, the application does a topic lookup for the receiver topic object (rtopic), then creates the receiver using the receiver object (rcv). Note when calling the receiver create API, lbm_rcv_create, the fourth parameter is rcv_handle_message.
00105 /* Create receiver for receiving request and sending response */ 00106 err = lbm_rcv_topic_lookup(&rtopic, ctx, "test.topic", NULL); 00107 EX_LBM_CHK(err); 00108 00109 err = lbm_rcv_create(&rcv, ctx, rtopic, rcv_handle_msg, NULL, NULL); 00110 EX_LBM_CHK(err);
rcv_handle_message is a reference to the callback function that is to be executed when this receiver receives a message, including request messages. The receiver message handler is executed for every single message delivered to the application from the UM library, and is where the application typically processes the message data. Messages are depicted as a struct, lbm_msg_t, to make processing the message and meta data for the message very simple. Since the message callback function is used for many different types of messages, the type of the message is stored in the msg->type value of the struct. In this example, only the message type LBM_MSG_REQUEST is handled.
00050 /* Callback used to handle request message for receiver */ 00051 int rcv_handle_msg(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd) 00052 { 00053 int err; 00054 00055 switch (msg->type) { 00056 case LBM_MSG_REQUEST: 00057 printf("LBM_MSG_REQUEST received\n"); 00058 err = lbm_send_response(msg->response, "response", 8, LBM_SRC_NONBLOCK); 00059 EX_LBM_CHK(err); 00060 break; 00061 } 00062 return 0; 00063 } /* rcv_handle_msg */
After creating the receiver, the application then initialized the source topic and creates the source which will be used to send the request:
00113 err = lbm_src_topic_attr_create(&tattr); 00114 EX_LBM_CHK(err); 00115 00116 err = lbm_src_topic_alloc(&stopic, ctx, "test.topic", tattr); 00117 EX_LBM_CHK(err); 00118 00119 err = lbm_src_topic_attr_delete(tattr); 00120 EX_LBM_CHK(err); 00121 00122 err = lbm_src_create(&src, ctx, stopic, NULL, NULL, NULL); 00123 EX_LBM_CHK(err);Once the source is successfully create, the request message can then be sent. The API to send a request is lbm_send_request. Note the first parameter of the API is the request object. It is important for the application to track this object, and delete the object once the application is not expecting any more responses for this particular request. Also note the fifth parameter, handle_response:
00125 err = lbm_send_request(&req, src, "request", 7, handle_response, NULL, NULL, 0); 00126 EX_LBM_CHK(err);handle_response is the callback function for the source that is executed upon receiving a response from a receiver. It is similar to the previously set receiver callback in that is is called to execute a response message, which also is delivered via the lbm_msg_t struct.
00066 /* Callback used to handle response message */ 00067 int handle_response(lbm_request_t *req, lbm_msg_t *msg, void *clientd) 00068 { 00069 switch (msg->type) { 00070 case LBM_MSG_RESPONSE: 00071 printf("LBM_MSG_RESPONSE received\n"); 00072 run = 0; 00073 break; 00074 } 00075 return 0; 00076 } /* handle_response */
Once this samples application gets a response, it sets the "run" variable to 0 to instruct the main application thread to exit.
Inside the Message Callback function (rcv_handle_message), which is where the receiver processes the request message that was just sent, the sample application sends the response using the lbm_send_response API. The first paramter is the response object which is contained within the lbm_msg_t struct. Note the LBM_SRC_NONBLOCK flag as the last paramter - this is absolutely crucial since the application is attempting to send the message from within the receiver message callback, which executes on the UM context thread. Using a non-blocking send of any kind within a context thread callback is required calling a blocking call on the context thread can result in application deadlock.
00058 err = lbm_send_response(msg->response, "response", 8, LBM_SRC_NONBLOCK); 00059 EX_LBM_CHK(err);
Once the receiver and source are created, and the request message is sent, there is nothing else for the main application thread to do but wait for the response to be received. Once the response is received within the handle_response callback, the run variable will be set to 0 and the loop will break
00128 while (run) { /* loop until response received */ 00129 SLEEP(1); 00130 }
As previously stated, it is important to cleanup the request object. For applications that send thousands of requests, or more, the memory utilization for the uncleaned request objects will become apparent, and can even lead to longer latencies for request/response when not properly cleaned.
00133 err = lbm_request_delete(req); 00134 EX_LBM_CHK(err);
Once the request is deleted, the source, receiver and context can be deleted
00136 err = lbm_src_delete(src); 00137 EX_LBM_CHK(err); 00138 00139 err = lbm_rcv_delete(rcv); 00140 EX_LBM_CHK(err); 00141 00142 err = lbm_context_delete(ctx); 00143 EX_LBM_CHK(err);
Error handling can be one of the most complicated issues facing a programmer. Each application may have its own unique error handling conventions. For these example programs, a very simplistic error handling approach is taken: if anything unexpected happens, print a message and exit the program:
00037 /* Example error checking macro. Include after each UM call. */ 00038 #define EX_LBM_CHK(err) do { \ 00039 if ((err) < 0) { \ 00040 fprintf(stderr, "%s:%d, lbm error: '%s'\n", \ 00041 __FILE__, __LINE__, lbm_errmsg()); \ 00042 exit(1); \ 00043 } \ 00044 } while (0)
The EX_LBM_CHK() macro checks a UM API return value. If negative, it assumes an error. One reason for making this a macro is so that the __FILE__ and __LINE__ compiler built-ins reference the file and line of the usages of the macro, not the file and line of the macro defintion itself.
Most UM functions are designed to return one of only two integer values: 0 for success and -1 for failure. The lbm.h header file defines the symbols LBM_OK and LBM_FAILURE respectively for these. However, there are a few functions which can return a value of 0 or greater in the event of success (for example: lbm_event_dispatch() and lbm_send_response() to name two). So the example EX_LBM_CHK() macro was written to be somewhat more general. However, be aware that there are a few other functions which do not return an integer status at all (for exmaple: lbm_serialize_response() to name one). Please refer to the API documentation for each UM function you call.Windows applications must initialize the Winsock library to utilize sockets.
00090 #if defined(_WIN32) 00091 /* windows-specific code */ 00092 WSADATA wsadata; 00093 int wsStat = WSAStartup(MAKEWORD(2,2), &wsadata); 00094 if (wsStat != 0) 00095 { 00096 printf("line %d: wsStat=%d\n",__LINE__,wsStat); 00097 exit(1); 00098 } 00099 #endif
A well-structured Windows networking application will also call WSACleanup() after before exit
00145 #if defined(_MSC_VER) 00146 /* Windows-specific cleanup overhead */ 00147 WSACleanup(); 00148 #endif
A small effort was made to provide a some portability between Unix and Windows. For example:
00022 #include <stdio.h> 00023 00024 #if defined(_MSC_VER) 00025 /* Windows-only includes */ 00026 #include <winsock2.h> 00027 #define SLEEP(s) Sleep((s)*1000) 00028 #else 00029 /* Unix-only includes */ 00030 #include <stdlib.h> 00031 #include <unistd.h> 00032 #define SLEEP(s) sleep(s) 00033 #endif 00034 00035 #include <lbm/lbm.h>
Different sets of header files should be included for Unix v.s. Windows applications. Also, whereas Unix has a simple function sleep() which puts the caller to sleep for the specified number of seconds, Windows has no such function. It does, however, have Sleep(), which sleeps for the specified number of milliseconds. To provide portability within the exmaple code, the macro SLEEP() is defined appropriately to have the same behavior between Unix and Windows.