In messaging, the concept of late join is used to describe a receiver that may join a message stream or topic "late". In this case, late would mean that the receiver may have missed one or several messages that were published before it successfully joined the transport. In several use cases, it can be useful to still receive and process these messages in order to initialize application logic, or resolve race conditions between applications during startup. In UM the late join feature can be enabled at any source. Doing so will enable a limited in memory buffer of messages to be maintained at the source in the event receivers subscribe and request late join.
Sources by default have late join disabled. To enable late join the application must do two things. The first is to enable late-join. This can be done as follows:
00026 srcAttr.setValue("late_join", "1");
Once late join is enabled, you must define a retention policy. This will determine how many messages will be buffered at the source, and thus available for late join. There are two retention policies, one based on age, the other bytes. In this case, the source will be configured to hold at least 1 byte of message(s). In effect this means that at least 1 message will be in the buffer at all times. This can be done as follows:
00027 srcAttr.setValue("retransmit_retention_size_threshold", "1");
Once late join is enabled and configured, the source will send a message. At this point, no receivers are "up", so they can't receive the message. In this example, the receiver simply hasn't been created. More commonly, when users start an application that sends, and another that receives simultaneously, it is very often the case that the receiver can miss the first few messages published. This is because it takes a non-zero amount of time to resolve the source and join the underlying transport.
00030 early_src = new LBMSource(ctx, srcTopic); 00031 00032 early_src.send(Encoding.ASCII.GetBytes("test"), 4, LBM.MSG_FLUSH | LBM.SRC_NONBLOCK);
Receivers have late join enabled by default. It is possible to disable late join on a receiver by receiver basis. In this example, we want to receive late join, so the default does not have to be changed. Therefore, the receiver is simply created:
00044 lateRcv = new LBMReceiver(ctx, topic, onReceive, null, null);
The receiver will receive late join the same way it receives any live message. One critical component to applications with late join enabled is that it should make use of the late join flag. There is no record or state kept in UMS, and as a consequence it is not only possible, but likely that receiving applications can receive duplicates via late join, especially when joining the stream due to start up or disconnects. Therefore it is typically recommended that application level dedupe logic be applied to the message stream if there is a requirement to not process dups. In this example, we will simply use the message flags to determine if a message is late joined. Messages can also be retransmitted via OTR, which is a separate type of retransmission that will not be covered in this example.
00063 switch (msg.type()) 00064 { 00065 case LBM.MSG_DATA: 00066 if((msg.flags() & LBM.MSG_FLAG_OTR)==LBM.MSG_FLAG_OTR) 00067 System.Console.Out.WriteLine("Processing OTR Message. SQN: " + msg.sequenceNumber()); 00068 else if((msg.flags() & LBM.MSG_FLAG_RETRANSMIT)==LBM.MSG_FLAG_RETRANSMIT) 00069 System.Console.Out.WriteLine("Processing Late Join Message. SQN: " + msg.sequenceNumber()); 00070 else 00071 System.Console.Out.WriteLine("Processing Normal Message. SQN: " + msg.sequenceNumber()); 00072 00073 System.Console.Out.WriteLine("Msg Received: " + msg.dataString());
Cleanup in this case is simple. Simple close or delete all source & receiver objects, and then close or delete the context.
00050 early_src.close(); 00051 lateRcv.close(); 00052 ctx.close();