7. Designing Persistence Applications

This section discusses considerations and methods for utilizing UMP persistence in your applications.

7.1. Pieces of a Persistence Solution

In UMP, a persistent system is composed of sources, receivers, and stores managed by one or more applications. Sources and receivers are the endpoints of communication, and the store(s) provide fault recovery and persistence of state information. Your application can leverage UMP's flexible methods of persistence to add an unprecedented level of fault tolerance. With this flexibility, your applications assume new responsibilities not normally required in other persistent messaging systems. This section identifies the important considerations for your messaging applications when implementing the following UMP features.

7.1.1. Registration Identifiers

As mentioned in Registration Identifier and Adding Fault Recovery with Registration IDs, stores use RegIDs to identify sources and receivers. UMP offers three main methods for managing RegIDs.

  • Your applications assign static RegIDs and ensure that the same RegID is not assigned to multiple sources and/or receivers. See Use Static RegIDs.

  • You can allow UMP stores to assign RegIDs and then save the assigned RegIDs. See Save Assigned RegIDs.

  • Use Session IDs to enable the UMP store to both assign and manage RegIDs. See Managing RegIDs with Session IDs.

Your applications can manage RegIDs for the lifetime of a source or receiver as long as multiple applications do not reuse RegIDs simultaneously on the same store. RegIDs only need to be unique on the same store and may be reused between stores as desired. You can use a static mapping of RegIDs to applications or use some simple service to assign them.
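
As an illustration of the static mapping mentioned above, the following C sketch keeps one table of application names and RegIDs for a single store and checks that no RegID is listed twice. The table contents, the application names, and the functions lookup_regid() and regid_table_is_unique() are hypothetical and are not part of the UMP API.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical static mapping of application names to RegIDs for ONE store. */
typedef struct {
    const char  *app_name;
    unsigned int regid;
} regid_entry_t;

static const regid_entry_t regid_table[] = {
    { "order-publisher", 1000 },
    { "order-consumer",  1001 },
    { "audit-consumer",  1002 },
};
#define REGID_TABLE_LEN (sizeof(regid_table) / sizeof(regid_table[0]))

/* Returns the RegID mapped to app_name, or 0 if the name is not in the table. */
unsigned int lookup_regid(const char *app_name)
{
    size_t i;

    for (i = 0; i < REGID_TABLE_LEN; i++) {
        if (strcmp(regid_table[i].app_name, app_name) == 0)
            return regid_table[i].regid;
    }
    return 0;
}

/* Returns 1 if every RegID in the table is distinct, 0 if any RegID repeats. */
int regid_table_is_unique(void)
{
    size_t i, j;

    for (i = 0; i < REGID_TABLE_LEN; i++) {
        for (j = i + 1; j < REGID_TABLE_LEN; j++) {
            if (regid_table[i].regid == regid_table[j].regid)
                return 0;
        }
    }
    return 1;
}
```

Because RegIDs only need to be unique per store, a deployment with several stores could keep one such table per store and reuse the same values across tables.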

7.1.1.1. Use Static RegIDs

The simplest method uses static RegIDs for individual applications. This method works best if:

  • Applications use separate stores

  • Multiple instances of an application also use separate stores

In the latter case, the same static source RegID can be used in every instance of the application because receivers will identify every Store/Source RegID tuple as unique.

The following source code examples assign a static RegID to a source by adding the RegID, 1000, to the ume_store attribute. (See also ume-example-src-2.c.)

C API

lbm_src_topic_attr_t * sattr;

if (lbm_src_topic_attr_create(&sattr) == LBM_FAILURE) {
        fprintf(stderr, "lbm_src_topic_attr_create: %s\n", lbm_errmsg());
        exit(1);
}
if (lbm_src_topic_attr_str_setopt(sattr, "ume_store", "127.0.0.1:14567:1000")
== LBM_FAILURE) {
        fprintf(stderr, "lbm_src_topic_attr_str_setopt: %s\n", lbm_errmsg());
        exit(1);
}
       

JAVA API

LBMSourceAttributes sattr = null;
try {
    sattr = new LBMSourceAttributes();
    sattr.setValue("ume_store", "127.0.0.1:14567:1000");
}
catch (LBMException ex) {
    System.err.println("Error creating source attribute: " + ex.toString());
    System.exit(1);
}
       

.NET API

LBMSourceAttributes sattr = null;
try {
    sattr = new LBMSourceAttributes();
    sattr.setValue("ume_store", "127.0.0.1:14567:1000");
}
catch (LBMException ex) {
    System.Console.Error.WriteLine("Error creating source attribute: " + ex.toString());
    System.Environment.Exit(1);
}
       

7.1.1.2. Save Assigned RegIDs

Your application can save the RegID that the store assigns to a source or receiver, because the UMP API informs your application of the RegID used for each registration. This method of managing RegIDs is perhaps the most flexible, but it requires the application to save the RegIDs and retrieve them later.

The following source code example saves the RegID assigned to a source to a file. (See also ume-example-src-3.c.)

C API

typedef struct src_info_t_stct {
    int existing_regid;       
    int message_num;          
} src_info_t;

#define SRC_REGID_SAVE_FILENAME "UME-example-src-RegID"

int save_src_regid_to_file(const char *filename, lbm_src_event_ume_registration_ex_t *reg)
{
    FILE *fp;           
    
    if ((fp = fopen(filename, "w")) == NULL)
        return -1;
    fprintf(fp, "%s:%u", reg->store, reg->registration_id);
    printf("saving RegID info to \"%s\" - %s:%u\n", filename, reg->store, reg->registration_id);
    fflush(fp);
    fclose(fp);
    return 0;
}
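
The saved file is only useful if the application can read it back at the next start-up. The following is a minimal sketch of such a reader, assuming the single-line "store:RegID" format written by save_src_regid_to_file() above. The function body is an illustration written for this section and is not copied from ume-example-src-3.c; its return-value conventions are assumptions.

```c
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical counterpart to save_src_regid_to_file(): reads the saved
 * "store:RegID" line into buf so it can be used as a ume_store value.
 * Returns 0 on success, or -1 if the file is missing or empty or if buf
 * is too small to hold the whole line.
 */
int read_src_regid_from_file(const char *filename, char *buf, size_t bufsize)
{
    FILE *fp;
    size_t len;
    int truncated;

    if (buf == NULL || bufsize == 0)
        return -1;
    if ((fp = fopen(filename, "r")) == NULL)
        return -1;                  /* no saved RegID: treat as a new registration */
    if (fgets(buf, (int)bufsize, fp) == NULL) {
        fclose(fp);
        return -1;                  /* empty or unreadable file */
    }

    /* The line was cut short if no newline was read and more data remains. */
    len = strlen(buf);
    truncated = (len > 0 && buf[len - 1] != '\n' && fgetc(fp) != EOF);
    fclose(fp);
    if (truncated)
        return -1;

    buf[strcspn(buf, "\r\n")] = '\0';   /* strip any trailing line ending */
    return (buf[0] == '\0') ? -1 : 0;
}
```

A caller can treat a return value of -1 as "no previous registration" and fall back to a ume_store value without a saved RegID.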
       

7.1.1.3. Managing RegIDs with Session IDs

The RegIDs used by stores to identify sources and receivers must be unique. Rather than maintaining RegIDs (either statically or dynamically), applications can use a Session ID, which is simply a 64-bit value that uniquely identifies any set of sources with unique topics and receivers with unique topics. A single Session ID allows UMP stores to correctly identify all the sources and receivers for a particular application.

Combinations of sources and receivers that make up a single valid session include the following.

  • Sources for topics A, B, and C

  • Receivers for topics A, B, and C

  • Sources for topics A, B, and C, and receivers for topics X, Y and Z

  • Sources for topics A, B, and C, and receivers for topics A, B, and C

Note: Any topic can be used for a source and a receiver at the same time, but not for more than one of each. Two sources using topic A, for example, would need to be split into two different contexts.

The UMP configuration option, ume_session_id, specifies a Session ID for a source, receiver or a context. If you want all sources and receivers for a particular context to use the same Session ID, use (context) ume_session_id. Any source or receiver that does not specify its own Session ID inherits the context's Session ID. If a source or receiver specifies its own Session ID, it overrides the context Session ID for that individual source or receiver.

Of the two mutually exclusive methods for managing RegIDs, ...

  1. Enable your application to assign and manage every RegID, ensuring no two objects registered with an individual store share the same RegID.

  2. Allow the store to assign every RegID and enable your application to persist the RegIDs.

... using Session IDs simplifies the second management method. Since you cannot combine these two strategies at any single store, you also cannot combine the first method with the use of Session IDs at a single store.

7.1.1.3.1. How Stores Associate Session IDs and RegIDs

Session IDs do not replace the use of RegIDs by UMP but rather simplify RegID management. Using Session IDs equates to your application specifying a 0 (zero) RegID for all sources and receivers. However, instead of your application persisting the RegID assigned by the store, the store maintains the RegID for you.

When a store receives a registration request from a source or receiver with a particular Session ID, it checks to see if it already has a source or receiver for that topic/Session ID. If it does, then it responds with that source's or receiver's RegID.

If it does not find a source or receiver for that topic/Session ID pair, the store ...

  1. Assigns a new RegID.

  2. Associates the topic/Session ID with the new RegID.

  3. Responds to the source or receiver with the new RegID.

The source can then advertise with the RegID supplied by the store. Receivers include the source's RegID in their registration request.
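
The lookup-or-assign behavior described above can be modeled in a few lines of C. This is only an in-memory sketch of the logic, not the store's actual implementation: the types session_entry_t and store_model_t, the function names, and the fixed table sizes are all hypothetical. The sketch keys each entry on topic, Session ID, and whether the registrant is a source or a receiver, since one session may hold both a source and a receiver for the same topic.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_ENTRIES 64
#define MAX_TOPIC   64

/* One registrant (source or receiver) known to the modeled store. */
typedef struct {
    char     topic[MAX_TOPIC];
    uint64_t session_id;
    int      is_receiver;     /* 0 = source, 1 = receiver */
    uint32_t regid;
} session_entry_t;

/* Hypothetical model of a store's table; not a UMP data structure. */
typedef struct {
    session_entry_t entries[MAX_ENTRIES];
    size_t          count;
    uint32_t        next_regid;   /* next RegID to hand out */
} store_model_t;

void store_model_init(store_model_t *s, uint32_t first_regid)
{
    s->count = 0;
    s->next_regid = first_regid;
}

/*
 * Returns the RegID for the topic/Session ID pair, assigning a new one if the
 * pair is unknown. Returns 0 if the table is full or the topic is too long.
 */
uint32_t store_model_register(store_model_t *s, const char *topic,
                              uint64_t session_id, int is_receiver)
{
    size_t i;
    session_entry_t *e;

    for (i = 0; i < s->count; i++) {
        e = &s->entries[i];
        if (e->session_id == session_id && e->is_receiver == is_receiver
                && strcmp(e->topic, topic) == 0)
            return e->regid;            /* known pair: respond with its RegID */
    }
    if (s->count == MAX_ENTRIES || strlen(topic) >= MAX_TOPIC)
        return 0;

    e = &s->entries[s->count++];
    strcpy(e->topic, topic);            /* length checked above */
    e->session_id = session_id;
    e->is_receiver = is_receiver;
    e->regid = s->next_regid++;         /* steps 1 and 2: assign and associate */
    return e->regid;                    /* step 3: respond with the new RegID */
}
```

Registering the same topic and Session ID twice returns the same RegID, which is the property that lets a restarted application resume its previous registration without persisting the RegID itself.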

7.1.2. UMP Sources

The major concerns of sources revolve around RegID management and message retention. This section discusses the following topics.

7.1.2.1. New or Re-Registration

Any source needs to know at start-up if it is a new registration or a re-registration. The answer determines how a source registers with the store. UMP cannot answer this question. Therefore, it is essential that the developer consider what identifies the lifetime of a source and how a source determines the appropriate value to use as the RegID when it is ready to register. RegIDs are per source per topic per store, thus a single RegID per store is needed.

The following source code example looks for an existing RegID in a file and, if it finds none, uses a new RegID assigned by the store. (See also ume-example-src-3.c.)

C API

    err = lbm_context_create(&ctx, NULL, NULL, NULL);
    if (err) {printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1);}

    srcinfo.message_num = 1;
    srcinfo.existing_regid = 0;
    
    err = read_src_regid_from_file(SRC_REGID_SAVE_FILENAME, store_info, sizeof(store_info));
    if (!err) { srcinfo.existing_regid = 1; }

    err = lbm_src_topic_attr_create(&attr);
    if (err) {printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1);}

    err = lbm_src_topic_attr_str_setopt(attr, "ume_store", store_info);
    if (err) {printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1);}
       

The use of Session IDs allows UMP, as opposed to your application, to accomplish the same RegID management. See Managing RegIDs with Session IDs.

7.1.2.2. Sources Must Be Able to Resume Sending

A source sends messages unless UMP prevents it, in which case, the send function returns an error. A source may lose the ability to send messages temporarily if the store(s) in use become unresponsive, e.g. the store(s) die or become disconnected from the source. Once the store(s) are responsive again, sending can continue. Thus source applications need to take into account that sending may fail temporarily under specific failure cases and be able to resume sending when the failure is removed.

The following source code examples demonstrate how a failed send function can sleep for a second and try again.

C API

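    /* sleep() is declared in <unistd.h> on POSIX systems */
    while (lbm_src_send(src, message, len, 0) == LBM_FAILURE) {
        if (lbm_errnum() == LBM_EUMENOREG) {
            /* Not registered with the store(s) yet; wait and retry */
            printf("Send unsuccessful. Waiting...\n");
            sleep(1);
            continue;
        }
        /* Any other error is treated as fatal */
        fprintf(stderr, "lbm_src_send: %s\n", lbm_errmsg());
        exit(1);
    }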
       

JAVA API

   for (;;) {
        try {
            src.send(message, len, 0);
        }
        catch (UMENoRegException ex) {
            System.out.println("Send unsuccessful. Waiting...");
            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException e) { }
            continue;
        }
        catch (LBMException ex) {
            System.err.println("Error sending message: " + ex.toString());
            System.exit(1);
        }
        break;
    }
       

.NET API

   for (;;) {
        try {
            src.send(message, len, 0);
        }
        catch (UMENoRegException ex) {
            System.Console.Out.WriteLine("Send unsuccessful. Waiting...");
            System.Threading.Thread.Sleep(1000);
            continue;
        }
        catch (LBMException ex) {
            System.Console.Error.WriteLine("Error sending message: " + ex.toString());
            System.Environment.Exit(1);
        }
        break;
    }

       

7.1.2.3. Source Message Retention and Release

UMP allows streaming of messages from a source without regard to message stability at a store, which is one reason for UMP's performance advantage over other persistent messaging systems. Sources retain all messages until notified by the active store(s) that they are stable. This provides a method for stores to be brought up to date when restarted or started anew.

Note: Source message retention is separate from the persistence of messages in the store.

When messages are considered stable at the store, the source can release them which frees up source retention memory for new messages. Generally, the source releases older stable messages first. To release the oldest retained message, all the following conditions must be met:

  • message must meet stability requirements of the source, which can range from a single stability notice from the active store to stability notices from a group of stores (See Sources Using Quorum/Consensus Store Configuration)

    and

  • message must have been confirmed as delivered by a configured number of receivers (ume_retention_unique_confirmations),

    and

  • the aggregate amount of buffered messages exceeds retransmit_retention_size_threshold bytes in payload and headers.

Some things to note:

  • If the retransmit_retention_size_threshold is not met, no messages will be released regardless of stability.

  • If the source registered with a "no-cache" store (See UMP Stores) or ume_message_stability_notification is turned off, ume_retention_unique_confirmations is the only way to allow the source to release messages before retention size options come into play.

  • If the aggregate amount of buffered messages exceeds retransmit_retention_size_limit bytes in payload and headers, then the oldest retained message is forcibly released even if it does not meet one or more of the conditions above. This condition should be avoided and suggests increasing the retransmit_retention_size_limit or lowering the retransmit_retention_size_threshold.
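
The release conditions and the forced-release rule above can be summarized as a single decision function. The following C sketch is a simplified model for reasoning about configuration values, not UMP's internal logic. The structure retention_state_t, its field names, and the function decide_oldest_release() are hypothetical, and the model ignores the "no-cache" store and disabled stability notification cases noted above.

```c
#include <stddef.h>

/* Hypothetical snapshot of a source's retention state; names are illustrative. */
typedef struct {
    int    oldest_is_stable;        /* stability requirements met for the oldest message */
    int    unique_confirmations;    /* unique receiver confirmations for that message */
    int    required_confirmations;  /* ume_retention_unique_confirmations */
    size_t retained_bytes;          /* payload plus headers currently retained */
    size_t size_threshold;          /* retransmit_retention_size_threshold */
    size_t size_limit;              /* retransmit_retention_size_limit */
} retention_state_t;

typedef enum {
    RELEASE_NONE   = 0,   /* keep the oldest message */
    RELEASE_NORMAL = 1,   /* all three conditions are met */
    RELEASE_FORCED = 2    /* size limit exceeded; released regardless of conditions */
} release_decision_t;

/* Decides what happens to the oldest retained message in this simplified model. */
release_decision_t decide_oldest_release(const retention_state_t *st)
{
    if (st->oldest_is_stable
            && st->unique_confirmations >= st->required_confirmations
            && st->retained_bytes > st->size_threshold)
        return RELEASE_NORMAL;

    if (st->retained_bytes > st->size_limit)
        return RELEASE_FORCED;

    return RELEASE_NONE;
}
```

A RELEASE_FORCED result corresponds to the condition the text recommends avoiding, which suggests raising the size limit or lowering the size threshold.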

7.1.2.4. Source Release Policy Options

Sources use a set of configuration options to release messages that, in effect, specify the source's release policy. The following configuration options directly impact when the source may release retained messages.

7.1.2.5. Confirmed Delivery

As mentioned earlier, ume_retention_unique_confirmations requires a message to have a minimum number of unique confirmations from different receivers before the message may be released. This retains messages that have not been confirmed as being received and processed and keeps them available to fulfill any retransmission requests.

The following code samples show how to require a message to have 10 unique receiver confirmations.

C API

lbm_src_topic_attr_t * sattr;

if (lbm_src_topic_attr_create(&sattr) == LBM_FAILURE) {
        fprintf(stderr, "lbm_src_topic_attr_create: %s\n", lbm_errmsg());
        exit(1);
}
if (lbm_src_topic_attr_str_setopt(sattr, "ume_retention_unique_confirmations", "10")
        == LBM_FAILURE) {
        fprintf(stderr, "lbm_src_topic_attr_str_setopt: %s\n", lbm_errmsg());
        exit(1);
}
       

JAVA API

LBMSourceAttributes sattr = null;
try {
    sattr = new LBMSourceAttributes();
    sattr.setValue("ume_retention_unique_confirmations", "10");
}
catch (LBMException ex) {
    System.err.println("Error creating source attribute: " + ex.toString());
    System.exit(1);
}
       

.NET API

LBMSourceAttributes sattr = null;
try {
    sattr = new LBMSourceAttributes();
    sattr.setValue("ume_retention_unique_confirmations", "10");
}
catch (LBMException ex) {
    System.Console.Error.WriteLine("Error creating source attribute: " + ex.toString());
    System.Environment.Exit(1);
}
       

7.1.2.6. Sources Using Round-Robin Store Configuration

The source retains messages until they are considered stable at the active store(s). For Round-Robin store behavior, this means the current active store notifies the source that it has stabilized the message via a message stability notification. The following configuration file statements implement Round-Robin behavior among 3 stores.

source ume_store 10.29.3.77:15313:150000:0
source ume_store 10.29.3.76:16313:160000:0 
source ume_store 10.29.3.75:17313:170000:0
source ume_message_stability_notification 1
source ume_store_behavior rr 
       

See also Round-Robin Store Usage

7.1.2.7. Sources Using Quorum/Consensus Store Configuration

In the case of Quorum/Consensus store behavior, a message is considered stable after it has been successfully stored within a group of stores or among groups of stores according to the two settings, intergroup behavior and intragroup behavior, described below.

  • The intragroup behavior specifies the requirements needed to stabilize a message among the stores within a group. A message is stable for the group once it is successfully stored at a quorum (majority) of the group's stores or successfully stored in all the stores in the group.

  • The intergroup behavior specifies the requirements needed to stabilize a message among groups of stores. A message is stable among the groups if it is successfully stored at any group, a majority of groups, or all groups.

Notice that a message needs to meet intragroup stability requirements before it can meet intergroup stability requirements. These options provide a number of possibilities for retention of messages for the source.
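
The two-level rule can be expressed as a short calculation: first decide which groups are stable, then apply the intergroup behavior to the count of stable groups. The following C sketch models that calculation using only the behaviors described above (quorum or all within a group; any, majority, or all among groups). The enum and function names are hypothetical, and "quorum" and "majority" are both taken to mean a strict majority.

```c
#include <stddef.h>

typedef enum { INTRA_QUORUM, INTRA_ALL } intragroup_behavior_t;
typedef enum { INTER_ANY, INTER_MAJORITY, INTER_ALL } intergroup_behavior_t;

/* A group is stable at a strict majority of its stores, or at all of them. */
int group_is_stable(size_t stores_in_group, size_t stores_with_msg,
                    intragroup_behavior_t behavior)
{
    if (stores_in_group == 0)
        return 0;
    if (behavior == INTRA_ALL)
        return stores_with_msg == stores_in_group;
    return stores_with_msg > stores_in_group / 2;
}

/*
 * group_sizes[i] is the number of stores in group i, and stored_counts[i] is
 * the number of those stores that have successfully stored the message.
 */
int message_is_stable(const size_t *group_sizes, const size_t *stored_counts,
                      size_t num_groups,
                      intragroup_behavior_t intra, intergroup_behavior_t inter)
{
    size_t i, stable_groups = 0;

    /* Intragroup stability is evaluated first, one group at a time. */
    for (i = 0; i < num_groups; i++) {
        if (group_is_stable(group_sizes[i], stored_counts[i], intra))
            stable_groups++;
    }

    /* Intergroup stability is then decided from the number of stable groups. */
    switch (inter) {
    case INTER_ANY:
        return stable_groups >= 1;
    case INTER_MAJORITY:
        return stable_groups > num_groups / 2;
    case INTER_ALL:
        return num_groups > 0 && stable_groups == num_groups;
    }
    return 0;
}
```

For example, with groups of 5, 3, and 5 stores and the quorum/any combination, a message stored at 3 stores of the first group is stable in this model, while a message stored at 2, 1, and 2 stores of the respective groups is not.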

The following configuration file statements implement a 3-group Quorum/Consensus configuration with each group on a different machine, in which a message is considered stable when it has been successfully stored at a quorum of stores in at least one group. (See Quorum/Consensus - Single Location Groups for more information about this configuration.)

source ume_store 10.29.3.77:10313:101000:0
source ume_store 10.29.3.77:11313:110000:0
source ume_store 10.29.3.77:12313:120000:0
source ume_store 10.29.3.77:13313:130000:0
source ume_store 10.29.3.77:14313:140000:0
source ume_store 10.29.3.78:15313:150000:1
source ume_store 10.29.3.78:16313:160000:1
source ume_store 10.29.3.78:17313:170000:1
source ume_store 10.29.3.79:18313:180000:2
source ume_store 10.29.3.79:19313:190000:2
source ume_store 10.29.3.79:29313:290000:2
source ume_store 10.29.3.79:39313:390000:2
source ume_store 10.29.3.79:49313:490000:2

source ume_message_stability_notification 1
source ume_store_behavior qc

source ume_store_group 0:5
source ume_store_group 1:3
source ume_store_group 2:5

source ume_retention_intragroup_stability_behavior quorum
source ume_retention_intergroup_stability_behavior any
       

See also Quorum/Consensus Store Usage and Quorum/Consensus - Mixed Location Groups.

7.1.2.8. Source Event Handler

The Source Event Handler is a function callback initialized at source creation to provide source events to your application related to the operation of the source. The following source code examples illustrate the use of a source event handler for registration events. To accept other source events, additional case statements would be required, one for each additional source event. See also UMP and UMQ Events.

C API

int handle_src_event(lbm_src_t *src, int event, void *ed, void *cd)
{
    switch (event) {
    case LBM_SRC_EVENT_UME_REGISTRATION_ERROR:
        {
            const char *errstr = (const char *)ed;
            printf("Error registering source with UME store: %s\n", errstr);
        }
        break;
    case LBM_SRC_EVENT_UME_REGISTRATION_SUCCESS_EX:
        {
            lbm_src_event_ume_registration_ex_t *reg =
                (lbm_src_event_ume_registration_ex_t *)ed;

            printf("UME store %u: %s registration success. RegID %u. Flags %x ",
                   reg->store_index, reg->store, reg->registration_id,
                   reg->flags);
            if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)
                printf("OLD[SQN %x] ", reg->sequence_number);
            if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS)
                printf("NOACKS ");
            printf("\n");
        }
        break;
    case LBM_SRC_EVENT_UME_REGISTRATION_COMPLETE_EX:
        {
            lbm_src_event_ume_registration_complete_ex_t *reg =
                (lbm_src_event_ume_registration_complete_ex_t *)ed;

            printf("UME registration complete. SQN %x. Flags %x ",
                   reg->sequence_number, reg->flags);
            if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM)
                printf("QUORUM ");
            printf("\n");
        }
        break;
    case LBM_SRC_EVENT_UME_STORE_UNRESPONSIVE:
        {
            const char *infostr = (const char *)ed;
            printf("UME store: %s\n", infostr);
        }
        break;
    default:
        printf("Unknown source event %d\n", event);
        break;
    }
    return 0;
}
       

JAVA API

public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
    switch (sourceEvent.type()) {
    case LBM.SRC_EVENT_UME_REGISTRATION_ERROR:
        System.out.println("Error registering source with UME store: "
            + sourceEvent.dataString());
        break;
    case LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX:
        UMESourceEventRegistrationSuccessInfo reg = sourceEvent.registrationSuccessInfo();
        System.out.print("UME store " + reg.storeIndex() + ": " + reg.store()
            + " registration success. RegID " + reg.registrationId() + ". Flags "
            + reg.flags() + " ");
        if ((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD) != 0) {
            System.out.print("OLD[SQN " + reg.sequenceNumber() + "] ");
        }
        if ((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS) != 0) {
            System.out.print("NOACKS ");
        }
        System.out.println();
        break;
    case LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX:
        UMESourceEventRegistrationCompleteInfo regcomp =
            sourceEvent.registrationCompleteInfo();
        System.out.print("UME registration complete. SQN " + regcomp.sequenceNumber()
            + ". Flags " + regcomp.flags() + " ");
        if ((regcomp.flags()
                & LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
            System.out.print("QUORUM ");
        }
        System.out.println();
        break;
    case LBM.SRC_EVENT_UME_STORE_UNRESPONSIVE:
        System.out.println("UME store: " + sourceEvent.dataString());
        break;
    ...
    default:
        System.out.println("Unknown source event " + sourceEvent.type());
        break;
    }
    return 0;
}
       

.NET API

public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
    switch (sourceEvent.type()) {
    case LBM.SRC_EVENT_UME_REGISTRATION_ERROR:
        System.Console.Out.WriteLine("Error registering source with UME store: "
            + sourceEvent.dataString());
        break;
    case LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX:
        UMESourceEventRegistrationSuccessInfo reg = sourceEvent.registrationSuccessInfo();
        System.Console.Out.Write("UME store " + reg.storeIndex() + ": " + reg.store()
            + " registration success. RegID " + reg.registrationId() + ". Flags "
            + reg.flags() + " ");
        if ((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD) != 0) {
            System.Console.Out.Write("OLD[SQN " + reg.sequenceNumber() + "] ");
        }
        if ((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS) != 0) {
            System.Console.Out.Write("NOACKS ");
        }
        System.Console.Out.WriteLine();
        break;
    case LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX:
        UMESourceEventRegistrationCompleteInfo regcomp =
            sourceEvent.registrationCompleteInfo();
        System.Console.Out.Write("UME registration complete. SQN "
            + regcomp.sequenceNumber()
            + ". Flags " + regcomp.flags() + " ");
        if ((regcomp.flags()
                & LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
            System.Console.Out.Write("QUORUM ");
        }
        System.Console.Out.WriteLine();
        break;
    case LBM.SRC_EVENT_UME_STORE_UNRESPONSIVE:
        System.Console.Out.WriteLine("UME store: " + sourceEvent.dataString());
        break;
    ...
    default:
        System.Console.Out.WriteLine("Unknown source event " + sourceEvent.type());
        break;
    }
    return 0;
}
       

7.1.2.9. Source Event Handler - Stability, Confirmation and Release

As shown in Section 7.1.2.8 above, the Source Event Handler can be expanded to handle more source events by adding additional case statements. The following source code examples show case statements to handle message stability events, delivery confirmation events and message release (reclaim) events. See also UMP and UMQ Events.

C API

case LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX:  
/* requires that source ume_message_stability_notification attribute is enabled */
        {
            lbm_src_event_ume_ack_ex_info_t *info = (lbm_src_event_ume_ack_ex_info_t *)ed;
    
            printf("UME store %u: %s message stable. SQN %x (msgno %d). Flags %x ", 
            info->store_index, info->store,
                    info->sequence_number, (int)info->msg_clientd - 1, info->flags);
            if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE)
                    printf("IA "); /* Stable within store group */
            if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE)
                    printf("IR "); /* Stable amongst all stores */
            if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE)
                    printf("STABLE ");  /* Just plain stable */
            if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE)
                    printf("STORE ");   /* Stability reported by UME Store */
            printf("\n");   
        }
        break;

case LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX:   
/* requires that source ume_confirmed_delivery_notification attribute is enabled */
        {
            lbm_src_event_ume_ack_ex_info_t *info = (lbm_src_event_ume_ack_ex_info_t *)ed;

                printf("UME delivery confirmation. SQN %x, Receiver RegID %u (msgno %d). Flags %x ",
                       info->sequence_number, info->rcv_registration_id, 
                       (int)info->msg_clientd - 1, info->flags);
                if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS)
                    printf("UNIQUEACKS ");   
                    /* Satisfied number of unique ACKs requirement */
                if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID)
                    printf("UREGID ");       
                    /* Confirmation contains receiver application registration ID */
                if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD)
                    printf("OOD ");          
                    /* Confirmation received from arrival order receiver */
                if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK)
                    printf("EXACK ");        
                    /* Confirmation explicitly sent by receiver */
                printf("\n");
        }
        break;
    
case LBM_SRC_EVENT_UME_MESSAGE_RECLAIMED:  
/* requires that source ume_confirmed_delivery_notification or ume_message_stability_notification 
attributes are enabled */
        {
                lbm_src_event_ume_ack_info_t *ackinfo = (lbm_src_event_ume_ack_info_t *)ed;

                printf("UME message released - sequence number %x (msgno %d)\n",
                       ackinfo->sequence_number, (int)ackinfo->msg_clientd - 1);
        }
        break;

       

JAVA API

case LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX: 
// requires that source ume_message_stability_notification attribute is enabled
        UMESourceEventAckInfo staInfo = sourceEvent.ackInfo();
        System.out.print("UME store " + staInfo.storeIndex() + ": "
                    + staInfo.store() + " message stable. SQN " + staInfo.sequenceNumber()
                    + " (msgno " + staInfo.clientObject() + "). Flags " 
                    + staInfo.flags() + " ");
        if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE) 
        != 0) {
                System.out.print("IA "); // Stable within store group
        }
        if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE) 
        != 0) {
                System.out.print("IR ");  // Stable amongst all stores
        }
        if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE) != 0) {
                System.out.print("STABLE ");  // Just plain stable
        }
        if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE) != 0) {
                System.out.print("STORE ");   // Stability reported by UME Store
        }
        System.out.println();
        break;

case LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX:  
// requires that source ume_confirmed_delivery_notification attribute is enabled
        UMESourceEventAckInfo cdelvinfo = sourceEvent.ackInfo();
        System.out.print("UME delivery confirmation. SQN " + cdelvinfo.sequenceNumber()
                    + ", RcvRegID " + cdelvinfo.receiverRegistrationId() + " (msgno "
                    + cdelvinfo.clientObject() + "). Flags " + cdelvinfo.flags() + " ");
        if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS) 
        != 0) {
                System.out.print("UNIQUEACKS "); // Satisfied number of unique ACKs requirement
        }
        if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID) 
        != 0) {
                // Confirmation contains receiver application registration ID
                System.out.print("UREGID ");
        }
        if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD) 
        != 0) {
                // Confirmation received from arrival order receiver
                System.out.print("OOD ");
        }
        if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK) 
        != 0) {
                System.out.print("EXACK ");    // Confirmation explicitly sent by receiver
        }
        System.out.println();
        break;

case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED:  
// requires that source ume_confirmed_delivery_notification or 
// ume_message_stability_notification attributes are enabled
        System.out.println("UME message released - sequence number "
                + Long.toHexString(sourceEvent.sequenceNumber())
                + " (msgno "
                + Long.toHexString(((Integer)sourceEvent.clientObject()).longValue())
                + ")");
        break;
       

.NET API

case LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX: 
// requires that source ume_message_stability_notification attribute is enabled
            UMESourceEventAckInfo staInfo = sourceEvent.ackInfo();
            System.Console.Out.Write("UME store " + staInfo.storeIndex() + ": "
                        + staInfo.store() + " message stable. SQN " + staInfo.sequenceNumber()
                        + " (msgno " + ((int)staInfo.clientObject()).ToString("x") + "). Flags "
                        + staInfo.flags() + " ");
            if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE) 
            != 0)
            {
                    System.Console.Out.Write("IA ");  // Stable within store group
            }
            if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE) 
            != 0) 
            {
                    System.Console.Out.Write("IR ");  // Stable amongst all stores
            }
            if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE) != 0)
            {
                    System.Console.Out.Write("STABLE ");  // Just plain stable
            }
            if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE) != 0)
            {
                    System.Console.Out.Write("STORE ");  // Stability reported by UME Store
            }
            System.Console.Out.WriteLine();
            break;

case LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX:  
// requires that source ume_confirmed_delivery_notification attribute is enabled

            UMESourceEventAckInfo cdelvinfo = sourceEvent.ackInfo();
            
            System.Console.Out.Write("UME delivery confirmation. SQN " + 
            cdelvinfo.sequenceNumber()
                        + ", RcvRegID " + cdelvinfo.receiverRegistrationId() + " (msgno "
                        + ((int)cdelvinfo.clientObject()).ToString("x") + "). Flags " + 
                        cdelvinfo.flags() + " ");
            if ((cdelvinfo.flags() & 
            LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS) != 0)
            {
                    // Satisfied number of unique ACKs requirement
                    System.Console.Out.Write("UNIQUEACKS ");
            }
            if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID) 
            != 0)
            {
                    // Confirmation contains receiver application registration ID
                    System.Console.Out.Write("UREGID ");
            }
            if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD) 
            != 0)
            {
                    // Confirmation received from arrival order receiver
                    System.Console.Out.Write("OOD ");
            }
            if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK) 
            != 0)
            {
                    // Confirmation explicitly sent by receiver
                    System.Console.Out.Write("EXACK ");
            }
            System.Console.Out.WriteLine();                    
            break;

case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED: 
// requires that source ume_confirmed_delivery_notification or 
// ume_message_stability_notification attributes are enabled
            
            System.Console.Out.WriteLine("UME message released - sequence number "
                               + sourceEvent.sequenceNumber().ToString("x")
                               + " (msgno "
                               + ((int)sourceEvent.clientObject()).ToString("x")
                               + ")");
            break;

       

7.1.2.10. Mapping Your Message Numbers to UMS/UMP Sequence Numbers

lbm_src_sendv_ex() allows you to create a pointer to an object or structure. This pointer will be returned to your application along with all source events. You can then update the object or structure with source event information. For example, if your messages exceed 8K, which requires fragmenting your application's message into more than one UM message, receiving sequence number events with this pointer allows you to determine all the UM sequence numbers for the message and, therefore, how many release (reclaim) events to expect. The following two source code examples show how to:

  • Enable message sequence number information

  • Handle sequence number source events to determine the application message number in the Source Event Handler

C API - Enable Message Information

lbm_src_send_ex_info_t exinfo;

/* Enable message sequence number info to be returned */
exinfo.flags = LBM_SRC_SEND_EX_FLAG_UME_CLIENTD | LBM_SRC_SEND_EX_FLAG_SEQUENCE_NUMBER_INFO;
exinfo.ume_msg_clientd = (void *)(msgno + 1); 
/* msgno set to application message number (can't evaluate to NULL) */
while (lbm_src_send_ex(src, message, msglen, 0, &exinfo) == LBM_FAILURE)
{
    if (lbm_errnum() == LBM_EUMENOREG)
    {
        printf("Send unsuccessful. Waiting...\n");
        SLEEP_MSEC(1000);   /* Sleep for 1 second */
    }
    else
    {
        fprintf(stderr, "lbm_src_send: %s\n", lbm_errmsg());
        break;
    }
}
       

C API - Sequence Number Event Handler

int handle_src_event(lbm_src_t *src, int event, void *ed, void *cd)
{
      switch (event) {
      case LBM_SRC_EVENT_SEQUENCE_NUMBER_INFO:
            {
                  lbm_src_event_sequence_number_info_t *info =
                        (lbm_src_event_sequence_number_info_t *)ed;

                  if (info->first_sequence_number != info->last_sequence_number) {
                        printf("SQN [%x,%x] (msgno %d)\n", info->first_sequence_number,
                              info->last_sequence_number, (int)info->msg_clientd - 1);
                  } else {
                        printf("SQN %x (msgno %d)\n", info->last_sequence_number,
                              (int)info->msg_clientd - 1);
                  }
            }
            break;
      ...
      }
      return 0;
}
       

JAVA API - Enable Message Information

LBMSourceSendExInfo exinfo = new LBMSourceSendExInfo();
exinfo.setClientObject(Integer.valueOf(msgno));  // msgno set to application message number
exinfo.setFlags(LBM.SRC_SEND_EX_FLAG_SEQUENCE_NUMBER_INFO);  
// Enable message sequence number info to be returned 
for (;;)
{
    try 
    {
        src.send(message, msglen, 0, exinfo);
    }
    catch(UMENoRegException ex)
    {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e) { }
        continue;

    }
    catch (LBMException ex)
    {
            System.err.println("Error sending message: " + ex.toString());  
    }
    break;
}
       

JAVA API - Sequence Number Event Handler

public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
      switch (sourceEvent.type())
      {
            case LBM.SRC_EVENT_SEQUENCE_NUMBER_INFO:
                  LBMSourceEventSequenceNumberInfo info = sourceEvent.sequenceNumberInfo();
                  if (info.firstSequenceNumber() != info.lastSequenceNumber()) {
                        System.out.println("SQN [" + info.firstSequenceNumber()
                            + "," + info.lastSequenceNumber() + "] (msgno "
                            + info.clientObject() + ")");
                  }
                  else {
                        System.out.println("SQN " + info.lastSequenceNumber()
                            + " (msgno " + info.clientObject() + ")");
                  }
                  break;
          ...
      }
      return 0;
}
       

.NET API - Enable Message Information

LBMSourceSendExInfo exinfo = new LBMSourceSendExInfo();
exinfo.setClientObject(msgno);  // msgno set to application message number
exinfo.setFlags(LBM.SRC_SEND_EX_FLAG_SEQUENCE_NUMBER_INFO);  
// Enable message sequence number info to be returned
for (;;)
{
    try 
    {
        src.send(message, msglen, 0, exinfo);
    }
    catch(UMENoRegException ex)
    {
            System.Threading.Thread.Sleep(100);
        continue;

    }
    catch (LBMException ex)
    {
            System.Console.Out.WriteLine("Error sending message: " + ex.Message);
    }
    break;
}
       

.NET API - Sequence Number Event Handler

public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
      switch (sourceEvent.type())
      {
            case LBM.SRC_EVENT_SEQUENCE_NUMBER_INFO:
                  LBMSourceEventSequenceNumberInfo info = sourceEvent.sequenceNumberInfo();
                  if (info.firstSequenceNumber() != info.lastSequenceNumber())
                  {
                        System.Console.Out.WriteLine("SQN [" + info.firstSequenceNumber()
                              + "," + info.lastSequenceNumber() + "] (msgno "
                              + ((int)info.clientObject()).ToString("x") + ")");
                  }
                  else
                  {
                        System.Console.Out.WriteLine("SQN " + info.lastSequenceNumber()
                              + " (msgno " + ((int)info.clientObject()).ToString("x") + ")");
                  }
                  break;
          ...

      }
      return 0;
}
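
The bookkeeping described in this section can be sketched in a few lines of C. This is only an illustration: msgno_to_clientd(), clientd_to_msgno() and expected_reclaim_events() are hypothetical helper names, not UM API functions. The sketch also assumes that a fragmented message occupies a contiguous range of sequence numbers, as the first/last pair in the sequence number event suggests.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: encode an application message number as a client
 * data pointer. Adding 1 keeps message number 0 from becoming NULL.
 * (On a 32-bit platform, msgno UINT32_MAX would wrap to NULL.) */
static void *msgno_to_clientd(uint32_t msgno)
{
    return (void *)((uintptr_t)msgno + 1);
}

/* Hypothetical helper: recover the message number inside a source event
 * handler from the client data pointer that was set at send time. */
static uint32_t clientd_to_msgno(void *clientd)
{
    return (uint32_t)((uintptr_t)clientd - 1);
}

/* Hypothetical helper: number of UM messages (fragments) that carried one
 * application message, and so the number of reclaim events to expect.
 * Assumes the fragments use the contiguous range [first_sqn, last_sqn]. */
static uint32_t expected_reclaim_events(uint32_t first_sqn, uint32_t last_sqn)
{
    return last_sqn - first_sqn + 1;
}
```

An application could store the result of expected_reclaim_events() alongside the message number and decrement it as each reclaim event arrives, treating the application message as fully released when the count reaches zero.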
       

7.1.2.11. Receiver Liveness Detection

As an extension to Confirmed Delivery, you can set receivers to send a keepalive to a source when a lapse in traffic causes an absence of delivery confirmations. If neither a delivery confirmation nor a keepalive reaches the source within a designated interval, or if the delivery confirmation TCP connection breaks down, the receiver is assumed to have "died". UM then notifies the publishing application via a context event callback. This lets the publisher assign a new subscriber.

To use this feature, set these five configuration options:

Note: You must set the ume_source_liveness_timeout option to 5 times the value of ume_receiver_liveness_interval.

This specialized feature is not recommended for general use. If you are considering it, please note the following caveats:

  • Do not use in conjunction with a UM Gateway.

  • There is a variety of potential network occurrences that can break or reset the TCP connection and falsely indicate the death of a receiver.

  • In cases where a receiver object is deleted while its context is not, the publisher may still falsely assume the receiver to be alive. Other false receiver-alive assumptions could be caused by the following:

      • TCP connections can enter a half-open or otherwise corrupted state.

      • Failed TCP connections sometimes do not fully close, or experience objectionable delays before fully closing.

      • A switch or router failure along the path does not affect the TCP connection state.

7.1.3. UMP Receivers

Receivers are predominantly interested in RegID management and recovery management. This section discusses the following topics.

7.1.3.1. Receiver RegID Management

RegIDs are slightly more involved for receivers than for sources. Since RegIDs are per source per topic per store and a topic may have several sources, a receiver may have to manage several RegIDs per store in use. Fortunately, receivers in UMP can leverage the RegID of the source with the use of a callback as discussed in Adding Fault Recovery with Registration IDs and shown in ume-example-rcv-2.c. Your application can determine the correct RegID to use and return it to UMP. You can also use Session IDs to enable UMP to manage receiver RegIDs. See Managing RegIDs with Session IDs.

Much like sources, receivers typically have a lifetime based on an amount of work, perhaps an infinite amount. And just like sources, it may be helpful to consider that a RegID is "assigned" at the start of that work and is out of use at the end. In between, the RegID is in use by the instance of the receiver application. However, the nature of RegIDs being per source means that the expected lifetime of a source should play a role in how RegIDs on the receiver are managed. Thus, it may be helpful for the application developer to consider the source application lifetime when deciding how best to handle RegIDs on the receiver.

7.1.3.2. Receiver Message and Event Handler

The Receiver Message and Event Handler is a callback function, registered at receiver initialization, that delivers receiver messages and events to your application. The following source code examples illustrate the use of a receiver message and event handler for registration messages. To accept other receiver events, additional case statements would be required, one for each additional event. See also UMP and UMQ Events.

C API

int rcv_handle_msg(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
{
    switch (msg->type) {
    ...
    case LBM_MSG_UME_REGISTRATION_ERROR:
            printf("[%s][%s] UME registration error: %s\n", msg->topic_name, msg->source, 
                msg->data); 
            exit(0);
    break;
    case LBM_MSG_UME_REGISTRATION_SUCCESS:
            {
                    lbm_msg_ume_registration_t *reg = (lbm_msg_ume_registration_t *)
                        (msg->data);

                    printf("[%s][%s] UME registration successful. SrcRegID %u RcvRegID %u\n",
                     msg->topic_name, msg->source, reg->src_registration_id, 
                         reg->rcv_registration_id);
            }
         break;
    case LBM_MSG_UME_REGISTRATION_SUCCESS_EX:
            {
                    lbm_msg_ume_registration_ex_t *reg = (lbm_msg_ume_registration_ex_t *)
                        (msg->data);

                    printf("[%s][%s] store %u: %s UME registration successful. SrcRegID %u "
                        "RcvRegID %u. Flags %x ",
                        msg->topic_name, msg->source, reg->store_index, reg->store,
                        reg->src_registration_id, reg->rcv_registration_id, reg->flags);
                    if (reg->flags & LBM_MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)
                        printf("OLD[SQN %x] ", reg->sequence_number);
                    if (reg->flags & LBM_MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_NOCACHE)
                        printf("NOCACHE ");
                    printf("\n");
            }
            break;
    case LBM_MSG_UME_REGISTRATION_COMPLETE_EX:
            {
                    lbm_msg_ume_registration_complete_ex_t *reg;

                    reg = (lbm_msg_ume_registration_complete_ex_t *)(msg->data);
                    printf("[%s][%s] UME registration complete. SQN %x. Flags %x ",
                        msg->topic_name, msg->source, reg->sequence_number, reg->flags);
                    if (reg->flags & LBM_MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM)
                        printf("QUORUM ");
                    if (reg->flags & LBM_MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_RXREQMAX)
                        printf("RXREQMAX ");
                    printf("\n");
            }
            break;
    case LBM_MSG_UME_REGISTRATION_CHANGE:
            printf("[%s][%s] UME registration change: %s\n", msg->topic_name, msg->source, 
                msg->data);
            break;
    ...
    default:
            printf("Unknown lbm_msg_t type %x [%s][%s]\n", msg->type, msg->topic_name, 
                msg->source);
         break;
    }
    return 0;
}
       

JAVA API

public int onReceive(Object cbArg, LBMMessage msg) 
{
    switch (msg.type())
    {
    case LBM.MSG_UME_REGISTRATION_ERROR:
            System.out.println("[" + msg.topicName() + "][" + msg.source()
                    + "] UME registration error: " + msg.dataString());
            break;
    case LBM.MSG_UME_REGISTRATION_SUCCESS_EX:
            UMERegistrationSuccessInfo reg = msg.registrationSuccessInfo();
            System.out.print("[" + msg.topicName() + "][" + msg.source()
                    + "] store " + reg.storeIndex() + ": "
                    + reg.store() + " UME registration successful. SrcRegID "
                    + reg.sourceRegistrationId() + " RcvRegID " + reg.receiverRegistrationId()
                    + ". Flags " + reg.flags() + " ");
            if ((reg.flags() & LBM.MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD) != 0)
                System.out.print("OLD[SQN " + reg.sequenceNumber() + "] ");
            if ((reg.flags() & LBM.MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_NOCACHE) != 0)
                System.out.print("NOCACHE ");
            System.out.println();
            break;
    case LBM.MSG_UME_REGISTRATION_COMPLETE_EX:
            UMERegistrationCompleteInfo regcomplete = msg.registrationCompleteInfo();
            System.out.print("[" + msg.topicName() + "][" + msg.source()
                    + "] UME registration complete. SQN " + regcomplete.sequenceNumber()
                    + ". Flags " + regcomplete.flags() + " ");
            if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) 
            != 0) {
                System.out.print("QUORUM ");
            }
            if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_RXREQMAX) 
            != 0) {
                System.out.print("RXREQMAX ");
            }
            System.out.println();
            break;
    case LBM.MSG_UME_REGISTRATION_CHANGE:
            System.out.println("[" + msg.topicName() + "][" + msg.source()
                    + "] UME registration change: " + msg.dataString());
            break;
    ...
    default:
            System.err.println("Unknown lbm_msg_t type " + msg.type() + " ["
                    + msg.topicName() + "][" + msg.source() + "]");
            break;
        }
    return 0;
}
       

.NET API

public int onReceive(Object cbArg, LBMMessage msg) 
{
    switch (msg.type())
    {
    case LBM.MSG_UME_REGISTRATION_ERROR:
            System.Console.Out.WriteLine("[" + msg.topicName() + "][" + msg.source()
                    + "] UME registration error: " + msg.dataString());
            break;
    case LBM.MSG_UME_REGISTRATION_SUCCESS_EX:
            UMERegistrationSuccessInfo reg = msg.registrationSuccessInfo();
            System.Console.Out.Write("[" + msg.topicName() + "][" + msg.source()
                    + "] store " + reg.storeIndex() + ": "
                    + reg.store() + " UME registration successful. SrcRegID "
                    + reg.sourceRegistrationId() + " RcvRegID " + reg.receiverRegistrationId()
                    + ". Flags " + reg.flags() + " ");
            if ((reg.flags() & LBM.MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD) != 0)
                    System.Console.Out.Write ("OLD[SQN " + reg.sequenceNumber() + "] ");
            if ((reg.flags() & LBM.MSG_UME_REGISTRATION_SUCCESS_EX_FLAG_NOCACHE) != 0)
                    System.Console.Out.Write ("NOCACHE ");
            System.Console.Out.WriteLine();
            break;
    case LBM.MSG_UME_REGISTRATION_COMPLETE_EX:
            UMERegistrationCompleteInfo regcomplete = msg.registrationCompleteInfo();
            System.Console.Out.Write("[" + msg.topicName() + "][" + msg.source()
                    + "] UME registration complete. SQN " + regcomplete.sequenceNumber()
                    + ". Flags " + regcomplete.flags() + " ");
            if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) 
            != 0) {
                System.Console.Out.Write("QUORUM ");
            }
            if ((regcomplete.flags() & LBM.MSG_UME_REGISTRATION_COMPLETE_EX_FLAG_RXREQMAX) 
            != 0) {
                System.Console.Out.Write("RXREQMAX ");
            }
            System.Console.Out.WriteLine();
            break;
    case LBM.MSG_UME_REGISTRATION_CHANGE:
            System.Console.Out.WriteLine("[" + msg.topicName() + "][" + msg.source()
                    + "] UME registration change: " + msg.dataString());
            break;
    ...
    default:
            System.Console.Out.WriteLine("Unknown lbm_msg_t type " + msg.type() + " ["
                    + msg.topicName() + "][" + msg.source() + "]");
            break;
        }
    return 0;
}
       

7.1.3.3. Recovery Management

Recovery management for receivers is fairly simple because UMP requests any missing messages from the store(s) and delivers them as they are retransmitted. However, your application can specify a different message with which to begin retransmission by using either the retransmit_request_maximum configuration option or lbm_ume_rcv_recovery_info_ex_func_t.

For example, assume a source sends 7 messages with sequence numbers 0-6 which are stabilized at the store. The receiver, configured with the retransmit_request_maximum set to 2, consumes message 0, goes down, then comes back at message 6. lbm_ume_rcv_recovery_info_ex_func_t returns the following:

high_sequence_number = 6
low_rxreq_max_sequence_number = 4
low_sequence_number = 1

NOTE: low_rxreq_max_sequence_number = high_sequence_number - retransmit_request_maximum
       
  • UMP obeys the retransmit_request_maximum configuration option and restarts with message 4. This is the default.

  • If you modify the low_sequence_number to satisfy some other requirements, you can override the configuration option and restart at message 0, 2, 3, 5 or 6. See Setting Callback Function to Set Recovery Sequence Number below.

  • The only way to restart at message 1 in this case is to set the retransmit_request_maximum configuration option to its default value of 0. If your application changes the low_sequence_number but the calculation produces the same value as the original low_sequence_number (1 in this example), UMP treats the value as unchanged and restarts with message 4.
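
The three cases above can be modeled with a small function. This is a sketch of the rules as described in this section, not UMP's implementation. restart_sqn() and its parameter names are hypothetical, a retransmit_request_maximum of 0 is taken to mean "no limit" (which is how the text above reaches message 1), and the clamp to the low sequence number when the window exceeds the backlog is an assumption.

```c
#include <stdint.h>

/* Illustrative model only (not a UM API function).
 * low, high     : low_sequence_number and high_sequence_number as passed
 *                 to the recovery callback
 * rxreq_max     : the retransmit_request_maximum setting
 * requested_low : the value the callback leaves in low_sequence_number
 * Returns the sequence number at which retransmission would begin. */
static uint32_t restart_sqn(uint32_t low, uint32_t high,
                            uint32_t rxreq_max, uint32_t requested_low)
{
    uint32_t low_rxreq_max;

    /* The application changed low_sequence_number: that value wins. */
    if (requested_low != low)
        return requested_low;

    /* Unchanged value with no limit configured: start at the low SQN. */
    if (rxreq_max == 0)
        return low;

    /* Unchanged value with a limit: high - retransmit_request_maximum.
     * Assumption: never start earlier than the low SQN. */
    low_rxreq_max = (rxreq_max > high) ? 0 : high - rxreq_max;
    return (low_rxreq_max > low) ? low_rxreq_max : low;
}
```

With the values from the example (low 1, high 6, retransmit_request_maximum 2), leaving the low sequence number at 1 yields 4, while setting it to 3 yields 3.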

All messages retransmitted to a receiver are marked as retransmissions via a flag in the message structure. Thus it is easy for an application to determine if a message is a new message from the source or a retransmission, which may or may not have been processed before the failure. The presence or absence of the retransmit flag gives the application a hint of how best to handle the message with regard to it being processed previously or not.

7.1.3.4. Setting Callback Function to Set Recovery Sequence Number

The sample source code below demonstrates how to use the recovery sequence number info function to determine the stored message with which to restart a receiver. This method retrieves the low sequence number from the recovery sequence number structure and adds an offset to determine the beginning sequence number. The offset is a value completely under the control of your application. For example, if a receiver was down for a "long" period and you only want the receiver to receive the last 10 messages, use an offset to start the receiver with the 10th most recent message. If you do not wish to receive any messages, set the low_sequence_number to the high_sequence_number plus one.

C API

lbm_ume_rcv_recovery_info_ex_func_t cb;

cb.func = ume_rcv_seqnum_ex;
cb.clientd = NULL;
if (lbm_rcv_topic_attr_setopt(&rcv_attr, "ume_recovery_sequence_number_info_function", 
&cb, sizeof(cb)) == LBM_FAILURE) {
            fprintf(stderr, 
            "lbm_rcv_topic_attr_setopt:ume_recovery_sequence_number_info_function: %s\n", 
            lbm_errmsg());
            exit(1);
}
printf("Will use seqnum info with low offset %u.\n", seqnum_offset); 

int ume_rcv_seqnum_ex(lbm_ume_rcv_recovery_info_ex_func_info_t *info, void *clientd)
{
    lbm_uint_t new_lo = info->low_sequence_number + seqnum_offset;

    printf("[%s] SQNs Low %x (will set to %x), Low rxreqmax %x, High %x (CD %p)\n",
        info->source, info->low_sequence_number,
        new_lo, info->low_rxreq_max_sequence_number, info->high_sequence_number,
        info->source_clientd);
    info->low_sequence_number = new_lo;
    return 0;
}
       

JAVA API

UMERcvRecInfo umerecinfocb = new UMERcvRecInfo(seqnum_offset);
rcv_attr.setRecoverySequenceNumberCallback(umerecinfocb, null);
System.out.println("Will use seqnum info with low offset " + seqnum_offset + ".");

class UMERcvRecInfo implements UMERecoverySequenceNumberCallback {
    private long _seqnum_offset = 0;

    public UMERcvRecInfo(long seqnum_offset) {
        _seqnum_offset = seqnum_offset;
    }

    public int setRecoverySequenceNumberInfo(Object cbArg, 
            UMERecoverySequenceNumberCallbackInfo cbInfo) 
    {
        long new_low = cbInfo.lowSequenceNumber() + _seqnum_offset;
        if (new_low < 0) {
            System.out.println("New low sequence number would be negative.  "
                    + "Leaving low SQN unchanged.");
            new_low = cbInfo.lowSequenceNumber();
        }
        System.out.println("SQNs Low " + cbInfo.lowSequenceNumber() + " (will set to "
                + new_low + "), Low rxreqmax " + cbInfo.lowRxReqMaxSequenceNumber()
                + ", High " + cbInfo.highSequenceNumber());
        try {
            cbInfo.setLowSequenceNumber(new_low);
        }
        catch (LBMEInvalException e) {
            System.err.println(e.getMessage());
        }
        return 0;
    }
}
       

.NET API

UMERcvRecInfo umerecinfocb = new UMERcvRecInfo(seqnum_offset);
rcv_attr.setRecoverySequenceNumberCallback(umerecinfocb, null);
System.Console.Out.WriteLine("Will use seqnum info with low offset " + seqnum_offset + ".");

class UMERcvRecInfo implements UMERecoverySequenceNumberCallback {
    private long _seqnum_offset = 0;

    public UMERcvRecInfo(long seqnum_offset) {
        _seqnum_offset = seqnum_offset;
    }

    public int setRecoverySequenceNumberInfo(Object cbArg, 
    UMERecoverySequenceNumberCallbackInfo cbInfo) 
    {
        long new_low = cbInfo.lowSequenceNumber() + _seqnum_offset;
        if (new_low < 0) {
            System.Console.Out.WriteLine("New low sequence number would be negative.  "
                    + "Leaving low SQN unchanged.");
            new_low = cbInfo.lowSequenceNumber();
        }
        System.Console.Out.WriteLine("SQNs Low " + cbInfo.lowSequenceNumber()
                + " (will set to " + new_low + "), Low rxreqmax "
                + cbInfo.lowRxReqMaxSequenceNumber()
                + ", High " + cbInfo.highSequenceNumber());
        try {
            cbInfo.setLowSequenceNumber(new_low);
        }
        catch (LBMEInvalException e) {
            System.Console.Out.WriteLine(e.Message);
        }
        return 0;
    }
}
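
The "last 10 messages" case mentioned at the start of this section comes down to simple arithmetic on the low and high sequence numbers. The following is a minimal sketch, assuming that high is not less than low and that the stored range is contiguous. low_for_last_n() is a hypothetical helper, not part of the UM API.

```c
#include <stdint.h>

/* Hypothetical helper: choose the low_sequence_number that makes a
 * recovering receiver start with the n most recent stored messages.
 * Assumes high >= low. */
static uint32_t low_for_last_n(uint32_t low, uint32_t high, uint32_t n)
{
    uint32_t span = high - low;   /* number of stored messages minus one */

    if (n == 0)
        return high + 1;          /* receive no stored messages at all */
    if (n - 1 >= span)
        return low;               /* fewer than n stored: take them all */
    return high - (n - 1);        /* nth most recent message */
}
```

Inside a recovery callback such as the ones shown above, the returned value would be written to the low sequence number. The equivalent offset is the returned value minus the original low sequence number.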
       

7.1.3.5. Message Consumption

Receivers use message consumption, defined as message deletion, to indicate that UMP should notify the store(s) that the application consumed the message. This notification takes the form of an acknowledgement, or ACK, to the store(s) in use as well as to the source if you configured the source for delivery confirmation.

  • In the C API, message deletion happens by default when the receive callback returns, unless the callback uses lbm_msg_retain(). If the callback uses lbm_msg_retain() then the application has responsibility to use lbm_msg_delete() when it has finished processing the message.

  • In the Java API and .NET API, message deletion must be triggered explicitly by the application by using the dispose() method of the message object. Without explicit usage of dispose(), UMP does not know when the application has finished processing the message.

7.1.3.5.1. Batching Acknowledgments

You can configure UMP to acknowledge message consumption to the store(s) for a series of messages, independent of when the receiving application consumed them. This option works well if multiple threads process messages from an event queue, which may result in messages being consumed out of order. This feature is not compatible with Explicit Acknowledgments.

If you set ume_use_ack_batching to 1, UMP does not acknowledge individual messages as the application consumes them. Instead, UMP checks the consumed, but unacknowledged messages at the interval configured with ume_ack_batching_interval. When UMP discovers a contiguous series of consumed message sequence numbers (sqn), it sends acknowledgments to the store(s) for all the contiguous messages.

For example, assume your application consumes and acknowledges messages 1 and 2, then consumes subsequent messages in the following order: 4, 5, 7, 8, 6, 10, 3. At the next ume_ack_batching_interval, UMP sends consumption acknowledgments to the store(s) for messages 3 - 8.
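
The contiguity check in this example can be expressed as a short function. It is a sketch of the rule as described here, not of UMP's internal data structures. highest_contiguous() is a hypothetical name, and the repeated scan is chosen for clarity rather than efficiency.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: given the last acknowledged SQN and the SQNs
 * consumed since then (in any order), return the highest SQN that can be
 * acknowledged at the next batching interval. Returns last_acked when the
 * next expected SQN has not been consumed yet. */
static uint32_t highest_contiguous(uint32_t last_acked,
                                   const uint32_t *consumed, size_t count)
{
    uint32_t next = last_acked + 1;   /* next SQN needed to extend the run */
    int advanced = 1;

    /* Rescan until a full pass finds no SQN that extends the run. */
    while (advanced) {
        advanced = 0;
        for (size_t i = 0; i < count; i++) {
            if (consumed[i] == next) {
                next++;
                advanced = 1;
            }
        }
    }
    return next - 1;
}
```

For the sequence above (last acknowledged 2; consumed 4, 5, 7, 8, 6, 10, 3) the function returns 8. Message 10 stays unacknowledged until message 9 is consumed.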

7.1.3.5.2. Explicit Acknowledgments

In addition, UMP supports Explicit ACKs (ume_explicit_ack_only), which silence UMP's automatic acknowledgement behavior and give your application control of message consumption notification. See also lbm_msg_ume_send_explicit_ack() in the C API and the LBMMessage class method sendExplicitAck() in the Java API and .NET API.

The explicit ACK sending function/method automatically supplies additional ACKs for missing messages in sequence number gaps. This can be a useful efficiency feature, but note that to acknowledge each message consumption individually, you must issue their ACKs in ascending sequence-number order.

7.1.3.5.3. Object-free Explicit Acknowledgments

When using explicit ACKs in your Java or .NET application, you can extract ACK information from messages and then send acknowledgements to the store(s) for any sequence number. You can also extract ACK information from a message when using the C API with lbm_msg_extract_ume_ack().

The following source code examples show how to extract ACK information and send an explicit ACK.

C API

int rcv_handle_msg(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
{
    lbm_ume_rcv_ack_t *ack = NULL;
...

    ack = lbm_msg_extract_ume_ack(msg);
    lbm_ume_ack_send_explicit_ack(ack, msg->sequence_number);
    lbm_ume_ack_delete(ack);
...

}
       

JAVA API or .NET API

public int onReceive(Object cbArg, LBMMessage msg)
{
    UMEMessageAck ack;
...

    ack = msg.extractUMEAck();
    ack.sendExplicitAck(msg.sequenceNumber());
    ack.dispose();

...

}
       

7.1.4. UMP Stores

As mentioned in Persistent Store, the UMP persistent stores, also just called stores, actually persist the source and receiver state and use RegIDs to identify sources and receivers. Each source to which a store provides persistence may have zero or more receivers. The store maintains each receiver's state along with the source's state and the messages the source has sent.

The store can be configured with its own set of options to persist this state information on disk or simply in memory. The term disk store is used to signify a store that persists state to disk, and the term memory store is used to signify a store that persists state only in memory. A store may also be configured not to cache the source's data, but to simply persist the source and receiver state in memory. This is called a no-cache store.

Unlike many persistent systems, the persistent store in UMP is not in the message path. In other words, a source does not send data to the store and then have the store forward it to the receivers. In UMP, the source sends to the receiver(s) and the store(s) in parallel. See Normal Operation. Thus, UMP can provide extremely low latency to receiving applications.

The store(s) that a source uses are part of the source's configuration settings. Sources must be configured to use specific store(s) and use one of two different types of store failover behaviors to match expected failure scenarios. See Round-Robin Store Usage and Quorum/Consensus Store Usage below for more about store failover scenarios.

Receivers, on the other hand, do not need to be configured with store information a priori. The source advertises store information as part of the normal UM topic resolution process. Thus the receivers will learn the store(s) to use from the source without needing to be configured themselves. Because receivers learn about the store(s) a source is using via topic resolution, the source needs to be available to receivers as long as the receivers may need it. However, the source does not have to be actively sending data to do this.

7.1.4.1. Round-Robin Store Usage

Stores can be used in a Round-Robin fashion by a source during failover. A source provides UMP with a list of stores to use. The first is the primary, the second is the secondary, the third is the tertiary, etc. The source uses a single store at any one time. If the currently active store becomes unresponsive due to a crash or network disconnect, UMP tries other stores in the list one by one until it finds a responsive store.

With round-robin store usage, inactive stores do not receive data from the source. Thus, a store that becomes the active store will not have any data from the source. In this case, the source may be configured to retain messages and stream those messages to the new store using Late Join. Cascading failures of sources, stores and receivers may require using stores in a Quorum/Consensus fashion.
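
The failover search described above can be sketched as a loop over the configured store list. This is an illustration only. next_responsive_store() and the responsive[] array are hypothetical, and the wrap-around from the last store back to the first is an assumption drawn from the term "round-robin".

```c
#include <stddef.h>

/* Hypothetical helper: starting after the currently active store, try the
 * other stores one by one (wrapping around the list) and return the index
 * of the first responsive one, or -1 if no store responds.
 * responsive[i] is nonzero when store i is currently reachable. */
static int next_responsive_store(const int *responsive, size_t num_stores,
                                 size_t current)
{
    for (size_t step = 1; step <= num_stores; step++) {
        size_t candidate = (current + step) % num_stores;
        if (responsive[candidate])
            return (int)candidate;
    }
    return -1;
}
```

The final step of the loop re-checks the current store, so a store that has recovered in the meantime is still found when every other store is down.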

See also Sources Using Round-Robin Store Configuration.

7.1.4.2. Quorum/Consensus Store Usage

To provide the highest degree of resiliency in the face of failures, UMP provides the Quorum/Consensus failover strategy which allows a source to provide UMP with a number of stores to be used at the same time. Multiple stores can fail and UMP can continue operation unhindered. Moreover, Late Join is not needed as in Round-Robin.

Quorum/Consensus, also called QC, allows a source and the associated receivers to have their persisted state maintained at several stores at the same time. Central to QC is the concept of a group of stores, which is a logical grouping of stores that are intended to signify a single entity of resilience. Within the group, individual stores may fail but for the group as a whole to be viable and provide resiliency, a quorum must be available. In UMP, a quorum is a simple majority. For example, in a group of five stores, three stores are required to maintain a quorum. One or two stores may fail and the group continues to provide resiliency. UMP requires a source to have a quorum of stores available in the group in order to send messages. A group can consist of a single store.

QC also provides the ability to use multiple groups. As long as a single group maintains quorum, UMP allows a source to proceed. Groups are logical in nature and can be combined in any way you choose, such as by store location, store type, etc. In addition, QC provides the ability to specify backup stores within groups. Backups may be used if or when a store in the group becomes unresponsive to the source. Quorum/Consensus lets a source tolerate many failure scenarios that other persistent messaging systems cannot handle.
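
The quorum rule and the multiple-group rule described above reduce to a few lines of arithmetic. The sketch below models only those two rules and ignores backup stores. struct store_group and the function names are hypothetical, not UM API.

```c
#include <stddef.h>

/* Hypothetical model of one store group. */
struct store_group {
    size_t size;        /* number of stores configured in the group */
    size_t responsive;  /* number of those stores currently responsive */
};

/* A quorum is a simple majority of the group: 3 of 5, 2 of 3, 1 of 1. */
static size_t quorum_size(size_t group_size)
{
    return group_size / 2 + 1;
}

/* Nonzero when the group still has a quorum of responsive stores. */
static int group_has_quorum(const struct store_group *group)
{
    return group->size > 0 && group->responsive >= quorum_size(group->size);
}

/* The source may proceed as long as at least one group keeps its quorum. */
static int source_may_send(const struct store_group *groups, size_t num_groups)
{
    for (size_t i = 0; i < num_groups; i++) {
        if (group_has_quorum(&groups[i]))
            return 1;
    }
    return 0;
}
```

Note that an even-sized group tolerates no more failures than the next smaller odd-sized group: a group of four needs three responsive stores, so it survives only one failure, the same as a group of three.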

See also Sources Using Quorum/Consensus Store Configuration, Quorum/Consensus - Single Location Groups and Quorum/Consensus - Mixed Location Groups.

7.2. Fault Recovery

Recovery from source and receiver failure is the real heart of UMP operation. For a source, this means continuing operation from where it stopped. For a receiver, this means essentially the same thing, but with the retransmission of missed messages. Application developers can easily leverage the information in UMP to make their applications recover from failure in graceful ways.

Late Join is the mechanism of UMP recovery as well as a UM streaming feature. Turning off Late Join on a source (late_join) or receiver (use_late_join) also turns off UMP recovery. To control Late Join behavior, UMP provides a mechanism for a receiver to control the low sequence number. See Recovery Management.

Not all failures are recoverable. For application developers it usually pays in the long run to identify what types of errors are non-recoverable and how best to handle them when possible. Such an exercise establishes the precise boundaries of expected versus abnormal operating conditions.

Note: UMP does not acknowledge messages that are lost. If the store is unable to recover a lost message, any receivers attempting to recover this message from the store will experience unrecoverable loss as well. Sources should watch for gaps in stability or confirmed delivery acknowledgements, as these most likely represent unrecoverable loss at the store or receivers, respectively.
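One way an application might watch for such gaps is to collect the sequence numbers carried by the acknowledgements it has seen and look for missing ranges. The sketch below assumes the application has already gathered those numbers into a list; find_ack_gaps is a hypothetical helper, and sequence number wraparound is ignored. A gap may also simply mean an acknowledgement has not arrived yet, so an application would normally wait before treating a gap as loss.

```python
def find_ack_gaps(acked):
    """Return inclusive (first, last) ranges missing between acknowledged numbers."""
    ordered = sorted(set(acked))
    gaps = []
    for prev, cur in zip(ordered, ordered[1:]):
        if cur - prev > 1:
            gaps.append((prev + 1, cur - 1))
    return gaps


stable = [10, 11, 12, 15, 16, 19]
print(find_ack_gaps(stable))  # [(13, 14), (17, 18)]
```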

This section discusses the following recovery topics.

7.2.1. Source Recovery

The following shows the basic steps of source recovery.

  1. Re-register with the store.

  2. Determine the highest sequence number that the store has from the source.

  3. Resume sending with the next sequence number.

Because UMP allows you to stream messages without waiting until a message is stable at the persistent store before sending the next one, the main task of source recovery is to determine which messages the persistent store(s) have and which they do not. Therefore, when a source re-registers with a store during recovery, the store tells the source what sequence number it has as the most recent from the source. The registration event informs the application of this sequence number. See Source Event Handler.

In addition, a mechanism exists (LBM_SRC_EVENT_SEQUENCE_NUMBER_INFO) that allows the application to know the sequence number assigned to every piece of data it sends. The combination of registration and sequence number information allows an application to know exactly what a store does have and what it does not and where it should pick up sending. An application designed to stream data in this way should consider how best to maintain this information.
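As a rough illustration of maintaining this information, an application could keep a log of what it sent, keyed by the sequence number reported for each message, and compare that log with the sequence number reported at re-registration. The SentLog class below is a hypothetical application-side structure, not part of the UMP API, and it keeps the log in memory only; an application that must survive its own restart would need to store this information durably.

```python
class SentLog:
    """Application-side record of sent messages, keyed by sequence number."""

    def __init__(self):
        self._payload_by_seq = {}

    def record(self, seq, payload):
        # Call when the application learns the sequence number of a sent message.
        self._payload_by_seq[seq] = payload

    def missing_at_store(self, store_high_seq):
        """Messages sent with a sequence number above the store's most recent one."""
        return [(seq, self._payload_by_seq[seq])
                for seq in sorted(self._payload_by_seq)
                if seq > store_high_seq]


log = SentLog()
for seq, payload in enumerate(["a", "b", "c", "d", "e"], start=1):
    log.record(seq, payload)

# Assume the store reported sequence number 3 at re-registration.
print(log.missing_at_store(3))  # [(4, 'd'), (5, 'e')]
```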

When QC is in use, UMP uses the consensus of the group(s) to determine what sequence number to use in the first message it will send. This is necessary because, in a distributed system, not all stores can be expected to agree completely about what was sent. The application can configure the source with ume_consensus_sequence_number_behavior to use the lowest sequence number of the latest group of sequence numbers seen from any store, the highest, or the majority. In most cases, the majority, which is the default, makes the most sense as the consensus. The lowest is a very conservative setting, and the highest is somewhat optimistic. Your application has the flexibility to handle this in any way needed.
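The three choices can be illustrated with a small model. This is a sketch of the idea only and does not reproduce how UMP computes the consensus internally: consensus_sequence_number is a hypothetical function, the input is assumed to be one reported sequence number per store, and breaking a tie in favor of the lower value is an assumption made for this example.

```python
from collections import Counter


def consensus_sequence_number(reported, behavior="majority"):
    """Pick one sequence number from the values reported by the stores."""
    if not reported:
        raise ValueError("no stores reported a sequence number")
    if behavior == "lowest":
        return min(reported)
    if behavior == "highest":
        return max(reported)
    if behavior == "majority":
        counts = Counter(reported)
        top = max(counts.values())
        # Assumption for this sketch: on a tie, prefer the lower value.
        return min(seq for seq, n in counts.items() if n == top)
    raise ValueError(f"unknown behavior: {behavior!r}")


reported = [104, 105, 105]
print(consensus_sequence_number(reported, "lowest"))    # 104
print(consensus_sequence_number(reported, "highest"))   # 105
print(consensus_sequence_number(reported, "majority"))  # 105
```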

If streaming adds more complexity than an application wants, the application can use the UMP events (UMP and UMQ Events) delivered to it to restrict a source to only one unstable message at a time.
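A minimal sketch of that pattern follows. It assumes the application can call on_stable from its handler for the stability event and that send_fn wraps the actual send call; OneAtATimeSender, submit, and on_stable are hypothetical names for this illustration, and threading concerns are ignored.

```python
from collections import deque


class OneAtATimeSender:
    """Holds back messages so that only one is unstable at any time."""

    def __init__(self, send_fn):
        self._send = send_fn      # assumed wrapper around the real send call
        self._pending = deque()
        self._in_flight = False

    def submit(self, payload):
        self._pending.append(payload)
        self._pump()

    def on_stable(self):
        # Call when the stability event for the in-flight message arrives.
        self._in_flight = False
        self._pump()

    def _pump(self):
        if not self._in_flight and self._pending:
            self._in_flight = True
            self._send(self._pending.popleft())


sent = []
sender = OneAtATimeSender(sent.append)
sender.submit("m1")
sender.submit("m2")
print(sent)  # ['m1']: m2 waits until m1 is stable
sender.on_stable()
print(sent)  # ['m1', 'm2']
```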

7.2.2. Receiver Recovery

The following shows the basic steps of receiver recovery.

  1. Re-register with the store.

  2. Determine the low sequence number.

  3. Request retransmission of messages starting with the low sequence number.

UMP provides extensive options for controlling how receivers handle recovery. By default, receivers want to restart after the last piece of data that was consumed prior to failure or graceful suspension. Since UMP persists receiver state at the store, receivers request this state from the store as part of re-registration and recovery.

The sequence number of the first topic-level message with which a receiver resumes reception is called the "low sequence number". UMP provides a means of modifying this sequence number if desired. An application can decide to use the sequence number as is, to use an even older sequence number, to use a more recent sequence number, or to simply use the most recent sequence number from the source. See Recovery Management and Setting Callback Function to Set Recovery Sequence Number. This gives receivers great flexibility on a per-source basis when recovering. New receivers (receivers with no pre-existing registration) have the same flexibility in determining the sequence number at which to begin data reception.
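The four choices can be expressed as a small decision function. The sketch below is not the UMP callback itself: choose_low_sequence_number, its parameters, and the policy names are hypothetical, and clamping the result to the range between 0 and the source's most recent sequence number is an assumption made for this example.

```python
def choose_low_sequence_number(store_low, source_latest, policy="as_is", offset=0):
    """Return the sequence number at which the receiver should resume."""
    if offset < 0:
        raise ValueError("offset must not be negative")
    if policy == "as_is":
        return store_low
    if policy == "older":
        return max(0, store_low - offset)
    if policy == "newer":
        return min(source_latest, store_low + offset)
    if policy == "latest":
        return source_latest
    raise ValueError(f"unknown policy: {policy!r}")


print(choose_low_sequence_number(120, 200))                      # 120
print(choose_low_sequence_number(120, 200, "older", offset=20))  # 100
print(choose_low_sequence_number(120, 200, "newer", offset=50))  # 170
print(choose_low_sequence_number(120, 200, "latest"))            # 200
```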

As with sources, when QC is in use, UMP uses the consensus of the group(s) to determine the low sequence number. This is necessary because not all stores can be expected to agree completely about what was acknowledged. The application can configure the receiver with ume_consensus_sequence_number_behavior to use the lowest sequence number of the latest group of sequence numbers seen from any store, the highest, or the majority. In most cases, the majority, which is the default, makes the most sense as the consensus. The lowest is a very conservative setting, and the highest is somewhat optimistic. In addition, the application may modify this sequence number after the consensus is determined.

For QC, UMP load balances receiver retransmission requests among the available stores. In addition, if requests are unanswered, retransmissions of the actual requests will use different stores. This means that as long as a single store has a message, then it is possible for that message to be retransmitted to a requesting receiver.
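The effect of retrying a request at a different store can be modelled as follows. This is a simplified illustration, not the algorithm UMP uses: request_retransmission is a hypothetical function, each store is represented as a (name, messages) pair, and trying the stores in rotation is an assumption made for this sketch.

```python
def request_retransmission(seq, stores, first_choice=0):
    """Ask each store at most once, starting at first_choice.

    Returns (payload, attempted_store_names); payload is None if no store
    holds the requested sequence number.
    """
    attempts = []
    for i in range(len(stores)):
        name, held = stores[(first_choice + i) % len(stores)]
        attempts.append(name)
        if seq in held:
            return held[seq], attempts
    return None, attempts


stores = [("store-a", {1: "m1"}),
          ("store-b", {}),
          ("store-c", {1: "m1", 2: "m2"})]

# Only store-c holds message 2, so the request succeeds on the third try.
print(request_retransmission(2, stores))
# ('m2', ['store-a', 'store-b', 'store-c'])
```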

Note: Receivers need to consider whether the use of arrival order delivery is appropriate. See ordered_delivery. UMP stores save the highest sequence number acknowledged by a receiver. When receivers using arrival order delivery receive, and thereby acknowledge, messages out of order, recovery problems may arise because the stores record only the highest acknowledged sequence number and have no record of earlier messages that the receiver never acknowledged.
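The following sketch models the problem in simplified form. It assumes the store keeps only the highest acknowledged sequence number and that recovery resumes with the next one; recovery_view is a hypothetical function used only for this illustration.

```python
def recovery_view(acked_in_arrival_order, first_seq):
    """Return (resume_at, never_acked) under the simplified model above."""
    highest_acked = max(acked_in_arrival_order)
    resume_at = highest_acked + 1
    expected = set(range(first_seq, highest_acked + 1))
    never_acked = sorted(expected - set(acked_in_arrival_order))
    return resume_at, never_acked


# Message 3 was delayed; the receiver acknowledged 4 and 5 and then failed.
print(recovery_view([1, 2, 4, 5], first_seq=1))  # (6, [3])
```

In this model the receiver resumes at sequence number 6, so message 3 is not requested during recovery.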

Copyright 2007 - 2014 Informatica Corporation.