Source code for lbmmontrlbm.c

/*
  All of the documentation and software included in this and any
  other Informatica Corporation Ultra Messaging Releases
  Copyright (C) Informatica Corporation. All rights reserved.
  
  Redistribution and use in source and binary forms, with or without
  modification, are permitted only as covered by the terms of a
  valid software license agreement with Informatica Corporation.

  Copyright (C) 2004-2014, Informatica Corporation. All Rights Reserved.

  THE SOFTWARE IS PROVIDED "AS IS" AND INFORMATICA DISCLAIMS ALL WARRANTIES 
  EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION, ANY IMPLIED WARRANTIES OF 
  NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR 
  PURPOSE.  INFORMATICA DOES NOT WARRANT THAT USE OF THE SOFTWARE WILL BE 
  UNINTERRUPTED OR ERROR-FREE.  INFORMATICA SHALL NOT, UNDER ANY CIRCUMSTANCES, BE 
  LIABLE TO LICENSEE FOR LOST PROFITS, CONSEQUENTIAL, INCIDENTAL, SPECIAL OR 
  INDIRECT DAMAGES ARISING OUT OF OR RELATED TO THIS AGREEMENT OR THE 
  TRANSACTIONS CONTEMPLATED HEREUNDER, EVEN IF INFORMATICA HAS BEEN APPRISED OF 
  THE LIKELIHOOD OF SUCH DAMAGES.
  
*/

#ifdef __VOS__
#define _POSIX_C_SOURCE 200112L
#include <sys/time.h>
#endif

#include <stdio.h>
#include <time.h>
#include <string.h>
#ifdef _WIN32
        #define strcasecmp stricmp
        #define snprintf _snprintf
#else
        #include "config.h"
        #include <unistd.h>
        #if defined(__TANDEM)
                #if defined(HAVE_TANDEM_SPT)
                        #include <ktdmtyp.h>
                        #include <spthread.h>
                #else
                        #include <pthread.h>
                #endif
        #else
                #include <pthread.h>
        #endif
        #include <strings.h>
#endif
#include <lbm/lbmmon.h>
#include <lbm/lbmmontrlbm.h>
#include <lbm/lbmaux.h>

/*
        Package all of the needed function pointers for this module into a
        lbmmon_transport_func_t structure.

        A pointer to this table is what lbmmon_transport_lbm_module() returns.
        The initializer is positional, so the entries must stay in the same order
        as the members of lbmmon_transport_func_t (declared outside this file).
*/
static const lbmmon_transport_func_t LBMMON_TRANSPORT_LBM =
{
        lbmmon_transport_lbm_initsrc,
        lbmmon_transport_lbm_initrcv,
        lbmmon_transport_lbm_send,
        lbmmon_transport_lbm_receive,
        lbmmon_transport_lbm_src_finish,
        lbmmon_transport_lbm_rcv_finish,
        lbmmon_transport_lbm_errmsg
};

/*
        For a statistics source, one of these gets returned as the TransportClientData.

        The structure is allocated with malloc() and filled in by
        lbmmon_transport_lbm_initsrc(). Every pointer member starts out NULL and
        is set as the corresponding LBM object is created.
*/
typedef struct
{
        /* LBM context attributes */
        lbm_context_attr_t * mContextAttributes;
        /* LBM context created to send a statistics packet */
        lbm_context_t * mContext;
        /* LBM topic attributes */
        lbm_src_topic_attr_t * mTopicAttributes;
        /* LBM source created to send a statistics packet */
        lbm_src_t * mSource;
        /* LBM topic (obtained from lbm_src_topic_alloc()) */
        lbm_topic_t * mTopic;
} lbmmon_transport_lbm_src_t;

/*
        A queue of incoming statistics packets is maintained. This describes each
        entry in the queue.

        The queue is a singly linked list. receive_callback() allocates a node
        with malloc() for every data message, retains the message, and appends
        the node at the tail.
*/
struct lbmmon_transport_lbm_rcv_node_t_stct
{
        /* Pointer to the LBM message (retained via lbm_msg_retain() when queued) */
        lbm_msg_t * mMessage;
        /* Number of bytes of the message returned to caller (0 when first queued) */
        size_t mUsedBytes;
        /* Next entry in the queue, or NULL for the last entry */
        struct lbmmon_transport_lbm_rcv_node_t_stct * mNext;
};
typedef struct lbmmon_transport_lbm_rcv_node_t_stct lbmmon_transport_lbm_rcv_node_t;

/*
        For a statistics receiver, one of these gets returned as the TransportClientData.

        The structure is allocated with malloc() and filled in by
        lbmmon_transport_lbm_initrcv(). It is also passed to LBM as the client
        data for receive_callback(), which appends incoming messages to the
        mHead/mTail queue while holding mLock.
*/
typedef struct
{
        /* Flag to indicate lock has been created (set to 1 once mLock is initialized) */
        unsigned int mLockCreated;
        /* Lock to prevent access by multiple threads */
#ifdef _WIN32
        CRITICAL_SECTION mLock;
#else
        pthread_mutex_t mLock;
#endif
        /* LBM context attributes */
        lbm_context_attr_t * mContextAttributes;
        /* LBM context used to receive packets */
        lbm_context_t * mContext;
        /* LBM receiver used to receive packets (created only when no wildcard topic is given) */
        lbm_rcv_t * mReceiver;
        /* Topic attributes */
        lbm_rcv_topic_attr_t * mTopicAttributes;
        /* Topic (looked up only when no wildcard topic is given) */
        lbm_topic_t * mTopic;
        /* Wildcard receiver attributes (created only when a wildcard topic is given) */
        lbm_wildcard_rcv_attr_t * mWildcardReceiverAttributes;
        /* If we're using a wildcard receiver... */
        lbm_wildcard_rcv_t * mWildcardReceiver;
        /* Head of the message queue (NULL when the queue is empty) */
        lbmmon_transport_lbm_rcv_node_t * mHead;
        /* Tail of the message queue (NULL when the queue is empty) */
        lbmmon_transport_lbm_rcv_node_t * mTail;
} lbmmon_transport_lbm_rcv_t;

