Concepts Guide
Except where otherwise indicated, the features described in this section are available in the UMS, UMP, and UMQ products.
As of UM version 6.11, a new receive-side object is available to the user: the Transport Services Provider Object (XSP).
The earlier feature, Multi-Transport Threads (MTT), is removed from UM in favor of XSP.
By default, a UM context combines all network data reception into a single context thread. This thread is responsible for reception and processing of application messages, topic resolution, and immediate message traffic (UIM and MIM). The context thread is also used for processing timers. This single-threaded model conserves CPU core resources, and can simplify application design. However, it can also introduce significant latency outliers (jitter) if a time-sensitive user message is waiting behind, say, a topic resolution message, or a timer callback.
Using an XSP object, an application can reassign the processing of a subscribed Transport Session to an independent thread. This allows concurrent processing of received messages with topic resolution and timers, and even allows different groups of Transport Sessions to be processed concurrently with each other.
By default, when an XSP object is created, UM creates a new thread associated with the XSP. Alternatively, the XSP can be created with operational mode "sequential", which gives the responsibility of thread creation to the application. Either way, the XSP uses its independent thread to read data from the sockets associated with one or more subscribed Transport Sessions. That thread then delivers received messages to the application via a normal receive application callback function.
Creation of an XSP does not by itself cause any receiver Transport Sessions to be assigned to it. Central to the use of XSPs is an application-supplied mapping callback function which tells UM which XSP to associate with subscribed Transport Sessions as they are discovered and joined. This callback allows the application to examine the newly-joined Transport Session, if desired. Then the callback returns, informing UM which XSP, if any, to assign the receiver Transport Session to.
Conceptually, an application designer might want to assign the reception and processing of received data to XSPs on a topic basis. This is not always possible. The XSP thread must process received data on a socket basis, and sockets map to Transport Sessions. As mentioned in UM Transports, a publishing application maps one or more topic-based sources to a Transport Session.
Consider the following example:
Publisher A and B are two separate application instances, both of which create a source for topic "X". A subscriber application might create two XSPs and assign one Transport Session to each. In this case, you have two independent threads delivering messages to the subscriber's receiver callback, which may not be what the developer wanted. If the developer wants topic X to be serialized, a single XSP should be created and mapped to both Transport Sessions:
Now let's introduce a second topic. The developer might want to create two XSPs so that each topic will be handled by an independent thread. However, this is not possible, given the way that the topics are mapped to Transport Sessions in the following example:
In this case, XSP 1 is delivering both topics X and Y from Publisher A, and XSP 2 is delivering topics X and Y from Publisher B. Once again, the receiver callback for topic X will be called by two independent threads, which is not desired.
The only way to achieve independent processing of topics is to design the publishers to map their topics to Transport Sessions carefully. For example:
When contexts are used single-threaded, the application programmer can assume serialization of event delivery to the application callbacks. This can greatly simplify the design of applications, at the cost of added latency outliers (jitter).
When XSPs are used to provide multi-threaded receivers, care must be taken in application design to account for potential concurrent calls to application callbacks. This is especially true if multiple subscribed Transport Sessions are assigned different XSPs, as demonstrated in XSP Handles Transport Sessions, Not Topics.
Even in the simplest case, where a single XSP is created and used for all subscribed Transport Sessions, there are still events generated by the main context thread which can be delivered concurrently with XSP callbacks. Reception of MIM or UIM messages, scheduled timers, and some topic resolution-related callbacks all come from the main context thread, and can all be invoked concurrently with XSP callbacks.
Threading Example: Message Timeout
Consider as an example a common timer use case: message timeout. Application A expects to receive messages for topic "X" every 5 seconds. If 10 seconds pass without a message, the application assumes that the publisher for "X" has exited, so it cleans up internal state and deletes the UM receiver object. Each time a message is received, the current timer is cancelled and re-created for 10 seconds.
Without XSPs, this can be easily coded since message reception and timer expiration events are serialized. The timer callback can clean up and delete the receiver, confident that no receiver events might get delivered while this is in progress.
However, if the Transport Session carrying topic "X" is assigned to an independent XSP thread, message reception and timer expiration events are no longer serialized. The publisher of "X" might send its message on time, but a temporary network outage could delay its delivery, introducing a race condition between message delivery and timer expiration. Consider the case where the timer expiration is a little ahead of the message callback. The timer callback might clean up application state which the message callback will attempt to use. This could lead to unexpected behavior, possibly including segmentation faults.
In this case, proper sequencing of operations is critical. The timer should delete the receiver first. While inside the receiver delete API, the XSP might deliver messages to the application. However, once the receiver delete API returns, it is guaranteed that the XSP is finished making receiver callbacks.
Note that in this example case, if the message receive callback attempts to cancel the timer, the cancel API will return an error. This is because the timer has already expired and the execution of the callback has begun, and is inside the receiver delete API. The message receive callback needs to be able to handle this sequence, presumably by not re-scheduling the timer.
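The ordering described above can be sketched as a small single-threaded model. This is an illustration only: every name below is hypothetical, no UM API call is used, and the two timer-callback steps are split apart simply to model the window during which the receiver delete call is still in progress.

```c
#include <stdbool.h>

/* Hypothetical stand-ins; none of these names are UM API calls. */
typedef struct {
    bool timer_pending;     /* timeout timer is scheduled and has not fired */
    bool receiver_deleted;  /* the receiver delete call has returned */
    bool state_valid;       /* application state for topic "X" still exists */
    int  messages_handled;
} app_model_t;

/* Models a timer-cancel call: it fails once the timer has already fired. */
int model_cancel_timer(app_model_t *app)
{
    if (!app->timer_pending) {
        return -1;
    }
    app->timer_pending = false;
    return 0;
}

/* Message callback, run by the XSP thread. */
void model_on_message(app_model_t *app)
{
    if (!app->state_valid) {
        return;  /* would indicate state was cleaned up too early */
    }
    app->messages_handled++;
    if (model_cancel_timer(app) == 0) {
        app->timer_pending = true;  /* re-create the 10-second timeout */
    }
    /* Cancel failed: the timeout is already running, so do not re-schedule. */
}

/* Timer callback, step 1: the timer fires and the receiver delete begins. */
void model_timeout_enter_delete(app_model_t *app)
{
    app->timer_pending = false;
}

/* Timer callback, step 2: the delete has returned; clean-up is now safe. */
void model_timeout_after_delete(app_model_t *app)
{
    app->receiver_deleted = true;
    app->state_valid = false;
}
```

In this model, a message that arrives between the two timer steps is still handled against intact state, its cancel attempt fails, and it leaves the timer unscheduled. Application state is released only after the delete step completes.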
This section provides simplified C code fragments that demonstrate some of the XSP-related API calls. For full examples of XSP usage, see lbmrcvxsp.c (for C) and lbmrcvxsp.java (for Java).
The common sequence of operations during application initialization is minimally shown below. In the code fragments below, error detection and handling are omitted for clarity.
Create a context attribute object and set the transport_mapping_function (context) option to point at the application's XSP mapping callback function using the structure lbm_transport_mapping_func_t.
Create the context.
Create XSPs using lbm_xsp_create(). In this example, only a single XSP is created.
Note that the application can optionally pass in a context attribute object and an XSP attribute object. The context attribute object is accepted because an XSP is implemented as a sort of reduced-function sub-context, and so it is possible to modify context options for the XSP. However, this is rarely needed since the default action is for the XSP to inherit all the configuration of the main context.
Create a receiver for topic "X".
Event queues may also be used with XSP-assigned Transport Sessions.
At this point, when the main context discovers a source for topic "X", it will proceed to join the Transport Session. It will call the application's app_xsp_mapper_callback() function, which is minimally this:
This minimal callback simply returns the XSP that was created during initialization (the "clientd" can be helpful for that). By assigning all receiver Transport Sessions to the same XSP, you have effectively separated message processing from UM housekeeping tasks, like processing of topic resolution and timers. This can greatly reduce latency outliers.
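A minimal mapping callback of the kind described above might look like the following sketch. The opaque type declarations are stand-ins so that the fragment compiles without lbm.h, the app_state_t structure is hypothetical, and the callback's parameter list is an assumption based on the description in this section; check it against the UM C API reference before use.

```c
#include <stddef.h>

/* Stand-ins for types that the real lbm.h would provide (opaque here). */
typedef struct lbm_context_t_stct lbm_context_t;
typedef struct lbm_xsp_t_stct lbm_xsp_t;
typedef struct lbm_new_transport_info_t_stct lbm_new_transport_info_t;

/* Hypothetical application state, registered as the callback's clientd. */
typedef struct {
    lbm_xsp_t *xsp;  /* the single XSP created during initialization */
} app_state_t;

/* Assumed signature: returns the XSP that should service the newly
 * joined Transport Session. */
lbm_xsp_t *app_xsp_mapper_callback(lbm_context_t *ctx,
                                   lbm_new_transport_info_t *transp_info,
                                   void *clientd)
{
    app_state_t *state = (app_state_t *)clientd;

    (void)ctx;          /* not needed when every session uses one XSP */
    (void)transp_info;  /* could be examined to choose among several XSPs */

    return state->xsp;
}
```

Passing the XSP through clientd avoids a global variable. An application with several XSPs would replace the single field with whatever lookup structure its mapping logic needs.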
As described in XSP Handles Transport Sessions, Not Topics, some users want to have multiple XSPs and assign the Transport Sessions to XSPs according to application logic. Note that the passed-in lbm_new_transport_info_t structure contains information about the Transport Session, such as the IP address of the sender. However, this structure does not contain topic information. Applications can use the resolver's source notification callback via the resolver_source_notification_function (context) attribute option to associate topics with source strings.
As of UM 6.12, XSP supports persistent receivers.
When an XSP object is created, an XSP attribute object can be supplied to set XSP options. The XSP options are:
To create and manipulate an XSP attribute object, see:
To delete an XSP, all receivers associated with Transport Sessions handled by that XSP must first be deleted. Then the XSP can be deleted using lbm_xsp_delete().
To register and cancel an application file descriptor with an XSP, see:
There are some restrictions and limitations on the XSP feature.
The only transport types currently supported are LBT-RM, LBT-RU, and TCP. IPC, SMX, DBL, and BROKER are not supported with XSPs at this time.
For a persistent receiver assigned to an XSP, the user is not allowed to disable ume_proactive_keepalive_interval (context).
The ULB feature is not currently supported.
This section introduces the use of Ultra Messaging Late Join in default and specialized configurations. See Late Join Options for more information.
The Late Join feature enables newly created receivers to receive previously transmitted messages. Sources configured for Late Join maintain a retention buffer (not to be confused with a transport retransmission window), which holds transmitted messages for late-joining receivers.
A Late Join operation proceeds in the following sequence:
The source's retention buffer is not pre-allocated and occupies an increasing amount of memory as the source sends messages and adds them to the buffer. If a retention buffer grows to a size equal to the value of the source configuration option retransmit_retention_size_threshold (source), the source deletes older messages as it adds new ones. The source configuration option retransmit_retention_age_threshold (source) controls message deletion based on message age.
UM uses control-structure overhead memory on a per-message basis for messages held in the retention buffer, in addition to the retention buffer's memory. Such memory usage can become significantly higher when retained messages are smaller in size, since more of them can then fit in the retention buffer.
With the UMP/UMQ products, Late Join can be implemented in conjunction with the Persistent Store. However, in this configuration it functions somewhat differently from Streaming. After a Late-Join-enabled receiver has been created, resolved a topic, and become registered with a store, it may then request older messages. The store unicasts the retransmission messages. If the store does not have these messages, it requests them of the source (assuming option retransmission-request-forwarding is enabled), thus initiating Late Join.
To implement Late Join with default options, set the Late Join configuration options to activate the feature on both a source and receiver in the following manner.
Create a configuration file with source and receiver Late Join activation options set to 1. For example, file cfg1.cfg containing the two lines:
Run an application that starts a Late-Join-enabled source. For example:
Wait a few seconds, then run an application that starts a Late-Join-enabled receiver. For example:
The output for each should closely resemble the following:
LBMSRC
LBMRCV
Note that the source only retained 1 Late Join message (due to default retention settings) and that this message appears as a retransmit (-RX-). Also note that it is possible to sometimes receive 2 RX messages in this scenario (see Retransmitting Only Recent Messages.)
To receive more than one or two Late Join messages, increase the source's retransmit_retention_size_threshold (source) from its default value of 0. Once the buffer exceeds this threshold, the source allows the next new message entering the retention buffer to bump out the oldest one. Note that this threshold's units are bytes (which includes a small overhead per message).
While the retention threshold endeavors to keep the buffer size close to its value, it does not set a hard upper limit for retention buffer size. That boundary is set by the retransmit_retention_size_limit (source) configuration option (also in bytes).
Follow the steps below to demonstrate how a source can retain about 50MB of messages, but no more than 60MB:
Create a second configuration file (cfg2.cfg) with the following options:
lbmrcv -c cfg2.cfg -v topicName
The output for each should closely resemble the following:
LBMSRC
LBMRCV
Note that lbmrcv received live messages with sequence numbers 7, 6, and 5, and RX messages going from 4 all the way back to Sequence Number 0.
Thus far we have worked with only source late join settings, but suppose that you want to receive only the last 10 messages. To do this, configure the receiver option retransmit_request_maximum (receiver) to set how many messages to request backwards from the latest message.
Follow the steps below to set this option to 10.
Add the following line to cfg2.cfg and rename it cfg3.cfg:
Run:
lbmrcv -c cfg3.cfg -v topicName
The output for each should closely resemble the following:
LBMSRC
LBMRCV
Note that 11, not 10, retransmits were actually received. This can happen because network and timing circumstances may have one RX already in transit while the specified RX amount is being processed. (Hence, it is not possible to guarantee one and only one RX message for every possible Late Join recovery.)
Suppose you have a persistent receiver that comes up at midday and must gracefully catch up on the large number of messages it has missed. The following discussion explains the relevant Late Join options and how to use them. (The discussion also applies to streaming-based late join, but since streaming sources must hold all retained messages in memory, there are typically far fewer messages available.)
Option: retransmit_request_outstanding_maximum (receiver)
When a receiver comes up and begins requesting Late Join messages, it does not simply request messages starting at Sequence Number 0 through 1000000. Rather, it requests the messages a little at a time, depending upon how option retransmit_request_outstanding_maximum (receiver) is set. For example, when set to the default of 10, the receiver requests the first 10 messages (Sequence Numbers 0 - 9). Upon receiving Sequence Number 0, it then requests the next message (10), and so on, limiting the number of outstanding unfulfilled requests to 10.
Note that higher values for retransmit_request_outstanding_maximum can increase the rate of RXs received, which can reduce the time required for receiver recovery. However, this can lead to heavy loading of the Store, potentially making it unable to sustain the incoming data rate.
Also, be aware that increasing retransmit_request_outstanding_maximum may require a corresponding increase to retransmit_request_interval (receiver). Otherwise you can have a situation where messages time out because it takes a store longer than retransmit_request_interval to process all retransmit_request_outstanding_maximum requests. When this happens, you can see messages needlessly requested and sent many times (which generates warnings in the receiver application log file).
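The request pacing controlled by retransmit_request_outstanding_maximum (receiver) can be pictured with a small model. This is a sketch of the behavior described above, not UM's implementation; the structure and function names are hypothetical, and request timeouts (retransmit_request_interval) are deliberately left out.

```c
/* Hypothetical model of Late Join request pacing; not UM code. */
typedef struct {
    unsigned next_to_request;  /* lowest sequence number not yet requested */
    unsigned last_needed;      /* highest sequence number to recover
                                  (assumed to be below UINT_MAX) */
    unsigned outstanding;      /* requests sent but not yet answered */
    unsigned outstanding_max;  /* models retransmit_request_outstanding_maximum */
} rx_window_t;

/* Issue requests until the outstanding limit is reached or nothing is
 * left to request. Returns the number of requests issued. */
unsigned rx_window_fill(rx_window_t *w)
{
    unsigned issued = 0;

    while (w->outstanding < w->outstanding_max &&
           w->next_to_request <= w->last_needed) {
        w->next_to_request++;
        w->outstanding++;
        issued++;
    }
    return issued;
}

/* One retransmission arrived: free a slot, then top the window up.
 * Returns the number of new requests issued. */
unsigned rx_window_on_retransmit(rx_window_t *w)
{
    if (w->outstanding > 0) {
        w->outstanding--;
    }
    return rx_window_fill(w);
}
```

With outstanding_max set to 10 and a recovery range of 0 through 999999, the first fill issues requests for sequence numbers 0 through 9, and each arriving retransmission releases exactly one further request.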
Option: retransmit_message_caching_proximity (receiver)
When sequence number delivery order is used, long recoveries of active sources can create receiver memory cache problems due to the processing of both new and retransmitted messages. This option provides a method to control caching and cache size during recovery.
It does this by comparing the option value (default 2147483647) to the difference between the newest (live) received sequence number and the latest received RX sequence number. If the difference is less than the option's value, the receiver caches incoming live new messages. Otherwise, new messages are dropped and not cached (with the assumption that they can be requested later as retransmissions).
For example, as shown in the diagram below, a receiver may be receiving both live streaming messages (latest, #200) and catch-up retransmissions (latest, #100). The difference here is 100. If retransmit_message_caching_proximity (receiver) is 150, the difference is less than the option value, so the receiver caches the live messages and will deliver them when it is all caught up with the retransmissions. However, if this option is 75, streamed messages are dropped and later picked up again as retransmissions.
The default value of this option is high enough to still encourage caching most of the time, and should be optimal for most receivers.
If your source streams faster than it retransmits, caching is beneficial, as it ensures new data are received only once, thus reducing recovery time. If the source retransmits faster than it streams, which is the optimal condition, you can lower the value of this option to use less memory during recovery, with little performance impact.
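The comparison that retransmit_message_caching_proximity (receiver) performs can be written as a one-line predicate. This is a sketch of the rule as stated in this section; the function name is hypothetical, and the use of wrapping 32-bit arithmetic for sequence numbers is an assumption.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper, not a UM API call.
 * Returns true if an incoming live message should be cached: the gap
 * between the newest live sequence number and the latest retransmitted
 * sequence number must be less than the option value. */
bool late_join_should_cache(uint32_t newest_live_sqn,
                            uint32_t latest_rx_sqn,
                            uint32_t caching_proximity)
{
    /* Unsigned subtraction keeps the gap correct across wrap-around. */
    uint32_t gap = newest_live_sqn - latest_rx_sqn;

    return gap < caching_proximity;
}
```

With the default option value of 2147483647, nearly any realistic gap satisfies the test, which is why the default favors caching.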
Off-Transport Recovery (OTR) is a lost-message-recovery feature that provides a level of hedging against the possibility of brief and incidental unrecoverable loss at the transport level or from a UM Router. This section describes the OTR feature.
When a transport cannot recover lost messages, OTR engages and looks to the source for message recovery. It does this by accessing the source's retention buffer (used also by the Late Join feature) to re-request messages that no longer exist in a transport's transmission window, or other places such as a Persistent Store or redundant source.
OTR functions in a manner very similar to that of Late Join, but differs mainly in that it activates in message loss situations rather than following the creation of a receiver, and shares only the late_join (source) option setting.
Upon detecting loss, a receiver initiates OTR by sending repeated, spaced, OTR requests to the source, until it recovers lost messages or a timeout period elapses.
OTR operates independently from transport-level recovery mechanisms such as NAKs for LBT-RU or LBT-RM. When you enable OTR for a receiver with use_otr (receiver), the otr_request_initial_delay (receiver) period starts as soon as the Delivery Controller detects a sequence gap. If the gap is not resolved by the end of the delay interval, OTR recovery initiates. OTR recovery can occur before, during or after transport-level recovery attempts.
When a receiver initiates OTR, the interval between OTR requests doubles after each request until the maximum interval is reached (assuming the receiver is still waiting to receive the retransmission). You use configuration options otr_request_minimum_interval (receiver) and otr_request_maximum_interval (receiver) to set the initial (minimum) and maximum intervals, respectively.
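The doubling schedule can be sketched as a small helper. This is an illustrative model only, not UM's implementation; the function name and the millisecond units are assumptions, and the second argument stands in for otr_request_maximum_interval (receiver).

```c
/* Illustrative model only; not part of the UM API.
 * Returns the interval to wait before the next OTR request:
 * twice the current interval, capped at max_ms. */
unsigned long otr_next_interval(unsigned long current_ms, unsigned long max_ms)
{
    if (current_ms > max_ms / 2) {
        return max_ms;  /* doubling would pass the cap */
    }
    return current_ms * 2;
}
```

Starting from a hypothetical minimum interval of 2000 and a maximum of 10000, successive calls yield 4000, 8000, and 10000, and then stay at 10000 until the message is recovered or the overall timeout elapses.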
The source retransmits lost messages to the recovered receiver via unicast.
When sequence number delivery order is used and a gap of missing messages occurs, a receiver buffers the new incoming messages while it attempts to recover the earlier missing ones. Long recoveries of actively streaming sources can cause excessive receiver cache memory growth due to the processing of both new and retransmitted messages. You can control caching and cache size during recovery with options otr_message_caching_threshold (receiver) and retransmit_message_caching_proximity (receiver).
The option otr_message_caching_threshold (receiver) sets the maximum number of messages a receiver can buffer. When the number of cached messages hits this threshold, new streamed messages are dropped and not cached, with the assumption that they can be requested later as retransmissions.
The option retransmit_message_caching_proximity (receiver), which is also used by Late Join (see retransmit_message_caching_proximity (receiver)), turns off this caching if there are too many messages to buffer between the last delivered message and the currently streaming messages.
Both of these option thresholds must be satisfied before caching resumes.
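The way the two options combine during OTR can be expressed as a single test. This is a sketch under stated assumptions: the function name is hypothetical, and the exact boundary behavior (strictly less than in both comparisons) is inferred from the descriptions above rather than confirmed.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper, not a UM API call.
 * A new live message is cached only if both limits are satisfied:
 *  - fewer messages are cached than otr_message_caching_threshold, and
 *  - the gap between the live stream and the last delivered message is
 *    less than retransmit_message_caching_proximity. */
bool otr_may_cache(uint32_t cached_count, uint32_t caching_threshold,
                   uint32_t newest_live_sqn, uint32_t last_delivered_sqn,
                   uint32_t caching_proximity)
{
    bool under_threshold = cached_count < caching_threshold;
    bool within_proximity =
        (uint32_t)(newest_live_sqn - last_delivered_sqn) < caching_proximity;

    return under_threshold && within_proximity;
}
```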
With the UMP/UMQ products, you can implement OTR in conjunction with the Persistent Store. However, in this configuration it functions somewhat differently from Streaming. If an OTR-enabled receiver registered with a store detects a sequence gap in the live stream and that gap is not resolved by other means within the next otr_request_initial_delay (receiver) period, the receiver requests those messages from the store(s). If the store does not have some of the requested messages, the receiver requests them from the source. Regardless of whether the messages are recovered from a store or from the source, OTR delivers all recovered messages with the LBM_MSG_OTR flag, unlike Late Join, which uses the LBM_MSG_RETRANSMIT flag.
This section introduces the use of Transport Layer Security (TLS), sometimes known by its older designation Secure Sockets Layer (SSL).
The goal of the Ultra Messaging (UM) TLS feature is to provide encrypted transport of application data. TLS supports authentication (through certificates), data confidentiality (through encryption), and data integrity (ensuring data are not changed, removed, or added-to). UM can be configured to apply TLS security measures to all Streaming and/or Persisted TCP communication, including UM Router peer links. Non-TCP communication is not encrypted (e.g. topic resolution).
TLS is a family of standard protocols and algorithms for securing TCP communication between a client and a server. It is sometimes referred to as "SSL", which technically is the name of an older (less secure) version of the protocol. Over the years, security researchers (and hackers) have discovered flaws in SSL/TLS. However, the vast majority of the widely publicized security vulnerabilities have been flaws in the implementations of TLS, not in the recent TLS protocols or algorithms themselves. As of UM version 6.9, there are no known security weaknesses in TLS version 1.2, the version used by UM.
TLS is generally implemented by several different software packages. UM makes use of OpenSSL, a widely deployed and actively maintained open-source project.
TLS authentication uses X.509 digital certificates. Certificate creation and management is the responsibility of the user. Ultra Messaging's usage of OpenSSL expects PEM encoded certificates. There are a variety of generally available tools for converting certificates between different encodings. Since user infrastructures vary widely, the UM package does not include tools for creation, formatting, or management of certificates.
Although UM is designed as a peer-to-peer messaging system, TLS has the concept of client and server. The client initiates the TCP connection and the server accepts it. In the case of a TCP source, the receiver initiates and is therefore the client, with the source (sender of data) being the server. However, with unicast immediate messages, the sender of data is the client, and the recipient is the server. Due to the fact that unicast immediate messages are used by UM for internal control and coordination, it is typically not possible to constrain a given application to only operate as a pure client or pure server. For this reason, UM requires all applications participating in encryption to have a certificate. Server-only authentication (i.e. anonymous client, as is used by web browsers) is not supported. It is permissible for groups of processes, or even all processes, to share the same certificate.
A detailed discussion of certificate usage is beyond the scope of the Ultra Messaging documentation.
The TLS protocol was designed to allow for a high degree of backwards compatibility. During the connection establishment phase, the client and server perform a negotiation handshake in which they identify the highest common versions of various security options. For example, an old web browser might pre-date the introduction of TLS and only support the older SSL protocol. OpenSSL is often configured to allow clients and servers to "negotiate down" to those older, less-secure protocols or algorithms.
Ultra Messaging has the advantage of not needing to communicate with old versions of SSL or TLS. UM's default configuration directs OpenSSL to require both the client and the server to use protocols and algorithms which were highly regarded as of UM's release date. If vulnerabilities are discovered in the future, the user can override UM's defaults and choose other protocols or algorithms.
When a TLS connection is initiated, a handshake takes place prior to application data encryption. Once the handshake is completed, the CPU effort required to encrypt and decrypt application data is minimal. However, the handshake phase involves the use of much less efficient algorithms.
There are two factors under the user's control, which greatly affect the handshake efficiency: the choice of cipher suite and the key length. We have seen an RSA key of 8192 bits take 4 seconds of CPU time on a 1.3GHz SparcV9 processor just to complete the handshake for a single TLS connection.
Users should make their choices with an understanding of the threat profiles they are protecting against. For example, it is estimated that a 1024-bit RSA key can be broken in about a year by brute force using specialized hardware (see http://www.tau.ac.il/~tromer/papers/cbtwirl.pdf). This may be beyond the means of the average hacker, but well within the means of a large government. RSA keys of 2048 bits are generally considered secure for the foreseeable future.
TLS is enabled on a context basis. When enabled, all Streaming and Persistence related TCP-based communication into or out of the context is encrypted by TLS. A context with TLS enabled will not accept source creation with transports other than TCP.
Subscribers will only successfully receive data if the receiver's context and the source's context share the same encryption settings. A receiver created in an encryption-enabled context will ignore topic resolution source advertisements for non-encrypted sources, and will therefore not subscribe. Similarly, a receiver created in a non-encrypted context will ignore topic resolution source advertisements for encrypted sources. Topic resolution queries are also ignored by mismatched contexts. No warning will be logged when these topic resolution datagrams are ignored, but each time this happens, the context-level statistic tr_dgrams_dropped_type is incremented.
TLS is applied to unicast immediate messages as well, as invoked either directly by the user, or internally by functions like late join, request/response, and Persistence-related communication between sources, receivers, and stores.
Brokered Queuing using AMQP does not use the UM TLS feature. A UM brokered context does not allow TLS to be enabled.
The tls_cipher_suites (context) configuration option defines the list of one or more (comma separated) cipher suites that are acceptable to this context. If more than one is supplied, they should be in descending order of preference. When a remote context negotiates encrypted TCP, the two sides must find a cipher suite in common, otherwise the connection will be canceled.
OpenSSL uses the cipher suite to define the algorithms and key lengths for encrypting the data stream. The choice of cipher suite is critical for ensuring the security of the connection. To achieve a high degree of backwards compatibility, OpenSSL supports old cipher suites which are no longer considered secure. The user is advised to use UM's default suite.
OpenSSL follows its own naming convention for cipher suites. See https://www.openssl.org/docs/man1.1.0/man1/ciphers.html for a list of valid suite names (the ones with dashes) and the equivalent IANA names (with underscores). The UM configuration should use the OpenSSL-style names (with dashes).
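The requirement that both sides find a cipher suite in common can be illustrated with a simple search over two preference lists. This is a conceptual model only: in practice the selection is made by OpenSSL during the TLS handshake, and which side's preference order wins depends on OpenSSL's configuration. The function name is hypothetical.

```c
#include <stddef.h>
#include <string.h>

/* Conceptual model, not UM or OpenSSL code.
 * Returns the first suite in `preferred` (descending preference) that
 * also appears in `remote`, or NULL if the lists share no suite, in
 * which case the connection would be canceled. */
const char *pick_common_suite(const char *const *preferred, size_t n_preferred,
                              const char *const *remote, size_t n_remote)
{
    size_t i, j;

    for (i = 0; i < n_preferred; i++) {
        for (j = 0; j < n_remote; j++) {
            if (strcmp(preferred[i], remote[j]) == 0) {
                return preferred[i];
            }
        }
    }
    return NULL;
}
```

For instance, if the local list is ECDHE-RSA-AES256-GCM-SHA384 followed by AES256-GCM-SHA384, and the remote side accepts only AES256-GCM-SHA384 and DHE-RSA-AES128-GCM-SHA256, the model selects AES256-GCM-SHA384. The suite names use the OpenSSL dash style that the UM configuration expects.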
TLS is designed to encrypt a TCP connection, and works with TCP-based persisted data Transport Sessions and control traffic. However, TLS is not intended to encrypt data at rest. When a Persistent Store is used with the UM TLS feature, the user messages are written to disk in plaintext form, not encrypted.
The UM TLS feature does not apply to the AMQP connection to the brokered queue. UM does not currently support security on the AMQP connection.
However, the ULB form of queuing does not use a broker. For ULB sources that are configured for TCP, the UM TLS feature will encrypt the application data.
When a UM Router is used to route messages across Topic Resolution Domains (TRDs), be aware that the TLS session is terminated at the UM Router's proxy receiver/source. Because each endpoint portal on a UM Router is implemented with its own context, care must be taken to ensure end-to-end security. It is possible to have a TLS source publishing in one TRD, received by a UM Router (via an endpoint portal also configured for TLS), and re-published to a different TRD via an endpoint portal configured with a non-encrypted context. This would allow a non-encrypted receiver to access messages that the source intended to be encrypted. As a message is forwarded through a UM Router network, it does not propagate the security settings of the originator, so each portal needs to be appropriately encrypted. The user is strongly encouraged to configure ALL portals on an interconnected network of UM Routers with the same encryption settings.
The encryption feature is extended to UM Router peer links. However, peer links are not context-based and are not configured the same way. The following XML elements are used by the UM Router to configure a peer link:
As with sources and receivers, the portals on both sides of a peer link must be configured for compatible encryption settings.
Notice that there is no element corresponding to the context option tls_compression_negotiation_timeout (context). The UM Router peer link's negotiation timeout is hard-coded to 5 seconds.
See the UM Router configuration DTD for details.
Many users have advanced network equipment (switches/routers), which transparently compress packets as they traverse the network. This compression is especially valued to conserve bandwidth over long-haul WAN links. However, when packets are encrypted, the network compressors are typically not able to reduce the size of the data. If the user desires UM messages to be compressed and encrypted, the data needs to be compressed before it is encrypted.
The UM compression feature (see Compressed TCP) accomplishes this. When both TLS and compression are enabled, the compression is applied to user data first, then encryption.
Be aware that there can be information leakage when compression is applied and an attacker is able to inject data of known content over a compressed and encrypted session. For example, this leakage is exploited by the CRIME attack, albeit primarily for web browsers. Users must weigh the benefits of compression against the potential risk of information leakage.
Version Interoperability
It is not recommended to mix pre-6.9 contexts with encrypted contexts on topics of shared interest. If a process with a pre-6.9 version of UM creates a receiver, and another process with UM 6.9 or beyond creates a TLS source, the pre-6.9 receiver will attempt to join the TLS source. After a timeout, the handshake will fail and the source will disconnect. The pre-6.9 receiver will retry the connection, leading to flapping.
Note that in the reverse situation, a 6.9 TLS receiver will simply ignore a pre-6.9 source. I.e. no attempt will be made to join, and no flapping will occur.
For UM versions 6.9 through 6.12, the UM dynamic library was linked with OpenSSL in such a way as to require its presence in order for UM to load and initialize. I.e. it was a load-time dependency.
As of UM version 6.12.1, the linkage with OpenSSL is made at run-time. If encryption features are not used, the OpenSSL libraries do not need to be present on the system. UM is able to initialize without OpenSSL.
There are two UM features which utilize encryption provided by OpenSSL:
This section introduces the use of Compression with TCP connections.
The goal of the Ultra Messaging (UM) compression feature is to decrease the size of transmitted application data. UM can be configured to apply compression to all Streaming and/or Persisted TCP communication.
Non-TCP communication is not compressed (e.g. topic resolution).
Compression is generally implemented by any of several different software packages. UM makes use of LZ4, a widely deployed open-source project.
While the UM compression feature is usable for TCP-based sources and receivers, it is possibly most useful when applied to UM Router peer links.
Compression is enabled on a context basis. When enabled, all Streaming and Persistence related TCP-based communication into or out of the context is compressed by LZ4. A context with compression enabled will not accept source creation with transports other than TCP.
Subscribers will only successfully receive data if the receiver's context and the source's context share the same compression settings. A receiver created in a compression-enabled context will ignore topic resolution source advertisements for non-compressed sources, and will therefore not subscribe. Similarly, a receiver created in a non-compressed context will ignore topic resolution source advertisements for compressed sources. Topic resolution queries are also ignored by mismatched contexts. No warning will be logged when these topic resolution datagrams are ignored, but each time this happens, the context-level statistic tr_dgrams_dropped_type is incremented.
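The matching rule above can be modeled in a few lines of C. This is only an illustrative sketch: struct ctx_model and ctx_accept_advertisement() are hypothetical names invented for this example and are not part of the UM API. The only name taken from the text is the statistic tr_dgrams_dropped_type.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a context; not a UM type. */
struct ctx_model {
    bool compression_enabled;              /* this context's compression setting */
    unsigned long tr_dgrams_dropped_type;  /* mirrors the statistic named in the text */
};

/* Returns true if a source advertisement is processed, or false if it is
 * silently ignored because the compression settings differ. */
bool ctx_accept_advertisement(struct ctx_model *ctx, bool source_compressed)
{
    assert(ctx != NULL);
    if (ctx->compression_enabled != source_compressed) {
        ctx->tr_dgrams_dropped_type++;     /* no log message, only the counter */
        return false;
    }
    return true;
}
```

In this model, a receiver that never sees an expected source can be diagnosed by checking whether the counter is rising, which is the same approach the statistic supports in a real deployment.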
Compression is applied to unicast immediate messages as well, as invoked either directly by the user, or internally by functions like late join, request/response, and Persistence-related communication between sources, receivers, and stores.
Brokered Queuing using AMQP does not use the UM compression feature. A UM brokered context does not allow compression to be enabled.
The compression-related configuration options used by the Ultra Messaging library are:
Compression is designed to compress a data Transport Session. It is not intended to compress data at rest. When a Persistent Store is used with the UM compression feature, the user messages are written to disk in uncompressed form.
The UM compression feature does not apply to the AMQP connection to the brokered queue. UM does not currently support compression on the AMQP connection.
However, the ULB form of queuing does not use a broker. For ULB sources that are configured for TCP, the UM compression feature will compress the application data.
When a UM Router is used to route messages across Topic Resolution Domains (TRDs), be aware that the compression session is terminated at the UM Router's proxy receiver/source. Because each endpoint portal on a UM Router is implemented with its own context, care must be taken to ensure end-to-end compression (if desired). As a message is forwarded through a UM Router network, it does not propagate the compression setting of the originator, so each portal needs to be appropriately compressed.
Possibly the most useful application of the UM compression feature is not TCP sources, but rather UM Router peer links. The compression feature is extended to UM Router peer links; however, peer links are not context-based and are not configured the same way. The following XML elements are used by the UM Router to configure a peer link:
As with sources and receivers, the portals on both sides of a peer link must be configured for the same compression setting.
Notice that there is no element corresponding to the context option tls_compression_negotiation_timeout (context). The UM Router peer link's negotiation timeout is hard-coded to 5 seconds.
See the UM Router configuration DTD for details.
See TLS and Compression.
It is not recommended to mix pre-6.9 contexts with compressed contexts on topics of shared interest. As mentioned above, if a compressed and an uncompressed context connect via TCP, the connection will fail and retry, resulting in flapping.
This section introduces the use of high-resolution timestamps with LBT-RM.
The Ultra Messaging (UM) high-resolution message timestamp feature leverages the hardware timestamping function of certain Solarflare network interface cards (NICs) to measure sub-microsecond times that packets are transmitted and received. Solarflare's NICs and Onload kernel-bypass driver implement PTP to synchronize timestamps across the network, allowing very accurate one-way latency measurements. The UM timestamp feature requires Solarflare OpenOnload version 201509 or later.
For subscribers, each message's receive timestamp is delivered in the message's header structure (for C programs, lbm_msg_t field hr_timestamp, of type lbm_timespec_t). Each timestamp is a structure holding 32 bits of seconds and 32 bits of nanoseconds. When both values are zero, the timestamp is not available.
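As an illustration of working with a seconds/nanoseconds pair, the following sketch checks availability and computes the difference between two timestamps in nanoseconds. The struct hr_ts and its field names are stand-ins defined for this example; they are assumptions and do not claim to match the layout of lbm_timespec_t. A difference between a transmit and a receive timestamp taken on different hosts is only meaningful when the clocks are synchronized, for example by PTP.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for the timestamp structure described in the text: 32 bits of
 * seconds and 32 bits of nanoseconds. Field names are assumptions. */
struct hr_ts {
    uint32_t sec;
    uint32_t nsec;
};

/* A timestamp with both fields zero means "not available". */
bool hr_ts_available(struct hr_ts t)
{
    return !(t.sec == 0 && t.nsec == 0);
}

/* Returns (rx - tx) in nanoseconds. Both timestamps must be available. */
int64_t hr_ts_diff_ns(struct hr_ts tx, struct hr_ts rx)
{
    assert(hr_ts_available(tx) && hr_ts_available(rx));
    int64_t tx_ns = (int64_t)tx.sec * 1000000000LL + tx.nsec;
    int64_t rx_ns = (int64_t)rx.sec * 1000000000LL + rx.nsec;
    return rx_ns - tx_ns;
}
```

Converting each timestamp to a single 64-bit nanosecond count before subtracting avoids having to handle a borrow between the seconds and nanoseconds fields.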
For publishers, each message's transmit timestamp is delivered via the source event callback (for C programs, event type LBM_SRC_EVENT_TIMESTAMP). The same timestamp structure as above is delivered with the event, as well as the message's sequence number. Sending applications can be informed of the outgoing sequence number range of each message by using the extended form of the send function and supplying the LBM_SRC_SEND_EX_FLAG_SEQUENCE_NUMBER_INFO flag. This causes the LBM_SRC_EVENT_SEQUENCE_NUMBER_INFO event to be delivered to the source event handler.
Due to the specialized nature of this feature, there are several restrictions in its use.
Unicast Immediate Messaging (UIM) deviates from the normal publish/subscribe paradigm by allowing the sending application to send messages to a specific destination application context. Various features within UM make use of UIMs transparently to the application. For example, a persistent receiver sends consumption acknowledgements to the Store using UIM messages.
The application can make direct use of UIM in two ways:
A UIM message can be associated with a topic string, but that topic is not used to determine where the message is sent. Instead, the topic string is included in the message header, and the application must specify the address of the destination context in the UIM send call.
UIM messages are sent using the TCP protocol; no other protocol is supported for UIM. The TCP connection is created dynamically as-needed, when a message is sent. That is, when an application sends its first UIM message to a particular destination context, the sender's context holds the message and initiates the TCP connection. When the connection is accepted by the destination context, the sender sends the message. When the message is fully sent, the sender will keep the TCP connection open for a period of time in case more UIMs are sent to the same destination context. See UIM Connection Management.
There are three ways to specify the destination address of a UIM:
In the Explicit addressing method, the "ip" and "port" refer to the binding of the destination context's UIM port. By default, when a context is created, UM will select values from a range of possibilities for ip and port. However, this makes it difficult for a sender to construct an explicit address since the ip and port are not deterministic.
One solution is to explicitly set the ip and port for the context's UIM port using the configuration options: request_tcp_port (context) and request_tcp_interface (context).
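A configuration fragment along these lines would pin the UIM port. It is a sketch only: it assumes the plain-text UM configuration file format of one "scope option value" entry per line, and the port number and interface specification are placeholder values to be replaced with ones appropriate to the deployment.

```text
context request_tcp_port 14000
context request_tcp_interface 10.29.3.0/24
```

With fixed values like these, a sender can construct the explicit address of the destination context ahead of time instead of discovering it at run time.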
There are two kinds of UIM messages:
To receive UIM messages with a topic, an application simply creates a normal receiver for that topic. Alternatively, it can create a wildcard receiver for a matching topic pattern. Finally, the application can also register a callback specifically for UIM messages that contain a topic but for which no matching receiver exists, using immediate_message_topic_receiver_function (context). Alternatively, lbm_context_rcv_immediate_topic_msgs() can be used.
To receive UIM messages with no topic (topicless), the application must register a callback for topicless messages, using immediate_message_receiver_function (context). Alternatively, lbm_context_rcv_immediate_msgs() can be used.
Note that only the specified destination context will deliver the message. If other applications have a receiver for that same topic, they will not receive a copy of that UIM message.
UIM Port
To receive UIMs, a context must bind to and listen on the "UIM Port", also known as the "Request Port". See UIM Addressing for details.
The following APIs are used to send application UIM messages:
For the lbm_unicast_immediate_message() and lbm_unicast_immediate_request() APIs, the user has a choice between sending messages with a topic or without a topic (topicless). With the C API, passing a NULL pointer for the topic string sends a topicless message.
The act of sending a UIM message will check to see if the context already has a TCP connection open to the destination context. If so, the existing connection is used to send the UIM. Otherwise, UM will initiate a new TCP connection to the destination context.
Once the message is sent, an activity deletion timer is started for the connection; see response_tcp_deletion_timeout (context). If another UIM message is sent to the same destination, the activity deletion timer is canceled and restarted. Thus, if messages are sent periodically with a shorter period than the activity deletion timer, the TCP connection will remain established.
However, if no messages are sent for more time than the activity deletion timer, the timer will expire and the TCP connection will be deleted and resources cleaned up.
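The connection reuse and activity deletion timer behavior described above can be sketched as a small model. Everything in it is hypothetical: the structures, the function names uim_send() and uim_expire(), the table size, and the use of milliseconds are all choices made for this example, not UM internals. The field deletion_timeout_ms simply stands in for the value of response_tcp_deletion_timeout (context).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define MAX_CONNS 8

/* Hypothetical model of one UIM connection; not a UM type. */
struct uim_conn {
    bool in_use;
    char dest[64];        /* destination context address, opaque here */
    long delete_at_ms;    /* when the activity deletion timer fires */
};

struct uim_conn_table {
    struct uim_conn conns[MAX_CONNS];
    long deletion_timeout_ms;  /* stands in for response_tcp_deletion_timeout */
    int connects;              /* number of new connections initiated */
};

/* Models a UIM send: reuse an open connection to dest if one exists,
 * otherwise open a new one. Either way the timer is (re)started.
 * Returns false if dest is too long or the table is full. */
bool uim_send(struct uim_conn_table *t, const char *dest, long now_ms)
{
    assert(t != NULL && dest != NULL);
    if (strlen(dest) >= sizeof t->conns[0].dest)
        return false;
    struct uim_conn *free_slot = NULL;
    for (int i = 0; i < MAX_CONNS; i++) {
        struct uim_conn *c = &t->conns[i];
        if (c->in_use && strcmp(c->dest, dest) == 0) {
            /* Existing connection: cancel and restart the timer. */
            c->delete_at_ms = now_ms + t->deletion_timeout_ms;
            return true;
        }
        if (!c->in_use && free_slot == NULL)
            free_slot = c;
    }
    if (free_slot == NULL)
        return false;
    free_slot->in_use = true;
    snprintf(free_slot->dest, sizeof free_slot->dest, "%s", dest);
    free_slot->delete_at_ms = now_ms + t->deletion_timeout_ms;
    t->connects++;
    return true;
}

/* Deletes every connection whose timer has expired; returns the count. */
int uim_expire(struct uim_conn_table *t, long now_ms)
{
    assert(t != NULL);
    int deleted = 0;
    for (int i = 0; i < MAX_CONNS; i++) {
        struct uim_conn *c = &t->conns[i];
        if (c->in_use && now_ms >= c->delete_at_ms) {
            c->in_use = false;
            deleted++;
        }
    }
    return deleted;
}
```

In this model, sends spaced closer together than deletion_timeout_ms keep a single connection alive, while a longer gap causes the next send to pay the cost of a new connection.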
An exception to this workflow exists for the Request/Response feature. When a request message is received by a context, the context automatically initiates a connection to the requester, even though the application has not yet sent its response UIM. The activity deletion timer is not started at this time. When the application's receiver callback is invoked with the request message, the message contains a reference to a response object. This response object is used for sending response UIMs back to the requester. However, the act of sending these responses also does not start the activity deletion timer for the TCP connection. The activity deletion timer is only started when the response object is deleted (usually implicitly when the message itself is deleted, usually as a result of returning from the receiver callback).
Note that the application that receives a request has the option of retaining the message, which delays deletion of the message until the application explicitly decides to delete it. In this case, the TCP connection is held open for as long as the application retains the response object. When the application has finished sending any and all of its responses to the request and does not need the request object any more, it deletes the request object. This starts the activity deletion timer running.
Finally, note that there is a queue in front of the UIM connection which holds messages when the connection is slow. It is possible that messages are still held in the queue for transmission after the response object is deleted, and if the response message is very large and/or the destination application is slow processing responses, it is possible for data to still be queued when the activity deletion timer expires. In that case, UM does not delete the TCP connection, and instead restarts the activity deletion timer for the connection.
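The Request/Response exception and the queued-data case can be modeled together as a small state machine. This is a hedged sketch: struct resp_conn and the resp_conn_*() functions are hypothetical names for this example, and the model tracks only what the text describes (when the timer starts, and what happens if data is still queued at expiry).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the connection back to a requester; not a UM type. */
struct resp_conn {
    bool open;
    bool timer_running;
    long delete_at_ms;
    long timeout_ms;     /* activity deletion timeout */
    int queued_msgs;     /* responses still waiting in the send queue */
};

/* A request arrives: the connection is opened, but no timer is started. */
void resp_conn_on_request(struct resp_conn *c, long timeout_ms)
{
    assert(c != NULL);
    c->open = true;
    c->timer_running = false;
    c->delete_at_ms = 0;
    c->timeout_ms = timeout_ms;
    c->queued_msgs = 0;
}

/* Sending a response queues data; it does not start the timer. */
void resp_conn_send_response(struct resp_conn *c)
{
    c->queued_msgs++;
}

/* The network drains up to n queued messages. */
void resp_conn_drain(struct resp_conn *c, int n)
{
    c->queued_msgs = (n >= c->queued_msgs) ? 0 : c->queued_msgs - n;
}

/* Deleting the response object is what starts the timer. */
void resp_conn_delete_response_obj(struct resp_conn *c, long now_ms)
{
    c->timer_running = true;
    c->delete_at_ms = now_ms + c->timeout_ms;
}

/* Timer check: on expiry, restart if data is still queued, else close. */
void resp_conn_tick(struct resp_conn *c, long now_ms)
{
    if (!c->open || !c->timer_running || now_ms < c->delete_at_ms)
        return;
    if (c->queued_msgs > 0) {
        c->delete_at_ms = now_ms + c->timeout_ms;  /* restart, keep the connection */
        return;
    }
    c->open = false;
    c->timer_running = false;
}
```

The model shows why retaining a request message keeps the connection open indefinitely: until the response object is deleted, no amount of elapsed time causes resp_conn_tick() to close the connection.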
Multicast Immediate Messaging (MIM) is not recommended for general use. It is inefficient and can affect the operation of all applications on a UM network. This is partly because every message sent via the MIM protocol is distributed to every other application on the network, regardless of that application's interest in such messages.
MIM uses the same reliable multicast protocol as normal LBT-RM sources. MIM messages can be sent to a topic, in which case each receiving context will filter that message, discarding it if no receiver exists for that topic. MIM avoids using Topic Resolution by including the full topic string in the message, and sending it to a multicast group that all application contexts are configured to use.
A receiving context will receive the message and check to see if the application created a receiver for the topic. If so, then the message is delivered. However, if no receiver exists for that topic, the context checks to see if immediate_message_topic_receiver_function (context) is configured. If so, then the message is delivered. But if neither exists, then the receiving context discards the message.
It is also possible to send a "topicless" message via MIM. The recipient context must have configured a topicless receiver using immediate_message_receiver_function (context); otherwise the message is discarded.
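The receive-side decision for topic and topicless MIM messages can be summarized as one function. This is a sketch of the rules as stated above, not UM code: the enum, the function mim_dispose(), and its boolean parameters are hypothetical. The two "callback set" flags stand for whether immediate_message_topic_receiver_function (context) and immediate_message_receiver_function (context) are configured.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Possible outcomes for a received MIM message in this model. */
enum mim_disposition {
    MIM_TO_RECEIVER,            /* a receiver exists for the topic */
    MIM_TO_TOPIC_CALLBACK,      /* no receiver, but the topic callback is set */
    MIM_TO_TOPICLESS_CALLBACK,  /* topicless message and its callback is set */
    MIM_DISCARD
};

/* topic == NULL represents a topicless message. */
enum mim_disposition mim_dispose(const char *topic, bool receiver_exists,
                                 bool topic_cb_set, bool topicless_cb_set)
{
    /* A topicless message cannot match a topic receiver. */
    assert(!(topic == NULL && receiver_exists));
    if (topic == NULL)
        return topicless_cb_set ? MIM_TO_TOPICLESS_CALLBACK : MIM_DISCARD;
    if (receiver_exists)
        return MIM_TO_RECEIVER;
    if (topic_cb_set)
        return MIM_TO_TOPIC_CALLBACK;
    return MIM_DISCARD;
}
```

Note that the topic callback is consulted only when no receiver exists for the topic, so it acts as a fallback rather than an additional delivery path.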
A valid use case for MIM might be an application that starts running, sends a request message, gets back a response, and then exits. With this kind of short-lived application, it can be a burden to create a source and wait for it to resolve. With MIM, topic resolution is skipped, so no delay is needed prior to sending.
MIM is typically not used for normal Streaming data because messages are somewhat less efficiently handled than source-based messages. Inefficiencies derive from larger message sizes due to the inclusion of the topic name, and on the receiving side, the MIM Delivery Controller hashing of topic names to find receivers, which consumes some extra CPU. If you have a high-message-rate stream, you should use a source-based method and not MIM. If head-loss is a concern and delay before sending is not feasible, then consider using late join (although this replaces head-loss with some head latency).
Note: Multicast Immediate Messaging can benefit from hardware acceleration. See Transport Acceleration Options for more information.
MIM uses the same reliable multicast algorithms as LBT-RM. When a sending application sends a message with lbm_multicast_immediate_message(), MIM creates a temporary Transport Session. Note that no topic-level source object is created.
MIM automatically deletes the temporary Transport Session after a period of inactivity defined by mim_src_deletion_timeout (context), which defaults to 30 seconds. A subsequent send creates a new Transport Session. Due to the possibility of head-loss in the switch, it is recommended that sending applications use a long deletion timeout if they continue to use MIM after significant periods of inactivity.
MIM forces all topics across all sending applications to be concentrated onto a single multicast address to which ALL applications listen, even if they aren't interested in any of the topics. Thus, all topic filtering must happen in UM.
MIM can also be used to send a UM request message with lbm_multicast_immediate_request(). For example, an application can use MIM to request initialization information right when it starts up. MIM sends the response directly to the initializing application, avoiding the topic resolution delay inherent in the normal source-based lbm_send_request() function.
MIM notifications differ in the following ways from normal UM source-based sending.
To receive MIM messages with a topic, an application simply creates a normal receiver for that topic. Alternatively, it can create a wildcard receiver for a matching topic pattern. Finally, the application can also register a callback specifically for MIM messages that contain a topic but for which no matching receiver exists, using immediate_message_topic_receiver_function (context). Alternatively, lbm_context_rcv_immediate_topic_msgs() can be used.
To receive MIM messages with no topic (topicless), the application must register a callback for topicless messages, using immediate_message_receiver_function (context). Alternatively, lbm_context_rcv_immediate_msgs() can be used.
If needed, an application can send topicless messages using MIM. A MIM sender passes in a NULL string instead of a topic name. The message goes out on the MIM multicast address and is received by all other receivers. A receiving application can use lbm_context_rcv_immediate_msgs() to set the callback procedure and delivery method for non-topic immediate messages.
When an application receives an immediate message, its topic is hashed to see if there is at least one regular (non-wildcard) receiver object listening to the topic. If so, then MIM delivers the message data to the list of receivers.
However, if there are no regular receivers for that topic in the receive hash, MIM runs the message topic through all existing wildcard patterns and delivers matches to the appropriate wildcard receiver objects without creating sub-receivers. The next MIM message received for the same topic will again be run through all existing wildcard patterns. This can consume significant CPU resources since it is done on a per-message basis.
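The per-message cost of wildcard matching can be made visible with a small model. This sketch makes several assumptions that are not from the text: it uses POSIX extended regular expressions (regcomp/regexec) as the pattern type, a linear scan in place of the topic hash, and the hypothetical names struct rcv_tables and mim_deliver(). The counter pattern_evals exists only to show how many pattern evaluations each message triggers.

```c
#include <assert.h>
#include <regex.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical receive-side tables; not UM types. */
struct rcv_tables {
    const char **topics;          /* topics that have regular receivers */
    size_t n_topics;
    regex_t *patterns;            /* compiled wildcard patterns */
    size_t n_patterns;
    unsigned long pattern_evals;  /* counts regexec() calls */
};

/* Returns the number of receivers the message is delivered to. */
int mim_deliver(struct rcv_tables *t, const char *topic)
{
    assert(t != NULL && topic != NULL);
    int delivered = 0;

    /* Step 1: regular receivers (a linear scan stands in for the hash). */
    for (size_t i = 0; i < t->n_topics; i++) {
        if (strcmp(t->topics[i], topic) == 0)
            delivered++;
    }
    if (delivered > 0)
        return delivered;

    /* Step 2: no regular receiver, so every pattern is tested.
     * Nothing is remembered, so the next message repeats this work. */
    for (size_t i = 0; i < t->n_patterns; i++) {
        t->pattern_evals++;
        if (regexec(&t->patterns[i], topic, 0, NULL, 0) == 0)
            delivered++;
    }
    return delivered;
}
```

With P patterns and N messages on topics that have no regular receiver, this model performs P × N pattern evaluations, which is the per-message overhead the text warns about.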
The receiving application can set up a context callback to be notified of MIM unrecoverable loss (lbm_mim_unrecloss_function_cb()). It is not possible to do this notification on a topic basis because the receiving UM has no way of knowing which topics were affected by the loss.
As of UM 3.1, MIM supports ordered delivery. As of UM 3.3.2, the MIM configuration option, mim_ordered_delivery (context) defaults to ordered delivery.
See the UM Configuration Guide for the descriptions of the MIM configuration options:
UM includes two example applications that illustrate MIM.
lbmimsg.c - application that sends immediate messages as fast as it can to a given topic (single source). See also the Java example, lbmimsg.java and the .NET example, lbmimsg.cs.
lbmimsg.c
We can demonstrate the default operation of Immediate Messaging with lbmimsg and lbmrcv.
The lbmrcv output should resemble the following:
Each line in the lbmrcv output is a message received, showing the topic name, transport type, receiver IP:Port, multicast address and message number.
lbmireq.c
Sending a UM request by MIM can be demonstrated with lbmireq and lbmrcv, which shows a single request being sent by lbmireq and received by lbmrcv. (lbmrcv sends no response.)
The lbmrcv output should resemble the following:
The lbmireq output should resemble the following: