/* UM C API 6.16 -- source code for lbmmontrlbmsnmp.c */
/*
All of the documentation and software included in this and any
other Informatica Inc. Ultra Messaging Releases
Copyright (C) Informatica Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted only as covered by the terms of a
valid software license agreement with Informatica Inc.
(C) Copyright 2004,2023 Informatica Inc. All Rights Reserved.
THE SOFTWARE IS PROVIDED "AS IS" AND INFORMATICA DISCLAIMS ALL WARRANTIES
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION, ANY IMPLIED WARRANTIES OF
NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR
PURPOSE. INFORMATICA DOES NOT WARRANT THAT USE OF THE SOFTWARE WILL BE
UNINTERRUPTED OR ERROR-FREE. INFORMATICA SHALL NOT, UNDER ANY CIRCUMSTANCES, BE
LIABLE TO LICENSEE FOR LOST PROFITS, CONSEQUENTIAL, INCIDENTAL, SPECIAL OR
INDIRECT DAMAGES ARISING OUT OF OR RELATED TO THIS AGREEMENT OR THE
TRANSACTIONS CONTEMPLATED HEREUNDER, EVEN IF INFORMATICA HAS BEEN APPRISED OF
THE LIKELIHOOD OF SUCH DAMAGES.
*/
#ifdef __VOS__
#define _POSIX_C_SOURCE 200112L
#include <sys/time.h>
#endif
#include <stdio.h>
#include <time.h>
#include <string.h>
#ifdef _WIN32
#define strcasecmp stricmp
#define snprintf _snprintf
#else
#include "config.h"
#include <unistd.h>
#if defined(__TANDEM)
#if defined(HAVE_TANDEM_SPT)
#include <ktdmtyp.h>
#include <spthread.h>
#else
#include <pthread.h>
#endif
#else
#include <pthread.h>
#endif
#include <strings.h>
#endif
#include <lbm/lbmmon.h>
#include <lbm/lbmmontrlbmsnmp.h>
#include <lbm/lbmaux.h>
/*
Package all of the needed function pointers for this module into a
lbmmon_transport_func_t structure.
*/
static const lbmmon_transport_func_t LBMMON_TRANSPORT_LBMSNMP =
{
lbmmon_transport_lbmsnmp_initsrc,
lbmmon_transport_lbmsnmp_initrcv,
lbmmon_transport_lbmsnmp_send,
lbmmon_transport_lbmsnmp_receive,
lbmmon_transport_lbmsnmp_src_finish,
lbmmon_transport_lbmsnmp_rcv_finish,
lbmmon_transport_lbmsnmp_errmsg,
NULL,
NULL,
NULL,
NULL
};
/*
    State kept for one statistics source. lbmmon_transport_lbmsnmp_initsrc()
    allocates it and passes it back as the TransportClientData;
    src_cleanup() releases every member and frees the structure.
*/
typedef struct
{
    lbm_context_attr_t * mContextAttributes;    /* attributes used to create mContext */
    lbm_context_t * mContext;                   /* context the source lives in */
    lbm_src_topic_attr_t * mTopicAttributes;    /* attributes used to allocate mTopic */
    lbm_src_t * mSource;                        /* source used to send statistics packets */
    lbm_topic_t * mTopic;                       /* topic the source publishes on */
} lbmmon_transport_lbmsnmp_src_t;
/*
    One entry of the singly linked queue of received statistics packets.
    receive_callback() appends entries at the tail;
    lbmmon_transport_lbmsnmp_receive() removes them from the head.
*/
typedef struct lbmmon_transport_lbmsnmp_rcv_node_t_stct
{
    lbm_msg_t * mMessage;       /* the retained LBM message */
    size_t mUsedBytes;          /* bytes of the message already handed to the caller */
    struct lbmmon_transport_lbmsnmp_rcv_node_t_stct * mNext;    /* next queue entry, NULL at the tail */
} lbmmon_transport_lbmsnmp_rcv_node_t;
/*
    State kept for one statistics receiver.
    lbmmon_transport_lbmsnmp_initrcv() allocates it and passes it back as
    the TransportClientData; rcv_cleanup() releases every member and frees
    the structure.
*/
typedef struct
{
    /* Non-zero once mLock has been initialized */
    unsigned int mLockCreated;
    /* Lock taken around accesses to the message queue */
#if defined(_WIN32)
    CRITICAL_SECTION mLock;
#else
    pthread_mutex_t mLock;
#endif
    /* Attributes used to create mContext */
    lbm_context_attr_t * mContextAttributes;
    /* Context the receiver lives in */
    lbm_context_t * mContext;
    /* Plain receiver (used when no wildcard topic was given) */
    lbm_rcv_t * mReceiver;
    /* Receiver topic attributes */
    lbm_rcv_topic_attr_t * mTopicAttributes;
    /* Topic looked up for the plain receiver */
    lbm_topic_t * mTopic;
    /* Attributes for the wildcard receiver */
    lbm_wildcard_rcv_attr_t * mWildcardReceiverAttributes;
    /* Wildcard receiver (used when a wildcard topic was given) */
    lbm_wildcard_rcv_t * mWildcardReceiver;
    /* Oldest queued message, NULL when the queue is empty */
    lbmmon_transport_lbmsnmp_rcv_node_t * mHead;
    /* Newest queued message, NULL when the queue is empty */
    lbmmon_transport_lbmsnmp_rcv_node_t * mTail;
} lbmmon_transport_lbmsnmp_rcv_t;
/* Queue locking helpers. */
static void lock_receiver(lbmmon_transport_lbmsnmp_rcv_t * Receiver);
static void unlock_receiver(lbmmon_transport_lbmsnmp_rcv_t * Receiver);
/* Release everything owned by a source / receiver, including the structure itself. */
static void src_cleanup(lbmmon_transport_lbmsnmp_src_t * Data);
static void rcv_cleanup(lbmmon_transport_lbmsnmp_rcv_t * Data);
/* LBM receiver callback that queues incoming data messages. */
static int receive_callback(lbm_rcv_t * Receiver, lbm_msg_t * Message, void * ClientData);
/* Returns 0 when Scope is a recognized option scope, -1 otherwise. */
static int scope_is_valid(const char * Scope);
#define DEFAULT_CONTEXT_NAME "29west_statistics_context"
#define DEFAULT_TOPIC "/29west/statistics"
#define DEFAULT_MULTICAST_TTL "0"
#define DEFAULT_TOPIC_RESOLUTION_ADDRESS "225.200.200.200"
#define DEFAULT_LBTRM_ADDRESS "225.200.200.201"
/*
    Text of the most recent error, returned by lbmmon_transport_lbmsnmp_errmsg().
    It is a single buffer shared by every function in this file and is written
    without taking any lock.
*/
static char ErrorString[2048];
/*
    One option name/value pair. Tables of these are terminated by an entry
    whose option member is NULL.
*/
typedef struct
{
    const char * option;    /* option name */
    const char * value;     /* option value, as a string */
} option_entry_t;
static option_entry_t SourceContextOption[] =
{
/* Force embedded mode for simplicity. */
{ "operational_mode", "embedded" },
/* Disable monitoring for this context. */
{ "monitor_interval", "0" },
/* We don't need request/response, so don't use up ports. */
{ "request_tcp_bind_request_port", "0" },
/* We don't need MIM, so disable MIM receiver. */
{ "mim_incoming_address", "0.0.0.0" },
/* No need to cache topics. */
{ "resolver_cache", "0" },
/* Force TTL=0 to keep stats and advertisements on the local machine. */
{ "resolver_multicast_ttl", DEFAULT_MULTICAST_TTL },
/* Use a specific topic resolution address. */
{ "resolver_multicast_address", DEFAULT_TOPIC_RESOLUTION_ADDRESS },
/* End of list. */
{ NULL, NULL }
};
/*
    Context options for a statistics source that initsrc applies after the
    config file and the user-supplied options, i.e. they are re-applied last.
*/
static option_entry_t SourceContextOptionFixed[] =
{
    { "operational_mode", "embedded" },         /* embedded mode, for simplicity */
    { "monitor_interval", "0" },                /* no monitoring of this context */
    { "request_tcp_bind_request_port", "0" },   /* request/response unused: take no port */
    { "mim_incoming_address", "0.0.0.0" },      /* MIM unused: no MIM receiver */
    { "resolver_cache", "0" },                  /* topics need not be cached */
    { NULL, NULL }                              /* end of list */
};
/*
    Default context options for a statistics receiver. initrcv applies these
    before the config file and the user-supplied "context|..." options, so
    any of them may still be overridden.
*/
static option_entry_t ReceiverContextOption[] =
{
    { "operational_mode", "embedded" },                                 /* embedded mode, for simplicity */
    { "monitor_interval", "0" },                                        /* no monitoring of this context */
    { "request_tcp_bind_request_port", "0" },                           /* request/response unused: take no port */
    { "mim_incoming_address", "0.0.0.0" },                              /* MIM unused: no MIM receiver */
    { "resolver_cache", "0" },                                          /* topics need not be cached */
    { "resolver_multicast_ttl", DEFAULT_MULTICAST_TTL },                /* TTL=0 */
    { "resolver_multicast_address", DEFAULT_TOPIC_RESOLUTION_ADDRESS }, /* dedicated topic resolution address */
    { NULL, NULL }                                                      /* end of list */
};
/*
    Context options for a statistics receiver that initrcv applies after the
    config file and the user-supplied options. (initrcv subsequently sets
    resolver_cache back to "1" when a wildcard topic is in use.)
*/
static option_entry_t ReceiverContextOptionFixed[] =
{
    { "operational_mode", "embedded" },         /* embedded mode, for simplicity */
    { "monitor_interval", "0" },                /* no monitoring of this context */
    { "request_tcp_bind_request_port", "0" },   /* request/response unused: take no port */
    { "mim_incoming_address", "0.0.0.0" },      /* MIM unused: no MIM receiver */
    { "resolver_cache", "0" },                  /* topics need not be cached */
    { NULL, NULL }                              /* end of list */
};
/*
    Default source topic options. initsrc applies these first; the config
    file and the user-supplied "source|..." options may override them.
*/
static option_entry_t SourceTopicOption[] =
{
    { "transport_lbtru_transmission_window_size", "500000" },       /* keep LBT-RU retransmission memory small */
    { "transport_lbtrm_transmission_window_size", "500000" },       /* keep LBT-RM retransmission memory small */
    { "transport", "lbtrm" },                                       /* use LBT-RM */
    { "transport_lbtrm_multicast_address", DEFAULT_LBTRM_ADDRESS }, /* LBT-RM group address */
    { NULL, NULL }                                                  /* end of list */
};
/*
    Receiver topic options that initrcv applies after the config file and
    the user-supplied "receiver|..." options. Currently empty.
*/
static option_entry_t ReceiverTopicOption[] =
{
    { NULL, NULL }      /* end of list */
};
/*
    Wildcard receiver options that initrcv applies after the config file and
    the user-supplied "wildcard_receiver|..." options. Currently empty.
*/
static option_entry_t WildcardReceiverOption[] =
{
    { NULL, NULL }      /* end of list */
};
/*
    Record an allocation failure of Size bytes in ErrorString.
    The size is reported through an unsigned int conversion.
*/
static void lbmmon_transport_lbmsnmp_report_allocation_error(size_t Size)
{
    const unsigned requested = (unsigned) Size;

    snprintf(ErrorString, sizeof(ErrorString), "Unable to allocate %u bytes", requested);
}
lbmmon_transport_lbmsnmp_module(void)
{
return (&LBMMON_TRANSPORT_LBMSNMP);
}
int
lbmmon_transport_lbmsnmp_initsrc(void * * TransportClientData, const void * TransportOptions)
{
lbmmon_transport_lbmsnmp_src_t * data;
int rc;
const char * ptr = (const char *) TransportOptions;
char key[512];
char value[512];
char config_file[512];
char topic[512];
char scope[512];
char option[512];
option_entry_t * entry;
memset(ErrorString, 0, sizeof(ErrorString));
data = malloc(sizeof(lbmmon_transport_lbmsnmp_src_t));
if (data == NULL)
{
lbmmon_transport_lbmsnmp_report_allocation_error(sizeof(lbmmon_transport_lbmsnmp_src_t));
return (-1);
}
data->mContextAttributes = NULL;
data->mContext = NULL;
data->mTopicAttributes = NULL;
data->mSource = NULL;
data->mTopic = NULL;
/* Process any options */
memset(config_file, 0, sizeof(config_file));
strncpy(topic, DEFAULT_TOPIC, sizeof(topic));
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (strcasecmp(key, "config") == 0)
{
strncpy(config_file, value, sizeof(config_file));
}
else if (strcasecmp(key, "topic") == 0)
{
strncpy(topic, value, sizeof(topic));
}
}
/* Initialize the context attributes */
rc = lbm_context_attr_create_default(&(data->mContextAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_create() failed, %s",
return (rc);
}
/* Set the default context name */
rc = lbm_context_attr_str_setopt(data->mContextAttributes, "context_name", DEFAULT_CONTEXT_NAME);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_str_setopt() failed, %s",
return (rc);
}
entry = &SourceContextOption[0];
while (entry->option != NULL)
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
entry->option,
entry->value,
src_cleanup(data);
return (rc);
}
entry++;
}
/* Create the context */
if (config_file[0] != '\0')
{
/* A config file was passed as an option. Use it to populate the context attributes. */
rc = lbmaux_context_attr_setopt_from_file(data->mContextAttributes, config_file);
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_context_attr_setopt_from_file() failed, %s",
src_cleanup(data);
return (-1);
}
}
/* Go back through the options, looking for any specific context options. */
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (scope_is_valid(scope) == -1)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option scope [%s]",
scope);
src_cleanup(data);
return (-1);
}
if (strcasecmp(scope, "context") == 0)
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [context %s %s], %s",
option,
value,
src_cleanup(data);
return (rc);
}
}
}
entry = &SourceContextOptionFixed[0];
while (entry->option != NULL)
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
entry->option,
entry->value,
src_cleanup(data);
return (rc);
}
entry++;
}
/* Create the context */
rc = lbm_context_create(&(data->mContext), data->mContextAttributes, NULL, NULL);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_create() failed, %s",
src_cleanup(data);
return (rc);
}
/* Initialize the source topic attributes */
rc = lbm_src_topic_attr_create_default(&(data->mTopicAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_topic_attr_create_default() failed, %s",
src_cleanup(data);
return (rc);
}
/* Apply the default options first */
entry = &SourceTopicOption[0];
while (entry->option != NULL)
{
rc = lbm_src_topic_attr_str_setopt(data->mTopicAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [source %s %s], %s",
entry->option,
entry->value,
src_cleanup(data);
return (rc);
}
entry++;
}
/* Overwrite the transport options if they are Part of config file */
if (config_file[0] != '\0')
{
/* A config file was passed as an option. Use it to populate the source topic attributes. */
rc = lbmaux_src_topic_attr_setopt_from_file(data->mTopicAttributes, config_file);
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_src_topic_attr_setopt_from_file() failed, %s",
src_cleanup(data);
return (-1);
}
}
/* Go back through the options, looking for any specific source options. */
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (strcasecmp(scope, "source") == 0)
{
rc = lbm_src_topic_attr_str_setopt(data->mTopicAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [source %s %s], %s",
option,
value,
src_cleanup(data);
return (rc);
}
}
}
/* Create the topic */
rc = lbm_src_topic_alloc(&(data->mTopic), data->mContext, topic, data->mTopicAttributes);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_topic_alloc() failed, %s",
src_cleanup(data);
return (rc);
}
/* Create the source */
rc = lbm_src_create(&(data->mSource), data->mContext, data->mTopic, NULL, NULL, NULL);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_create() failed, %s",
src_cleanup(data);
return (rc);
}
/* Pass back the lbmmon_transport_lbmsnmp_src_t created */
*TransportClientData = data;
return (0);
}
/*
    LBM receiver callback (used when operating as a statistics receiver).

    Data messages are retained and appended to the receiver's queue, to be
    consumed later by lbmmon_transport_lbmsnmp_receive(). All other message
    types are ignored.

    ClientData is the lbmmon_transport_lbmsnmp_rcv_t registered when the
    receiver was created. Returns 0 normally, or -1 (with ErrorString set)
    when a queue node cannot be allocated.
*/
int
receive_callback(lbm_rcv_t * Receiver, lbm_msg_t * Message, void * ClientData)
{
    lbmmon_transport_lbmsnmp_rcv_t * rcv = (lbmmon_transport_lbmsnmp_rcv_t *) ClientData;
    lbmmon_transport_lbmsnmp_rcv_node_t * node;

    (void) Receiver;

    if (Message->type != LBM_MSG_DATA)
    {
        return (0);
    }

    /*
        Allocate before taking the lock, so that an allocation failure can
        return without leaving the receiver locked.
    */
    node = malloc(sizeof(lbmmon_transport_lbmsnmp_rcv_node_t));
    if (node == NULL)
    {
        lbmmon_transport_lbmsnmp_report_allocation_error(sizeof(lbmmon_transport_lbmsnmp_rcv_node_t));
        return (-1);
    }

    /*
        Since we hold onto the message until it is actually processed,
        let LBM know about it.
    */
    lbm_msg_retain(Message);
    node->mMessage = Message;
    node->mUsedBytes = 0;   /* No data returned as yet */
    node->mNext = NULL;

    /* Link the message onto the tail of the queue */
    lock_receiver(rcv);
    if (rcv->mTail != NULL)
    {
        rcv->mTail->mNext = node;
    }
    else
    {
        rcv->mHead = node;
    }
    rcv->mTail = node;
    unlock_receiver(rcv);

    return (0);
}
/*
    Initialize the transport for use as a statistics receiver.

    TransportOptions is a key/value option string walked with
    lbmmon_next_key_value_pair(). Recognized keys:
        config                      - LBM configuration file applied to the
                                      context, receiver topic and wildcard
                                      receiver attributes
        topic                       - topic to receive on (default DEFAULT_TOPIC)
        wctopic                     - wildcard pattern; when present a wildcard
                                      receiver is created instead of a plain one
        context|<option>            - context attribute
        receiver|<option>           - receiver topic attribute
        wildcard_receiver|<option>  - wildcard receiver attribute
    A key of the form <scope>|<option> whose scope scope_is_valid() rejects
    is an error.

    On success 0 is returned and *TransportClientData receives the newly
    allocated receiver state. On failure a negative value is returned (-1 or
    the failing LBM call's return code), ErrorString describes the problem,
    everything created so far is released, and *TransportClientData is left
    untouched.
*/
int
lbmmon_transport_lbmsnmp_initrcv(void * * TransportClientData, const void * TransportOptions)
{
    lbmmon_transport_lbmsnmp_rcv_t * data;
    int rc;
    const char * ptr = (const char *) TransportOptions;
    char key[512];
    char value[512];
    char config_file[512];
    char topic[512];
    char wildcard_topic[512];
    char scope[512];
    char option[512];
    option_entry_t * entry;

    memset(ErrorString, 0, sizeof(ErrorString));
    if (TransportClientData == NULL)
    {
        strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
        return (-1);
    }
    data = malloc(sizeof(lbmmon_transport_lbmsnmp_rcv_t));
    if (data == NULL)
    {
        lbmmon_transport_lbmsnmp_report_allocation_error(sizeof(lbmmon_transport_lbmsnmp_rcv_t));
        return (-1);
    }
    /* Zero/NULL members let rcv_cleanup() be called at any point from here on. */
    data->mLockCreated = 0;
    data->mContextAttributes = NULL;
    data->mContext = NULL;
    data->mReceiver = NULL;
    data->mTopicAttributes = NULL;
    data->mTopic = NULL;
    data->mWildcardReceiverAttributes = NULL;
    data->mWildcardReceiver = NULL;
    data->mHead = NULL;
    data->mTail = NULL;

    /* First pass over the options: config file name, topic, wildcard topic. */
    memset(config_file, 0, sizeof(config_file));
    strncpy(topic, DEFAULT_TOPIC, sizeof(topic) - 1);
    topic[sizeof(topic) - 1] = '\0';
    memset(wildcard_topic, 0, sizeof(wildcard_topic));
    while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
    {
        if (strcasecmp(key, "config") == 0)
        {
            strncpy(config_file, value, sizeof(config_file) - 1);
            config_file[sizeof(config_file) - 1] = '\0';
        }
        else if (strcasecmp(key, "topic") == 0)
        {
            strncpy(topic, value, sizeof(topic) - 1);
            topic[sizeof(topic) - 1] = '\0';
        }
        else if (strcasecmp(key, "wctopic") == 0)
        {
            strncpy(wildcard_topic, value, sizeof(wildcard_topic) - 1);
            wildcard_topic[sizeof(wildcard_topic) - 1] = '\0';
        }
    }

    /* Initialize the context attributes */
    rc = lbm_context_attr_create_default(&(data->mContextAttributes));
    if (rc == LBM_FAILURE)
    {
        snprintf(ErrorString,
                 sizeof(ErrorString),
                 "lbm_context_attr_create() failed, %s",
                 lbm_errmsg());
        goto failed;
    }

    /* Set the default context name */
    rc = lbm_context_attr_str_setopt(data->mContextAttributes, "context_name", DEFAULT_CONTEXT_NAME);
    if (rc == LBM_FAILURE)
    {
        snprintf(ErrorString,
                 sizeof(ErrorString),
                 "lbm_context_attr_str_setopt() failed, %s",
                 lbm_errmsg());
        goto failed;
    }

    /* Populate with default values */
    for (entry = &ReceiverContextOption[0]; entry->option != NULL; entry++)
    {
        rc = lbm_context_attr_str_setopt(data->mContextAttributes, entry->option, entry->value);
        if (rc == LBM_FAILURE)
        {
            snprintf(ErrorString,
                     sizeof(ErrorString),
                     "error setting option [context %s %s], %s",
                     entry->option,
                     entry->value,
                     lbm_errmsg());
            goto failed;
        }
    }

    if (config_file[0] != '\0')
    {
        /* A config file was passed as an option. Use it to populate the context attributes. */
        rc = lbmaux_context_attr_setopt_from_file(data->mContextAttributes, config_file);
        if (rc != 0)
        {
            snprintf(ErrorString,
                     sizeof(ErrorString),
                     "lbmaux_context_attr_setopt_from_file() failed, %s",
                     lbm_errmsg());
            rc = -1;
            goto failed;
        }
    }

    /* Go back through the options, looking for any specific context options. */
    ptr = (const char *) TransportOptions;
    while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
    {
        /* Field widths keep scope/option within their 512-byte buffers. */
        if (sscanf(key, "%511[a-zA-Z_]|%511[a-zA-Z_]", scope, option) != 2)
        {
            continue;
        }
        if (scope_is_valid(scope) == -1)
        {
            snprintf(ErrorString,
                     sizeof(ErrorString),
                     "invalid option scope [%s]",
                     scope);
            rc = -1;
            goto failed;
        }
        if (strcasecmp(scope, "context") == 0)
        {
            rc = lbm_context_attr_str_setopt(data->mContextAttributes, option, value);
            if (rc == LBM_FAILURE)
            {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "invalid option [context %s %s], %s",
                         option,
                         value,
                         lbm_errmsg());
                goto failed;
            }
        }
    }

    /* Re-apply the fixed context options last so they take precedence. */
    for (entry = &ReceiverContextOptionFixed[0]; entry->option != NULL; entry++)
    {
        rc = lbm_context_attr_str_setopt(data->mContextAttributes, entry->option, entry->value);
        if (rc == LBM_FAILURE)
        {
            snprintf(ErrorString,
                     sizeof(ErrorString),
                     "error setting option [context %s %s], %s",
                     entry->option,
                     entry->value,
                     lbm_errmsg());
            goto failed;
        }
    }

    /* Resolver cache needs to be enabled for a wildcard receiver to work */
    if (wildcard_topic[0] != '\0')
    {
        rc = lbm_context_attr_str_setopt(data->mContextAttributes, "resolver_cache", "1");
        if (rc == LBM_FAILURE)
        {
            snprintf(ErrorString,
                     sizeof(ErrorString),
                     "error setting option [context %s %s], %s",
                     "resolver_cache",
                     "1",
                     lbm_errmsg());
            goto failed;
        }
    }

    /* Create the context */
    rc = lbm_context_create(&(data->mContext), data->mContextAttributes, NULL, NULL);
    if (rc == LBM_FAILURE)
    {
        snprintf(ErrorString,
                 sizeof(ErrorString),
                 "lbm_context_create() failed, %s",
                 lbm_errmsg());
        goto failed;
    }

    /* If a wildcard topic was specified, initialize the wildcard receiver attributes. */
    if (wildcard_topic[0] != '\0')
    {
        rc = lbm_wildcard_rcv_attr_create_default(&(data->mWildcardReceiverAttributes));
        if (rc == LBM_FAILURE)
        {
            snprintf(ErrorString,
                     sizeof(ErrorString),
                     "lbm_wildcard_rcv_attr_init() failed, %s",
                     lbm_errmsg());
            goto failed;
        }
        if (config_file[0] != '\0')
        {
            /* A config file was passed as an option. Use it to populate the wildcard receiver attributes. */
            rc = lbmaux_wildcard_rcv_attr_setopt_from_file(data->mWildcardReceiverAttributes, config_file);
            if (rc == LBM_FAILURE)
            {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "lbmaux_wildcard_rcv_attr_setopt_from_file() failed, %s",
                         lbm_errmsg());
                rc = -1;
                goto failed;
            }
        }
        /* Go back through the options, looking for any specific wildcard receiver options. */
        ptr = (const char *) TransportOptions;
        while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
        {
            if (sscanf(key, "%511[a-zA-Z_]|%511[a-zA-Z_]", scope, option) != 2)
            {
                continue;
            }
            if (strcasecmp(scope, "wildcard_receiver") == 0)
            {
                rc = lbm_wildcard_rcv_attr_str_setopt(data->mWildcardReceiverAttributes, option, value);
                if (rc == LBM_FAILURE)
                {
                    snprintf(ErrorString,
                             sizeof(ErrorString),
                             "invalid option [wildcard_receiver %s %s], %s",
                             option,
                             value,
                             lbm_errmsg());
                    goto failed;
                }
            }
        }
        /*
            The entry advance is part of the loop itself; with it outside the
            loop body, a non-empty table would never terminate.
        */
        for (entry = &WildcardReceiverOption[0]; entry->option != NULL; entry++)
        {
            rc = lbm_wildcard_rcv_attr_str_setopt(data->mWildcardReceiverAttributes, entry->option, entry->value);
            if (rc == LBM_FAILURE)
            {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "error setting option [wildcard_receiver %s %s], %s",
                         entry->option,
                         entry->value,
                         lbm_errmsg());
                goto failed;
            }
        }
    }

    /* Initialize and set the receiver topic attributes. */
    rc = lbm_rcv_topic_attr_create_default(&(data->mTopicAttributes));
    if (rc == LBM_FAILURE)
    {
        snprintf(ErrorString,
                 sizeof(ErrorString),
                 "lbm_rcv_topic_attr_init() failed, %s",
                 lbm_errmsg());
        goto failed;
    }
    if (config_file[0] != '\0')
    {
        /* A config file was passed as an option. Use it to populate the receiver topic attributes. */
        rc = lbmaux_rcv_topic_attr_setopt_from_file(data->mTopicAttributes, config_file);
        if (rc == LBM_FAILURE)
        {
            snprintf(ErrorString,
                     sizeof(ErrorString),
                     "lbmaux_rcv_topic_attr_setopt_from_file() failed, %s",
                     lbm_errmsg());
            rc = -1;
            goto failed;
        }
    }

    /* Go back through the options, looking for any specific receiver options. */
    ptr = (const char *) TransportOptions;
    while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
    {
        if (sscanf(key, "%511[a-zA-Z_]|%511[a-zA-Z_]", scope, option) != 2)
        {
            continue;
        }
        if (strcasecmp(scope, "receiver") == 0)
        {
            rc = lbm_rcv_topic_attr_str_setopt(data->mTopicAttributes, option, value);
            if (rc == LBM_FAILURE)
            {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "invalid option [receiver %s %s], %s",
                         option,
                         value,
                         lbm_errmsg());
                goto failed;
            }
        }
    }
    for (entry = &ReceiverTopicOption[0]; entry->option != NULL; entry++)
    {
        rc = lbm_rcv_topic_attr_str_setopt(data->mTopicAttributes, entry->option, entry->value);
        if (rc == LBM_FAILURE)
        {
            snprintf(ErrorString,
                     sizeof(ErrorString),
                     "error setting option [receiver %s %s], %s",
                     entry->option,
                     entry->value,
                     lbm_errmsg());
            goto failed;
        }
    }

    /* For a non-wildcard topic, lookup the topic. */
    if (wildcard_topic[0] == '\0')
    {
        rc = lbm_rcv_topic_lookup(&(data->mTopic), data->mContext, topic, data->mTopicAttributes);
        if (rc == LBM_FAILURE)
        {
            snprintf(ErrorString,
                     sizeof(ErrorString),
                     "lbm_rcv_topic_lookup() failed, %s",
                     lbm_errmsg());
            goto failed;
        }
    }

    /* Create the queue lock and hold it while the receiver is created. */