/*
        Forward declarations for the file-local helpers.
*/
/* Called on the error paths of lbmmon_transport_lbm_initsrc() with the partially built source state. */
static void     src_cleanup(lbmmon_transport_lbm_src_t * Data);
/* Called on the error paths of lbmmon_transport_lbm_initrcv() with the partially built receiver state. */
static void     rcv_cleanup(lbmmon_transport_lbm_rcv_t * Data);
/* LBM receiver callback: queues each LBM_MSG_DATA message on the receiver's message queue. */
static int receive_callback(lbm_rcv_t * Receiver, lbm_msg_t * Message, void * ClientData);
/* Bracket access to the receiver's message queue (presumably acquire/release mLock -- confirm against the definitions). */
static void lock_receiver(lbmmon_transport_lbm_rcv_t * Receiver);
static void unlock_receiver(lbmmon_transport_lbm_rcv_t * Receiver);
/* Validates the scope part of a "scope|option" key; callers treat a return of -1 as "invalid scope". */
static int scope_is_valid(const char * Scope);

#define DEFAULT_CONTEXT_NAME "29west_statistics_context"
#define DEFAULT_TOPIC "/29west/statistics"

static char ErrorString[1024];

/*
        One LBM option name/value pair, both in string form as accepted by the
        lbm_*_attr_str_setopt() functions. The tables built from this type are
        terminated by an entry whose option member is NULL.
*/
typedef struct
{
        /* Option name, or NULL to mark the end of a table */
        const char * option;
        /* Option value as a string */
        const char * value;
} option_entry_t;

/*
        Context options forced for a statistics source.
        lbmmon_transport_lbm_initsrc() applies these after the configuration file
        and after any "context|..." transport options, so they take precedence
        over user-supplied values for the same options.
*/
static option_entry_t SourceContextOption[] =
{
        /* Force embedded mode for simplicity. */
        { "operational_mode", "embedded" },
        /* Disable monitoring for this context. */
        { "monitor_interval", "0" },
        /* We don't need request/response, so don't use up ports. */
        { "request_tcp_bind_request_port", "0" },
        /* We don't need MIM, so disable MIM receiver. */
        { "mim_incoming_address", "0.0.0.0" },
        /* No need to cache topics. */
        { "resolver_cache", "0" },
        /* End of list. */
        { NULL, NULL }
};

/*
        Context options forced for a statistics receiver.
        lbmmon_transport_lbm_initrcv() applies these after the configuration file
        and after any "context|..." transport options, so they take precedence
        over user-supplied values for the same options.
*/
static option_entry_t ReceiverContextOption[] =
{
        /* Force embedded mode for simplicity. */
        { "operational_mode", "embedded" },
        /* Disable monitoring for this context. */
        { "monitor_interval", "0" },
        /* We don't need request/response, so don't use up ports. */
        { "request_tcp_bind_request_port", "0" },
        /* We don't need MIM, so disable MIM receiver. */
        { "mim_incoming_address", "0.0.0.0" },
        /* No need to cache topics. */
        { "resolver_cache", "0" },
        /* End of list. */
        { NULL, NULL }
};

/*
        Default source topic options for a statistics source.
        Unlike the context tables, lbmmon_transport_lbm_initsrc() applies these
        BEFORE the configuration file and the "source|..." transport options, so
        they act as defaults that user-supplied values can override.
*/
static option_entry_t SourceTopicOption[] =
{
        /* Minimize memory used for LBT-RU retransmissions. */
        { "transport_lbtru_transmission_window_size", "500000" },
        /* Minimize memory used for LBT-RM retransmissions. */
        { "transport_lbtrm_transmission_window_size", "500000" },
        /* End of list. */
        { NULL, NULL }
};

/*
        Receiver topic options applied by lbmmon_transport_lbm_initrcv() after the
        configuration file and any "receiver|..." transport options.
        Currently empty (terminator only).
*/
static option_entry_t ReceiverTopicOption[] =
{
        /* End of list. */
        { NULL, NULL }
};

/*
        Wildcard receiver options applied by lbmmon_transport_lbm_initrcv() after
        the configuration file and any "wildcard_receiver|..." transport options.
        Only used when a wildcard topic was requested. Currently empty
        (terminator only).
*/
static option_entry_t WildcardReceiverOption[] =
{
        /* End of list. */
        { NULL, NULL }
};

/*
        Return the function table describing the LBM transport module.

        The returned pointer refers to the file-scope constant
        LBMMON_TRANSPORT_LBM; the caller must not modify or free it.
*/
const lbmmon_transport_func_t *
lbmmon_transport_lbm_module(void)
{
        const lbmmon_transport_func_t * module_table = &LBMMON_TRANSPORT_LBM;

        return module_table;
}

