How to enable and detect late join

In messaging, late join describes a receiver that joins a message stream or topic "late", meaning that it may have missed one or more messages published before it successfully joined the transport. In several use cases it is useful to still receive and process those messages, for example to initialize application logic or to resolve race conditions between applications during startup. In UM, the late join feature can be enabled on any source. Doing so makes the source maintain a limited in-memory buffer of messages, which can be delivered to receivers that subscribe and request late join.

Program explanation: late_join.java

Enabling Late Join

Sources have late join disabled by default. To use late join, the application must do two things. The first is to enable late join on the source attributes. This can be done as follows:

    srcAttr.setValue("late_join", "1");

Once late join is enabled, you must define a retention policy. This determines how many messages are buffered at the source and are therefore available for late join. There are two retention policies: one based on message age, the other on size in bytes. In this case, the source is configured to hold at least 1 byte of message data. In effect, this means that at least 1 message will be in the buffer at all times once a message has been sent. This can be done as follows:

    srcAttr.setValue("retransmit_retention_size_threshold", "1");
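To see why a 1-byte threshold keeps at least one message, the retention rule described above can be modelled in a few lines. This is only a toy sketch, not UM's implementation: the class `RetentionSketch` and its methods are hypothetical, and it assumes the oldest message is dropped only when the messages that remain would still hold at least the threshold number of bytes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of size-threshold retention. This is NOT UM's implementation;
// the class and method names are hypothetical.
final class RetentionSketch {
    private final long thresholdBytes;
    private final Deque<byte[]> retained = new ArrayDeque<>();
    private long retainedBytes = 0;

    RetentionSketch(long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    // Buffer a sent message, then evict the oldest messages for as long as
    // the remaining messages would still hold at least thresholdBytes.
    void onSend(byte[] payload) {
        retained.addLast(payload.clone());
        retainedBytes += payload.length;
        while (!retained.isEmpty()
                && retainedBytes - retained.peekFirst().length >= thresholdBytes) {
            retainedBytes -= retained.removeFirst().length;
        }
    }

    int messageCount() {
        return retained.size();
    }

    long byteCount() {
        return retainedBytes;
    }

    public static void main(String[] args) {
        RetentionSketch buffer = new RetentionSketch(1);
        buffer.onSend(new byte[4]);
        buffer.onSend(new byte[6]);
        System.out.println(buffer.messageCount() + " message(s), "
                + buffer.byteCount() + " byte(s) retained");
    }
}
```

With a threshold of 1, the model always ends up holding exactly the most recent non-empty message, because dropping it would leave 0 bytes. A larger threshold keeps a correspondingly longer history.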

Create a source, send message

Once late join is enabled and configured, the source sends a message. At this point no receivers are "up", so none can receive the message. In this example, the receiver simply hasn't been created yet. More commonly, a sending application and a receiving application are started at the same time, and the receiver very often misses the first few messages published. This is because it takes a non-zero amount of time to resolve the source and join the underlying transport.

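    // Create the source from the topic that carries the late join attributes
    early_source = new LBMSource(ctx, src_topic);

    // Send one message before any receiver exists
    early_source.send("test".getBytes(), "test".getBytes().length, LBM.MSG_FLUSH | LBM.SRC_NONBLOCK);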
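The effect of joining after a send can be illustrated without any messaging library. The sketch below is a toy in-process model under stated assumptions: `LateJoinToy`, `send`, `join` and `demo` are hypothetical names, there is no network or topic resolution, and the retained history is unbounded for brevity. It sends one message before the receiver exists and one after, and reports what the receiver saw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy in-process model of late join. It does not use the UM API;
// all names here are hypothetical.
final class LateJoinToy {
    private final boolean lateJoinEnabled;
    private final List<String> retained = new ArrayList<>(); // unbounded for brevity
    private final List<Consumer<String>> receivers = new ArrayList<>();

    LateJoinToy(boolean lateJoinEnabled) {
        this.lateJoinEnabled = lateJoinEnabled;
    }

    // Deliver to receivers that have already joined; keep a copy if late join is on.
    void send(String payload) {
        if (lateJoinEnabled) {
            retained.add(payload);
        }
        for (Consumer<String> receiver : receivers) {
            receiver.accept(payload);
        }
    }

    // A joining receiver first gets whatever was retained, then live traffic.
    void join(Consumer<String> receiver) {
        for (String earlier : retained) {
            receiver.accept(earlier);
        }
        receivers.add(receiver);
    }

    // Send one message before the receiver exists and one after; return what it saw.
    static List<String> demo(boolean lateJoinEnabled) {
        LateJoinToy topic = new LateJoinToy(lateJoinEnabled);
        List<String> seen = new ArrayList<>();
        topic.send("early");
        topic.join(seen::add);
        topic.send("live");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("late join off: " + demo(false));
        System.out.println("late join on:  " + demo(true));
    }
}
```

In this model the receiver misses the "early" message when late join is off and receives it ahead of the "live" message when late join is on.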

Creating the receiver and receiving late join

Receivers have late join enabled by default. It is possible to disable late join on a per-receiver basis. In this example we want to receive late join messages, so the default does not have to be changed. Therefore, the receiver is simply created:

    late_rcv = new LBMReceiver(ctx, topic, rcvCB, null, null);

The receiver receives late join messages the same way it receives any live message. An application with late join enabled should therefore make use of the late join flag on each message. No record or state is kept in UMS, so it is not only possible but likely that a receiving application will receive duplicates via late join, especially when it joins the stream after a startup or a disconnect. It is therefore typically recommended to apply application-level de-duplication logic to the message stream if duplicates must not be processed. In this example, we simply use the message flags to determine whether a message was delivered via late join. Messages can also be retransmitted via OTR, which is a separate type of retransmission that is not covered in this example.

    switch (msg.type())
    {
        case LBM.MSG_DATA:
            // MSG_FLAG_RETRANSMIT marks a message delivered via late join
            if ((msg.flags() & LBM.MSG_FLAG_RETRANSMIT) == LBM.MSG_FLAG_RETRANSMIT)
                System.out.println("Processing Late Join Message. SQN: " + msg.sequenceNumber());
            // MSG_FLAG_OTR marks a message recovered via OTR
            else if ((msg.flags() & LBM.MSG_FLAG_OTR) == LBM.MSG_FLAG_OTR)
                System.out.println("Processing OTR Message. SQN: " + msg.sequenceNumber());
            else
                System.out.println("Processing Normal Message. SQN: " + msg.sequenceNumber());
            break;
    }
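The application-level de-duplication recommended above could be sketched as follows. This is a minimal illustration, not part of the example program: `SequenceDedupeSketch` and `firstDelivery` are hypothetical names, the source identifier is whatever string the application uses to tell sources apart, and the sequence number is assumed to be the value returned by `msg.sequenceNumber()`. The sketch also assumes a sequence number identifies a message uniquely within one source's stream; it ignores sequence number wraparound and never prunes its history.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal de-duplication sketch keyed by (source, sequence number).
// Hypothetical helper; memory grows without bound and wraparound is ignored.
final class SequenceDedupeSketch {
    private final Map<String, Set<Long>> seenBySource = new HashMap<>();

    // Returns true the first time a (source, sequence number) pair is seen,
    // and false for every later delivery of the same pair.
    boolean firstDelivery(String sourceId, long sequenceNumber) {
        return seenBySource
                .computeIfAbsent(sourceId, id -> new HashSet<>())
                .add(sequenceNumber);
    }

    public static void main(String[] args) {
        SequenceDedupeSketch dedupe = new SequenceDedupeSketch();
        long[] arrivals = {0, 1, 0, 2}; // sequence number 0 arrives twice
        for (long sqn : arrivals) {
            String action = dedupe.firstDelivery("source-A", sqn) ? "process" : "skip duplicate";
            System.out.println("SQN " + sqn + ": " + action);
        }
    }
}
```

A receiver callback would call `firstDelivery` before handing a data message to the application logic and drop the message when it returns false. A production version would likely bound the history, for example by tracking the highest contiguous sequence number per source.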

Cleanup

Cleanup in this case is simple. Simply close or delete all source and receiver objects, and then close or delete the context.

    late_rcv.close();
    early_source.close();
    ctx.close();