Guide for Persistence
Designing Persistent Receivers

Receivers are predominantly interested in RegID management and recovery management.


Receiver RegID Management  <-

RegIDs are slightly more involved for receivers than for sources. Since RegIDs are per source per topic per Store and a topic may have several sources, a receiver may have to manage several RegIDs per Store in use. Fortunately, receivers in UM can leverage the RegID of the source with the use of a callback as discussed in Adding Fault Recovery with Registration IDs and shown in ume-example-rcv-2.c. Your application can determine the correct RegID to use and return it to UM. You can also use Session IDs to enable UM to manage receiver RegIDs. See Managing RegIDs with Session IDs.

Much like sources, receivers typically have a lifetime based on an amount of work, perhaps an infinite amount. And just like sources, it may be helpful to consider that a RegID is "assigned" at the start of that work and is out of use at the end. In between, the RegID is in use by the instance of the receiver application. However, the nature of RegIDs being per source means that the expected lifetime of a source should play a role in how RegIDs on the receiver are managed. Thus, it may be helpful for the application developer to consider the source application lifetime when deciding how best to handle RegIDs on the receiver.

Receiver Message and Event Handler

The Receiver Message and Event Handler is an application callback, defined at receiver initialization, to deliver received messages to your application. The following source code examples illustrate the use of a receiver message and event handler for registration messages. To accept other receiver events, additional case statements would be required, one for each additional event. See also Persistence Events

C API

int rcv_handle_msg(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
{
switch (msg->type) {
printf("[%s][%s] UME registration error: %s\n", msg->topic_name,
msg->source, msg->data);
exit(0);
break;
{
printf("[%s][%s] UME registration successful. "
"SrcRegID %u RcvRegID %u\n",
msg->topic_name, msg->source,
}
break;
{
printf("[%s][%s] store %u: %s UME registration successful. "
"SrcRegID %u RcvRegID %u. Flags %x ",
msg->topic_name, msg->source, reg->store_index, reg->store,
if (reg->flags & LBM_MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)
printf("OLD[SQN %x] ", reg->sequence_number);
printf("\n");
}
break;
{
printf("[%s][%s] UME registration complete. SQN %x. Flags %x ",
msg->topic_name, msg->source, reg->sequence_number, reg->flags);
if (reg->flags & LBM_MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM)
printf("QUORUM ");
if (reg->flags & LBM_MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_RXREQMAX)
printf("RXREQMAX ");
printf("\n");
}
break;
printf("[%s][%s] UME registration change: %s\n", msg->topic_name,
msg->source, msg->data);
break;
...
default:
printf("Unknown lbm_msg_t type %x [%s][%s]\n", msg->type,
msg->topic_name, msg->source);
break;
}
return 0;
}

JAVA API

public int onReceive(Object cbArg, LBMMessage msg)
{
case LBM.MSG_UME_REGISTRATION_ERROR:
System.out.println("[" + msg.topicName() + "][" + msg.source()
+ "] UME registration error: " + msg.dataString());
break;
case LBM.MSG_UME_REGISTRATION_SUCCESS_EX:
UMERegistrationSuccessInfo reg = msg.registrationSuccessInfo();
System.out.print("[" + msg.topicName() + "][" + msg.source()
+ "] store " + reg.storeIndex() + ": "
+ reg.store() + " UME registration successful. SrcRegID "
+ reg.sourceRegistrationId() + " RcvRegID "
+ reg.receiverRegistrationId()
+ ". Flags " + reg.flags() + " ");
if ((reg.flags() & LBM.MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD) != 0) {
System.out.print("OLD[SQN " + reg.sequenceNumber() + "] ");
}
System.out.println();
break;
case LBM.MSG_UME_REGISTRATION_COMPLETE_EX:
UMERegistrationCompleteInfo regcomplete = msg.registrationCompleteInfo();
System.out.print("[" + msg.topicName() + "][" + msg.source()
+ "] UME registration complete. SQN " + regcomplete.sequenceNumber()
+ ". Flags " + regcomplete.flags() + " ");
if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
System.out.print("QUORUM ");
}
if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_RXREQMAX) != 0) {
System.out.print("RXREQMAX ");
}
System.out.println();
break;
case LBM.MSG_UME_REGISTRATION_CHANGE:
System.out.println("[" + msg.topicName() + "][" + msg.source()
+ "] UME registration change: " + msg.dataString());
break;
...
default:
System.err.println("Unknown lbm_msg_t type " + msg.type() + " ["
+ msg.topicName() + "][" + msg.source() + "]");
break;
}
return 0;
}