/*
        Initialize the LBM transport module to act as a statistics source.

        Creates an LBM context and an LBM source on the statistics topic, and
        passes the resulting lbmmon_transport_lbm_src_t back to the caller.

        TransportClientData
                Receives the newly created lbmmon_transport_lbm_src_t. Written
                only on success. Must not be NULL.
        TransportOptions
                Option string, walked with lbmmon_next_key_value_pair().
                Recognized keys (compared case-insensitively):
                        config           LBM configuration file, applied to both
                                         the context and the source topic
                                         attributes.
                        topic            Topic to publish on (default
                                         DEFAULT_TOPIC).
                        context|<option> Context attribute to set.
                        source|<option>  Source topic attribute to set.
                Any other key of the form <scope>|<option> must carry a scope
                accepted by scope_is_valid(); keys without a '|' are ignored by
                the scoped passes.

        Order of precedence (later entries win):
                context: DEFAULT_CONTEXT_NAME, config file, context|... options,
                         SourceContextOption table (forced).
                source:  SourceTopicOption table (defaults), config file,
                         source|... options.

        Returns 0 on success. On failure returns the failing LBM return code or
        -1, leaves a description in ErrorString, and hands any partially built
        state to src_cleanup().
        NOTE(review): src_cleanup() is defined outside this view; it is assumed
        to tolerate NULL members and to release the structure itself, as the
        pre-existing error paths already rely on -- confirm.
*/
int
lbmmon_transport_lbm_initsrc(void * * TransportClientData, const void * TransportOptions)
{
        lbmmon_transport_lbm_src_t * data;
        int rc;
        const char * ptr = (const char *) TransportOptions;
        char key[512];
        char value[512];
        char config_file[512];
        char topic[512];
        char scope[512];
        char option[512];
        option_entry_t * entry;

        memset(ErrorString, 0, sizeof(ErrorString));

        /* Fail early rather than after building everything and then dereferencing NULL. */
        if (TransportClientData == NULL)
        {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "TransportClientData must not be NULL");
                return (-1);
        }

        data = malloc(sizeof(*data));
        if (data == NULL)
        {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "malloc() failed, unable to allocate source transport data");
                return (-1);
        }
        data->mContextAttributes = NULL;
        data->mContext = NULL;
        data->mTopicAttributes = NULL;
        data->mSource = NULL;
        data->mTopic = NULL;

        /*
                Process any options. strncpy() does not terminate the destination
                when the source fills it, so copy at most size - 1 bytes and
                terminate explicitly.
        */
        memset(config_file, 0, sizeof(config_file));
        strncpy(topic, DEFAULT_TOPIC, sizeof(topic) - 1);
        topic[sizeof(topic) - 1] = '\0';
        while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
        {
                if (strcasecmp(key, "config") == 0)
                {
                        strncpy(config_file, value, sizeof(config_file) - 1);
                        config_file[sizeof(config_file) - 1] = '\0';
                }
                else if (strcasecmp(key, "topic") == 0)
                {
                        strncpy(topic, value, sizeof(topic) - 1);
                        topic[sizeof(topic) - 1] = '\0';
                }
        }

        /* Initialize the context attributes */
        rc = lbm_context_attr_create_default(&(data->mContextAttributes));
        if (rc == LBM_FAILURE)
        {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "lbm_context_attr_create_default() failed, %s",
                         lbm_errmsg());
                /* Release the structure allocated above. */
                src_cleanup(data);
                return (rc);
        }
        /* Set the default context name */
        rc = lbm_context_attr_str_setopt(data->mContextAttributes, "context_name", DEFAULT_CONTEXT_NAME);
        if (rc == LBM_FAILURE)
        {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "lbm_context_attr_str_setopt() failed, %s",
                         lbm_errmsg());
                /* Release the context attributes and the structure. */
                src_cleanup(data);
                return (rc);
        }
        if (config_file[0] != '\0')
        {
                /* A config file was passed as an option. Use it to populate the context attributes. */
                rc = lbmaux_context_attr_setopt_from_file(data->mContextAttributes, config_file);
                if (rc != 0)
                {
                        snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "lbmaux_context_attr_setopt_from_file() failed, %s",
                                 lbm_errmsg());
                        src_cleanup(data);
                        return (-1);
                }
        }
        /* Go back through the options, looking for any specific context options. */
        ptr = (const char *) TransportOptions;
        while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
        {
                /* Field widths keep each conversion within the 512-byte scope/option buffers. */
                if (sscanf(key, "%511[a-zA-Z_]|%511[a-zA-Z_]", scope, option) != 2)
                {
                        /* Not a "scope|option" key (e.g. "config" or "topic"). */
                        continue;
                }
                /* Scopes are validated once, in this first scoped pass. */
                if (scope_is_valid(scope) == -1)
                {
                        snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "invalid option scope [%s]",
                                 scope);
                        src_cleanup(data);
                        return (-1);
                }
                if (strcasecmp(scope, "context") == 0)
                {
                        rc = lbm_context_attr_str_setopt(data->mContextAttributes, option, value);
                        if (rc == LBM_FAILURE)
                        {
                                snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "invalid option [context %s %s], %s",
                                         option,
                                         value,
                                         lbm_errmsg());
                                src_cleanup(data);
                                return (rc);
                        }
                }
        }

        /* Apply the forced context options last so that they override user settings. */
        entry = &SourceContextOption[0];
        while (entry->option != NULL)
        {
                rc = lbm_context_attr_str_setopt(data->mContextAttributes, entry->option, entry->value);
                if (rc == LBM_FAILURE)
                {
                        snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "error setting option [context %s %s], %s",
                                 entry->option,
                                 entry->value,
                                 lbm_errmsg());
                        src_cleanup(data);
                        return (rc);
                }
                entry++;
        }

        /* Create the context */
        rc = lbm_context_create(&(data->mContext), data->mContextAttributes, NULL, NULL);
        if (rc == LBM_FAILURE)
        {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "lbm_context_create() failed, %s",
                         lbm_errmsg());
                src_cleanup(data);
                return (rc);
        }

        /* Initialize the source topic attributes */
        rc = lbm_src_topic_attr_create_default(&(data->mTopicAttributes));
        if (rc == LBM_FAILURE)
        {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "lbm_src_topic_attr_create_default() failed, %s",
                         lbm_errmsg());
                src_cleanup(data);
                return (rc);
        }
        /* Apply the built-in source defaults first; the config file and user options may override them. */
        entry = &SourceTopicOption[0];
        while (entry->option != NULL)
        {
                rc = lbm_src_topic_attr_str_setopt(data->mTopicAttributes, entry->option, entry->value);
                if (rc == LBM_FAILURE)
                {
                        snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "error setting option [source %s %s], %s",
                                 entry->option,
                                 entry->value,
                                 lbm_errmsg());
                        src_cleanup(data);
                        return (rc);
                }
                entry++;
        }
        if (config_file[0] != '\0')
        {
                /* A config file was passed as an option. Use it to populate the source topic attributes. */
                rc = lbmaux_src_topic_attr_setopt_from_file(data->mTopicAttributes, config_file);
                if (rc != 0)
                {
                        snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "lbmaux_src_topic_attr_setopt_from_file() failed, %s",
                                 lbm_errmsg());
                        src_cleanup(data);
                        return (-1);
                }
        }
        /* Go back through the options, looking for any specific source options. */
        ptr = (const char *) TransportOptions;
        while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
        {
                if (sscanf(key, "%511[a-zA-Z_]|%511[a-zA-Z_]", scope, option) != 2)
                {
                        continue;
                }
                if (strcasecmp(scope, "source") == 0)
                {
                        rc = lbm_src_topic_attr_str_setopt(data->mTopicAttributes, option, value);
                        if (rc == LBM_FAILURE)
                        {
                                snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "invalid option [source %s %s], %s",
                                         option,
                                         value,
                                         lbm_errmsg());
                                src_cleanup(data);
                                return (rc);
                        }
                }
        }

        /* Create the topic */
        rc = lbm_src_topic_alloc(&(data->mTopic), data->mContext, topic, data->mTopicAttributes);
        if (rc == LBM_FAILURE)
        {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "lbm_src_topic_alloc() failed, %s",
                         lbm_errmsg());
                src_cleanup(data);
                return (rc);
        }

        /* Create the source */
        rc = lbm_src_create(&(data->mSource), data->mContext, data->mTopic, NULL, NULL, NULL);
        if (rc == LBM_FAILURE)
        {
                snprintf(ErrorString,
                         sizeof(ErrorString),
                         "lbm_src_create() failed, %s",
                         lbm_errmsg());
                src_cleanup(data);
                return (rc);
        }

        /* Pass back the lbmmon_transport_lbm_src_t created */
        *TransportClientData = data;
        return (0);
}

