Guide for Persistence
Designing Persistent Sources

The major concerns of sources revolve around RegID management and message retention.


New or Re-Registration  <-

Any source needs to know at start-up if it is a new registration or a re-registration. The answer determines how a source registers with the Store. The UM library can not answer this question. Therefore, it is essential that the developer consider what identifies the lifetime of a source and how a source determines the appropriate value to use as the RegID when it is ready to register. RegIDs are per source per topic per Store, thus a single RegID per Store is needed.

The following source code examples look for an existing RegID from a file and uses a new RegID assigned from the Store if it finds no existing RegID. See also ume-example-src-3.c

C API

err = lbm_context_create(&ctx, NULL, NULL, NULL);
if (err) {printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1);}
srcinfo.message_num = 1;
srcinfo.existing_regid = 0;
err = read_src_regid_from_file(SRC_REGID_SAVE_FILENAME, store_info, sizeof(store_info));
if (!err) { srcinfo.existing_regid = 1; }
err = lbm_src_topic_attr_create_from_xml(&attr, "MyCtx", src_topic_name);
if (err) {printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1);}
err = lbm_src_topic_attr_str_setopt(attr, "ume_store", store_info);
if (err) {printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1);}

The use of Session IDs allows UM, as opposed to your application, to accomplish the same RegID management. See Managing RegIDs with Session IDs Managing RegIDs with Session IDs.


Sources Must Be Able to Resume Sending  <-

A source sends messages unless UM prevents it, in which case, the send function returns an error. A source may lose the ability to send messages temporarily if the Store(s) in use become unresponsive, e.g. the Store(s) die or become disconnected from the source. Once the Store(s) are responsive again, sending can continue. Thus source applications need to take into account that sending may fail temporarily under specific failure cases and be able to resume sending when the failure is removed.

The following source code examples demonstrate how a failed send function can sleep for a second and try again:

C API

while (lbm_src_send(src, message, len, 0) == LBM_FAILURE) {
If (lbm_errnum() == LBM_EUMENOREG) {
printf("Send unsuccessful. Waiting...\n");
sleep(1);
continue;
}
fprintf(stderr, "lbm_src_send: %s\n", lbm_errmsg());
exit(1);
}

Java API

for (;;) {
try {
src.send(message, len, 0);
}
catch (UMENoRegException ex) {
System.out.println("Send unsuccessful. Waiting...");
try {
Thread.sleep(1000);
}
catch (InterruptedException e) { }
continue;
}
catch (LBMException ex) {
System.err.println("Error sending message: " + ex.toString());
System.exit(1);
}
break;
}

.NET API

for (;;) {
try {
src.send(message, len, 0);
}
catch (UMENoRegException ex) {
System.Console.Out.WriteLine("Send unsuccessful. Waiting...");
System.Threading.Thread.Sleep(1000);
continue;
}
catch (LBMException ex) {
System.Console.Out.WriteLine ("Error sending message: " + ex.toString());
System.exit(1);
}
break;
}


Source Message Retention and Release  <-

UM allows streaming of messages from a source without regard to message stability at a Store, which is one reason for UM's performance advantage over other persistent messaging systems. Sources retain all messages until notified by the active Store(s) that they are stable. This provides a method for Stores to be brought up to date when restarted or started anew.

When messages are considered stable at the Store, the source can release them which frees up source retention memory for new messages. Generally, the source releases older stable messages first. To release the oldest retained message, all the following conditions must be met:

Some things to note:

Note
Smart Sources simplify matters somewhat by pre-allocating retention buffers. They are not dynamically allocated or deallocated during operation. See Smart Sources and Persistence for more information.


Forced Reclaims  <-

If the aggregate amount of buffered messages exceeds retransmit_retention_size_limit (source) bytes in payload and headers, then UM forcibly releases the oldest retained message even if it does not meet one or more of the conditions stated in Source Message Retention and Release. This condition should be avoided and Informatica suggests increasing the retransmit_retention_size_limit (source).

A second condition that produces a forced reclaim is when a message remains unstabilized when the ume_message_stability_lifetime (source) expires.

Whenever UM performs a Forced Reclaim, it notifies the application in the following ways:

  • The source event callback's RECLAIMED_EX event (see Persistence Source Events) includes a "FORCED" flag on the event. (UM uses the same RECLAIMED_EX event, without the FORCED flag, for normal reclaims.)

  • Through the separate forced reclaim callback, if registered. You set this separate forced reclaim callback with the ume_force_reclaim_function (source) configuration option.
