The major concerns of sources revolve around RegID management and message retention.
New or Re-Registration <-
Any source needs to know at start-up whether it is a new registration or a re-registration. The answer determines how the source registers with the Store. The UM library cannot answer this question. Therefore, the developer must decide what defines the lifetime of a source and how the source determines the appropriate RegID to use when it is ready to register. RegIDs are per source per topic per Store, thus a single RegID per Store is needed.
The following source code example looks for an existing RegID in a file and uses a new RegID assigned by the Store if it finds no existing RegID. See also ume-example-src-3.c.
C API
...
if (err) { printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1); }
srcinfo.message_num = 1;
srcinfo.existing_regid = 0;
err = read_src_regid_from_file(SRC_REGID_SAVE_FILENAME, store_info, sizeof(store_info));
if (!err) { srcinfo.existing_regid = 1; }
...
if (err) { printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1); }
...
if (err) { printf("line %d: %s\n", __LINE__, lbm_errmsg()); exit(1); }
The use of Session IDs allows UM, rather than your application, to perform the same RegID management. See Managing RegIDs with Session IDs.
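For applications that manage RegIDs themselves, the file-based approach described above can be sketched in plain C as follows. This is a minimal sketch, not the helper used in ume-example-src-3.c: the function names load_regid() and save_regid(), the one-number-per-file format, and the use of unsigned int for the RegID are all assumptions made for illustration.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical helper (not part of the UM API).
 * Reads a previously saved RegID from `path` into *regid.
 * Returns 0 on success, -1 if the file is missing or malformed. */
int load_regid(const char *path, unsigned int *regid)
{
    FILE *fp;
    int ok;

    assert(path != NULL && regid != NULL);
    fp = fopen(path, "r");
    if (fp == NULL) {
        return -1;      /* no saved RegID: treat as a new registration */
    }
    ok = (fscanf(fp, "%u", regid) == 1);
    fclose(fp);
    return ok ? 0 : -1;
}

/* Hypothetical helper (not part of the UM API).
 * Saves the RegID assigned by the Store so that the next start-up
 * can re-register with it. Returns 0 on success, -1 on I/O failure. */
int save_regid(const char *path, unsigned int regid)
{
    FILE *fp;
    int ok;

    assert(path != NULL);
    fp = fopen(path, "w");
    if (fp == NULL) {
        return -1;
    }
    ok = (fprintf(fp, "%u\n", regid) > 0);
    if (fclose(fp) != 0) {
        ok = 0;
    }
    return ok ? 0 : -1;
}
```

At start-up the application would call load_regid(); a return of -1 indicates a new registration, and the RegID later reported by the Store would be passed to save_regid().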
Sources Must Be Able to Resume Sending <-
A source sends messages unless UM prevents it, in which case the send function returns an error. A source may temporarily lose the ability to send messages if the Store(s) in use become unresponsive, e.g. the Store(s) die or become disconnected from the source. Once the Store(s) are responsive again, sending can continue. Source applications must therefore expect sending to fail temporarily in these failure cases and be able to resume sending once the failure is removed.
The following source code examples demonstrate how an application can handle a failed send by sleeping for a second and trying again:
C API
while (lbm_src_send(src, message, len, 0) == LBM_FAILURE) {
    if (lbm_errnum() == LBM_EUMENOREG) {
        printf("Send unsuccessful. Waiting...\n");
        sleep(1);
        continue;
    }
    fprintf(stderr, "lbm_src_send: %s\n", lbm_errmsg());
    exit(1);
}
Java API
for (;;) {
    try {
        src.send(message, len, 0);
    }
    catch (UMENoRegException ex) {
        System.out.println("Send unsuccessful. Waiting...");
        try {
            Thread.sleep(1000);
        }
        catch (InterruptedException e) { }
        continue;
    }
    catch (LBMException ex) {
        System.err.println("Error sending message: " + ex.toString());
        System.exit(1);
    }
    break;
}
.NET API
for (;;) {
    try {
        src.send(message, len, 0);
    }
    catch (UMENoRegException ex) {
        System.Console.Out.WriteLine("Send unsuccessful. Waiting...");
        System.Threading.Thread.Sleep(1000);
        continue;
    }
    catch (LBMException ex) {
        System.Console.Out.WriteLine("Error sending message: " + ex.ToString());
        System.Environment.Exit(1);
    }
    break;
}
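The examples above retry indefinitely. An application that prefers to give up after a fixed number of attempts could wrap the send in a bounded retry loop. The following is a minimal sketch that does not call UM at all: the send_result codes, the send_fn callback type, send_with_retry() and fake_send() are hypothetical names introduced here, and the callback stands in for whatever send call the application uses.

```c
#include <assert.h>
#include <unistd.h>   /* sleep() (POSIX) */

/* Hypothetical outcome codes for the send callback. */
enum send_result { SEND_OK = 0, SEND_NOT_REGISTERED = 1, SEND_FATAL = 2 };

typedef enum send_result (*send_fn)(void *state);

/* Calls do_send until it succeeds, fails fatally, or max_attempts is used up.
 * Sleeps delay_sec seconds between attempts (0 disables the sleep).
 * Returns the last result; SEND_NOT_REGISTERED means the retries ran out. */
enum send_result send_with_retry(send_fn do_send, void *state,
                                 int max_attempts, unsigned int delay_sec)
{
    enum send_result rc = SEND_NOT_REGISTERED;
    int attempt;

    assert(do_send != NULL && max_attempts > 0);
    for (attempt = 0; attempt < max_attempts; attempt++) {
        rc = do_send(state);
        if (rc != SEND_NOT_REGISTERED) {
            return rc;              /* success or unrecoverable error */
        }
        if (delay_sec > 0 && attempt + 1 < max_attempts) {
            sleep(delay_sec);       /* give the Store time to respond again */
        }
    }
    return rc;
}

/* Stand-in for a real send call: fails *remaining_failures times,
 * then succeeds. */
enum send_result fake_send(void *state)
{
    int *remaining_failures = (int *)state;

    if (*remaining_failures > 0) {
        (*remaining_failures)--;
        return SEND_NOT_REGISTERED;
    }
    return SEND_OK;
}
```

In a real application the callback would translate the send function's error into one of the three outcomes, treating only the "not registered" case as retryable, as the C, Java and .NET examples above do.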
Source Message Retention and Release <-
UM allows streaming of messages from a source without regard to message stability at a Store, which is one reason for UM's performance advantage over other persistent messaging systems. Sources retain all messages until notified by the active Store(s) that they are stable. This provides a method for Stores to be brought up to date when restarted or started anew.
When messages are considered stable at the Store, the source can release them which frees up source retention memory for new messages. Generally, the source releases older stable messages first. To release the oldest retained message, all the following conditions must be met:
Some things to note:
Note: Smart Sources simplify matters somewhat by pre-allocating retention buffers. They are not dynamically allocated or deallocated during operation. See Smart Sources and Persistence for more information.
Forced Reclaims <-
If the aggregate amount of buffered messages exceeds retransmit_retention_size_limit (source) bytes in payload and headers, then UM forcibly releases the oldest retained message even if it does not meet one or more of the conditions stated in Source Message Retention and Release. Avoid this condition; if it occurs, Informatica suggests increasing retransmit_retention_size_limit (source).
A second condition that produces a forced reclaim is when a message remains unstabilized when the ume_message_stability_lifetime (source) expires.
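The size-limit case can be pictured with a toy model of a retention queue. The sketch below is not UM's implementation: the struct and function names are hypothetical, the queue is a fixed-size FIFO, and the only normal release condition modeled is stability (the real conditions are those listed in Source Message Retention and Release). The size_limit field plays the role of retransmit_retention_size_limit (source).

```c
#include <assert.h>
#include <stddef.h>

#define MAX_RETAINED 64

struct retained_msg {
    unsigned int sqn;
    size_t bytes;      /* payload plus headers */
    int stable;        /* nonzero once reported stable */
};

struct retention {
    struct retained_msg q[MAX_RETAINED];   /* FIFO, oldest at head */
    size_t head, count;
    size_t total_bytes;
    size_t size_limit;
    unsigned int forced_reclaims;
    unsigned int normal_reclaims;
};

void retention_init(struct retention *r, size_t size_limit)
{
    assert(r != NULL);
    r->head = r->count = r->total_bytes = 0;
    r->size_limit = size_limit;
    r->forced_reclaims = r->normal_reclaims = 0;
}

static void drop_oldest(struct retention *r)
{
    r->total_bytes -= r->q[r->head].bytes;
    r->head = (r->head + 1) % MAX_RETAINED;
    r->count--;
}

/* Retains a newly sent message, then forcibly reclaims the oldest
 * messages while the aggregate size exceeds the limit.
 * Returns the number of forced reclaims, or -1 if the queue is full. */
int retention_add(struct retention *r, unsigned int sqn, size_t bytes)
{
    struct retained_msg *m;
    int forced = 0;

    assert(r != NULL);
    if (r->count == MAX_RETAINED) {
        return -1;
    }
    m = &r->q[(r->head + r->count) % MAX_RETAINED];
    m->sqn = sqn;
    m->bytes = bytes;
    m->stable = 0;
    r->count++;
    r->total_bytes += bytes;
    while (r->count > 0 && r->total_bytes > r->size_limit) {
        drop_oldest(r);             /* released regardless of stability */
        r->forced_reclaims++;
        forced++;
    }
    return forced;
}

/* Marks sqn stable, then releases stable messages from the oldest end.
 * Returns the number of messages released normally. */
int retention_mark_stable(struct retention *r, unsigned int sqn)
{
    size_t i;
    int released = 0;

    assert(r != NULL);
    for (i = 0; i < r->count; i++) {
        struct retained_msg *m = &r->q[(r->head + i) % MAX_RETAINED];
        if (m->sqn == sqn) {
            m->stable = 1;
            break;
        }
    }
    while (r->count > 0 && r->q[r->head].stable) {
        drop_oldest(r);
        r->normal_reclaims++;
        released++;
    }
    return released;
}
```

With a limit of 100 bytes, adding three 40-byte messages forces out the first one even though it was never marked stable, which corresponds to the situation the size limit should be tuned to avoid.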
Whenever UM performs a Forced Reclaim, it notifies the application in the following ways:
- The source event callback's RECLAIMED_EX event (see Persistence Source Events) includes a "FORCED" flag on the event. (UM uses the same RECLAIMED_EX event, without the FORCED flag, for normal reclaims.)
- Through the separate forced reclaim callback, if registered. You set this separate forced reclaim callback with the ume_force_reclaim_function (source) configuration option.
Note: UM retains the separate callback for backwards compatibility; the callback may be deprecated in future releases. The source event FORCED flag is the recommended method of tracking forced reclaims.
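The following sample code, from Example umesrc.c, handles the extended reclaim source event and reports the 'Forced' flag if the reclamation is a forced reclaim.
C API
case LBM_SRC_EVENT_UME_MESSAGE_RECLAIMED_EX:
    {
        lbm_src_event_ume_ack_ex_info_t *ackinfo = (lbm_src_event_ume_ack_ex_info_t *)ed;

        if (opts->verbose) {
            printf("UME message reclaimed (ex) - sequence number %x (cd %p). Flags 0x%x ",
                ackinfo->sequence_number, (void *)ackinfo->msg_clientd, ackinfo->flags);
            if (ackinfo->flags & LBM_SRC_EVENT_UME_MESSAGE_RECLAIMED_EX_FLAG_FORCED) {
                printf("FORCED");
            }
            printf("\n");
        }
    }
    break;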
Java API
case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX:
    UMESourceEventAckInfo reclaiminfo = sourceEvent.ackInfo();
    if (_verbose > 0) {
        if (reclaiminfo.clientObject() != null) {
            System.out.print("UME message reclaimed (ex) - sequence number "
                + Long.toHexString(reclaiminfo.sequenceNumber())
                + " (cd "
                + Long.toHexString(((Long)reclaiminfo.clientObject()).longValue())
                + "). Flags 0x"
                + reclaiminfo.flags());
        } else {
            System.out.print("UME message reclaimed (ex) - sequence number "
                + Long.toHexString(reclaiminfo.sequenceNumber())
                + " Flags 0x"
                + reclaiminfo.flags());
        }
        if ((reclaiminfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX_FLAG_FORCED) != 0) {
            System.out.print(" FORCED");
        }
        System.out.println();
    }
    break;
.NET API
case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX:
    UMESourceEventAckInfo reclaiminfo = sourceEvent.ackInfo();
    if (_verbose > 0) {
        System.Console.Out.Write("UME message reclaimed (ex) - sequence number "
            + reclaiminfo.sequenceNumber()
            + " (cd "
            + ((uint)reclaiminfo.clientObject()).ToString("x")
            + "). Flags "
            + reclaiminfo.flags());
        if ((reclaiminfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED_EX_FLAG_FORCED) != 0) {
            System.Console.Out.Write(" FORCED");
        }
        System.Console.Out.WriteLine();
    }
    break;
Source Retention Policy Options <-
Sources release messages according to a set of configuration options that, in effect, specify the source's retention policy. The following configuration options directly impact when the source may release retained messages:
Confirmed Delivery <-
The configuration option ume_retention_unique_confirmations (source) requires a message to have a minimum number of unique confirmations from different receivers before the message may be released. Messages that have not yet been confirmed as received and processed are therefore retained and remain available to fulfill any retransmission requests. This provides a form of receiver pacing: the source is not allowed to get ahead of the confirming receivers by more than the Persistence Flight Size.
For example, a topic might have 2 receivers which are considered essential to keep up, and which should therefore contribute to flight size calculation. There might be any number of less-essential receivers which can be allowed to lag behind. In this case, ume_retention_unique_confirmations (source) would be set to 2, and the non-essential receivers would set ume_allow_confirmed_delivery (receiver) to 0.
Note: Smart Sources do not support delivery confirmation.
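The idea of counting unique confirmations can be sketched as follows. This is an illustrative model only, not UM's internal logic: the confirm_tracker type and its functions are hypothetical, and it assumes that receivers are told apart by their RegID. The required field plays the role of ume_retention_unique_confirmations (source).

```c
#include <assert.h>
#include <stddef.h>

#define MAX_CONFIRMERS 32

/* Per-message bookkeeping: which receivers have confirmed so far. */
struct confirm_tracker {
    unsigned int regids[MAX_CONFIRMERS];
    size_t count;
    size_t required;
};

void tracker_init(struct confirm_tracker *t, size_t required)
{
    assert(t != NULL && required <= MAX_CONFIRMERS);
    t->count = 0;
    t->required = required;
}

/* Records a confirmation from the receiver identified by regid.
 * Repeated confirmations from the same receiver are not counted twice.
 * Returns 1 once enough unique receivers have confirmed, otherwise 0. */
int tracker_confirm(struct confirm_tracker *t, unsigned int regid)
{
    size_t i;

    assert(t != NULL);
    for (i = 0; i < t->count; i++) {
        if (t->regids[i] == regid) {
            return t->count >= t->required;   /* duplicate: no change */
        }
    }
    if (t->count < MAX_CONFIRMERS) {
        t->regids[t->count++] = regid;
    }
    return t->count >= t->required;
}
```

With required set to 2, two confirmations from the same receiver leave the message unreleasable; a confirmation from a second receiver satisfies the requirement.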
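The following code samples show how to require a message to have 10 unique receiver confirmations:
C API
lbm_src_topic_attr_t *sattr;
if (lbm_src_topic_attr_create_from_xml(&sattr, ...) == LBM_FAILURE) {
    fprintf(stderr, "lbm_src_topic_attr_create_from_xml: %s\n", lbm_errmsg());
    exit(1);
}
if (lbm_src_topic_attr_str_setopt(sattr, "ume_retention_unique_confirmations",
        "10") == LBM_FAILURE) {
    fprintf(stderr, "lbm_src_topic_attr_str_setopt: %s\n", lbm_errmsg());
    exit(1);
}
Java API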
LBMSourceAttributes sattr = null;
try {
    sattr = new LBMSourceAttributes();
    sattr.setValue("ume_retention_unique_confirmations", "10");
}
catch (LBMException ex) {
    System.err.println("Error creating source attribute: " + ex.toString());
    System.exit(1);
}
.NET API
LBMSourceAttributes sattr = null;
try {
    sattr = new LBMSourceAttributes();
    sattr.setValue("ume_retention_unique_confirmations", "10");
}
catch (LBMException ex) {
    System.Console.Error.WriteLine("Error creating source attribute: " + ex.ToString());
    System.Environment.Exit(1);
}
Source Event Handler <-
The Source Event Handler is a callback function, set at source creation, that delivers events about the operation of the source to your application. The following source code examples illustrate the use of a source event handler for registration events. To accept other source events, add one case statement for each additional source event. See also Persistence Events.
C API
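int handle_src_event(lbm_src_t *src, int event, void *ed, void *cd)
{
    switch (event) {
    case LBM_SRC_EVENT_UME_REGISTRATION_ERROR:
        {
            const char *errstr = (const char *)ed;

            printf("Error registering source with UME store: %s\n", errstr);
        }
        break;
    case LBM_SRC_EVENT_UME_REGISTRATION_SUCCESS_EX:
        {
            lbm_src_event_ume_registration_ex_t *reg = (lbm_src_event_ume_registration_ex_t *)ed;

            printf("UME store %u: %s registration success. RegID %u. Flags %x ",
                reg->store_index, reg->store, reg->registration_id, reg->flags);
            if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)
                printf("OLD[SQN %x] ", reg->sequence_number);
            if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS)
                printf("NOACKS ");
            printf("\n");
        }
        break;
    case LBM_SRC_EVENT_UME_REGISTRATION_COMPLETE_EX:
        {
            lbm_src_event_ume_registration_complete_ex_t *reg =
                (lbm_src_event_ume_registration_complete_ex_t *)ed;

            printf("UME registration complete. SQN %x. Flags %x ",
                reg->sequence_number, reg->flags);
            if (reg->flags & LBM_SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM)
                printf("QUORUM ");
            printf("\n");
        }
        break;
    case LBM_SRC_EVENT_UME_STORE_UNRESPONSIVE:
        {
            const char *infostr = (const char *)ed;

            printf("UME store: %s\n", infostr);
        }
        break;
    default:
        printf("Unknown source event %d\n", event);
        break;
    }
    return 0;
}
Java API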
public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
    switch (sourceEvent.type()) {
    case LBM.SRC_EVENT_UME_REGISTRATION_ERROR:
        System.out.println("Error registering source with UME store: "
            + sourceEvent.dataString());
        break;
    case LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX:
        UMESourceEventRegistrationSuccessInfo reg =
            sourceEvent.registrationSuccessInfo();
        System.out.print("UME store " + reg.storeIndex() + ": " + reg.store()
            + " registration success. RegID " + reg.registrationId() + ". Flags "
            + reg.flags() + " ");
        if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)) != 0) {
            System.out.print("OLD[SQN " + reg.sequenceNumber() + "] ");
        }
        if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS)) != 0) {
            System.out.print("NOACKS ");
        }
        System.out.println();
        break;
    case LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX:
        UMESourceEventRegistrationCompleteInfo regcomp =
            sourceEvent.registrationCompleteInfo();
        System.out.print("UME registration complete. SQN " + regcomp.sequenceNumber()
            + ". Flags " + regcomp.flags() + " ");
        if ((regcomp.flags() & LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
            System.out.print("QUORUM ");
        }
        System.out.println();
        break;
    case LBM.SRC_EVENT_UME_STORE_UNRESPONSIVE:
        System.out.println("UME store: " + sourceEvent.dataString());
        break;
    ...
    default:
        System.out.println("Unknown source event " + sourceEvent.type());
        break;
    }
    return 0;
}
.NET API
public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
    switch (sourceEvent.type()) {
    case LBM.SRC_EVENT_UME_REGISTRATION_ERROR:
        System.Console.Out.WriteLine("Error registering source with UME store: "
            + sourceEvent.dataString());
        break;
    case LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX:
        UMESourceEventRegistrationSuccessInfo reg = sourceEvent.registrationSuccessInfo();
        System.Console.Out.Write("UME store " + reg.storeIndex() + ": " + reg.store()
            + " registration success. RegID " + reg.registrationId() + ". Flags "
            + reg.flags() + " ");
        if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_OLD)) != 0) {
            System.Console.Out.Write("OLD[SQN " + reg.sequenceNumber() + "] ");
        }
        if (((reg.flags() & LBM.SRC_EVENT_UME_REGISTRATION_SUCCESS_EX_FLAG_NOACKS)) != 0) {
            System.Console.Out.Write("NOACKS ");
        }
        System.Console.Out.WriteLine();
        break;
    case LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX:
        UMESourceEventRegistrationCompleteInfo regcomp =
            sourceEvent.registrationCompleteInfo();
        System.Console.Out.Write("UME registration complete. SQN " +
            regcomp.sequenceNumber() + ". Flags " + regcomp.flags() + " ");
        if ((regcomp.flags() & LBM.SRC_EVENT_UME_REGISTRATION_COMPLETE_EX_FLAG_QUORUM) != 0) {
            System.Console.Out.Write("QUORUM ");
        }
        System.Console.Out.WriteLine();
        break;
    case LBM.SRC_EVENT_UME_STORE_UNRESPONSIVE:
        System.Console.Out.WriteLine("UME store: " + sourceEvent.dataString());
        break;
    ...
    default:
        System.Console.Out.WriteLine("Unknown source event " + sourceEvent.type());
        break;
    }
    return 0;
}
Source Event Handler - Stability, Confirmation and Release <-
As shown in Source Event Handler above, the Source Event Handler can be expanded to handle more source events by adding additional case statements. The following source code examples show case statements to handle message stability events, delivery confirmation events and message release (reclaim) events. See also Persistence Events.
C API
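case LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX:
    {
        lbm_src_event_ume_ack_ex_info_t *info = (lbm_src_event_ume_ack_ex_info_t *)ed;

        printf("UME store %u: %s message stable. SQN %x (msgno %d). Flags %x ",
            info->store_index, info->store, info->sequence_number,
            (int)(size_t)info->msg_clientd, info->flags);
        if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE)
            printf("IA ");
        if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE)
            printf("IR ");
        if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE)
            printf("STABLE ");
        if (info->flags & LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE)
            printf("STORE ");
        printf("\n");
    }
    break;
case LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX:
    {
        lbm_src_event_ume_ack_ex_info_t *info = (lbm_src_event_ume_ack_ex_info_t *)ed;

        printf("UME delivery confirmation. SQN %x, Receiver RegID %u (msgno %d). Flags %x ",
            info->sequence_number, info->rcv_registration_id,
            (int)(size_t)info->msg_clientd, info->flags);
        if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS)
            printf("UNIQUEACKS ");
        if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID)
            printf("UREGID ");
        if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD)
            printf("OOD ");
        if (info->flags & LBM_SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK)
            printf("EXACK ");
        printf("\n");
    }
    break;
case LBM_SRC_EVENT_UME_MESSAGE_RECLAIMED:
    {
        lbm_src_event_ume_ack_info_t *ackinfo = (lbm_src_event_ume_ack_info_t *)ed;

        printf("UME message released - sequence number %x (msgno %d)\n",
            ackinfo->sequence_number, (int)(size_t)ackinfo->msg_clientd);
    }
    break;
Java API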
case LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX:
    UMESourceEventAckInfo staInfo = sourceEvent.ackInfo();
    System.out.print("UME store " + staInfo.storeIndex() + ": "
        + staInfo.store() + " message stable. SQN " + staInfo.sequenceNumber()
        + " (msgno " + staInfo.clientObject() + "). Flags "
        + staInfo.flags() + " ");
    if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE) != 0) {
        System.out.print("IA ");
    }
    if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE) != 0) {
        System.out.print("IR ");
    }
    if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE) != 0) {
        System.out.print("STABLE ");
    }
    if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE) != 0) {
        System.out.print("STORE ");
    }
    System.out.println();
    break;
case LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX:
    UMESourceEventAckInfo cdelvinfo = sourceEvent.ackInfo();
    System.out.print("UME delivery confirmation. SQN " + cdelvinfo.sequenceNumber()
        + ", RcvRegID " + cdelvinfo.receiverRegistrationId() + " (msgno "
        + cdelvinfo.clientObject() + "). Flags " + cdelvinfo.flags() + " ");
    if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS) != 0) {
        System.out.print("UNIQUEACKS ");
    }
    if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID) != 0) {
        System.out.print("UREGID ");
    }
    if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD) != 0) {
        System.out.print("OOD ");
    }
    if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK) != 0) {
        System.out.print("EXACK ");
    }
    System.out.println();
    break;
case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED:
    System.out.println("UME message released - sequence number "
        + Long.toHexString(sourceEvent.sequenceNumber())
        + " (msgno "
        + Long.toHexString(((Integer)sourceEvent.clientObject()).longValue())
        + ")");
    break;
.NET API
case LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX:
    UMESourceEventAckInfo staInfo = sourceEvent.ackInfo();
    System.Console.Out.Write("UME store " + staInfo.storeIndex() + ": "
        + staInfo.store() + " message stable. SQN " + staInfo.sequenceNumber()
        + " (msgno " + ((int)staInfo.clientObject()).ToString("x") + "). Flags "
        + staInfo.flags() + " ");
    if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTRAGROUP_STABLE) != 0) {
        System.Console.Out.Write("IA ");
    }
    if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_INTERGROUP_STABLE) != 0) {
        System.Console.Out.Write("IR ");
    }
    if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STABLE) != 0) {
        System.Console.Out.Write("STABLE ");
    }
    if ((staInfo.flags() & LBM.SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_STORE) != 0) {
        System.Console.Out.Write("STORE ");
    }
    System.Console.Out.WriteLine();
    break;
case LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX:
    UMESourceEventAckInfo cdelvinfo = sourceEvent.ackInfo();
    System.Console.Out.Write("UME delivery confirmation. SQN " +
        cdelvinfo.sequenceNumber()
        + ", RcvRegID " + cdelvinfo.receiverRegistrationId() + " (msgno "
        + ((int)cdelvinfo.clientObject()).ToString("x") + "). Flags " +
        cdelvinfo.flags() + " ");
    if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UNIQUEACKS) != 0) {
        System.Console.Out.Write("UNIQUEACKS ");
    }
    if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_UREGID) != 0) {
        System.Console.Out.Write("UREGID ");
    }
    if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_OOD) != 0) {
        System.Console.Out.Write("OOD ");
    }
    if ((cdelvinfo.flags() & LBM.SRC_EVENT_UME_DELIVERY_CONFIRMATION_EX_FLAG_EXACK) != 0) {
        System.Console.Out.Write("EXACK ");
    }
    System.Console.Out.WriteLine();
    break;
case LBM.SRC_EVENT_UME_MESSAGE_RECLAIMED:
    System.Console.Out.WriteLine("UME message released - sequence number "
        + sourceEvent.sequenceNumber().ToString("x")
        + " (msgno "
        + ((int)sourceEvent.clientObject()).ToString("x")
        + ")");
    break;
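The handlers above decode each flag with its own if statement. Where many flags must be reported, a table-driven helper is one alternative. The sketch below is an illustration only: the flag_name type, format_flags() and the bit values in stable_flags are hypothetical placeholders, and real code would fill the table with the LBM_SRC_EVENT_UME_MESSAGE_STABLE_EX_FLAG_* constants instead.

```c
#include <assert.h>
#include <stdio.h>

struct flag_name {
    int bit;
    const char *name;
};

/* Placeholder bit values, assumed for illustration only. */
static const struct flag_name stable_flags[] = {
    { 0x1, "IA" }, { 0x2, "IR" }, { 0x4, "STABLE" }, { 0x8, "STORE" }
};

/* Writes the names of all flags set in `flags` into buf, each followed
 * by a space. Returns the number of characters written (excluding the
 * terminating NUL). Stops early if buf is too small. */
size_t format_flags(int flags, const struct flag_name *table, size_t n,
                    char *buf, size_t buflen)
{
    size_t used = 0;
    size_t i;

    assert(table != NULL && buf != NULL && buflen > 0);
    buf[0] = '\0';
    for (i = 0; i < n; i++) {
        int written;

        if ((flags & table[i].bit) == 0) {
            continue;
        }
        written = snprintf(buf + used, buflen - used, "%s ", table[i].name);
        if (written < 0 || (size_t)written >= buflen - used) {
            buf[used] = '\0';       /* drop the partial name and stop */
            break;
        }
        used += (size_t)written;
    }
    return used;
}
```

A handler could then call format_flags() once per event and print the resulting string, keeping one table per event type.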
Mapping Your Message Numbers to Sequence Numbers <-
Some application developers want their publishing applications to know the UM-assigned topic-level sequence numbers assigned to outgoing messages.
The C API functions lbm_src_send_ex() and lbm_src_sendv_ex() allow you to expand the number of source events that will be delivered to your sending application, including a source event that informs your sender of the topic-level sequence numbers assigned to each message you send. In the case of a large message that requires fragmentation, the callback will tell you the starting and ending topic-level sequence numbers assigned to it.
The following two source code examples show how to:
- Enable message sequence number information.
- Handle sequence number source events to determine the application message number in the Source Event Handler.
C API - Enable Message Information
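struct my_sqn_nums_s {
    unsigned int first_sqn;
    unsigned int last_sqn;
};
typedef struct my_sqn_nums_s my_sqn_nums_t;
my_sqn_nums_t msg_sqn_nums;
lbm_src_send_ex_info_t exinfo;
...
memset(&exinfo, 0, sizeof(exinfo));
exinfo.flags = LBM_SRC_SEND_EX_FLAG_UME_CLIENTD | LBM_SRC_SEND_EX_FLAG_SEQUENCE_NUMBER_INFO;
exinfo.ume_msg_clientd = &msg_sqn_nums;
if (lbm_src_send_ex(src, message, len, 0, &exinfo) == LBM_FAILURE) {
    ...
}
printf("first_sqn=%u, last_sqn=%u\n", msg_sqn_nums.first_sqn, msg_sqn_nums.last_sqn);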
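C API - Sequence Number Event Handler
int handle_src_event(lbm_src_t *src, int event, void *ed, void *cd)
{
    switch (event) {
    case LBM_SRC_EVENT_SEQUENCE_NUMBER_INFO:
        {
            lbm_src_event_sequence_number_info_t *info =
                (lbm_src_event_sequence_number_info_t *)ed;
            my_sqn_nums_t *sqn_nums = (my_sqn_nums_t *)info->msg_clientd;

            sqn_nums->first_sqn = info->first_sequence_number;
            sqn_nums->last_sqn = info->last_sequence_number;
        }
        break;
    ...
    }
    return 0;
}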
Java API - Enable Message Information
LBMSourceSendExInfo exinfo = new LBMSourceSendExInfo();
exinfo.setClientObject(Integer.valueOf(msgno));
exinfo.setFlags(LBM.SRC_SEND_EX_FLAG_SEQUENCE_NUMBER_INFO);
try {
    src.send(message, msglen, 0, exinfo);
}
catch (UMENoRegException ex) {
    // Source is not registered with the Store; retry the send later.
}
Java API - Sequence Number Event Handler
public int onSourceEvent(Object arg, LBMSourceEvent sourceEvent)
{
    switch (sourceEvent.type()) {
    case LBM.SRC_EVENT_SEQUENCE_NUMBER_INFO:
        {
            LBMSourceEventSequenceNumberInfo info = sourceEvent.sequenceNumberInfo();
            break;
        }
    ...
    }
    return 0;
}
.NET API - Enable Message Information
.NET code is the same as Java (above).
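Once the sequence number event has reported the first and last sequence numbers for a message, the application can keep its own table to map later events (which carry sequence numbers) back to application message numbers. The following is a minimal sketch of such a table in plain C; sqn_map, sqn_map_record() and sqn_map_lookup() are hypothetical names, and the fixed-size linear table is an assumption chosen for brevity.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_TRACKED 128

struct sqn_range {
    int msgno;                /* application message number */
    unsigned int first_sqn;
    unsigned int last_sqn;    /* equals first_sqn if not fragmented */
};

struct sqn_map {
    struct sqn_range entries[MAX_TRACKED];
    size_t count;
};

void sqn_map_init(struct sqn_map *m)
{
    assert(m != NULL);
    m->count = 0;
}

/* Records the sequence number range reported for message msgno.
 * Returns 0 on success, -1 if the table is full. */
int sqn_map_record(struct sqn_map *m, int msgno,
                   unsigned int first_sqn, unsigned int last_sqn)
{
    assert(m != NULL);
    if (m->count == MAX_TRACKED) {
        return -1;
    }
    m->entries[m->count].msgno = msgno;
    m->entries[m->count].first_sqn = first_sqn;
    m->entries[m->count].last_sqn = last_sqn;
    m->count++;
    return 0;
}

/* Finds the application message whose range contains sqn.
 * Returns 0 and stores the message number in *msgno, or -1 if not found. */
int sqn_map_lookup(const struct sqn_map *m, unsigned int sqn, int *msgno)
{
    size_t i;

    assert(m != NULL && msgno != NULL);
    for (i = 0; i < m->count; i++) {
        const struct sqn_range *e = &m->entries[i];

        /* Unsigned subtraction tests first_sqn <= sqn <= last_sqn
         * with a single comparison. */
        if (sqn - e->first_sqn <= e->last_sqn - e->first_sqn) {
            *msgno = e->msgno;
            return 0;
        }
    }
    return -1;
}
```

For example, if message 2 was fragmented into sequence numbers 11 through 14, a stability event for sequence number 13 maps back to message 2. A production table would also remove entries once their messages are reclaimed.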
Receiver Liveness Detection <-
As an extension to Confirmed Delivery, you can set receivers to send a keepalive to a source during a measured absence of delivery confirmations (due to traffic lapse). If neither a delivery confirmation nor a keepalive reaches the source within a designated interval, or if the delivery confirmation TCP connection breaks down, the receiver is assumed to have "died". UM then notifies the publishing application via a context event callback. This lets the publisher assign a new subscriber.
To use this feature, set these five configuration options:
Note: Smart Sources do not support liveness detection.
This specialized feature is not recommended for general use. If you are considering it, please note the following caveats:
- Do not use in conjunction with a DRO.
- There is a variety of potential network occurrences that can break or reset the TCP connection and falsely indicate the death of a receiver.
- In cases where a receiver object is deleted while its context is not, the publisher may still falsely assume the receiver to be alive.
Other false receiver-alive assumptions could be caused by the following:
- TCP connections can enter a half-open or otherwise corrupted state.
- Failed TCP connections sometimes do not fully close, or experience objectionable delays before fully closing.
- A switch or router failure along the path does not affect the TCP connection state.