/*
        This function is called upon receipt of an LBM message (when operating as
        a statistics receiver).

        Receiver        The LBM receiver delivering the message (not used here).
        Message         The message being delivered. Only LBM_MSG_DATA messages
                        are queued; every other message type is ignored.
        ClientData      The lbmmon_transport_lbm_rcv_t registered when the
                        receiver was created.

        A queued message is retained with lbm_msg_retain() and appended to the
        tail of the receiver's queue while the receiver lock is held.

        Always returns 0. If a queue node cannot be allocated the message is
        dropped (it is not retained). ErrorString is deliberately not touched
        here, since it is a shared static buffer and this callback is invoked
        by LBM rather than by the module's caller.
*/
int
receive_callback(lbm_rcv_t * Receiver, lbm_msg_t * Message, void * ClientData)
{
        lbmmon_transport_lbm_rcv_t * rcv = (lbmmon_transport_lbm_rcv_t *) ClientData;
        lbmmon_transport_lbm_rcv_node_t * node;

        if (Message->type == LBM_MSG_DATA)
        {
                /*
                        A data message. We want to enqueue it for processing.
                        Allocate the node before taking the lock so that an
                        allocation failure needs no unlock and the lock is held
                        only while the queue is touched.
                */
                node = malloc(sizeof(*node));
                if (node == NULL)
                {
                        /* Out of memory: drop this message instead of dereferencing NULL. */
                        return (0);
                }
                node->mUsedBytes = 0;   /* No data returned as yet */
                node->mNext = NULL;

                lock_receiver(rcv);
                /*
                        Since we hold onto the message until it is actually processed,
                        let LBM know about it.
                */
                lbm_msg_retain(Message);
                node->mMessage = Message;

                /* Link the message onto the queue */
                if (rcv->mTail != NULL)
                {
                        rcv->mTail->mNext = node;
                }
                else
                {
                        /* Queue was empty, so the new node is also the head. */
                        rcv->mHead = node;
                }
                rcv->mTail = node;
                unlock_receiver(rcv);
        }
        return (0);
}