Note
UM retains the separate callback for backwards compatibility purposes and may be deprecated in future releases. The source event FORCED flag is the recommended method of tracking forced reclaims.

The following sample code, from Example umesrc.c, implements the extended reclaim source event with the 'Forced' flag set if the reclamation is a forced reclaim.

C API

{
if (opts->verbose) {
printf("UME message reclaimed (ex) - sequence number %x (cd %p). Flags 0x%x ",
ackinfo->sequence_number, (char*)(ackinfo->msg_clientd) - 1, ackinfo->flags);
if (ackinfo->flags & LBM_SRC_EVENT_UME_MESSAGE_RECLAIMED_EX_FLAG_FORCED) {
printf("FORCED");
}
printf("\n");
}
}
break;

Java API

case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX:
UMESourceEventAckInfo reclaiminfo = sourceEvent.ackInfo();
if (_verbose > 0) {
if (reclaiminfo.clientObject() != null) {
System.out.print("UME message reclaimed (ex) - sequence number "
+ Long.toHexString(reclaiminfo.sequenceNumber())
+ " (cd "
+ Long.toHexString(((Long)reclaiminfo.clientObject()).longValue())
+ "). Flags 0x"
+ reclaiminfo.flags());
} else {
System.out.print("UME message reclaimed (ex) - sequence number "
+ Long.toHexString(reclaiminfo.sequenceNumber())
+ " Flags 0x"
+ reclaiminfo.flags());
}
if ((reclaiminfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX_FLAG_FORCED) != 0) {
System.out.print(" FORCED");
}
System.out.println();
}
break;

.NET API

case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX:
UMESourceEventAckInfo reclaiminfo = sourceEvent.ackInfo();
if (_verbose > 0) {
System.Console.Out.Write("UME message reclaimed (ex) - sequence number "
+ reclaiminfo.sequenceNumber()
+ " (cd "
+ ((uint)reclaiminfo.clientObject()).ToString("x")
+ "). Flags "
+ reclaiminfo.flags());
if ((reclaiminfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX_FLAG_FORCED) != 0) {
System.Console.Out.Write(" FORCED");
}
System.Console.Out.WriteLine();
}
break;


Source Retention Policy Options  <-

Sources use a set of configuration options to release messages that, in effect, specify the source's retention policy. The following configuration options directly impact when the source may release retained messages:


Confirmed Delivery  <-

The configuration option ume_retention_unique_confirmations (source) requires a message to have a minimum number of unique confirmations from different receivers before the message may be released. This retains messages that have not been confirmed as being received and processed and keeps them available to fulfill any retransmission requests. This provides a form of receiver-pacing; the source will not be allowed to exceed Persistence Flight Size beyond receiving applications.

For example, a topic might have 2 receivers which are considered essential to keep up, and which should therefore contribute to flight size calculation. There might be any number of less-essential receivers which can be allowed to lag behind. In this case, ume_retention_unique_confirmations (source) would be set to 2, and the non-essential receivers would set ume_allow_confirmed_delivery (receiver) to 0.

Note
Smart Sources do not support delivery confirmation.

The following code samples show how to require a message to have 10 unique receiver confirmations

C API

if (lbm_src_topic_attr_create_from_xml(&sattr, "MyCtx", src_topic_name) == LBM_FAILURE) {
fprintf(stderr, "lbm_src_topic_attr_create_from_xml: %s\n", lbm_errmsg());
exit(1);
}
if (lbm_src_topic_attr_str_setopt(sattr, "ume_retention_unique_confirmations",
"10") == LBM_FAILURE) {
fprintf(stderr, "lbm_src_topic_attr_str_setopt: %s\n", lbm_errmsg());
exit(1);
}

JAVA API

LBMSourceAttributes sattr = null;
try {
sattr = new LBMSourceAttributes();
sattr.setValue("ume_retention_unique_confirmations", "10");
}
catch (LBMException ex) {
System.err.println("Error creating source attribute: " + ex.toString());
System.exit(1);
}

.NET API

LBMSourceAttributes sattr = null;
try {
sattr = new LBMSourceAttributes();
sattr.setValue("ume_retention_unique_confirmations", "10");
}
catch (LBMException ex) {
System.Console.Error.WriteLine ("Error creating source attribute: " + ex.toString());
System.Environment.Exit(1);
}


Source Event Handler  <-

The Source Event Handler is a function callback initialized at source creation to provide source events to your application related to the operation of the source. The following source code examples illustrate the use of a source event handler for registration events. To accept other source events, additional case statements would be required, one for each additional source event. See also Persistence Events.

