How to use sources to send requests and responses.

There are two source files associated with this example:

The UM "request/response" feature provides a convenient, easy-to-code method for a classic server program to respond to a request sent by a client. The feature takes care of routing the server's response message back to the client that sent the request. It also associates response messages with the corresponding request messages, even if multiple messages are outstanding. See Request-Response Sample.

However, there are advantages for the server to use a normal UM source to send response messages:

This sample program uses an LBT-RU source to return response messages to the requesting client program.

Overall Approach

Clients use a UM source to send a request to a server, and the server uses a UM receiver to receive it. The server then uses a UM source to send a response back to only the client that sent the request, and that client uses a UM receiver to receive it. This sample allows an arbitrary number of client programs to come and go, without the server being pre-configured to know about them.

When a client program initializes, it creates a source to send requests and a receiver to receive responses. The client then exchanges handshakes with the server to register with it, and to verify round-trip connectivity. This verification avoids initial message loss during topic resolution.

Registration

The client uses the server's well-known request topic to create the source.

    server.topic_name = "Server1.Request";

. . .

    /* Create source to send requests to server. */
    {
        lbm_topic_t *topic;
        err = lbm_src_topic_alloc(&topic, ctx, server.topic_name, NULL);
        EX_LBM_CHK(err);
        err = lbm_src_create(&server.src, ctx, topic, NULL, NULL, NULL);
        EX_LBM_CHK(err);
    }

The client needs a unique topic on which to receive its responses. Using the process PID is an easy but non-robust method (two clients on different machines can have the same PID).

    /* Get (pretty much) unique client name. */
    sprintf(response_topic_name, "Client.%lx.Response", (long)getpid());
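A somewhat more robust name can be built by combining the host name with the PID. The following is a minimal sketch, not part of the sample: the helper name make_response_topic() and the "Client.<host>.<pid>.Response" layout are assumptions made for illustration, and it relies on the POSIX calls gethostname() and getpid(). Two machines configured with the same host name could still collide.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200112L  /* For gethostname() under strict C modes. */
#endif
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>  /* gethostname(), getpid() (POSIX). */

/* Hypothetical helper (not in the sample): writes
 * "Client.<host>.<pid in hex>.Response" into buf.
 * Returns 0 on success, -1 if the host name cannot be read or the
 * result does not fit in buf_len bytes. */
int make_response_topic(char *buf, size_t buf_len)
{
    char host[256];
    int n;

    assert(buf != NULL);
    if (gethostname(host, sizeof(host)) != 0) {
        return -1;
    }
    /* gethostname() is not guaranteed to terminate a truncated name. */
    host[sizeof(host) - 1] = '\0';

    n = snprintf(buf, buf_len, "Client.%s.%lx.Response", host, (long)getpid());
    if (n < 0 || (size_t)n >= buf_len) {
        return -1;  /* Output was truncated. */
    }
    return 0;
}
```

The result could be used in place of the sprintf() call above, provided the destination buffer is large enough for the longer name.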

. . .

    /* Create receiver for responses from server. */
    {
        lbm_topic_t *topic;
        err = lbm_rcv_topic_lookup(&topic, ctx, response_topic_name, NULL);
        EX_LBM_CHK(err);
        err = lbm_rcv_create(&server.rcv, ctx, topic, response_rcv_cb, &server, NULL);
        EX_LBM_CHK(err);
    }

Registration: Client Side

The registration message that the client sends to the server is all ASCII, consisting of a lower-case 'r' followed by the null-terminated name of the topic on which the client receives responses.

        char register_msg[257];
        /* Bounded write: 'r' followed by the response topic name. */
        snprintf(register_msg, sizeof(register_msg), "r%s", response_topic_name);
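The message format can be exercised without UM. The sketch below shows one way to build the registration message on the client and validate it on the server. Both function names are hypothetical, and the extra checks (non-empty topic name, terminating null inside the received length) are assumptions that go beyond what the sample verifies.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Client side (hypothetical helper): write 'r' + topic name into msg.
 * Returns the number of bytes to send, including the terminating null,
 * or 0 if the message does not fit in msg_len bytes. */
size_t build_register_msg(char *msg, size_t msg_len, const char *topic_name)
{
    int n;

    assert(msg != NULL && topic_name != NULL);
    n = snprintf(msg, msg_len, "r%s", topic_name);
    if (n < 0 || (size_t)n >= msg_len) {
        return 0;
    }
    return (size_t)n + 1;
}

/* Server side (hypothetical helper): return a pointer to the topic name
 * inside a received message, or NULL if the message is not a well-formed
 * registration ('r', at least one name character, terminating null). */
const char *parse_register_msg(const char *data, size_t len)
{
    assert(data != NULL);
    if (len < 3 || data[0] != 'r') {
        return NULL;
    }
    if (data[len - 1] != '\0') {
        return NULL;  /* Name is not null-terminated within the message. */
    }
    return &data[1];
}
```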

The client sends registration messages repeatedly in a loop, with exponential backoff delays. For example, if the server is not running, the client waits 1, 2, 4, 8, ... milliseconds before successive attempts, up to a maximum delay of 1 second.

        backoff_delay = 1;  /* In milliseconds. */
        MSLEEP(backoff_delay);  /* Let TR complete. */
        while (server.state == 0) {
            try_cnt ++;
            err = lbm_src_send(server.src, register_msg,
                strlen(register_msg) + 1, LBM_MSG_FLUSH | LBM_SRC_BLOCK);
            EX_LBM_CHK(err);
            printf("Sent '%s' to %s\n", register_msg, server.topic_name);

            /* Exponential backoff, to max of 1 sec. */
            backoff_delay *= 2;
            if (backoff_delay > 1000) {
                backoff_delay = 1000;
            }
            MSLEEP(backoff_delay);  /* Wait for server response. */
        }
        printf("Took %d tries to register with server.\n", try_cnt);
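The backoff arithmetic in the loop above can be isolated into a small function, which makes the schedule easy to check on its own. This is a sketch only: next_backoff_ms() and BACKOFF_MAX_MS are illustrative names, not part of the sample.

```c
#include <assert.h>

#define BACKOFF_MAX_MS 1000  /* Cap of 1 second, as in the loop above. */

/* Hypothetical helper: given the current delay in milliseconds, return
 * the next delay, doubled and capped at BACKOFF_MAX_MS. */
int next_backoff_ms(int current_ms)
{
    assert(current_ms > 0);
    if (current_ms > BACKOFF_MAX_MS / 2) {
        return BACKOFF_MAX_MS;  /* Doubling would exceed the cap. */
    }
    return current_ms * 2;
}
```

Starting from 1 millisecond, the delay reaches the 1 second cap on the tenth doubling (1, 2, 4, ..., 512, 1000).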

The loop exits when the client receives a registration response.

    case LBM_MSG_DATA:    /* Received a message from the server. */
        printf("Received %ld bytes on topic %s: '%.*s'\n",
            (long)msg->len, msg->topic_name, (int)msg->len, msg->data);

        if (msg->len > 1 && msg->data[0] == 'r') {
            /* Should check for success. */
            server->state = 1;  /* Registered. */
        }
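The callback above accepts any message starting with 'r' as a successful registration. A stricter check, sketched below, compares the whole message against the "rOK" reply (4 bytes including the terminating null) that the server sends. The function name is_register_ok() is an assumption made for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: returns 1 if the received message is exactly the
 * null-terminated string "rOK" (4 bytes), 0 otherwise. */
int is_register_ok(const char *data, size_t len)
{
    static const char ok_msg[] = "rOK";  /* sizeof includes the null. */

    assert(data != NULL);
    return len == sizeof(ok_msg) && memcmp(data, ok_msg, sizeof(ok_msg)) == 0;
}
```

In the callback, the test on msg->data[0] could then be replaced by a call such as is_register_ok(msg->data, msg->len).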

Registration: Server Side

The server makes use of source notification create/delete callbacks to manage client state. The callbacks are configured as part of creation of the server's main receiver.

        lbm_rcv_src_notification_func_t source_notification_function;
        lbm_rcv_topic_attr_t *rcv_attr;
        lbm_topic_t *topic;

        err = lbm_rcv_topic_attr_create(&rcv_attr);
        EX_LBM_CHK(err);

        /* Set up per-source state management. */
        source_notification_function.create_func = start_client_source;
        source_notification_function.delete_func = end_client_source;
        source_notification_function.clientd = ctx;  /* Pass this to start_client_source(). */
        err = lbm_rcv_topic_attr_setopt(rcv_attr, "source_notification_function",
            &source_notification_function, sizeof(source_notification_function));
        EX_LBM_CHK(err);

        err = lbm_rcv_topic_lookup(&topic, ctx, "Server1.Request", rcv_attr);
        EX_LBM_CHK(err);
        /* Pass context object as clientd. */
        err = lbm_rcv_create(&rcv, ctx, topic, request_rcv_cb, ctx, NULL);
        EX_LBM_CHK(err);

        err = lbm_rcv_topic_attr_delete(rcv_attr);
        EX_LBM_CHK(err);
    }

Then, when the receiver discovers a new client's source, start_client_source() is called. That function only creates the state structure associated with the new client.

void *start_client_source(const char *source_name, void *clientd)
{
    lbm_context_t *ctx = (lbm_context_t *)clientd;
    client_t *new_client = (client_t *)malloc(sizeof(client_t));
    NULL_CHK(new_client);

    new_client->state = 0;  /* Waiting for register. */
    new_client->topic_name = NULL;
    new_client->source_name = strdup(source_name);
    NULL_CHK(new_client->source_name);
    new_client->ctx = ctx;
    new_client->resp_src = NULL;

    return new_client;
}  /* start_client_source */

When the client sends its registration message, it is received by the server, which uses it to create the source used for sending responses. The state structure created above is passed to the receiver callback in msg->source_clientd.

int request_rcv_cb(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
{
    client_t *client = (client_t *)msg->source_clientd;

Registration requests start with the character 'r'.

    case LBM_MSG_DATA:  /* Received a message from the client. */
        printf("Received %ld bytes on topic %s: '%.*s'\n",
            (long)msg->len, msg->topic_name, (int)msg->len, msg->data);

        /* Register message. */
        if (msg->len > 1 && msg->data[0] == 'r') {
            handle_register(client, &msg->data[1]);
        }

. . .

void handle_register(client_t *client, const char *client_resp_name)
{
    int err;
    /* This work should probably be passed to a separate worker thread, but
     * I'll do it here to simplify the code. */

    if (client->state == 1) {
        printf("Source '%s' re-confirmed\n", client->topic_name);
        /* Reply to client. */
        err = lbm_src_send(client->resp_src, "rOK", 4, LBM_MSG_FLUSH | LBM_SRC_NONBLOCK);
        EX_LBM_CHK(err);
    }
    else if (client->state == 0) {
        lbm_src_topic_attr_t * src_attr;
        lbm_topic_t *lbm_topic;

        /* Create source to send responses to client. */
        err = lbm_src_topic_attr_create(&src_attr);
        EX_LBM_CHK(err);
        err = lbm_src_topic_attr_str_setopt(src_attr, "transport", "lbt-ru");
        EX_LBM_CHK(err);
        err = lbm_src_topic_attr_str_setopt(src_attr, "transport_lbtru_port", "12070");
        EX_LBM_CHK(err);
        err = lbm_src_topic_attr_str_setopt(src_attr,
            "transport_source_side_filtering_behavior", "inclusion");
        EX_LBM_CHK(err);
        err = lbm_src_topic_alloc(&lbm_topic, client->ctx, client_resp_name, src_attr);
        EX_LBM_CHK(err);
        /* The following create can fail if a new client joins with the same
         * response topic name as an existing client.  Should handle this cleanly. */
        err = lbm_src_create(&client->resp_src, client->ctx, lbm_topic,
            NULL, NULL, NULL);
        if (err) {
            printf("Source '%s' failed; %s\n", client_resp_name, lbm_errmsg());
        }
        else {
            client->state = 1;
            client->topic_name = strdup(client_resp_name);
            NULL_CHK(client->topic_name);
            printf("Source '%s' created\n", client->topic_name);
        }

        err = lbm_src_topic_attr_delete(src_attr);
        EX_LBM_CHK(err);
    }
}  /* handle_register */

The response source is created in a special way. It is an LBT-RU transport type, with a fixed port number (the choice of 12070 was arbitrary). This means that all clients' sources are mapped to the same LBT-RU transport session.

        err = lbm_src_topic_attr_str_setopt(src_attr, "transport", "lbt-ru");
        EX_LBM_CHK(err);
        err = lbm_src_topic_attr_str_setopt(src_attr, "transport_lbtru_port", "12070");
        EX_LBM_CHK(err);

To prevent each client's responses from being sent to all clients, source-side filtering is enabled.

        err = lbm_src_topic_attr_str_setopt(src_attr,
            "transport_source_side_filtering_behavior", "inclusion");
        EX_LBM_CHK(err);

On successful source creation, the client state is filled in, but no reply is sent to the client. This is because the client has not yet had a chance to discover and join the new source, so any message sent at this point would not be delivered.

            client->state = 1;
            client->topic_name = strdup(client_resp_name);
            NULL_CHK(client->topic_name);
            printf("Source '%s' created\n", client->topic_name);

The server could set some kind of timer to send its reply, but to simplify this code, we just wait for the client to retry. When the next registration request comes in, we send the OK message. Thus, it will always take at least 2 tries to register.

    if (client->state == 1) {
        printf("Source '%s' re-confirmed\n", client->topic_name);
        /* Reply to client. */
        err = lbm_src_send(client->resp_src, "rOK", 4, LBM_MSG_FLUSH | LBM_SRC_NONBLOCK);
        EX_LBM_CHK(err);
    }
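The "at least 2 tries" behavior follows directly from the two-state logic, and can be demonstrated with a small UM-free simulation. Everything in this sketch (sim_client_t, sim_handle_register(), sim_tries_to_register()) is hypothetical and models only the state transitions, with source creation assumed to succeed.

```c
#include <assert.h>
#include <stddef.h>

enum { CLIENT_WAITING = 0, CLIENT_REGISTERED = 1 };

/* Hypothetical stand-in for the server's per-client state. */
typedef struct {
    int state;
    int replies_sent;
} sim_client_t;

/* Models handle_register(): returns 1 if an "rOK" reply would be sent
 * for this registration message, 0 if the message only triggers creation
 * of the response source. */
int sim_handle_register(sim_client_t *client)
{
    assert(client != NULL);
    if (client->state == CLIENT_REGISTERED) {
        client->replies_sent++;
        return 1;
    }
    /* Stands in for successful creation of the response source. */
    client->state = CLIENT_REGISTERED;
    return 0;
}

/* Models the client's retry loop: counts registration messages sent
 * until the server replies. */
int sim_tries_to_register(sim_client_t *client)
{
    int tries = 0;
    int replied = 0;

    while (!replied) {
        tries++;
        replied = sim_handle_register(client);
    }
    return tries;
}
```

For a new client the loop takes two tries: the first creates the source, the second is answered. A client that registers again with an existing source is answered on the first try.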

Request/Response

This example does not contain the code necessary to associate responses with their requests. However, the simple protocol was designed to allow this to be added. A request message consists of an 8-byte ASCII header followed by the application message content. The first byte of the header is the character 'R' to indicate a request message. The other 7 bytes are currently just set to ASCII '0' characters, but could be used as a request ID. When the server responds to a request, it echoes the header it received, so the client could maintain a table of outstanding requests and match each response to its request.
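One possible way to use the 7 spare header bytes is as a zero-padded decimal request ID. The sketch below is an assumption, not something the sample implements: the req_hdr_t layout (an 8-byte data array) is inferred from the way the sample accesses req_hdr->data[0], and req_hdr_set() and req_hdr_get() are hypothetical names.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define REQ_HDR_LEN 8
#define REQ_ID_MAX 9999999UL  /* Largest value that fits in 7 decimal digits. */

/* Assumed layout: 'R' followed by 7 ASCII characters. */
typedef struct {
    char data[REQ_HDR_LEN];
} req_hdr_t;

/* Hypothetical helper: store 'R' plus a zero-padded 7-digit request ID.
 * Returns 0 on success, -1 if the ID does not fit. */
int req_hdr_set(req_hdr_t *hdr, unsigned long req_id)
{
    char tmp[REQ_HDR_LEN + 1];

    assert(hdr != NULL);
    if (req_id > REQ_ID_MAX) {
        return -1;
    }
    snprintf(tmp, sizeof(tmp), "R%07lu", req_id);
    memcpy(hdr->data, tmp, REQ_HDR_LEN);  /* Header itself has no null. */
    return 0;
}

/* Hypothetical helper: return the request ID stored in the header,
 * or -1 if the header is not 'R' followed by 7 decimal digits. */
long req_hdr_get(const req_hdr_t *hdr)
{
    long id = 0;
    int i;

    assert(hdr != NULL);
    if (hdr->data[0] != 'R') {
        return -1;
    }
    for (i = 1; i < REQ_HDR_LEN; i++) {
        if (hdr->data[i] < '0' || hdr->data[i] > '9') {
            return -1;
        }
        id = id * 10 + (hdr->data[i] - '0');
    }
    return id;
}
```

With this encoding, the all-'0' header used by the sample decodes as request ID 0, and the header stays printable ASCII with no embedded null, so sending strlen(send_buf) + 1 bytes still covers the whole message.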

Request: Client Side

In this example, the send buffer for requests is declared as an arbitrarily large buffer. The header is initialized once.

        char send_buf[500];
        req_hdr_t *req_hdr = (req_hdr_t *)send_buf;

        memset((char *)req_hdr, '0', sizeof(req_hdr_t));
        req_hdr->data[0] = 'R';

For this simple example, 5 requests are sent.

        for (i = 0; i < 5; i++) {
            /* The application builds a request into request_msg. */
            char request_msg[257];
            sprintf(request_msg, "%s.%d", response_topic_name, i);

            /* The application message is copied in after the header. */
            strcpy(&send_buf[sizeof(req_hdr_t)], request_msg);
            err = lbm_src_send(server.src, send_buf, strlen(send_buf) + 1,
                LBM_MSG_FLUSH | LBM_SRC_NONBLOCK);
            EX_LBM_CHK(err);
            MSLEEP(1000);
        }

Response: Server Side

When the server receives the request, it sends a response.

        /* Request message. */
        else if (msg->len >= sizeof(req_hdr_t) && msg->data[0] == 'R') {
            req_hdr_t req_hdr;
            memcpy((char *)&req_hdr, msg->data, sizeof(req_hdr_t));

            handle_req(client, &req_hdr, &msg->data[sizeof(req_hdr_t)],
                msg->len - sizeof(req_hdr_t));
        }

. . .

void handle_req(client_t *client, req_hdr_t *req_hdr, const char *req_msg, size_t len)
{
    int err;

    /* This work should probably be passed to a separate worker thread, but
     * I'll do it here to simplify the code. */

    /* Responses copy the header from the request. */
    memcpy(&client->send_buf[0], (char *)req_hdr, sizeof(req_hdr_t));
    /* Response message is put after the header. */
    char *response_msg = &client->send_buf[sizeof(req_hdr_t)];

    /* Do the work of the request and put the response in response_msg.
     * (For this sample, just echo back the request.) */
    strcpy(response_msg, req_msg);

    /* Reply to client. */
    err = lbm_src_send(client->resp_src, client->send_buf,
        strlen(client->send_buf) + 1, LBM_MSG_FLUSH | LBM_SRC_NONBLOCK);
    EX_LBM_CHK(err);
    printf("sent response to '%s'.\n", client->topic_name);
}  /* handle_req */
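Note that handle_req() receives the request length in len but echoes the request with strcpy(), which relies on the request being null-terminated and small enough for the send buffer. A more defensive version of the echo step might look like the sketch below. The function build_response() and the REQ_HDR_LEN constant are hypothetical, and the 8-byte header size is taken from the protocol description rather than from the sample's type definitions.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define REQ_HDR_LEN 8  /* Assumed header size, per the protocol description. */

/* Hypothetical helper: copy the request header and echo the request body
 * into send_buf, never reading past req_msg_len or writing past
 * send_buf_len. Returns the number of bytes to send (header + body +
 * terminating null), or 0 if the response does not fit. */
size_t build_response(char *send_buf, size_t send_buf_len,
    const char *req_hdr, const char *req_msg, size_t req_msg_len)
{
    size_t body_len = 0;

    assert(send_buf != NULL && req_hdr != NULL && req_msg != NULL);

    /* Body ends at the first null, or at req_msg_len if there is none. */
    while (body_len < req_msg_len && req_msg[body_len] != '\0') {
        body_len++;
    }
    if (REQ_HDR_LEN + body_len + 1 > send_buf_len) {
        return 0;
    }
    memcpy(send_buf, req_hdr, REQ_HDR_LEN);
    memcpy(send_buf + REQ_HDR_LEN, req_msg, body_len);
    send_buf[REQ_HDR_LEN + body_len] = '\0';
    return REQ_HDR_LEN + body_len + 1;
}
```

The return value could be passed directly as the length argument of lbm_src_send(), with a return of 0 treated as a request that is too large to answer.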