int
lbmmon_transport_lbm_initrcv(void * * TransportClientData, const void * TransportOptions)
{
        lbmmon_transport_lbm_rcv_t * data;
        int rc;
        const char * ptr = (const char *) TransportOptions;
        char key[512];
        char value[512];
        char config_file[512];
        char topic[512];
        char wildcard_topic[512];
        char scope[512];
        char option[512];
        option_entry_t * entry;

        memset(ErrorString, 0, sizeof(ErrorString));
        data = malloc(sizeof(lbmmon_transport_lbm_rcv_t));

        data->mLockCreated = 0;
        data->mContextAttributes = NULL;
        data->mContext = NULL;
        data->mReceiver = NULL;
        data->mTopicAttributes = NULL;
        data->mTopic = NULL;
        data->mWildcardReceiverAttributes = NULL;
        data->mWildcardReceiver = NULL;
        data->mHead = NULL;
        data->mTail = NULL;

        /* Process any options */
        memset(config_file, 0, sizeof(config_file));
        strncpy(topic, DEFAULT_TOPIC, sizeof(topic));
        memset(wildcard_topic, 0, sizeof(wildcard_topic));
        while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
        {
                if (strcasecmp(key, "config") == 0)
                {
                        strncpy(config_file, value, sizeof(config_file));
                }
                else if (strcasecmp(key, "topic") == 0)
                {
                        strncpy(topic, value, sizeof(topic));
                }
                else if (strcasecmp(key, "wctopic") == 0)
                {
                        strncpy(wildcard_topic, value, sizeof(wildcard_topic));
                }
        }

        /* Initialize the context attributes */
        rc = lbm_context_attr_create_default(&(data->mContextAttributes));
        if (rc == LBM_FAILURE)
        {
                snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "lbm_context_attr_init() failed, %s",
                                 lbm_errmsg());
                rcv_cleanup(data);
                return (rc);
        }
        /* Set the default context name */
        rc = lbm_context_attr_str_setopt(data->mContextAttributes, "context_name", DEFAULT_CONTEXT_NAME);
        if (rc == LBM_FAILURE)
        {
                snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "lbm_context_attr_str_setopt() failed, %s",
                                 lbm_errmsg());
                return (rc);
        }
        if (config_file[0] != '\0')
        {
                /* A config file was passed as an option. Use it to populate the context attributes. */
                rc = lbmaux_context_attr_setopt_from_file(data->mContextAttributes, config_file);
                if (rc != 0)
                {
                        snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "lbmaux_context_attr_setopt_from_file() failed, %s",
                                         lbm_errmsg());
                        rcv_cleanup(data);
                        return (-1);
                }
        }
        /* Go back through the options, looking for any specific context options. */
        ptr = (const char *) TransportOptions;
        while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
        {
                if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
                {
                        continue;
                }
                if (scope_is_valid(scope) == -1)
                {
                        snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "invalid option scope [%s]",
                                         scope);
                        rcv_cleanup(data);
                        return (-1);
                }
                if (strcasecmp(scope, "context") == 0)
                {
                        rc = lbm_context_attr_str_setopt(data->mContextAttributes, option, value);
                        if (rc == LBM_FAILURE)
                        {
                                snprintf(ErrorString,
                                                 sizeof(ErrorString),
                                                 "invalid option [context %s %s], %s",
                                                 option,
                                                 value,
                                                 lbm_errmsg());
                                rcv_cleanup(data);
                                return (rc);
                        }
                }
        }

        entry = &ReceiverContextOption[0];
        while (entry->option != NULL)
        {
                rc = lbm_context_attr_str_setopt(data->mContextAttributes, entry->option, entry->value);
                if (rc == LBM_FAILURE)
                {
                        snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "error setting option [context %s %s], %s",
                                         entry->option,
                                         entry->value,
                                         lbm_errmsg());
                        rcv_cleanup(data);
                        return (rc);
                }
                entry++;
        }

        /* Create the context */
        rc = lbm_context_create(&(data->mContext), data->mContextAttributes, NULL, NULL);
        if (rc == LBM_FAILURE)
        {
                snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "lbm_context_create() failed, %s",
                                 lbm_errmsg());
                rcv_cleanup(data);
                return (rc);
        }

        /* If a wildcard topic was specified, initialize the wildcard receiver attributes. */
        if (wildcard_topic[0] != '\0')
        {
                rc = lbm_wildcard_rcv_attr_create_default(&(data->mWildcardReceiverAttributes));
                if (rc == LBM_FAILURE)
                {
                        snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "lbm_wildcard_rcv_attr_init() failed, %s",
                                         lbm_errmsg());
                        rcv_cleanup(data);
                        return (rc);
                }
                if (config_file[0] != '\0')
                {
                        /* A config file was passed as an option. Use it to populate the wildcard receiver attributes. */
                        rc = lbmaux_wildcard_rcv_attr_setopt_from_file(data->mWildcardReceiverAttributes, config_file);
                        if (rc == LBM_FAILURE)
                        {
                                snprintf(ErrorString,
                                                 sizeof(ErrorString),
                                                 "lbmaux_wildcard_rcv_attr_setopt_from_file() failed, %s",
                                                 lbm_errmsg());
                                rcv_cleanup(data);
                                return (-1);
                        }
                }
                /* Go back through the options, looking for any specific wildcard receiver options. */
                ptr = (const char *) TransportOptions;
                while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
                {
                        if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
                        {
                                continue;
                        }
                        if (strcasecmp(scope, "wildcard_receiver") == 0)
                        {
                                rc = lbm_wildcard_rcv_attr_str_setopt(data->mWildcardReceiverAttributes, option, value);
                                if (rc == LBM_FAILURE)
                                {
                                        snprintf(ErrorString,
                                                         sizeof(ErrorString),
                                                         "invalid option [wildcard_receiver %s %s], %s",
                                                         option,
                                                         value,
                                                         lbm_errmsg());
                                        rcv_cleanup(data);
                                        return (rc);
                                }
                        }
                }
                entry = &WildcardReceiverOption[0];
                while (entry->option != NULL)
                {
                        rc = lbm_wildcard_rcv_attr_str_setopt(data->mWildcardReceiverAttributes, entry->option, entry->value);
                        if (rc == LBM_FAILURE)
                        {
                                snprintf(ErrorString,
                                                 sizeof(ErrorString),
                                                 "error setting option [wildcard_receiver %s %s], %s",
                                                 entry->option,
                                                 entry->value,
                                                 lbm_errmsg());
                                rcv_cleanup(data);
                                return (rc);
                        }
                        entry++;
                }
        }

        /* Initialize and set the receiver topic attributes. */
        rc = lbm_rcv_topic_attr_create_default(&(data->mTopicAttributes));
        if (rc == LBM_FAILURE)
        {
                snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "lbm_rcv_topic_attr_init() failed, %s",
                                 lbm_errmsg());
                rcv_cleanup(data);
                return (rc);
        }
        if (config_file[0] != '\0')
        {
                /* A config file was passed as an option. Use it to populate the receiver topic attributes. */
                rc = lbmaux_rcv_topic_attr_setopt_from_file(data->mTopicAttributes, config_file);
                if (rc == LBM_FAILURE)
                {
                        snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "lbmaux_rcv_topic_attr_setopt_from_file() failed, %s",
                                         lbm_errmsg());
                        rcv_cleanup(data);
                        return (-1);
                }
        }
        /* Go back through the options, looking for any specific receiver options. */
        ptr = (const char *) TransportOptions;
        while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
        {
                if (sscanf(key, "%[a-zA-Z_]|%[a-zA-Z_]", scope, option) != 2)
                {
                        continue;
                }
                if (strcasecmp(scope, "receiver") == 0)
                {
                        rc = lbm_rcv_topic_attr_str_setopt(data->mTopicAttributes, option, value);
                        if (rc == LBM_FAILURE)
                        {
                                snprintf(ErrorString,
                                                 sizeof(ErrorString),
                                                 "invalid option [receiver %s %s], %s",
                                                 option,
                                                 value,
                                                 lbm_errmsg());
                                rcv_cleanup(data);
                                return (rc);
                        }
                }
        }
        entry = &ReceiverTopicOption[0];
        while (entry->option != NULL)
        {
                rc = lbm_rcv_topic_attr_str_setopt(data->mTopicAttributes, entry->option, entry->value);
                if (rc == LBM_FAILURE)
                {
                        snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "error setting option [receiver %s %s], %s",
                                         entry->option,
                                         entry->value,
                                         lbm_errmsg());
                        rcv_cleanup(data);
                        return (rc);
                }
                entry++;
        }

        /* For a non-wildcard topic, lookup the topic. */
        if (wildcard_topic[0] == '\0')
        {
                rc = lbm_rcv_topic_lookup(&(data->mTopic), data->mContext, topic, data->mTopicAttributes);
                if (rc == LBM_FAILURE)
                {
                        snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "lbm_rcv_topic_lookup() failed, %s",
                                         lbm_errmsg());
                        rcv_cleanup(data);
                        return (rc);
                }
        }

