
Smart Source Message Fragmentation

Ultra Messaging has a feature called Smart Sources, which provides lower average latency and much lower latency variation (jitter) than traditional UM sources. Smart Sources deliver these benefits by trading flexibility for simplicity, especially simplicity in memory usage.

One area of reduced flexibility is related to message sizes. To avoid dynamic memory allocation/deallocation, Smart Sources require the user to configure the maximum expected message size. Also, Smart Sources do not support fragmentation and reassembly of messages larger than the maximum datagram size. An application message must fit in a single datagram. Finally, users of kernel bypass TCP/IP stacks (like Solarflare's Onload) pay a heavy penalty if IP fragmentation is performed by the kernel. These users typically want to keep each datagram at or below the NIC's MTU size (typically 1500 bytes).

However, many applications have a fairly small average message size, but occasionally need to send messages that are much larger. If that worst-case large message size is larger than an MTU, it cannot be efficiently sent using Onload or other kernel bypass stacks.

Also, even if IP fragmentation is not a problem, a Smart Source pre-allocates its retransmission buffers based on the worst-case message size. For applications that normally send small messages but occasionally need to send very large ones, this can lead to a significant memory footprint.

One solution to these problems is for an application to configure the Smart Source for small messages, and to perform message fragmentation and reassembly at the application level. This example program does just that.

High-Level Design

An integer message property with key name "Remain" is used to manage message fragmentation and reassembly. Small messages may be sent without a property. Messages which exceed a configured threshold will be sent in "chunks", with a message property providing the information needed by the receiver to reassemble the chunks.

When the application wants to send a message larger than [smart_src_max_message_length (source)](https://ultramessaging.github.io/currdoc/doc/Config/grpsmartsource.html#smartsrcmaxmessagelengthsource), the first fragment is sent with the "Remain" property set to a negative number whose magnitude is the number of bytes that still need to be sent (not counting the content of that first fragment). Each subsequent fragment is sent with the "Remain" property set to a non-negative number: the number of bytes remaining after that fragment.

For example, if smart_src_max_message_length is set to 1400 and a 5000 byte message is sent, 4 UM messages will be sent. Here are the message lengths and values for the "Remain" property:

  1. length=1400, Remain=-3600
  2. length=1400, Remain=2200
  3. length=1400, Remain=800
  4. length=800, Remain=0

When the receiver sees Remain=0, it can deliver the reassembled message.
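
The arithmetic behind that table can be written down directly. The following is a minimal sketch, not part of the example program; the function names `frag_count()` and `frag_remain()` are hypothetical and chosen only for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Number of UM messages needed to carry msg_len bytes when each message
 * holds at most max_len bytes (ceiling division). Hypothetical helper. */
size_t frag_count(size_t msg_len, size_t max_len)
{
  assert(max_len > 0);
  if (msg_len <= max_len) {
    return 1;  /* Small message: sent as-is, without a property. */
  }
  return (msg_len + max_len - 1) / max_len;
}

/* Value of the "Remain" property on fragment idx (0-based) of a message
 * that is too large for one UM message. Hypothetical helper. */
long frag_remain(size_t msg_len, size_t max_len, size_t idx)
{
  size_t sent;
  long remaining;

  assert(max_len > 0 && msg_len > max_len);
  assert(idx < frag_count(msg_len, max_len));

  /* Bytes sent once this fragment is out, capped at the message size. */
  sent = (idx + 1) * max_len;
  if (sent > msg_len) {
    sent = msg_len;
  }
  remaining = (long)(msg_len - sent);

  /* Only the first fragment carries a negative value. */
  return (idx == 0) ? -remaining : remaining;
}
```

For a 5000-byte message and a 1400-byte limit, `frag_count()` gives 4 and `frag_remain()` gives -3600, 2200, 800 and 0 for fragments 0 through 3, matching the list above.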

    There is one program source file and one test configuration file:

    The ss_frag.c file contains:

    Source-Side Code

    At the heart of the source-side code is the function smart_source_send():

    /* Send a message, fragmenting it if necessary.
     */
    void smart_source_send(smart_source_t *smart_source, char *buf, size_t len,
      int flags)
    {
    


    This function first checks to see if the application message is small enough to fit in a single message. If so, it sends the message without any message properties.

      if (len <= smart_source->max_msg_len) {
        /* Message fits in one buffer, send it. */
        memcpy(smart_source->msgbuf_plain, buf, len);
        err = lbm_ssrc_send_ex(smart_source->ssrc, smart_source->msgbuf_plain,
          len, 0, NULL);
        LBM_ERR(err);
      }
    

    Application messages that are too big must be fragmented. For the first fragment, the "Remain" message property is set to the negative of the number of bytes remaining after this fragment is sent.

        this_len = smart_source->max_msg_len;
        remaining -= this_len;
        /* Indicate first fragment with negative remainder. */
        smart_source->int_value_array[0] = - remaining;
        memcpy(smart_source->msgbuf_props, &buf[offset], this_len);
        err = lbm_ssrc_send_ex(smart_source->ssrc, smart_source->msgbuf_props,
          this_len, 0, &(smart_source->info_prop));
        LBM_ERR(err);
        offset += this_len;
    

    Subsequent fragments are sent one chunk at a time, each with a non-negative "Remain" property.

        /* Send rest of fragments. */
        while (remaining > 0) {
          if (remaining > smart_source->max_msg_len) {
            this_len = smart_source->max_msg_len;
          }
          else {
            this_len = remaining;
          }
          remaining -= this_len;
          smart_source->int_value_array[0] = remaining;
          memcpy(smart_source->msgbuf_props, &buf[offset], this_len);
          err = lbm_ssrc_send_ex(smart_source->ssrc, smart_source->msgbuf_props,
            this_len, 0, &(smart_source->info_prop));
          LBM_ERR(err);  /* Check the send result, as for the first fragment. */
          offset += this_len;
        }  /* while */
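
To try the fragmentation logic without the UM library, the same flow can be sketched against a stand-in transport. Everything in the block below is an assumption made for illustration: `MAX_FRAG` takes the place of the configured smart_src_max_message_length, the `emit_fn` callback takes the place of lbm_ssrc_send_ex(), and `capture_t` / `capture_emit()` are hypothetical helpers that only record what would have been sent.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_FRAG 1400  /* Hypothetical stand-in for the configured limit. */
#define MAX_CAPTURED 16

/* Hypothetical stand-in for lbm_ssrc_send_ex(). has_prop is 0 for a plain
 * message, 1 when the "Remain" property (value in remain) is attached. */
typedef void (*emit_fn)(const char *frag, size_t len, int has_prop,
  long remain, void *ctx);

/* Send buf as one plain message, or as a series of fragments. */
void send_fragmented(const char *buf, size_t len, emit_fn emit, void *ctx)
{
  char staging[MAX_FRAG];  /* Plays the role of the Smart Source buffer. */
  size_t offset = 0;
  size_t remaining = len;
  int first = 1;

  assert(buf != NULL && emit != NULL);

  if (len <= MAX_FRAG) {
    memcpy(staging, buf, len);
    emit(staging, len, 0, 0, ctx);
    return;
  }

  while (remaining > 0) {
    size_t this_len = (remaining > MAX_FRAG) ? MAX_FRAG : remaining;
    remaining -= this_len;
    memcpy(staging, buf + offset, this_len);
    /* First fragment is flagged with a negative value. */
    emit(staging, this_len, 1,
      first ? -(long)remaining : (long)remaining, ctx);
    first = 0;
    offset += this_len;
  }
}

/* Hypothetical recorder used to observe what was sent. */
typedef struct {
  size_t count;
  size_t total_len;
  int has_prop[MAX_CAPTURED];
  long remain[MAX_CAPTURED];
} capture_t;

void capture_emit(const char *frag, size_t len, int has_prop, long remain,
  void *ctx)
{
  capture_t *cap = (capture_t *)ctx;
  (void)frag;  /* Payload content is not inspected by this recorder. */
  assert(cap->count < MAX_CAPTURED);
  cap->has_prop[cap->count] = has_prop;
  cap->remain[cap->count] = remain;
  cap->total_len += len;
  cap->count++;
}
```

Calling `send_fragmented()` with a 5000-byte buffer and `capture_emit` as the callback records four fragments whose lengths add up to 5000. Unlike the example program, this sketch folds the first fragment into the loop with a `first` flag; the values sent are the same.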
    

    Send Buffers

    For efficiency and simplicity, two Smart Source send buffers are used to send messages: msgbuf_plain for small messages with no properties, and msgbuf_props for large fragmented messages that have properties. They are initialized in smart_source_init().

      err = lbm_ssrc_buff_get(smart_source->ssrc, &smart_source->msgbuf_plain, 0);
      LBM_ERR(err);
      err = lbm_ssrc_buff_get(smart_source->ssrc, &smart_source->msgbuf_props, 0);
      LBM_ERR(err);
    

    The message property is also initialized.

      /* Set up message property. */
      smart_source->info_prop.flags = LBM_SSRC_SEND_EX_FLAG_PROPERTIES;
      smart_source->info_prop.mprop_int_cnt = 1;
      smart_source->info_prop.mprop_int_vals = smart_source->int_value_array;
      smart_source->info_prop.mprop_int_keys = &smart_source->key_ptr_array[0];
      smart_source->key_ptr_array[0] = smart_source->remain_key;
      strncpy(smart_source->remain_key, "Remain", sizeof(smart_source->remain_key));
    

    The source-side code needs to know the size threshold above which messages are fragmented. It determines this by creating a source attribute object, which inherits the user's configuration.

      err = lbm_src_topic_attr_create(&src_tattr);
      LBM_ERR(err);
    

    Then, the configured value for the option smart_src_max_message_length (source) is read.

      /* Find out the user's config for max message length. */
      opt_len = sizeof(smart_source->max_msg_len);
      err = lbm_src_topic_attr_getopt(src_tattr,
        "smart_src_max_message_length", &(smart_source->max_msg_len), &opt_len);
      LBM_ERR(err);
    

    There is also a sanity check to make sure the user's configuration allows at least one integer message property. If it does not, the code sets the count to 1.

      /* Make sure user's config allows at least 1 msg property. */
      opt_len = sizeof(prop_count);
      err = lbm_src_topic_attr_getopt(src_tattr,
        "smart_src_message_property_int_count", &prop_count, &opt_len);
      LBM_ERR(err);
      if (prop_count == 0) {
        /* No props configured, add one. */
        prop_count = 1;
        opt_len = sizeof(prop_count);
        err = lbm_src_topic_attr_setopt(src_tattr,
          "smart_src_message_property_int_count", &prop_count, opt_len);
        LBM_ERR(err);
      }
    

    Receiver-Side Code

    At the heart of the receiver-side code is the receiver application callback function msg_rcv_cb():

    /* UM receiver callback.
     */
    int msg_rcv_cb(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
    {
    

    If there is no message property with the message, it is a small (non-fragmented) message, and can be delivered. It does a sanity check first to make sure there isn't a fragmented message in-progress.

          if (msg->properties == NULL) {
            /* Fast path (not part of a fragmented message). */
            if (rcv_state->collecting) {
              /* Error, non-frag, but state is collecting (should never happen). */
              printf("Collect error non-frag, offset=%ld, len=%lu, message_len=%ld\n", rcv_state->offset, msg->len, rcv_state->message_len);
              rcv_state->collecting = 0;
              rcv_state->num_bad_frags ++;
            }

            /* Deliver data message. */
            printf("PROCESS message, buf[0]=%d, buf[%ld]=%d\n",
              msg->data[0],
              msg->len - 1,
              msg->data[msg->len - 1]);
          }
    

    Otherwise, there is a property, which we make sure is "Remain".

            /* Found property, is it for fragmentation? */
            err = lbm_msg_properties_get(msg->properties, "Remain",
              &remaining, &prop_type, &prop_size);
            if (err == LBM_OK) {
              handle_msg_frag(msg, rcv_state, remaining);
            }
    

    Once the message is known to be a fragment, it is passed to the function handle_msg_frag() for reassembly. This function has two states it can be in: collecting (in the middle of a fragmented message), or not collecting (waiting for a negative "Remain" property to indicate the start of a fragmented message).

    If it is collecting, the newly-received message is validated to make sure it is the expected next fragment.

      if (rcv_state->collecting) {
        /* Make sure the fragment is OK. */
        if (remaining >= 0 &&
          (rcv_state->message_len == (rcv_state->offset + msg->len + remaining)))
        {
    

    Once validated, the message content is collected. If this is the last fragment in the message ("Remain"==0) then the reassembled message is delivered.

          /* Collect the fragment. */
          memcpy(&rcv_state->reassem_buf[rcv_state->offset], msg->data,
            msg->len);
          rcv_state->offset += msg->len;

          if (remaining == 0) {  /* No more, deliver the data. */
            printf("PROCESS message, buf[0]=%d, buf[%ld]=%d\n",
              rcv_state->reassem_buf[0],
              rcv_state->message_len - 1,
              rcv_state->reassem_buf[rcv_state->message_len - 1]);

            /* No longer collecting. */
            rcv_state->collecting = 0;
          }
    

    First Fragment Processing

    If handle_msg_frag() is called and the receiver state is not collecting, then the fragment should be the first fragment of a large message ("Remain" negative).

      else {  /* not collecting */
        /* Not collecting a fragmented message, is this message first frag? */
        if (remaining < 0) {
          /* First fragment. */
    

    Start the collection state.

          rcv_state->collecting = 1;
          rcv_state->offset = 0;
          rcv_state->message_len = msg->len + (- remaining);
    

    To keep the algorithm general, if the fragmented message is larger than the currently-allocated reassembly buffer, expand it.

          /* Expand buffer if necessary. */
          if (rcv_state->message_len > rcv_state->reassem_buf_size) {
            rcv_state->reassem_buf_size = rcv_state->message_len;
            rcv_state->reassem_buf = (char *)realloc(rcv_state->reassem_buf,
              rcv_state->reassem_buf_size);
          }
    

    Finally, collect the first fragment's data.

          /* Collect data. */
          memcpy(&rcv_state->reassem_buf[rcv_state->offset], msg->data, msg->len);
          rcv_state->offset += msg->len;
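
The two receiver states can be exercised without the UM library. The following is a minimal sketch of the reassembly state machine, assuming a hypothetical `reassem_t` structure modeled on the `rcv_state` fields used above and a hypothetical `reassem_feed()` function that takes the payload and "Remain" value directly instead of an lbm_msg_t. How the example program reacts to an unexpected fragment is not shown in the excerpts above; this sketch assumes it is dropped and counted, and that any collection in progress is abandoned.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical reassembly state, modeled on rcv_state in the text. */
typedef struct {
  int collecting;      /* 1 while in the middle of a fragmented message. */
  size_t offset;       /* Bytes collected so far. */
  size_t message_len;  /* Total size of the message being collected. */
  char *buf;           /* Reassembly buffer, grown on demand. */
  size_t buf_size;
  int num_bad_frags;
} reassem_t;

/* Feed one fragment. Returns 1 when a complete message of
 * st->message_len bytes is available in st->buf, 0 when more fragments
 * are needed, and -1 when the fragment is rejected. */
int reassem_feed(reassem_t *st, const char *data, size_t len, long remaining)
{
  assert(st != NULL && data != NULL);

  if (st->collecting) {
    /* The expected next fragment must account for every byte. */
    if (remaining >= 0 &&
      st->message_len == st->offset + len + (size_t)remaining)
    {
      memcpy(st->buf + st->offset, data, len);
      st->offset += len;
      if (remaining == 0) {
        st->collecting = 0;
        return 1;
      }
      return 0;
    }
    /* Assumption: abandon the partial message on any mismatch. */
    st->collecting = 0;
    st->num_bad_frags++;
    return -1;
  }

  if (remaining < 0) {
    /* First fragment: total size is this payload plus what remains. */
    size_t total = len + (size_t)(-remaining);
    if (total > st->buf_size) {
      char *bigger = (char *)realloc(st->buf, total);
      if (bigger == NULL) {
        return -1;  /* Old buffer, if any, is still valid. */
      }
      st->buf = bigger;
      st->buf_size = total;
    }
    st->collecting = 1;
    st->message_len = total;
    memcpy(st->buf, data, len);
    st->offset = len;
    return 0;
  }

  /* Non-negative "Remain" with no message in progress. */
  st->num_bad_frags++;
  return -1;
}
```

A zero-initialized `reassem_t` is a valid starting state, since `realloc()` on a NULL pointer behaves like `malloc()`. Unlike the excerpt above, the sketch checks the `realloc()` result before overwriting the buffer pointer, so a failed expansion does not leak the old buffer.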