.NET API

public int onReceive(Object cbArg, LBMMessage msg)
{
case LBM.MSG_UME_REGISTRATION_ERROR:
System. Console.Out.WriteLine("[" + msg.topicName() + "]["
+ msg.source() + "] UME registration error: " + msg.dataString());
break;
case LBM.MSG_UME_REGISTRATION_SUCCESS_EX:
UMERegistrationSuccessInfo reg = msg.registrationSuccessInfo();
System.Console.Out.Write("[" + msg.topicName() + "][" + msg.source()
+ "] store " + reg.storeIndex() + ": "
+ reg.store() + " UME registration successful. SrcRegID "
+ reg.sourceRegistrationId() + " RcvRegID "
+ reg.receiverRegistrationId()
+ ". Flags " + reg.flags() + " ");
if ((reg.flags() & LBM.MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD) != 0) {
System.Console.Out.Write ("OLD[SQN " + reg.sequenceNumber() + "] ");
}
System.Console.Out.WriteLine();
break;
case LBM.MSG_UME_REGISTRATION_COMPLETE_EX:
UMERegistrationCompleteInfo regcomplete = msg.registrationCompleteInfo();
System.Console.Out.Write("[" + msg.topicName() + "][" + msg.source()
+ "] UME registration complete. SQN "
+ regcomplete.sequenceNumber()
+ ". Flags " + regcomplete.flags() + " ");
if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
System.Console.Out.Write("QUORUM ");
}
if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_RXREQMAX) != 0) {
System.Console.Out.Write("RXREQMAX ");
}
System.Console.Out.WriteLine();
break;
case LBM.MSG_UME_REGISTRATION_CHANGE:
System.Console.Out.WriteLine("[" + msg.topicName() + "][" + msg.source()
+ "] UME registration change: " + msg.dataString());
break;
...
default:
System.Console.Out.WriteLine("Unknown lbm_msg_t type " + msg.type()
+ " [" + msg.topicName() + "][" + msg.source() + "]");
break;
}
return 0;
}


Recovery Management  <-

Recovery management for failed and restarted receivers is fairly simple. UM requests any missed messages from the Store(s) and delivers them to the restarted receiver. However, your application can override that default behavior either by configuring a retransmit_request_maximum (receiver) value, or by configuring a ume_recovery_sequence_number_info_function (receiver) application callback, or both.

For example, let's say a source sends 7 messages with sequence numbers 0-6 which are stabilized at the Store. A C-based receiver, configured with retransmit_request_maximum (receiver) set to 2, and an application callback ume_recovery_sequence_number_info_function (receiver), consumes (and acknowledges) message 0, goes down, then restarts right after message 6.

During receiver registration, the lbm_ume_rcv_recovery_info_ex_func_t application callback is called with the following values in the passed-in structure lbm_ume_rcv_recovery_info_ex_func_info_t *info:

info->high_sequence_number == 6
info->low_rxreq_max_sequence_number == 4
info->low_sequence_number == 1

Where:

Normally, UM would start delivering messages at 1, but retransmit_request_maximum (receiver) is set to 2, which overrides UM's normal behavior. So in this example, the first message delivered will be number 4.

Finally, the application can, at run-time, further override the starting sequence number. The callback function can modify the contents of the passed-in structure lbm_ume_rcv_recovery_info_ex_func_info_t *info; specifically it can update the lbm_ume_rcv_recovery_info_ex_func_info_t::low_sequence_number field. When the callback returns, UM examines that field to see if it was modified by the callback. If so, UM overrides the effect of retransmit_request_maximum (receiver) and starts at the requested sequence number.

