UM C API  6.16
Source code for lbmmontrlbm.c
/*
All of the documentation and software included in this and any
other Informatica Inc. Ultra Messaging Releases
Copyright (C) Informatica Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted only as covered by the terms of a
valid software license agreement with Informatica Inc.
(C) Copyright 2004,2023 Informatica Inc. All Rights Reserved.
THE SOFTWARE IS PROVIDED "AS IS" AND INFORMATICA DISCLAIMS ALL WARRANTIES
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION, ANY IMPLIED WARRANTIES OF
NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR
PURPOSE. INFORMATICA DOES NOT WARRANT THAT USE OF THE SOFTWARE WILL BE
UNINTERRUPTED OR ERROR-FREE. INFORMATICA SHALL NOT, UNDER ANY CIRCUMSTANCES, BE
LIABLE TO LICENSEE FOR LOST PROFITS, CONSEQUENTIAL, INCIDENTAL, SPECIAL OR
INDIRECT DAMAGES ARISING OUT OF OR RELATED TO THIS AGREEMENT OR THE
TRANSACTIONS CONTEMPLATED HEREUNDER, EVEN IF INFORMATICA HAS BEEN APPRISED OF
THE LIKELIHOOD OF SUCH DAMAGES.
*/
#ifdef __VOS__
#define _POSIX_C_SOURCE 200112L
#include <sys/time.h>
#endif
#include <stdio.h>
#include <time.h>
#include <string.h>
#ifdef _WIN32
#define strcasecmp stricmp
#define snprintf _snprintf
#else
#include "config.h"
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#if defined(__TANDEM)
#if defined(HAVE_TANDEM_SPT)
#include <ktdmtyp.h>
#include <spthread.h>
#else
#include <pthread.h>
#endif
#else
#include <pthread.h>
#endif
#include <strings.h>
#endif
#include <lbm/lbmmon.h>
#include <lbm/lbmmontrlbm.h>
#include <lbm/lbmaux.h>
/*
Package all of the needed function pointers for this module into a
lbmmon_transport_func_t structure.
The initializers are positional, so their order must match the member
order of lbmmon_transport_func_t (that type is declared outside this file).
The address of this table is what lbmmon_transport_lbm_module() returns.
*/
static const lbmmon_transport_func_t LBMMON_TRANSPORT_LBM =
{
lbmmon_transport_lbm_initsrc,
lbmmon_transport_lbm_initrcv,
lbmmon_transport_lbm_send,
lbmmon_transport_lbm_receive,
lbmmon_transport_lbm_src_finish,
lbmmon_transport_lbm_rcv_finish,
lbmmon_transport_lbm_errmsg,
lbmmon_transport_lbm_ctrlmsgtarget,
lbmmon_transport_lbm_ctrlmsg_receive,
lbmmon_transport_lbm_ctrlmsg_response,
lbmmon_transport_allow_debug
};
/*
A queue of incoming statistics or control messages is maintained. This describes each
entry in the queue. The queue is a singly linked list; nodes are appended at the
tail by the message callbacks and removed from the head by the receive/response
functions.
*/
struct lbmmon_transport_lbm_rcv_node_t_stct
{
/* Pointer to the LBM message (retained via lbm_msg_retain() when queued,
   released with lbm_msg_delete() when the node is removed) */
lbm_msg_t * mMessage;
/* Number of bytes of the message returned to caller. It is set to 0 when the
   node is queued; nothing in this file advances it afterwards. */
size_t mUsedBytes;
/* Next entry in the queue (NULL for the last node) */
struct lbmmon_transport_lbm_rcv_node_t_stct * mNext;
};
typedef struct lbmmon_transport_lbm_rcv_node_t_stct lbmmon_transport_lbm_rcv_node_t;
/*
For a statistics source or receiver, one of these gets returned as the TransportClientData.
The init functions allocate it zero-filled, so every pointer member starts out NULL;
cleanup_clientd() releases whichever members are non-NULL and then frees the structure.
*/
typedef struct
{
/* Flag to indicate lock has been created (non-zero once mLock is initialized;
   cleanup_clientd() only touches mLock when this is set) */
unsigned int mLockCreated;
/* Lock to prevent access by multiple threads (guards mHead/mTail) */
#ifdef _WIN32
CRITICAL_SECTION mLock;
#else
pthread_mutex_t mLock;
#endif
/* LBM context attributes */
lbm_context_attr_t * mContextAttributes;
/* LBM context created to send or receive packets */
lbm_context_t * mContext;
/* LBM source topic attributes */
lbm_src_topic_attr_t * mSourceTopicAttributes;
/* LBM source created to send a statistics packet */
lbm_src_t * mSource;
/* LBM receiver topic attributes */
lbm_rcv_topic_attr_t * mReceiverTopicAttributes;
/* LBM receiver used to receive packets */
lbm_rcv_t * mReceiver;
/* LBM topic (never deleted directly by this file; cleanup only clears the pointer) */
lbm_topic_t * mTopic;
/* Wildcard receiver attributes */
lbm_wildcard_rcv_attr_t * mWildcardReceiverAttributes;
/* If we're using a wildcard receiver... */
lbm_wildcard_rcv_t * mWildcardReceiver;
/* Head of the message queue (NULL when the queue is empty) */
lbmmon_transport_lbm_rcv_node_t * mHead;
/* Tail of the message queue (NULL when the queue is empty) */
lbmmon_transport_lbm_rcv_node_t * mTail;
/* Allow debug indicator (set from the "allow_debug" transport option in
   lbmmon_transport_lbm_initsrc(): 1 for "on", 0 for "off") */
int mAllowDebug;
} lbmmon_transport_lbm_clientd_t;
/* Forward declarations of the file-local helpers defined further down. */
static void cleanup_clientd(lbmmon_transport_lbm_clientd_t * Data);
static int handle_immediate_msg(lbm_context_t * Context, lbm_msg_t * Message, void * ClientData);
static int receive_callback(lbm_rcv_t * Receiver, lbm_msg_t * Message, void * ClientData);
static void lock(lbmmon_transport_lbm_clientd_t * Receiver);
static void unlock(lbmmon_transport_lbm_clientd_t * Receiver);
static int scope_is_valid(const char * Scope);
/* Value both init functions assign to the "context_name" context option. */
#define DEFAULT_CONTEXT_NAME "29west_statistics_context"
/* Topic used when no "topic" transport option is supplied. */
#define DEFAULT_TOPIC "/29west/statistics"
/* Text of the most recent error, returned by lbmmon_transport_lbm_errmsg().
   This is a single file-scope buffer shared by every client of this module. */
