Request-Response Sample Application

Ultra Messaging offers a Request/Response messaging model. A regular UM source can use a dedicated API to send a request on its publishing topic. A receiver for that topic will receive the request similar to regular data messages, and can use a dedicated API to send a response back to the source.

When a request message is sent, it is published on the same transport that the source is configured to use. Response messages, on the other hand, are sent directly from the receiver to the source via Unicast Immediate Messaging (UIM), which uses a direct TCP connection.

An important aspect of UM's Request/Response model is that it allows the application to keep track of which request corresponds to a given response. Due to the asynchronous nature of UM requests, any number of requests can be outstanding, and as the responses come in, they can be matched to their corresponding requests.
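
One common way to do this matching is to attach per-request state to each request and have the response callback receive it back. The sketch below illustrates the idea with stand-in types only; it does not call the UM library. The names pending_request_t and on_response are hypothetical. In a UM application, a pointer like this could be passed as the clientd argument that the response callback in this sample receives.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical per-request state, one instance per outstanding request. */
typedef struct {
    int request_id;           /* application-assigned identifier */
    int responses_seen;       /* number of responses matched so far */
    char last_response[32];   /* copy of the most recent response payload */
} pending_request_t;

/* Stand-in for a response callback: clientd identifies the request that
 * this response belongs to, so no lookup is needed. */
static int on_response(const char *data, size_t len, void *clientd)
{
    pending_request_t *pending = (pending_request_t *)clientd;

    assert(pending != NULL && data != NULL);
    if (len >= sizeof(pending->last_response)) {
        len = sizeof(pending->last_response) - 1;   /* truncate to fit */
    }
    memcpy(pending->last_response, data, len);
    pending->last_response[len] = '\0';
    pending->responses_seen++;
    return 0;
}
```

Because each response arrives together with the state of its own request, responses for different requests can arrive in any order without being confused with one another.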

In this demonstration of Request/Response, the application creates a single context, then a source and a receiver on topic "test.topic". Once the source and receiver are created, the source publishes a single request message, and the main application thread waits until the receiver callback receives the request and replies with a response. Once the source callback for the response message is executed, the application sets a flag that tells the main application thread to exit cleanly.

There is one program source file: request_response.c

Program explanation: request_response.c

Declare Variables

Here the application declares all the variables it will use to set up this test case. Note the single context object (ctx), separate topic objects for the source and receiver (stopic and rtopic), the source and receiver objects (src and rcv), the source topic attribute object (tattr), the request object (req), and finally an integer (err) for tracking the return codes from API calls.

    lbm_context_t *ctx;                     /* Context object */
    lbm_topic_t *stopic;                    /* Source Topic object */
    lbm_src_t *src;                         /* Source object */
    lbm_src_topic_attr_t *tattr;            /* Source topic attribute object */
    lbm_rcv_t *rcv;                         /* Receive object: for subscribing to messages. */
    lbm_topic_t *rtopic;                    /* Receiver Topic object */
    lbm_request_t *req;                     /* Request object */
    int err;                                /* Return code from API calls */

Create Receiver and Message Callback Function

First, the application does a topic lookup for the receiver topic object (rtopic), then creates the receiver object (rcv). Note that when calling the receiver create API, lbm_rcv_create, the fourth parameter is rcv_handle_msg.

    /* Create receiver for receiving request and sending response */
    err = lbm_rcv_topic_lookup(&rtopic, ctx, "test.topic", NULL);
    EX_LBM_CHK(err);

    err = lbm_rcv_create(&rcv, ctx, rtopic, rcv_handle_msg, NULL, NULL);
    EX_LBM_CHK(err);

rcv_handle_msg is a reference to the callback function that is executed when this receiver receives a message, including request messages. The receiver message handler is executed for every message delivered to the application from the UM library, and is where the application typically processes the message data. Messages are represented as a struct, lbm_msg_t, which makes the message data and its metadata simple to process. Since the message callback function is used for many different types of messages, the type of the message is stored in the msg->type field of the struct. In this example, only the message type LBM_MSG_REQUEST is handled.

    /* Callback used to handle request message for receiver */
    int rcv_handle_msg(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
    {
        int err;

        switch (msg->type) {
        case LBM_MSG_REQUEST:
            printf("LBM_MSG_REQUEST received\n");
            err = lbm_send_response(msg->response, "response", 8, LBM_SRC_NONBLOCK);
            EX_LBM_CHK(err);
            break;
        }
        return 0;
    }  /* rcv_handle_msg */

Create source, Send Request and Response Handler Callback

After creating the receiver, the application initializes the source topic and creates the source that will be used to send the request:

    err = lbm_src_topic_attr_create(&tattr);
    EX_LBM_CHK(err);

    err = lbm_src_topic_alloc(&stopic, ctx, "test.topic", tattr);
    EX_LBM_CHK(err);

    err = lbm_src_topic_attr_delete(tattr);
    EX_LBM_CHK(err);

    err = lbm_src_create(&src, ctx, stopic, NULL, NULL, NULL);
    EX_LBM_CHK(err);

Once the source is successfully created, the request message can be sent. The API to send a request is lbm_send_request. Note that the first parameter of the API is the request object. It is important for the application to track this object, and to delete it once the application no longer expects any responses to this particular request. Also note the fifth parameter, handle_response:

    err = lbm_send_request(&req, src, "request", 7, handle_response, NULL, NULL, 0);
    EX_LBM_CHK(err);

handle_response is the callback function for the source that is executed upon receiving a response from a receiver. It is similar to the previously set receiver callback in that it is called to handle a response message, which is also delivered via the lbm_msg_t struct.

    /* Callback used to handle response message */
    int handle_response(lbm_request_t *req, lbm_msg_t *msg, void *clientd)
    {
        switch (msg->type) {
        case LBM_MSG_RESPONSE:
            printf("LBM_MSG_RESPONSE received\n");
            run = 0;
            break;
        }
        return 0;
    }  /* handle_response */

Once this sample application gets a response, it sets the "run" variable to 0 to instruct the main application thread to exit.

Send Response

Inside the message callback function (rcv_handle_msg), which is where the receiver processes the request message that was just sent, the sample application sends the response using the lbm_send_response API. The first parameter is the response object, which is contained within the lbm_msg_t struct. Note the LBM_SRC_NONBLOCK flag as the last parameter. This is absolutely crucial, since the application is sending the message from within the receiver message callback, which executes on the UM context thread. A non-blocking send is required within any context thread callback, because making a blocking call on the context thread can result in application deadlock.

    err = lbm_send_response(msg->response, "response", 8, LBM_SRC_NONBLOCK);
    EX_LBM_CHK(err);
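
A non-blocking send can decline to send when the transport is busy, so a callback needs some way to cope with that without waiting. The following is a minimal sketch of one option, assuming a "would block" result can be told apart from other failures: queue the work and retry later from an application thread. It uses stand-ins only and does not call the UM library. SEND_WOULDBLOCK, try_send_fn, deferred_sends_t, send_or_defer and flush_deferred are all hypothetical names, and UM reports this condition through its own error mechanism, so consult the API documentation for the real details. Locking between the callback thread and the retrying thread is omitted for brevity.

```c
#include <assert.h>
#include <stddef.h>

#define SEND_OK           0
#define SEND_FAILED     (-1)
#define SEND_WOULDBLOCK (-2)   /* hypothetical code, not a UM constant */
#define MAX_DEFERRED      8

/* Hypothetical non-blocking send function: always returns immediately. */
typedef int (*try_send_fn)(const char *data, size_t len);

/* Fixed-size list of sends that could not complete inside a callback. */
typedef struct {
    const char *data[MAX_DEFERRED];   /* caller keeps these buffers alive */
    size_t len[MAX_DEFERRED];
    size_t count;
} deferred_sends_t;

/* Stand-in transport used to exercise the functions below. */
static int transport_busy = 0;
static int stub_try_send(const char *data, size_t len)
{
    (void)data;
    (void)len;
    return transport_busy ? SEND_WOULDBLOCK : SEND_OK;
}

/* Called from a callback: never waits. A send that would block is queued. */
static int send_or_defer(try_send_fn try_send, deferred_sends_t *q,
                         const char *data, size_t len)
{
    int rc;

    assert(try_send != NULL && q != NULL);
    rc = try_send(data, len);
    if (rc != SEND_WOULDBLOCK) {
        return rc;                    /* sent, or failed outright */
    }
    if (q->count == MAX_DEFERRED) {
        return SEND_FAILED;           /* queue full: report the failure */
    }
    q->data[q->count] = data;
    q->len[q->count] = len;
    q->count++;
    return SEND_OK;
}

/* Called later from an application thread: retries queued sends in order.
 * Returns the number of sends that completed. */
static size_t flush_deferred(try_send_fn try_send, deferred_sends_t *q)
{
    size_t sent = 0;
    size_t i;

    assert(try_send != NULL && q != NULL);
    while (sent < q->count &&
           try_send(q->data[sent], q->len[sent]) == SEND_OK) {
        sent++;
    }
    for (i = sent; i < q->count; i++) {   /* keep unsent entries, in order */
        q->data[i - sent] = q->data[i];
        q->len[i - sent] = q->len[i];
    }
    q->count -= sent;
    return sent;
}
```

The sample itself takes the simpler route of treating any failed send as fatal, which is reasonable for a single request and response.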

Main Loop

Once the receiver and source are created, and the request message is sent, there is nothing else for the main application thread to do but wait for the response to be received. Once the response is received within the handle_response callback, the run variable is set to 0 and the loop exits.

    while (run) {  /* loop until response received */
        SLEEP(1);
    }
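
The loop above waits indefinitely if no response ever arrives. A variation, sketched here under stated assumptions, bounds the wait and reports a timeout. It is not part of the sample: wait_for_response, sleep_fn and the demo_ helpers are hypothetical names, the sleep function is passed in so the logic can be exercised without real delays, and the flag uses C11 atomics because it is written by one thread and read by another.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical sleep hook; an application could pass a wrapper around SLEEP(). */
typedef void (*sleep_fn)(unsigned seconds);

/* Wait until *run becomes 0 or max_seconds one-second sleeps have elapsed.
 * Returns 1 if the flag was cleared, 0 on timeout. */
static int wait_for_response(atomic_int *run, unsigned max_seconds,
                             sleep_fn do_sleep)
{
    unsigned waited = 0;

    assert(run != NULL && do_sleep != NULL);
    while (atomic_load(run) != 0) {
        if (waited >= max_seconds) {
            return 0;                 /* gave up: no response in time */
        }
        do_sleep(1);
        waited++;
    }
    return 1;
}

/* Demonstration helpers: fake sleeps that count how often they are called. */
static atomic_int demo_run = 1;
static unsigned demo_sleeps = 0;

static void demo_sleep_then_respond(unsigned seconds)
{
    (void)seconds;
    demo_sleeps++;
    if (demo_sleeps == 3) {
        atomic_store(&demo_run, 0);   /* simulate the response callback */
    }
}

static void demo_sleep_only(unsigned seconds)
{
    (void)seconds;
    demo_sleeps++;
}
```

On a timeout the application can still proceed to the cleanup steps, deleting the request object so that no further responses are expected for it.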

Cleanup

As previously stated, it is important to clean up the request object. For applications that send thousands of requests or more, the memory consumed by undeleted request objects will become apparent, and can even lead to longer request/response latencies.

    err = lbm_request_delete(req);
    EX_LBM_CHK(err);

Once the request is deleted, the source, receiver and context can be deleted.

    err = lbm_src_delete(src);
    EX_LBM_CHK(err);

    err = lbm_rcv_delete(rcv);
    EX_LBM_CHK(err);

    err = lbm_context_delete(ctx);
    EX_LBM_CHK(err);

Error Checking

Error handling can be one of the most complicated issues facing a programmer. Each application may have its own unique error handling conventions. For these example programs, a very simplistic error handling approach is taken: if anything unexpected happens, print a message and exit the program:

    /* Example error checking macro.  Include after each UM call. */
    #define EX_LBM_CHK(err) do { \
        if ((err) < 0) { \
            fprintf(stderr, "%s:%d, lbm error: '%s'\n", \
                __FILE__, __LINE__, lbm_errmsg()); \
            exit(1); \
        }  \
    } while (0)

The EX_LBM_CHK() macro checks a UM API return value. If the value is negative, it assumes an error. One reason for making this a macro is so that the __FILE__ and __LINE__ compiler built-ins reference the file and line where the macro is used, not the file and line of the macro definition itself.

Most UM functions are designed to return one of only two integer values: 0 for success and -1 for failure. The lbm.h header file defines the symbols LBM_OK and LBM_FAILURE respectively for these. However, there are a few functions which can return a value of 0 or greater in the event of success (for example: lbm_event_dispatch() and lbm_send_response() to name two). So the example EX_LBM_CHK() macro was written to be somewhat more general. However, be aware that there are a few other functions which do not return an integer status at all (for example: lbm_serialize_response() to name one). Please refer to the API documentation for each UM function you call.
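
The reason for testing "less than zero" rather than "not equal to zero" can be seen in a small stand-alone sketch. It does not use the UM library. chk_failed and EX_CHK_FAILED are hypothetical names for a variant that reports the failure and lets the caller decide what to do, instead of exiting.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical non-exiting check: reports a negative status on stderr.
 * Returns 1 if err indicates failure, 0 otherwise. */
static int chk_failed(int err, const char *file, int line)
{
    assert(file != NULL);
    if (err < 0) {
        fprintf(stderr, "%s:%d, call failed with status %d\n",
                file, line, err);
        return 1;
    }
    return 0;   /* 0 and positive values both count as success */
}

/* As with EX_LBM_CHK(), a macro captures the caller's file and line. */
#define EX_CHK_FAILED(err) chk_failed((err), __FILE__, __LINE__)
```

With this definition, EX_CHK_FAILED(0) and EX_CHK_FAILED(3) both evaluate to 0, while EX_CHK_FAILED(-1) prints a message and evaluates to 1. A test for "not equal to zero" would wrongly treat the positive success value as an error.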

Windows Only

Windows applications must initialize the Winsock library to utilize sockets.

    #if defined(_WIN32)
        /* windows-specific code */
        WSADATA wsadata;
        int wsStat = WSAStartup(MAKEWORD(2,2), &wsadata);
        if (wsStat != 0)
        {
            printf("line %d: wsStat=%d\n",__LINE__,wsStat);
            exit(1);
        }
    #endif

A well-structured Windows networking application will also call WSACleanup() before exit.

    #if defined(_MSC_VER)
        /* Windows-specific cleanup overhead */
        WSACleanup();
    #endif

Includes

A small effort was made to provide some portability between Unix and Windows. For example:

    #include <stdio.h>

    #if defined(_MSC_VER)
    /* Windows-only includes */
    #include <winsock2.h>
    #define SLEEP(s) Sleep((s)*1000)
    #else
    /* Unix-only includes */
    #include <stdlib.h>
    #include <unistd.h>
    #define SLEEP(s) sleep(s)
    #endif

    #include <lbm/lbm.h>

Different sets of header files should be included for Unix vs. Windows applications. Also, whereas Unix has a simple function sleep() which puts the caller to sleep for the specified number of seconds, Windows has no such function. It does, however, have Sleep(), which sleeps for the specified number of milliseconds. To provide portability within the example code, the macro SLEEP() is defined appropriately to have the same behavior between Unix and Windows.
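
The same approach extends to sub-second delays. The sketch below is not part of the sample: sleep_msec and split_msec are hypothetical helpers, and the Unix branch assumes a POSIX system that provides nanosleep().

```c
#if !defined(_WIN32) && !defined(_POSIX_C_SOURCE)
#define _POSIX_C_SOURCE 200809L   /* expose nanosleep() in strict ISO C modes */
#endif

#include <assert.h>
#if defined(_WIN32)
#include <windows.h>
#else
#include <time.h>
#endif

/* Split a millisecond count into whole seconds and leftover nanoseconds. */
static void split_msec(unsigned ms, long *sec, long *nsec)
{
    assert(sec != NULL && nsec != NULL);
    *sec = (long)(ms / 1000u);
    *nsec = (long)(ms % 1000u) * 1000000L;
}

/* Hypothetical helper: sleep for the given number of milliseconds. */
static void sleep_msec(unsigned ms)
{
#if defined(_WIN32)
    Sleep(ms);                /* Windows Sleep() already takes milliseconds */
#else
    struct timespec ts;
    long sec, nsec;

    split_msec(ms, &sec, &nsec);
    ts.tv_sec = (time_t)sec;
    ts.tv_nsec = nsec;
    nanosleep(&ts, NULL);     /* may return early if interrupted by a signal */
#endif
}
```

A polling loop like the one in this sample could then check its flag more often than once per second, at the cost of waking up more frequently.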