Notice that this design does not allow the callback to override the effect of retransmit_request_maximum (receiver) by setting the lbm_ume_rcv_recovery_info_ex_func_info_t::low_sequence_number field to its original value, 1 in this example. Upon return, UM will see the value unchanged, and will allow retransmit_request_maximum (receiver) to override the starting sequence number. This is only an issue if both retransmit_request_maximum (receiver) and ume_recovery_sequence_number_info_function (receiver) are used. If the application wants to use the sequence number remembered by the Store, it should not configure retransmit_request_maximum (receiver).


Duplicate Message Delivery  <-

In a distributed system, it is not possible to guarantee "once-and-only-once" delivery of messages in the face of unpredictable system or component failure. Regardless of the algorithms and handshaking, there is always the possibility of messages sent that are never received, as well as messages received and then received again if the receiving application fails and restarts.

UM's persistence design is based on the principle of being close to once-and-only-once, but when that is not possible, UM prefers to fail on the side of duplicate message delivery. Due to other design goals (low latency and high throughput), the possibility of receiving duplicate messages is significant after an application failure and restart.

It is therefore important for persistent applications to be designed to tolerate duplicate message reception, either by making message processing idempotent, or by including logic in the receiving application to detect duplicates and only process the messages which have not been previously processed.

To assist the application in implementing "de-duplication", all messages retransmitted to a receiver are marked as retransmissions via a flag in the message structure. Thus it is easy for an application to determine if a message is a new "live" message from the source, or a retransmission, which may or may not have been processed before the failure. The presence or absence of the retransmit flag gives the application a hint of how best to handle the message with regard to it being processed previously or not.

Informatica recommends that you always check the data or other message properties of messages with the retransmit flag set to be sure the message has not been already processed. Relying on UM sequence numbers is not a 100% reliable method for detecting duplicate messages.


Setting Callback Function to Set Recovery Sequence Number  <-

Whereas the UM persistence design attempts to choose the correct starting sequence number for a recovering receiver, there are cases where the application wishes to override UM's choice.

The sample code below demonstrates how to use the recovery sequence number info function to determine the stored message with which to restart a receiver. This example retrieves the low sequence number from the recovery sequence number structure and adds an offset to determine the beginning sequence number. The offset is a value completely under the control of your application. For example, if a receiver was down for a "long" period and you only want the receiver to receive the last 10 messages, use an offset to start the receiver with the 10th most recent message. If you wish not to receive any messages, set the lbm_ume_rcv_recovery_info_ex_func_info_t::low_sequence_number to the lbm_ume_rcv_recovery_info_ex_func_info_t::high_sequence_number plus one.

C API