static char ErrorString[2048];
/* "TCP:<address>:<port>" target for control requests. Cleared at the start of
   lbmmon_transport_lbm_initsrc() and filled in there only when the request
   port is bound; returned by lbmmon_transport_lbm_ctrlmsgtarget(). */
static char RequestTargetAddress[80];
/* One option name/value pair. Each table below ends with a { NULL, NULL } entry,
   which the init functions use as the loop terminator. */
typedef struct
{
const char * option;
const char * value;
} option_entry_t;
/* Context options for a statistics source. lbmmon_transport_lbm_initsrc() applies
   these after the config file and the user's "context|..." options, so they
   presumably take precedence over user-supplied values. */
static option_entry_t SourceContextOption[] =
{
/* Force embedded mode for simplicity. */
{ "operational_mode", "embedded" },
/* Disable monitoring for this context. */
{ "monitor_interval", "0" },
/* We don't need MIM, so disable MIM receiver. */
{ "mim_incoming_address", "0.0.0.0" },
/* No need to cache topics. */
{ "resolver_cache", "0" },
/* End of list. */
{ NULL, NULL }
};
/* Context options for a statistics receiver. lbmmon_transport_lbm_initrcv() applies
   these after the config file and the user's "context|..." options; when a wildcard
   topic is in use it then sets "resolver_cache" back to "1". */
static option_entry_t ReceiverContextOption[] =
{
/* Force embedded mode for simplicity. */
{ "operational_mode", "embedded" },
/* Disable monitoring for this context. */
{ "monitor_interval", "0" },
/* We don't need request/response, so don't use up ports. */
{ "request_tcp_bind_request_port", "0" },
/* We don't need MIM, so disable MIM receiver. */
{ "mim_incoming_address", "0.0.0.0" },
/* No need to cache topics. */
{ "resolver_cache", "0" },
/* End of list. */
{ NULL, NULL }
};
/* Source topic options. lbmmon_transport_lbm_initsrc() applies these before the
   config file and the user's "source|..." options, so they act as defaults. */
static option_entry_t SourceTopicOption[] =
{
/* Minimize memory used for LBT-RU retransmissions. */
{ "transport_lbtru_transmission_window_size", "500000" },
/* Minimize memory used for LBT-RM retransmissions. */
{ "transport_lbtrm_transmission_window_size", "500000" },
/* End of list. */
{ NULL, NULL }
};
/* Receiver topic options applied by lbmmon_transport_lbm_initrcv() after the
   user's "receiver|..." options. Currently empty. */
static option_entry_t ReceiverTopicOption[] =
{
/* End of list. */
{ NULL, NULL }
};
/* Wildcard receiver options applied by lbmmon_transport_lbm_initrcv() after the
   user's "wildcard_receiver|..." options. Currently empty. */
static option_entry_t WildcardReceiverOption[] =
{
/* End of list. */
{ NULL, NULL }
};
/*
Record an allocation failure in ErrorString.
Size is the number of bytes that could not be allocated; it is narrowed to
unsigned int for formatting.
*/
static void lbmmon_transport_lbm_report_allocation_error(size_t Size)
{
    const unsigned requested = (unsigned) Size;

    snprintf(ErrorString, sizeof(ErrorString), "Unable to allocate %u bytes", requested);
}
lbmmon_transport_lbm_module(void)
{
return (&LBMMON_TRANSPORT_LBM);
}
/*
 * Handler for immediate messages directed to NULL topic
 * (when operating as a statistics source).
 *
 * Request messages are retained and appended to the client's queue, where
 * lbmmon_transport_lbm_ctrlmsg_receive()/..._ctrlmsg_response() pick them up.
 * Any other message type is logged and dropped.
 *
 * Context    - context delivering the message (unused).
 * Message    - the immediate message.
 * ClientData - the lbmmon_transport_lbm_clientd_t registered in initsrc.
 *
 * Returns 0 normally, -1 if the queue node could not be allocated.
 */
