Ultra Messaging has a feature called Smart Sources that provides lower average latency and much lower latency variation (jitter) than traditional UM sources. Smart Sources deliver these benefits by trading some flexibility for simplicity, especially in memory usage.
One area of reduced flexibility is related to message sizes. To avoid dynamic memory allocation/deallocation, Smart Sources require the user to configure the maximum expected message size. Also, Smart Sources do not support fragmentation and reassembly of messages larger than the maximum datagram size. An application message must fit in a single datagram. Finally, users of kernel bypass TCP/IP stacks (like Solarflare's Onload) pay a heavy penalty if IP fragmentation is performed by the kernel. These users typically want to keep each datagram at or below the NIC's MTU size (typically 1500 bytes).
However, many applications have a fairly small average message size, but occasionally need to send messages that are much larger. If that worst-case large message size is larger than an MTU, it cannot be efficiently sent using Onload or other kernel bypass stacks.
Also, even if IP fragmentation is not a problem, a Smart Source pre-allocates its retransmission buffers based on the worst-case message size. For applications that normally send small messages but occasionally need to send very large ones, this can lead to a significant memory footprint.
One solution to these problems is for an application to configure the Smart Source for small messages and perform message fragmentation and reassembly at the application level. This example program does just that.
An integer message property with key name "Remain" is used to manage message fragmentation and reassembly. Small messages may be sent without a property. Messages which exceed a configured threshold will be sent in "chunks", with a message property providing the information needed by the receiver to reassemble the chunks.
When the application wants to send a message larger than [smart_src_max_message_length (source)](https://ultramessaging.github.io/currdoc/doc/Config/grpsmartsource.html#smartsrcmaxmessagelengthsource), the first fragment is sent with the "Remain" property set to a negative number whose magnitude is the number of bytes that still need to be sent (not counting the content of that first fragment). Each subsequent fragment carries a non-negative "Remain" value: the number of bytes remaining after that fragment, which is zero for the last fragment.
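For example, if smart_src_max_message_length is set to 1400 and a 5000-byte message is sent, four UM messages will be sent. Here are the message lengths and values for the "Remain" property:

| UM message | Length (bytes) | "Remain" |
|---|---|---|
| 1 | 1400 | -3600 |
| 2 | 1400 | 2200 |
| 3 | 1400 | 800 |
| 4 | 800 | 0 |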
When the receiver sees Remain=0, it can deliver the reassembled message.
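The fragment lengths and "Remain" values follow mechanically from the message length and the configured limit. The sketch below computes them. plan_fragments() is a hypothetical helper written for illustration only; it is not part of ss_frag.c or the UM API.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper (not in ss_frag.c): compute the length and "Remain"
 * value of each UM message used to send msg_len bytes when each UM message
 * may carry at most max_len bytes.
 * Returns 0 if the message fits in one UM message (sent with no property),
 * otherwise the number of fragments. At most cap entries are filled. */
size_t plan_fragments(long msg_len, long max_len,
                      long *frag_len, long *remain, size_t cap)
{
    size_t n = 0;
    long remaining = msg_len;

    assert(msg_len >= 0 && max_len > 0);
    if (msg_len <= max_len)
        return 0;  /* small message: no "Remain" property at all */

    while (remaining > 0) {
        long this_len = (remaining > max_len) ? max_len : remaining;
        remaining -= this_len;
        if (n < cap) {
            frag_len[n] = this_len;
            /* First fragment carries the negated remainder; later ones
             * carry the plain remainder, ending with 0. */
            remain[n] = (n == 0) ? -remaining : remaining;
        }
        n++;
    }
    return n;
}
```

For the 5000-byte message and 1400-byte limit in the example, it returns 4, with lengths 1400, 1400, 1400, 800 and "Remain" values -3600, 2200, 800, 0. Because only messages longer than the limit are fragmented, the first "Remain" value is always strictly negative. This is what lets a receiver tell a first fragment apart from a final one (Remain=0).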
There is one program source file and one test configuration file:
The ss_frag.c file contains:
At the heart of the source-side code is the function smart_source_send():
```c
/* Send a message, fragmenting it if necessary.
 */
void smart_source_send(smart_source_t *smart_source, char *buf, size_t len,
                       int flags)
{
```
This function first checks whether the application message is small enough to fit in a single UM message. If so, it sends the message without any message properties.
```c
  if (len <= smart_source->max_msg_len) {
    /* Message fits in one buffer, send it. */
    memcpy(smart_source->msgbuf_plain, buf, len);
    err = lbm_ssrc_send_ex(smart_source->ssrc, smart_source->msgbuf_plain,
                           len, 0, NULL);
    LBM_ERR(err);
  }
```
Application messages that are too big must be fragmented. For the first fragment, the "Remain" message property is set to the negative of the number of bytes remaining after this fragment is sent.
```c
    this_len = smart_source->max_msg_len;
    remaining -= this_len;
    /* Indicate first fragment with negative remainder. */
    smart_source->int_value_array[0] = - remaining;
    memcpy(smart_source->msgbuf_props, &buf[offset], this_len);
    err = lbm_ssrc_send_ex(smart_source->ssrc, smart_source->msgbuf_props,
                           this_len, 0, &(smart_source->info_prop));
    LBM_ERR(err);
    offset += this_len;
```
Subsequent fragments are sent one chunk at a time, each with a non-negative "Remain" property.
```c
    /* Send rest of fragments. */
    while (remaining > 0) {
      if (remaining > smart_source->max_msg_len) {
        this_len = smart_source->max_msg_len;
      }
      else {
        this_len = remaining;
      }
      remaining -= this_len;
      smart_source->int_value_array[0] = remaining;
      memcpy(smart_source->msgbuf_props, &buf[offset], this_len);
      err = lbm_ssrc_send_ex(smart_source->ssrc, smart_source->msgbuf_props,
                             this_len, 0, &(smart_source->info_prop));
      LBM_ERR(err);  /* Check each fragment's send, as for the first one. */
      offset += this_len;
    }  /* while */
```
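To exercise the sender's control flow without a UM installation, the sketch below reproduces the structure of smart_source_send() with the UM call replaced by a hypothetical fake_send() that records each outgoing message. MAX_MSG_LEN stands in for the configured smart_src_max_message_length. fake_wire_t, fake_send() and sketch_send() are illustrative names, not UM APIs.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_MSG_LEN 1400  /* assumption: stands in for smart_src_max_message_length */
#define MAX_FRAGS   16

/* Hypothetical record of what would have been sent on the wire. */
typedef struct {
    char   data[MAX_FRAGS][MAX_MSG_LEN];
    size_t len[MAX_FRAGS];
    int    has_prop[MAX_FRAGS];  /* 1 if a "Remain" property was attached */
    long   remain[MAX_FRAGS];
    size_t count;
} fake_wire_t;

/* Hypothetical stand-in for lbm_ssrc_send_ex(). */
static void fake_send(fake_wire_t *w, const char *buf, size_t len,
                      int has_prop, long remain)
{
    assert(w->count < MAX_FRAGS && len <= MAX_MSG_LEN);
    memcpy(w->data[w->count], buf, len);
    w->len[w->count] = len;
    w->has_prop[w->count] = has_prop;
    w->remain[w->count] = remain;
    w->count++;
}

/* Same control flow as smart_source_send(), minus the UM calls. */
void sketch_send(fake_wire_t *w, const char *buf, size_t len)
{
    static char sendbuf[MAX_MSG_LEN];  /* plays the role of msgbuf_props */
    size_t offset = 0, remaining = len, this_len;

    if (len <= MAX_MSG_LEN) {
        fake_send(w, buf, len, 0, 0);  /* small message, no property */
        return;
    }
    /* First fragment: negative remainder. */
    this_len = MAX_MSG_LEN;
    remaining -= this_len;
    memcpy(sendbuf, &buf[offset], this_len);
    fake_send(w, sendbuf, this_len, 1, -(long)remaining);
    offset += this_len;

    /* Remaining fragments: non-negative remainder, 0 on the last one. */
    while (remaining > 0) {
        this_len = (remaining > MAX_MSG_LEN) ? MAX_MSG_LEN : remaining;
        remaining -= this_len;
        memcpy(sendbuf, &buf[offset], this_len);
        fake_send(w, sendbuf, this_len, 1, (long)remaining);
        offset += this_len;
    }
}
```

Concatenating the recorded fragments in order reproduces the original buffer. Only the fragmented path attaches a "Remain" value, which is why the real program can send small messages through a property-free buffer.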
For efficiency and simplicity, two Smart Source send buffers are used to send messages: msgbuf_plain for small messages with no properties, and msgbuf_props for large fragmented messages that have properties. They are initialized in smart_source_init().
```c
  err = lbm_ssrc_buff_get(smart_source->ssrc, &smart_source->msgbuf_plain, 0);
  LBM_ERR(err);
  err = lbm_ssrc_buff_get(smart_source->ssrc, &smart_source->msgbuf_props, 0);
  LBM_ERR(err);
```
The message property is also initialized.
```c
  /* Set up message property. */
  smart_source->info_prop.flags = LBM_SSRC_SEND_EX_FLAG_PROPERTIES;
  smart_source->info_prop.mprop_int_cnt = 1;
  smart_source->info_prop.mprop_int_vals = smart_source->int_value_array;
  smart_source->info_prop.mprop_int_keys = &smart_source->key_ptr_array[0];
  smart_source->key_ptr_array[0] = smart_source->remain_key;
  strncpy(smart_source->remain_key, "Remain", sizeof(smart_source->remain_key));
```
The source-side code needs to know the size threshold for fragmenting messages. It determines this by creating a source topic attribute object, which inherits the user's configuration.
```c
  err = lbm_src_topic_attr_create(&src_tattr);
  LBM_ERR(err);
```
Then, the configured value for the option smart_src_max_message_length (source) is read.
```c
  /* Find out the user's config for max message length. */
  opt_len = sizeof(smart_source->max_msg_len);
  err = lbm_src_topic_attr_getopt(src_tattr,
      "smart_src_max_message_length", &(smart_source->max_msg_len), &opt_len);
  LBM_ERR(err);
```
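There is also a sanity check to make sure the configuration allows at least one integer message property. If none is configured, the code sets smart_src_message_property_int_count to 1.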
```c
  /* Make sure user's config allows at least 1 msg property. */
  opt_len = sizeof(prop_count);
  err = lbm_src_topic_attr_getopt(src_tattr,
      "smart_src_message_property_int_count", &prop_count, &opt_len);
  LBM_ERR(err);
  if (prop_count == 0) {
    /* No props configured, add one. */
    prop_count = 1;
    opt_len = sizeof(prop_count);
    err = lbm_src_topic_attr_setopt(src_tattr,
        "smart_src_message_property_int_count", &prop_count, opt_len);
    LBM_ERR(err);
  }
```
At the heart of the receiver-side code is the receiver application callback function msg_rcv_cb():
```c
/* UM receiver callback.
 */
int msg_rcv_cb(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
{
```
If the message has no properties, it is a small (non-fragmented) message and can be delivered immediately. The callback first performs a sanity check to make sure a fragmented message is not in progress.
```c
  if (msg->properties == NULL) {
    /* Fast path (not part of a fragmented message). */
    if (rcv_state->collecting) {
      /* Error, non-frag, but state is collecting (should never happen). */
      printf("Collect error non-frag, offset=%ld, len=%lu, message_len=%ld\n",
             rcv_state->offset, msg->len, rcv_state->message_len);
      rcv_state->collecting = 0;
      rcv_state->num_bad_frags ++;
    }

    /* Deliver data message. */
    printf("PROCESS message, buf[0]=%d, buf[%ld]=%d\n",
           msg->data[0],
           msg->len - 1,
           msg->data[msg->len - 1]);
  }
```
Otherwise, the message carries properties, and the callback checks that one of them is "Remain".
```c
    /* Found property, is it for fragmentation? */
    err = lbm_msg_properties_get(msg->properties, "Remain",
                                 &remaining, &prop_type, &prop_size);
    if (err == LBM_OK) {
      handle_msg_frag(msg, rcv_state, remaining);
    }
```
Once the message is known to be a fragment, it is passed to the function handle_msg_frag() for reassembly. This function operates in one of two states: collecting (in the middle of a fragmented message) or not collecting (waiting for a negative "Remain" property to indicate the start of a fragmented message).
If it is collecting, the newly-received message is validated to make sure it is the expected next fragment.
```c
  if (rcv_state->collecting) {
    /* Make sure the fragment is OK. */
    if (remaining >= 0 &&
        (rcv_state->message_len == (rcv_state->offset + msg->len + remaining)))
    {
```
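The validation condition is a bookkeeping identity: the bytes already collected, plus this fragment, plus the bytes still to come must equal the total announced by the first fragment. A minimal standalone version, using a hypothetical helper frag_is_expected() with plain integers in place of the UM message and receiver state, shows how the check catches a lost fragment.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper mirroring the check in handle_msg_frag().
 * message_len: total announced by the first fragment
 * offset:      bytes collected so far
 * frag_len:    length of the fragment just received
 * remaining:   its "Remain" property value
 * Returns 1 if the fragment is the expected next one, else 0. */
int frag_is_expected(size_t message_len, size_t offset,
                     size_t frag_len, long remaining)
{
    assert(offset <= message_len);
    if (remaining < 0)
        return 0;  /* a new first fragment arrived mid-message */
    return message_len == offset + frag_len + (size_t)remaining;
}
```

With the 5000-byte example, after the first 1400-byte fragment the offset is 1400. The second fragment (1400 bytes, Remain=2200) satisfies 1400 + 1400 + 2200 = 5000, but if it is lost, the third fragment (1400 bytes, Remain=800) gives 3600 and is rejected.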
Once validated, the message content is collected. If this is the last fragment in the message ("Remain"==0) then the reassembled message is delivered.
```c
      /* Collect the fragment. */
      memcpy(&rcv_state->reassem_buf[rcv_state->offset], msg->data,
             msg->len);
      rcv_state->offset += msg->len;

      if (remaining == 0) {  /* No more, deliver the data. */
        printf("PROCESS message, buf[0]=%d, buf[%ld]=%d\n",
               rcv_state->reassem_buf[0],
               rcv_state->message_len - 1,
               rcv_state->reassem_buf[rcv_state->message_len - 1]);

        /* No longer collecting. */
        rcv_state->collecting = 0;
      }
```
If handle_msg_frag() is called and the receiver state is not collecting, then the fragment should be the first fragment of a large message ("Remain" negative).
```c
  else {  /* not collecting */
    /* Not collecting a fragmented message, is this message first frag? */
    if (remaining < 0) {
      /* First fragment. */
```
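It then enters the collecting state, recording the total message length: the first fragment's length plus the magnitude of its negative "Remain" value.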
```c
      rcv_state->collecting = 1;
      rcv_state->offset = 0;
      rcv_state->message_len = msg->len + (- remaining);
```
To keep the algorithm general, if the fragmented message is larger than the currently-allocated reassembly buffer, expand it.
```c
      /* Expand buffer if necessary. */
      if (rcv_state->message_len > rcv_state->reassem_buf_size) {
        rcv_state->reassem_buf_size = rcv_state->message_len;
        rcv_state->reassem_buf = (char *)realloc(rcv_state->reassem_buf,
                                                 rcv_state->reassem_buf_size);
      }
```
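The excerpt assigns realloc()'s result directly to reassem_buf and updates reassem_buf_size first, so an allocation failure would lose the old buffer and leave the size claiming capacity that does not exist. A more defensive variation, sketched here with a hypothetical helper ensure_reassem_capacity() that is not part of ss_frag.c, only commits the new pointer and size once realloc() succeeds.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical variation on the "Expand buffer if necessary" step.
 * Grows *buf to at least `needed` bytes.
 * Returns 0 on success, -1 if memory is exhausted; on failure *buf and
 * *buf_size are left unchanged and the old buffer is still owned by the
 * caller. */
int ensure_reassem_capacity(char **buf, size_t *buf_size, size_t needed)
{
    char *bigger;

    assert(buf != NULL && buf_size != NULL);
    if (needed <= *buf_size)
        return 0;                    /* already big enough */
    bigger = realloc(*buf, needed);  /* realloc(NULL, n) acts like malloc(n) */
    if (bigger == NULL)
        return -1;                   /* *buf is still valid */
    *buf = bigger;
    *buf_size = needed;
    return 0;
}
```

On failure, a receiver could count the message as bad and stay in the not-collecting state rather than copying into a NULL pointer.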
Finally, collect the first fragment's data.
```c
      /* Collect data. */
      memcpy(&rcv_state->reassem_buf[rcv_state->offset], msg->data, msg->len);
      rcv_state->offset += msg->len;
```
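Taken together, the receiver-side steps form a small two-state machine. The self-contained sketch below implements the same reassembly logic without UM so it can be tested in isolation. rcv_state_t and sketch_handle_frag() are hypothetical names. The error handling (dropping a partial message and counting it in num_bad_frags, and restarting if a new first fragment arrives mid-message) is an assumption, since the excerpts do not show how handle_msg_frag() treats invalid fragments.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical receiver state, modeled on the fields used above. */
typedef struct {
    int    collecting;
    size_t offset, message_len;
    char  *reassem_buf;
    size_t reassem_buf_size;
    int    num_bad_frags;
} rcv_state_t;

/* Feed one fragment. Returns 1 when a complete message of s->message_len
 * bytes is ready in s->reassem_buf, otherwise 0. */
int sketch_handle_frag(rcv_state_t *s, const char *data, size_t len,
                       long remaining)
{
    size_t total;
    char *bigger;

    assert(s != NULL && (data != NULL || len == 0));
    if (s->collecting) {
        if (remaining >= 0 &&
            s->message_len == s->offset + len + (size_t)remaining) {
            memcpy(&s->reassem_buf[s->offset], data, len);
            s->offset += len;
            if (remaining == 0) {
                s->collecting = 0;  /* complete: caller may deliver */
                return 1;
            }
            return 0;
        }
        /* Unexpected fragment: drop the partial message (assumption). */
        s->collecting = 0;
        s->num_bad_frags++;
        if (remaining >= 0)
            return 0;
        /* Otherwise it is a new first fragment: start over below. */
    } else if (remaining >= 0) {
        s->num_bad_frags++;  /* no first fragment was seen */
        return 0;
    }

    /* First fragment: "Remain" is negative. */
    total = len + (size_t)(-remaining);
    if (total > s->reassem_buf_size) {
        bigger = realloc(s->reassem_buf, total);
        if (bigger == NULL) {
            s->num_bad_frags++;
            return 0;
        }
        s->reassem_buf = bigger;
        s->reassem_buf_size = total;
    }
    memcpy(s->reassem_buf, data, len);
    s->offset = len;
    s->message_len = total;
    s->collecting = 1;
    return 0;
}
```

Feeding it the four fragments of the 5000-byte example returns 1 only on the last call, with the original bytes in reassem_buf. If a middle fragment is dropped, the length check fails and the partial message is discarded instead of being delivered corrupted.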