#ifdef __VOS__
#define _POSIX_C_SOURCE 200112L
#include <sys/time.h>
#endif
#include <stdio.h>
#include <time.h>
#include <string.h>
#ifdef _WIN32
#define strcasecmp stricmp
#define snprintf _snprintf
#else
#include "config.h"
#include <unistd.h>
#if defined(__TANDEM)
#if defined(HAVE_TANDEM_SPT)
#include <ktdmtyp.h>
#include <spthread.h>
#else
#include <pthread.h>
#endif
#else
#include <pthread.h>
#endif
#include <strings.h>
#endif
#include <lbm/lbmmontrlbmsnmp.h>
{
lbmmon_transport_lbmsnmp_initsrc,
lbmmon_transport_lbmsnmp_initrcv,
lbmmon_transport_lbmsnmp_send,
lbmmon_transport_lbmsnmp_receive,
lbmmon_transport_lbmsnmp_src_finish,
lbmmon_transport_lbmsnmp_rcv_finish,
lbmmon_transport_lbmsnmp_errmsg,
NULL,
NULL,
NULL,
NULL
};
typedef struct
{
} lbmmon_transport_lbmsnmp_src_t;
struct lbmmon_transport_lbmsnmp_rcv_node_t_stct
{
size_t mUsedBytes;
struct lbmmon_transport_lbmsnmp_rcv_node_t_stct * mNext;
};
typedef struct lbmmon_transport_lbmsnmp_rcv_node_t_stct lbmmon_transport_lbmsnmp_rcv_node_t;
typedef struct
{
unsigned int mLockCreated;
#ifdef _WIN32
CRITICAL_SECTION mLock;
#else
pthread_mutex_t mLock;
#endif
lbmmon_transport_lbmsnmp_rcv_node_t * mHead;
lbmmon_transport_lbmsnmp_rcv_node_t * mTail;
} lbmmon_transport_lbmsnmp_rcv_t;
static void src_cleanup(lbmmon_transport_lbmsnmp_src_t * Data);
static void rcv_cleanup(lbmmon_transport_lbmsnmp_rcv_t * Data);
static int receive_callback(
lbm_rcv_t * Receiver,
lbm_msg_t * Message,
void * ClientData);
static void lock_receiver(lbmmon_transport_lbmsnmp_rcv_t * Receiver);
static void unlock_receiver(lbmmon_transport_lbmsnmp_rcv_t * Receiver);
static int scope_is_valid(const char * Scope);
#define DEFAULT_CONTEXT_NAME "29west_statistics_context"
#define DEFAULT_TOPIC "/29west/statistics"
#define DEFAULT_MULTICAST_TTL "0"
#define DEFAULT_TOPIC_RESOLUTION_ADDRESS "225.200.200.200"
#define DEFAULT_LBTRM_ADDRESS "225.200.200.201"
static char ErrorString[2048];
typedef struct
{
const char * option;
const char * value;
} option_entry_t;
static option_entry_t SourceContextOption[] =
{
{ "operational_mode", "embedded" },
{ "monitor_interval", "0" },
{ "request_tcp_bind_request_port", "0" },
{ "mim_incoming_address", "0.0.0.0" },
{ "resolver_cache", "0" },
{ "resolver_multicast_ttl", DEFAULT_MULTICAST_TTL },
{ "resolver_multicast_address", DEFAULT_TOPIC_RESOLUTION_ADDRESS },
{ NULL, NULL }
};
static option_entry_t SourceContextOptionFixed[] =
{
{ "operational_mode", "embedded" },
{ "monitor_interval", "0" },
{ "request_tcp_bind_request_port", "0" },
{ "mim_incoming_address", "0.0.0.0" },
{ "resolver_cache", "0" },
{ NULL, NULL }
};
static option_entry_t ReceiverContextOption[] =
{
{ "operational_mode", "embedded" },
{ "monitor_interval", "0" },
{ "request_tcp_bind_request_port", "0" },
{ "mim_incoming_address", "0.0.0.0" },
{ "resolver_cache", "0" },
{ "resolver_multicast_ttl", DEFAULT_MULTICAST_TTL },
{ "resolver_multicast_address", DEFAULT_TOPIC_RESOLUTION_ADDRESS },
{ NULL, NULL }
};
static option_entry_t ReceiverContextOptionFixed[] =
{
{ "operational_mode", "embedded" },
{ "monitor_interval", "0" },
{ "request_tcp_bind_request_port", "0" },
{ "mim_incoming_address", "0.0.0.0" },
{ "resolver_cache", "0" },
{ NULL, NULL }
};
static option_entry_t SourceTopicOption[] =
{
{ "transport_lbtru_transmission_window_size", "500000" },
{ "transport_lbtrm_transmission_window_size", "500000" },
{ "transport", "lbtrm" },
{ "transport_lbtrm_multicast_address", DEFAULT_LBTRM_ADDRESS },
{ NULL, NULL }
};
static option_entry_t ReceiverTopicOption[] =
{
{ NULL, NULL }
};
static option_entry_t WildcardReceiverOption[] =
{
{ NULL, NULL }
};
static void lbmmon_transport_lbmsnmp_report_allocation_error(size_t Size)
{
snprintf(ErrorString, sizeof(ErrorString), "Unable to allocate %u bytes", (unsigned) Size);
}
lbmmon_transport_lbmsnmp_module(void)
{
return (&LBMMON_TRANSPORT_LBMSNMP);
}
int
lbmmon_transport_lbmsnmp_initsrc(void * * TransportClientData, const void * TransportOptions)
{
lbmmon_transport_lbmsnmp_src_t * data;
int rc;
const char * ptr = (const char *) TransportOptions;
char key[512];
char value[512];
char config_file[512];
char topic[512];
char scope[512];
char option[512];
option_entry_t * entry;
memset(ErrorString, 0, sizeof(ErrorString));
data = malloc(sizeof(lbmmon_transport_lbmsnmp_src_t));
if (data == NULL)
{
lbmmon_transport_lbmsnmp_report_allocation_error(sizeof(lbmmon_transport_lbmsnmp_src_t));
return (-1);
}
data->mContextAttributes = NULL;
data->mContext = NULL;
data->mTopicAttributes = NULL;
data->mSource = NULL;
data->mTopic = NULL;
memset(config_file, 0, sizeof(config_file));
strncpy(topic, DEFAULT_TOPIC, sizeof(topic));
{
if (strcasecmp(key, "config") == 0)
{
strncpy(config_file, value, sizeof(config_file));
}
else if (strcasecmp(key, "topic") == 0)
{
strncpy(topic, value, sizeof(topic));
}
}
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_create() failed, %s",
return (rc);
}
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_str_setopt() failed, %s",
return (rc);
}
entry = &SourceContextOption[0];
while (entry->option != NULL)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
entry->option,
entry->value,
src_cleanup(data);
return (rc);
}
entry++;
}
if (config_file[0] != '\0')
{
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_context_attr_setopt_from_file() failed, %s",
src_cleanup(data);
return (-1);
}
}
ptr = (const char *) TransportOptions;
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (scope_is_valid(scope) == -1)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option scope [%s]",
scope);
src_cleanup(data);
return (-1);
}
if (strcasecmp(scope, "context") == 0)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [context %s %s], %s",
option,
value,
src_cleanup(data);
return (rc);
}
}
}
entry = &SourceContextOptionFixed[0];
while (entry->option != NULL)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
entry->option,
entry->value,
src_cleanup(data);
return (rc);
}
entry++;
}
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_create() failed, %s",
src_cleanup(data);
return (rc);
}
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_topic_attr_create_default() failed, %s",
src_cleanup(data);
return (rc);
}
entry = &SourceTopicOption[0];
while (entry->option != NULL)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [source %s %s], %s",
entry->option,
entry->value,
src_cleanup(data);
return (rc);
}
entry++;
}
if (config_file[0] != '\0')
{
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_src_topic_attr_setopt_from_file() failed, %s",
src_cleanup(data);
return (-1);
}
}
ptr = (const char *) TransportOptions;
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (strcasecmp(scope, "source") == 0)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [source %s %s], %s",
option,
value,
src_cleanup(data);
return (rc);
}
}
}
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_topic_alloc() failed, %s",
src_cleanup(data);
return (rc);
}
rc =
lbm_src_create(&(data->mSource), data->mContext, data->mTopic, NULL, NULL, NULL);
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_create() failed, %s",
src_cleanup(data);
return (rc);
}
*TransportClientData = data;
return (0);
}
int
{
lbmmon_transport_lbmsnmp_rcv_t * rcv = (lbmmon_transport_lbmsnmp_rcv_t *) ClientData;
lbmmon_transport_lbmsnmp_rcv_node_t * node;
{
lock_receiver(rcv);
node = malloc(sizeof(lbmmon_transport_lbmsnmp_rcv_node_t));
if (node == NULL)
{
lbmmon_transport_lbmsnmp_report_allocation_error(sizeof(lbmmon_transport_lbmsnmp_rcv_node_t));
return (-1);
}
node->mMessage = Message;
node->mUsedBytes = 0;
node->mNext = NULL;
if (rcv->mTail != NULL)
{
rcv->mTail->mNext = node;
}
else
{
rcv->mHead = node;
}
rcv->mTail = node;
unlock_receiver(rcv);
}
return (0);
}
int
lbmmon_transport_lbmsnmp_initrcv(void * * TransportClientData, const void * TransportOptions)
{
lbmmon_transport_lbmsnmp_rcv_t * data;
int rc;
const char * ptr = (const char *) TransportOptions;
char key[512];
char value[512];
char config_file[512];
char topic[512];
char wildcard_topic[512];
char scope[512];
char option[512];
option_entry_t * entry;
memset(ErrorString, 0, sizeof(ErrorString));
data = malloc(sizeof(lbmmon_transport_lbmsnmp_rcv_t));
if (data == NULL)
{
lbmmon_transport_lbmsnmp_report_allocation_error(sizeof(lbmmon_transport_lbmsnmp_rcv_t));
return (-1);
}
data->mLockCreated = 0;
data->mContextAttributes = NULL;
data->mContext = NULL;
data->mReceiver = NULL;
data->mTopicAttributes = NULL;
data->mTopic = NULL;
data->mWildcardReceiverAttributes = NULL;
data->mWildcardReceiver = NULL;
data->mHead = NULL;
data->mTail = NULL;
memset(config_file, 0, sizeof(config_file));
strncpy(topic, DEFAULT_TOPIC, sizeof(topic));
memset(wildcard_topic, 0, sizeof(wildcard_topic));
{
if (strcasecmp(key, "config") == 0)
{
strncpy(config_file, value, sizeof(config_file));
}
else if (strcasecmp(key, "topic") == 0)
{
strncpy(topic, value, sizeof(topic));
}
else if (strcasecmp(key, "wctopic") == 0)
{
strncpy(wildcard_topic, value, sizeof(wildcard_topic));
}
}
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_create() failed, %s",
rcv_cleanup(data);
return (rc);
}
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_str_setopt() failed, %s",
return (rc);
}
entry = &ReceiverContextOption[0];
while (entry->option != NULL)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
entry->option,
entry->value,
rcv_cleanup(data);
return (rc);
}
entry++;
}
if (config_file[0] != '\0')
{
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_context_attr_setopt_from_file() failed, %s",
rcv_cleanup(data);
return (-1);
}
}
ptr = (const char *) TransportOptions;
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (scope_is_valid(scope) == -1)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option scope [%s]",
scope);
rcv_cleanup(data);
return (-1);
}
if (strcasecmp(scope, "context") == 0)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [context %s %s], %s",
option,
value,
rcv_cleanup(data);
return (rc);
}
}
}
entry = &ReceiverContextOptionFixed[0];
while (entry->option != NULL)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
entry->option,
entry->value,
rcv_cleanup(data);
return (rc);
}
entry++;
}
if (wildcard_topic[0] != '\0')
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
"resolver_cache",
"1",
rcv_cleanup(data);
return (rc);
}
}
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_create() failed, %s",
rcv_cleanup(data);
return (rc);
}
if (wildcard_topic[0] != '\0')
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_wildcard_rcv_attr_init() failed, %s",
rcv_cleanup(data);
return (rc);
}
if (config_file[0] != '\0')
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_wildcard_rcv_attr_setopt_from_file() failed, %s",
rcv_cleanup(data);
return (-1);
}
}
ptr = (const char *) TransportOptions;
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (strcasecmp(scope, "wildcard_receiver") == 0)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [wildcard_receiver %s %s], %s",
option,
value,
rcv_cleanup(data);
return (rc);
}
}
}
entry = &WildcardReceiverOption[0];
while (entry->option != NULL)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [wildcard_receiver %s %s], %s",
entry->option,
entry->value,
rcv_cleanup(data);
return (rc);
}
}
entry++;
}
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_rcv_topic_attr_init() failed, %s",
rcv_cleanup(data);
return (rc);
}
if (config_file[0] != '\0')
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_rcv_topic_attr_setopt_from_file() failed, %s",
rcv_cleanup(data);
return (-1);
}
}
ptr = (const char *) TransportOptions;
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (strcasecmp(scope, "receiver") == 0)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [receiver %s %s], %s",
option,
value,
rcv_cleanup(data);
return (rc);
}
}
}
entry = &ReceiverTopicOption[0];
while (entry->option != NULL)
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [receiver %s %s], %s",
entry->option,
entry->value,
rcv_cleanup(data);
return (rc);
}
entry++;
}
if (wildcard_topic[0] == '\0')
{
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_rcv_topic_lookup() failed, %s",
rcv_cleanup(data);
return (rc);
}
}
#ifdef _WIN32
InitializeCriticalSection(&(data->mLock));
#else
pthread_mutex_init(&(data->mLock), NULL);
#endif
data->mLockCreated = 1;
lock_receiver(data);
if (wildcard_topic[0] != '\0')
{
data->mContext,
wildcard_topic,
data->mTopicAttributes,
data->mWildcardReceiverAttributes,
receive_callback,
data,
NULL);
}
else
{
data->mContext,
data->mTopic,
receive_callback,
data,
NULL);
}
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_wildcard_rcv_create()/lbm_rcv_create() failed, %s",
unlock_receiver(data);
rcv_cleanup(data);
return (rc);
}
*TransportClientData = data;
unlock_receiver(data);
return (0);
}
int
lbmmon_transport_lbmsnmp_send(const char * Data, size_t Length, void * TransportClientData)
{
lbmmon_transport_lbmsnmp_src_t * src;
int rc;
if ((Data == NULL) || (TransportClientData == NULL))
{
strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
return (-1);
}
src = (lbmmon_transport_lbmsnmp_src_t *) TransportClientData;
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_send() failed, %s",
}
return (rc);
}
int
lbmmon_transport_lbmsnmp_receive(char * Data, size_t * Length, unsigned int TimeoutMS, void * TransportClientData)
{
lbmmon_transport_lbmsnmp_rcv_t * rcv = (lbmmon_transport_lbmsnmp_rcv_t *) TransportClientData;
lbmmon_transport_lbmsnmp_rcv_node_t * node;
int rc = 0;
size_t length_remaining;
#if defined(_WIN32)
#elif defined(__TANDEM)
unsigned int sleep_sec;
unsigned int sleep_usec;
#else
struct timespec ivl;
#endif
if ((Data == NULL) || (Length == NULL) || (TransportClientData == NULL))
{
strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
return (-1);
}
if (*Length == 0)
{
return (0);
}
lock_receiver(rcv);
if (rcv->mHead != NULL)
{
node = rcv->mHead;
length_remaining = node->mMessage->len - node->mUsedBytes;
if (*Length >= length_remaining)
{
memcpy(Data, node->mMessage->data + node->mUsedBytes, length_remaining);
*Length = length_remaining;
rc = 0;
rcv->mHead = node->mNext;
if (rcv->mHead == NULL)
{
rcv->mTail = NULL;
}
free(node);
}
else
{
lbm_logf(
LBM_LOG_ERR,
"Core-8034-3: [LBMMON] Dropping monitoring message that is larger than the maximum allowed size of %d (size=%d)",
*Length, node->mMessage->len);
rcv->mHead = node->mNext;
if (rcv->mHead == NULL)
{
rcv->mTail = NULL;
}
free(node);
rc = 1;
}
unlock_receiver(rcv);
}
else
{
unlock_receiver(rcv);
#define NANOSECONDS_PER_SECOND 1000000000
#define MICROSECONDS_PER_SECOND 1000000
#define MILLISECONDS_PER_SECOND 1000
#define NANOSECONDS_PER_MILLISECOND (NANOSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#define MICROSECONDS_PER_MILLISECOND (MICROSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#if defined(_WIN32)
Sleep(TimeoutMS);
#elif defined(__TANDEM)
sleep_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
sleep_usec = (TimeoutMS % MILLISECONDS_PER_SECOND) * MICROSECONDS_PER_MILLISECOND;
if (sleep_usec > 0)
{
usleep(sleep_usec);
}
if (sleep_sec > 0)
{
sleep(sleep_sec);
}
#else
ivl.tv_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
ivl.tv_nsec = (TimeoutMS % MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND;
nanosleep(&ivl, NULL);
#endif
rc = 1;
}
return (rc);
}
void
src_cleanup(lbmmon_transport_lbmsnmp_src_t * Data)
{
if (Data->mSource != NULL)
{
Data->mSource = NULL;
}
Data->mTopic = NULL;
if (Data->mTopicAttributes != NULL)
{
Data->mTopicAttributes = NULL;
}
if (Data->mContext != NULL)
{
Data->mContext = NULL;
}
if (Data->mContextAttributes != NULL)
{
Data->mContextAttributes = NULL;
}
free(Data);
}
int
lbmmon_transport_lbmsnmp_src_finish(void * TransportClientData)
{
lbmmon_transport_lbmsnmp_src_t * src;
if (TransportClientData == NULL)
{
strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
return (-1);
}
src = (lbmmon_transport_lbmsnmp_src_t *) TransportClientData;
src_cleanup(src);
return (0);
}
void
rcv_cleanup(lbmmon_transport_lbmsnmp_rcv_t * Data)
{
lbmmon_transport_lbmsnmp_rcv_node_t * node;
lbmmon_transport_lbmsnmp_rcv_node_t * next;
if (Data->mWildcardReceiver != NULL)
{
Data->mWildcardReceiver = NULL;
}
if (Data->mWildcardReceiverAttributes != NULL)
{
Data->mWildcardReceiverAttributes = NULL;
}
if (Data->mReceiver != NULL)
{
Data->mReceiver = NULL;
}
if (Data->mTopicAttributes != NULL)
{
Data->mTopicAttributes = NULL;
}
Data->mTopic = NULL;
if (Data->mLockCreated != 0)
{
lock_receiver(Data);
}
if (Data->mContext != NULL)
{
Data->mContext = NULL;
}
if (Data->mContextAttributes != NULL)
{
Data->mContextAttributes = NULL;
}
node = Data->mHead;
while (node != NULL)
{
next = node->mNext;
free(node);
node = next;
}
if (Data->mLockCreated)
{
unlock_receiver(Data);
#ifdef _WIN32
DeleteCriticalSection(&(Data->mLock));
#else
pthread_mutex_destroy(&(Data->mLock));
#endif
}
free(Data);
}
int
lbmmon_transport_lbmsnmp_rcv_finish(void * TransportClientData)
{
lbmmon_transport_lbmsnmp_rcv_t * rcv;
if (TransportClientData == NULL)
{
strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
return (-1);
}
rcv = (lbmmon_transport_lbmsnmp_rcv_t *) TransportClientData;
rcv_cleanup(rcv);
return (0);
}
void
lock_receiver(lbmmon_transport_lbmsnmp_rcv_t * Receiver)
{
#ifdef _WIN32
EnterCriticalSection(&(Receiver->mLock));
#else
pthread_mutex_lock(&(Receiver->mLock));
#endif
}
void
unlock_receiver(lbmmon_transport_lbmsnmp_rcv_t * Receiver)
{
#ifdef _WIN32
LeaveCriticalSection(&(Receiver->mLock));
#else
pthread_mutex_unlock(&(Receiver->mLock));
#endif
}
const char *
lbmmon_transport_lbmsnmp_errmsg(void)
{
return (ErrorString);
}
int
scope_is_valid(const char * Scope)
{
if (strcasecmp(Scope, "context") == 0)
{
return (0);
}
if (strcasecmp(Scope, "source") == 0)
{
return (0);
}
if (strcasecmp(Scope, "receiver") == 0)
{
return (0);
}
if (strcasecmp(Scope, "event_queue") == 0)
{
return (0);
}
return (-1);
}