There are two source files associated with this example:
The UM "request/response" feature provides a convenient, easy-to-code method for a classic server program to respond to a request sent by a client. The feature takes care of routing the server's response message back to the client that sent the request. It also associates response messages with the corresponding request messages, even if multiple messages are outstanding. See Request-Response Sample.
However, there are advantages for the server to use a normal UM source to send response messages:
This sample program uses an LBT-RU source to return response messages to the requesting client program.
Clients use a UM source to send a request to a server, and the server uses a UM receiver to receive it. The server then uses a UM source to send a response back to only the client that sent the request, and that client uses a UM receiver to receive it. This sample allows an arbitrary number of client programs to come and go, without the server being pre-configured to know about them.
When a client program initializes, it creates a source to send requests and a receiver to receive responses. The client then exchanges handshakes with the server to register with it, and to verify round-trip connectivity. This verification avoids initial message loss during topic resolution.
The client uses the server's well-known request topic to create the source.
    server.topic_name = "Server1.Request";
. . .
    /* Create source to send requests to server. */
    {
      lbm_topic_t *topic;
      err = lbm_src_topic_alloc(&topic, ctx, server.topic_name, NULL);
      EX_LBM_CHK(err);
      err = lbm_src_create(&server.src, ctx, topic, NULL, NULL, NULL);
      EX_LBM_CHK(err);
    }
The client needs a unique topic on which to receive its responses. Using the process PID is an easy but non-robust method (two clients on different machines can have the same PID).
    /* Get (pretty much) unique client name. */
    sprintf(response_topic_name, "Client.%lx.Response", (long)getpid());
. . .
    /* Create receiver for responses from server. */
    {
      lbm_topic_t *topic;
      err = lbm_rcv_topic_lookup(&topic, ctx, response_topic_name, NULL);
      EX_LBM_CHK(err);
      err = lbm_rcv_create(&server.rcv, ctx, topic, response_rcv_cb, &server, NULL);
      EX_LBM_CHK(err);
    }
The registration message that the client sends to the server is all ASCII: a lower-case 'r' followed by the null-terminated name of the topic on which the client receives responses.
    char register_msg[257];
    sprintf(register_msg, "r%s", response_topic_name);
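The PID-only response topic name used above can collide across hosts. One possible way to reduce that risk is to include the host name as well. The following is a minimal sketch, assuming a POSIX system; make_response_topic() and the "Client.<host>.<pid>.Response" naming pattern are hypothetical and not part of the sample.

```c
#define _POSIX_C_SOURCE 200112L  /* Expose gethostname() under strict C modes. */
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper (not in the sample): writes
 * "Client.<host>.<pid in hex>.Response" into buf.
 * Returns 0 on success, -1 if the host name is unavailable or the
 * result does not fit in buf. */
int make_response_topic(char *buf, size_t buf_len)
{
  char host[128];
  int n;

  assert(buf != NULL);
  if (gethostname(host, sizeof(host)) != 0) {
    return -1;
  }
  /* gethostname() is not guaranteed to null-terminate on truncation. */
  host[sizeof(host) - 1] = '\0';

  n = snprintf(buf, buf_len, "Client.%s.%lx.Response", host, (long)getpid());
  if (n < 0 || (size_t)n >= buf_len) {
    return -1;  /* Error or truncated. */
  }
  return 0;
}
```

A host name plus PID is still not guaranteed to be unique (for example, across containers that share a host name), so the server should still handle a duplicate response topic name.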
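The wire format described above ('r' followed by a null-terminated topic name) can be built and validated with a pair of small helpers. This is only a sketch: build_register_msg() and parse_register_msg() are hypothetical names, and the sample itself uses sprintf() on the client and passes &msg->data[1] straight to its handler on the server.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: builds "r<topic>" into buf.
 * Returns the number of bytes to send, including the terminating null,
 * or 0 if the message does not fit. */
size_t build_register_msg(char *buf, size_t buf_len, const char *resp_topic)
{
  int n;

  assert(buf != NULL && resp_topic != NULL);
  n = snprintf(buf, buf_len, "r%s", resp_topic);
  if (n < 0 || (size_t)n >= buf_len) {
    return 0;
  }
  return (size_t)n + 1;
}

/* Hypothetical helper: returns a pointer to the topic name inside a
 * received registration message, or NULL if the message has the wrong
 * type byte, an empty name, or no terminating null. */
const char *parse_register_msg(const char *data, size_t len)
{
  assert(data != NULL);
  if (len < 3 || data[0] != 'r') {
    return NULL;  /* Need 'r', at least one name byte, and the null. */
  }
  if (data[len - 1] != '\0') {
    return NULL;  /* Not safe to treat as a C string. */
  }
  return &data[1];
}
```

Checking for the terminating null before treating the payload as a string guards the receiving side against a malformed or truncated message.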
The client sends registration messages repeatedly in a loop, with exponentially increasing delays between attempts. For example, if the server is not running, the client pauses 1 millisecond before the first attempt, then waits 2, 4, 8, ... milliseconds after successive attempts, to a maximum of 1 second.
    backoff_delay = 1;  /* In milliseconds. */
    MSLEEP(backoff_delay);  /* Let TR complete. */
    while (server.state == 0) {
      try_cnt ++;
      err = lbm_src_send(server.src, register_msg,
          strlen(register_msg) + 1, LBM_MSG_FLUSH | LBM_SRC_BLOCK);
      EX_LBM_CHK(err);
      printf("Sent '%s' to %s\n", register_msg, server.topic_name);

      /* Exponential backoff, to max of 1 sec. */
      backoff_delay *= 2;
      if (backoff_delay > 1000) {
        backoff_delay = 1000;
      }
      MSLEEP(backoff_delay);  /* Wait for server response. */
    }
    printf("Took %d tries to register with server.\n", try_cnt);
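The doubling-with-cap step in the loop above can be isolated into a small function, which makes the delay schedule easy to check without any UM calls. This is a sketch; next_backoff_ms() is a hypothetical name and is not part of the sample.

```c
#include <assert.h>

/* Hypothetical helper: returns the next delay in milliseconds by
 * doubling cur_ms and capping the result at max_ms. The comparison is
 * written so that the doubling cannot overflow. */
unsigned next_backoff_ms(unsigned cur_ms, unsigned max_ms)
{
  assert(max_ms > 0);
  if (cur_ms > max_ms / 2) {
    return max_ms;  /* Doubling would exceed the cap. */
  }
  return cur_ms * 2;
}
```

Starting from 1 with a cap of 1000, repeated calls yield 2, 4, 8, ..., 512, 1000, 1000, which matches the delays produced by the loop in the sample.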
The loop exits when the client receives a registration response.
    case LBM_MSG_DATA:  /* Received a message from the server. */
      printf("Received %ld bytes on topic %s: '%.*s'\n",
          (long)msg->len, msg->topic_name, (int)msg->len, msg->data);

      if (msg->len > 1 && msg->data[0] == 'r') {
        /* Should check for success. */
        server->state = 1;  /* Registered. */
      }
The server makes use of source notification create/delete callbacks to manage client state. The callbacks are configured as part of creation of the server's main receiver.
    {
      lbm_rcv_src_notification_func_t source_notification_function;
      lbm_rcv_topic_attr_t *rcv_attr;
      lbm_topic_t *topic;

      err = lbm_rcv_topic_attr_create(&rcv_attr);
      EX_LBM_CHK(err);

      /* Set up per-source state management. */
      source_notification_function.create_func = start_client_source;
      source_notification_function.delete_func = end_client_source;
      source_notification_function.clientd = ctx;  /* Pass this to start_client_source(). */
      err = lbm_rcv_topic_attr_setopt(rcv_attr, "source_notification_function",
          &source_notification_function, sizeof(source_notification_function));
      EX_LBM_CHK(err);

      err = lbm_rcv_topic_lookup(&topic, ctx, "Server1.Request", rcv_attr);
      EX_LBM_CHK(err);
      /* Pass context object as clientd. */
      err = lbm_rcv_create(&rcv, ctx, topic, request_rcv_cb, ctx, NULL);
      EX_LBM_CHK(err);

      err = lbm_rcv_topic_attr_delete(rcv_attr);
      EX_LBM_CHK(err);
    }
Then, when the receiver discovers a new client's source, start_client_source() is called. That function only creates the state structure associated with the new client.
    void *start_client_source(const char *source_name, void *clientd)
    {
      lbm_context_t *ctx = (lbm_context_t *)clientd;
      client_t *new_client = (client_t *)malloc(sizeof(client_t));
      NULL_CHK(new_client);

      new_client->state = 0;  /* Waiting for register. */
      new_client->topic_name = NULL;
      new_client->source_name = strdup(source_name);
      NULL_CHK(new_client->source_name);
      new_client->ctx = ctx;
      new_client->resp_src = NULL;

      return new_client;
    }  /* start_client_source */
When the client sends its registration message, it is received by the server, which uses it to create the source used for sending responses. The state structure created above is passed to the receiver callback in msg->source_clientd.
    int request_rcv_cb(lbm_rcv_t *rcv, lbm_msg_t *msg, void *clientd)
    {
      client_t *client = (client_t *)msg->source_clientd;
Registration requests start with the character 'r'.
    case LBM_MSG_DATA:  /* Received a message from the client. */
      printf("Received %ld bytes on topic %s: '%.*s'\n",
          (long)msg->len, msg->topic_name, (int)msg->len, msg->data);

      /* Register message. */
      if (msg->len > 1 && msg->data[0] == 'r') {
        handle_register(client, &msg->data[1]);
      }
. . .
    void handle_register(client_t *client, const char *client_resp_name)
    {
      int err;
      /* This work should probably be passed to a separate worker thread, but
       * I'll do it here to simplify the code. */

      if (client->state == 1) {
        printf("Source '%s' re-confirmed\n", client->topic_name);
        /* Reply to client. */
        err = lbm_src_send(client->resp_src, "rOK", 4, LBM_MSG_FLUSH | LBM_SRC_NONBLOCK);
        EX_LBM_CHK(err);
      }
      else if (client->state == 0) {
        lbm_src_topic_attr_t *src_attr;
        lbm_topic_t *lbm_topic;

        /* Create source to send responses to client. */
        err = lbm_src_topic_attr_create(&src_attr);
        EX_LBM_CHK(err);
        err = lbm_src_topic_attr_str_setopt(src_attr, "transport", "lbt-ru");
        EX_LBM_CHK(err);
        err = lbm_src_topic_attr_str_setopt(src_attr, "transport_lbtru_port", "12070");
        EX_LBM_CHK(err);
        err = lbm_src_topic_attr_str_setopt(src_attr,
            "transport_source_side_filtering_behavior", "inclusion");
        EX_LBM_CHK(err);
        err = lbm_src_topic_alloc(&lbm_topic, client->ctx, client_resp_name, src_attr);
        EX_LBM_CHK(err);
        /* The following create can fail if a new client joins with the same
         * response topic name as an existing client. Should handle this cleanly. */
        err = lbm_src_create(&client->resp_src, client->ctx, lbm_topic,
            NULL, NULL, NULL);
        if (err) {
          printf("Source '%s' failed; %s\n", client_resp_name, lbm_errmsg());
        }
        else {
          client->state = 1;
          client->topic_name = strdup(client_resp_name);
          NULL_CHK(client->topic_name);
          printf("Source '%s' created\n", client->topic_name);
        }

        err = lbm_src_topic_attr_delete(src_attr);
        EX_LBM_CHK(err);
      }
    }  /* handle_register */
The response source is created in a special way. It is an LBT-RU transport type, with a fixed port number (the choice of 12070 was arbitrary). This means that all clients' sources are mapped to the same LBT-RU transport session.
    err = lbm_src_topic_attr_str_setopt(src_attr, "transport", "lbt-ru");
    EX_LBM_CHK(err);
    err = lbm_src_topic_attr_str_setopt(src_attr, "transport_lbtru_port", "12070");
    EX_LBM_CHK(err);
To prevent each client's responses from being sent to all clients, source-side filtering is enabled.
    err = lbm_src_topic_attr_str_setopt(src_attr,
        "transport_source_side_filtering_behavior", "inclusion");
    EX_LBM_CHK(err);
On successful source creation, the client state is filled in, but no reply is sent to the client. The client has not yet had a chance to discover and join the new source, so any message sent at this point would not be delivered.
    client->state = 1;
    client->topic_name = strdup(client_resp_name);
    NULL_CHK(client->topic_name);
    printf("Source '%s' created\n", client->topic_name);
The server could set some kind of timer to send its reply, but to simplify this code, we just wait for the client to retry. When the next registration request comes in, we send the OK message. Thus, it will always take at least 2 tries to register.
    if (client->state == 1) {
      printf("Source '%s' re-confirmed\n", client->topic_name);
      /* Reply to client. */
      err = lbm_src_send(client->resp_src, "rOK", 4, LBM_MSG_FLUSH | LBM_SRC_NONBLOCK);
      EX_LBM_CHK(err);
    }
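The two-try behavior can be illustrated with a small simulation that leaves out UM entirely and models only the state transitions of handle_register(). The names sim_client_t, sim_on_register(), and sim_tries_until_registered() are hypothetical, and the simulation assumes that no messages are lost and that the client has joined the response source by the time its retry arrives.

```c
#include <assert.h>

/* Hypothetical stand-in for the server's per-client state. */
typedef struct {
  int state;    /* 0 = waiting for register, 1 = response source exists. */
  int ok_sent;  /* Number of "rOK" replies the server would have sent. */
} sim_client_t;

/* Models handle_register(): the first registration only creates the
 * response source; each later registration sends a reply. */
void sim_on_register(sim_client_t *c)
{
  assert(c != 0);
  if (c->state == 0) {
    c->state = 1;  /* Source created; client cannot have joined it yet. */
  } else {
    c->ok_sent++;  /* Client retried; the reply can now be delivered. */
  }
}

/* Returns how many registration attempts the client makes before the
 * server sends a reply, under the no-loss assumption stated above. */
int sim_tries_until_registered(void)
{
  sim_client_t c = { 0, 0 };
  int tries = 0;

  while (c.ok_sent == 0) {
    tries++;
    sim_on_register(&c);
  }
  return tries;
}
```

Under these assumptions the function returns 2. In a real deployment the count can be higher, because a retry that arrives before the client has joined the response source produces a reply that the client never receives.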
This example does not contain the code necessary to associate responses with their requests. However, the simple protocol was designed to allow this to be added. A request message consists of an 8-byte ASCII header followed by the application message content. The first byte of the header is the character 'R' to indicate a request message. The other 7 bytes are currently just set to ASCII '0' characters, but could be used as a request ID. When the server responds to requests, it uses the same header it received, so the client can maintain a table of outstanding requests. This logic is not included in this example.
In this example, the send buffer for requests is declared as an arbitrarily large buffer. The header is initialized once.
    char send_buf[500];
    req_hdr_t *req_hdr = (req_hdr_t *)send_buf;

    memset((char *)req_hdr, '0', sizeof(req_hdr_t));
    req_hdr->data[0] = 'R';
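One way the 7 spare header bytes could carry a request ID is as zero-padded ASCII decimal digits. The sketch below assumes that encoding and an 8-byte header with a data array, mirroring the sample's use of req_hdr->data[0]. The type sketch_hdr_t and the functions hdr_set_id() and hdr_get_id() are hypothetical and do not appear in the sample.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Assumed layout: 'R' followed by 7 bytes available for a request ID. */
typedef struct {
  char data[8];
} sketch_hdr_t;

/* Writes 'R' plus the ID as 7 zero-padded decimal digits.
 * Returns 0 on success, -1 if the ID needs more than 7 digits. */
int hdr_set_id(sketch_hdr_t *hdr, unsigned long id)
{
  char tmp[9];  /* 'R' + 7 digits + terminating null. */

  assert(hdr != NULL);
  if (id > 9999999UL) {
    return -1;
  }
  snprintf(tmp, sizeof(tmp), "R%07lu", id);
  memcpy(hdr->data, tmp, sizeof(hdr->data));  /* Copy without the null. */
  return 0;
}

/* Extracts the ID from a header. Returns -1 if the first byte is not
 * 'R' or any of the remaining bytes is not an ASCII digit. */
long hdr_get_id(const sketch_hdr_t *hdr)
{
  long id = 0;
  int i;

  assert(hdr != NULL);
  if (hdr->data[0] != 'R') {
    return -1;
  }
  for (i = 1; i < 8; i++) {
    if (hdr->data[i] < '0' || hdr->data[i] > '9') {
      return -1;
    }
    id = id * 10 + (hdr->data[i] - '0');
  }
  return id;
}
```

Keeping the ID as ASCII digits means the header never contains a null byte, which matters because the sample computes the send length with strlen(send_buf). An ID of 0 produces the same all-'0' header that the sample sends today.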
For this simple example, 5 requests are sent.
    for (i = 0; i < 5; i++) {
      /* The application builds a request into request_msg. */
      char request_msg[257];
      sprintf(request_msg, "%s.%d", response_topic_name, i);

      /* The application message is copied in after the header. */
      strcpy(&send_buf[sizeof(req_hdr_t)], request_msg);
      err = lbm_src_send(server.src, send_buf, strlen(send_buf) + 1,
          LBM_MSG_FLUSH | LBM_SRC_NONBLOCK);
      EX_LBM_CHK(err);
      MSLEEP(1000);
    }
When the server receives the request, it sends a response.
    /* Request message. */
    else if (msg->len >= sizeof(req_hdr_t) && msg->data[0] == 'R') {
      req_hdr_t req_hdr;
      memcpy((char *)&req_hdr, msg->data, sizeof(req_hdr_t));

      handle_req(client, &req_hdr, &msg->data[sizeof(req_hdr_t)],
          msg->len - sizeof(req_hdr_t));
    }
. . .
    void handle_req(client_t *client, req_hdr_t *req_hdr, const char *req_msg, size_t len)
    {
      int err;

      /* This work should probably be passed to a separate worker thread, but
       * I'll do it here to simplify the code. */

      /* Responses copy the header from the request. */
      memcpy(&client->send_buf[0], (char *)req_hdr, sizeof(req_hdr_t));
      /* Response message is put after the header. */
      char *response_msg = &client->send_buf[sizeof(req_hdr_t)];

      /* Do the work of the request and put the response in response_msg.
       * (For this sample, just echo back the request.) */
      strcpy(response_msg, req_msg);

      /* Reply to client. */
      err = lbm_src_send(client->resp_src, client->send_buf,
          strlen(client->send_buf) + 1, LBM_MSG_FLUSH | LBM_SRC_NONBLOCK);
      EX_LBM_CHK(err);
      printf("sent response to '%s'.\n", client->topic_name);
    }  /* handle_req */
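The echo step above uses strcpy() and does not use the len argument, so it relies on the request payload being null-terminated and small enough for the client's send buffer. A bounded variant might look like the following sketch. build_response() and HDR_LEN are hypothetical; HDR_LEN assumes the 8-byte header described earlier, and the size of the sample's send_buf is not shown, so the caller passes the output size explicitly.

```c
#include <assert.h>
#include <string.h>

#define HDR_LEN 8  /* Assumed header size, per the 8-byte request header. */

/* Hypothetical helper: copies the request header and echoes the payload
 * into out, always null-terminating the result.
 * Returns the total number of bytes to send (including the null), or 0
 * if the response does not fit in out_len bytes. */
size_t build_response(char *out, size_t out_len, const char *hdr,
    const char *req_msg, size_t req_len)
{
  const char *nul;
  size_t payload_len;

  assert(out != NULL && hdr != NULL && req_msg != NULL);

  /* Stop at the first null, but never read past req_len bytes. */
  nul = memchr(req_msg, '\0', req_len);
  payload_len = (nul != NULL) ? (size_t)(nul - req_msg) : req_len;

  if (HDR_LEN + payload_len + 1 > out_len) {
    return 0;
  }
  memcpy(out, hdr, HDR_LEN);
  memcpy(out + HDR_LEN, req_msg, payload_len);
  out[HDR_LEN + payload_len] = '\0';
  return HDR_LEN + payload_len + 1;
}
```

The return value can be passed directly as the length for the send call, which avoids a second strlen() pass over the buffer.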