This example demonstrates how to configure and code the callbacks for when a receiver creates and deletes a delivery controller for a specific source. This is typically done in support of maintaining per-source state information.
Like many other callback functions in UM, the source notification is configured via a receiver attribute, source_notification_function, which must be set programmatically so that the appropriate function pointers can be provided.
Inside the UM library, there is a concept of a transport and a delivery controller. It is important to understand the difference between them, the characteristics of each, and how each relates to your application. A transport is the network connection that is used as the medium for publishing/subscribing to messages. For example, LBTRM (reliable multicast) is a transport which publishes on a specific multicast group and port. A transport may contain a single UM topic, or it could contain many UM topics; it depends on how a publishing application configures itself and its sources.
A delivery controller is the internal mechanism that takes messages from a transport for a specific topic and then delivers those messages to the receiver application, given that the application has a receiver that is interested in that topic.
When a UM receiver application detects a new transport, you may notice that it executes the Beginning of Session (BOS) callback. This is useful information for the application to log because it can help debug networking issues. However, BOS does not tell the application that a specific topic is ready to start receiving messages. For that, the application can use the source_notification_function attribute, which this sample application demonstrates. It lets the application register one user callback that is executed when a delivery controller is created for a topic and a separate callback that is executed when a delivery controller is deleted.
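The relationship between a transport and its delivery controllers can be pictured with a toy model. The sketch below is not UM code: toy_transport_t and toy_count_delivery_controllers() are hypothetical names invented for illustration, and the model assumes one delivery controller per topic of interest on a transport, as described above.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a transport session; this is not a UM type. */
typedef struct {
    const char *name;       /* label for the transport, e.g. a multicast group and port */
    const char *topics[8];  /* topics the publisher mapped onto this transport */
    size_t num_topics;
} toy_transport_t;

/* Counts the delivery controllers the toy receiver would need for one
 * transport: one per topic on the transport that the receiver subscribes to. */
size_t toy_count_delivery_controllers(const toy_transport_t *t,
                                      const char *const *subscribed,
                                      size_t num_subscribed)
{
    size_t count = 0;

    assert(t != NULL);
    for (size_t i = 0; i < t->num_topics; i++) {
        for (size_t j = 0; j < num_subscribed; j++) {
            if (strcmp(t->topics[i], subscribed[j]) == 0) {
                count++;  /* topic of interest: one delivery controller */
                break;
            }
        }
    }
    return count;
}
```

In this model a single transport event (such as BOS) says nothing about which topics are ready, whereas each counted delivery controller corresponds to one topic-level creation notification.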
There is one program source file:
As new sources are discovered for the receiver, the user wants to create and maintain some state information associated with each source. In this simple example, the state is just a per-source count of messages received.
    /* State structure associated with each source the receiver is joined to. */
    typedef struct source_state_s {
        int msgs_rcvd;  /* Track message count from each source. */
    } source_state_t;
Here the user callback for delivery controller creation is defined. This function executes every time a source that the receiver is interested in is detected (that is, when the source's topic advertisement is received by the receiver's context). UM expects the function to return a pointer to the state information, which is subsequently passed to the receive callback as msg->source_clientd. This callback executes before BOS, because BOS requires data to be received on the transport, whereas this function requires only a topic advertisement (TIR) from a source of interest.
    void *new_src_notification_callback(const char *source_name, void *clientd)
    {
        source_state_t *source_state = (source_state_t *)malloc(sizeof(source_state_t));
        source_state->msgs_rcvd = 0;

        printf("Delivery Controller Created: %s\n", source_name);
        return source_state;  /* This will be available in the receive callback as msg->source_clientd. */
    }  /* new_src_notification_callback */
This is the user callback for when the delivery controller is deleted. A delivery controller is deleted when its topic has received no data for the duration of the transport activity timeout. This typically happens around the same time as EOS, but EOS is not guaranteed: if only the topic is deleted while the transport remains active, EOS may not be delivered. It is therefore better to rely on this callback than on EOS, because this callback is guaranteed to run once no data has been received for the topic.
    int src_delete_notification_callback(const char *source_name, void *clientd, void *src_clientd)
    {
        free(src_clientd);  /* This was created by new_src_notification_callback() */

        printf("Delivery Controller Deleted: %s\n", source_name);
        return 0;
    }  /* src_delete_notification_callback */
This is the standard receiver callback, where messages are processed along with events such as BOS and EOS.
    /* Callback used to handle request message for receiver */
    int rcv_handle_msg(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
    {
        source_state_t *source_state = (source_state_t *)msg->source_clientd;

        switch (msg->type) {
        case LBM_MSG_DATA:
            source_state->msgs_rcvd++;
            printf("[%s][%s], Received message %d\n", msg->topic_name, msg->source, source_state->msgs_rcvd);
            break;
        case LBM_MSG_BOS:
            printf("[%s][%s], Beginning of Transport Session\n", msg->topic_name, msg->source);
            break;
        case LBM_MSG_EOS:
            printf("[%s][%s], End of Transport Session\n", msg->topic_name, msg->source);
            break;
        default:
            printf("Other event, type=%x\n", msg->type);
            break;
        }

        return 0;
    }  /* rcv_handle_msg */
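To see how the per-source state travels from the creation callback, through the receive callback, to the deletion callback, the life cycle can be exercised without the UM library. The following is a minimal sketch under stated assumptions: mock_msg_t, on_create(), on_data(), on_delete() and mock_source_lifecycle() are hypothetical stand-ins written for this illustration, and the driver function plays the role that UM itself plays in the real application.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Per-source state, mirroring source_state_t from the sample. */
typedef struct {
    int msgs_rcvd;
} source_state_t;

/* Hypothetical stand-in for the fields of lbm_msg_t used here. */
typedef struct {
    const char *source;
    void *source_clientd;
} mock_msg_t;

/* Creation callback: allocates zeroed state for the new source. */
static void *on_create(const char *source_name, void *clientd)
{
    source_state_t *state = calloc(1, sizeof(*state));
    (void)clientd;
    printf("created: %s\n", source_name);
    return state;
}

/* Deletion callback: releases what on_create() allocated. */
static int on_delete(const char *source_name, void *clientd, void *src_clientd)
{
    (void)clientd;
    free(src_clientd);
    printf("deleted: %s\n", source_name);
    return 0;
}

/* Receive callback: bumps the count kept in the per-source state. */
static int on_data(const mock_msg_t *msg)
{
    source_state_t *state = msg->source_clientd;
    assert(state != NULL);
    return ++state->msgs_rcvd;
}

/* Drives one source through create, num_msgs data messages, and delete.
 * Returns the final per-source message count, or -1 if allocation failed. */
int mock_source_lifecycle(const char *source_name, int num_msgs)
{
    mock_msg_t msg = { source_name, NULL };
    int last = 0;

    msg.source_clientd = on_create(source_name, NULL);
    if (msg.source_clientd == NULL) {
        return -1;
    }
    for (int i = 0; i < num_msgs; i++) {
        last = on_data(&msg);
    }
    on_delete(source_name, NULL, msg.source_clientd);
    return last;
}
```

Calling mock_source_lifecycle() for two different source names gives each its own counter, which is the property the real callbacks provide per delivery controller.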
To set the receiver attribute source_notification_function properly, the following structure must be used to hold the user callbacks defined above.
    lbm_rcv_src_notification_func_t srccb;  /* Source notify callback structure */
Finally, assign the creation and deletion callbacks to the previously declared structure, and then use the API to configure the parameter:
    srccb.create_func = new_src_notification_callback;
    srccb.delete_func = src_delete_notification_callback;
    srccb.clientd = NULL;  /* Passed to both callbacks as clientd; unused in this example. */

    err = lbm_rcv_topic_attr_setopt(rattr, "source_notification_function", &srccb, sizeof(srccb));
    EX_LBM_CHK(err);
Include files for this application. Notice the Windows-specific include files; these are not necessary for Linux-only applications.
    #include <stdio.h>
    #include <stdlib.h>  /* malloc(), free(), exit(): needed on every platform */

    #if defined(_MSC_VER)
    /* Windows-only includes */
    #include <winsock2.h>
    #define SLEEP(s) Sleep((s)*1000)
    #else
    /* Unix-only includes */
    #include <unistd.h>
    #define SLEEP(s) sleep(s)
    #endif
    #include <lbm/lbm.h>
Windows applications must initialize the Winsock library to utilize sockets.
    #if defined(_WIN32)
        /* windows-specific code */
        WSADATA wsadata;
        int wsStat = WSAStartup(MAKEWORD(2,2), &wsadata);
        if (wsStat != 0)
        {
            printf("line %d: wsStat=%d\n", __LINE__, wsStat);
            exit(1);
        }
    #endif
If this fails, the application should exit since sockets will not be operational.
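One possible way to keep the platform check out of main() is to wrap the Winsock start-up in a small helper and pair it with a cleanup call. This is only a sketch: net_init() and net_cleanup() are hypothetical helper names, not part of UM or of the sample, while WSAStartup() and WSACleanup() are the standard Winsock calls.

```c
#include <stdio.h>

#if defined(_WIN32)
#include <winsock2.h>
#endif

/* Initializes the socket library where the platform requires it.
 * Returns 0 on success, or the WSAStartup() error code on Windows. */
int net_init(void)
{
#if defined(_WIN32)
    WSADATA wsadata;
    int status = WSAStartup(MAKEWORD(2, 2), &wsadata);
    if (status != 0) {
        fprintf(stderr, "WSAStartup failed: %d\n", status);
    }
    return status;
#else
    return 0;  /* No socket library initialization is needed elsewhere. */
#endif
}

/* Releases Winsock resources; does nothing on other platforms. */
void net_cleanup(void)
{
#if defined(_WIN32)
    WSACleanup();
#endif
}
```

A caller would exit when net_init() returns a nonzero value, matching the behavior described above, and call net_cleanup() once before the program ends.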