#ifdef _WIN32
        InitializeCriticalSection(&(data->mLock));
#else
        pthread_mutex_init(&(data->mLock), NULL);
#endif
        data->mLockCreated = 1;
        lock_receiver(data);
        if (wildcard_topic[0] != '\0')
        {
                /* Wildcard topic, create a wildcard receiver */
                rc = lbm_wildcard_rcv_create(&(data->mWildcardReceiver),
                                                                         data->mContext,
                                                                         wildcard_topic,
                                                                         data->mTopicAttributes,
                                                                         data->mWildcardReceiverAttributes,
                                                                         receive_callback,
                                                                         data,
                                                                         NULL);
        }
        else
        {
                /* Non-wildcard topic, create a normal receiver */
                rc = lbm_rcv_create(&(data->mReceiver),
                                                        data->mContext,
                                                        data->mTopic,
                                                        receive_callback,
                                                        data,
                                                        NULL);
        }
        if (rc == LBM_FAILURE)
        {
                snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "lbm_wildcard_rcv_create()/lbm_rcv_create() failed, %s",
                                 lbm_errmsg());
                unlock_receiver(data);
                rcv_cleanup(data);
                return (rc);
        }

        /* Pass back the lbmmon_transport_lbm_rcv_t created */
        *TransportClientData = data;
        unlock_receiver(data);
        return (0);
}

/*
        Send a block of data through the LBM source stored in the transport
        client data.

        Data                    Bytes to send. Must not be NULL.
        Length                  Number of bytes in Data.
        TransportClientData     Pointer to the lbmmon_transport_lbm_src_t for
                                this transport. Must not be NULL.

        Returns -1 (with ErrorString set) if either pointer is NULL. Otherwise
        returns the result of lbm_src_send(); when that result is LBM_FAILURE,
        ErrorString is filled in from lbm_errmsg().
*/
int
lbmmon_transport_lbm_send(const char * Data, size_t Length, void * TransportClientData)
{
        lbmmon_transport_lbm_src_t * source_data = (lbmmon_transport_lbm_src_t *) TransportClientData;
        int status;

        if ((Data != NULL) && (TransportClientData != NULL))
        {
                /* Hand the buffer to LBM; the last argument is passed as 0. */
                status = lbm_src_send(source_data->mSource, Data, Length, 0);
                if (status == LBM_FAILURE)
                {
                        snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "lbm_src_send() failed, %s",
                                         lbm_errmsg());
                }
                return (status);
        }

        /* One of the required pointers was missing. */
        strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
        return (-1);
}

