Example index

Receiver Source Notification Callbacks (Create and Delete)

This example demonstrates how to configure and code the callbacks for when a receiver creates and deletes a delivery controller for a specific source. This is typically done in support of maintaining per-source state information.

Like many other callback function in UM, the source notification is configured via a receiver attribute, source_notification_function which must be set programatically so that the appropriate function pointers can be provided.

Inside the UM library, there is a concept of a transport and a delivery controller. It is important to understand the difference and characteristics of each and how it related to your application. A transport is the network connection that is used as the medium for publishing/subscribing to messages. For example, LBTRM (reliable multicast) is a transport which publishes on a specific multicast group and port. A transport may contain a single UM topic, or it could contain many UM topics; it depends on how a publishing application configures itself and its sources.

A delivery controller is the internal mechanism that takes messages from a transport for a specific topic and then delivers those messages to the receiver application, given that the application has a receiver that is interested in that topic.

When a new transport is detected by a UM receiver application, you may notice that the receiver application executes the Beginning of Session (BOS) callback. This is great information for the application to log because it can be used to help debug networking issues. However, this callback is not great at telling the application that a specific topic ready to start receiving messages. Instead, the application can use the source_notification_callback which is demonstrated in this sample application. The source_notification_callback can enable a user callback to be executed when delivery controller is created for a topic and a separate callback for when a delivery controller is deleted.

There is one program source file:

Program explanation: source_notify_callbacks.c

Per-source State Information

As new sources are discovered for the receiver, the user wants to create and maintain some state information associated with that source. In this simple example, just keep a per-source count of messages received.

00036  /* State structure associated with each source the receiver is joined to. */
00037  typedef struct source_state_s {
00038      int msgs_rcvd;  /* Track message count from each source. */
00039  } source_state_t;

Delivery Controller Create Callback

Here the user callback for the delivery controller being created is defined. This function will execute every time a source is detected (when a source's topic advertisement is received by the receiver's context) that the receiver is interested in. UM expects the function to return a pointer to the state information, which will subsequently be passed to the receive callback as msg->source_clientd. This callback will execute before a BOS would because BOS required data on the transport to be received, unlike this function which just requires a TIR on an interested source.

00051  void *new_src_notification_callback(const char *source_name, void *clientd)
00052  {
00053      source_state_t *source_state = (source_state_t *)malloc(sizeof(source_state_t));
00054      source_state->msgs_rcvd = 0;
00055  
00056      printf("Delivery Controller Created: %s\n", source_name);
00057      return source_state;  /* This will be available in the receive callback as msg->source_clientd. */
00058  }  /* new_src_notification_callback */

Delivery Controller Delete Callback

This is the user callback for when the delivery controller is deleted. A delivery controller would be deleted if the topic hasn't received any data for the transport activity timeout. This is typically around the same time as an EOS, though not guaranteed if the transport remains active even if the topic is just deleted. Therefore it is better to rely on this callback versus EOS, because this callback is guaranteed if no data has been received for the topic.

00061  int src_delete_notification_callback(const char *source_name, void *clientd, void* src_clientd )
00062  {
00063      free(src_clientd);  /* This was created by new_src_notification_callback() */
00064  
00065      printf("Delivery Controller Deleted: %s\n", source_name);
00066      return 0;
00067  }  /* src_delete_notification_callback */

Standard Receive Callback

This is the standard receiver callback where messages are processed, as well as events like BOS and EOS

00070  /* Callback used to handle request message for receiver */
00071  int rcv_handle_msg(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
00072  {
00073      source_state_t *source_state = (source_state_t *)msg->source_clientd;
00074  
00075      switch (msg->type) {
00076      case LBM_MSG_DATA:
00077          source_state->msgs_rcvd ++;
00078          printf("[%s][%s], Received message %d\n", msg->topic_name, msg->source, source_state->msgs_rcvd);
00079          break;
00080      case LBM_MSG_BOS:
00081          printf("[%s][%s], Beginning of Transport Session\n", msg->topic_name, msg->source);
00082          break;
00083      case LBM_MSG_EOS:
00084          printf("[%s][%s], End of Transport Session\n", msg->topic_name, msg->source);
00085          break;
00086      default:
00087          printf("Other event, type=%x\n", msg->type);
00088          break;
00089      }
00090  
00091      return 0;
00092  }  /* rcv_handle_msg */

Declare Notify Structure

To properly set the receiver attribute source_notification_function this structure must be used to set the user callbacks which have been defined above.

00101      lbm_rcv_src_notification_func_t srccb;  /* Source notify callback structure */

Set the Callbacks and Configure Parameter

Finally, set the creation and deletion callback appropriately to the previously defined UM object, and then use the API to configure the paramter:

00123      srccb.create_func = new_src_notification_callback;
00124      srccb.delete_func = src_delete_notification_callback;
00125  
00126      err = lbm_rcv_topic_attr_setopt(rattr, "source_notification_function", &srccb, sizeof(srccb));
00127      EX_LBM_CHK(err);

Includes

Include files for this application. Notice the Windows specific include files - these are not necessary for Linux only applications

00022  #include <stdio.h>
00023  
00024  #if defined(_MSC_VER)
00025  /* Windows-only includes */
00026  #include <winsock2.h>
00027  #define SLEEP(s) Sleep((s)*1000)
00028  #else
00029  /* Unix-only includes */
00030  #include <stdlib.h>
00031  #include <unistd.h>
00032  #define SLEEP(s) sleep(s)
00033  #endif
00034  #include <lbm/lbm.h>

Windows Only

Windows applications must initialize the Winsock library to utilize sockets.

00104  #if defined(_WIN32)
00105      /* windows-specific code */
00106      WSADATA wsadata;
00107      int wsStat = WSAStartup(MAKEWORD(2,2), &wsadata);
00108      if (wsStat != 0)
00109      {
00110          printf("line %d: wsStat=%d\n",__LINE__,wsStat);
00111          exit(1);
00112      }
00113  #endif

If this fails, the application should exit since sockets will not be operational.