C API

int handle_src_event(lbm_src_t *src, int event, void *ed, void *cd)
{
switch (event) {
{
const char *errstr = (const char *)ed;
printf("Error registering source with UME store: %s\n", errstr);
}
break;
{
printf("UME store %u: %s registration success. RegID %u. Flags %x ",
reg->store_index, reg->store, reg->registration_id, reg->flags);
if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)
printf("OLD[SQN %x] ", reg->sequence_number);
if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS)
printf("NOACKS ");
printf("\n");
}
break;
{
(lbm_src_event_ume__complete_ex_t *)ed;
printf("UME registration complete. SQN %x. Flags %x ",
reg->sequence_number, reg->flags);
if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM)
printf("QUORUM ");
printf("\n");
}
break;
{
const char *infostr = (const char *)ed;
printf("UME store: %s\n", infostr);
}
break;
default:
printf("Unknown source event %d\n", event);
break;
}
return 0;
}

JAVA API

public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
switch (sourceEvent.type()) {
case LBM.SRC_EVENT_UME_REGISTRATION_ERROR:
System.out.println("Error registering source with UME store: "
+ sourceEvent.dataString());
break;
case LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX:
UMESourceEventRegistrationSuccessInfo reg =
sourceEvent.registrationSuccessInfo();
System.out.print("UME store " + reg.storeIndex() + ": " + reg.store()
+ " registration success. RegID " + reg.registrationId() + ". Flags "
+ reg.flags() + " ");
if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)) != 0) {
System.out.print("OLD[SQN " + reg.sequenceNumber() + "] ");
}
if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS)) != 0) {
System.out.print("NOACKS ");
}
System.out.println();
break;
case LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX:
UMESourceEventRegistrationCompleteInfo regcomp =
sourceEvent.registrationCompleteInfo();
System.out.print("UME registration complete. SQN " + regcomp.sequenceNumber()
+ ". Flags " + regcomp.flags() + " ");
if ((regcomp.flags() & LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
System.out.print("QUORUM ");
}
System.out.println();
break;
case LBM.SRC_EVENT_UME_STORE_UNRESPONSIVE:
System.out.println("UME store: " + sourceEvent.dataString());
break;
...
default:
System.out.println("Unknown source event " + sourceEvent.type());
break;
}
return 0;
}

.NET API

public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
switch (sourceEvent.type()) {
case LBM.SRC_EVENT_UME_REGISTRATION_ERROR:
System.Console.Out.WriteLine("Error registering source with UME store: "
+ sourceEvent.dataString());
break;
case LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX:
UMESourceEventRegistrationSuccessInfo reg = sourceEvent.registrationSuccessInfo();
System.Console.Out.Write("UME store " + reg.storeIndex() + ": " + reg.store()
+ " registration success. RegID " + reg.registrationId() + ". Flags "
+ reg.flags() + " ");
if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)) != 0) {
System.Console.Out.Write("OLD[SQN " + reg.sequenceNumber() + "] ");
}
if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS)) != 0) {
System.Console.Out.Write("NOACKS ");
}
System.Console.Out.WriteLine();
break;
case LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX:
UMESourceEventRegistrationCompleteInfo regcomp =
sourceEvent.registrationCompleteInfo();
System.Console.Out.Write("UME registration complete. SQN " +
regcomp.sequenceNumber() + ". Flags " + regcomp.flags() + " ");
if ((regcomp.flags() & LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
System.Console.Out.Write("QUORUM ");
}
System.Console.Out.WriteLine();
break;
case LBM.SRC_EVENT_UME_STORE_UNRESPONSIVE:
System.Console.Out.WriteLine("UME store: " + sourceEvent.dataString());
break;
...
default:
System.Console.Out.WriteLine("Unknown source event " + sourceEvent.type());
break;
}
return 0;
}


Source Event Handler - Stability, Confirmation and Release  <-

As shown in Source Event Handler above, the Source Event Handler can be expanded to handle more source events by adding additional case statements. The following source code examples show case statements to handle message stability events, delivery confirmation events and message release (reclaim) events. See also Persistence Events.

C API