int handle_immediate_msg(lbm_context_t * Context, lbm_msg_t * Message, void * ClientData)
{
    lbmmon_transport_lbm_clientd_t * src = (lbmmon_transport_lbm_clientd_t *)ClientData;
    lbmmon_transport_lbm_rcv_node_t * node;

    (void) Context;
    if (Message->type == LBM_MSG_REQUEST)
    {
        /*
            A received control message. We want to enqueue it for processing.
            The node is allocated before the lock is taken so that an
            allocation failure cannot return with the lock still held.
        */
        node = malloc(sizeof(*node));
        if (node == NULL)
        {
            lbmmon_transport_lbm_report_allocation_error(sizeof(*node));
            return (-1);
        }
        /*
            Since we hold onto the message until it is actually processed,
            let LBM know about it.
        */
        lbm_msg_retain(Message);
        node->mMessage = Message;
        node->mUsedBytes = 0; /* No data returned as yet */
        node->mNext = NULL;
        /* Link the message onto the queue */
        lock(src);
        if (src->mTail != NULL)
        {
            src->mTail->mNext = node;
        }
        else
        {
            src->mHead = node;
        }
        src->mTail = node;
        unlock(src);
    }
    else
    {
        /* MSGDESC: LBMMON received an unexpected message type on the control request port.
         * MSGRES: Stop the source from sending to this port. */
        lbm_logf(LBM_LOG_ERR, "Core-10995-13: [LBMMON] Dropping message of unexpected type %d from source %s", Message->type, Message->source);
    }
    return (0);
}
int
lbmmon_transport_lbm_initsrc(void * * TransportClientData, const void * TransportOptions)
{
lbmmon_transport_lbm_clientd_t * data;
int rc;
const char * ptr = (const char *) TransportOptions;
char key[512];
char value[512];
char config_file[512];
char topic[512];
char scope[512];
char option[512];
option_entry_t * entry;
/* following variables are for options we want to retrieve via getopt calls */
size_t optlen;
unsigned short int request_port;
int request_port_bound;
lbm_ipv4_address_mask_t unicast_target_iface;
struct in_addr inaddr;
memset(ErrorString, 0, sizeof(ErrorString));
memset(RequestTargetAddress, 0, sizeof(RequestTargetAddress));
data = malloc(sizeof(lbmmon_transport_lbm_clientd_t));
if (data == NULL)
{
lbmmon_transport_lbm_report_allocation_error(sizeof(lbmmon_transport_lbm_clientd_t));
return (-1);
}
memset(data, 0, sizeof(lbmmon_transport_lbm_clientd_t));
/* Process any options */
memset(config_file, 0, sizeof(config_file));
strncpy(topic, DEFAULT_TOPIC, sizeof(topic));
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (strcasecmp(key, "config") == 0)
{
strncpy(config_file, value, sizeof(config_file));
}
else if (strcasecmp(key, "topic") == 0)
{
strncpy(topic, value, sizeof(topic));
}
else if (strcasecmp(key, "allow_debug") == 0)
{
if (strcasecmp(value, "on") == 0)
{
data->mAllowDebug = 1;
}
else if (strcasecmp(value, "off") == 0)
{
data->mAllowDebug = 0;
}
else
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid allow_debug argument [%s], argument must be \"on\" or \"off\"", value);
return (LBM_FAILURE);
}
}
}
/* Initialize the context attributes */
rc = lbm_context_attr_create_default(&(data->mContextAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_create() failed, %s",
return (rc);
}
/* Set the default context name */
rc = lbm_context_attr_str_setopt(data->mContextAttributes, "context_name", DEFAULT_CONTEXT_NAME);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_str_setopt() failed, %s",
return (rc);
}
/* Disable request/response by default but customer can override */
rc = lbm_context_attr_str_setopt(data->mContextAttributes, "request_tcp_bind_request_port", "0");
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_str_setopt() failed, %s",
return (rc);
}
if (config_file[0] != '\0')
{
/* A config file was passed as an option. Use it to populate the context attributes. */
rc = lbmaux_context_attr_setopt_from_file(data->mContextAttributes, config_file);
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_context_attr_setopt_from_file() failed, %s",
cleanup_clientd(data);
return (-1);
}
}
/* Go back through the options, looking for any specific context options. */
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (scope_is_valid(scope) == -1)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option scope [%s]",
scope);
cleanup_clientd(data);
return (-1);
}
if (strcasecmp(scope, "context") == 0)
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [context %s %s], %s",
option,
value,
cleanup_clientd(data);
return (rc);
}
}
}
entry = &SourceContextOption[0];
while (entry->option != NULL)
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
entry->option,
entry->value,
cleanup_clientd(data);
return (rc);
}
entry++;
}
{
topicless_im_rcv_func.clientd = data;
topicless_im_rcv_func.evq = NULL;
topicless_im_rcv_func.func = handle_immediate_msg;
rc = lbm_context_attr_setopt(data->mContextAttributes, "immediate_message_receiver_function",
&topicless_im_rcv_func, sizeof(topicless_im_rcv_func));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_rcv_immediate_msgs: %s",
cleanup_clientd(data);
return (rc);
}
}
#ifdef _WIN32
InitializeCriticalSection(&(data->mLock));
#else
pthread_mutex_init(&(data->mLock), NULL);
#endif
data->mLockCreated = 1;
lock(data);
/* Create the context */
rc = lbm_context_create(&(data->mContext), data->mContextAttributes, NULL, NULL);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_create() failed, %s",
unlock(data);
cleanup_clientd(data);
return (rc);
}
optlen = sizeof(request_port_bound);
rc = (lbm_context_getopt(data->mContext,
"request_tcp_bind_request_port",
&request_port_bound,
&optlen) == LBM_FAILURE);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_getopt(request_tcp_bind_request_port): %s",
unlock(data);
cleanup_clientd(data);
return (rc);
}
if (request_port_bound == 1) {
optlen = sizeof(request_port);
rc = (lbm_context_getopt(data->mContext,
"request_tcp_port",
&request_port,
&optlen) == LBM_FAILURE);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_getopt(request_tcp_port): %s",
unlock(data);
cleanup_clientd(data);
return (rc);
}
optlen = sizeof(unicast_target_iface);
rc = (lbm_context_getopt(data->mContext,
"request_tcp_interface",
&unicast_target_iface,
&optlen) == LBM_FAILURE);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_getopt(request_tcp_interface): %s",
unlock(data);
cleanup_clientd(data);
return (rc);
}
/* if the request_tcp_interface is INADDR_ANY, get one we know is good. */
if (unicast_target_iface.addr == INADDR_ANY) {
rc = (lbm_context_getopt(data->mContext,
"resolver_multicast_interface",
&unicast_target_iface,
&optlen) == LBM_FAILURE);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_getopt(resolver_multicast_interface): %s",
unlock(data);
cleanup_clientd(data);
return (rc);
}
}
inaddr.s_addr = unicast_target_iface.addr;
snprintf(RequestTargetAddress, sizeof(RequestTargetAddress), "TCP:%s:%d", inet_ntoa(inaddr), ntohs(request_port));
}
/* Initialize the source topic attributes */
rc = lbm_src_topic_attr_create_default(&(data->mSourceTopicAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_topic_attr_create_default() failed, %s",
unlock(data);
cleanup_clientd(data);
return (rc);
}
entry = &SourceTopicOption[0];
while (entry->option != NULL)
{
rc = lbm_src_topic_attr_str_setopt(data->mSourceTopicAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [source %s %s], %s",
entry->option,
entry->value,
unlock(data);
cleanup_clientd(data);
return (rc);
}
entry++;
}
if (config_file[0] != '\0')
{
/* A config file was passed as an option. Use it to populate the source topic attributes. */
rc = lbmaux_src_topic_attr_setopt_from_file(data->mSourceTopicAttributes, config_file);
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_src_topic_attr_setopt_from_file() failed, %s",
unlock(data);
cleanup_clientd(data);
return (-1);
}
}
/* Go back through the options, looking for any specific source options. */
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (strcasecmp(scope, "source") == 0)
{
rc = lbm_src_topic_attr_str_setopt(data->mSourceTopicAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [source %s %s], %s",
option,
value,
unlock(data);
cleanup_clientd(data);
return (rc);
}
}
}
/* Create the topic */
rc = lbm_src_topic_alloc(&(data->mTopic), data->mContext, topic, data->mSourceTopicAttributes);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_topic_alloc() failed, %s",
unlock(data);
cleanup_clientd(data);
return (rc);
}
/* Create the source */
rc = lbm_src_create(&(data->mSource), data->mContext, data->mTopic, NULL, NULL, NULL);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_create() failed, %s",
unlock(data);
cleanup_clientd(data);
return (rc);
}
/* Pass back the lbmmon_transport_lbm_clientd_t created */
*TransportClientData = data;
unlock(data);
return (0);
}
/*
This function is called upon receipt of an LBM message (when operating as
a statistics receiver).

Data messages are retained and appended to the client's queue for
lbmmon_transport_lbm_receive() to pick up. Beginning/end of session and
unrecoverable loss events are only logged. Other message types are ignored.

Receiver   - receiver delivering the message (unused).
Message    - the received message or event.
ClientData - the lbmmon_transport_lbm_clientd_t registered in initrcv.

Returns 0 normally, -1 if the queue node could not be allocated.
*/
int
receive_callback(lbm_rcv_t * Receiver, lbm_msg_t * Message, void * ClientData)
{
    lbmmon_transport_lbm_clientd_t * rcv = (lbmmon_transport_lbm_clientd_t *) ClientData;
    lbmmon_transport_lbm_rcv_node_t * node;

    (void) Receiver;
    if (Message->type == LBM_MSG_DATA)
    {
        /*
            A data message. We want to enqueue it for processing.
            The node is allocated before the lock is taken so that an
            allocation failure cannot return with the lock still held.
        */
        node = malloc(sizeof(*node));
        if (node == NULL)
        {
            lbmmon_transport_lbm_report_allocation_error(sizeof(*node));
            return (-1);
        }
        /*
            Since we hold onto the message until it is actually processed,
            let LBM know about it.
        */
        lbm_msg_retain(Message);
        node->mMessage = Message;
        node->mUsedBytes = 0; /* No data returned as yet */
        node->mNext = NULL;
        /* Link the message onto the queue */
        lock(rcv);
        if (rcv->mTail != NULL)
        {
            rcv->mTail->mNext = node;
        }
        else
        {
            rcv->mHead = node;
        }
        rcv->mTail = node;
        unlock(rcv);
    }
    else if (Message->type == LBM_MSG_BOS)
    {
        /*MSGDESC: LBMMON has received a Beginning of Transport Session message.
         *MSGRES: This message for informational purposes only and can be ignored. */
        lbm_logf(LBM_LOG_INFO, "Core-10995-140: [LBMMON] Topic[%s] Source[%s], Beginning of Transport Session\n", Message->topic_name, Message->source);
    }
    else if (Message->type == LBM_MSG_EOS)
    {
        /*MSGDESC: LBMMON has received an End of Transport Session message.
         *MSGRES: This message for informational purposes only and can be ignored. */
        lbm_logf(LBM_LOG_INFO, "Core-10995-141: [LBMMON] Topic[%s] Source[%s], End of Transport Session\n", Message->topic_name, Message->source);
    }
    else if (Message->type == LBM_MSG_UNRECOVERABLE_LOSS)
    {
        /*MSGDESC: A message was declared unrecoverably lost by LBMMON's receiver's underlying delivery controller.
         *MSGRES: Identify the source of the loss or Contact Informatica Support. */
        lbm_logf(LBM_LOG_WARNING, "Core-10995-142: [LBMMON] Topic[%s] Source[%s] Unrecoverable Loss at SQN[%u]\n", Message->topic_name, Message->source, Message->sequence_number);
    }
    else if (Message->type == LBM_MSG_UNRECOVERABLE_LOSS_BURST)
    {
        /*MSGDESC: A burst of messages were declared unrecoverably lost by LBMMON's receiver's underlying delivery controller.
         *MSGRES: Identify the source of the loss or Contact Informatica Support. */
        lbm_logf(LBM_LOG_WARNING, "Core-10995-143: [LBMMON] Topic[%s] Source[%s] Unrecoverable Loss Burst at SQN[%u]\n", Message->topic_name, Message->source, Message->sequence_number);
    }
    return (0);
}
int
lbmmon_transport_lbm_initrcv(void * * TransportClientData, const void * TransportOptions)
{
lbmmon_transport_lbm_clientd_t * data;
int rc;
const char * ptr = (const char *) TransportOptions;
char key[512];
char value[512];
char config_file[512];
char topic[512];
char wildcard_topic[512];
char scope[512];
char option[512];
option_entry_t * entry;
memset(ErrorString, 0, sizeof(ErrorString));
data = malloc(sizeof(lbmmon_transport_lbm_clientd_t));
if (data == NULL)
{
lbmmon_transport_lbm_report_allocation_error(sizeof(lbmmon_transport_lbm_clientd_t));
return (-1);
}
memset(data, 0, sizeof(lbmmon_transport_lbm_clientd_t));
/* Process any options */
memset(config_file, 0, sizeof(config_file));
strncpy(topic, DEFAULT_TOPIC, sizeof(topic));
memset(wildcard_topic, 0, sizeof(wildcard_topic));
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (strcasecmp(key, "config") == 0)
{
strncpy(config_file, value, sizeof(config_file));
}
else if (strcasecmp(key, "topic") == 0)
{
strncpy(topic, value, sizeof(topic));
}
else if (strcasecmp(key, "wctopic") == 0)
{
strncpy(wildcard_topic, value, sizeof(wildcard_topic));
}
}
/* Initialize the context attributes */
rc = lbm_context_attr_create_default(&(data->mContextAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_create() failed, %s",
cleanup_clientd(data);
return (rc);
}
/* Set the default context name */
rc = lbm_context_attr_str_setopt(data->mContextAttributes, "context_name", DEFAULT_CONTEXT_NAME);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_str_setopt() failed, %s",
return (rc);
}
if (config_file[0] != '\0')
{
/* A config file was passed as an option. Use it to populate the context attributes. */
rc = lbmaux_context_attr_setopt_from_file(data->mContextAttributes, config_file);
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_context_attr_setopt_from_file() failed, %s",
cleanup_clientd(data);
return (-1);
}
}
/* Go back through the options, looking for any specific context options. */
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (scope_is_valid(scope) == -1)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option scope [%s]",
scope);
cleanup_clientd(data);
return (-1);
}
if (strcasecmp(scope, "context") == 0)
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [context %s %s], %s",
option,
value,
cleanup_clientd(data);
return (rc);
}
}
}
entry = &ReceiverContextOption[0];
while (entry->option != NULL)
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
entry->option,
entry->value,
cleanup_clientd(data);
return (rc);
}
entry++;
}
/* Resolver cache need to enabled for wildcard receiver to work */
if (wildcard_topic[0] != '\0')
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, "resolver_cache", "1");
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
"resolver_cache",
"1",
cleanup_clientd(data);
return (rc);
}
}
/* Create the context */
rc = lbm_context_create(&(data->mContext), data->mContextAttributes, NULL, NULL);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_create() failed, %s",
cleanup_clientd(data);
return (rc);
}
/* If a wildcard topic was specified, initialize the wildcard receiver attributes. */
if (wildcard_topic[0] != '\0')
{
rc = lbm_wildcard_rcv_attr_create_default(&(data->mWildcardReceiverAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_wildcard_rcv_attr_init() failed, %s",
cleanup_clientd(data);
return (rc);
}
if (config_file[0] != '\0')
{
/* A config file was passed as an option. Use it to populate the wildcard receiver attributes. */
rc = lbmaux_wildcard_rcv_attr_setopt_from_file(data->mWildcardReceiverAttributes, config_file);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_wildcard_rcv_attr_setopt_from_file() failed, %s",
cleanup_clientd(data);
return (-1);
}
}
/* Go back through the options, looking for any specific wildcard receiver options. */
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (strcasecmp(scope, "wildcard_receiver") == 0)
{
rc = lbm_wildcard_rcv_attr_str_setopt(data->mWildcardReceiverAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [wildcard_receiver %s %s], %s",
option,
value,
cleanup_clientd(data);
return (rc);
}
}
}
entry = &WildcardReceiverOption[0];
while (entry->option != NULL)
{
rc = lbm_wildcard_rcv_attr_str_setopt(data->mWildcardReceiverAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [wildcard_receiver %s %s], %s",
entry->option,
entry->value,
cleanup_clientd(data);
return (rc);
}
entry++;
}
}
/* Initialize and set the receiver topic attributes. */
rc = lbm_rcv_topic_attr_create_default(&(data->mReceiverTopicAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_rcv_topic_attr_init() failed, %s",
cleanup_clientd(data);
return (rc);
}
if (config_file[0] != '\0')
{
/* A config file was passed as an option. Use it to populate the receiver topic attributes. */
rc = lbmaux_rcv_topic_attr_setopt_from_file(data->mReceiverTopicAttributes, config_file);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_rcv_topic_attr_setopt_from_file() failed, %s",
cleanup_clientd(data);
return (-1);
}
}
/* Go back through the options, looking for any specific receiver options. */
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (strcasecmp(scope, "receiver") == 0)
{
rc = lbm_rcv_topic_attr_str_setopt(data->mReceiverTopicAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [receiver %s %s], %s",
option,
value,
cleanup_clientd(data);
return (rc);
}
}
}
entry = &ReceiverTopicOption[0];
while (entry->option != NULL)
{
rc = lbm_rcv_topic_attr_str_setopt(data->mReceiverTopicAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [receiver %s %s], %s",
entry->option,
entry->value,
cleanup_clientd(data);
return (rc);
}
entry++;
}
/* For a non-wildcard topic, lookup the topic. */
if (wildcard_topic[0] == '\0')
{
rc = lbm_rcv_topic_lookup(&(data->mTopic), data->mContext, topic, data->mReceiverTopicAttributes);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_rcv_topic_lookup() failed, %s",
cleanup_clientd(data);
return (rc);
}
}
#ifdef _WIN32
InitializeCriticalSection(&(data->mLock));
#else
pthread_mutex_init(&(data->mLock), NULL);
#endif
data->mLockCreated = 1;
lock(data);
if (wildcard_topic[0] != '\0')
{
/* Wildcard topic, create a wildcard receiver */
rc = lbm_wildcard_rcv_create(&(data->mWildcardReceiver),
data->mContext,
wildcard_topic,
data->mReceiverTopicAttributes,
data->mWildcardReceiverAttributes,
receive_callback,
data,
NULL);
}
else
{
/* Non-wildcard topic, create a normal receiver */
rc = lbm_rcv_create(&(data->mReceiver),
data->mContext,
data->mTopic,
receive_callback,
data,
NULL);
}
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_wildcard_rcv_create()/lbm_rcv_create() failed, %s",
unlock(data);
cleanup_clientd(data);
return (rc);
}
/* Pass back the lbmmon_transport_lbm_clientd_t created */
*TransportClientData = data;
unlock(data);
return (0);
}
/*
Send one statistics packet on the module's LBM source.

Data                - bytes to send.
Length              - number of bytes in Data.
TransportClientData - client data returned by lbmmon_transport_lbm_initsrc().

Returns the result of lbm_src_send(), or -1 for a NULL argument. On failure a
description is left in ErrorString.
*/
int
lbmmon_transport_lbm_send(const char * Data, size_t Length, void * TransportClientData)
{
    lbmmon_transport_lbm_clientd_t * src;
    int rc;

    if ((Data == NULL) || (TransportClientData == NULL))
    {
        strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
        return (-1);
    }
    src = (lbmmon_transport_lbm_clientd_t *) TransportClientData;
    rc = lbm_src_send(src->mSource, Data, Length, 0);
    if (rc == LBM_FAILURE)
    {
        snprintf(ErrorString,
                 sizeof(ErrorString),
                 "lbm_src_send() failed, %s",
                 lbm_errmsg());
    }
    return (rc);
}
/*
Fetch the next queued statistics message, or wait if there is none.

Data                - buffer that receives the message bytes.
Length              - on entry, capacity of Data; on a successful return,
                      the number of bytes copied.
TimeoutMS           - how long to sleep when the queue is empty.
TransportClientData - client data returned by lbmmon_transport_lbm_initrcv().

Returns:
   0  a message was copied into Data (or *Length was 0 on entry);
   1  nothing was delivered: either the queue was empty and the function
      slept for TimeoutMS, or the head message did not fit in Data and was
      logged and dropped;
  -1  a NULL argument was passed (ErrorString is set).
*/
int
lbmmon_transport_lbm_receive(char * Data, size_t * Length, unsigned int TimeoutMS, void * TransportClientData)
{
    lbmmon_transport_lbm_clientd_t * rcv = (lbmmon_transport_lbm_clientd_t *) TransportClientData;
    lbmmon_transport_lbm_rcv_node_t * node;
    int rc;
    size_t length_remaining;
#if defined(_WIN32)
#elif defined(__TANDEM)
    unsigned int sleep_sec;
    unsigned int sleep_usec;
#else
    struct timespec ivl;
#endif

    if ((Data == NULL) || (Length == NULL) || (TransportClientData == NULL))
    {
        strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
        return (-1);
    }
    if (*Length == 0)
    {
        return (0);
    }

    lock(rcv);
    node = rcv->mHead;
    if (node == NULL)
    {
        /* Nothing queued: release the lock before sleeping so the callback can enqueue. */
        unlock(rcv);
        /* Sleep for wait time */
#define NANOSECONDS_PER_SECOND 1000000000
#define MICROSECONDS_PER_SECOND 1000000
#define MILLISECONDS_PER_SECOND 1000
#define NANOSECONDS_PER_MILLISECOND (NANOSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#define MICROSECONDS_PER_MILLISECOND (MICROSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#if defined(_WIN32)
        Sleep(TimeoutMS);
#elif defined(__TANDEM)
        sleep_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
        sleep_usec = (TimeoutMS % MILLISECONDS_PER_SECOND) * MICROSECONDS_PER_MILLISECOND;
        if (sleep_usec > 0)
        {
            usleep(sleep_usec);
        }
        if (sleep_sec > 0)
        {
            sleep(sleep_sec);
        }
#else
        ivl.tv_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
        ivl.tv_nsec = (TimeoutMS % MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND;
        nanosleep(&ivl, NULL);
#endif
        return (1);
    }

    /* Queue is non-empty. Pull the first message from the queue. */
    length_remaining = node->mMessage->len - node->mUsedBytes;
    if (*Length >= length_remaining)
    {
        /* We can transfer the rest of the message */
        memcpy(Data, node->mMessage->data + node->mUsedBytes, length_remaining);
        *Length = length_remaining;
        rc = 0;
    }
    else
    {
        /* MSGDESC: The monitoring message received is larger than the maximum allowed size given.
         * MSGRES: This is a hard coded maximum. */
        /* The sizes are cast so the arguments match the %lu conversions. */
        lbm_logf(LBM_LOG_ERR, "Core-8034-1: [LBMMON] Dropping monitoring message that is larger than the maximum allowed size of %lu (size=%lu)",
                 (unsigned long) *Length, (unsigned long) node->mMessage->len);
        rc = 1; /* Positive number prevents caller from logging message too */
    }

    /* Either way we're done with the LBM message, so let LBM know. */
    lbm_msg_delete(node->mMessage);
    /* Unlink the node from the queue */
    rcv->mHead = node->mNext;
    if (rcv->mHead == NULL)
    {
        rcv->mTail = NULL;
    }
    free(node);
    unlock(rcv);
    return (rc);
}
/*
Release everything owned by a client data structure and free the structure.

Teardown order: source and its attributes, wildcard receiver and its
attributes, receiver and its attributes, then (under the lock, if one was
created) the context, its attributes and any messages still queued. The lock
is destroyed last. Data must not be used after this call.
*/
void
cleanup_clientd(lbmmon_transport_lbm_clientd_t * Data)
{
    lbmmon_transport_lbm_rcv_node_t * current;
    lbmmon_transport_lbm_rcv_node_t * following;

    /* Sending side */
    if (Data->mSource)
    {
        lbm_src_delete(Data->mSource);
        Data->mSource = NULL;
    }
    if (Data->mSourceTopicAttributes)
    {
        lbm_src_topic_attr_delete(Data->mSourceTopicAttributes);
        Data->mSourceTopicAttributes = NULL;
    }

    /* Receiving side: stop the receivers so that no further messages arrive */
    if (Data->mWildcardReceiver)
    {
        lbm_wildcard_rcv_delete(Data->mWildcardReceiver);
        Data->mWildcardReceiver = NULL;
    }
    if (Data->mWildcardReceiverAttributes)
    {
        lbm_wildcard_rcv_attr_delete(Data->mWildcardReceiverAttributes);
        Data->mWildcardReceiverAttributes = NULL;
    }
    if (Data->mReceiver)
    {
        lbm_rcv_delete(Data->mReceiver);
        Data->mReceiver = NULL;
    }
    if (Data->mReceiverTopicAttributes)
    {
        lbm_rcv_topic_attr_delete(Data->mReceiverTopicAttributes);
        Data->mReceiverTopicAttributes = NULL;
    }
    Data->mTopic = NULL;

    /* Take the lock (when it exists) for the remainder of the teardown */
    if (Data->mLockCreated)
    {
        lock(Data);
    }

    /* With the context gone nothing can be delivered any more */
    if (Data->mContext)
    {
        lbm_context_delete(Data->mContext);
        Data->mContext = NULL;
    }
    if (Data->mContextAttributes)
    {
        lbm_context_attr_delete(Data->mContextAttributes);
        Data->mContextAttributes = NULL;
    }

    /* Drain the queue, handing each retained message back to LBM */
    for (current = Data->mHead; current != NULL; current = following)
    {
        lbm_msg_delete(current->mMessage);
        following = current->mNext;
        free(current);
    }

    if (Data->mLockCreated != 0)
    {
        unlock(Data);
#ifdef _WIN32
        DeleteCriticalSection(&(Data->mLock));
#else
        pthread_mutex_destroy(&(Data->mLock));
#endif
    }
    free(Data);
}
/*
Shut down a statistics source.
TransportClientData is the client data returned by lbmmon_transport_lbm_initsrc();
it is released by cleanup_clientd() and must not be used afterwards.
Returns 0 on success, or -1 (with ErrorString set) when passed NULL.
*/
int
lbmmon_transport_lbm_src_finish(void * TransportClientData)
{
    if (TransportClientData != NULL)
    {
        cleanup_clientd((lbmmon_transport_lbm_clientd_t *) TransportClientData);
        return (0);
    }
    strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
    return (-1);
}
/*
Shut down a statistics receiver.
TransportClientData is the client data returned by lbmmon_transport_lbm_initrcv();
it is released by cleanup_clientd() and must not be used afterwards.
Returns 0 on success, or -1 (with ErrorString set) when passed NULL.
*/
int
lbmmon_transport_lbm_rcv_finish(void * TransportClientData)
{
    if (TransportClientData != NULL)
    {
        cleanup_clientd((lbmmon_transport_lbm_clientd_t *) TransportClientData);
        return (0);
    }
    strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
    return (-1);
}
/*
Acquire the client data's lock (critical section on Windows, mutex elsewhere).
The result of the underlying lock call is not checked.
*/
void
lock(lbmmon_transport_lbm_clientd_t * Receiver)
{
#ifdef _WIN32
    CRITICAL_SECTION * section = &Receiver->mLock;

    EnterCriticalSection(section);
#else
    pthread_mutex_t * mutex = &Receiver->mLock;

    pthread_mutex_lock(mutex);
#endif
}
/*
Release the client data's lock (critical section on Windows, mutex elsewhere).
The result of the underlying unlock call is not checked.
*/
void
unlock(lbmmon_transport_lbm_clientd_t * Receiver)
{
#ifdef _WIN32
    CRITICAL_SECTION * section = &Receiver->mLock;

    LeaveCriticalSection(section);
#else
    pthread_mutex_t * mutex = &Receiver->mLock;

    pthread_mutex_unlock(mutex);
#endif
}
/*
Report whether debugging was enabled through the "allow_debug" transport option.

TransportClientData - client data returned by lbmmon_transport_lbm_initsrc().

Returns the stored mAllowDebug flag (1 for "on", 0 otherwise). A NULL
TransportClientData is treated as "debug not allowed" and yields 0 rather
than being dereferenced.
*/
const int
lbmmon_transport_allow_debug(void * TransportClientData)
{
    lbmmon_transport_lbm_clientd_t * client_data = (lbmmon_transport_lbm_clientd_t *)TransportClientData;

    if (client_data == NULL)
    {
        return (0);
    }
    return (client_data->mAllowDebug);
}
/*
Return the text of the most recent error recorded by this module.
The pointer refers to the module's static ErrorString buffer, so its contents
change when a later call records another error.
*/
const char *
lbmmon_transport_lbm_errmsg(void)
{
    const char * message = ErrorString;

    return message;
}
/*
Return the control-request target address built by lbmmon_transport_lbm_initsrc(),
or NULL when no address has been recorded (the buffer is empty).
The pointer refers to the module's static RequestTargetAddress buffer.
*/
const char *
lbmmon_transport_lbm_ctrlmsgtarget(void)
{
    return (RequestTargetAddress[0] != '\0') ? RequestTargetAddress : NULL;
}
/* lbmmon_transport_lbm_ctrlmsg_receive() - used to read the immediate message request at
 * the head of the queue. On success the message stays queued: a subsequent call to
 * lbmmon_transport_lbm_ctrlmsg_response() must be made to return a message in response
 * and to remove the message at mHead.
 *
 * Data                - buffer that receives the request bytes.
 * Length              - on entry, capacity of Data; on a successful return, the number
 *                       of bytes copied.
 * TransportClientData - client data returned by lbmmon_transport_lbm_initsrc().
 *
 * Returns:
 *    0  a request was copied into Data (or *Length was 0 on entry);
 *    1  nothing was delivered: the queue was empty, or the head message did not fit
 *       in Data and was logged, removed and dropped;
 *   -1  a NULL argument was passed (ErrorString is set).
 */
int
lbmmon_transport_lbm_ctrlmsg_receive(char * Data, size_t * Length, void * TransportClientData)
{
    lbmmon_transport_lbm_clientd_t * rcv = (lbmmon_transport_lbm_clientd_t *)TransportClientData;
    lbmmon_transport_lbm_rcv_node_t * node;
    int rc = 0;
    size_t length_remaining;

    if ((Data == NULL) || (Length == NULL) || (TransportClientData == NULL))
    {
        strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
        return (-1);
    }
    if (*Length == 0)
    {
        return (0);
    }
    lock(rcv);
    if (rcv->mHead != NULL)
    {
        /* Queue is non-empty. Look at the first message in the queue. */
        node = rcv->mHead;
        length_remaining = node->mMessage->len - node->mUsedBytes;
        if (*Length >= length_remaining)
        {
            /* Copy only; the node is removed later by ..._ctrlmsg_response(). */
            memcpy(Data, node->mMessage->data + node->mUsedBytes, length_remaining);
            *Length = length_remaining;
            rc = 0;
        }
        else
        {
            /* MSGDESC: The monitoring message received is larger than the maximum allowed size given.
             * MSGRES: This is a hard coded maximum. */
            /* The sizes are cast so the arguments match the %lu conversions. */
            lbm_logf(LBM_LOG_ERR, "Core-10995-15: [LBMMON] Dropping monitoring message that is larger than the maximum allowed size of %lu (size=%lu)",
                     (unsigned long) *Length, (unsigned long) node->mMessage->len);
            /* We're done with the LBM message, so let LBM know. */
            lbm_msg_delete(node->mMessage);
            /* Unlink the node from the queue */
            rcv->mHead = node->mNext;
            if (rcv->mHead == NULL)
            {
                rcv->mTail = NULL;
            }
            free(node);
            rc = 1; /* Positive number prevents caller from logging message too */
        }
    }
    else
    {
        rc = 1;
    }
    unlock(rcv);
    return (rc);
}
int
lbmmon_transport_lbm_ctrlmsg_response(const char * Data, size_t Length, void * clientd)
{
lbmmon_transport_lbm_clientd_t * rcv = (lbmmon_transport_lbm_clientd_t *)clientd;
lbmmon_transport_lbm_rcv_node_t * node;
lock(rcv);
if (rcv->mHead != NULL) {
node = rcv->mHead;
if (node->mMessage->type == LBM_MSG_REQUEST)
{
if (lbm_send_response(node->mMessage->response, Data, Length, 0) == LBM_FAILURE)
{
/*MSGDESC: LBMMON could not send a control message response.
*MSGRES: Contact Informatica Support. */
lbm_logf(LBM_LOG_ERR, "Core-10995-14: [LBMMON] Failed to send response [%s]", lbm_errmsg());
}
}
lbm_msg_delete(node->mMessage);
rcv->mHead = node->mNext;
if (rcv->mHead == NULL)
{
rcv->mTail = NULL;
}
free(node);
}
unlock(rcv);
return (0);
}
/*
Check whether Scope names an option scope this module accepts in
"scope|option" transport option keys. The comparison is case-insensitive.

"wildcard_receiver" is included because lbmmon_transport_lbm_initrcv()
processes options in that scope; without it such options are rejected as an
invalid scope before they can be applied.

Returns 0 if the scope is recognized, -1 otherwise.
*/
int
scope_is_valid(const char * Scope)
{
    static const char * const valid_scopes[] =
    {
        "context",
        "source",
        "receiver",
        "wildcard_receiver",
        "event_queue"
    };
    size_t idx;

    for (idx = 0; idx < sizeof(valid_scopes) / sizeof(valid_scopes[0]); idx++)
    {
        if (strcasecmp(Scope, valid_scopes[idx]) == 0)
        {
            return (0);
        }
    }
    return (-1);
}