#ifdef _WIN32
    InitializeCriticalSection(&(data->mLock));
#else
    pthread_mutex_init(&(data->mLock), NULL);
#endif
    data->mLockCreated = 1;
    lock_receiver(data);

    if (wildcard_topic[0] != '\0')
    {
        /* Wildcard topic, create a wildcard receiver */
        rc = lbm_wildcard_rcv_create(&(data->mWildcardReceiver),
                                     data->mContext,
                                     wildcard_topic,
                                     data->mTopicAttributes,
                                     data->mWildcardReceiverAttributes,
                                     receive_callback,
                                     data,
                                     NULL);
    }
    else
    {
        /* Non-wildcard topic, create a normal receiver */
        rc = lbm_rcv_create(&(data->mReceiver),
                            data->mContext,
                            data->mTopic,
                            receive_callback,
                            data,
                            NULL);
    }
    if (rc == LBM_FAILURE)
    {
        snprintf(ErrorString,
                 sizeof(ErrorString),
                 "lbm_wildcard_rcv_create()/lbm_rcv_create() failed, %s",
                 lbm_errmsg());
        /* rcv_cleanup() takes the lock itself, so release it first. */
        unlock_receiver(data);
        goto failed;
    }

    /* Pass back the lbmmon_transport_lbmsnmp_rcv_t created */
    *TransportClientData = data;
    unlock_receiver(data);
    return (0);

