Guide for Persistence
Designing Persistence Applications

A persistent system is composed of sources, receivers, and stores managed by one or more applications. Sources and receivers are the endpoints of communication and the store(s) provide fault recovery and persistence of state information. Your application can leverage UM's flexible methods of persistence to add fault tolerance. With this flexibility, your applications assume new responsibilities not normally required in other persistent messaging systems. This section identifies the important considerations for your messaging applications when implementing the following persistence features:


Registration Identifiers

As mentioned in Registration Identifier Concept and Adding Fault Recovery with Registration IDs, stores use RegIDs to identify sources and receivers. UM offers three main methods for managing RegIDs:

  • Recommended: use Session IDs to let the store both assign and manage RegIDs. See Managing RegIDs with Session IDs. Note: although Session IDs are recommended, understanding the underlying registration IDs often helps in understanding persistence.

  • Your applications assign static RegIDs and ensure that the same RegID is not assigned to multiple sources and/or receivers. See Use Static RegIDs.

  • You can allow stores to assign RegIDs and then save the assigned RegIDs for subsequent reuse. See Save Assigned RegIDs.

Your applications can manage RegIDs for the lifetime of a source or receiver as long as multiple applications do not use the same RegID simultaneously on the same store. RegIDs need to be unique only within a given store and may be reused across stores as desired. You can use a static mapping of RegIDs to applications or a simple service to assign them.
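If you maintain a static mapping, it can help to check it before deployment. The following C sketch is a hypothetical validation helper, not part of the UM API. It assumes your RegID plan is a table of (store address, RegID) rows and reports the first row that reuses a RegID on the same store; reusing a RegID on a different store is allowed, as described above.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* One row of a hypothetical static RegID plan. */
struct regid_entry {
    const char *store;   /* store address, e.g. "127.0.0.1:14567" */
    unsigned regid;      /* RegID assigned to one source or receiver */
};

/* Returns the index of the first entry that reuses a RegID already
 * assigned on the same store, or -1 if every RegID is unique per store.
 * Reusing a RegID on a different store is allowed. */
int find_regid_conflict(const struct regid_entry *plan, size_t n)
{
    assert(plan != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        for (size_t j = 0; j < i; j++) {
            if (plan[i].regid == plan[j].regid &&
                strcmp(plan[i].store, plan[j].store) == 0)
                return (int)i;
        }
    }
    return -1;
}
```

Running this check as part of configuration review catches the duplicate-RegID mistake before two applications collide on a store at run time.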


Use Static RegIDs

For very small deployments, the simplest method is to use static RegIDs for individual applications. This method requires that every persistent source connecting to a given store have a RegID that is unique among all persistent sources attaching to that store. This includes publishing applications that have multiple persistent topics; each topic's source object must have a unique RegID. (The use of Session IDs greatly simplifies the management of these RegIDs.)

The following source code examples assign a static RegID to a source by appending the RegID, 1000, to the value of the ume_store (source) attribute. See also ume-example-src-2.c.

C API

if (lbm_src_topic_attr_create_from_xml(&sattr, "MyCtx", src_topic_name) == LBM_FAILURE) {
fprintf(stderr, "lbm_src_topic_attr_create_from_xml: %s\n", lbm_errmsg());
exit(1);
}
if (lbm_src_topic_attr_str_setopt(sattr, "ume_store", "127.0.0.1:14567:1000")
== LBM_FAILURE) {
fprintf(stderr, "lbm_src_topic_attr_str_setopt: %s\n", lbm_errmsg());
exit(1);
}

JAVA API

LBMSourceAttributes sattr = null;
try {
sattr = new LBMSourceAttributes();
sattr.setValue("ume_store", "127.0.0.1:14567:1000");
}
catch (LBMException ex) {
System.err.println("Error creating source attribute: " + ex.toString());
System.exit(1);
}

.NET API

LBMSourceAttributes sattr = null;
try {
    sattr = new LBMSourceAttributes();
    sattr.setValue("ume_store", "127.0.0.1:14567:1000");
}
catch (LBMException ex) {
    System.Console.Error.WriteLine("Error creating source attribute: " + ex.ToString());
    System.Environment.Exit(1);
}


Save Assigned RegIDs

When using RegIDs, your application can request that the store assign it a new, unique RegID when it registers for the first time. That RegID is made available to the application, which can then save it to local storage. The next time the application starts (or restarts) and wants to use the same registration, it reads the value from local storage. This method of managing RegIDs is not common. For example, what if the application needs to be restarted on a different server due to hardware failure? If the saved RegID is not available there, the application cannot re-register with its earlier RegID, and so it will not be able to recover only those messages it had not yet acknowledged. (Session IDs simplify this greatly by essentially saving the registration IDs for you on the store itself.)

The following minimal source code example saves the RegID assigned to a source to a file. See also ume-example-src-3.c.

C API

/* Callback invoked by UM for source events. */
int app_src_callback(lbm_src_t *src, int event, void *eventd, void *clientd)
{
...
switch (event) {
...
/* Get the registration information. */
/* Might want to do the following conditionally only if we are requesting a new RegID. */
FILE *fp = fopen("UME-example-src-RegID", "w"); /* Error checking omitted for clarity. */
fprintf(fp, "%s:%u", reginfo->store, reginfo->registration_id);
fclose(fp);
...
} /* switch */
...
} /* app_src_callback */
...
err = lbm_src_create(&src, ctx, topic, app_src_callback, ...); /* Error checking omitted. */
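The callback above writes the store and RegID as a single "store:RegID" record, which a later run reads back. The following C sketch shows one way to write and parse that record using only standard I/O. The function names save_regid() and load_regid() are hypothetical, and the sketch assumes the store field holds the store address as "IP:port", so the parser splits the record at the last colon.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Write the record as "<store>:<regid>", matching the callback above. */
int save_regid(FILE *fp, const char *store, unsigned regid)
{
    assert(fp != NULL && store != NULL);
    if (fprintf(fp, "%s:%u", store, regid) < 0)
        return -1;
    return fflush(fp) == 0 ? 0 : -1;
}

/* Read the record back into store and *regid. The store address itself
 * contains a colon ("IP:port"), so split at the last colon.
 * Returns 0 on success, -1 if the file is empty or malformed. */
int load_regid(FILE *fp, char *store, size_t store_len, unsigned *regid)
{
    char line[256];
    char *sep, *end;
    unsigned long value;

    assert(fp != NULL && store != NULL && regid != NULL);
    if (fgets(line, sizeof(line), fp) == NULL)
        return -1;                          /* nothing saved yet */
    line[strcspn(line, "\r\n")] = '\0';     /* drop a trailing newline */
    sep = strrchr(line, ':');
    if (sep == NULL || sep == line || (size_t)(sep - line) >= store_len)
        return -1;
    value = strtoul(sep + 1, &end, 10);
    if (end == sep + 1 || *end != '\0')
        return -1;                          /* RegID missing or not numeric */
    memcpy(store, line, (size_t)(sep - line));
    store[sep - line] = '\0';
    *regid = (unsigned)value;
    return 0;
}
```

Taking a FILE * rather than a file name keeps the helpers easy to test; in the application you would fopen() the save file and pass the handle in.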


Managing RegIDs with Session IDs

The RegIDs used by stores to identify sources and receivers must be unique. Rather than maintaining RegIDs (either statically or dynamically), applications can use a Session ID, which is simply a 64-bit value that uniquely identifies any set of sources with unique topics and receivers with unique topics. A single Session ID allows UM stores to correctly identify all the sources and receivers for a particular application.

In practice, a Session ID is often thought of as an application identifier, although it is more accurately thought of as a context identifier. (For applications that have only a single context with persistent sources and/or receivers, the two are effectively the same.) Be aware, however, that many application systems run multiple instances of a given program, perhaps for horizontal scaling. Each instance needs its own Session ID.

It is also possible for a single context to host multiple Session IDs, although this is rarely done. The UM configuration options ume_session_id (source) and ume_session_id (receiver) can be used to arrange individual source and/or receiver objects into registration groupings. However, it is more common to use the option ume_session_id (context) to group all sources and receivers created within a context into a single Session ID. (If both a context option and a source or receiver option are specified, the source or receiver option overrides the context option.)

How Stores Associate Session IDs and RegIDs

Session IDs do not replace the use of RegIDs by UM but rather simplify RegID management. Using Session IDs equates to your application specifying a 0 (zero) RegID for all sources and receivers. However, instead of your application persisting the RegID assigned by the store, the store maintains the RegID for you.

When a store receives a registration request from a source or receiver with a particular Session ID, it checks whether it already has a source or receiver for that topic/Session ID pair. If it does, it responds with that source's or receiver's RegID.

If it does not find a source or receiver for that topic/Session ID pair, the store:

  1. Assigns a new RegID.
  2. Associates the topic/Session ID with the new RegID.
  3. Responds to the source or receiver with the new RegID.

The source can then advertise with the RegID supplied by the store. Receivers include the source's RegID in their registration request.

All of the above steps happen within UM itself without any intervention by the application. However, the application does have access to the underlying registration ID, if it desires it.
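To make the lookup concrete, the following C sketch models the store-side steps listed above: find the RegID for a topic/Session ID pair, or assign, record, and return a new one. It is a toy model for illustration only. The table layout, the sequential RegID assignment, and the function name register_with_session() are assumptions and do not describe how the UM store is actually implemented.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define MAX_REGS  64
#define TOPIC_LEN 64

/* Toy model of a store's registration table; not UM's implementation. */
struct reg_table {
    struct {
        char topic[TOPIC_LEN];   /* topics longer than 63 chars are truncated */
        uint64_t session_id;     /* Session IDs are 64-bit values */
        uint32_t regid;
    } e[MAX_REGS];
    size_t count;
    uint32_t next_regid;         /* last RegID handed out; starts at 0 */
};

/* Returns the RegID for (topic, session_id), assigning a new one the
 * first time the pair is seen. Returns 0 if the table is full. */
uint32_t register_with_session(struct reg_table *t, const char *topic,
                               uint64_t session_id)
{
    assert(t != NULL && topic != NULL);
    for (size_t i = 0; i < t->count; i++) {
        if (t->e[i].session_id == session_id &&
            strncmp(t->e[i].topic, topic, TOPIC_LEN - 1) == 0)
            return t->e[i].regid;          /* known pair: same RegID again */
    }
    if (t->count == MAX_REGS)
        return 0;
    /* 1. assign a new RegID, 2. associate the pair with it, 3. return it */
    uint32_t regid = ++t->next_regid;
    strncpy(t->e[t->count].topic, topic, TOPIC_LEN - 1);
    t->e[t->count].topic[TOPIC_LEN - 1] = '\0';
    t->e[t->count].session_id = session_id;
    t->e[t->count].regid = regid;
    t->count++;
    return regid;
}
```

Because the pair, not the application, is the key, a restarted application presenting the same Session ID for the same topic gets its earlier RegID back without storing anything locally.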


Designing Persistent Sources

The major concerns of sources revolve around RegID management and message retention.


New or Re-Registration

Any source needs to know at start-up whether it is making a new registration or a re-registration. The answer determines how the source registers with the store, and the UM library cannot answer this question. Therefore, it is essential that the developer consider what identifies the lifetime of a source and how a source determines the appropriate value to use as the RegID when it is ready to register. Because RegIDs are per source, per topic, per store, a source needs one RegID for each store it uses.

The following source code example looks for an existing RegID in a file and, if it finds none, uses a new RegID assigned by the store. See also ume-example-src-3.c.

C API

err = lbm_context_create(&ctx, NULL, NULL, NULL);
if (err) {printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1);}
srcinfo.message_num = 1;
srcinfo.existing_regid = 0;
err = read_src_regid_from_file(SRC_REGID_SAVE_FILENAME, store_info, sizeof(store_info));
if (!err) { srcinfo.existing_regid = 1; }
err = lbm_src_topic_attr_create_from_xml(&attr, "MyCtx", src_topic_name);
if (err) {printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1);}
err = lbm_src_topic_attr_str_setopt(attr, "ume_store", store_info);
if (err) {printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1);}
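The example above relies on read_src_regid_from_file() and a preset store_info value. The following C sketch isolates the decision itself using a hypothetical choose_store_value() helper: if a saved ume_store value that ends in a RegID exists, reuse it (re-registration); otherwise register with RegID 0 so that the store assigns a new one. The store address and the save-file format ("IP:port:RegID", as in the static RegID example) are assumptions.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: decide between re-registration and new registration.
 * saved      - ume_store value read from the save file ("IP:port:RegID"),
 *              or NULL if no file was found
 * store_addr - the store's "IP:port", used for a new registration
 * Writes the ume_store value to use into out and returns 1 for a
 * re-registration, 0 for a new registration. */
int choose_store_value(const char *saved, const char *store_addr,
                       char *out, size_t out_len)
{
    assert(store_addr != NULL && out != NULL && out_len > 0);
    /* A saved value needs two colons (IP:port:RegID) to carry a RegID. */
    if (saved != NULL && strrchr(saved, ':') != strchr(saved, ':')) {
        snprintf(out, out_len, "%s", saved);
        return 1;
    }
    /* RegID 0 asks the store to assign a new RegID. */
    snprintf(out, out_len, "%s:0", store_addr);
    return 0;
}
```

The return value maps directly onto the existing_regid flag in the example, so the rest of the application can tell whether it is resuming an earlier registration.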

The use of Session IDs allows UM, as opposed to your application, to accomplish the same RegID management. See Managing RegIDs with Session IDs.


Sources Must Be Able to Resume Sending

A source sends messages unless UM prevents it, in which case the send function returns an error. A source may temporarily lose the ability to send messages if the store(s) in use become unresponsive, e.g., the store(s) die or become disconnected from the source. Once the store(s) are responsive again, sending can continue. Thus, source applications must account for sends failing temporarily under specific failure cases and must be able to resume sending when the failure clears.

The following source code examples show how to sleep for a second after a failed send and then try again. They retry only when the failure is due to the source not being registered with the store(s); any other error is treated as fatal.

C API

while (lbm_src_send(src, message, len, 0) == LBM_FAILURE) {
    if (lbm_errnum() == LBM_EUMENOREG) {
        /* Not currently registered with the store(s): wait and retry. */
        printf("Send unsuccessful. Waiting...\n");
        sleep(1);
        continue;
    }
    fprintf(stderr, "lbm_src_send: %s\n", lbm_errmsg());
    exit(1);
}

Java API

for (;;) {
    try {
        src.send(message, len, 0);
    }
    catch (UMENoRegException ex) {
        System.out.println("Send unsuccessful. Waiting...");
        try {
            Thread.sleep(1000);
        }
        catch (InterruptedException e) { }
        continue;
    }
    catch (LBMException ex) {
        System.err.println("Error sending message: " + ex.toString());
        System.exit(1);
    }
    break;
}

.NET API

for (;;) {
    try {
        src.send(message, len, 0);
    }
    catch (UMENoRegException ex) {
        System.Console.Out.WriteLine("Send unsuccessful. Waiting...");
        System.Threading.Thread.Sleep(1000);
        continue;
    }
    catch (LBMException ex) {
        System.Console.Error.WriteLine("Error sending message: " + ex.ToString());
        System.Environment.Exit(1);
    }
    break;
}
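The examples above retry at a fixed one-second interval. If you would rather back off while a store stays unresponsive, one option is a capped exponential delay; this is a swapped-in technique, not something UM requires. The helper retry_delay_ms() and its parameters are hypothetical.

```c
#include <assert.h>

/* Hypothetical helper: delay before retry number `attempt` (1-based).
 * Doubles from base_ms on each attempt and never exceeds cap_ms. */
unsigned retry_delay_ms(unsigned attempt, unsigned base_ms, unsigned cap_ms)
{
    unsigned delay = base_ms;
    assert(attempt >= 1);
    for (unsigned i = 1; i < attempt && delay < cap_ms; i++) {
        /* Doubling past cap/2 would overshoot (or overflow), so clamp. */
        delay = (delay > cap_ms / 2) ? cap_ms : delay * 2;
    }
    return delay < cap_ms ? delay : cap_ms;
}
```

In the C example, you would count attempts inside the LBM_EUMENOREG branch and sleep for retry_delay_ms(attempt, 1000, 8000) milliseconds instead of calling sleep(1), resetting the count after a successful send.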


Source Message Retention and Release

UM allows streaming of messages from a source without regard to message stability at a store, which is one reason for UM's performance advantage over other persistent messaging systems. Sources retain all messages until notified by the active store(s) that they are stable. This provides a method for stores to be brought up to date when restarted or started anew.

When messages are considered stable at the store, the source can release them which frees up source retention memory for new messages. Generally, the source releases older stable messages first. To release the oldest retained message, all the following conditions must be met:

Some things to note:

Note
Smart Sources simplify matters somewhat by pre-allocating retention buffers. They are not dynamically allocated or deallocated during operation. See Smart Sources and Persistence for more information.


Forced Reclaims

If the aggregate amount of buffered messages (payload plus headers) exceeds retransmit_retention_size_limit (source) bytes, UM forcibly releases the oldest retained message even if it does not meet one or more of the conditions stated in Source Message Retention and Release. You should avoid this condition; Informatica suggests increasing retransmit_retention_size_limit (source).

A forced reclaim also occurs when a message is still not stable when its ume_message_stability_lifetime (source) expires.
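The following C sketch is a toy model of the size-based forced reclaim described above. It keeps a FIFO of retained message sizes, and limit_bytes stands in for retransmit_retention_size_limit (source). It ignores stability, confirmations, and ume_message_stability_lifetime (source), and it does not reflect UM's internal data structures; the names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

#define RING_CAP 128

/* Toy model of source retention: a FIFO of retained message sizes
 * (payload plus headers). Not UM's implementation. */
struct retention {
    size_t sizes[RING_CAP];
    size_t head, count;     /* head indexes the oldest retained message */
    size_t total_bytes;
    size_t limit_bytes;     /* stands in for retransmit_retention_size_limit */
};

/* Retain a new message. While the aggregate exceeds the limit, release
 * the oldest message whether or not it is stable, and count it as a
 * forced reclaim. Returns the number of forced reclaims. */
size_t retain_message(struct retention *r, size_t msg_bytes)
{
    size_t forced = 0;
    assert(r != NULL && r->count < RING_CAP);
    r->sizes[(r->head + r->count) % RING_CAP] = msg_bytes;
    r->count++;
    r->total_bytes += msg_bytes;
    /* Keeping at least the newest message is a modelling choice here. */
    while (r->total_bytes > r->limit_bytes && r->count > 1) {
        r->total_bytes -= r->sizes[r->head];
        r->head = (r->head + 1) % RING_CAP;
        r->count--;
        forced++;   /* UM would report RECLAIMED_EX with the FORCED flag */
    }
    return forced;
}
```

The model shows why raising the limit helps: forced reclaims only start once the retained bytes outgrow it, so a larger limit gives the stores more time to stabilize messages normally.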

Whenever UM performs a Forced Reclaim, it notifies the application in the following ways:

  • The source event callback's RECLAIMED_EX event (see Persistence Source Events) includes a "FORCED" flag on the event. (UM uses the same RECLAIMED_EX event, without the FORCED flag, for normal reclaims.)

  • Through the separate forced reclaim callback, if registered. You set this separate forced reclaim callback with the ume_force_reclaim_function (source) configuration option.
Note
UM retains the separate callback for backwards compatibility purposes and may be deprecated in future releases. The source event FORCED flag is the recommended method of tracking forced reclaims.

The following sample code (the C version is from umesrc.c) handles the extended reclaim source event and prints FORCED when the reclaim was a forced reclaim.

C API

case LBM_SRC_EVENT_UME_MESSAGE_RECLAIMED_EX:
    {
        lbm_src_event_ume_ack_ex_info_t *ackinfo = (lbm_src_event_ume_ack_ex_info_t *)ed;
        if (opts->verbose) {
            printf("UME message reclaimed (ex) - sequence number %x (cd %p). Flags 0x%x ",
                   ackinfo->sequence_number, (char*)(ackinfo->msg_clientd) - 1, ackinfo->flags);
            if (ackinfo->flags & LBM_SRC_EVENT_UME_MESSAGE_RECLAIMED_EX_FLAG_FORCED) {
                printf("FORCED");
            }
            printf("\n");
        }
    }
    break;

Java API

case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX:
UMESourceEventAckInfo reclaiminfo = sourceEvent.ackInfo();
if (_verbose > 0) {
if (reclaiminfo.clientObject() != null) {
System.out.print("UME message reclaimed (ex) - sequence number "
+ Long.toHexString(reclaiminfo.sequenceNumber())
+ " (cd "
+ Long.toHexString(((Long)reclaiminfo.clientObject()).longValue())
+ "). Flags 0x"
+ Long.toHexString(reclaiminfo.flags()));
} else {
System.out.print("UME message reclaimed (ex) - sequence number "
+ Long.toHexString(reclaiminfo.sequenceNumber())
+ " Flags 0x"
+ Long.toHexString(reclaiminfo.flags()));
}
if ((reclaiminfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX_FLAG_FORCED) != 0) {
System.out.print(" FORCED");
}
System.out.println();
}
break;

.NET API

case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX:
UMESourceEventAckInfo reclaiminfo = sourceEvent.ackInfo();
if (_verbose > 0) {
System.Console.Out.Write("UME message reclaimed (ex) - sequence number "
+ reclaiminfo.sequenceNumber()
+ " (cd "
+ ((uint)reclaiminfo.clientObject()).ToString("x")
+ "). Flags "
+ reclaiminfo.flags());
if ((reclaiminfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX_FLAG_FORCED) != 0) {
System.Console.Out.Write(" FORCED");
}
System.Console.Out.WriteLine();
}
break;


Source Release Policy Options

Sources use a set of configuration options to release messages that, in effect, specify the source's release policy. The following configuration options directly impact when the source may release retained messages:


Confirmed Delivery

The configuration option ume_retention_unique_confirmations (source) requires a message to have a minimum number of unique confirmations from different receivers before the message may be released. This keeps messages that have not yet been confirmed as received and processed available to fulfill any retransmission requests. It also provides a form of receiver pacing: the source cannot get more than the Persistence Flight Size ahead of the confirming receivers.

For example, a topic might have 2 receivers that are considered essential to keep up and should therefore contribute to the flight size calculation. There might be any number of less-essential receivers that can be allowed to lag behind. In this case, ume_retention_unique_confirmations (source) would be set to 2, and the non-essential receivers would set ume_allow_confirmed_delivery (receiver) to 0.
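The following C sketch is a toy model of the unique-confirmation requirement only. It tracks which receiver RegIDs have confirmed one message, ignores repeat confirmations from the same receiver, and checks the count against a threshold that plays the role of ume_retention_unique_confirmations (source). The structure and function names are hypothetical, and the other release conditions are not modeled.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_CONFIRMERS 16

/* Toy per-message record of which receivers (by RegID) have confirmed. */
struct confirmations {
    unsigned rcv_regids[MAX_CONFIRMERS];
    size_t unique;
};

/* Record a delivery confirmation; repeats from the same receiver RegID
 * are ignored. Returns the current number of unique confirmations. */
size_t record_confirmation(struct confirmations *c, unsigned rcv_regid)
{
    assert(c != NULL);
    for (size_t i = 0; i < c->unique; i++)
        if (c->rcv_regids[i] == rcv_regid)
            return c->unique;               /* already counted */
    if (c->unique < MAX_CONFIRMERS)
        c->rcv_regids[c->unique++] = rcv_regid;
    return c->unique;
}

/* True once `required` distinct receivers have confirmed the message. */
int confirmations_satisfied(const struct confirmations *c, size_t required)
{
    assert(c != NULL);
    return c->unique >= required;
}
```

With the threshold set to 2, as in the example above, a message stays retained until both essential receivers have confirmed it, no matter how often either one confirms.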

Note
Smart Sources do not support delivery confirmation.

The following code samples show how to require a message to have 10 unique receiver confirmations:

C API

if (lbm_src_topic_attr_create_from_xml(&sattr, "MyCtx", src_topic_name) == LBM_FAILURE) {
fprintf(stderr, "lbm_src_topic_attr_create_from_xml: %s\n", lbm_errmsg());
exit(1);
}
if (lbm_src_topic_attr_str_setopt(sattr, "ume_retention_unique_confirmations",
"10") == LBM_FAILURE) {
fprintf(stderr, "lbm_src_topic_attr_str_setopt: %s\n", lbm_errmsg());
exit(1);
}

JAVA API

LBMSourceAttributes sattr = null;
try {
sattr = new LBMSourceAttributes();
sattr.setValue("ume_retention_unique_confirmations", "10");
}
catch (LBMException ex) {
System.err.println("Error creating source attribute: " + ex.toString());
System.exit(1);
}

.NET API

LBMSourceAttributes sattr = null;
try {
    sattr = new LBMSourceAttributes();
    sattr.setValue("ume_retention_unique_confirmations", "10");
}
catch (LBMException ex) {
    System.Console.Error.WriteLine("Error creating source attribute: " + ex.ToString());
    System.Environment.Exit(1);
}


Source Event Handler

The Source Event Handler is a function callback initialized at source creation to provide source events to your application related to the operation of the source. The following source code examples illustrate the use of a source event handler for registration events. To accept other source events, additional case statements would be required, one for each additional source event. See also Persistence Events.

C API

int handle_src_event(lbm_src_t *src, int event, void *ed, void *cd)
{
    switch (event) {
    case LBM_SRC_EVENT_UME_REGISTRATION_ERROR:
        {
            const char *errstr = (const char *)ed;
            printf("Error registering source with UME store: %s\n", errstr);
        }
        break;
    case LBM_SRC_EVENT_UME_REGISTRATION_SUCCESS_EX:
        {
            lbm_src_event_ume_registration_ex_t *reg = (lbm_src_event_ume_registration_ex_t *)ed;
            printf("UME store %u: %s registration success. RegID %u. Flags %x ",
                   reg->store_index, reg->store, reg->registration_id, reg->flags);
            if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)
                printf("OLD[SQN %x] ", reg->sequence_number);
            if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS)
                printf("NOACKS ");
            printf("\n");
        }
        break;
    case LBM_SRC_EVENT_UME_REGISTRATION_COMPLETE_EX:
        {
            lbm_src_event_ume_registration_complete_ex_t *reg =
                (lbm_src_event_ume_registration_complete_ex_t *)ed;
            printf("UME registration complete. SQN %x. Flags %x ",
                   reg->sequence_number, reg->flags);
            if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM)
                printf("QUORUM ");
            printf("\n");
        }
        break;
    case LBM_SRC_EVENT_UME_STORE_UNRESPONSIVE:
        {
            const char *infostr = (const char *)ed;
            printf("UME store: %s\n", infostr);
        }
        break;
    default:
        printf("Unknown source event %d\n", event);
        break;
    }
    return 0;
}

JAVA API

public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
switch (sourceEvent.type()) {
case LBM.SRC_EVENT_UME_REGISTRATION_ERROR:
System.out.println("Error registering source with UME store: "
+ sourceEvent.dataString());
break;
case LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX:
UMESourceEventRegistrationSuccessInfo reg =
sourceEvent.registrationSuccessInfo();
System.out.print("UME store " + reg.storeIndex() + ": " + reg.store()
+ " registration success. RegID " + reg.registrationId() + ". Flags "
+ reg.flags() + " ");
if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)) != 0) {
System.out.print("OLD[SQN " + reg.sequenceNumber() + "] ");
}
if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS)) != 0) {
System.out.print("NOACKS ");
}
System.out.println();
break;
case LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX:
UMESourceEventRegistrationCompleteInfo regcomp =
sourceEvent.registrationCompleteInfo();
System.out.print("UME registration complete. SQN " + regcomp.sequenceNumber()
+ ". Flags " + regcomp.flags() + " ");
if ((regcomp.flags() & LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
System.out.print("QUORUM ");
}
System.out.println();
break;
case LBM.SRC_EVENT_UME_STORE_UNRESPONSIVE:
System.out.println("UME store: " + sourceEvent.dataString());
break;
...
default:
System.out.println("Unknown source event " + sourceEvent.type());
break;
}
return 0;
}

.NET API

public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
switch (sourceEvent.type()) {
case LBM.SRC_EVENT_UME_REGISTRATION_ERROR:
System.Console.Out.WriteLine("Error registering source with UME store: "
+ sourceEvent.dataString());
break;
case LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX:
UMESourceEventRegistrationSuccessInfo reg = sourceEvent.registrationSuccessInfo();
System.Console.Out.Write("UME store " + reg.storeIndex() + ": " + reg.store()
+ " registration success. RegID " + reg.registrationId() + ". Flags "
+ reg.flags() + " ");
if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)) != 0) {
System.Console.Out.Write("OLD[SQN " + reg.sequenceNumber() + "] ");
}
if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS)) != 0) {
System.Console.Out.Write("NOACKS ");
}
System.Console.Out.WriteLine();
break;
case LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX:
UMESourceEventRegistrationCompleteInfo regcomp =
sourceEvent.registrationCompleteInfo();
System.Console.Out.Write("UME registration complete. SQN " +
regcomp.sequenceNumber() + ". Flags " + regcomp.flags() + " ");
if ((regcomp.flags() & LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
System.Console.Out.Write("QUORUM ");
}
System.Console.Out.WriteLine();
break;
case LBM.SRC_EVENT_UME_STORE_UNRESPONSIVE:
System.Console.Out.WriteLine("UME store: " + sourceEvent.dataString());
break;
...
default:
System.Console.Out.WriteLine("Unknown source event " + sourceEvent.type());
break;
}
return 0;
}


Source Event Handler - Stability, Confirmation and Release

As shown in Source Event Handler above, the Source Event Handler can be expanded to handle more source events by adding additional case statements. The following source code examples show case statements to handle message stability events, delivery confirmation events and message release (reclaim) events. See also Persistence Events.

C API

case LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX:
    /* requires that source ume_message_stability_notification attribute is enabled */
    {
        lbm_src_event_ume_ack_ex_info_t *info = (lbm_src_event_ume_ack_ex_info_t *)ed;
        printf("UME store %u: %s message stable. SQN %x (msgno %d). Flags %x ",
               info->store_index, info->store, info->sequence_number,
               (int)info->msg_clientd - 1, info->flags);
        if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE)
            printf("IA "); /* Stable within store group */
        if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE)
            printf("IR "); /* Stable amongst all stores */
        if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE)
            printf("STABLE "); /* Just plain stable */
        if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE)
            printf("STORE "); /* Stability reported by UME Store */
        printf("\n");
    }
    break;
case LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX:
    /* requires that source ume_confirmed_delivery_notification attribute is enabled */
    {
        lbm_src_event_ume_ack_ex_info_t *info = (lbm_src_event_ume_ack_ex_info_t *)ed;
        printf("UME delivery confirmation. SQN %x, Receiver RegID %u (msgno %d). Flags %x ",
               info->sequence_number, info->rcv_registration_id,
               (int)info->msg_clientd - 1, info->flags);
        if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS)
            printf("UNIQUEACKS "); /* Satisfied number of unique ACKs requirement */
        if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID)
            printf("UREGID "); /* Confirmation contains receiver application registration ID */
        if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD)
            printf("OOD "); /* Confirmation received from arrival order receiver */
        if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK)
            printf("EXACK "); /* Confirmation explicitly sent by receiver */
        printf("\n");
    }
    break;
case LBM_SRC_EVENT_UME_MESSAGE_RECLAIMED:
    /* requires that source ume_confirmed_delivery_notification or ume_message_stability_notification
       attributes are enabled */
    {
        lbm_src_event_ume_ack_info_t *ackinfo = (lbm_src_event_ume_ack_info_t *)ed;
        printf("UME message released - sequence number %x (msgno %d)\n",
               ackinfo->sequence_number, (int)ackinfo->msg_clientd - 1);
    }
    break;

JAVA API

case LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX:
// requires that source ume_message_stability_notification attribute is enabled
UMESourceEventAckInfo staInfo = sourceEvent.ackInfo();
System.out.print("UME store " + staInfo.storeIndex() + ": "
+ staInfo.store() + " message stable. SQN " + staInfo.sequenceNumber()
+ " (msgno " + staInfo.clientObject() + "). Flags "
+ staInfo.flags() + " ");
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE) != 0) {
System.out.print("IA "); // Stable within store group
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE) != 0) {
System.out.print("IR "); // Stable amongst all stores
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE) != 0) {
System.out.print("STABLE "); // Just plain stable
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE) != 0) {
System.out.print("STORE "); // Stability reported by UME Store
}
System.out.println();
break;
case LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX:
// requires that source ume_confirmed_delivery_notification attribute is enabled
UMESourceEventAckInfo cdelvinfo = sourceEvent.ackInfo();
System.out.print("UME delivery confirmation. SQN " + cdelvinfo.sequenceNumber()
+ ", RcvRegID " + cdelvinfo.receiverRegistrationId() + " (msgno "
+ cdelvinfo.clientObject() + "). Flags " + cdelvinfo.flags() + " ");
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS) != 0) {
System.out.print("UNIQUEACKS "); // Satisfied number of unique ACKs requirement
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID) != 0) {
System.out.print("UREGID "); // Confirmation contains receiver application reg ID
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD) != 0) {
System.out.print("OOD "); // Confirmation received from arrival order receiver
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK) != 0) {
System.out.print("EXACK "); // Confirmation explicitly sent by receiver
}
System.out.println();
break;
case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED:
// requires that source ume_confirmed_delivery_notification or
// ume_message_stability_notification attributes are enabled
System.out.println("UME message released - sequence number "
+ Long.toHexString(sourceEvent.sequenceNumber())
+ " (msgno "
+ Long.toHexString(((Integer)sourceEvent.clientObject()).longValue())
+ ")");
break;

.NET API

case LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX:
// requires that source ume_message_stability_notification attribute is enabled
UMESourceEventAckInfo staInfo = sourceEvent.ackInfo();
System.Console.Out.Write("UME store " + staInfo.storeIndex() + ": "
+ staInfo.store() + " message stable. SQN " + staInfo.sequenceNumber()
+ " (msgno " + ((int)staInfo.clientObject()).ToString("x") + "). Flags "
+ staInfo.flags() + " ");
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE) != 0) {
System.Console.Out.Write("IA "); // Stable within store group
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE) != 0) {
System.Console.Out.Write("IR "); // Stable amongst all stores
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE) != 0) {
System.Console.Out.Write("STABLE "); // Just plain stable
}
if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE) != 0) {
System.Console.Out.Write("STORE "); // Stability reported by UME Store
}
System.Console.Out.WriteLine();
break;
case LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX:
// requires that source ume_confirmed_delivery_notification attribute is enabled
UMESourceEventAckInfo cdelvinfo = sourceEvent.ackInfo();
System.Console.Out.Write("UME delivery confirmation. SQN " +
cdelvinfo.sequenceNumber()
+ ", RcvRegID " + cdelvinfo.receiverRegistrationId() + " (msgno "
+ ((int)cdelvinfo.clientObject()).ToString("x") + "). Flags " +
cdelvinfo.flags() + " ");
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS) != 0) {
System.Console.Out.Write("UNIQUEACKS "); // Satisfied number of unique ACKs requirement
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID) != 0) {
System.Console.Out.Write("UREGID "); // Confirmation contains receiver application reg ID
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD) != 0) {
System.Console.Out.Write("OOD "); // Confirmation received from arrival order receiver
}
if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK) != 0) {
System.Console.Out.Write("EXACK "); // Confirmation explicitly sent by receiver
}
System.Console.Out.WriteLine();
break;
case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED:
// requires that source ume_confirmed_delivery_notification or
// ume_message_stability_notification attributes are enabled
System.Console.Out.WriteLine("UME message released - sequence number "
+ sourceEvent.sequenceNumber().ToString("x")
+ " (msgno "
+ ((int)sourceEvent.clientObject()).ToString("x")
+ ")");
break;


Mapping Your Message Numbers to Sequence Numbers

The C API function lbm_src_send_ex() lets you associate a pointer to an application object or structure with a message (lbm_src_sendv_ex() does the same for vectored sends). This pointer is returned to your application along with all source events for that message, and you can then update the object or structure with source event information. For example, if your messages exceed 8K, which requires UM to fragment an application message into more than one UM message, receiving sequence number events with this pointer lets you determine all the UM sequence numbers for the message and, therefore, how many release (reclaim) events to expect. The following source code examples show how to:

  • Enable message sequence number information.
  • Handle sequence number source events to determine the application message number in the Source Event Handler.

C API - Enable Message Information

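lbm_src_send_ex_info_t exinfo;
memset(&exinfo, 0, sizeof(exinfo));
/* Enable message sequence number info to be returned, and pass client data */
exinfo.flags = LBM_SRC_SEND_EX_FLAG_SEQUENCE_NUMBER_INFO | LBM_SRC_SEND_EX_FLAG_UME_CLIENTD;
/* msgno set to application message number (+1 so it can't evaluate to NULL) */
exinfo.ume_msg_clientd = (void *)(msgno + 1);
while (lbm_src_send_ex(src, message, msglen, 0, &exinfo) == LBM_FAILURE) {
    if (lbm_errnum() == LBM_EUMENOREG) {
        printf("Send unsuccessful. Waiting...\n");
        SLEEP_MSEC(1000); /* Sleep for 1 second */
        continue;
    }
    else {
        fprintf(stderr, "lbm_src_send_ex: %s\n", lbm_errmsg());
        break;
    }
}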

C API - Sequence Number Event Handler

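int handle_src_event(lbm_src_t *src, int event, void *ed, void *cd)
{
    switch (event) {
    case LBM_SRC_EVENT_SEQUENCE_NUMBER_INFO:
        {
            lbm_src_event_sequence_number_info_t *info =
                (lbm_src_event_sequence_number_info_t *)ed;
            if (info->first_sequence_number != info->last_sequence_number) {
                printf("SQN [%x,%x] (msgno %d)\n", info->first_sequence_number,
                       info->last_sequence_number, (int)info->msg_clientd - 1);
            }
            else {
                printf("SQN %x (msgno %d)\n", info->last_sequence_number,
                       (int)info->msg_clientd - 1);
            }
        }
        break;
    ...
    }
    return 0;
}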

JAVA API - Enable Message Information

LBMSourceSendExInfo exinfo = new LBMSourceSendExInfo();
exinfo.setClientObject(Integer.valueOf(msgno)); // msgno set to application message number
exinfo.setFlags(LBM.SRC_SEND_EX_FLAG_SEQUENCE_NUMBER_INFO);
// Enable message sequence number info to be returned
for (;;)
{
try {
src.send(message, msglen, 0, exinfo);
}
catch(UMENoRegException ex) {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) { }
continue;
}
catch (LBMException ex) {
System.err.println("Error sending message: " + ex.toString());
}
break;
}

JAVA API - Sequence Number Event Handler

public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
switch (sourceEvent.type())
{
case LBM.SRC_EVENT_SEQUENCE_NUMBER_INFO:
LBMSourceEventSequenceNumberInfo info = sourceEvent.sequenceNumberInfo();
if (info.firstSequenceNumber() != info.lastSequenceNumber()) {
System.out.println("SQN [" + info.firstSequenceNumber()
+ "," + info.lastSequenceNumber() + "] (msgno "
+ info.clientObject() + ")");
}
else {
System.out.println("SQN " + info.lastSequenceNumber()
+ " (msgno " + info.clientObject() + ")");
}
break;
...
}
return 0;
}

.NET API - Enable Message Information

LBMSourceSendExInfo exinfo = new LBMSourceSendExInfo();
exinfo.setClientObject(msgno); // msgno set to application message number
exinfo.setFlags(LBM.SRC_SEND_EX_FLAG_SEQUENCE_NUMBER_INFO);
// Enable message sequence number info to be returned
for (;;)
{
try {
src.send(message, msglen, 0, exinfo);
}
catch(UMENoRegException ex) {
System.Threading.Thread.Sleep(100);
continue;
}
catch (LBMException ex) {
System.Console.Error.WriteLine("Error sending message: " + ex.Message);
}
break;
}

.NET API - Sequence Number Event Handler

public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
switch (sourceEvent.type())
{
case LBM.SRC_EVENT_SEQUENCE_NUMBER_INFO:
LBMSourceEventSequenceNumberInfo info = sourceEvent.sequenceNumberInfo();
if (info.firstSequenceNumber() != info.lastSequenceNumber()) {
System.Console.Out.WriteLine("SQN [" + info.firstSequenceNumber()
+ "," + info.lastSequenceNumber() + "] (msgno "
+ (int)info.clientObject() + ")");
}
else {
System.Console.Out.WriteLine("SQN " + info.lastSequenceNumber()
+ " (msgno " + (int)info.clientObject() + ")");
}
break;
...
}
return 0;
}


Receiver Liveness Detection

As an extension to Confirmed Delivery, you can set receivers to send a keepalive to a source when no delivery confirmations have been sent for a measured interval (due to a lapse in traffic). If neither a delivery confirmation nor a keepalive reaches the source within a designated interval, or if the delivery confirmation TCP connection breaks down, the receiver is assumed to have "died". UM then notifies the publishing application via a context event callback. This lets the publisher assign a new subscriber.
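The detection rule described above can be modeled as a simple timestamp check on the source side. This is a minimal sketch, not UM's implementation: the rcv_liveness_t structure, the liveness_note_activity() and liveness_is_dead() functions, and the millisecond timestamps are hypothetical, and the real intervals are governed by UM configuration options.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical per-receiver state tracked by the source in this model. */
typedef struct {
    uint64_t last_heard_ms;  /* time of the last delivery confirmation or keepalive */
    bool     tcp_connected;  /* state of the delivery-confirmation TCP connection */
} rcv_liveness_t;

/* Call whenever a delivery confirmation or a keepalive arrives. */
void liveness_note_activity(rcv_liveness_t *r, uint64_t now_ms)
{
    r->last_heard_ms = now_ms;
}

/* The receiver is presumed dead if its TCP connection broke, or if neither
 * message type arrived within timeout_ms. Assumes now_ms >= last_heard_ms. */
bool liveness_is_dead(const rcv_liveness_t *r, uint64_t now_ms, uint64_t timeout_ms)
{
    if (!r->tcp_connected)
        return true;
    return now_ms - r->last_heard_ms > timeout_ms;
}
```

Note that the TCP-connection check is exactly what the caveats below warn about: a half-open connection still looks "connected" to this logic.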

To use this feature, set these five configuration options:

Note
Smart Sources do not support liveness detection.

This specialized feature is not recommended for general use. If you are considering it, please note the following caveats:

  • Do not use in conjunction with a UM Router.
  • There is a variety of potential network occurrences that can break or reset the TCP connection and falsely indicate the death of a receiver.
  • In cases where a receiver object is deleted while its context is not, the publisher may still falsely assume the receiver to be alive.

Other false receiver-alive assumptions could be caused by the following:

  • TCP connections can enter a half-open or otherwise corrupted state.
  • Failed TCP connections sometimes do not fully close, or experience objectionable delays before fully closing.
  • A switch or router failure along the path does not affect the TCP connection state.


Designing Persistent Receivers

Receivers are predominantly interested in RegID management and recovery management.


Receiver RegID Management

RegIDs are slightly more involved for receivers than for sources. Since RegIDs are per source per topic per store and a topic may have several sources, a receiver may have to manage several RegIDs per store in use. Fortunately, receivers in UM can leverage the RegID of the source with the use of a callback as discussed in Adding Fault Recovery with Registration IDs and shown in ume-example-rcv-2.c. Your application can determine the correct RegID to use and return it to UM. You can also use Session IDs to enable UM to manage receiver RegIDs. See Managing RegIDs with Session IDs.

Much like sources, receivers typically have a lifetime based on an amount of work, perhaps an infinite amount. And just like sources, it may be helpful to consider that a RegID is "assigned" at the start of that work and is out of use at the end. In between, the RegID is in use by the instance of the receiver application. However, the nature of RegIDs being per source means that the expected lifetime of a source should play a role in how RegIDs on the receiver are managed. Thus, it may be helpful for the application developer to consider the source application lifetime when deciding how best to handle RegIDs on the receiver.

Receiver Message and Event Handler

The Receiver Message and Event Handler is an application callback, defined at receiver initialization, that delivers received messages and events to your application. The following source code examples illustrate the use of a receiver message and event handler for registration events. To handle other receiver events, add one case statement for each additional event. See also Persistence Events.

C API

int rcv_handle_msg(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
{
switch (msg->type) {
case LBM_MSG_UME_REGISTRATION_ERROR:
printf("[%s][%s] UME registration error: %s\n", msg->topic_name,
msg->source, msg->data);
exit(0);
break;
case LBM_MSG_UME_REGISTRATION_SUCCESS:
{
lbm_msg_ume_registration_t *reg = (lbm_msg_ume_registration_t *)(msg->data);
printf("[%s][%s] UME registration successful. "
"SrcRegID %u RcvRegID %u\n",
msg->topic_name, msg->source,
reg->src_registration_id, reg->rcv_registration_id);
}
break;
case LBM_MSG_UME_REGISTRATION_SUCCESS_EX:
{
lbm_msg_ume_registration_ex_t *reg = (lbm_msg_ume_registration_ex_t *)(msg->data);
printf("[%s][%s] store %u: %s UME registration successful. "
"SrcRegID %u RcvRegID %u. Flags %x ",
msg->topic_name, msg->source, reg->store_index, reg->store,
reg->src_registration_id, reg->rcv_registration_id, reg->flags);
if (reg->flags & LBM_MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)
printf("OLD[SQN %x] ", reg->sequence_number);
if (reg->flags & LBM_MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_NOCACHE)
printf("NOCACHE ");
printf("\n");
}
break;
case LBM_MSG_UME_REGISTRATION_COMPLETE_EX:
{
lbm_msg_ume_registration_complete_ex_t *reg =
(lbm_msg_ume_registration_complete_ex_t *)(msg->data);
printf("[%s][%s] UME registration complete. SQN %x. Flags %x ",
msg->topic_name, msg->source, reg->sequence_number, reg->flags);
if (reg->flags & LBM_MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM)
printf("QUORUM ");
if (reg->flags & LBM_MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_RXREQMAX)
printf("RXREQMAX ");
printf("\n");
}
break;
case LBM_MSG_UME_REGISTRATION_CHANGE:
printf("[%s][%s] UME registration change: %s\n", msg->topic_name,
msg->source, msg->data);
break;
...
default:
printf("Unknown lbm_msg_t type %x [%s][%s]\n", msg->type,
msg->topic_name, msg->source);
break;
}
return 0;
}

JAVA API

public int onReceive(Object cbArg, LBMMessage msg)
{
switch (msg.type())
{
case LBM.MSG_UME_REGISTRATION_ERROR:
System.out.println("[" + msg.topicName() + "][" + msg.source()
+ "] UME registration error: " + msg.dataString());
break;
case LBM.MSG_UME_REGISTRATION_SUCCESS_EX:
UMERegistrationSuccessInfo reg = msg.registrationSuccessInfo();
System.out.print("[" + msg.topicName() + "][" + msg.source()
+ "] store " + reg.storeIndex() + ": "
+ reg.store() + " UME registration successful. SrcRegID "
+ reg.sourceRegistrationId() + " RcvRegID "
+ reg.receiverRegistrationId()
+ ". Flags " + reg.flags() + " ");
if ((reg.flags() & LBM.MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD) != 0) {
System.out.print("OLD[SQN " + reg.sequenceNumber() + "] ");
}
if ((reg.flags() & LBM.MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_NOCACHE) != 0) {
System.out.print("NOCACHE ");
}
System.out.println();
break;
case LBM.MSG_UME_REGISTRATION_COMPLETE_EX:
UMERegistrationCompleteInfo regcomplete = msg.registrationCompleteInfo();
System.out.print("[" + msg.topicName() + "][" + msg.source()
+ "] UME registration complete. SQN " + regcomplete.sequenceNumber()
+ ". Flags " + regcomplete.flags() + " ");
if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
System.out.print("QUORUM ");
}
if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_RXREQMAX) != 0) {
System.out.print("RXREQMAX ");
}
System.out.println();
break;
case LBM.MSG_UME_REGISTRATION_CHANGE:
System.out.println("[" + msg.topicName() + "][" + msg.source()
+ "] UME registration change: " + msg.dataString());
break;
...
default:
System.err.println("Unknown lbm_msg_t type " + msg.type() + " ["
+ msg.topicName() + "][" + msg.source() + "]");
break;
}
return 0;
}

.NET API

public int onReceive(Object cbArg, LBMMessage msg)
{
switch (msg.type())
{
case LBM.MSG_UME_REGISTRATION_ERROR:
System.Console.Out.WriteLine("[" + msg.topicName() + "]["
+ msg.source() + "] UME registration error: " + msg.dataString());
break;
case LBM.MSG_UME_REGISTRATION_SUCCESS_EX:
UMERegistrationSuccessInfo reg = msg.registrationSuccessInfo();
System.Console.Out.Write("[" + msg.topicName() + "][" + msg.source()
+ "] store " + reg.storeIndex() + ": "
+ reg.store() + " UME registration successful. SrcRegID "
+ reg.sourceRegistrationId() + " RcvRegID "
+ reg.receiverRegistrationId()
+ ". Flags " + reg.flags() + " ");
if ((reg.flags() & LBM.MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD) != 0) {
System.Console.Out.Write ("OLD[SQN " + reg.sequenceNumber() + "] ");
}
if ((reg.flags() & LBM.MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_NOCACHE) != 0) {
System.Console.Out.Write ("NOCACHE ");
}
System.Console.Out.WriteLine();
break;
case LBM.MSG_UME_REGISTRATION_COMPLETE_EX:
UMERegistrationCompleteInfo regcomplete = msg.registrationCompleteInfo();
System.Console.Out.Write("[" + msg.topicName() + "][" + msg.source()
+ "] UME registration complete. SQN "
+ regcomplete.sequenceNumber()
+ ". Flags " + regcomplete.flags() + " ");
if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
System.Console.Out.Write("QUORUM ");
}
if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_RXREQMAX) != 0) {
System.Console.Out.Write("RXREQMAX ");
}
System.Console.Out.WriteLine();
break;
case LBM.MSG_UME_REGISTRATION_CHANGE:
System.Console.Out.WriteLine("[" + msg.topicName() + "][" + msg.source()
+ "] UME registration change: " + msg.dataString());
break;
...
default:
System.Console.Out.WriteLine("Unknown lbm_msg_t type " + msg.type()
+ " [" + msg.topicName() + "][" + msg.source() + "]");
break;
}
return 0;
}


Recovery Management

Recovery management for failed and restarted receivers is fairly simple. UM requests any missed messages from the store(s) and delivers them to the restarted receiver. However, your application can override that default behavior either by configuring a retransmit_request_maximum (receiver) value, or by configuring a ume_recovery_sequence_number_info_function (receiver) application callback, or both.

For example, let's say a source sends 7 messages with sequence numbers 0-6 which are stabilized at the store. A C-based receiver, configured with retransmit_request_maximum (receiver) set to 2, and an application callback ume_recovery_sequence_number_info_function (receiver), consumes (and acknowledges) message 0, goes down, then restarts right after message 6.

During receiver registration, the lbm_ume_rcv_recovery_info_ex_func_t application callback is called with the following values in the passed-in structure lbm_ume_rcv_recovery_info_ex_func_info_t *info:

info->high_sequence_number == 6
info->low_rxreq_max_sequence_number == 4
info->low_sequence_number == 1

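Where:

  • high_sequence_number (6) is the most recent sequence number sent by the source.
  • low_rxreq_max_sequence_number (4) is the lowest sequence number that retransmit_request_maximum (receiver) allows the receiver to request, i.e. high_sequence_number minus 2.
  • low_sequence_number (1) is the sequence number following the last message the receiver consumed and acknowledged (message 0).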

Normally, UM would start delivering messages at 1, but retransmit_request_maximum (receiver) is set to 2, which overrides UM's normal behavior. So in this example, the first message delivered will be number 4.

Finally, the application can, at run-time, further override the starting sequence number. The callback function can modify the contents of the passed-in structure lbm_ume_rcv_recovery_info_ex_func_info_t *info; specifically it can update the lbm_ume_rcv_recovery_info_ex_func_info_t::low_sequence_number field. When the callback returns, UM examines that field to see if it was modified by the callback. If so, UM overrides the effect of retransmit_request_maximum (receiver) and starts at the requested sequence number.

Notice that this design does not allow the callback to override the effect of retransmit_request_maximum (receiver) by setting the lbm_ume_rcv_recovery_info_ex_func_info_t::low_sequence_number field to its original value, 1 in this example. Upon return, UM will see the value unchanged, and will allow retransmit_request_maximum (receiver) to override the starting sequence number. This is only an issue if both retransmit_request_maximum (receiver) and ume_recovery_sequence_number_info_function (receiver) are used. If the application wants to use the sequence number remembered by the store, it should not configure retransmit_request_maximum (receiver).
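The interaction described in the last three paragraphs can be summarized as a small decision function. This is a hedged model, not UM code: recovery_info_t and choose_start() are hypothetical, the rule that retransmit_request_maximum (receiver) never moves the start earlier than low_sequence_number is an assumption, and sequence-number wraparound is ignored.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical mirror of the three values passed to the recovery callback. */
typedef struct {
    uint32_t low_sequence_number;            /* first unconsumed message */
    uint32_t low_rxreq_max_sequence_number;  /* lowest allowed by rxreq max */
    uint32_t high_sequence_number;           /* most recent message */
} recovery_info_t;

/* callback_low: the value the application callback left in low_sequence_number.
 * rxreq_max_configured: whether retransmit_request_maximum is set. */
uint32_t choose_start(const recovery_info_t *info, bool rxreq_max_configured,
                      uint32_t callback_low)
{
    /* A modified field overrides retransmit_request_maximum. */
    if (callback_low != info->low_sequence_number)
        return callback_low;
    /* Unmodified field: retransmit_request_maximum may move the start later. */
    if (rxreq_max_configured &&
        info->low_rxreq_max_sequence_number > info->low_sequence_number)
        return info->low_rxreq_max_sequence_number;
    return info->low_sequence_number;
}
```

With the example values (1, 4, 6), choose_start() returns 4 whether the callback leaves the field alone or writes back the original 1, which is the limitation described in the previous paragraph.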


Duplicate Message Delivery

In a distributed system, it is not possible to guarantee "once-and-only-once" delivery of messages in the face of unpredictable system or component failure. Regardless of the algorithms and handshaking, there is always the possibility of messages sent that are never received, as well as messages received and then received again if the receiving application fails and restarts.

UM's persistence design is based on the principle of being close to once-and-only-once, but when that is not possible, UM prefers to err on the side of duplicate message delivery. Due to other design goals (low latency and high throughput), the possibility of receiving duplicate messages is significant after an application failure and restart.

It is therefore important for persistent applications to be designed to tolerate duplicate message reception, either by making message processing idempotent, or by including logic in the receiving application to detect duplicates and only process the messages which have not been previously processed.

To assist the application in implementing "de-duplication", all messages retransmitted to a receiver are marked as retransmissions via a flag in the message structure. Thus it is easy for an application to determine if a message is a new "live" message from the source, or a retransmission, which may or may not have been processed before the failure. The presence or absence of the retransmit flag gives the application a hint of how best to handle the message with regard to it being processed previously or not.

Informatica recommends that you always check the data or other message properties of messages with the retransmit flag set to be sure the message has not been already processed. Relying on UM sequence numbers is not a 100% reliable method for detecting duplicate messages.
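One way to follow this recommendation is to key de-duplication on an application-level message identifier carried in the message data and to consult it only for retransmissions. The sketch below is hypothetical: processed_set_t, should_process(), the fixed-size ring of IDs, and the is_retransmit argument (assumed to be derived from the message's retransmit flag) are illustrative only. A real application would keep the processed-ID record in durable storage so that it survives a restart.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define DEDUP_CAPACITY 1024  /* hypothetical bound on remembered IDs */

/* Ring buffer of recently processed application-level message IDs. */
typedef struct {
    uint64_t ids[DEDUP_CAPACITY];
    size_t count;  /* number of valid entries */
    size_t next;   /* slot to overwrite next */
} processed_set_t;

static bool dedup_seen(const processed_set_t *s, uint64_t id)
{
    for (size_t i = 0; i < s->count; i++)
        if (s->ids[i] == id)
            return true;
    return false;
}

static void dedup_record(processed_set_t *s, uint64_t id)
{
    s->ids[s->next] = id;
    s->next = (s->next + 1) % DEDUP_CAPACITY;
    if (s->count < DEDUP_CAPACITY)
        s->count++;
}

/* Returns true if the message should be processed. Only retransmissions are
 * checked against the record; live messages are treated as new. */
bool should_process(processed_set_t *s, uint64_t app_msg_id, bool is_retransmit)
{
    if (is_retransmit && dedup_seen(s, app_msg_id))
        return false;  /* already processed before the failure */
    dedup_record(s, app_msg_id);
    return true;
}
```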


Setting Callback Function to Set Recovery Sequence Number

Although the UM persistence design attempts to choose the correct starting sequence number for a recovering receiver, there are cases where the application wishes to override UM's choice.

The sample code below demonstrates how to use the recovery sequence number info function to determine the stored message with which to restart a receiver. This example retrieves the low sequence number from the recovery sequence number structure and adds an offset to determine the beginning sequence number. The offset is a value completely under the control of your application. For example, if a receiver was down for a "long" period and you only want the receiver to receive the last 10 messages, use an offset to start the receiver with the 10th most recent message. If you wish not to receive any messages, set the lbm_ume_rcv_recovery_info_ex_func_info_t::low_sequence_number to the lbm_ume_rcv_recovery_info_ex_func_info_t::high_sequence_number plus one.
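The starting-point choices in the paragraph above reduce to simple arithmetic on the values passed to the callback. The helpers below are hypothetical (plain C, no UM types) and ignore sequence-number wraparound; a real callback would store the result in lbm_ume_rcv_recovery_info_ex_func_info_t::low_sequence_number.

```c
#include <stdint.h>

/* Start 'offset' messages after the suggested low sequence number. */
uint32_t start_with_offset(uint32_t low, uint32_t offset)
{
    return low + offset;
}

/* Start with the n most recent messages, never earlier than 'low'.
 * n == 0 means "receive no stored messages" (high plus one). */
uint32_t start_last_n(uint32_t low, uint32_t high, uint32_t n)
{
    if (n == 0)
        return high + 1;
    if (high - low + 1 <= n)
        return low;            /* fewer than n messages are available */
    return high - n + 1;       /* the n-th most recent message */
}
```

With low 1 and high 6, start_last_n() returns 5 for n = 2 and 7 for n = 0; for "the last 10 messages" it returns high_sequence_number minus 9 whenever at least 10 messages are available.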

C API

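lbm_ume_rcv_recovery_info_ex_func_t cb;
cb.func = ume_rcv_seqnum_ex; /* defined below */
cb.clientd = NULL;
/* rcv_attr: receiver topic attributes created earlier by the application */
if (lbm_rcv_topic_attr_setopt(rcv_attr,
"ume_recovery_sequence_number_info_function",
&cb, sizeof(cb)) == LBM_FAILURE) {
fprintf(stderr,
"lbm_rcv_topic_attr_setopt:ume_recovery_sequence_number_info_function: %s\n",
lbm_errmsg());
exit(1);
}
/* seqnum_offset: application-defined offset */
printf("Will use seqnum info with low offset %u.\n", seqnum_offset);
...
int ume_rcv_seqnum_ex(lbm_ume_rcv_recovery_info_ex_func_info_t *info, void *clientd)
{
lbm_uint_t new_lo = info->low_sequence_number + seqnum_offset;
printf("SQNs Low %x (will set to %x), Low rxreqmax %x, High %x (CD %p)\n",
info->low_sequence_number, new_lo, info->low_rxreq_max_sequence_number,
info->high_sequence_number, clientd);
info->low_sequence_number = new_lo;
return 0;
}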

JAVA API

UMERcvRecInfo umerecinfocb = new UMERcvRecInfo(seqnum_offset);
rcv_attr.setRecoverySequenceNumberCallback(umerecinfocb, null);
System.out.println("Will use seqnum info with low offset " + seqnum_offset);
class UMERcvRecInfo implements UMERecoverySequenceNumberCallback {
private long _seqnum_offset = 0;
public UMERcvRecInfo(long seqnum_offset) {
_seqnum_offset = seqnum_offset;
}
public int setRecoverySequenceNumberInfo(Object cbArg,
UMERecoverySequenceNumberCallbackInfo cbInfo)
{
long new_low = cbInfo.lowSequenceNumber() + _seqnum_offset;
System.out.println("SQNs Low " + cbInfo.lowSequenceNumber() + " (will set to "
+ new_low + "), Low rxreqmax " + cbInfo.lowRxReqMaxSequenceNumber()
+ ", High " + cbInfo.highSequenceNumber());
try {
cbInfo.setLowSequenceNumber(new_low);
}
catch (LBMEInvalException e) {
System.err.println(e.getMessage());
}
return 0;
}
}

.NET API

UMERcvRecInfo umerecinfocb = new UMERcvRecInfo(seqnum_offset);
rcv_attr.setRecoverySequenceNumberCallback(umerecinfocb, null);
System.Console.Out.WriteLine("Will use seqnum info with low offset " + seqnum_offset);
class UMERcvRecInfo : UMERecoverySequenceNumberCallback {
private long _seqnum_offset = 0;
public UMERcvRecInfo(long seqnum_offset) {
_seqnum_offset = seqnum_offset;
}
public int setRecoverySequenceNumberInfo(Object cbArg,
UMERecoverySequenceNumberCallbackInfo cbInfo)
{
long new_low = cbInfo.lowSequenceNumber() + _seqnum_offset;
System.Console.Out.WriteLine ("SQNs Low " + cbInfo.lowSequenceNumber() + " (will set to "
+ new_low + "), Low rxreqmax " + cbInfo.lowRxReqMaxSequenceNumber()
+ ", High " + cbInfo.highSequenceNumber());
try {
cbInfo.setLowSequenceNumber(new_low);
}
catch (LBMEInvalException e) {
System.Console.Out.WriteLine(e.Message);
}
return 0;
}
}


Persistence Message Consumption

Receivers use message consumption, defined as message deletion, to indicate that UM should notify the store(s) that the application consumed the message. This notification takes the form of an acknowledgement, or ACK, to the store(s) in use, and optionally to the source if you configure the source for delivery confirmation.

In many applications, the message receiver application callback will fully process the received message. When the application callback returns, the message should be deleted and acknowledged.

However, there are other application designs where a received message cannot be fully processed inside the receiver application callback. For example, the message might need to be passed to a worker thread for longer-term processing. Or the acknowledgement must be delayed until some other event happens, like a handshake with another application. In these cases, the message deletion and/or message acknowledgement must not be done when the receiver callback returns.

Finally, for high-throughput applications, an application can completely suppress the acknowledgement of each individual message in favor of acknowledgement batching (acknowledging multiple messages in one operation). This is done to reduce the per-message overhead. Note that acknowledgement batching increases the chances that a restarted application will receive duplicate messages (messages that had been previously processed but not yet acknowledged). See Duplicate Message Delivery for more information.


Immediate Message Consumption

In many applications, the message receiver application callback will fully process the received message. When the receive callback returns, the message should be deleted and acknowledged. This is handled differently in the C API than in the Java and .NET APIs.

C API

The default behavior for a C receiver application callback is for the message to be deleted and acknowledged when the receiver callback returns. No special coding is needed for this use case.

Java and .NET

With Java and .NET, the UM library is not able to differentiate between a message that is passed to a different part of the application and a message that is simply dereferenced for eventual garbage collection. Therefore the default behavior of the UM library is different: it assumes that the message should not be deleted and acknowledged when the receiver application callback returns. Instead, the application is expected to explicitly dispose of received messages when processing is complete.

In the case where message processing is completed in the receiver callback, the application must call the "dispose()" method of the message object before returning. This triggers acknowledgement as well as cleanup of the message's resources.


Delayed Message Processing

There are application designs where a received message cannot be fully processed inside the receiver application callback. For example, the message might need to be passed to a worker thread for longer-term processing. Or the acknowledgement must be delayed until some other event happens, like a handshake with another application.

This is handled differently in the C API than in the Java and .NET APIs.

C API

In the C API, the application's receiver callback function must call the lbm_msg_retain() function for the received message. This suppresses the automatic deletion of the received message when the receiver callback returns, and allows the message buffer to be handed to some other part of the application for processing and deletion at a later time.

When the application subsequently completes all processing of the message and is ready for the message to be deleted and acknowledged, it calls lbm_msg_delete().

Java and .NET

With Java and .NET, the UM library assumes that the message should not be deleted and acknowledged when the receiver application callback returns. The callback can simply pass the message to some other part of the application for subsequent processing.

When the application has completed all processing on the message, the message's "dispose()" method should be called. This releases resources held by the object and also triggers the acknowledgement.


Batching Acknowledgments

For high-throughput applications, it is often desired to reduce the per-message overhead. Sending acknowledgements to the Store and optionally to the source normally involves multiple socket operations, which can limit the maximum sustainable throughput of a persistent receiver.

A significant reduction in per-message overhead can be achieved by batching acknowledgements. In this use case, the sending of acknowledgements is delayed until multiple messages have been received and processed. Then an acknowledgement is sent which covers all messages processed so far.

Warning
While ACK batching provides significant improvements in receiver throughput, it also increases the probability that a failed and restarted receiver will be sent duplicate messages (i.e. messages that the application has already received and processed).

ACK Batching can be done implicitly or explicitly. For implicit ACK batching, use the configuration options ume_use_ack_batching (receiver) and ume_ack_batching_interval (context). Note that implicit ACK batching also supports out-of-order acknowledgements. See ACK Ordering.
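For illustration, implicit ACK batching might be enabled in a UM flat configuration file as shown below. The option names and scopes come from this section; the "scope option value" line format and the 100 millisecond interval are assumptions to verify against the configuration reference before use.

```
receiver ume_use_ack_batching 1
context ume_ack_batching_interval 100
```

A shorter interval reduces the number of processed-but-unacknowledged messages (and therefore potential duplicates after a restart) at the cost of more frequent acknowledgements.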

Explicit ACK batching gives the application precise control over when acknowledgements are sent via API calls. This mode of operation is enabled with the ume_explicit_ack_only (receiver) configuration option. If enabled, acknowledgements are only sent as a result of the application explicitly calling an API. This allows the application to use application-level knowledge to optimize when to send acknowledgements, potentially minimizing the time that processed messages are left unacknowledged (and therefore minimizing the number of potential duplicate messages).

See lbm_ume_ack_send_explicit_ack() and lbm_msg_ume_send_explicit_ack() for the C API. See com::latencybusters::lbm::LBMMessage::sendExplicitAck() for Java and .NET. See Explicit Acknowledgments for details on explicit ACKs.


ACK Ordering

The Persistent Store does not support "out of order" acknowledgement of messages. If the Store receives an acknowledgement of sequence number N, that implicitly acknowledges all sequence numbers less than N. If a receiving application can complete processing of messages out of order, it must ensure that an acknowledgement is not sent for a given message until all previously received messages have been completely processed.

Normally, the only way that a receiving application can process messages out of order is to complete processing of them outside of the receiver application callback function. This requires "retaining" the messages so that they aren't deleted (and therefore acknowledged) automatically when the receiver callback returns. In this usage, when a message is completely processed, the application deletes it, triggering the acknowledgement of that message. However, if the application design allows those messages to be processed out of order, the acknowledgement of a given message may implicitly acknowledge previous messages that have not been completely processed. Those incompletely processed messages would then not be recovered if the receiving application fails and restarts.

ACK Batching can provide a solution, implicitly or explicitly.

The implicit form of ACK batching provides, as a convenience, the ability to postpone the sending of a message ACK until all previously received messages have also been processed. When the UM context wakes up every ume_ack_batching_interval (context) milliseconds, it checks for unacknowledged messages that have been deleted, either implicitly when the receiver callback returns, or explicitly by API calls to retain and then delete the message. UM will only acknowledge up to the highest contiguous sequence number.

For example, let's say the application deletes messages with sequence numbers 0, 1, 5, 2, 4. Messages 3 and 6 are still being processed. If the context wakes up at this point, it will send an acknowledgement for sequence 2. If the application fails at this point and restarts, the Store will re-send messages 3, 4, 5, and 6. The receiving application must handle the fact that 3 and 6 were incompletely processed, whereas 4 and 5 were completely processed (see Duplicate Message Delivery).

Instead of using implicit batching for this, the application can be coded to use Explicit Acknowledgments. However, in this case, the application is responsible for implementing an algorithm similar to the implicit ACK batcher described above. That is, even though messages 4 and 5 were fully processed, the application would need to postpone sending an acknowledgement until message 3 is also completed, at which point a single acknowledgement for sequence 5 can be sent.
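An application using explicit ACKs could track the "highest contiguous completed" point with a small bitmap. This is a sketch, not UM's internal batcher: ack_tracker_t, the tracker_* functions, and the fixed WINDOW (which assumes no message more than WINDOW positions ahead of the oldest unacknowledged one completes) are hypothetical, and wraparound is ignored. The returned sequence number is what the application would pass to its explicit ACK call.

```c
#include <stdbool.h>
#include <stdint.h>

#define WINDOW 64  /* hypothetical maximum number of in-flight messages */

typedef struct {
    uint32_t next_unacked;  /* lowest sequence number not yet covered by an ACK */
    bool done[WINDOW];      /* completion flags, indexed by sqn % WINDOW */
} ack_tracker_t;

void tracker_init(ack_tracker_t *t, uint32_t first_sqn)
{
    t->next_unacked = first_sqn;
    for (int i = 0; i < WINDOW; i++)
        t->done[i] = false;
}

/* Mark a message as completely processed (e.g. deleted). */
void tracker_complete(ack_tracker_t *t, uint32_t sqn)
{
    t->done[sqn % WINDOW] = true;
}

/* Advance over the contiguous run of completed messages. Returns true and
 * stores the sequence number to ACK if the run grew since the last call. */
bool tracker_ack_point(ack_tracker_t *t, uint32_t *ack_sqn)
{
    bool advanced = false;
    while (t->done[t->next_unacked % WINDOW]) {
        t->done[t->next_unacked % WINDOW] = false;
        t->next_unacked++;
        advanced = true;
    }
    if (advanced)
        *ack_sqn = t->next_unacked - 1;
    return advanced;
}
```

Replaying the example above (0, 1, 5, 2, 4 completed) yields an ACK point of 2; once message 3 completes, the next ACK point is 5.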


Explicit Acknowledgments

UM supports explicit acknowledgement, which suppresses UM's default acknowledgement behavior and gives your application complete control of message consumption notification.

There are two common use cases for Explicit Acknowledgements:

  • Deferred Acknowledgement.
  • Application-level ACK batching.

Deferred Acknowledgement means that the receiving application is not able to fully process a message within the message receiver application callback. For example, the message may require processing in a separate thread. By default, UM will acknowledge a persisted message when the receiver callback returns.

Application-level ACK batching means that the application chooses not to acknowledge every received message. Instead, it implements its own logic to decide which messages to acknowledge. Note that acknowledging a given message implicitly acknowledges all earlier messages. For example, acknowledging messages 5, 10, and 15 tells the Store that all messages 0-15 are acknowledged.
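The cumulative meaning of an ACK can be illustrated with a toy model of the Store's view. Everything here is hypothetical: should_ack() is one possible application policy (ACK every fifth message: 0, 5, 10, 15, ...), and store_view_t only models the "ACK of N covers everything up to N" rule stated above.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical policy: ACK only sequence numbers that are multiples of batch. */
bool should_ack(uint32_t sqn, uint32_t batch)
{
    return batch != 0 && sqn % batch == 0;
}

/* Toy Store-side view: an ACK of N acknowledges every sequence number <= N. */
typedef struct {
    bool any;                /* has any ACK been received? */
    uint32_t acked_through;  /* highest sequence number acknowledged */
} store_view_t;

void store_on_ack(store_view_t *v, uint32_t sqn)
{
    if (!v->any || sqn > v->acked_through) {
        v->acked_through = sqn;
        v->any = true;
    }
}

bool store_is_acked(const store_view_t *v, uint32_t sqn)
{
    return v->any && sqn <= v->acked_through;
}
```

After processing messages 0 through 17 with a batch of 5, the Store treats 0 through 15 as acknowledged, while 16 and 17 remain candidates for redelivery after a restart.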

Also note that this imposes the restriction that messages be acknowledged in ascending order. See ACK Ordering for more information.

Explicit acknowledgement is enabled using the configuration option ume_explicit_ack_only (receiver).


Object-free Explicit Acknowledgments

When using explicit ACKs, you can extract ACK information from messages. This allows the received message buffer to be deleted when the receiver callback is done, while still allowing the application to save the ACK structure for persistent acknowledgement to the Store at a future time. This can improve receiver performance when used with the Receive Buffer Recycling feature to reduce the per-message use of dynamic memory (malloc/free) with a persistent receiver. Extracting ACKs can additionally improve the performance of Java and .NET applications by allowing the use of Zero Object Delivery.

The following source code examples show how to extract ACK information and send an explicit ACK.

C API

int rcv_handle_msg(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
{
lbm_ume_rcv_ack_t *ack = NULL;
...
ack = lbm_msg_extract_ume_ack(msg);
/* Pass the "ack" and its sequence number to another thread or work queue.
 * defer_ack() is an application-defined helper. */
defer_ack(ack, msg->sequence_number);
...
return 0;
}
void worker(void)
{
lbm_ume_rcv_ack_t *ack = NULL;
lbm_uint_t sqn;
...
ack = get_deferred_ack(&sqn); /* Get "ack" and sequence number saved above. */
/* Some applications improve throughput by not ACKing every message. */
if (ack_this_message) {
lbm_ume_ack_send_explicit_ack(ack, sqn);
}
lbm_ume_ack_delete(ack); /* Extracted ack *must* be deleted. */
...
}

JAVA API or .NET API

public int onReceive(Object cbArg, LBMMessage msg)
{
UMEMessageAck ack;
...
ack = msg.extractUMEAck();
/* Pass the "ack" and its sequence number to another thread or work queue.
   defer_ack() is an application-defined helper. */
defer_ack(ack, msg.sequenceNumber());
...
return 0;
}
void worker()
{
UMEMessageAck ack;
long sqn;
ack = get_deferred_ack(); /* Get "ack" that was saved above. */
sqn = get_deferred_sqn(); /* ...and its sequence number. */
/* Some applications improve throughput by not ACKing every message. */
if (ack_this_message) {
ack.sendExplicitAck(sqn);
}
ack.dispose(); /* Extracted ack *must* be disposed. */
}


Designing Persistent Stores

As mentioned in Persistent Store Concept, the persistent stores, also just called stores, actually persist the source and receiver state and use RegIDs to identify sources and receivers. Each source to which a store provides persistence may have zero or more receivers. The store maintains each receiver's state along with the source's state and the messages the source has sent.

The store can be configured with its own set of options to persist this state information on disk or simply in memory. The term disk store is used to signify a store that persists state to disk, and the term memory store is used to signify a store that persists state only in memory. A store may also be configured not to cache the source's data, but to simply persist the source and receiver state in memory. This is called a no-cache store.

A source does not send data to the store and then have the store forward it to the receivers. In UM, the source sends to receivers and the stores in parallel. See Persistence Normal Operation. Thus, UM can provide extremely low latency to receiving applications.

The store(s) that a source uses are part of the source's configuration settings. Sources must be configured to use specific store(s) in a Quorum/Consensus arrangement.

Receivers, on the other hand, do not need to be configured with store information a priori. The source provides store information to receivers via a Source Registration Information (SRI) message after the source registers with a store. Thus the receivers learn about stores from the source, without needing to be configured themselves. Because receivers learn about the store or stores with which they must register via an SRI record, the source must be available to receivers. However, the source does not have to be actively sending data to do this.


Store Log File

The store daemon generates log messages that are used to monitor its health and operation. You can configure these to be directed to "console" (standard output) or a specified log "file", via the <log> configuration element. Normally "console" is only used during testing, as a persistent log file is preferred for production use. The store does not overwrite log files on startup, but instead appends to them.


Store Rolling Logs

To prevent unbounded growth of any single log file, the store supports rolling log files. When the log file rolls, the file is renamed according to the model:
  CONFIGUREDNAME_PID.DATE.SEQNUM
where:

  • CONFIGUREDNAME - Root name of log file, as configured by user.
  • PID - Process ID of the store daemon process.
  • DATE - Date that the log file was rolled, in YYYY-MM-DD format.
  • SEQNUM - Sequence number, starting at 1 when the process starts, and incrementing each time the log file rolls.

For example: umestorelog_9867.2017-08-20.2
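The naming model above maps directly onto a format string. This is an illustrative sketch only; rolled_log_name() is a hypothetical helper for scripts or tools that need to predict rolled file names, not part of the store.

```c
#include <stdio.h>

/* Builds CONFIGUREDNAME_PID.DATE.SEQNUM into buf. Returns the snprintf
 * result: the number of characters needed, excluding the terminating NUL. */
int rolled_log_name(char *buf, size_t len, const char *configured_name,
                    long pid, const char *date_yyyy_mm_dd, unsigned seqnum)
{
    return snprintf(buf, len, "%s_%ld.%s.%u", configured_name, pid,
                    date_yyyy_mm_dd, seqnum);
}
```

Called with "umestorelog", 9867, "2017-08-20", and 2, it produces the example name shown above.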

The user can configure when the log file is eligible to roll over by either or both of two criteria: size and frequency. The size criterion is in millions of bytes. The frequency criterion can be daily or hourly. Once one or both criteria are met, the next message written to the log will trigger a roll operation. These criteria are supplied as attributes to the <log> configuration element.

If both criteria are supplied, then the first one to be reached will trigger a roll. For example, consider the setting:

<log type="file" size="23" frequency="daily">store.log</log>

Let's say that the log file grows at 1 million bytes per hour, starting from an empty file at midnight. At 11:00 pm, the log file will reach 23 million bytes, and will roll. Then, at 12:00 midnight, the log file will roll again, even though it is only 1 million bytes in size.
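The "first criterion reached" rule can be modeled as a check made before each log write. This is a hypothetical sketch: roll_policy_t, should_roll(), and the use of a day or hour index (period) to represent the frequency criterion are assumptions for illustration. "Reach" is interpreted as greater than or equal to the configured size.

```c
#include <stdbool.h>
#include <stdint.h>

typedef struct {
    uint64_t size_limit_mb;     /* millions of bytes; 0 disables the size criterion */
    bool     frequency_enabled; /* daily or hourly criterion configured */
    long     opened_period;     /* day/hour index when the current file started */
} roll_policy_t;

/* Evaluated when the next message is about to be written. */
bool should_roll(const roll_policy_t *p, uint64_t file_bytes, long current_period)
{
    if (p->size_limit_mb != 0 && file_bytes >= p->size_limit_mb * 1000000ULL)
        return true;
    if (p->frequency_enabled && current_period != p->opened_period)
        return true;
    return false;
}
```

Using the example above with daily frequency (period = day index): the size criterion fires at 11:00 pm on day 0, and the new file, still opened on day 0, rolls again on the first write of day 1.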

Note
The rolling logs cannot be configured to automatically overwrite old logs. Thus, the amount of disk space consumed by log files will grow without bound. The user must implement a desired process of archiving or deleting older log files according to the user's preference.


Quorum/Consensus Store Usage  <-

To provide the highest degree of resiliency in the face of failures, UM provides the Quorum/Consensus failover strategy, which allows a source to provide UM with a number of stores to be used at the same time. Multiple stores can fail and messaging can continue unhindered, as long as enough stores remain operational to maintain a quorum (a majority), as described below.

Quorum/Consensus, also called QC, allows a source and its associated receivers to have their persisted state maintained at several stores at the same time. Central to QC is the concept of a group of stores: a logical grouping of stores intended to act as a single unit of resilience. Within the group, individual stores may fail, but for the group as a whole to remain viable and provide resiliency, a quorum must be available. In UM, a quorum is a simple majority. For example, in a group of five stores, three stores are required to maintain a quorum, so one or two stores may fail and the group continues to provide resiliency. UM requires a source to have a quorum of stores available in the group in order to send messages. A group can consist of a single store.

QC also provides the ability to use multiple groups. As long as a single group maintains quorum, UM allows a source to proceed. Groups are logical in nature and can be combined in any way, such as by store location or store type. In addition, QC provides the ability to specify backup stores within groups. Backups may be used if or when a store in the group becomes unresponsive to the source. Together, these features allow a source to survive many failure scenarios that other persistent messaging systems simply cannot handle.
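The quorum arithmetic and the "any single group keeps quorum" rule can be sketched in C. The function names are hypothetical and the sketch deliberately ignores backup stores; it only restates the majority rule from the text (three of five, two of three, one of one).

```c
#include <stdbool.h>
#include <stddef.h>

/* A quorum is a simple majority of the group's configured stores. */
int quorum_size(int group_size)
{
    return group_size / 2 + 1;
}

/* A group has quorum when at least quorum_size() of its stores respond. */
bool group_has_quorum(int group_size, int available)
{
    return available >= quorum_size(group_size);
}

/* Per the text, a source may proceed as long as any single group keeps
 * quorum. sizes[g] is group g's size, available[g] its responsive stores.
 * Backup stores are not modeled in this sketch. */
bool source_can_send(const int *sizes, const int *available, size_t ngroups)
{
    for (size_t g = 0; g < ngroups; g++) {
        if (group_has_quorum(sizes[g], available[g]))
            return true;
    }
    return false;
}
```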


Sources Using Quorum/Consensus Store Configuration  <-

In the case of Quorum/Consensus store behavior, a message is considered stable after it has been successfully stored within a group of stores or among groups of stores according to the two settings, intergroup behavior and intragroup behavior, described below.

  • The intragroup behavior specifies the requirements needed to stabilize a message among the stores within a group. Depending on the setting, a message is stable for the group once it is successfully stored at a quorum (majority) of the group's stores, or once it is successfully stored at all of the group's stores.

  • The intergroup behavior specifies the requirements needed to stabilize a message among groups of stores. Depending on the setting, a message is stable among the groups once it is stable at any group, at a majority of groups, at all groups, or at all active groups.

Notice that a message needs to meet intragroup stability requirements before it can meet intergroup stability requirements. These options provide a number of possibilities for retention of messages for the source.
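The two-level rule can be expressed as a small evaluator in C: each group is first checked against the intragroup behavior, and only groups that pass count toward the intergroup behavior. The enum names, the group_status type, and the "active" flag are hypothetical; the sketch does not model how UM decides that a group is active, and it assumes "majority of groups" means a majority of all configured groups.

```c
#include <stdbool.h>
#include <stddef.h>

typedef enum { INTRA_QUORUM, INTRA_ALL } intra_behavior;
typedef enum { INTER_ANY, INTER_MAJORITY, INTER_ALL, INTER_ALL_ACTIVE } inter_behavior;

typedef struct {
    int  size;    /* stores configured in the group */
    int  stored;  /* stores that have confirmed the message */
    bool active;  /* hypothetical: whether the group counts as active */
} group_status;

/* Intragroup check: quorum (majority) or all stores in the group. */
static bool group_stable(const group_status *g, intra_behavior intra)
{
    if (intra == INTRA_ALL)
        return g->stored == g->size;
    return g->stored >= g->size / 2 + 1;
}

/* Intergroup check, applied to the groups that passed the intragroup check. */
bool message_stable(const group_status *groups, size_t n,
                    intra_behavior intra, inter_behavior inter)
{
    size_t stable = 0, active = 0, active_stable = 0;

    for (size_t i = 0; i < n; i++) {
        bool s = group_stable(&groups[i], intra);
        if (s)
            stable++;
        if (groups[i].active) {
            active++;
            if (s)
                active_stable++;
        }
    }
    switch (inter) {
    case INTER_ANY:        return stable >= 1;
    case INTER_MAJORITY:   return stable >= n / 2 + 1;
    case INTER_ALL:        return stable == n;
    case INTER_ALL_ACTIVE: return active > 0 && active_stable == active;
    }
    return false;
}
```

With groups of 5, 3, and 5 stores, quorum intragroup behavior, and all-active intergroup behavior, a message confirmed by 3, 2, and 3 stores respectively is stable; if the 3-store group has only one confirmation while still active, it is not.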

The following figure displays a 3-group Quorum/Consensus configuration with each group in a different location. A message is considered stable when it has been successfully stored at a quorum of stores in all the active groups.

Figure: Quorum/Consensus - Single Location Groups

The source application's UM configuration file appears below.

source ume_store 10.29.3.77:10313:101000:0
source ume_store 10.29.3.77:11313:110000:0
source ume_store 10.29.3.77:12313:120000:0
source ume_store 10.29.3.77:13313:130000:0
source ume_store 10.29.3.77:14313:140000:0
source ume_store 10.29.3.78:15313:150000:1
source ume_store 10.29.3.78:16313:160000:1
source ume_store 10.29.3.78:17313:170000:1
source ume_store 10.29.3.79:18313:180000:2
source ume_store 10.29.3.79:19313:190000:2
source ume_store 10.29.3.79:29313:290000:2
source ume_store 10.29.3.79:39313:390000:2
source ume_store 10.29.3.79:49313:490000:2
source ume_message_stability_notification 1
source ume_store_behavior qc
source ume_store_group 0:5
source ume_store_group 1:3
source ume_store_group 2:5
source ume_retention_intragroup_stability_behavior quorum
source ume_retention_intergroup_stability_behavior all-active
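To make the structure of this configuration explicit, the following C sketch parses ume_store values and tallies how many stores each group index names, so the tallies can be compared with the ume_store_group sizes (5, 3, and 5 here). It assumes the IP:Port:RegID:GroupIndex layout the entries above use and does not handle other forms of the option; parse_ume_store() and tally_groups() are hypothetical helpers, not UM functions.

```c
#include <stdio.h>
#include <string.h>

/* Parse one ume_store value of the form IP:Port:RegID:GroupIndex.
 * Returns 0 on success, -1 if the value does not match that layout. */
int parse_ume_store(const char *value, char ip[16], unsigned *port,
                    unsigned long *regid, int *group)
{
    int n = sscanf(value, "%15[0-9.]:%u:%lu:%d", ip, port, regid, group);
    return n == 4 ? 0 : -1;
}

/* Count how many entries name each group index (0..ngroups-1).
 * Returns 0 on success, -1 on a malformed entry or out-of-range group. */
int tally_groups(const char *const *stores, size_t nstores,
                 int *counts, int ngroups)
{
    for (int g = 0; g < ngroups; g++)
        counts[g] = 0;
    for (size_t i = 0; i < nstores; i++) {
        char ip[16];
        unsigned port;
        unsigned long regid;
        int group;
        if (parse_ume_store(stores[i], ip, &port, &regid, &group) != 0)
            return -1;
        if (group < 0 || group >= ngroups)
            return -1;
        counts[group]++;
    }
    return 0;
}
```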


Persistent Fault Recovery  <-

Recovery from source and receiver failure is the real heart of persistent operation. For a source, this means continuing operation from where it stopped. For a receiver, this means essentially the same thing, but with the retransmission of missed messages. Application developers can easily leverage the information in UM to make their applications recover from failure in graceful ways.

Late Join is the mechanism of persistent recovery as well as a UM streaming feature. If Late Join is turned off on a source (late_join (source)) or receiver (use_late_join (receiver)), persistent recovery is also turned off. To control Late Join behavior, UM provides a mechanism for a receiver to control the low sequence number. See Recovery Management.

Not all failures are recoverable. For application developers it usually pays in the long run to identify what types of errors are non-recoverable and how best to handle them when possible. Such an exercise establishes the precise boundaries of expected versus abnormal operating conditions.


Persistent Source Recovery  <-

The following shows the basic steps of source recovery:

  1. Re-register with the store.
  2. Determine the highest sequence number that the store has from the source.
  3. Resume sending with the next sequence number.

Because UM allows you to stream messages without waiting until each message is stable at the persistent store before sending the next, the main task of source recovery is to determine which messages the persistent store(s) have and which they don't. Therefore, when a source re-registers with a store during recovery, the store tells the source the most recent sequence number it has received from that source. The registration event informs the application of this sequence number. See Source Event Handler.

In addition, a mechanism exists (LBM_SRC_EVENT_SEQUENCE_NUMBER_INFO) that allows the application to know the sequence number assigned to every piece of data it sends. The combination of registration and sequence number information allows an application to know exactly which messages a store has, which it does not, and where to resume sending. An application designed to stream data in this way should consider how best to maintain this information.
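One way an application might maintain this information is sketched in C below: it records each message's UM sequence number as reported through the sequence number information event, and after re-registration it selects the messages whose sequence numbers are above the store's reported highest. The sent_record type and messages_to_resend() are hypothetical application code, not UM API, and the sketch ignores 32-bit sequence number wraparound.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical application-side record, one per message sent. The seqnum
 * would be filled in from the sequence number information event. */
typedef struct {
    uint64_t app_msg_id;  /* application's own message identifier */
    uint32_t seqnum;      /* UM sequence number assigned to the message */
} sent_record;

/* After re-registration, the store reports the highest sequence number it
 * holds from this source. Copy the IDs of messages the store does NOT have
 * into 'resend' (capacity n, kept in send order) and return their count.
 * Records at or below store_highest could be discarded.
 * Simplification: sequence number wraparound is ignored. */
size_t messages_to_resend(const sent_record *sent, size_t n,
                          uint32_t store_highest, uint64_t *resend)
{
    size_t count = 0;
    for (size_t i = 0; i < n; i++) {
        if (sent[i].seqnum > store_highest)
            resend[count++] = sent[i].app_msg_id;
    }
    return count;
}
```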

When QC is in use, UM uses the consensus of the group(s) to determine what sequence number to use in the first message it will send. This is necessary as not all stores can be expected to be in total agreement about what was sent in a distributed system. The application can configure the source with the ume_consensus_sequence_number_behavior (source) to use the lowest sequence number of the latest group of sequence numbers seen from any store, the highest, or the majority. In most cases, the majority, which is the default, makes the most sense as the consensus. The lowest is a very conservative setting. And the highest is somewhat optimistic. Your application has the flexibility to handle this in any way needed.
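The three consensus choices can be illustrated with a C sketch that takes one reported sequence number per store. Lowest and highest are straightforward; for "majority" the sketch assumes one plausible interpretation, the highest sequence number that a majority of the reporting stores have reached, which may differ from how UM actually computes it. The names consensus_behavior and consensus_seqnum() are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef enum { CONSENSUS_LOWEST, CONSENSUS_MAJORITY, CONSENSUS_HIGHEST } consensus_behavior;

static int cmp_u32(const void *a, const void *b)
{
    uint32_t x = *(const uint32_t *)a;
    uint32_t y = *(const uint32_t *)b;
    return (x > y) - (x < y);
}

/* seq holds one sequence number per reporting store (n >= 1) and is sorted
 * in place. "Majority" here is an assumption: the highest value that at
 * least a majority of stores have reached. */
uint32_t consensus_seqnum(uint32_t *seq, size_t n, consensus_behavior b)
{
    qsort(seq, n, sizeof seq[0], cmp_u32);
    switch (b) {
    case CONSENSUS_LOWEST:
        return seq[0];      /* very conservative */
    case CONSENSUS_HIGHEST:
        return seq[n - 1];  /* somewhat optimistic */
    case CONSENSUS_MAJORITY:
    default: {
        size_t majority = n / 2 + 1;
        /* Exactly 'majority' stores report a value >= seq[n - majority]. */
        return seq[n - majority];
    }
    }
}
```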

If an application prefers to avoid the complexity of streaming, it can easily use the Persistence Events delivered to the application to restrict the source to having only one unstable message at a time.
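A minimal C11 sketch of such a restriction is a one-slot gate: the sending thread acquires the gate before each send, and the source event handler releases it when the outstanding message is reported stable. The send_gate type and its functions are hypothetical; wiring gate_on_stable() to the actual stability event, and deciding whether to retry, queue, or block when the gate is busy, is left to the application.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical one-slot gate: at most one unstable message at a time. */
typedef struct {
    atomic_bool unstable;  /* true while a sent message awaits stability */
} send_gate;

void gate_init(send_gate *g)
{
    atomic_init(&g->unstable, false);
}

/* Call before sending. Returns true if the caller may send now; false
 * means the previous message is not yet stable. The atomic exchange makes
 * this safe against the event handler running on another thread. */
bool gate_try_acquire(send_gate *g)
{
    bool expected = false;
    return atomic_compare_exchange_strong(&g->unstable, &expected, true);
}

/* Call from the source event handler when the outstanding message is
 * reported stable (or after a failed send, to release the slot). */
void gate_on_stable(send_gate *g)
{
    atomic_store(&g->unstable, false);
}
```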


Persistent Receiver Recovery  <-

The following shows the basic steps of receiver recovery:

  1. Re-register with the store.
  2. Determine the low sequence number.
  3. Request retransmission of messages starting with the low sequence number.

UM provides extensive options for controlling how receivers handle recovery. By default, a receiver restarts with the message following the last one it consumed before failure or graceful suspension. Since UM persists receiver state at the store, receivers request this state from the store as part of re-registration and recovery. Receiving applications experiencing unrecoverable loss can potentially retrieve missed messages from the stores by deleting and recreating the receiver object.

The sequence number of the first topic-level message with which a receiver resumes reception is called the "low sequence number". UM provides a means of modifying this sequence number if desired. An application can decide to use the sequence number as is, to use an older sequence number, to use a more recent sequence number, or to simply use the most recent sequence number from the source. See Recovery Management and Setting Callback Function to Set Recovery Sequence Number. This gives receivers great flexibility, on a per-source basis, when recovering. New receivers (receivers with no pre-existing registration) have the same flexibility in determining the sequence number at which to begin data reception.
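The four choices can be sketched as a pure C function of the kind an application might call from its recovery sequence number callback. This is not the UM callback signature: low_seq_policy, choose_low_seqnum(), the delta parameter, and the source_high input are hypothetical, and the sketch ignores sequence number wraparound.

```c
#include <stdint.h>

typedef enum {
    LOW_SEQ_AS_IS,       /* keep the sequence number UM determined */
    LOW_SEQ_OLDER,       /* go back 'delta' messages to replay more */
    LOW_SEQ_NEWER,       /* skip ahead 'delta' messages */
    LOW_SEQ_MOST_RECENT  /* start at the source's most recent message */
} low_seq_policy;

/* proposed: low sequence number UM determined; source_high: most recent
 * sequence number from the source (assumed >= proposed). */
uint32_t choose_low_seqnum(uint32_t proposed, uint32_t source_high,
                           low_seq_policy policy, uint32_t delta)
{
    switch (policy) {
    case LOW_SEQ_OLDER:
        /* Replay 'delta' extra messages without going below 0. */
        return proposed > delta ? proposed - delta : 0;
    case LOW_SEQ_NEWER:
        /* Skip 'delta' messages, but never past the source's latest. */
        if (proposed >= source_high || source_high - proposed <= delta)
            return source_high;
        return proposed + delta;
    case LOW_SEQ_MOST_RECENT:
        return source_high;
    case LOW_SEQ_AS_IS:
    default:
        return proposed;
    }
}
```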

Like sources, when QC is in use, UM uses the consensus of the group(s) to determine the low sequence number. And as with sources, this is necessary as not all stores can be expected to be in total agreement about what was acknowledged. The application can configure the receiver with ume_consensus_sequence_number_behavior (receiver) to use the lowest sequence number of the latest group of sequence numbers seen from any store, the highest, or the majority. In most cases, the majority, which is the default, makes the most sense as the consensus. The lowest is a very conservative setting. And the highest is somewhat optimistic. In addition, this sequence number may be modified by the application after the consensus is determined.

For QC, UM load balances receiver retransmission requests among the available stores. In addition, if a request goes unanswered, UM resends the request to a different store. This means that as long as a single store has a message, that message can be retransmitted to a requesting receiver.
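UM performs this selection internally; the C sketch below only illustrates the idea with a simple round-robin policy and a "try the next available store" retry rule. The rx_balancer type and both functions are hypothetical, and UM's actual balancing and retry algorithm may differ.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical round-robin state for choosing which store gets a request. */
typedef struct {
    size_t nstores;
    size_t next;  /* round-robin cursor */
} rx_balancer;

/* Choose the store for a new retransmission request: the next available
 * store in round-robin order. Returns -1 if no store is available. */
int pick_store(rx_balancer *b, const bool *available)
{
    for (size_t tries = 0; tries < b->nstores; tries++) {
        size_t s = b->next;
        b->next = (b->next + 1) % b->nstores;
        if (available[s])
            return (int)s;
    }
    return -1;
}

/* Choose a different store for re-sending a request that failed_store did
 * not answer. Returns -1 if no other store is available. */
int pick_retry_store(const rx_balancer *b, const bool *available, int failed_store)
{
    for (size_t k = 1; k < b->nstores; k++) {
        size_t s = ((size_t)failed_store + k) % b->nstores;
        if (available[s])
            return (int)s;
    }
    return -1;
}
```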


Callable Store  <-

It is possible for an application to start an instance of the store to run as an independent set of threads within the application process. However, there are several restrictions:

  1. The application may not make use of messaging. That is, an application which intends to start a store instance must not create contexts, sources, or receivers, or make any use of UM except starting (and optionally stopping) the store. For applications that need to use messaging, it is suggested that the application create a child process from which to invoke the store. The parent process can then use messaging freely. See the example program umestored_example.c for an example of how this can be done.

  2. Only a C API is provided at this time. Two API functions are available: umestored_main() to start the store threads running, and umestored_main_shutdown() to request the store threads to stop gracefully.

  3. The umestored_main() API will not return until the store exits, either by processing a signal, or by the application calling umestored_main_shutdown(). When umestored_main() does return, the store is in a safe state for the application to exit.

  4. Only a single instance of the store may be started. This means that an application may not have two stores running concurrently, and it also means that an application may not start a store, shut it down, and then start it again. The store API is "single use".

  5. The application may not set signal handlers for SIGPIPE, SIGUSR1, SIGINT, or SIGTERM, because the store uses those signals. For applications that need to handle those signals, it is suggested that the application create a child process, as described in restriction 1 above.

For an example of how to use the umestored_main() API, see the example program umestored_example.c. Note that while the callable store APIs are usable on all supported platforms, this example program is restricted to Linux due to its use of prctl(), a Linux-only function.
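The child-process approach from restrictions 1 and 5 can be sketched with portable POSIX calls. This is not umestored_example.c: the sketch does not reproduce the umestored_main() signature, so the child runs a caller-supplied entry function, and placeholder_store_entry() is a hypothetical stand-in where a real program would call umestored_main(). Resetting the store's signals to their default dispositions in the child is a precaution against handlers inherited from the parent. The parent remains free to use messaging and its own signal handlers.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Entry point run in the child; a real program would call umestored_main()
 * here, with the arguments documented in the UM API reference. */
typedef int (*store_entry_fn)(void);

/* Fork a child that does nothing but run the store. Returns the child's
 * PID to the parent, or -1 if fork() fails. */
pid_t spawn_store_child(store_entry_fn run_store)
{
    pid_t pid = fork();
    if (pid == 0) {
        /* Child: undo any handlers inherited from the parent for the
         * signals the store uses (restriction 5). */
        signal(SIGPIPE, SIG_DFL);
        signal(SIGUSR1, SIG_DFL);
        signal(SIGINT, SIG_DFL);
        signal(SIGTERM, SIG_DFL);
        int rc = run_store();
        _exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
    }
    return pid;  /* parent: child PID, or -1 on failure */
}

/* Hypothetical stand-in for the real store entry point. */
int placeholder_store_entry(void)
{
    return 0;
}
```

The parent can later collect the child's exit status with waitpid(). Unlike umestored_example.c, this sketch avoids prctl(), so it is not limited to Linux.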