Example index

UMP Receiver Sequence Number Info Callback

UMP receivers, by default will recover messages from the store based on its configured attribute for ume_consensus_sequence_number_behavior . By default, this is configured to "majority". Consensus is based on all the sequence numbers provided to the receiver during store registration events (registration success events indicate a successful registration with a sinle store). Once enough stores have registered to satisy a quorum, registration can complete. The sequence number provided in the registration complete callback event indicates the starting sequence number for the source (the consensus sequence number).

UMP receivers have the ability to alter the starting sequence number by registering the receiver sequence number info callback function, configured programatically with ume_recovery_sequence_number_info_function . This callback function will be executed by the UM library immediately prior to registration complete, and it provides sequence number data to the application so that the application can change the starting sequence number if desired. The specific data provided by the callback includes:

It is highly recommended every UMP application use this function, at the very least for logging purposes. It is generally a good idea to use this callback and check with internal application logic that the receiver is starting at the correct sequence number as well, to avoid the possibility of requesting retransmission for messages that may have already been consumed by a UMP receiver.

There is one program source file:

Program explanation: SeqNumberCallback.java

Set Sequence Number Callback Attrribute

The object UMERcvRecInfo is used to configure the callback function, and is defined as a class below by implementing the LBM object UMERecoverySequenceNumberCallback. Once the object is set, it is passed into the setRecoverySequenceNumberCallback() method like so:

00021          UMERcvRecInfo umerecinfocb = new UMERcvRecInfo();
00022                  rcv_attr.setRecoverySequenceNumberCallback(umerecinfocb, null);

The Sequence Number Callback

Here is the actual callback function that will be executed right before registration complete. In this example, the 3 provided sequence numbers are printed with explanation. In a real world application, the expectation would be to simply print the sequence numbers with the appropriate labels (high_sqn, low_sqn, low_rx_sqn). Also in this example, the application is blindly setting the starting sequence number to 100. Again, in a real world application, the user should verify the starting sequence number against some internal application logic to ensure the receiver is starting at the correct sequence number.

00057      private static class UMERcvRecInfo implements UMERecoverySequenceNumberCallback {
00058          public int setRecoverySequenceNumberInfo(Object cbArg, UMERecoverySequenceNumberCallbackInfo cbInfo) {
00059  
00060              long new_low = 100;
00061              
00062              System.out.println("lowSequenceNumber["+cbInfo.lowSequenceNumber()+"] is the starting sequence number as determined by registration consensus.");
00063              System.out.println("highSequenceNumber["+cbInfo.highSequenceNumber()+"] is the highest sequence number available in the persisted stores.");
00064              System.out.println("lowRxReqMaxSequenceNumber["+cbInfo.lowRxReqMaxSequenceNumber()+"] is the lowest sequence number to be requested.");
00065              try {
00066                  cbInfo.setLowSequenceNumber(new_low);
00067              }
00068              catch (LBMEInvalException e) {
00069                  System.err.println(e.getMessage());
00070              }
00071              System.out.flush();
00072              return 0;
00073          }
00074      }

Handle Message Callback

This sample includes a message handler that handles the persistent registration events, success and complete.

  1. REGISTRATION_SUCCESS - This indicates the successful registration with a single UMP store.
  2. REGISTRATION_COMPLETE - This indicates that the receiver has registered with at least a quorum of stores and can begin accepting messages either via store retransmissions or from the live stream, depending on it's starting sequence number.

When using UMP, it should be a strong application requirement to handle these registration events in the code, as well as logging the data returned in each callback, such as registration ID's and sequence numbers. This data is absolutely crucial for debugging any kind of issues with related to message recovery, connectivity issues, and even performance.

00034      private static class ReceiverCallback implements LBMReceiverCallback {
00035          public int onReceive(Object cbArg, LBMMessage msg)
00036          {
00037              switch (msg.type())
00038                          {
00039                                  case LBM.MSG_UME_REGISTRATION_SUCCESS_EX:
00040                                          UMERegistrationSuccessInfo reg = msg.registrationSuccessInfo();
00041                                          System.out.println("[" + msg.topicName() + "][" + msg.source() + "] store " + reg.storeIndex() + ": "
00042                                                  + reg.store() + " UME registration successful. Sequence number is " + reg.sequenceNumber());
00043                                          break;
00044                                  case LBM.MSG_UME_REGISTRATION_COMPLETE_EX:
00045                                          UMERegistrationCompleteInfo regcomplete = msg.registrationCompleteInfo();
00046                                          System.out.println("[" + msg.topicName() + "][" + msg.source() + "] UME registration complete. SQN "
00047                                                  + regcomplete.sequenceNumber() + ". Flags " + regcomplete.flags());
00048                                          break;
00049                                  default:
00050                                          System.out.println("Other event, type=" + msg.type());
00051                                          break;
00052                          }
00053              return 0;
00054          }
00055      }  /* ReceiverCallback */