Guide for Persistence
A persistent system is composed of sources, receivers, and stores managed by one or more applications. Sources and receivers are the endpoints of communication and the store(s) provide fault recovery and persistence of state information. Your application can leverage UM's flexible methods of persistence to add fault tolerance. With this flexibility, your applications assume new responsibilities not normally required in other persistent messaging systems. This section identifies the important considerations for your messaging applications when implementing the following persistence features:
As mentioned in Registration Identifier Concept and Adding Fault Recovery with Registration IDs, stores use RegIDs to identify sources and receivers. UM offers three main methods for managing RegIDs:
Recommended: use Session IDs to enable the Store to both assign and manage RegIDs. See Managing RegIDs with Session IDs. Note: while the use of Session IDs is recommended, an understanding of the underlying registration IDs is often helpful to understanding persistence.
Your applications assign static RegIDs and ensure that the same RegID is not assigned to multiple sources and/or receivers. See Use Static RegIDs.
Your applications can manage RegIDs for the lifetime of a source or receiver as long as multiple applications do not reuse RegIDs simultaneously on the same store. RegIDs only need to be unique on the same store and may be reused between stores as desired. You can use a static mapping of RegIDs to applications or use some simple service to assign them.
For very small deployments, the simplest method uses static RegIDs for individual applications. This method requires every persistent source connecting to a given store have a unique RegID from every other persistent source attaching to the same store. This includes publishing applications that have multiple persistent topics; each topic's source object must have a unique RegID. (The use of session IDs greatly simplifies the management of these RegIDs.)
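The uniqueness rule above can be checked mechanically before deployment. The following is a minimal sketch, assuming a hand-maintained table of static assignments; the `regid_entry_t` type and `find_regid_conflict()` are hypothetical names for illustration and are not part of the UM API.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical record: one persistent source or receiver and its static RegID. */
typedef struct {
    const char *store;   /* store address, e.g. "10.0.0.1:14567" */
    const char *owner;   /* application/topic label, for diagnostics only */
    unsigned int regid;  /* statically assigned RegID */
} regid_entry_t;

/* Returns the index of the first entry that reuses a RegID already taken on
 * the same store, or -1 if every (store, RegID) pair is unique. */
int find_regid_conflict(const regid_entry_t *entries, size_t count)
{
    assert(entries != NULL || count == 0);
    for (size_t i = 1; i < count; i++) {
        for (size_t j = 0; j < i; j++) {
            if (entries[i].regid == entries[j].regid &&
                strcmp(entries[i].store, entries[j].store) == 0) {
                return (int)i;
            }
        }
    }
    return -1;
}
```

Because RegIDs only need to be unique per store, the same RegID on two different stores is not reported as a conflict.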
The following source code examples assign a static RegID to a source by adding the RegID, 1000, to the ume_store (source) attribute. See also ume-example-src-2.c
C API
JAVA API
.NET API
When using RegIDs, your application can request that the store assign it a new and unique RegID when it registers for the first time. That RegID is made available to the application, which can then save it to local storage. Thus, the next time the application starts (or restarts) and wants to use the same registration, it reads the value written to local storage. This method of managing RegIDs is not common. For example, what if the application needs to be restarted on a different server due to hardware failure? If it cannot re-register with its earlier RegID, it will not be able to recover only those messages it had not yet acknowledged. (The use of Session IDs simplifies this greatly by essentially saving the registration IDs for you on the store itself.)
The following minimal source code example saves the RegID assigned to a source to a file. See also ume-example-src-3.c
C API
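A minimal sketch of the file handling involved, not the ume-example-src-3.c listing. The helper names `load_regid()` and `save_regid()` and the one-number text file format are assumptions for illustration; obtaining the assigned RegID from UM is outside the scope of the sketch.

```c
#include <assert.h>
#include <stdio.h>

/* Reads a previously saved RegID from `path`.
 * Returns 0 when no usable file exists; this sketch uses 0 to mean
 * "no saved RegID, ask the store for a new one". */
unsigned int load_regid(const char *path)
{
    unsigned int regid = 0;
    FILE *fp;

    assert(path != NULL);
    fp = fopen(path, "r");
    if (fp == NULL)
        return 0;
    if (fscanf(fp, "%u", &regid) != 1)
        regid = 0;
    fclose(fp);
    return regid;
}

/* Writes the store-assigned RegID to `path`.
 * Returns 0 on success, -1 on failure. */
int save_regid(const char *path, unsigned int regid)
{
    FILE *fp;
    int ok;

    assert(path != NULL);
    fp = fopen(path, "w");
    if (fp == NULL)
        return -1;
    ok = fprintf(fp, "%u\n", regid) > 0;
    if (fclose(fp) != 0)
        ok = 0;
    return ok ? 0 : -1;
}
```

An application would call `load_regid()` at start-up and `save_regid()` once the store reports the RegID it assigned. Note that the file lives on the local host, which is exactly the limitation described above when the application restarts on a different server.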
The RegIDs used by stores to identify sources and receivers must be unique. Rather than maintaining RegIDs (either statically or dynamically), applications can use a Session ID, which is simply a 64-bit value that uniquely identifies any set of sources with unique topics and receivers with unique topics. A single Session ID allows UM stores to correctly identify all the sources and receivers for a particular application.
In practice, a Session ID is often thought of as an application identifier, although it is more accurately thought of as a context identifier. (For applications that only have a single context with persistent sources and/or receivers, the two are effectively the same.) However, be aware that many application systems run multiple instances of a given program, perhaps for horizontal scaling. Each instance needs its own Session ID.
It is also possible for a single context to host multiple Session IDs, although this is rarely done. The UM configuration options ume_session_id (source) and ume_session_id (receiver) can be used to arrange individual source and/or receiver objects into registration groupings. However, it is more common to use the option ume_session_id (context) to group all sources and receivers created within a context into a single session ID. (If both a context and a source or receiver option is specified, the source or receiver option will override the context option.)
How Stores Associate Session IDs and RegIDs
Session IDs do not replace the use of RegIDs by UM but rather simplify RegID management. Using Session IDs equates to your application specifying a 0 (zero) RegID for all sources and receivers. However, instead of your application persisting the RegID assigned by the store, the store maintains the RegID for you.
When a store receives a registration request from a source or receiver with a particular Session ID, it checks to see if it already has a source or receiver for that topic/Session ID. If it does, then it responds with that source's or receiver's RegID.
If it does not find a source or receiver for that topic/Session ID pair, the store:
The source can then advertise with the RegID supplied by the store. Receivers include the source's RegID in their registration request.
All of the above steps happen within UM itself without any intervention by the application. However, the application does have access to the underlying registration ID, if it desires it.
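The lookup described in this section can be pictured as a find-or-create table keyed by role, topic, and Session ID. The following toy model is only an illustration of that idea; it is not how the store is implemented, and every name in it (`toy_store_t`, `toy_store_register()`, the counter-based RegID allocation) is an assumption.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define TOPIC_MAX 64
#define MAX_REGISTRATIONS 64

typedef enum { ROLE_SOURCE, ROLE_RECEIVER } role_t;

typedef struct {
    role_t role;
    char topic[TOPIC_MAX];
    uint64_t session_id;
    uint32_t regid;
} registration_t;

typedef struct {
    registration_t entries[MAX_REGISTRATIONS];
    size_t count;
    uint32_t next_regid;   /* hypothetical allocation counter */
} toy_store_t;

void toy_store_init(toy_store_t *store, uint32_t first_regid)
{
    assert(store != NULL && first_regid != 0);
    store->count = 0;
    store->next_regid = first_regid;
}

/* Returns the RegID for (role, topic, session_id): the existing one if the
 * combination is already known, otherwise a newly allocated one.
 * Returns 0 if the topic is too long or the table is full. */
uint32_t toy_store_register(toy_store_t *store, role_t role,
                            const char *topic, uint64_t session_id)
{
    registration_t *e;

    assert(store != NULL && topic != NULL);
    if (strlen(topic) >= TOPIC_MAX)
        return 0;
    for (size_t i = 0; i < store->count; i++) {
        e = &store->entries[i];
        if (e->role == role && e->session_id == session_id &&
            strcmp(e->topic, topic) == 0) {
            return e->regid;            /* re-registration */
        }
    }
    if (store->count == MAX_REGISTRATIONS)
        return 0;
    e = &store->entries[store->count++];
    e->role = role;
    strcpy(e->topic, topic);            /* length checked above */
    e->session_id = session_id;
    e->regid = store->next_regid++;     /* new registration */
    return e->regid;
}
```

Registering the same role, topic, and Session ID twice yields the same RegID, which is what lets a restarted application resume its earlier registration without saving anything itself.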
The major concerns of sources revolve around RegID management and message retention.
Any source needs to know at start-up if it is a new registration or a re-registration. The answer determines how a source registers with the store. The UM library cannot answer this question. Therefore, it is essential that the developer consider what identifies the lifetime of a source and how a source determines the appropriate value to use as the RegID when it is ready to register. RegIDs are per source per topic per store, thus a single RegID per store is needed.
The following source code example looks for an existing RegID in a file and, if it finds none, uses a new RegID assigned by the store. See also ume-example-src-3.c.
C API
The use of Session IDs allows UM, as opposed to your application, to accomplish the same RegID management. See Managing RegIDs with Session IDs.
A source sends messages unless UM prevents it, in which case, the send function returns an error. A source may lose the ability to send messages temporarily if the store(s) in use become unresponsive, e.g. the store(s) die or become disconnected from the source. Once the store(s) are responsive again, sending can continue. Thus source applications need to take into account that sending may fail temporarily under specific failure cases and be able to resume sending when the failure is removed.
The following source code examples demonstrate how a failed send function can sleep for a second and try again:
C API
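A minimal sketch of the retry pattern, with the UM send call abstracted behind a hypothetical callback so the example is self-contained. The `send_fn_t` type, its return codes, and `flaky_send()` are assumptions for illustration; a real application would call the UM send function and inspect its error code instead. `sleep()` is the POSIX call from `<unistd.h>`.

```c
#include <assert.h>
#include <unistd.h>

/* Hypothetical send callback: returns 0 on success, -1 on a temporary
 * failure (for example, the store is unresponsive), -2 on a fatal error. */
typedef int (*send_fn_t)(void *ctx, const char *data, unsigned long len);

/* Retries a temporarily failing send, sleeping between attempts.
 * Returns the number of attempts used on success, or -1 if the send failed
 * fatally or max_attempts was exhausted. */
int send_with_retry(send_fn_t send_cb, void *ctx, const char *data,
                    unsigned long len, unsigned int delay_seconds,
                    int max_attempts)
{
    assert(send_cb != NULL && max_attempts > 0);
    for (int attempt = 1; attempt <= max_attempts; attempt++) {
        int rc = send_cb(ctx, data, len);
        if (rc == 0)
            return attempt;
        if (rc != -1)
            return -1;                 /* fatal: do not retry */
        if (attempt < max_attempts)
            sleep(delay_seconds);      /* wait, then try again */
    }
    return -1;
}

/* Stand-in sender for experimentation: fails temporarily until the counter
 * pointed to by ctx reaches zero, then succeeds. */
int flaky_send(void *ctx, const char *data, unsigned long len)
{
    int *failures_left = ctx;
    (void)data;
    (void)len;
    if (*failures_left > 0) {
        (*failures_left)--;
        return -1;
    }
    return 0;
}
```

Bounding the number of attempts is a design choice of this sketch; an application that must never drop a message could loop indefinitely, as the Java example does.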
Java API
for (;;) {
    try {
        src.send(message, len, 0);
    }
    catch (UMENoRegException ex) {
        System.out.println("Send unsuccessful. Waiting...");
        try {
            Thread.sleep(1000);
        }
        catch (InterruptedException e) { }
        continue;
    }
    catch (LBMException ex) {
        System.err.println("Error sending message: " + ex.toString());
        System.exit(1);
    }
    break;
}
.NET API
UM allows streaming of messages from a source without regard to message stability at a store, which is one reason for UM's performance advantage over other persistent messaging systems. Sources retain all messages until notified by the active store(s) that they are stable. This provides a method for stores to be brought up to date when restarted or started anew.
When messages are considered stable at the store, the source can release them which frees up source retention memory for new messages. Generally, the source releases older stable messages first. To release the oldest retained message, all the following conditions must be met:
Message must meet stability requirements of the source, which can range from a single stability notice from the active store to stability notices from a group of stores (See Sources Using Quorum/Consensus Store Configuration).
Message must have been confirmed as delivered by a configured number of receivers (ume_retention_unique_confirmations (source)).
Some things to note:
If retransmit_retention_size_threshold (source) is not met, no messages will be released regardless of stability.
If the source registered with a "no-cache" store (See Persistent Store Concept) or ume_message_stability_notification (source) is turned off, ume_retention_unique_confirmations (source) is the only way to allow the source to release messages before retention size options come into play.
If the aggregate amount of buffered messages exceeds retransmit_retention_size_limit (source) bytes in payload and headers, then UM forcibly releases the oldest retained message even if it does not meet one or more of the conditions stated in Source Message Retention and Release. This condition should be avoided and Informatica suggests increasing the retransmit_retention_size_limit (source).
A second condition that produces a forced reclaim is when a message remains unstabilized when the ume_message_stability_lifetime (source) expires.
Whenever UM performs a Forced Reclaim, it notifies the application in the following ways:
The source event callback's RECLAIMED_EX event (see Persistence Source Events) includes a "FORCED" flag on the event. (UM uses the same RECLAIMED_EX event, without the FORCED flag, for normal reclaims.)
The following sample code, from umesrc.c, implements the extended reclaim source event with the 'Forced' flag set if the reclamation is a forced reclaim.
C API
Java API
.NET API
Sources use a set of configuration options to release messages that, in effect, specify the source's release policy. The following configuration options directly impact when the source may release retained messages:
The configuration option ume_retention_unique_confirmations (source) requires a message to have a minimum number of unique confirmations from different receivers before the message may be released. This retains messages that have not been confirmed as being received and processed and keeps them available to fulfill any retransmission requests. This provides a form of receiver-pacing; the source is not allowed to get further ahead of the receiving applications than the Persistence Flight Size.
For example, a topic might have 2 receivers which are considered essential to keep up, and which should therefore contribute to flight size calculation. There might be any number of less-essential receivers which can be allowed to lag behind. In this case, ume_retention_unique_confirmations (source) would be set to 2, and the non-essential receivers would set ume_allow_confirmed_delivery (receiver) to 0.
The following code samples show how to require a message to have 10 unique receiver confirmations
C API
JAVA API
.NET API
The Source Event Handler is a function callback initialized at source creation to provide source events to your application related to the operation of the source. The following source code examples illustrate the use of a source event handler for registration events. To accept other source events, additional case statements would be required, one for each additional source event. See also Persistence Events.
C API
JAVA API
.NET API
As shown in Source Event Handler above, the Source Event Handler can be expanded to handle more source events by adding additional case statements. The following source code examples show case statements to handle message stability events, delivery confirmation events and message release (reclaim) events. See also Persistence Events.
C API
JAVA API
.NET API
The C API function lbm_src_sendv_ex() allows you to create a pointer to an object or structure. This pointer will be returned to your application along with all source events. You can then update the object or structure with source event information. For example, if your messages exceed 8K (which requires fragmenting your application's message into more than one UM message), receiving sequence number events with this pointer allows you to determine all the UM sequence numbers for the message and, therefore, how many release (reclaim) events to expect. The following two source code examples show how to:
C API - Enable Message Information
C API - Sequence Number Event Handler
JAVA API - Enable Message Information
JAVA API - Sequence Number Event Handler
.NET API - Enable Message Information
.NET API - Sequence Number Event Handler
As an extension to Confirmed Delivery, you can set receivers to send a keepalive to a source during a measured absence of delivery confirmations (due to traffic lapse). In the event that neither message reaches the source within a designated interval, or if the delivery confirmation TCP connection breaks down, the receiver is assumed to have "died". UM then notifies the publishing application via context event callback. This lets the publisher assign a new subscriber.
To use this feature, set these five configuration options:
This specialized feature is not recommended for general use. If you are considering it, please note the following caveats:
Other false receiver-alive assumptions could be caused by the following:
Receivers are predominantly interested in RegID management and recovery management.
RegIDs are slightly more involved for receivers than for sources. Since RegIDs are per source per topic per store and a topic may have several sources, a receiver may have to manage several RegIDs per store in use. Fortunately, receivers in UM can leverage the RegID of the source with the use of a callback as discussed in Adding Fault Recovery with Registration IDs and shown in ume-example-rcv-2.c. Your application can determine the correct RegID to use and return it to UM. You can also use Session IDs to enable UM to manage receiver RegIDs. See Managing RegIDs with Session IDs.
Much like sources, receivers typically have a lifetime based on an amount of work, perhaps an infinite amount. And just like sources, it may be helpful to consider that a RegID is "assigned" at the start of that work and is out of use at the end. In between, the RegID is in use by the instance of the receiver application. However, the nature of RegIDs being per source means that the expected lifetime of a source should play a role in how RegIDs on the receiver are managed. Thus, it may be helpful for the application developer to consider the source application lifetime when deciding how best to handle RegIDs on the receiver.
Receiver Message and Event Handler
The Receiver Message and Event Handler is an application callback, defined at receiver initialization, to deliver received messages to your application. The following source code examples illustrate the use of a receiver message and event handler for registration messages. To accept other receiver events, additional case statements would be required, one for each additional event. See also Persistence Events
C API
JAVA API
.NET API
Recovery management for failed and restarted receivers is fairly simple. UM requests any missed messages from the store(s) and delivers them to the restarted receiver. However, your application can override that default behavior either by configuring a retransmit_request_maximum (receiver) value, or by configuring a ume_recovery_sequence_number_info_function (receiver) application callback, or both.
For example, let's say a source sends 7 messages with sequence numbers 0-6 which are stabilized at the store. A C-based receiver, configured with retransmit_request_maximum (receiver) set to 2, and an application callback ume_recovery_sequence_number_info_function (receiver), consumes (and acknowledges) message 0, goes down, then restarts right after message 6.
During receiver registration, the lbm_ume_rcv_recovery_info_ex_func_t application callback is called with the following values in the passed-in structure lbm_ume_rcv_recovery_info_ex_func_info_t *info:
Where:
lbm_ume_rcv_recovery_info_ex_func_info_t::high_sequence_number - the most recent message sent by the source,
lbm_ume_rcv_recovery_info_ex_func_info_t::low_rxreq_max_sequence_number - high_sequence_number (above) minus the number configured for retransmit_request_maximum (receiver) (2 in this example), and
Normally, UM would start delivering messages at 1, but retransmit_request_maximum (receiver) is set to 2, which overrides UM's normal behavior. So in this example, the first message delivered will be number 4.
Finally, the application can, at run-time, further override the starting sequence number. The callback function can modify the contents of the passed-in structure lbm_ume_rcv_recovery_info_ex_func_info_t *info; specifically it can update the lbm_ume_rcv_recovery_info_ex_func_info_t::low_sequence_number field. When the callback returns, UM examines that field to see if it was modified by the callback. If so, UM overrides the effect of retransmit_request_maximum (receiver) and starts at the requested sequence number.
Notice that this design does not allow the callback to override the effect of retransmit_request_maximum (receiver) by setting the lbm_ume_rcv_recovery_info_ex_func_info_t::low_sequence_number field to its original value, 1 in this example. Upon return, UM will see the value unchanged, and will allow retransmit_request_maximum (receiver) to override the starting sequence number. This is only an issue if both retransmit_request_maximum (receiver) and ume_recovery_sequence_number_info_function (receiver) are used. If the application wants to use the sequence number remembered by the store, it should not configure retransmit_request_maximum (receiver).
In a distributed system, it is not possible to guarantee "once-and-only-once" delivery of messages in the face of unpredictable system or component failure. Regardless of the algorithms and handshaking, there is always the possibility of messages sent that are never received, as well as messages received and then received again if the receiving application fails and restarts.
UM's persistence design is based on the principle of being close to once-and-only-once, but when that is not possible, UM prefers to fail on the side of duplicate message delivery. Due to other design goals (low latency and high throughput), the possibility of receiving duplicate messages is significant after an application failure and restart.
It is therefore important for persistent applications to be designed to tolerate duplicate message reception, either by making message processing idempotent, or by including logic in the receiving application to detect duplicates and only process the messages which have not been previously processed.
To assist the application in implementing "de-duplication", all messages retransmitted to a receiver are marked as retransmissions via a flag in the message structure. Thus it is easy for an application to determine if a message is a new "live" message from the source, or a retransmission, which may or may not have been processed before the failure. The presence or absence of the retransmit flag gives the application a hint of how best to handle the message with regard to it being processed previously or not.
Informatica recommends that you always check the data or other message properties of messages with the retransmit flag set to be sure the message has not been already processed. Relying on UM sequence numbers is not a 100% reliable method for detecting duplicate messages.
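One way to follow this recommendation is to carry an application-level message identifier in the payload and check it for retransmitted messages. The sketch below is a minimal in-memory illustration; `seen_set_t`, `should_process()`, and the idea of a 64-bit application ID are assumptions, and a real application would need to rebuild the set from its own durable state after a restart.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SEEN_CAPACITY 1024

/* Hypothetical record of application-level message IDs already processed. */
typedef struct {
    uint64_t ids[SEEN_CAPACITY];
    size_t count;
} seen_set_t;

/* Returns 1 if the message should be processed, 0 if it is a duplicate.
 * is_retransmission reflects the retransmit flag on the received message. */
int should_process(seen_set_t *seen, uint64_t app_msg_id, int is_retransmission)
{
    assert(seen != NULL);
    if (is_retransmission) {
        for (size_t i = 0; i < seen->count; i++) {
            if (seen->ids[i] == app_msg_id)
                return 0;          /* already processed before the failure */
        }
    }
    /* Remember the ID; once the table is full, new IDs are not recorded
     * (a limitation of this sketch). */
    if (seen->count < SEEN_CAPACITY)
        seen->ids[seen->count++] = app_msg_id;
    return 1;
}
```

The linear scan keeps the example short; a hash set or a bounded window of recent IDs would be a more typical choice at high message rates.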
Whereas the UM persistence design attempts to choose the correct starting sequence number for a recovering receiver, there are cases where the application wishes to override UM's choice.
The sample code below demonstrates how to use the recovery sequence number info function to determine the stored message with which to restart a receiver. This example retrieves the low sequence number from the recovery sequence number structure and adds an offset to determine the beginning sequence number. The offset is a value completely under the control of your application. For example, if a receiver was down for a "long" period and you only want the receiver to receive the last 10 messages, use an offset to start the receiver with the 10th most recent message. If you wish not to receive any messages, set the lbm_ume_rcv_recovery_info_ex_func_info_t::low_sequence_number to the lbm_ume_rcv_recovery_info_ex_func_info_t::high_sequence_number plus one.
C API
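A minimal sketch of the offset arithmetic only, without the UM callback signature. The function name `recovery_start()` and the `keep_last` parameter are hypothetical; in a real callback the returned value would be written to the low_sequence_number field.

```c
#include <assert.h>
#include <stdint.h>

/* Returns the sequence number to restart with so that at most keep_last
 * stored messages are redelivered.
 *   low       - starting sequence number proposed for the receiver
 *   high      - most recent sequence number sent by the source
 *   keep_last - how many of the most recent messages to receive;
 *               0 means skip all stored messages */
uint32_t recovery_start(uint32_t low, uint32_t high, uint32_t keep_last)
{
    uint32_t available;
    uint32_t offset;

    if (keep_last == 0)
        return high + 1;               /* receive nothing from the store */
    if (high < low)
        return low;                    /* nothing stored beyond low */
    available = high - low + 1;        /* messages the store could redeliver */
    if (keep_last >= available)
        return low;                    /* fewer than keep_last are pending */
    offset = available - keep_last;    /* messages to skip */
    assert(offset < available);
    return low + offset;
}
```

For example, with low 1, high 100, and keep_last 10, the offset is 90 and delivery restarts at sequence number 91, the 10th most recent message.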
JAVA API
.NET API
Receivers use message consumption, defined as message deletion, to indicate that UM should notify the store(s) that the application consumed the message. This notification takes the form of an acknowledgement, or ACK, to the store(s) in use, and optionally to the source if you configure the source for delivery confirmation.
In many applications, the message receiver application callback will fully process the received message. When the application callback returns, the message should be deleted and acknowledged.
However, there are other application designs where a received message cannot be fully processed inside the receiver application callback. For example, the message might need to be passed to a worker thread for longer-term processing. Or the acknowledgement must be delayed until some other event happens, like a handshake with another application. In these cases, the message deletion and/or message acknowledgement must not be done when the receiver callback returns.
Finally, for high-throughput applications, an application can completely suppress the acknowledgement of each individual message in favor of acknowledgement batching (acknowledging multiple messages in one operation). This is done to reduce the per-message overhead. Note that acknowledgement batching increases the chances that a restarted application will receive duplicate messages (messages that had been previously processed but not yet acknowledged). See Duplicate Message Delivery for more information.
In many applications, the message receiver application callback will fully process the received message. When the receive callback returns, the message should be deleted and acknowledged. This is handled differently between the C API vs. the Java and .NET APIs.
C API
The default behavior for a C receiver application callback is for the message to be deleted and acknowledged when the receiver callback returns. No special coding is needed for this use case.
Java and .NET
With Java and .NET, the UM library is not able to differentiate between a message that is passed to a different part of the application vs. a message which is simply dereferenced for eventual garbage collection. So the default behavior of the UM library is different – it is assumed that the message should not be deleted and acknowledged when the receiver application callback returns. Instead, the application is expected to explicitly dispose of received messages when processing is complete.
In the case where message processing is completed in the receiver callback, the application must call the "dispose()" method of the message object before returning. This triggers acknowledgement as well as cleanup of the message's resources.
There are application designs where a received message cannot be fully processed inside the receiver application callback. For example, the message might need to be passed to a worker thread for longer-term processing. Or the acknowledgement must be delayed until some other event happens, like a handshake with another application.
This is handled differently between the C API vs. the Java and .NET APIs.
C API
In the C API, the application's receiver callback function must call the lbm_msg_retain() function for the received message. This suppresses the automatic deletion of the received message when the receiver callback returns, and allows the message buffer to be handed to some other part of the application for processing and deletion at a later time.
When the application subsequently completes all processing of the message and is ready for the message to be deleted and acknowledged, it calls lbm_msg_delete().
Java and .NET
With Java and .NET, the UM library assumes that the message should not be deleted and acknowledged when the receiver application callback returns. The callback can simply pass the message to some other part of the application for subsequent processing.
When the application has completed all processing on the message, the message's "dispose()" method should be called. This releases resources held by the object and also triggers the acknowledgement.
For high-throughput applications, it is often desired to reduce the per-message overhead. Sending acknowledgements to the Store and optionally to the source normally involves multiple socket operations, which can limit the maximum sustainable throughput of a persistent receiver.
A significant reduction in per-message overhead can be achieved by batching acknowledgements. In this use case, the sending of acknowledgements is delayed until multiple messages have been received and processed. Then an acknowledgement is sent which covers all messages processed so far.
ACK Batching can be done implicitly or explicitly. For implicit ACK batching, use the configuration options ume_use_ack_batching (receiver) and ume_ack_batching_interval (context). Note that implicit ACK batching also supports out-of-order acknowledgements. See ACK Ordering.
Explicit ACK batching gives the application precise control over when acknowledgements are sent via API calls. This mode of operation is enabled with the ume_explicit_ack_only (receiver) configuration option. If enabled, acknowledgements are only sent as a result of the application explicitly calling an API. This allows the application to use application-level knowledge to optimize when to send acknowledgements, potentially minimizing the time that processed messages are left unacknowledged (and therefore minimizing the number of potential duplicate messages).
See lbm_ume_ack_send_explicit_ack() and lbm_msg_ume_send_explicit_ack() for the C API. See com::latencybusters::lbm::LBMMessage::sendExplicitAck() for Java and .NET. See Explicit Acknowledgments for details on explicit ACKs.
The Persistent Store does not support "out of order" acknowledgement of messages. If the Store receives an acknowledgement of sequence number N, that implicitly acknowledges all sequence numbers less than N. If a receiving application has the ability to complete processing of messages out of order, it must ensure that an acknowledgement is not sent for a given message until all previously-received messages have been completely processed.
Normally, the only way that a receiving application can process messages out of order is to retain those messages and complete processing of them outside of the receiver application callback function. This normally requires "retaining" the messages so that they aren't deleted (and therefore acknowledged) automatically when the receiver callback returns. In this usage, when a message is completely processed, that message is deleted by the application, triggering the acknowledgement of that message. However, if the application design allows those messages to be processed out of order, then the risk exists that the acknowledgement of a given message will implicitly acknowledge previous messages that have not been completely processed. This will prevent those incompletely processed messages from being recovered if the receiving application fails and restarts.
ACK Batching can provide a solution, implicitly or explicitly.
The implicit form of ACK batching provides, as a convenience, the ability to postpone the sending of a message ACK until all previous received messages have also been processed. When the UM context wakes up every ume_ack_batching_interval (context) milliseconds, it checks for unacknowledged messages that have been deleted, either implicitly from the receiver callback returning, or explicitly by API calls to retain and then delete the message. UM will only acknowledge up to the highest continuous sequence number.
For example, let's say the application deletes messages with sequence numbers 0, 1, 5, 2, 4. Messages 3 and 6 are still being processed. If the context wakes up at this point, it will send an acknowledgement for sequence 2. If the application fails at this point and restarts, the Store will re-send messages 3, 4, 5, and 6. The receiving application must handle the fact that 3 and 6 were incompletely processed, whereas 4 and 5 were completely processed (see Duplicate Message Delivery).
Instead of using implicit batching for this, the application can be coded to use Explicit Acknowledgments. However, in this case, the application has the responsibility to implement a similar algorithm as the implicit ACK batcher described above. I.e. even though the messages 4 and 5 were fully processed, the application would need to postpone sending an acknowledgement until message 3 is also completed, at which point a single acknowledgement for sequence 5 can be sent.
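An application taking the explicit route needs to track the highest contiguous completed sequence number itself. The following is a minimal sketch of such a tracker, assuming a fixed window of outstanding messages; `ack_tracker_t` and its functions are hypothetical names, and the actual sending of the ACK through the UM API is left out.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define ACK_WINDOW 64

typedef struct {
    uint32_t next_unacked;            /* lowest sequence number not yet ACKed */
    unsigned char done[ACK_WINDOW];   /* done[i]: next_unacked + i is processed */
} ack_tracker_t;

void ack_tracker_init(ack_tracker_t *t, uint32_t first_seq)
{
    assert(t != NULL);
    t->next_unacked = first_seq;
    memset(t->done, 0, sizeof t->done);
}

/* Marks seq as completely processed.
 * Returns 0 on success, -1 if seq is outside the tracking window. */
int ack_tracker_mark_done(ack_tracker_t *t, uint32_t seq)
{
    uint32_t offset = seq - t->next_unacked;
    if (offset >= ACK_WINDOW)
        return -1;                    /* too far ahead, or already ACKed */
    t->done[offset] = 1;
    return 0;
}

/* If a contiguous run of processed messages starts at next_unacked, stores
 * the highest sequence number of that run in *ack_seq, advances the window,
 * and returns 1. Returns 0 when nothing can be acknowledged yet. */
int ack_tracker_flush(ack_tracker_t *t, uint32_t *ack_seq)
{
    uint32_t run = 0;

    assert(t != NULL && ack_seq != NULL);
    while (run < ACK_WINDOW && t->done[run])
        run++;
    if (run == 0)
        return 0;
    *ack_seq = t->next_unacked + run - 1;
    memmove(t->done, t->done + run, ACK_WINDOW - run);
    memset(t->done + (ACK_WINDOW - run), 0, run);
    t->next_unacked += run;
    return 1;
}
```

Replaying the example: after marking 0, 1, 5, 2, and 4 as done, a flush yields sequence 2. Once message 3 is also marked done, the next flush yields sequence 5, which is the single acknowledgement to send.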
UM supports Explicit acknowledgement which suppresses UM's default acknowledgement behavior, allowing your application complete control of message consumption notification.
There are two common use cases for Explicit Acknowledgements:
Deferred Acknowledgement means that the receiving application is not able to fully process a message within the message receiver application callback. For example, the message may require processing in a separate thread. By default, UM will acknowledge a persisted message when the receiver callback returns.
Application-level ACK batching means that the application chooses not to acknowledge every received message. Instead, it implements its own logic to decide which messages to acknowledge. Note that acknowledging a given message implicitly acknowledges all earlier messages. For example, acknowledging messages 5, 10, and 15 tells the Store that all messages 0-15 are acknowledged.
Also note that this imposes the restriction that messages be acknowledged in ascending order. See ACK Ordering for more information.
Explicit acknowledgement is enabled using the configuration option ume_explicit_ack_only (receiver).
When using explicit ACKs, you can extract ACK information from messages. This allows the received message buffer to be deleted when the receiver callback is done, while still allowing the application to save the ACK structure for persistent acknowledgement to the Store at a future time. This can improve receiver performance when used with the Receive Buffer Recycling feature to reduce the per-message use of dynamic memory (malloc/free) with a persistent receiver. Extracting ACKs can also improve performance of Java and .NET applications by allowing the use of Zero Object Delivery.
The following source code examples show how to extract ACK information and send an explicit ACK.
C API
JAVA API or .NET API
As mentioned in Persistent Store Concept, the persistent stores, also just called stores, actually persist the source and receiver state and use RegIDs to identify sources and receivers. Each source to which a store provides persistence may have zero or more receivers. The store maintains each receiver's state along with the source's state and the messages the source has sent.
The store can be configured with its own set of options to persist this state information on disk or simply in memory. The term disk store is used to signify a store that persists state to disk, and the term memory store is used to signify a store that persists state only in memory. A store may also be configured not to cache the source's data, but to simply persist the source and receiver state in memory. This is called a no-cache store.
A source does not send data to the store and then have the store forward it to the receivers. In UM, the source sends to receivers and the stores in parallel. See Persistence Normal Operation. Thus, UM can provide extremely low latency to receiving applications.
The store(s) that a source uses are part of the source's configuration settings. Sources must be configured to use specific store(s) in a Quorum/Consensus arrangement.
Receivers, on the other hand, do not need to be configured with store information a priori. The source provides store information to receivers via a Source Registration Information (SRI) message after the source registers with a store. Thus the receivers learn about stores from the source, without needing to be configured themselves. Because receivers learn about the store or stores with which they must register via an SRI record, the source must be available to receivers. However, the source does not have to be actively sending data to do this.
The store daemon generates log messages that are used to monitor its health and operation. You can configure these to be directed to "console" (standard output) or a specified log "file", via the <log> configuration element. Normally "console" is only used during testing, as a persistent log file is preferred for production use. The store does not overwrite log files on startup, but instead appends to them.
To prevent unbounded disk file growth, the store supports rolling log files. When the log file rolls, the file is renamed according to the model:
CONFIGUREDNAME_PID.DATE.SEQNUM
where:
For example: umestorelog_9867.2017-08-20.2
The user can configure when the log file is eligible to roll over by either or both of two criteria: size and frequency. The size criterion is in millions of bytes. The frequency criterion can be daily or hourly. Once one or both criteria are met, the next message written to the log will trigger a roll operation. These criteria are supplied as attributes to the <log> configuration element.
If both criteria are supplied, then the first one to be reached will trigger a roll. For example, consider the setting:
Let's say that the log file grows at 1 million bytes per hour. At 11:00 pm, the log file will reach 23 million bytes, and will roll. Then, at 12:00 midnight, the log file will roll again, even though it is only 1 million bytes in size.
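The arithmetic of this example can be checked with a short simulation. This is a sketch under stated assumptions: the log starts empty at midnight, the size limit is 23 million bytes, and the frequency is daily, which is what the times in the example imply. The function name simulate_rolls and its parameters are hypothetical, and the model rolls at the top of the hour rather than on the next message written.

```python
def simulate_rolls(size_limit_mb, growth_mb_per_hour, hours):
    """Return (hour, reason) for each roll, starting with an empty log at midnight (hour 0)."""
    rolls = []
    size = 0
    for hour in range(1, hours + 1):
        size += growth_mb_per_hour
        if hour % 24 == 0:
            # Daily frequency criterion: roll at midnight regardless of size.
            rolls.append((hour, "frequency"))
            size = 0
        elif size >= size_limit_mb:
            # Size criterion: roll once the limit (in millions of bytes) is reached.
            rolls.append((hour, "size"))
            size = 0
    return rolls


print(simulate_rolls(23, 1, 24))  # [(23, 'size'), (24, 'frequency')]
```

The first roll happens at hour 23 (11:00 pm) on size, and the second at hour 24 (midnight) on frequency, when the new file holds only 1 million bytes.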
To provide the highest degree of resiliency in the face of failures, UM provides the Quorum/Consensus failover strategy which allows a source to provide UM with a number of stores to be used at the same time. Multiple stores can fail and messaging can continue operation unhindered as long as a majority of configured stores are operational.
Quorum/Consensus, also called QC, allows a source and the associated receivers to have their persisted state maintained at several stores at the same time. Central to QC is the concept of a group of stores, which is a logical grouping of stores that are intended to signify a single entity of resilience. Within the group, individual stores may fail but for the group as a whole to be viable and provide resiliency, a quorum must be available. In UM, a quorum is a simple majority. For example, in a group of five stores, three stores are required to maintain a quorum. One or two stores may fail and the group continues to provide resiliency. UM requires a source to have a quorum of stores available in the group in order to send messages. A group can consist of a single store.
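The simple-majority rule is easy to state in code. The helper names below are hypothetical and only illustrate the arithmetic; they are not part of UM.

```python
def quorum_size(num_stores):
    """Smallest number of stores that forms a simple majority of the group."""
    return num_stores // 2 + 1


def has_quorum(num_stores, num_available):
    """True if enough stores in the group are available to form a quorum."""
    return num_available >= quorum_size(num_stores)


print(quorum_size(5))    # 3
print(has_quorum(5, 3))  # True: two of five stores have failed
print(has_quorum(5, 2))  # False: three of five stores have failed
print(quorum_size(1))    # 1: a single-store group needs that one store
```

Note that an even-sized group tolerates no more failures than the odd-sized group one store smaller: a group of four needs three stores, so it survives only one failure, the same as a group of three.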
QC also provides the ability to use multiple groups. As long as a single group maintains quorum, UM allows a source to proceed. Groups are logical in nature and can be arranged in whatever way suits the deployment, such as by store location or store type. In addition, QC provides the ability to specify backup stores within groups. Backups may be used if or when a store in the group becomes unresponsive to the source. Quorum/Consensus lets a source ride out many different failure scenarios in ways simply not available in other persistent messaging systems.
In the case of Quorum/Consensus store behavior, a message is considered stable after it has been successfully stored within a group of stores or among groups of stores according to the two settings, intergroup behavior and intragroup behavior, described below.
The intragroup behavior specifies the requirements needed to stabilize a message among the stores within a group. A message is stable for the group once it is successfully stored at a quorum (majority) of the group's stores or successfully stored in all the stores in the group.
Notice that a message needs to meet intragroup stability requirements before it can meet intergroup stability requirements. These options provide a number of possibilities for retention of messages for the source.
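The ordering of the two checks can be sketched as a small model: each group is evaluated first, and the group results are then combined. This is an illustration only. The function names and the "quorum"/"all" labels are hypothetical rather than UM option values, and the intergroup rule modeled here is the one where every active group must be stable.

```python
def group_stable(stored_flags, intragroup="quorum"):
    """stored_flags holds one boolean per store in the group: True if that store has the message."""
    stored = sum(stored_flags)
    if intragroup == "all":
        return stored == len(stored_flags)
    # Default: a simple majority of the group's stores is enough.
    return stored >= len(stored_flags) // 2 + 1


def message_stable(active_groups, intragroup="quorum"):
    """Intergroup rule modeled here: every active group must meet its intragroup requirement."""
    return bool(active_groups) and all(group_stable(g, intragroup) for g in active_groups)


# Three groups of three stores each; every group has the message at two or more stores.
groups = [[True, True, False], [True, True, True], [False, True, True]]
print(message_stable(groups))         # True
print(message_stable(groups, "all"))  # False: two groups are each missing one store
```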
The following figure displays a 3-group Quorum/Consensus configuration with each group in a different location. A message is considered stable when it has been successfully stored at a quorum of stores in all the active groups.
Quorum/Consensus - Single Location Groups
The source application's UM configuration file appears below.
Recovery from source and receiver failure is the real heart of persistent operation. For a source, this means continuing operation from where it stopped. For a receiver, this means essentially the same thing, but with the retransmission of missed messages. Application developers can easily leverage the information in UM to make their applications recover from failure in graceful ways.
Late Join is the mechanism of persistent recovery as well as a UM streaming feature. If Late Join is turned off on a source (late_join (source)) or receiver (use_late_join (receiver)), persistent recovery is turned off as well. In order to control Late Join behavior, UM provides a mechanism for a receiver to control the low sequence number. See Recovery Management.
Not all failures are recoverable. For application developers it usually pays in the long run to identify what types of errors are non-recoverable and how best to handle them when possible. Such an exercise establishes the precise boundaries of expected versus abnormal operating conditions.
The following shows the basic steps of source recovery:
Because UM allows you to stream messages and not wait until a message is stable at the persistent store before sending the next message, the main task of source recovery is to determine what messages the persistent store(s) have and what they don't. Therefore, when a source re-registers with a store during recovery, the store tells the source what sequence number it has as the most recent from the source. The registration event informs the application of this sequence number. See Source Event Handler.
In addition, a mechanism exists (LBM_SRC_EVENT_SEQUENCE_NUMBER_INFO) that allows the application to know the sequence number assigned to every piece of data it sends. The combination of registration and sequence number information allows an application to know exactly what a store does have and what it does not and where it should pick up sending. An application designed to stream data in this way should consider how best to maintain this information.
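One way an application might maintain this information is sketched below. It is a model of the bookkeeping only: the SendLog class and its method names are hypothetical, the callbacks stand in for wherever the application handles sequence number and stability events, and sequence number wraparound is ignored. For the record to survive a crash of the source, the application would also have to save it somewhere durable, which the sketch does not do.

```python
class SendLog:
    """Remembers payloads by sequence number until they are reported stable. Not a UM API."""

    def __init__(self):
        self.unstable = {}

    def on_sequence_number_info(self, sqn, payload):
        # Record the sequence number assigned to a piece of data just sent.
        self.unstable[sqn] = payload

    def on_stable(self, sqn):
        # The store has the message, so the application no longer needs its copy.
        self.unstable.pop(sqn, None)

    def to_resend(self, store_latest_sqn):
        """Payloads the store does not have, in order, given the sequence number
        the store reported as its most recent at re-registration."""
        return [self.unstable[s] for s in sorted(self.unstable) if s > store_latest_sqn]


log = SendLog()
for sqn in range(5):
    log.on_sequence_number_info(sqn, f"m{sqn}")
log.on_stable(0)
log.on_stable(1)

# On re-registration the store reports 2 as its most recent sequence number.
print(log.to_resend(2))  # ['m3', 'm4']
```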
When QC is in use, UM uses the consensus of the group(s) to determine what sequence number to use in the first message it will send. This is necessary because, in a distributed system, not all stores can be expected to be in total agreement about what was sent. The application can configure the source with the ume_consensus_sequence_number_behavior (source) option to use the lowest sequence number of the latest group of sequence numbers seen from any store, the highest, or the majority. In most cases, the majority, which is the default, makes the most sense as the consensus. The lowest is a very conservative setting, and the highest is somewhat optimistic. Your application has the flexibility to handle this in any way needed.
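The three choices can be compared with a small model. This is one plausible reading of the behaviors rather than UM's actual algorithm: consensus_sqn is a hypothetical function, "majority" is modeled as the value reported by the most stores, and breaking a tie toward the lower value is an assumption made for the sketch.

```python
from collections import Counter


def consensus_sqn(store_sqns, behavior="majority"):
    """Pick one sequence number from the values reported by the stores. Not a UM API."""
    if not store_sqns:
        raise ValueError("no stores reported a sequence number")
    if behavior == "lowest":
        return min(store_sqns)   # conservative choice
    if behavior == "highest":
        return max(store_sqns)   # optimistic choice
    if behavior == "majority":
        counts = Counter(store_sqns)
        best = max(counts.values())
        # Assumption: if several values are equally common, take the lowest of them.
        return min(s for s, c in counts.items() if c == best)
    raise ValueError(f"unknown behavior: {behavior}")


reported = [100, 100, 98]  # three stores, one of which is slightly behind
print(consensus_sqn(reported))             # 100
print(consensus_sqn(reported, "lowest"))   # 98
print(consensus_sqn(reported, "highest"))  # 100
```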
If the complexity of streaming is not desirable for an application, it can use the Persistence Events delivered to it to restrict the source to having only one unstable message at a time: the application simply waits for each message to be reported stable before sending the next.
The following shows the basic steps of receiver recovery:
UM provides extensive options for controlling how receivers handle recovery. By default, receivers want to restart after the last piece of data that was consumed prior to failure or graceful suspension. Since UM persists receiver state at the store, receivers request this state from the store as part of re-registration and recovery. Receiving applications experiencing unrecoverable loss can potentially retrieve missed messages from the stores by deleting and recreating the receiver object.
The actual sequence number that a receiver uses as the first topic level message to resume reception with is called the "low sequence number". UM provides a means of modifying this sequence number if desired. An application can decide to use the sequence number as is, to use an even older sequence number, to use a more recent sequence number, or to simply use the most recent sequence number from the source. See Recovery Management and Setting Callback Function to Set Recovery Sequence Number. This allows receivers great flexibility on a per source basis when recovering. New receivers, receivers with no pre-existing registration, also have the same flexibility in determining the sequence number to begin data reception.
As with sources, when QC is in use, UM uses the consensus of the group(s) to determine the low sequence number. This is again necessary because not all stores can be expected to be in total agreement about what was acknowledged. The application can configure the receiver with ume_consensus_sequence_number_behavior (receiver) to use the lowest sequence number of the latest group of sequence numbers seen from any store, the highest, or the majority. In most cases, the majority, which is the default, makes the most sense as the consensus. The lowest is a very conservative setting, and the highest is somewhat optimistic. In addition, this sequence number may be modified by the application after the consensus is determined.
For QC, UM load balances receiver retransmission requests among the available stores. In addition, if requests are unanswered, retransmissions of the actual requests will use different stores. This means that as long as a single store has a message, then it is possible for that message to be retransmitted to a requesting receiver.
It is possible for an application to start an instance of the store to run as an independent set of threads within the application process. However, there are several restrictions:
The application may not make use of messaging. That is, an application which intends to start a store instance must not create contexts, sources, or receivers, or make any use of UM except starting (and optionally stopping) the store. For applications that need to use messaging, it is suggested that the application create a child process from which to invoke the store. The parent process can then use messaging freely. See the example program umestored_example.c for an example of how this can be done.
Only a C API is provided at this time. Two API functions are available: umestored_main() to start the store threads running, and umestored_main_shutdown() to request the store threads to stop gracefully.
The umestored_main() API will not return until the store exits, either by processing a signal, or by the application calling umestored_main_shutdown(). When umestored_main() does return, the store is in a safe state for the application to exit.
Only a single instance of the store may be started. This means that an application may not have two stores running concurrently, and it also means that an application may not start a store, shut it down, and then start it again. The store API is "single use".
For an example of how to use the umestored_main() API, see the example program umestored_example.c. Note that while the callable store APIs are usable on all supported platforms, this example program is restricted to Linux due to its use of prctl(), a Linux-only function.