cb.func = ume_rcv_seqnum_ex; /* declared below */
cb.clientd = NULL;
"ume_recovery_sequence_number_info_function",
&cb, sizeof(cb)) == LBM_FAILURE) {
fprintf(stderr,
"lbm_rcv_topic_attr_setopt:ume_recovery_sequence_number_info_function: %s\n",
exit(1);
}
printf("Will use seqnum info with low offset %u.\n", seqnum_offset);
...
int ume_rcv_seqnum_ex(lbm_ume_rcv_recovery_info_ex_func_info_t *info, void *clientd)
{
lbm_uint_t new_lo = info->low_sequence_number + seqnum_offset;
printf("[%s] SQNs Low %x (will set to %x), Low rxreqmax %x, High %x (CD %p)\n",
info->low_sequence_number = new_lo;
return 0;
}

JAVA API

UMERcvRecInfo umerecinfocb = new UMERcvRecInfo(seqnum_offset);
rcv_attr.setRecoverySequenceNumberCallback(umerecinfocb, null);
System.out.println("Will use seqnum info with low offset " + seqnum_offset);
class UMERcvRecInfo implements UMERecoverySequenceNumberCallback {
private long _seqnum_offset = 0;
public UMERcvRecInfo(long seqnum_offset) {
_seqnum_offset = seqnum_offset;
}
public int setRecoverySequenceNumberInfo(Object cbArg,
UMERecoverySequenceNumberCallbackInfo cbInfo)
{
long new_low = cbInfo.lowSequenceNumber() + _seqnum_offset;
System.out.println("SQNs Low " + cbInfo.lowSequenceNumber() + " (will set to "
+ new_low + "), Low rxreqmax " + cbInfo.lowRxReqMaxSequenceNumber()
+ ", High " + cbInfo.highSequenceNumber());
try {
cbInfo.setLowSequenceNumber(new_low);
}
catch (LBMEInvalException e) {
System.err.println(e.getMessage());
}
return 0;
}
}

.NET API

UMERcvRecInfo umerecinfocb = new UMERcvRecInfo(seqnum_offset);
rcv_attr.setRecoverySequenceNumberCallback(umerecinfocb, null);
System.Console.Out.WriteLine("Will use seqnum info with low offset " + seqnum_offset);
class UMERcvRecInfo implements UMERecoverySequenceNumberCallback {
private long _seqnum_offset = 0;
public UMERcvRecInfo(long seqnum_offset) {
_seqnum_offset = seqnum_offset;
}
public int setRecoverySequenceNumberInfo(Object cbArg,
UMERecoverySequenceNumberCallbackInfo cbInfo)
{
long new_low = cbInfo.lowSequenceNumber() + _seqnum_offset;
System.Console.Out.WriteLine ("SQNs Low " + cbInfo.lowSequenceNumber() + " (will set to "
+ new_low + "), Low rxreqmax " + cbInfo.lowRxReqMaxSequenceNumber()
+ ", High " + cbInfo.highSequenceNumber());
try {
cbInfo.setLowSequenceNumber(new_low);
}
catch (LBMEInvalException e) {
System.Console.Out.WriteLine (e.getMessage());
}
return 0;
}
}


Persistence Message Consumption  <-

When a persistent subscriber application has finished processing a received message, it must signal consumption of the message to UM. This is how UM keeps track of where message recovery should begin if recovery is needed (e.g. if the receiver restarts).

There are three basic methods to signaling message consumption that the application must choose between:

  • The subscriber's receiver callback returns to UM, allowing UM to delete the message. The message deletion signals consumption. This is generally the most simple case.
  • The subscriber retains the message past the point of the receiver callback returning. Then some other application function (typically running in a separate thread) finishes the processing and deletes the message. The message deletion signals consumption.
  • The subscriber calls a separate "explicit ACK" API which signals message consumption. In this case, the act of deleting the message does not signal consumption.

When the application uses one of those methods to signal consumption, it is informing the local instance of UM running within the application. As a separate step, UM must send a consumption acknowledgement (ACK) to the Persistent Store(s). However, the two steps are not necessarily directly linked.

There are two configurable acknowledgement methods that UM can follow when the application signals consumption:

  • UM batches ACKs based on a configurable time period. I.e. UM will delay sending ACKs to the Store so that multiple messages can be acknowledged at once, when its timer expires.
  • UM immediately sends an ACK to the Store.

Batching of ACKs greatly improves the throughput of a persistent receiver, and can also improve latency. It is the default configuration.

However, batching has the disadvantage that if message recovery is needed (e.g. if the subscriber restarts), it increases the chances that already processed messages will be re-sent during recovery ("duplicate messages"). Any messages that the application signaled consumed, but which UM has not yet acknowledged to the Store, can be re-sent during recovery.

Note that even with immediate ACK, there is still the possibility of one or more already-consumed messages being recovered; applications need to be able to handle this. See Duplicate Message Delivery for more information.

Use Cases

There are four common use cases which combine an application consumption method with a UM acknowledgement method:


Delete on Return, Batch ACKs  <-

The application receiver callback completes all processing of a message before returning. It allows UM to delete the message on return, which signals consumption.

To maximize efficiency, the application allows UM to batch ACKs to the Store.

Note that in this use case, messages are always signaled consumed in the same order in which they are received.

For this use case, use the configuration:

The subscriber code is written to signal message consumption on return from the receiver callback. This is handled differently between the C API vs. the Java and .NET APIs.

C API

The default behavior for a C application receiver callback is for the message to be deleted and when the receiver callback returns, which signals consumption the message. No special coding is needed for this use case.

Note: the receiver callback must not call lbm_msg_delete().

Java and .NET

With Java and .NET, the application receiver callback must explicitly call com::latencybusters::lbm::LBMMessage::dispose() prior to returning.

(Note that this is not strictly true for .NET. A .NET program can skip calling "dispose()" and allow the message to become garbage. This will introduce significant latency outliers when GC runs, and also makes acknowledgements to the Store non-deterministic. Finally, in the future, performance improvements for .NET will probably require the use of "dispose()". Informatica recommends that .NET programs call "dispose()" for every message.)


Retain on Return, Batch ACKs  <-

There are application designs where a received message cannot be fully processed inside the receiver application callback. For example, the message might need to be handed off to a worker thread for longer-term processing. Or the acknowledgement must be delayed until some asynchronous event happens, like a handshake with another application.

To maximize efficiency, applications allow UM to batch ACKs to the Store.

Note that some applications are written to process these handed-off messages in parallel, and messages might be signaled consumed in a different order than they arrived. In this use case, out-of-order consumption is supported. See ACK Ordering for more information.

For this use case, use the configuration:

Note that this is the same configuration as Delete on Return, Batch ACKs.

The subscriber code should be written to hand off the message to another part of the application and suppress deleting the message on return from the receiver callback. This is handled differently between the C API vs. the Java and .NET APIs.

C API

In C, the application's receiver callback function must call the lbm_msg_retain() API for the received message prior to handing off the message for processing. This suppresses the automatic deletion of the received message when the receiver callback returns, and allows the message buffer to be passed to some other part of the application for processing and deletion at a later time.

When the application subsequently completes all processing of the message, it signals consumption by calling lbm_msg_delete().

Java and .NET

In Java and .NET, the application should call the com::latencybusters::lbm::LBMMessage::promote() API prior to handing the message for a separate processing. This allows the message object to be passed to some other part of the application for processing and disposal at a later time.

When the application subsequently completes all processing of the message, it signals consumption by calling com::latencybusters::lbm::LBMMessage::dispose().


Explicit Acknowledgments  <-

The ACK batching feature did not exist in UM in versions prior to 5.0. In those early versions, the application had to use the Explicit ACK feature to benefit from the increased throughput allowed by batching of ACKs. Informatica will continue to support Explicit ACKs, even though we generally recommend the ACK batching use cases Delete on Return, Batch ACKs or Retain on Return, Batch ACKs.

With explicit ACKs, message consumption is separated from message deletion. The application uses a separate API to trigger an acknowledgement to the Store. Batching is achieved under application control by not ACKing every message. With explicit ACKs, the action of acknowledging a message to the Store implicitly acknowledges all previous unacknowledged messages.

Warning
Because explicit ACKs implicitly acknowledge all previous unacknowledged messages, it is not valid to acknowledge messages out of order. Otherwise the application risks not recovering needed messages. See ACK Ordering.

For this use case, use the configuration:

The subscriber should be written the same as Delete on Return, Batch ACKs (if processing is completed in the receiver callback) or Retain on Return, Batch ACKs (if the message is retained after the receiver callback returns). The same coding rules apply regarding message retention and disposal.

However, as an additional step, the application must call the explicit ACK API:

This is typically done inside of a timer callback, which provides batching behavior.


ACK Immediately on Delete  <-

With this use case, the application directs UM to send an acknowledgement to the Store for each and every processed message immediately after the message is deleted.

The advantage of this use case is that it minimizes the chances that already processed messages will be re-sent during recovery (i.e. "duplicate messages"). However, there is still the possibility of one or more duplicate messages during recovery, so applications need to be able to handle this. See Duplicate Message Delivery for more information.

A disadvantage of this use case is that the maximum sustainable throughput (message rate) is limited by the per-message overhead of sending acknowledgements.

Another disadvantage is that messages must be deleted in the same order as they were received, even if the messages are handed off to another thread for processing. See ACK Ordering.

Informatica's general recommendation is that since the application needs to handle duplicate messages even with immediate acknowledgement, the user should implement a use case that includes ACK batching.

For this use case, use the configuration:

The subscriber should be written the same as Delete on Return, Batch ACKs (if processing is completed in the receiver callback) or Retain on Return, Batch ACKs (if the message is retained after the receiver callback returns). The same coding rules apply regarding message retention and disposal.


ACK Ordering  <-

When UM is allowed to batch ACKs to the Store using ume_use_ack_batching (receiver), the UM client library supports "out of order" signaling that the application is done processing messages. For example, the application might retain received messages 1, 2, and 3 for asynchronous processing, and then delete them in the order 2, 1, 3.

However, be aware that the Persistent Store does not support "out of order" acknowledgement of messages. If the Store receives an acknowledgement of sequence number N, that implicitly acknowledges all sequence numbers less than N.

The UM client library handles this by withholding acknowledgement to the Store all messages for which an earlier message is not signaled complete. For example, if the application retains messages 1, 2, and 3, and subsequently deletes message 2, UM will not send an ACK to the Store for message 2. If the application then deletes message 1, UM is now able to send an ACK for message 2, which will implicitly acknowledge message 1 as well.

However, this ability to process messages out of order is dependent on using ACK batching (i.e. ume_use_ack_batching (receiver) is 1). If ACK batching is disabled, then the application is responsible for ordering acknowledgements to the Store. This is true for both the Explicit Acknowledgments and ACK Immediately on Delete use cases.

For example, suppose that the ACK Immediately on Delete use case is being used. If the application processes and deletes messages in the order 2, 1, 3, the ACK for message 2 will implicitly ACK message 1 to the Store. If the application crashes after ACKing 2 and restarts, recovery will start with message 3. Thus, message 1 is never fully processed.

If ACK batching is disabled, the messages must be deleted in order: 1, 2, 3.


Object-free Explicit Acknowledgments  <-

When using explicit ACKs, you can extract ACK information from messages. This allows the received message buffer to be deleted when the receiver callback is done, while still allowing the application to save the ACK structure for persistent acknowledgement to the Store at a future time. This can improve receiver performance when used with the Receive Buffer Recycling feature to reduce the per-message use of dynamic memory (malloc/free) with a persistent receiver. Extracting ACKs can also additionally improve performance of Java and .NET applications by allowing the use of Zero Object Delivery.

The following source code examples show how to extract ACK information and send an explicit ACK.

C API

int rcv_handle_msg(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
{
lbm_ume_rcv_ack_t *ack = NULL;
...
defer_ack(ack); /* Pass the "ack" to another thread or work queue. */
...
return 0;
}
int worker()
{
lbm_ume_rcv_ack_t *ack = NULL;
...
ack = get_deferred_ack(); /* Get "ack" that was saved above. */
/* Some applications improve throughput by not ACKing every message. */
if (ack_this_message) {
lbm_ume_ack_send_explicit_ack(ack, msg->sequence_number);
}
lbm_ume_ack_delete(ack); /* Extracted ack *must* be deleted. */
...
}

JAVA API or .NET API

public int onReceive(Object cbArg, LBMMessage msg)
{
UMEMessageAck ack;
...
ack = msg.extractUMEAck();
defer_ack(ack); /* Pass the "ack" to another thread or work queue. */
...
return 0;
}
int worker()
{
UMEMessageAck ack;
ack = get_deferred_ack(); /* Get "ack" that was saved above. */
/* Some applications improve throughput by not ACKing every message. */
if (ack_this_message) {
ack.sendExplicitAck(msg.sequenceNumber());
}
ack.dispose(); /* Extracted ack *must* be deleted. */
}