/* requires that source ume_message_stability_notification option is enabled */
{
printf("UME store %u: %s message stable. SQN %x (msgno %d). Flags %x ",
info->store_index, info->store, info->sequence_number,
(int)info->msg_clientd - 1, info->flags);
if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE)
printf("IA "); /* Stable within Store QC group */
if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE)
printf("IR "); /* Stable amongst all Stores */
if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE)
printf("STABLE "); /* Just plain stable */
if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE)
printf("STORE "); /* Stability reported by UME Store */
printf("\n");
}
break;
/* requires that source ume_confirmed_delivery_notification option is enabled */
{
printf("UME delivery confirmation. SQN %x, Receiver RegID %u (msgno %d). Flags %x ",
(int)info->msg_clientd - 1, info->flags);
if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS)
printf("UNIQUEACKS "); /* Satisfied number of unique ACKs requirement */
if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID)
printf("UREGID "); /* Confirmation contains receiver application registration ID */
if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD)
printf("OOD "); /* Confirmation received from arrival order receiver */
if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK)
printf("EXACK "); /* Confirmation explicitly sent by receiver */
printf("\n");
}
break;
/* requires that source ume_confirmed_delivery_notification or ume_message_stability_notification
attributes are enabled */
{
printf("UME message released - sequence number %x (msgno %d)\n",
ackinfo->sequence_number, (int)ackinfo->msg_clientd - 1);
}
break;

JAVA API

case LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX:
// requires that source ume_message_stability_notification option is enabled
UMESourceEventAckInfo staInfo = sourceEvent.ackInfo();
System.out.print("UME store " + staInfo.storeIndex() + ": "
+ staInfo.store() + " message stable. SQN " + staInfo.sequenceNumber()
+ " (msgno " + staInfo.clientObject() + "). Flags "
+ staInfo.flags() + " ");
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE) != 0) {
System.out.print("IA "); // Stable within Store QC group
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE) != 0) {
System.out.print("IR "); // Stable amongst all Stores
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE) != 0) {
System.out.print("STABLE "); // Just plain stable
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE) != 0) {
System.out.print("STORE "); // Stability reported by UME Store
}
System.out.println();
break;
case LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX:
// requires that source ume_confirmed_delivery_notification option is enabled
UMESourceEventAckInfo cdelvinfo = sourceEvent.ackInfo();
System.out.print("UME delivery confirmation. SQN " + cdelvinfo.sequenceNumber()
+ ", RcvRegID " + cdelvinfo.receiverRegistrationId() + " (msgno "
+ cdelvinfo.clientObject() + "). Flags " + cdelvinfo.flags() + " ");
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS) != 0) {
System.out.print("UNIQUEACKS "); // Satisfied number of unique ACKs requirement
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID) != 0) {
System.out.print("UREGID "); // Confirmation contains receiver application reg ID
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD) != 0) {
System.out.print("OOD "); // Confirmation received from arrival order receiver
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK) != 0) {
System.out.print("EXACK "); // Confirmation explicitly sent by receiver
}
System.out.println();
break;
case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED:
// requires that source ume_confirmed_delivery_notification or
// ume_message_stability_notification attributes are enabled
System.out.println("UME message released - sequence number "
+ Long.toHexString(sourceEvent.sequenceNumber())
+ " (msgno "
+ Long.toHexString(((Integer)sourceEvent.clientObject()).longValue())
+ ")");
break;

.NET API

case LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX:
// requires that source ume_message_stability_notification option is enabled
UMESourceEventAckInfo staInfo = sourceEvent.ackInfo();
System.Console.Out.Write("UME store " + staInfo.storeIndex() + ": "
+ staInfo.store() + " message stable. SQN " + staInfo.sequenceNumber()
+ " (msgno " + ((int)staInfo.clientObject()).ToString("x") + ").
Flags " + staInfo.flags() + " ");
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE) != 0) {
System.Console.Out.Write("IA "); // Stable within Store QC group
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE) != 0) {
System.Console.Out.Write("IR "); // Stable amongst all Stores
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE) != 0) {
System.Console.Out.Write("STABLE "); // Just plain stable
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE) != 0) {
System.Console.Out.Write("STORE "); // Stability reported by UME Store
}
System.Console.Out.WriteLine();
break;
case LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX:
// requires that source ume_confirmed_delivery_notification option is enabled
UMESourceEventAckInfo cdelvinfo = sourceEvent.ackInfo();
System.Console.Out.Write("UME delivery confirmation. SQN " +
cdelvinfo.sequenceNumber()
+ ", RcvRegID " + cdelvinfo.receiverRegistrationId() + " (msgno "
+ ((int)cdelvinfo.clientObject()).ToString("x") + "). Flags " +
cdelvinfo.flags() + " ");
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS) != 0) {
System.Console.Out.Write("UNIQUEACKS "); // Satisfied number of unique ACKs requirement
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID) != 0) {
System.Console.Out.Write("UREGID "); // Confirmation contains receiver application reg ID
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD) != 0) {
System.Console.Out.Write("OOD "); // Confirmation received from arrival order receiver
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK) != 0) {
System.Console.Out.Write("EXACK "); // Confirmation explicitly sent by receiver
}
System.Console.Out.WriteLine();
break;
case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED:
// requires that source ume_confirmed_delivery_notification or
// ume_message_stability_notification attributes are enabled
System.Console.Out.WriteLine("UME message released - sequence number "
+ sourceEvent.sequenceNumber().ToString("x")
+ " (msgno "
+ ((int)sourceEvent.clientObject()).ToString("x")
+ ")");
break;