/*
        Retrieve queued data for a receiver, or wait if nothing is queued.

        Data                    Destination buffer. Must not be NULL.
        Length                  On entry, the capacity of Data in bytes. Must
                                not be NULL. When the remainder of the message
                                at the head of the queue fits, it is set to the
                                number of bytes copied. When only part of the
                                message fits, the buffer is filled completely
                                and *Length is left unchanged.
        TimeoutMS               Time to sleep, in milliseconds, when the queue
                                is empty.
        TransportClientData     Pointer to the lbmmon_transport_lbm_rcv_t for
                                this transport. Must not be NULL.

        Returns -1 (with ErrorString set) on a NULL argument, 0 if *Length is
        zero or data was copied into Data, and 1 if the queue was empty and
        the function slept for TimeoutMS instead.
*/
int
lbmmon_transport_lbm_receive(char * Data, size_t * Length, unsigned int TimeoutMS, void * TransportClientData)
{
        lbmmon_transport_lbm_rcv_t * rcv = (lbmmon_transport_lbm_rcv_t *) TransportClientData;
        lbmmon_transport_lbm_rcv_node_t * node;
        int rc = 0;
        size_t length_remaining;
#if defined(_WIN32)
#elif defined(__TANDEM)
        unsigned int sleep_sec;
        unsigned int sleep_usec;
#else
        struct timespec ivl;
#endif

        if ((Data == NULL) || (Length == NULL) || (TransportClientData == NULL))
        {
                strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
                return (-1);
        }
        if (*Length == 0)
        {
                /* No room in the caller's buffer, nothing to do. */
                return (0);
        }
        lock_receiver(rcv);
        if (rcv->mHead != NULL)
        {
                /* Queue is non-empty. Pull the first message from the queue. */
                node = rcv->mHead;
                /* Bytes of the head message not yet handed to the caller. */
                length_remaining = node->mMessage->len - node->mUsedBytes;
                if (*Length >= length_remaining)
                {
                        /* We can transfer the rest of the message */
                        memcpy(Data, node->mMessage->data + node->mUsedBytes, length_remaining);
                        *Length = length_remaining;
                        rc = 0;
                        /* We're done with the LBM message, so let LBM know. */
                        lbm_msg_delete(node->mMessage);
                        /* Unlink the node from the queue */
                        rcv->mHead = node->mNext;
                        if (rcv->mHead == NULL)
                        {
                                rcv->mTail = NULL;
                        }
                        free(node);
                }
                else
                {
                        /* Can only transfer part of the message */
                        memcpy(Data, node->mMessage->data + node->mUsedBytes, *Length);
                        /*
                                Advance the consumed offset so the next call
                                resumes right after the bytes just delivered.
                        */
                        node->mUsedBytes += *Length;
                        rc = 0;
                }
                unlock_receiver(rcv);
        }
        else
        {
                /* Release the lock before sleeping so it is not held while waiting. */
                unlock_receiver(rcv);
                /* Sleep for wait time */
#define NANOSECONDS_PER_SECOND 1000000000
#define MICROSECONDS_PER_SECOND 1000000
#define MILLISECONDS_PER_SECOND 1000
#define NANOSECONDS_PER_MILLISECOND (NANOSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#define MICROSECONDS_PER_MILLISECOND (MICROSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#if defined(_WIN32)
                Sleep(TimeoutMS);
#elif defined(__TANDEM)
                /* Split the timeout into a sub-second usleep() and a whole-second sleep(). */
                sleep_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
                sleep_usec = (TimeoutMS % MILLISECONDS_PER_SECOND) * MICROSECONDS_PER_MILLISECOND;
                if (sleep_usec > 0)
                {
                        usleep(sleep_usec);
                }
                if (sleep_sec > 0)
                {
                        sleep(sleep_sec);
                }
#else
                ivl.tv_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
                ivl.tv_nsec = (TimeoutMS % MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND;
                nanosleep(&ivl, NULL);
#endif
                /* Tell the caller that no data was available. */
                rc = 1;
        }
        return (rc);
}

/*
        Release everything held by a source transport object, then free the
        object itself. Members are deleted in the order source, topic
        attributes, context, context attributes; each member is reset to NULL
        after it is deleted. Data must not be used after this call.
*/
void
src_cleanup(lbmmon_transport_lbm_src_t * Data)
{
        /* The source goes first. */
        if (Data->mSource)
        {
                lbm_src_delete(Data->mSource);
                Data->mSource = NULL;
        }

        /* The topic is only cleared here, not deleted. */
        Data->mTopic = NULL;

        if (Data->mTopicAttributes)
        {
                lbm_src_topic_attr_delete(Data->mTopicAttributes);
                Data->mTopicAttributes = NULL;
        }

        /* Then the context, followed by its attributes. */
        if (Data->mContext)
        {
                lbm_context_delete(Data->mContext);
                Data->mContext = NULL;
        }
        if (Data->mContextAttributes)
        {
                lbm_context_attr_delete(Data->mContextAttributes);
                Data->mContextAttributes = NULL;
        }

        /* Finally release the container. */
        free(Data);
}

/*
        Finish a source transport by handing its client data to src_cleanup().

        TransportClientData     Pointer to the lbmmon_transport_lbm_src_t to
                                tear down. Must not be NULL.

        Returns 0 after cleanup, or -1 (with ErrorString set) when
        TransportClientData is NULL.
*/
int
lbmmon_transport_lbm_src_finish(void * TransportClientData)
{
        if (TransportClientData != NULL)
        {
                src_cleanup((lbmmon_transport_lbm_src_t *) TransportClientData);
                return (0);
        }

        strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
        return (-1);
}

/*
        Release everything held by a receiver transport object, then free the
        object itself. The order is: receivers and their attributes, then
        (under the receiver lock, if one was created) the context, its
        attributes and any queued messages, then the lock itself. Data must
        not be used after this call.
*/
void
rcv_cleanup(lbmmon_transport_lbm_rcv_t * Data)
{
        lbmmon_transport_lbm_rcv_node_t * current;
        lbmmon_transport_lbm_rcv_node_t * following;

        /* Delete the receivers first so that no further messages are delivered. */
        if (Data->mWildcardReceiver)
        {
                lbm_wildcard_rcv_delete(Data->mWildcardReceiver);
                Data->mWildcardReceiver = NULL;
        }
        if (Data->mWildcardReceiverAttributes)
        {
                lbm_wildcard_rcv_attr_delete(Data->mWildcardReceiverAttributes);
                Data->mWildcardReceiverAttributes = NULL;
        }
        if (Data->mReceiver)
        {
                lbm_rcv_delete(Data->mReceiver);
                Data->mReceiver = NULL;
        }
        if (Data->mTopicAttributes)
        {
                lbm_rcv_topic_attr_delete(Data->mTopicAttributes);
                Data->mTopicAttributes = NULL;
        }
        /* The topic is only cleared here, not deleted. */
        Data->mTopic = NULL;

        /* Take the lock, but only if it was ever initialized. */
        if (Data->mLockCreated)
        {
                lock_receiver(Data);
        }

        /* Deleting the context makes certain nothing else can arrive. */
        if (Data->mContext)
        {
                lbm_context_delete(Data->mContext);
                Data->mContext = NULL;
        }
        if (Data->mContextAttributes)
        {
                lbm_context_attr_delete(Data->mContextAttributes);
                Data->mContextAttributes = NULL;
        }

        /* Drain the queue, returning each message to LBM and freeing its node. */
        for (current = Data->mHead; current != NULL; current = following)
        {
                lbm_msg_delete(current->mMessage);
                following = current->mNext;
                free(current);
        }

        /* Drop the lock and destroy it. */
        if (Data->mLockCreated != 0)
        {
                unlock_receiver(Data);
#if !defined(_WIN32)
                pthread_mutex_destroy(&(Data->mLock));
#else
                DeleteCriticalSection(&(Data->mLock));
#endif
        }

        /* Finally release the container. */
        free(Data);
}

/*
        Finish a receiver transport by handing its client data to
        rcv_cleanup().

        TransportClientData     Pointer to the lbmmon_transport_lbm_rcv_t to
                                tear down. Must not be NULL.

        Returns 0 after cleanup, or -1 (with ErrorString set) when
        TransportClientData is NULL.
*/
int
lbmmon_transport_lbm_rcv_finish(void * TransportClientData)
{
        if (TransportClientData != NULL)
        {
                rcv_cleanup((lbmmon_transport_lbm_rcv_t *) TransportClientData);
                return (0);
        }

        strncpy(ErrorString, "Invalid argument", sizeof(ErrorString));
        return (-1);
}

/*
        Acquire the receiver's lock: a pthread mutex everywhere except
        Windows, where a critical section is entered instead.
*/
void
lock_receiver(lbmmon_transport_lbm_rcv_t * Receiver)
{
#if !defined(_WIN32)
        pthread_mutex_lock(&Receiver->mLock);
#else
        EnterCriticalSection(&Receiver->mLock);
#endif
}

/*
        Release the receiver's lock: a pthread mutex everywhere except
        Windows, where a critical section is left instead.
*/
void
unlock_receiver(lbmmon_transport_lbm_rcv_t * Receiver)
{
#if !defined(_WIN32)
        pthread_mutex_unlock(&Receiver->mLock);
#else
        LeaveCriticalSection(&Receiver->mLock);
#endif
}

/*
        Return the module's ErrorString buffer, which the other functions in
        this module fill in when they report a failure.
*/
const char *
lbmmon_transport_lbm_errmsg(void)
{
        const char * message = ErrorString;

        return (message);
}

/*
        Check whether Scope names one of the recognized option scopes. The
        comparison ignores case.

        Scope   NUL-terminated scope name to check.

        Returns 0 if Scope is "context", "source", "receiver" or
        "event_queue", and -1 otherwise.
*/
int
scope_is_valid(const char * Scope)
{
        static const char * const known_scopes[] =
        {
                "context",
                "source",
                "receiver",
                "event_queue"
        };
        size_t idx;

        for (idx = 0; idx < (sizeof(known_scopes) / sizeof(known_scopes[0])); idx++)
        {
                if (strcasecmp(Scope, known_scopes[idx]) == 0)
                {
                        return (0);
                }
        }
        return (-1);
}


Generated on Thu Mar 6 13:11:14 2014 for LBM API by  doxygen 1.5.2