Concepts Guide
Application Design Principles


UM Monitoring

For an Ultra Messaging deployment, "monitoring" is the process of overseeing the operation of UM and the resources it uses to determine its health and performance.

Informatica strongly recommends that users of Ultra Messaging actively monitor its operation.

Monitoring is addressed in depth in the Operations Guide; see Monitoring Introduction. From an application design point of view, you should decide on a monitoring approach before you start coding your applications.

In particular, implement the recommendations related to the Application Log File.


Message Reception

Applications receive messages from UM via application callback. The application registers its callback function with UM during the creation of the Receiver Object. As messages are received, the application's receiver callback function is called, passing in the received message.

Note: there are events other than message reception that can trigger calls to the application's receiver callback. Those other event types are not covered in this section (see lbm_msg_t_stct::type).

At a high level, there are three common approaches to handling received messages:

  1. Message is fully processed by the application's receiver callback, called by a context or XSP thread. This is typically the most efficient approach.
  2. Message is fully processed by the application's receiver callback, called by an event queue dispatch thread. This is a simple method to move message processing to a different thread.
  3. Message is retained by the receiver callback for further processing by a different application thread.

For example code demonstrating message reception best practices, see the section for your API language: C Message Reception, Java Message Reception, or .NET Message Reception.


C Message Reception

C: Message Is Fully Processed by the Application's Receiver Callback, Called by a Context or XSP Thread

int my_receiver_callback(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
{
  switch (msg->type) {
  case LBM_MSG_DATA:
    /* Process "msg->len" bytes in "msg->data". */
    ...
    break;
  /* Handle other receiver events. */
  } /* switch */
  /* No need to delete message (C API deletes by default). */
  return 0;
} /* my_receiver_callback */

Important rules regarding the receiver callback function when called by a context/XSP thread:

  • The function must return 0.
  • The passed-in message must not be modified.
  • The function should not perform any operation that might be time consuming or put the thread to sleep (block). Any delay in your callback prevents the context thread from servicing its sockets, which increases the risk of packet loss.
  • If the receiver callback sends a message, non-blocking sends must be used (see LBM_SRC_NONBLOCK). The code should be written to handle a send failure of LBM_EWOULDBLOCK.
  • It is not allowed to create/delete sources or receivers, or subscribe/unsubscribe from a Spectrum channel from a callback function executed by a context/XSP thread.
  • You may schedule UM timers.
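
The non-blocking send rule above can be sketched without the UM library. This is a minimal illustration, not UM code: demo_send_fn, the DEMO_SEND_* codes, and demo_pending_t are hypothetical stand-ins. In a real application the send would be the UM send call made with the LBM_SRC_NONBLOCK flag, and the would-block status corresponds to LBM_EWOULDBLOCK. The point is only that the callback never waits; it sets the payload aside so that a different thread can retry later.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical status codes standing in for a UM send result. */
#define DEMO_SEND_OK         0
#define DEMO_SEND_WOULDBLOCK 1

#define DEMO_MAX_PENDING 8
#define DEMO_MAX_MSG_LEN 64

/* Hypothetical stand-in for a non-blocking send call. */
typedef int (*demo_send_fn)(const char *data, size_t len);

/* Payloads that could not be sent yet, kept for a later retry. */
typedef struct {
  char   data[DEMO_MAX_PENDING][DEMO_MAX_MSG_LEN];
  size_t len[DEMO_MAX_PENDING];
  size_t count;    /* Payloads waiting for retry. */
  size_t dropped;  /* Payloads that did not fit. */
} demo_pending_t;

/* Attempt one non-blocking send from inside a callback. Never waits.
 * On would-block, copy the payload aside and return immediately. */
static int demo_send_from_callback(demo_send_fn send_fn, demo_pending_t *p,
                                   const char *data, size_t len)
{
  int rc;
  assert(send_fn != NULL && p != NULL && data != NULL);
  rc = send_fn(data, len);
  if (rc == DEMO_SEND_WOULDBLOCK) {
    if (p->count < DEMO_MAX_PENDING && len <= DEMO_MAX_MSG_LEN) {
      memcpy(p->data[p->count], data, len);
      p->len[p->count] = len;
      p->count++;
    } else {
      p->dropped++;  /* Application policy decides what to do here. */
    }
  }
  return rc;
}

/* Stub senders used only to exercise the sketch. */
static int demo_send_always_ok(const char *data, size_t len)
{
  (void)data; (void)len;
  return DEMO_SEND_OK;
}

static int demo_send_always_full(const char *data, size_t len)
{
  (void)data; (void)len;
  return DEMO_SEND_WOULDBLOCK;
}
```

Whether a would-block send is retried, dropped, or counted is an application decision; the sketch saves it and counts the ones that do not fit.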

If this is a Persistent receiver, see Persistence Message Consumption.

C: Message Is Fully Processed by the Application's Receiver Callback, Called by an Event Queue Dispatch Thread

The code is identical to the context/XSP thread case above.

The rules are similar to the context/XSP thread case, with some important differences:

  • The function must return 0.
  • The passed-in message must not be modified.
  • The application is permitted to perform operations that are time consuming and/or blocking, as long as the average message processing rate is greater than the average message sending rate. Any temporary delays will buffer messages in the event queue.
  • Blocking or non-blocking sends may be used, according to the application's preferences.
  • You may create/delete sources or receivers, or subscribe/unsubscribe from a Spectrum channel. However, the receiver which is delivering the current message must not be deleted.
  • You may schedule UM timers.
  • Informatica strongly recommends monitoring event queue length. See Event Queue Monitor for more information.
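
The rate condition in the rules above is simple arithmetic, and a small simulation can make it concrete. The following sketch is independent of UM: demo_peak_backlog and its per-tick arrays are hypothetical names chosen for illustration. It tracks how many messages wait in a queue when a stall is followed by recovery, and when the consumer is slower than the sender on average.

```c
#include <assert.h>
#include <stddef.h>

/* Simulate a queue over "ticks" intervals.
 * arrivals[i]: messages enqueued during tick i.
 * capacity[i]: messages the dispatch thread can process during tick i.
 * Returns the peak backlog (measured after arrivals, before processing)
 * and stores the backlog remaining at the end in *final_backlog. */
static unsigned long demo_peak_backlog(const unsigned *arrivals,
                                       const unsigned *capacity,
                                       size_t ticks,
                                       unsigned long *final_backlog)
{
  unsigned long backlog = 0, peak = 0;
  size_t i;
  assert(arrivals != NULL && capacity != NULL && final_backlog != NULL);
  for (i = 0; i < ticks; i++) {
    backlog += arrivals[i];
    if (backlog > peak) {
      peak = backlog;
    }
    /* Process as many as capacity allows, but never more than are queued. */
    backlog -= (capacity[i] < backlog) ? capacity[i] : backlog;
  }
  *final_backlog = backlog;
  return peak;
}
```

With 10 arrivals per tick and a capacity of 12 per tick, a one-tick stall pushes the peak to 20 and the queue drains to zero four ticks after processing resumes. With a capacity of 8 per tick, the backlog grows by 2 every tick and never recovers, which is the situation that event queue monitoring is meant to catch.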

If this is a Persistent receiver, see Persistence Message Consumption.

C: Message Is Retained by the Receiver Callback for Further Processing by a Different Application Thread

int my_receiver_callback(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
{
  int err, more_processing_needed;
  switch (msg->type) {
  case LBM_MSG_DATA:
    more_processing_needed = my_process_received_message(msg);
    if (more_processing_needed) {
      /* Tell UM: not finished with "msg". */
      err = lbm_msg_retain(msg);
      /* Check and handle errors. */
      my_save_message_for_more_processing(msg);
    }
    break;
  /* Handle other receiver events. */
  } /* switch */
  return 0;
} /* my_receiver_callback */
...
void my_additional_processing(lbm_msg_t *msg)
{
  int err;
  /* Process "msg->len" bytes in "msg->data". */
  ...
  /* Tell UM: finished with "msg". */
  err = lbm_msg_delete(msg);
  /* Check and handle errors. */
} /* my_additional_processing */

This demonstrates the use of lbm_msg_retain() and lbm_msg_delete().

By default, when a receiver callback returns, the UM message is implicitly deleted. To prevent that from happening, the message must be "retained". The application must ensure that every message retained is eventually deleted.
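
One way to check the "every retained message is eventually deleted" requirement is to count both events. The sketch below is an assumption-laden illustration, not part of the UM API: the demo_ functions are hypothetical helpers that an application might call next to its own lbm_msg_retain() and lbm_msg_delete() calls. It uses C11 atomics because the retain and the delete normally happen on different threads.

```c
#include <assert.h>
#include <stdatomic.h>

/* Messages retained but not yet deleted. Static storage starts at zero. */
static atomic_long demo_outstanding;

/* Call right after a successful retain. */
static void demo_note_retained(void)
{
  atomic_fetch_add(&demo_outstanding, 1);
}

/* Call right after a successful delete. */
static void demo_note_deleted(void)
{
  long before = atomic_fetch_sub(&demo_outstanding, 1);
  assert(before > 0);  /* A delete without a matching retain is a bug. */
  (void)before;
}

/* Current number of retained messages still held by the application. */
static long demo_outstanding_count(void)
{
  return atomic_load(&demo_outstanding);
}
```

A count that keeps rising while the application is otherwise idle suggests that some retained messages are never reaching the delete call.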

The my_save_message_for_more_processing() function is just whatever you use to transfer the message to your processing thread. The my_additional_processing() function is typically called by your thread to process the message.
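
As one possible shape for my_save_message_for_more_processing(), the sketch below hands message pointers from the callback thread to a single processing thread through a fixed-size ring. It is an illustration under stated assumptions, not UM code: demo_msg_t stands in for lbm_msg_t, the demo_ring_ names are hypothetical, and the ring is only safe with exactly one producer thread and one consumer thread.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Must be a power of two so slot indexing stays consistent when the
 * counters wrap around. */
#define DEMO_RING_SLOTS 4

/* Stand-in for lbm_msg_t; only the pointer is passed through the ring. */
typedef struct demo_msg {
  const char *data;
  size_t len;
} demo_msg_t;

typedef struct {
  demo_msg_t *slot[DEMO_RING_SLOTS];
  atomic_size_t head;  /* Total pushed; written only by the producer. */
  atomic_size_t tail;  /* Total popped; written only by the consumer. */
} demo_ring_t;

/* Producer side (receiver callback). Never blocks.
 * Returns 0 on success, -1 if the ring is full. */
static int demo_ring_push(demo_ring_t *r, demo_msg_t *msg)
{
  size_t head, tail;
  assert(r != NULL && msg != NULL);
  head = atomic_load_explicit(&r->head, memory_order_relaxed);
  tail = atomic_load_explicit(&r->tail, memory_order_acquire);
  if (head - tail == DEMO_RING_SLOTS) {
    return -1;
  }
  r->slot[head % DEMO_RING_SLOTS] = msg;
  /* Release: the slot write becomes visible before the new head. */
  atomic_store_explicit(&r->head, head + 1, memory_order_release);
  return 0;
}

/* Consumer side (processing thread). Returns NULL if the ring is empty. */
static demo_msg_t *demo_ring_pop(demo_ring_t *r)
{
  size_t head, tail;
  demo_msg_t *msg;
  assert(r != NULL);
  tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
  head = atomic_load_explicit(&r->head, memory_order_acquire);
  if (head == tail) {
    return NULL;
  }
  msg = r->slot[tail % DEMO_RING_SLOTS];
  /* Release: the slot read completes before the slot is offered for reuse. */
  atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
  return msg;
}
```

Because the push never blocks, it respects the callback rules when the callback runs on a context or XSP thread. If the push fails, the message has not been handed off, so the application still has to decide what to do with it, for example processing it in place or deleting it if it was already retained.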

The rules for the message processing code are the same as the event queue case (above).

If this is a Persistent receiver, see Persistence Message Consumption.


Java Message Reception

Java: Message Is Fully Processed by the Application's Receiver Callback, Called by a Context or XSP Thread

public int onReceive(Object cbArg, LBMMessage msg)
{
  try {
    switch (msg.type())
    {
      case LBM.MSG_DATA:
        ByteBuffer message = msg.dataBuffer();
        long msgLen = msg.dataLength();
        // Process "msgLen" bytes in "message".
        ...
        // Tell UM: finished with "msg".
        msg.dispose();
        break;
      /* Handle other receiver events. */
    } /* switch */
  } catch (Exception e) {
    /* Handle exception. */
  }
  return 0;
} /* onReceive */

This demonstrates the use of LBMMessage.dataBuffer(), LBMMessage.dataLength(), and LBMMessage.dispose().

These functions prevent unnecessary garbage and are strongly recommended by Informatica (see Zero Object Delivery).

Important rules regarding the receiver callback function when called by the context thread:

  • The function must return 0.
  • The function must not be allowed to pass an unhandled exception back into UM. For example, you could have your entire callback function enclosed in a large try/catch. (This is true of all UM callbacks, not just receiver.)
  • The passed-in message must not be modified.
  • The passed-in message must be disposed. In Java, every message must explicitly be disposed, to properly clean up the native memory. Do not assume that GC will clean it up.
  • The function should not perform any operation that might be time consuming or put the thread to sleep (block). Any delay in your callback prevents the context thread from servicing its sockets, which increases the risk of packet loss.
  • If the receiver callback sends a message, non-blocking sends must be used (see com::latencybusters::lbm::LBM::SRC_NONBLOCK), and the code should be written to handle a send failure of com::latencybusters::lbm::LBMEWouldBlockException.
  • It is not allowed to create/delete sources or receivers, or subscribe/unsubscribe from a Spectrum channel from a callback function executed by the context thread.
  • You may schedule UM timers.

Note the requirement to call msg.dispose() on every message. Prior to UM version 6.7, calling dispose() on every message was considered best practice, but was only required for certain use cases. UM version 6.7 introduced significant performance improvements with Java, but these improvements made calling msg.dispose() mandatory.

If this is a Persistent receiver, see Persistence Message Consumption.

Java: Message Is Fully Processed by the Application's Receiver Callback, Called by an Event Queue Dispatch Thread

The code is identical to the context thread case above.

The rules are similar to the context thread case, with some important differences:

  • The function must return 0.
  • The function must not be allowed to pass an unhandled exception back into UM. For example, you could have your entire callback function enclosed in a large try/catch. (This is true of all UM callbacks, not just receiver.)
  • The passed-in message must not be modified.
  • The passed-in message must be disposed. In Java, every message must explicitly be disposed, to properly clean up the native memory. Do not assume that GC will clean it up.
  • The application is permitted to perform operations that are time consuming and/or blocking, as long as the average message processing rate is greater than the average message sending rate. Any temporary delays will buffer messages in the event queue.
  • Blocking or non-blocking sends may be used, according to the application's preferences.
  • You may create/delete sources or receivers, or subscribe/unsubscribe from a Spectrum channel. However, the receiver which is delivering the current message should not be deleted.
  • You may schedule UM timers.
  • Informatica strongly recommends monitoring event queue length. See Event Queue Monitor for more information.

If this is a Persistent receiver, see Persistence Message Consumption.

Java: Message Is Retained by the Receiver Callback for Further Processing by a Different Application Thread

// The recycler is held in a field so that both init_UM() and
// myAdditionalProcessing() can use it.
private LBMObjectRecycler objRec;

public void init_UM()
{
  // During initialization, the UM context and receiver(s) are
  // configured to use a recycler.
  objRec = new LBMObjectRecycler();
  ...
  // Preparing to create context - set up attribute.
  LBMContextAttributes ctx_attr = new LBMContextAttributes();
  ctx_attr.setObjectRecycler(objRec, null);
  // Proceed with creating context, using ctx_attr.
  ...
  // Preparing to create receiver - set up attribute.
  LBMReceiverAttributes rcv_attr = new LBMReceiverAttributes();
  rcv_attr.setObjectRecycler(objRec, null);
  // Proceed with creating receiver, using rcv_attr.
  ...
}
...
public int onReceive(Object cbArg, LBMMessage msg)
{
  try {
    switch (msg.type())
    {
      case LBM.MSG_DATA:
        msg.promote(); // Tell UM: not finished with "msg".
        mySaveMessageForMoreProcessing(msg);
        break;
      /* Handle other receiver events. */
    } /* switch */
  } catch (Exception e) {
    /* Handle exception. */
  }
  return 0;
} /* onReceive */
...
public void myAdditionalProcessing(LBMMessage msg)
{
  ByteBuffer message = msg.dataBuffer();
  long msgLen = msg.dataLength();
  // Process "msgLen" bytes in "message".
  ...
  // Tell UM: finished with "msg".
  msg.dispose();
  objRec.doneWithMessage(msg);
} /* myAdditionalProcessing */

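This demonstrates the use of LBMObjectRecycler, setObjectRecycler(), LBMMessage.promote(), LBMMessage.dispose(), and doneWithMessage().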

These functions prevent unnecessary garbage and are strongly recommended by Informatica (see Zero Object Delivery).

The mySaveMessageForMoreProcessing() function is just whatever you use to transfer the message to your processing thread. The myAdditionalProcessing() function is typically called by you when you are ready to complete processing of the message.

The rules for the message processing code are the same as the event queue case. However, the application must ensure that every message promoted is eventually disposed.

If this is a Persistent receiver, see Persistence Message Consumption.


.NET Message Reception

Except where indicated, .NET coding for message reception is identical to that of Java.

Note
Historically, .NET programs that receive UM messages were not required to call the message object's dispose() method. However, it is now strongly recommended. Not calling dispose() will introduce significant latency outliers (jitter) when GC runs, and also makes persistence acknowledgements to the Store non-deterministic. Finally, in the future, performance improvements for .NET might require the use of "dispose()". Informatica strongly recommends that .NET subscribers call "dispose()" for every message.

.NET: Message Is Fully Processed by the Application's Receiver Callback, Called by a Context or XSP Thread

public int onReceive(Object cbArg, LBMMessage msg)
{
  try {
    switch (msg.type())
    {
      case LBM.MSG_DATA:
        byte * message = msg.dataPointer(); // Different from Java!
        uint msgLen = msg.length(); // Different from Java!
        // Process "msgLen" bytes in "message".
        ...
        // Tell UM: finished with "msg".
        msg.dispose();
        break;
      /* Handle other receiver events. */
    } /* switch */
  } catch (Exception e) {
    /* Handle exception. */
  }
  return 0;
} /* onReceive */

This demonstrates the use of LBMMessage.dataPointer(), LBMMessage.length(), and LBMMessage.dispose().

These functions prevent unnecessary garbage and are strongly recommended by Informatica (see Zero Object Delivery).

Important rules regarding the receiver callback function when called by the context thread:

  • The function must return 0.
  • The function must not be allowed to pass an unhandled exception back into UM. For example, you could have your entire callback function enclosed in a large try/catch. (This is true of all UM callbacks, not just receiver.)
  • The passed-in message must not be modified.
  • The passed-in message must be disposed. In .NET, every message should explicitly be disposed, to properly clean up the native memory. Do not assume that GC will clean it up.
  • The function should not perform any operation that might be time consuming or put the thread to sleep (block). Any delay in your callback prevents the context thread from servicing its sockets, which increases the risk of packet loss.
  • If the receiver callback sends a message, non-blocking sends must be used (see SRC_NONBLOCK()). And the code should be written to handle a send failure of LBMEWouldBlockException().
  • It is not allowed to create/delete sources or receivers, or subscribe/unsubscribe from a Spectrum channel from a callback function executed by the context thread.
  • You may schedule UM timers.

If this is a Persistent receiver, see Persistence Message Consumption.

.NET: Message Is Fully Processed by the Application's Receiver Callback, Called by an Event Queue Dispatch Thread

The code is identical to the context thread case above.

The rules are similar to the context thread case, with some important differences:

  • The function must return 0.
  • The function must not be allowed to pass an unhandled exception back into UM. For example, you could have your entire callback function enclosed in a large try/catch. (This is true of all UM callbacks, not just receiver.)
  • The passed-in message must not be modified.
  • The passed-in message must be disposed. In .NET, every message should explicitly be disposed, to properly clean up the native memory. Do not assume that GC will clean it up.
  • The application is permitted to perform operations that are time consuming and/or blocking, as long as the average message processing rate is greater than the average message sending rate. Any temporary delays will buffer messages in the event queue.
  • Blocking or non-blocking sends may be used, according to the application's preferences.
  • You may create/delete sources or receivers, or subscribe/unsubscribe from a Spectrum channel. However, the receiver which is delivering the current message should not be deleted.
  • You may schedule UM timers.
  • Informatica strongly recommends monitoring event queue length. See Event Queue Monitor for more information.

If this is a Persistent receiver, see Persistence Message Consumption.

.NET: Message Is Retained by the Receiver Callback for Further Processing by a Different Application Thread

// The recycler is held in a field so that both init_UM() and
// myAdditionalProcessing() can use it.
private LBMObjectRecycler objRec;

public void init_UM()
{
  // During initialization, the UM context and receiver(s) must be
  // configured to use a recycler.
  objRec = new LBMObjectRecycler();
  ...
  // Preparing to create context - set up attribute.
  LBMContextAttributes ctx_attr = new LBMContextAttributes();
  ctx_attr.setObjectRecycler(objRec, null);
  // Proceed with creating context, using ctx_attr.
  ...
  // Preparing to create receiver - set up attribute.
  LBMReceiverAttributes rcv_attr = new LBMReceiverAttributes();
  rcv_attr.setObjectRecycler(objRec, null);
  // Proceed with creating receiver, using rcv_attr.
  ...
}
...
public int onReceive(Object cbArg, LBMMessage msg)
{
  try {
    switch (msg.type())
    {
      case LBM.MSG_DATA:
        msg.promote(); // Tell UM: not finished with "msg".
        mySaveMessageForMoreProcessing(msg);
        break;
      /* Handle other receiver events. */
    } /* switch */
  } catch (Exception e) {
    /* Handle exception. */
  }
  return 0;
} /* onReceive */
...
public void myAdditionalProcessing(LBMMessage msg)
{
  byte * message = msg.dataPointer(); // Different from Java!
  uint msgLen = msg.length(); // Different from Java!
  // Process "msgLen" bytes in "message".
  ...
  // Tell UM: finished with "msg".
  msg.dispose();
  objRec.doneWithMessage(msg);
} /* myAdditionalProcessing */

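This demonstrates the use of LBMObjectRecycler, setObjectRecycler(), LBMMessage.promote(), LBMMessage.dispose(), and doneWithMessage().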

These functions prevent unnecessary garbage and are strongly recommended by Informatica (see Zero Object Delivery).

The mySaveMessageForMoreProcessing() function is just whatever you use to transfer the message to your processing thread. The myAdditionalProcessing() function is typically called by you when you are ready to complete processing of the message.

The rules for the message processing code are the same as the event queue case. However, the application must ensure that every message promoted is eventually disposed.

If this is a Persistent receiver, see Persistence Message Consumption.