#ifdef __VOS__
#define _POSIX_C_SOURCE 200112L
#include <sys/time.h>
#endif
#include <stdio.h>
#include <time.h>
#include <string.h>
#ifdef _WIN32
#define strcasecmp stricmp
#define snprintf _snprintf
#else
#include "config.h"
#include <unistd.h>
#if defined(__TANDEM)
#if defined(HAVE_TANDEM_SPT)
#include <ktdmtyp.h>
#include <spthread.h>
#else
#include <pthread.h>
#endif
#else
#include <pthread.h>
#endif
#include <strings.h>
#endif
#include <lbm/lbmmon.h>
#include <lbm/lbmmontrlbm.h>
#include <lbm/lbmaux.h>
static const lbmmon_transport_func_t LBMMON_TRANSPORT_LBM =
{
lbmmon_transport_lbm_initsrc,
lbmmon_transport_lbm_initrcv,
lbmmon_transport_lbm_send,
lbmmon_transport_lbm_receive,
lbmmon_transport_lbm_src_finish,
lbmmon_transport_lbm_rcv_finish,
lbmmon_transport_lbm_errmsg
};
typedef struct
{
lbm_context_attr_t * mContextAttributes;
lbm_context_t * mContext;
lbm_src_topic_attr_t * mTopicAttributes;
lbm_src_t * mSource;
lbm_topic_t * mTopic;
} lbmmon_transport_lbm_src_t;
struct lbmmon_transport_lbm_rcv_node_t_stct
{
lbm_msg_t * mMessage;
size_t mUsedBytes;
struct lbmmon_transport_lbm_rcv_node_t_stct * mNext;
};
typedef struct lbmmon_transport_lbm_rcv_node_t_stct lbmmon_transport_lbm_rcv_node_t;
typedef struct
{
unsigned int mLockCreated;
#ifdef _WIN32
CRITICAL_SECTION mLock;
#else
pthread_mutex_t mLock;
#endif
lbm_context_attr_t * mContextAttributes;
lbm_context_t * mContext;
lbm_rcv_t * mReceiver;
lbm_rcv_topic_attr_t * mTopicAttributes;
lbm_topic_t * mTopic;
lbm_wildcard_rcv_attr_t * mWildcardReceiverAttributes;
lbm_wildcard_rcv_t * mWildcardReceiver;
lbmmon_transport_lbm_rcv_node_t * mHead;
lbmmon_transport_lbm_rcv_node_t * mTail;
} lbmmon_transport_lbm_rcv_t;
static void src_cleanup(lbmmon_transport_lbm_src_t * Data);
static void rcv_cleanup(lbmmon_transport_lbm_rcv_t * Data);
static int receive_callback(lbm_rcv_t * Receiver, lbm_msg_t * Message, void * ClientData);
static void lock_receiver(lbmmon_transport_lbm_rcv_t * Receiver);
static void unlock_receiver(lbmmon_transport_lbm_rcv_t * Receiver);
static int scope_is_valid(const char * Scope);
#define DEFAULT_CONTEXT_NAME "29west_statistics_context"
#define DEFAULT_TOPIC "/29west/statistics"
static char ErrorString[1024];
typedef struct
{
const char * option;
const char * value;
} option_entry_t;
static option_entry_t SourceContextOption[] =
{
{ "operational_mode", "embedded" },
{ "monitor_interval", "0" },
{ "request_tcp_bind_request_port", "0" },
{ "mim_incoming_address", "0.0.0.0" },
{ "resolver_cache", "0" },
{ NULL, NULL }
};
static option_entry_t ReceiverContextOption[] =
{
{ "operational_mode", "embedded" },
{ "monitor_interval", "0" },
{ "request_tcp_bind_request_port", "0" },
{ "mim_incoming_address", "0.0.0.0" },
{ "resolver_cache", "0" },
{ NULL, NULL }
};
static option_entry_t SourceTopicOption[] =
{
{ "transport_lbtru_transmission_window_size", "500000" },
{ "transport_lbtrm_transmission_window_size", "500000" },
{ NULL, NULL }
};
static option_entry_t ReceiverTopicOption[] =
{
{ NULL, NULL }
};
static option_entry_t WildcardReceiverOption[] =
{
{ NULL, NULL }
};
const lbmmon_transport_func_t *
lbmmon_transport_lbm_module(void)
{
return (&LBMMON_TRANSPORT_LBM);
}
int
lbmmon_transport_lbm_initsrc(void * * TransportClientData, const void * TransportOptions)
{
lbmmon_transport_lbm_src_t * data;
int rc;
const char * ptr = (const char *) TransportOptions;
char key[512];
char value[512];
char config_file[512];
char topic[512];
char scope[512];
char option[512];
option_entry_t * entry;
memset(ErrorString, 0, sizeof(ErrorString));
data = malloc(sizeof(lbmmon_transport_lbm_src_t));
data->mContextAttributes = NULL;
data->mContext = NULL;
data->mTopicAttributes = NULL;
data->mSource = NULL;
data->mTopic = NULL;
memset(config_file, 0, sizeof(config_file));
strncpy(topic, DEFAULT_TOPIC, sizeof(topic));
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (strcasecmp(key, "config") == 0)
{
strncpy(config_file, value, sizeof(config_file));
}
else if (strcasecmp(key, "topic") == 0)
{
strncpy(topic, value, sizeof(topic));
}
}
rc = lbm_context_attr_create_default(&(data->mContextAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_init() failed, %s",
lbm_errmsg());
return (rc);
}
rc = lbm_context_attr_str_setopt(data->mContextAttributes, "context_name", DEFAULT_CONTEXT_NAME);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_str_setopt() failed, %s",
lbm_errmsg());
return (rc);
}
if (config_file[0] != '\0')
{
rc = lbmaux_context_attr_setopt_from_file(data->mContextAttributes, config_file);
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_context_attr_setopt_from_file() failed, %s",
lbm_errmsg());
src_cleanup(data);
return (-1);
}
}
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (scope_is_valid(scope) == -1)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option scope [%s]",
scope);
src_cleanup(data);
return (-1);
}
if (strcasecmp(scope, "context") == 0)
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [context %s %s], %s",
option,
value,
lbm_errmsg());
src_cleanup(data);
return (rc);
}
}
}
entry = &SourceContextOption[0];
while (entry->option != NULL)
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
entry->option,
entry->value,
lbm_errmsg());
src_cleanup(data);
return (rc);
}
entry++;
}
rc = lbm_context_create(&(data->mContext), data->mContextAttributes, NULL, NULL);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_create() failed, %s",
lbm_errmsg());
src_cleanup(data);
return (rc);
}
rc = lbm_src_topic_attr_create_default(&(data->mTopicAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_topic_attr_create_default() failed, %s",
lbm_errmsg());
src_cleanup(data);
return (rc);
}
entry = &SourceTopicOption[0];
while (entry->option != NULL)
{
rc = lbm_src_topic_attr_str_setopt(data->mTopicAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [source %s %s], %s",
entry->option,
entry->value,
lbm_errmsg());
src_cleanup(data);
return (rc);
}
entry++;
}
if (config_file[0] != '\0')
{
rc = lbmaux_src_topic_attr_setopt_from_file(data->mTopicAttributes, config_file);
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_src_topic_attr_setopt_from_file() failed, %s",
lbm_errmsg());
src_cleanup(data);
return (-1);
}
}
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (strcasecmp(scope, "source") == 0)
{
rc = lbm_src_topic_attr_str_setopt(data->mTopicAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [source %s %s], %s",
option,
value,
lbm_errmsg());
src_cleanup(data);
return (rc);
}
}
}
rc = lbm_src_topic_alloc(&(data->mTopic), data->mContext, topic, data->mTopicAttributes);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_topic_alloc() failed, %s",
lbm_errmsg());
src_cleanup(data);
return (rc);
}
rc = lbm_src_create(&(data->mSource), data->mContext, data->mTopic, NULL, NULL, NULL);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_create() failed, %s",
lbm_errmsg());
src_cleanup(data);
return (rc);
}
*TransportClientData = data;
return (0);
}
int
receive_callback(lbm_rcv_t * Receiver, lbm_msg_t * Message, void * ClientData)
{
lbmmon_transport_lbm_rcv_t * rcv = (lbmmon_transport_lbm_rcv_t *) ClientData;
lbmmon_transport_lbm_rcv_node_t * node;
if (Message->type == LBM_MSG_DATA)
{
lock_receiver(rcv);
node = malloc(sizeof(lbmmon_transport_lbm_rcv_node_t));
lbm_msg_retain(Message);
node->mMessage = Message;
node->mUsedBytes = 0;
node->mNext = NULL;
if (rcv->mTail != NULL)
{
rcv->mTail->mNext = node;
}
else
{
rcv->mHead = node;
}
rcv->mTail = node;
unlock_receiver(rcv);
}
return (0);
}
int
lbmmon_transport_lbm_initrcv(void * * TransportClientData, const void * TransportOptions)
{
lbmmon_transport_lbm_rcv_t * data;
int rc;
const char * ptr = (const char *) TransportOptions;
char key[512];
char value[512];
char config_file[512];
char topic[512];
char wildcard_topic[512];
char scope[512];
char option[512];
option_entry_t * entry;
memset(ErrorString, 0, sizeof(ErrorString));
data = malloc(sizeof(lbmmon_transport_lbm_rcv_t));
data->mLockCreated = 0;
data->mContextAttributes = NULL;
data->mContext = NULL;
data->mReceiver = NULL;
data->mTopicAttributes = NULL;
data->mTopic = NULL;
data->mWildcardReceiverAttributes = NULL;
data->mWildcardReceiver = NULL;
data->mHead = NULL;
data->mTail = NULL;
memset(config_file, 0, sizeof(config_file));
strncpy(topic, DEFAULT_TOPIC, sizeof(topic));
memset(wildcard_topic, 0, sizeof(wildcard_topic));
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (strcasecmp(key, "config") == 0)
{
strncpy(config_file, value, sizeof(config_file));
}
else if (strcasecmp(key, "topic") == 0)
{
strncpy(topic, value, sizeof(topic));
}
else if (strcasecmp(key, "wctopic") == 0)
{
strncpy(wildcard_topic, value, sizeof(wildcard_topic));
}
}
rc = lbm_context_attr_create_default(&(data->mContextAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_init() failed, %s",
lbm_errmsg());
rcv_cleanup(data);
return (rc);
}
rc = lbm_context_attr_str_setopt(data->mContextAttributes, "context_name", DEFAULT_CONTEXT_NAME);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_attr_str_setopt() failed, %s",
lbm_errmsg());
return (rc);
}
if (config_file[0] != '\0')
{
rc = lbmaux_context_attr_setopt_from_file(data->mContextAttributes, config_file);
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_context_attr_setopt_from_file() failed, %s",
lbm_errmsg());
rcv_cleanup(data);
return (-1);
}
}
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (scope_is_valid(scope) == -1)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option scope [%s]",
scope);
rcv_cleanup(data);
return (-1);
}
if (strcasecmp(scope, "context") == 0)
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [context %s %s], %s",
option,
value,
lbm_errmsg());
rcv_cleanup(data);
return (rc);
}
}
}
entry = &ReceiverContextOption[0];
while (entry->option != NULL)
{
rc = lbm_context_attr_str_setopt(data->mContextAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [context %s %s], %s",
entry->option,
entry->value,
lbm_errmsg());
rcv_cleanup(data);
return (rc);
}
entry++;
}
rc = lbm_context_create(&(data->mContext), data->mContextAttributes, NULL, NULL);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_context_create() failed, %s",
lbm_errmsg());
rcv_cleanup(data);
return (rc);
}
if (wildcard_topic[0] != '\0')
{
rc = lbm_wildcard_rcv_attr_create_default(&(data->mWildcardReceiverAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_wildcard_rcv_attr_init() failed, %s",
lbm_errmsg());
rcv_cleanup(data);
return (rc);
}
if (config_file[0] != '\0')
{
rc = lbmaux_wildcard_rcv_attr_setopt_from_file(data->mWildcardReceiverAttributes, config_file);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_wildcard_rcv_attr_setopt_from_file() failed, %s",
lbm_errmsg());
rcv_cleanup(data);
return (-1);
}
}
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (strcasecmp(scope, "wildcard_receiver") == 0)
{
rc = lbm_wildcard_rcv_attr_str_setopt(data->mWildcardReceiverAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [wildcard_receiver %s %s], %s",
option,
value,
lbm_errmsg());
rcv_cleanup(data);
return (rc);
}
}
}
entry = &WildcardReceiverOption[0];
while (entry->option != NULL)
{
rc = lbm_wildcard_rcv_attr_str_setopt(data->mWildcardReceiverAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [wildcard_receiver %s %s], %s",
entry->option,
entry->value,
lbm_errmsg());
rcv_cleanup(data);
return (rc);
}
entry++;
}
}
rc = lbm_rcv_topic_attr_create_default(&(data->mTopicAttributes));
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_rcv_topic_attr_init() failed, %s",
lbm_errmsg());
rcv_cleanup(data);
return (rc);
}
if (config_file[0] != '\0')
{
rc = lbmaux_rcv_topic_attr_setopt_from_file(data->mTopicAttributes, config_file);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbmaux_rcv_topic_attr_setopt_from_file() failed, %s",
lbm_errmsg());
rcv_cleanup(data);
return (-1);
}
}
ptr = (const char *) TransportOptions;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
{
continue;
}
if (strcasecmp(scope, "receiver") == 0)
{
rc = lbm_rcv_topic_attr_str_setopt(data->mTopicAttributes, option, value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"invalid option [receiver %s %s], %s",
option,
value,
lbm_errmsg());
rcv_cleanup(data);
return (rc);
}
}
}
entry = &ReceiverTopicOption[0];
while (entry->option != NULL)
{
rc = lbm_rcv_topic_attr_str_setopt(data->mTopicAttributes, entry->option, entry->value);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"error setting option [receiver %s %s], %s",
entry->option,
entry->value,
lbm_errmsg());
rcv_cleanup(data);
return (rc);
}
entry++;
}
if (wildcard_topic[0] == '\0')
{
rc = lbm_rcv_topic_lookup(&(data->mTopic), data->mContext, topic, data->mTopicAttributes);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_rcv_topic_lookup() failed, %s",
lbm_errmsg());
rcv_cleanup(data);
return (rc);
}
}
#ifdef _WIN32
InitializeCriticalSection(&(data->mLock));
#else
pthread_mutex_init(&(data->mLock), NULL);
#endif
data->mLockCreated = 1;
lock_receiver(data);
if (wildcard_topic[0] != '\0')
{
rc = lbm_wildcard_rcv_create(&(data->mWildcardReceiver),
data->mContext,
wildcard_topic,
data->mTopicAttributes,
data->mWildcardReceiverAttributes,
receive_callback,
data,
NULL);
}
else
{
rc = lbm_rcv_create(&(data->mReceiver),
data->mContext,
data->mTopic,
receive_callback,
data,
NULL);
}
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_wildcard_rcv_create()/lbm_rcv_create() failed, %s",
lbm_errmsg());
unlock_receiver(data);
rcv_cleanup(data);
return (rc);
}
*TransportClientData = data;
unlock_receiver(data);
return (0);
}
int
lbmmon_transport_lbm_send(const char * Data, size_t Length, void * TransportClientData)
{
lbmmon_transport_lbm_src_t * src;
int rc;
if ((Data == NULL) || (TransportClientData == NULL))
{
strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
return (-1);
}
src = (lbmmon_transport_lbm_src_t *) TransportClientData;
rc = lbm_src_send(src->mSource, Data, Length, 0);
if (rc == LBM_FAILURE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"lbm_src_send() failed, %s",
lbm_errmsg());
}
return (rc);
}
int
lbmmon_transport_lbm_receive(char * Data, size_t * Length, unsigned int TimeoutMS, void * TransportClientData)
{
lbmmon_transport_lbm_rcv_t * rcv = (lbmmon_transport_lbm_rcv_t *) TransportClientData;
lbmmon_transport_lbm_rcv_node_t * node;
int rc = 0;
size_t length_remaining;
#if defined(_WIN32)
#elif defined(__TANDEM)
unsigned int sleep_sec;
unsigned int sleep_usec;
#else
struct timespec ivl;
#endif
if ((Data == NULL) || (Length == NULL) || (TransportClientData == NULL))
{
strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
return (-1);
}
if (*Length == 0)
{
return (0);
}
lock_receiver(rcv);
if (rcv->mHead != NULL)
{
node = rcv->mHead;
length_remaining = node->mMessage->len - node->mUsedBytes;
if (*Length >= length_remaining)
{
memcpy(Data, node->mMessage->data + node->mUsedBytes, length_remaining);
*Length = length_remaining;
rc = 0;
lbm_msg_delete(node->mMessage);
rcv->mHead = node->mNext;
if (rcv->mHead == NULL)
{
rcv->mTail = NULL;
}
free(node);
}
else
{
memcpy(Data, node->mMessage->data + node->mUsedBytes, *Length);
node->mUsedBytes -= *Length;
rc = 0;
}
unlock_receiver(rcv);
}
else
{
unlock_receiver(rcv);
#define NANOSECONDS_PER_SECOND 1000000000
#define MICROSECONDS_PER_SECOND 1000000
#define MILLISECONDS_PER_SECOND 1000
#define NANOSECONDS_PER_MILLISECOND (NANOSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#define MICROSECONDS_PER_MILLISECOND (MICROSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#if defined(_WIN32)
Sleep(TimeoutMS);
#elif defined(__TANDEM)
sleep_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
sleep_usec = (TimeoutMS % MILLISECONDS_PER_SECOND) * MICROSECONDS_PER_MILLISECOND;
if (sleep_usec > 0)
{
usleep(sleep_usec);
}
if (sleep_sec > 0)
{
sleep(sleep_sec);
}
#else
ivl.tv_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
ivl.tv_nsec = (TimeoutMS % MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND;
nanosleep(&ivl, NULL);
#endif
rc = 1;
}
return (rc);
}
void
src_cleanup(lbmmon_transport_lbm_src_t * Data)
{
if (Data->mSource != NULL)
{
lbm_src_delete(Data->mSource);
Data->mSource = NULL;
}
Data->mTopic = NULL;
if (Data->mTopicAttributes != NULL)
{
lbm_src_topic_attr_delete(Data->mTopicAttributes);
Data->mTopicAttributes = NULL;
}
if (Data->mContext != NULL)
{
lbm_context_delete(Data->mContext);
Data->mContext = NULL;
}
if (Data->mContextAttributes != NULL)
{
lbm_context_attr_delete(Data->mContextAttributes);
Data->mContextAttributes = NULL;
}
free(Data);
}
int
lbmmon_transport_lbm_src_finish(void * TransportClientData)
{
lbmmon_transport_lbm_src_t * src;
if (TransportClientData == NULL)
{
strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
return (-1);
}
src = (lbmmon_transport_lbm_src_t *) TransportClientData;
src_cleanup(src);
return (0);
}
void
rcv_cleanup(lbmmon_transport_lbm_rcv_t * Data)
{
lbmmon_transport_lbm_rcv_node_t * node;
lbmmon_transport_lbm_rcv_node_t * next;
if (Data->mWildcardReceiver != NULL)
{
lbm_wildcard_rcv_delete(Data->mWildcardReceiver);
Data->mWildcardReceiver = NULL;
}
if (Data->mWildcardReceiverAttributes != NULL)
{
lbm_wildcard_rcv_attr_delete(Data->mWildcardReceiverAttributes);
Data->mWildcardReceiverAttributes = NULL;
}
if (Data->mReceiver != NULL)
{
lbm_rcv_delete(Data->mReceiver);
Data->mReceiver = NULL;
}
if (Data->mTopicAttributes != NULL)
{
lbm_rcv_topic_attr_delete(Data->mTopicAttributes);
Data->mTopicAttributes = NULL;
}
Data->mTopic = NULL;
if (Data->mLockCreated != 0)
{
lock_receiver(Data);
}
if (Data->mContext != NULL)
{
lbm_context_delete(Data->mContext);
Data->mContext = NULL;
}
if (Data->mContextAttributes != NULL)
{
lbm_context_attr_delete(Data->mContextAttributes);
Data->mContextAttributes = NULL;
}
node = Data->mHead;
while (node != NULL)
{
lbm_msg_delete(node->mMessage);
next = node->mNext;
free(node);
node = next;
}
if (Data->mLockCreated)
{
unlock_receiver(Data);
#ifdef _WIN32
DeleteCriticalSection(&(Data->mLock));
#else
pthread_mutex_destroy(&(Data->mLock));
#endif
}
free(Data);
}
int
lbmmon_transport_lbm_rcv_finish(void * TransportClientData)
{
lbmmon_transport_lbm_rcv_t * rcv;
if (TransportClientData == NULL)
{
strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
return (-1);
}
rcv = (lbmmon_transport_lbm_rcv_t *) TransportClientData;
rcv_cleanup(rcv);
return (0);
}
void
lock_receiver(lbmmon_transport_lbm_rcv_t * Receiver)
{
#ifdef _WIN32
EnterCriticalSection(&(Receiver->mLock));
#else
pthread_mutex_lock(&(Receiver->mLock));
#endif
}
void
unlock_receiver(lbmmon_transport_lbm_rcv_t * Receiver)
{
#ifdef _WIN32
LeaveCriticalSection(&(Receiver->mLock));
#else
pthread_mutex_unlock(&(Receiver->mLock));
#endif
}
const char *
lbmmon_transport_lbm_errmsg(void)
{
return (ErrorString);
}
int
scope_is_valid(const char * Scope)
{
if (strcasecmp(Scope, "context") == 0)
{
return (0);
}
if (strcasecmp(Scope, "source") == 0)
{
return (0);
}
if (strcasecmp(Scope, "receiver") == 0)
{
return (0);
}
if (strcasecmp(Scope, "event_queue") == 0)
{
return (0);
}
return (-1);
}