Mapping Your Message Numbers to Sequence Numbers  <-

Some application developers want their publishing applications to know the UM-assigned topic-level sequence numbers assigned to outgoing messages.

The C API functions lbm_src_send_ex() and lbm_src_sendv_ex() allow you to expand the number of source events that will be delivered to your sending application, including a source event that informs your sender of the topic-level sequence numbers assigned to each message you send. In the case of a large message that requires fragmentation, the callback will tell you the starting and ending topic-level sequence numbers assigned to it.

The following two source code examples show how to:

  • Enable message sequence number information.
  • Handle sequence number source events to determine the application message number in the Source Event Handler.

C API - Enable Message Information

struct my_sqn_nums_s{
unsigned int first_sqn;
unsigned int last_sqn;
};
typedef struct my_sqn_nums_s my_sqn_nums_t;
my_sqn_nums_t msg_sqn_nums;
...
/* Enable message sequence number info to be returned */
exinfo.ume_msg_clientd = &msg_sqn_nums;
if (lbm_src_send_ex(src, message, msglen, 0, &exinfo) == LBM_FAILURE) {
... /* handle error. */
}
/* Before lbm_src_send_ex returns, the source event will be delivered on the callers thread,
* setting up msg_sqn_nums. */
printf("first_sqn=%u, last_sqn=%u\n", msg_sqn_nums.first_sqn, msg_sqn_nums.last_sqn);

C API - Source Number Event Handler

int handle_src_event(lbm_src_t *src, int event, void *ed, void *cd)
{
switch (event) {
{
my_sqn_nums_t *sqn_nums = (my_sqn_nums_t *)info->msg_clientd;
sqn_nums->first_sqn = info->first_sequence_number;
sqn_nums->last_sqn = info->last_sequence_number;
}
break;
...
}
return 0;
}

JAVA API - Enable Message Information

LBMSourceSendExInfo exinfo = new LBMSourceSendExInfo();
exinfo.setClientObject(new Integer(msgno)); // msgno set to application message number
exinfo.setFlags(LBM.SRC_SEND_EX_FLAG_SEQUENCE_NUMBER_INFO);
// Enable message sequence number info to be returned
try {
src.send(message, msglen, 0, exinfo);
}
catch(UMENoRegException ex) {
// Handle error
}
// Access start/end sqns via instance variables.

JAVA API - Sequence Number Event Handler

public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
switch (sourceEvent.type()) {
case LBM.SRC_EVENT_SEQUENCE_NUMBER_INFO:
{
LBMSourceEventSequenceNumberInfo info = sourceEvent.sequenceNumberInfo();
// Set instance variables to info.firstSequenceNumber() and info.lastSequenceNumber().
break;
}
...
}
return 0;
}

.NET API - Enable Message Information

.NET code is the same as Java (above).


Receiver Liveness Detection  <-

As an extension to Confirmed Delivery, you can set receivers to send a keepalive to a source during a measured absence of delivery confirmations (due to traffic lapse). In the event that neither message reaches the source within a designated interval, or if the delivery confirmation TCP connection breaks down, the receiver is assumed to have "died". UM then notifies the publishing application via context event callback. This lets the publisher assign a new subscriber.

To use this feature, set these five configuration options:

Note
Smart Sources do not support liveness detection.

This specialized feature is not recommended for general use. If you are considering it, please note the following caveats:

  • Do not use in conjunction with a DRO.
  • There is a variety of potential network occurrences that can break or reset the TCP connection and falsely indicate the death of a receiver.
  • In cases where a receiver object is deleted while its context is not, the publisher may still falsely assume the receiver to be alive.

Other false receiver-alive assumptions could be caused by the following:

  • TCP connections can enter a half-open or otherwise corrupted state.
  • Failed TCP connections sometimes do not fully close, or experience objectionable delays before fully closing.
  • A switch or router failure along the path does not affect the TCP connection state.