By default, UMP receivers recover messages from the store according to the configured value of ume_consensus_sequence_number_behavior, which defaults to "majority". Consensus is based on all the sequence numbers provided to the receiver during store registration events (each registration success event indicates a successful registration with a single store). Once enough stores have registered to satisfy a quorum, registration can complete. The sequence number provided in the registration complete callback event is the starting sequence number for the source (the consensus sequence number).
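The quorum and consensus steps described above can be modeled with a small, library-free sketch. This is a simplified illustration and not the UM library's implementation: the class name ConsensusSketch, both method names, and the rule that "majority" means more than half of the configured stores are assumptions made for the example, and the sketch deliberately leaves the no-majority case undecided.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Illustrative only: a simplified model of quorum and "majority" consensus (hypothetical names). */
final class ConsensusSketch {

    /** True once more than half of the configured stores have reported a registration success. */
    static boolean hasQuorum(int registeredStores, int totalStores) {
        return registeredStores > totalStores / 2;
    }

    /**
     * Returns the sequence number reported by more than half of the configured stores,
     * or empty if no single value has that many votes (assumed rule, see lead-in).
     */
    static OptionalLong majoritySequenceNumber(long[] reported, int totalStores) {
        Map<Long, Integer> votes = new HashMap<>();
        for (long sqn : reported) {
            int count = votes.merge(sqn, 1, Integer::sum); // tally one vote per store
            if (count > totalStores / 2) {
                return OptionalLong.of(sqn);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Sequence numbers taken from three hypothetical registration success events.
        long[] reported = {41, 42, 42};
        System.out.println("quorum: " + hasQuorum(reported.length, 3));
        System.out.println("consensus: " + majoritySequenceNumber(reported, 3));
    }
}
```

In a real receiver the library performs this work itself; the sketch only shows why the registration complete event can fire before every store has registered.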
UMP receivers can alter the starting sequence number by registering the receiver sequence number info callback function, configured programmatically with ume_recovery_sequence_number_info_function. The UM library executes this callback immediately prior to registration complete and passes it sequence number data, so that the application can change the starting sequence number if desired. The data provided by the callback comprises three sequence numbers: the low sequence number (the starting sequence number determined by registration consensus), the high sequence number (the highest sequence number available in the persisted stores), and the low retransmission request sequence number (the lowest sequence number to be requested).
It is highly recommended that every UMP application use this function, at the very least for logging purposes. It is also good practice to check, using internal application logic, that the receiver is starting at the correct sequence number, to avoid requesting retransmission of messages that the receiver has already consumed.
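One way to express such a check is sketched below. It is a minimal example under stated assumptions, not a recommended policy from the UM documentation: the class StartSequenceCheck, the method chooseStart, and the NOTHING_CONSUMED sentinel are hypothetical, the application is assumed to persist the last sequence number it processed, and sequence number wraparound is ignored.

```java
/** Illustrative only: one possible application-side policy, using hypothetical names. */
final class StartSequenceCheck {

    /** Sentinel meaning the application has no record of consumed messages. */
    static final long NOTHING_CONSUMED = -1L;

    /**
     * Picks the sequence number to start from.
     * consensusLow: the low sequence number offered by the callback.
     * lastConsumed: the last sequence number the application recorded as processed.
     */
    static long chooseStart(long consensusLow, long lastConsumed) {
        if (lastConsumed == NOTHING_CONSUMED) {
            return consensusLow; // no local state, so accept the consensus value
        }
        // Never start below the first message the application still needs.
        return Math.max(consensusLow, lastConsumed + 1);
    }

    public static void main(String[] args) {
        System.out.println(chooseStart(100, 149));
        System.out.println(chooseStart(100, NOTHING_CONSUMED));
    }
}
```

The value returned by chooseStart is what an application could pass to setLowSequenceNumber() inside the callback, after logging both inputs so that any disagreement between the stores and the application state is visible.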
There is one program source file:
The UMERcvRecInfo object is used to configure the callback function. It is defined below as a class that implements the LBM interface UMERecoverySequenceNumberCallback. Once the object is created, it is passed to the setRecoverySequenceNumberCallback() method like so:
    UMERcvRecInfo umerecinfocb = new UMERcvRecInfo();
    rcv_attr.setRecoverySequenceNumberCallback(umerecinfocb, null);
Here is the actual callback function that will be executed right before registration complete. In this example, the three provided sequence numbers are printed with an explanation. In a real-world application, it would be enough to print the sequence numbers with the appropriate labels (high_sqn, low_sqn, low_rx_sqn). Also in this example, the application blindly sets the starting sequence number to 100. Again, in a real-world application, the starting sequence number should be verified against internal application logic to ensure the receiver starts at the correct sequence number.
    private static class UMERcvRecInfo implements UMERecoverySequenceNumberCallback {
        public int setRecoverySequenceNumberInfo(Object cbArg, UMERecoverySequenceNumberCallbackInfo cbInfo) {

            // Hard-coded for demonstration only; a real application should derive this value.
            long new_low = 100;

            System.out.println("lowSequenceNumber[" + cbInfo.lowSequenceNumber() + "] is the starting sequence number as determined by registration consensus.");
            System.out.println("highSequenceNumber[" + cbInfo.highSequenceNumber() + "] is the highest sequence number available in the persisted stores.");
            System.out.println("lowRxReqMaxSequenceNumber[" + cbInfo.lowRxReqMaxSequenceNumber() + "] is the lowest sequence number to be requested.");
            try {
                // Override the starting sequence number chosen by consensus.
                cbInfo.setLowSequenceNumber(new_low);
            }
            catch (LBMEInvalException e) {
                System.err.println(e.getMessage());
            }
            System.out.flush();
            return 0;
        }
    }
This sample includes a message handler that handles the persistent registration events, success and complete.
When using UMP, it should be a strong application requirement to handle these registration events in the code and to log the data returned in each callback, such as registration IDs and sequence numbers. This data is crucial for debugging issues related to message recovery, connectivity, and even performance.
    private static class ReceiverCallback implements LBMReceiverCallback {
        public int onReceive(Object cbArg, LBMMessage msg)
        {
            switch (msg.type())
            {
                // One event per store that accepts the registration.
                case LBM.MSG_UME_REGISTRATION_SUCCESS_EX:
                    UMERegistrationSuccessInfo reg = msg.registrationSuccessInfo();
                    System.out.println("[" + msg.topicName() + "][" + msg.source() + "] store " + reg.storeIndex() + ": "
                        + reg.store() + " UME registration successful. Sequence number is " + reg.sequenceNumber());
                    break;
                // Delivered once quorum is reached; carries the starting sequence number.
                case LBM.MSG_UME_REGISTRATION_COMPLETE_EX:
                    UMERegistrationCompleteInfo regcomplete = msg.registrationCompleteInfo();
                    System.out.println("[" + msg.topicName() + "][" + msg.source() + "] UME registration complete. SQN "
                        + regcomplete.sequenceNumber() + ". Flags " + regcomplete.flags());
                    break;
                default:
                    System.out.println("Other event, type=" + msg.type());
                    break;
            }
            return 0;
        }
    } /* ReceiverCallback */