failed:
    /* Single exit for every failure: release what exists and free data. */
    rcv_cleanup(data);
    return (rc);
}
/*
    Send one statistics packet on the source created by
    lbmmon_transport_lbmsnmp_initsrc().

    Data / Length describe the packet; TransportClientData is the source
    state returned by initsrc. Returns -1 with ErrorString set when Data or
    TransportClientData is NULL; otherwise returns the result of
    lbm_src_send(), setting ErrorString when that call reports LBM_FAILURE.
*/
int
lbmmon_transport_lbmsnmp_send(const char * Data, size_t Length, void * TransportClientData)
{
    lbmmon_transport_lbmsnmp_src_t * src;
    int rc;

    if ((Data == NULL) || (TransportClientData == NULL))
    {
        strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
        return (-1);
    }
    src = (lbmmon_transport_lbmsnmp_src_t *) TransportClientData;
    rc = lbm_src_send(src->mSource, Data, Length, 0);
    if (rc == LBM_FAILURE)
    {
        snprintf(ErrorString,
                 sizeof(ErrorString),
                 "lbm_src_send() failed, %s",
                 lbm_errmsg());
    }
    return (rc);
}
/*
    Fetch the next queued statistics packet.

    Data            - buffer that receives the packet
    Length          - in: capacity of Data; out: number of bytes copied
                      (only updated when a packet is copied)
    TimeoutMS       - how long to sleep when the queue is empty
    TransportClientData - receiver state returned by initrcv

    Returns:
        -1  an argument was NULL (ErrorString is set)
         0  a packet was copied into Data, or *Length was 0 on entry
         1  nothing was delivered: either the queue was empty (after
            sleeping for TimeoutMS) or the head packet was larger than
            *Length and has been logged and dropped
*/
int
lbmmon_transport_lbmsnmp_receive(char * Data, size_t * Length, unsigned int TimeoutMS, void * TransportClientData)
{
    lbmmon_transport_lbmsnmp_rcv_t * rcv = (lbmmon_transport_lbmsnmp_rcv_t *) TransportClientData;
    lbmmon_transport_lbmsnmp_rcv_node_t * node;
    int rc = 0;
    size_t length_remaining;
#if defined(_WIN32)
#elif defined(__TANDEM)
    unsigned int sleep_sec;
    unsigned int sleep_usec;
#else
    struct timespec ivl;
#endif

    if ((Data == NULL) || (Length == NULL) || (TransportClientData == NULL))
    {
        strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
        return (-1);
    }
    if (*Length == 0)
    {
        return (0);
    }

    lock_receiver(rcv);
    node = rcv->mHead;
    if (node != NULL)
    {
        /* Queue is non-empty. Pull the first message from the queue. */
        length_remaining = node->mMessage->len - node->mUsedBytes;
        if (*Length >= length_remaining)
        {
            /* We can transfer the rest of the message */
            memcpy(Data, node->mMessage->data + node->mUsedBytes, length_remaining);
            *Length = length_remaining;
            rc = 0;
        }
        else
        {
            /* MSGDESC: The monitoring message received is larger than the maximum allowed size given.
             * MSGRES: This is a hard coded maximum. */
            /* Both values are cast so the argument types match the %d conversions. */
            lbm_logf(LBM_LOG_ERR, "Core-8034-3: [LBMMON] Dropping monitoring message that is larger than the maximum allowed size of %d (size=%d)",
                     (int) *Length, (int) node->mMessage->len);
            rc = 1; /* Positive number prevents caller from logging message too */
        }
        /* Either way we're done with the LBM message, so let LBM know. */
        lbm_msg_delete(node->mMessage);
        /* Unlink the node from the queue */
        rcv->mHead = node->mNext;
        if (rcv->mHead == NULL)
        {
            rcv->mTail = NULL;
        }
        free(node);
        unlock_receiver(rcv);
    }
    else
    {
        unlock_receiver(rcv);
        /* Sleep for wait time */
#define NANOSECONDS_PER_SECOND 1000000000
#define MICROSECONDS_PER_SECOND 1000000
#define MILLISECONDS_PER_SECOND 1000
#define NANOSECONDS_PER_MILLISECOND (NANOSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#define MICROSECONDS_PER_MILLISECOND (MICROSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#if defined(_WIN32)
        Sleep(TimeoutMS);
#elif defined(__TANDEM)
        sleep_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
        sleep_usec = (TimeoutMS % MILLISECONDS_PER_SECOND) * MICROSECONDS_PER_MILLISECOND;
        if (sleep_usec > 0)
        {
            usleep(sleep_usec);
        }
        if (sleep_sec > 0)
        {
            sleep(sleep_sec);
        }
#else
        ivl.tv_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
        ivl.tv_nsec = (TimeoutMS % MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND;
        nanosleep(&ivl, NULL);
#endif
        rc = 1;
    }
    return (rc);
}
/*
    Release everything a statistics source owns, then free the structure
    itself. Members are released in this order: source, topic attributes,
    context, context attributes. Data must not be used after this returns.
*/
void
src_cleanup(lbmmon_transport_lbmsnmp_src_t * Data)
{
    lbmmon_transport_lbmsnmp_src_t * const src = Data;

    if (src->mSource != NULL)
    {
        lbm_src_delete(src->mSource);
    }
    src->mSource = NULL;

    /* The topic is only forgotten here; no delete call is made for it. */
    src->mTopic = NULL;

    if (src->mTopicAttributes != NULL)
    {
        lbm_src_topic_attr_delete(src->mTopicAttributes);
    }
    src->mTopicAttributes = NULL;

    if (src->mContext != NULL)
    {
        lbm_context_delete(src->mContext);
    }
    src->mContext = NULL;

    if (src->mContextAttributes != NULL)
    {
        lbm_context_attr_delete(src->mContextAttributes);
    }
    src->mContextAttributes = NULL;

    free(src);
}
/*
    Tear down a statistics source. TransportClientData is the source state
    returned by initsrc; it is released and must not be used afterwards.
    Returns 0 on success, or -1 with ErrorString set when
    TransportClientData is NULL.
*/
int
lbmmon_transport_lbmsnmp_src_finish(void * TransportClientData)
{
    if (TransportClientData != NULL)
    {
        src_cleanup((lbmmon_transport_lbmsnmp_src_t *) TransportClientData);
        return (0);
    }
    strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
    return (-1);
}
/*
    Release everything a statistics receiver owns, then free the structure
    itself. The order is: receivers and their attributes first, then (with
    the queue lock held, if it exists) the context, its attributes and any
    messages still queued, and finally the lock. Data must not be used after
    this returns.
*/
void
rcv_cleanup(lbmmon_transport_lbmsnmp_rcv_t * Data)
{
    lbmmon_transport_lbmsnmp_rcv_node_t * cur;
    lbmmon_transport_lbmsnmp_rcv_node_t * following;

    /* Stop the receiver to prevent any more incoming messages */
    if (Data->mWildcardReceiver != NULL)
    {
        lbm_wildcard_rcv_delete(Data->mWildcardReceiver);
    }
    Data->mWildcardReceiver = NULL;

    if (Data->mWildcardReceiverAttributes != NULL)
    {
        lbm_wildcard_rcv_attr_delete(Data->mWildcardReceiverAttributes);
    }
    Data->mWildcardReceiverAttributes = NULL;

    if (Data->mReceiver != NULL)
    {
        lbm_rcv_delete(Data->mReceiver);
    }
    Data->mReceiver = NULL;

    if (Data->mTopicAttributes != NULL)
    {
        lbm_rcv_topic_attr_delete(Data->mTopicAttributes);
    }
    Data->mTopicAttributes = NULL;
    Data->mTopic = NULL;

    /* Take the queue lock, when there is one */
    if (Data->mLockCreated != 0)
    {
        lock_receiver(Data);
    }

    /* Delete the context to really make sure no more messages come in */
    if (Data->mContext != NULL)
    {
        lbm_context_delete(Data->mContext);
    }
    Data->mContext = NULL;

    if (Data->mContextAttributes != NULL)
    {
        lbm_context_attr_delete(Data->mContextAttributes);
    }
    Data->mContextAttributes = NULL;

    /* Drain the queue, handing each retained message back to LBM */
    for (cur = Data->mHead; cur != NULL; cur = following)
    {
        lbm_msg_delete(cur->mMessage);
        following = cur->mNext;
        free(cur);
    }

    if (Data->mLockCreated)
    {
        unlock_receiver(Data);
#if defined(_WIN32)
        DeleteCriticalSection(&(Data->mLock));
#else
        pthread_mutex_destroy(&(Data->mLock));
#endif
    }
    free(Data);
}
/*
    Tear down a statistics receiver. TransportClientData is the receiver
    state returned by initrcv; it is released and must not be used
    afterwards. Returns 0 on success, or -1 with ErrorString set when
    TransportClientData is NULL.
*/
int
lbmmon_transport_lbmsnmp_rcv_finish(void * TransportClientData)
{
    if (TransportClientData != NULL)
    {
        rcv_cleanup((lbmmon_transport_lbmsnmp_rcv_t *) TransportClientData);
        return (0);
    }
    strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
    return (-1);
}
/*
    Acquire the receiver's queue lock (a critical section on Windows, a
    pthread mutex elsewhere). The lock must already have been initialized.
*/
void
lock_receiver(lbmmon_transport_lbmsnmp_rcv_t * Receiver)
{
#if defined(_WIN32)
    CRITICAL_SECTION * const guard = &Receiver->mLock;

    EnterCriticalSection(guard);
#else
    pthread_mutex_t * const guard = &Receiver->mLock;

    pthread_mutex_lock(guard);
#endif
}
/*
    Release the receiver's queue lock (a critical section on Windows, a
    pthread mutex elsewhere).
*/
void
unlock_receiver(lbmmon_transport_lbmsnmp_rcv_t * Receiver)
{
#if defined(_WIN32)
    CRITICAL_SECTION * const guard = &Receiver->mLock;

    LeaveCriticalSection(guard);
#else
    pthread_mutex_t * const guard = &Receiver->mLock;

    pthread_mutex_unlock(guard);
#endif
}
/*
    Return the text of the most recent error recorded by this module.
    The pointer refers to the module's static ErrorString buffer, so its
    contents change whenever a later call records another error.
*/
const char *
lbmmon_transport_lbmsnmp_errmsg(void)
{
    const char * last_error = ErrorString;

    return last_error;
}
/*
    Check the scope part of a "<scope>|<option>" transport option key.

    Returns 0 when Scope (compared case-insensitively) is one of the
    recognized scopes, -1 otherwise (including when Scope is NULL).

    "wildcard_receiver" is accepted because the receiver initialization in
    this file applies "wildcard_receiver|<option>" keys; rejecting the scope
    here would make those options unusable.
*/
int
scope_is_valid(const char * Scope)
{
    static const char * const known_scopes[] =
    {
        "context",
        "source",
        "receiver",
        "wildcard_receiver",
        "event_queue"
    };
    size_t idx;

    if (Scope == NULL)
    {
        return (-1);
    }
    for (idx = 0; idx < sizeof(known_scopes) / sizeof(known_scopes[0]); idx++)
    {
        if (strcasecmp(Scope, known_scopes[idx]) == 0)
        {
            return (0);
        }
    }
    return (-1);
}