Source code for lbmmontrudp.c

/*
  All of the documentation and software included in this and any
  other Informatica Corporation Ultra Messaging Releases
  Copyright (C) Informatica Corporation. All rights reserved.
  
  Redistribution and use in source and binary forms, with or without
  modification, are permitted only as covered by the terms of a
  valid software license agreement with Informatica Corporation.

  Copyright (C) 2004-2014, Informatica Corporation. All Rights Reserved.

  THE SOFTWARE IS PROVIDED "AS IS" AND INFORMATICA DISCLAIMS ALL WARRANTIES 
  EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION, ANY IMPLIED WARRANTIES OF 
  NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR 
  PURPOSE.  INFORMATICA DOES NOT WARRANT THAT USE OF THE SOFTWARE WILL BE 
  UNINTERRUPTED OR ERROR-FREE.  INFORMATICA SHALL NOT, UNDER ANY CIRCUMSTANCES, BE 
  LIABLE TO LICENSEE FOR LOST PROFITS, CONSEQUENTIAL, INCIDENTAL, SPECIAL OR 
  INDIRECT DAMAGES ARISING OUT OF OR RELATED TO THIS AGREEMENT OR THE 
  TRANSACTIONS CONTEMPLATED HEREUNDER, EVEN IF INFORMATICA HAS BEEN APPRISED OF 
  THE LIKELIHOOD OF SUCH DAMAGES.
  
*/

#ifdef __VOS__
#define _POSIX_C_SOURCE 200112L
#include <sys/time.h>
#endif

#include <stdio.h>
#include <time.h>
#include <string.h>
#include <stdlib.h>
#include <limits.h>
#include <errno.h>
#ifdef _WIN32
        #include <winsock2.h>
        #include <ws2tcpip.h>
        #define strcasecmp stricmp
        #define snprintf _snprintf
        typedef int ssize_t;
#else
        #include "config.h"
        #include <unistd.h>
        #if defined(__TANDEM)
                #if defined(HAVE_TANDEM_SPT)
                        #include <ktdmtyp.h>
                        #include <spthread.h>
                #else
                        #include <pthread.h>
                #endif
        #else
                #include <pthread.h>
        #endif
        #include <strings.h>
        #include <sys/socket.h>
        #include <netinet/in.h>
        #include <arpa/inet.h>
        #include <unistd.h>
#endif
#if defined(__VMS)
        typedef int socklen_t;
#endif
#include <lbm/lbmmon.h>
#include <lbm/lbmmontrudp.h>

#ifdef _WIN32
        #define LBMMON_INVALID_HANDLE INVALID_SOCKET
        #define LBMMON_SOCKET_ERROR SOCKET_ERROR
#else
        #define LBMMON_INVALID_HANDLE -1
        #define LBMMON_SOCKET_ERROR -1
#endif
#ifndef INADDR_NONE
        #define INADDR_NONE ((in_addr_t) 0xffffffff)
#endif

/*                                                                              
        Package all of the needed function pointers for this module into a
        lbmmon_transport_func_t structure.                                                                                                                                                              
*/
static const lbmmon_transport_func_t LBMMON_TRANSPORT_UDP =
{
        lbmmon_transport_udp_initsrc,
        lbmmon_transport_udp_initrcv,
        lbmmon_transport_udp_send,
        lbmmon_transport_udp_receive,
        lbmmon_transport_udp_src_finish,
        lbmmon_transport_udp_rcv_finish,
        lbmmon_transport_udp_errmsg
};

/*                                                                              
        For a statistics source, one of these gets returned as the TransportClientData.                                                                                                                                                         
*/
typedef struct
{
        /* Socket used to send a statistics packet */
#ifdef _WIN32
        SOCKET mSocket;
#else
        int mSocket;
#endif
        /* Peer socket address */
        struct sockaddr_in mPeer;
        /* Mode */
        unsigned char mMode;
} lbmmon_transport_udp_src_t;

#define MODE_UNICAST 0
#define MODE_BROADCAST 1
#define MODE_MULTICAST 2

/*
        A queue of incoming statistics packets is maintained. This describes each 
        entry in the queue.
*/
struct lbmmon_transport_udp_rcv_node_t_stct
{
        /* Pointer to the message */
        unsigned char * mMessage;
        /* Length of the message */
        size_t mMessageLength;
        /* Number of bytes of the message returned to caller */
        size_t mUsedBytes;
        /* Next entry in the queue */
        struct lbmmon_transport_udp_rcv_node_t_stct * mNext;
};
typedef struct lbmmon_transport_udp_rcv_node_t_stct lbmmon_transport_udp_rcv_node_t;

/*                                                                              
        For a statistics receiver, one of these gets returned as the TransportClientData.                                                                                                                                                               
*/
typedef struct
{
        /* Lock to prevent access by multiple threads */
#ifdef _WIN32
        CRITICAL_SECTION mLock;
#else
        pthread_mutex_t mLock;
#endif
        /* Socket used to receive packets */
#ifdef _WIN32
        SOCKET mSocket;
#else
        int mSocket;
#endif
        /* Peer socket address */
        struct sockaddr_in mPeer;
        /* Interface */
        struct sockaddr_in mInterface;
        /* Multicast membership */
        struct ip_mreq mMulticastMembership;
        /* Mode */
        unsigned char mMode;
        /* Head of the message queue */
        lbmmon_transport_udp_rcv_node_t * mHead;
        /* Tail of the message queue */
        lbmmon_transport_udp_rcv_node_t * mTail;
        /* Receiving thread */
#ifdef _WIN32
        HANDLE mThread;
#else
        pthread_t mThread;
#endif
        /* Flag to terminate thread */
        unsigned char mTerminateThread;
} lbmmon_transport_udp_rcv_t;

#define DEFAULT_INTERFACE "0.0.0.0"
#define DEFAULT_PORT "2933"
#define DEFAULT_TTL "16"

/* Error codes */
#define LBMMONTRUDP_ERR_INVALID_OPTION 1
#define LBMMONTRUDP_ERR_SOCKET 2
#define LBMMONTRUDP_ERR_SEND 3
#define LBMMONTRUDP_ERR_THREAD 4

static void lock_receiver(lbmmon_transport_udp_rcv_t * Receiver);
static void unlock_receiver(lbmmon_transport_udp_rcv_t * Receiver);
#ifdef _WIN32
static DWORD WINAPI receive_thread_proc(void * Arg);
#else
static void * receive_thread_proc(void * Arg);
#endif

static char ErrorString[1024];

static const char *
last_socket_error(void)
{
        static char message[512];
#ifdef _WIN32
        snprintf(message, sizeof(message), "error %d", WSAGetLastError());
#else
        snprintf(message,
                         sizeof(message),
                         "error %d, %s",
                         errno,
                         strerror(errno));
#endif
        return (message);
}

const lbmmon_transport_func_t *
lbmmon_transport_udp_module(void)
{
        return (&LBMMON_TRANSPORT_UDP);
}

int
lbmmon_transport_udp_initsrc(void * * TransportClientData, const void * TransportOptions)
{
        lbmmon_transport_udp_src_t * data;
        int rc;
        const char * ptr = (const char *) TransportOptions;
        char key[512];
        char value[512];
        char address[512];
        char port[512];
        char interface[512];
        char mcgroup[512];
        char bcaddress[512];
        char ttl[512];
        unsigned long port_value;
        struct in_addr multicast_group;
        struct in_addr multicast_interface;
        struct in_addr broadcast_address;
        struct in_addr host_address;
        unsigned long ttl_value = 0;

        memset(ErrorString, 0, sizeof(ErrorString));
        data = malloc(sizeof(lbmmon_transport_udp_src_t));
        multicast_group.s_addr = 0;
        multicast_interface.s_addr = 0;
        broadcast_address.s_addr = 0;
        host_address.s_addr = 0;

        /* Process any options */
        memset(address, 0, sizeof(address));
        memset(port, 0, sizeof(port));
        strcpy(port, DEFAULT_PORT);
        memset(interface, 0, sizeof(interface));
        strcpy(interface, DEFAULT_INTERFACE);
        memset(mcgroup, 0, sizeof(mcgroup));
        memset(bcaddress, 0, sizeof(bcaddress));
        memset(ttl, 0, sizeof(ttl));
        strcpy(ttl, DEFAULT_TTL);
        data->mMode = MODE_UNICAST;
        while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
        {
                if (strcasecmp(key, "address") == 0)
                {
                        strncpy(address, value, sizeof(address));
                }
                else if (strcasecmp(key, "port") == 0)
                {
                        strncpy(port, value, sizeof(port));
                }
                else if (strcasecmp(key, "interface") == 0)
                {
                        strncpy(interface, value, sizeof(interface));
                }
                else if (strcasecmp(key, "mcgroup") == 0)
                {
                        strncpy(mcgroup, value, sizeof(mcgroup));
                }
                else if (strcasecmp(key, "bcaddress") == 0)
                {
                        strncpy(bcaddress, value, sizeof(bcaddress));
                }
                else if (strcasecmp(key, "ttl") == 0)
                {
                        strncpy(ttl, value, sizeof(ttl));
                }
        }

        /*      Validate the options
                Note the following:
                - interface and ttl only apply to mcgroup
                - mcgroup (and thus multicast) takes precedence over bcaddress (and thus broadcast)
                - bcaddress takes precedence over address.
        */
        port_value = strtoul(port, NULL, 0);
        if ((port_value == ULONG_MAX) && (errno == ERANGE))
        {
                strncpy(ErrorString, "Invalid port value", sizeof(ErrorString));
                free(data);
                return (LBMMONTRUDP_ERR_INVALID_OPTION);
        }
        else if (port_value > USHRT_MAX)
        {
                strncpy(ErrorString, "Invalid port value", sizeof(ErrorString));
                free(data);
                return (LBMMONTRUDP_ERR_INVALID_OPTION);
        }

        if (mcgroup[0] != '\0')
        {
                data->mMode = MODE_MULTICAST;
                multicast_group.s_addr = inet_addr(mcgroup);
                if (multicast_group.s_addr == INADDR_NONE)
                {
                        strncpy(ErrorString, "Invalid mcgroup value", sizeof(ErrorString));
                        free(data);
                        return (LBMMONTRUDP_ERR_INVALID_OPTION);
                }
                if (!IN_MULTICAST(ntohl(multicast_group.s_addr)))
                {
                        strncpy(ErrorString, "Invalid mcgroup value", sizeof(ErrorString));
                        free(data);
                        return (LBMMONTRUDP_ERR_INVALID_OPTION);
                }
                multicast_interface.s_addr = inet_addr(interface);
                if (multicast_interface.s_addr == INADDR_NONE)
                {
                        strncpy(ErrorString, "Invalid interface value", sizeof(ErrorString));
                        free(data);
                        return (LBMMONTRUDP_ERR_INVALID_OPTION);
                }
                ttl_value = strtoul(ttl, NULL, 0);
                if ((ttl_value == ULONG_MAX) && (errno == ERANGE))
                {
                        strncpy(ErrorString, "Invalid ttl value", sizeof(ErrorString));
                        free(data);
                        return (LBMMONTRUDP_ERR_INVALID_OPTION);
                }
                else if (ttl_value > UCHAR_MAX)
                {
                        strncpy(ErrorString, "Invalid ttl value", sizeof(ErrorString));
                        free(data);
                        return (LBMMONTRUDP_ERR_INVALID_OPTION);
                }
        }
        else if (bcaddress[0] != '\0')
        {
                data->mMode = MODE_BROADCAST;
                broadcast_address.s_addr = inet_addr(bcaddress);
                if (broadcast_address.s_addr == INADDR_NONE)
                {
                        strncpy(ErrorString, "Invalid bcaddress value", sizeof(ErrorString));
                        free(data);
                        return (LBMMONTRUDP_ERR_INVALID_OPTION);
                }
        }
        else
        {
                host_address.s_addr = inet_addr(address);
                if (host_address.s_addr == INADDR_NONE)
                {
                        strncpy(ErrorString, "Invalid address value", sizeof(ErrorString));
                        free(data);
                        return (LBMMONTRUDP_ERR_INVALID_OPTION);
                }
        }

        /* Create the socket */
        data->mSocket = socket(PF_INET, SOCK_DGRAM, 0);
        if (data->mSocket == LBMMON_INVALID_HANDLE)
        {
                snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "socket() failed, %s",
                                 last_socket_error());
                free(data);
                return (LBMMONTRUDP_ERR_SOCKET);
        }

        /* If broadcast mode, enable broadcast on the socket */
        if (data->mMode == MODE_BROADCAST)
        {
                int option = 1;
                socklen_t len = sizeof(option);
                rc = setsockopt(data->mSocket, SOL_SOCKET, SO_BROADCAST, (void *) &option, len);
                if (rc == LBMMON_SOCKET_ERROR)
                {
                        snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "setsockopt(...,SO_BROADCAST,...) failed, %s",
                                         last_socket_error());
#ifdef _WIN32
                        closesocket(data->mSocket);
#else
                        close(data->mSocket);
#endif
                        free(data);
                        return (LBMMONTRUDP_ERR_SOCKET);
                }
        }

        /* For multicast, set the outgoing interface and TTL. */
        if (data->mMode == MODE_MULTICAST)
        {
                unsigned char optval = (unsigned char) ttl_value;
                struct in_addr ifc_addr;
                rc = setsockopt(data->mSocket, IPPROTO_IP, IP_MULTICAST_TTL, (void *) &optval, sizeof(optval));
                if (rc != LBMMON_SOCKET_ERROR)
                {
                        ifc_addr.s_addr = multicast_interface.s_addr;
                        rc = setsockopt(data->mSocket, IPPROTO_IP, IP_MULTICAST_IF, (void *) &ifc_addr, sizeof(ifc_addr));
                        if (rc == LBMMON_SOCKET_ERROR)
                        {
                                snprintf(ErrorString,
                                                 sizeof(ErrorString),
                                                 "setsockopt(...,IP_MULTICAST_IF,...) failed, %s",
                                                 last_socket_error());
                        }
                }
                else
                {
                        snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "setsockopt(...,IP_MULTICAST_TTL,...) failed, %s",
                                         last_socket_error());
                }
                if (rc == LBMMON_SOCKET_ERROR)
                {
#ifdef _WIN32
                        closesocket(data->mSocket);
#else
                        close(data->mSocket);
#endif
                        free(data);
                        return (LBMMONTRUDP_ERR_SOCKET);
                }
        }

        /* Build the peer sockaddr_in. */
        data->mPeer.sin_family = AF_INET;
        data->mPeer.sin_port = htons((unsigned short) port_value);
        switch (data->mMode)
        {
                case MODE_UNICAST:
                default:
                        data->mPeer.sin_addr.s_addr = host_address.s_addr;
                        break;

                case MODE_BROADCAST:
                        data->mPeer.sin_addr.s_addr = broadcast_address.s_addr;
                        break;

                case MODE_MULTICAST:
                        data->mPeer.sin_addr.s_addr = multicast_group.s_addr;
                        break;
        }

        /* Pass back the lbmmon_transport_udp_src_t created */
        *TransportClientData = data;
        return (0);
}

int
lbmmon_transport_udp_initrcv(void * * TransportClientData, const void * TransportOptions)
{
        lbmmon_transport_udp_rcv_t * data;
        int rc;
        const char * ptr = (const char *) TransportOptions;
        char key[512];
        char value[512];
        char port[512];
        char interface[512];
        char mcgroup[512];
        unsigned long port_value;
        struct in_addr multicast_group;
        struct in_addr multicast_interface;

        memset(ErrorString, 0, sizeof(ErrorString));
        data = malloc(sizeof(lbmmon_transport_udp_rcv_t));
        multicast_group.s_addr = 0;
        multicast_interface.s_addr = 0;
        data->mHead = NULL;
        data->mTail = NULL;
        data->mTerminateThread = 0;

        /* Process any options */
        memset(port, 0, sizeof(port));
        strcpy(port, DEFAULT_PORT);
        memset(interface, 0, sizeof(interface));
        strcpy(interface, DEFAULT_INTERFACE);
        memset(mcgroup, 0, sizeof(mcgroup));
        data->mMode = MODE_UNICAST;
        while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
        {
                if (strcasecmp(key, "port") == 0)
                {
                        strncpy(port, value, sizeof(port));
                }
                else if (strcasecmp(key, "interface") == 0)
                {
                        strncpy(interface, value, sizeof(interface));
                }
                else if (strcasecmp(key, "mcgroup") == 0)
                {
                        strncpy(mcgroup, value, sizeof(mcgroup));
                }
        }

        /*      Validate the options
                Note the following:
                - interface only applies to mcgroup
                - mcgroup (and thus multicast) takes precedence over broadcast/unicast
        */
        port_value = strtoul(port, NULL, 0);
        if ((port_value == ULONG_MAX) && (errno == ERANGE))
        {
                strncpy(ErrorString, "Invalid port value", sizeof(ErrorString));
                free(data);
                return (LBMMONTRUDP_ERR_INVALID_OPTION);
        }
        else if (port_value > USHRT_MAX)
        {
                strncpy(ErrorString, "Invalid port value", sizeof(ErrorString));
                free(data);
                return (LBMMONTRUDP_ERR_INVALID_OPTION);
        }

        if (mcgroup[0] != '\0')
        {
                data->mMode = MODE_MULTICAST;
                multicast_group.s_addr = inet_addr(mcgroup);
                if (multicast_group.s_addr == INADDR_NONE)
                {
                        strncpy(ErrorString, "Invalid mcgroup value", sizeof(ErrorString));
                        free(data);
                        return (LBMMONTRUDP_ERR_INVALID_OPTION);
                }
                if (!IN_MULTICAST(ntohl(multicast_group.s_addr)))
                {
                        strncpy(ErrorString, "Invalid mcgroup value", sizeof(ErrorString));
                        free(data);
                        return (LBMMONTRUDP_ERR_INVALID_OPTION);
                }
                multicast_interface.s_addr = inet_addr(interface);
                if (multicast_interface.s_addr == INADDR_NONE)
                {
                        strncpy(ErrorString, "Invalid interface value", sizeof(ErrorString));
                        free(data);
                        return (LBMMONTRUDP_ERR_INVALID_OPTION);
                }
        }

        /* Create the socket */
        data->mSocket = socket(PF_INET, SOCK_DGRAM, 0);
        if (data->mSocket == LBMMON_INVALID_HANDLE)
        {
                snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "socket() failed, %s",
                                 last_socket_error());
                free(data);
                return (LBMMONTRUDP_ERR_SOCKET);
        }

        /* Build the interface sockaddr_in. */
        memset(&(data->mInterface), 0, sizeof(data->mInterface));
        data->mInterface.sin_family = AF_INET;
        data->mInterface.sin_port = htons((unsigned short) port_value);
        data->mInterface.sin_addr.s_addr = INADDR_ANY;

        /* Bind the socket. */
        rc = bind(data->mSocket, (struct sockaddr *) &(data->mInterface), sizeof(data->mInterface));
        if (rc == LBMMON_SOCKET_ERROR)
        {
                snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "bind() failed, %s",
                                 last_socket_error());
#ifdef _WIN32
                        closesocket(data->mSocket);
#else
                        close(data->mSocket);
#endif
                        return (LBMMONTRUDP_ERR_SOCKET);
        }

        /* For multicast, join the group. */
        if (data->mMode == MODE_MULTICAST)
        {
                data->mMulticastMembership.imr_interface.s_addr = multicast_interface.s_addr;
                data->mMulticastMembership.imr_multiaddr.s_addr = multicast_group.s_addr;
                rc = setsockopt(data->mSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void *) &(data->mMulticastMembership), sizeof(data->mMulticastMembership));
                if (rc == LBMMON_SOCKET_ERROR)
                {
                        snprintf(ErrorString,
                                         sizeof(ErrorString),
                                         "setsockopt(...,IP_ADD_MEMBERSHIP,...) failed, %s",
                                         last_socket_error());
#ifdef _WIN32
                        closesocket(data->mSocket);
#else
                        close(data->mSocket);
#endif
                        free(data);
                        return (LBMMONTRUDP_ERR_SOCKET);
                }
        }

        /* Build the peer sockaddr_in. */
        data->mPeer.sin_family = AF_INET;
        data->mPeer.sin_port = htons((unsigned short) port_value);
        data->mPeer.sin_addr.s_addr = INADDR_ANY;

#ifdef _WIN32
        InitializeCriticalSection(&(data->mLock));
#else
        pthread_mutex_init(&(data->mLock), NULL);
#endif

        /* Start the receive thread */
#ifdef _WIN32
        data->mThread = CreateThread(NULL, 0, receive_thread_proc, data, 0, NULL);
        if (data->mThread == NULL)
        {
                snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "CreateThread() failed, error %d",
                                 GetLastError());
                closesocket(data->mSocket);
                free(data);
                return (LBMMONTRUDP_ERR_THREAD);
        }
#else
#ifdef __VOS__
        {
                pthread_attr_t pth_attr;
                pthread_attr_init (&pth_attr);
                pthread_attr_setschedpolicy (&pth_attr, SCHED_RR);
                rc = pthread_create(&(data->mThread), &pth_attr, receive_thread_proc, data);
        }
#else
        rc = pthread_create(&(data->mThread), NULL, receive_thread_proc, data);
#endif
        if (rc != 0)
        {
                snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "pthread_create() failed, error %d, %s",
                                 rc,
                                 strerror(rc));
                close(data->mSocket);
                free(data);
                return (LBMMONTRUDP_ERR_THREAD);
        }
#endif

        /* Pass back the lbmmon_transport_udp_rcv_t created */
        *TransportClientData = data;
        return (0);
}

#ifdef _WIN32
DWORD WINAPI
receive_thread_proc(void * Arg)
#else
void *
receive_thread_proc(void * Arg)
#endif
{
        lbmmon_transport_udp_rcv_t * rcv = (lbmmon_transport_udp_rcv_t *) Arg;
        unsigned char buffer[8192];
        struct timeval timeout;
        fd_set readfds;
        int rc;
        ssize_t bytes_read;
        lbmmon_transport_udp_rcv_node_t * node;

        while (rcv->mTerminateThread == 0)
        {
                FD_ZERO(&readfds);
                FD_SET(rcv->mSocket, &readfds);
                timeout.tv_sec = 0;
                timeout.tv_usec = 500000;
                rc = select(rcv->mSocket + 1, &readfds, NULL, NULL, &timeout);
                if (rc <= 0)
                {
                        continue;
                }
                bytes_read = recvfrom(rcv->mSocket, buffer, sizeof(buffer), 0, NULL, NULL);
                if (bytes_read == LBMMON_SOCKET_ERROR)
                {
                        continue;
                }

                /* A data message. We want to enqueue it for processing. */
                lock_receiver(rcv);
                node = malloc(sizeof(lbmmon_transport_udp_rcv_node_t));
                node->mMessage = malloc((size_t) bytes_read);
                memcpy(node->mMessage, buffer, (size_t) bytes_read);
                node->mMessageLength = (size_t) bytes_read;
                node->mUsedBytes = 0;   /* No data returned as yet */

                /* Link the message onto the queue */
                node->mNext = NULL;
                if (rcv->mTail != NULL)
                {
                        rcv->mTail->mNext = node;
                }
                else
                {
                        rcv->mHead = node;
                }
                rcv->mTail = node;
                unlock_receiver(rcv);
        }
#ifdef _WIN32
        return (0);
#else
        return (NULL);
#endif
}

int
lbmmon_transport_udp_send(const char * Data, size_t Length, void * TransportClientData)
{
        lbmmon_transport_udp_src_t * src;
        int rc;

        if ((Data == NULL) || (TransportClientData == NULL))
        {
                return (-1);
        }
        src = (lbmmon_transport_udp_src_t *) TransportClientData;
        rc = sendto(src->mSocket, Data, Length, 0, (struct sockaddr *) &(src->mPeer), sizeof(src->mPeer));
        if (rc == LBMMON_SOCKET_ERROR)
        {
                snprintf(ErrorString,
                                 sizeof(ErrorString),
                                 "sendto() failed, %s",
                                 last_socket_error());
                return (LBMMONTRUDP_ERR_SEND);
        }
        return (0);
}

int
lbmmon_transport_udp_receive(char * Data, size_t * Length, unsigned int TimeoutMS, void * TransportClientData)
{
        lbmmon_transport_udp_rcv_t * rcv = (lbmmon_transport_udp_rcv_t *) TransportClientData;
        lbmmon_transport_udp_rcv_node_t * node;
        int rc = 0;
        size_t length_remaining;
#if defined(_WIN32)
#elif defined(__TANDEM)
        unsigned int sleep_sec;
        unsigned int sleep_usec;
#else
        struct timespec ivl;
#endif

        if ((Data == NULL) || (Length == NULL) || (TransportClientData == NULL))
        {
                return (-1);
        }
        if (*Length == 0)
        {
                return (0);
        }
        lock_receiver(rcv);
        if (rcv->mHead != NULL)
        {
                /* Queue is non-empty. Pull the first message from the queue. */
                node = rcv->mHead;
                length_remaining = node->mMessageLength - node->mUsedBytes;
                if (*Length >= length_remaining)
                {
                        /* We can transfer the rest of the message */
                        memcpy(Data, node->mMessage + node->mUsedBytes, length_remaining);
                        *Length = length_remaining;
                        rc = 0;
                        /* We're done with the message, so free it. */
                        free(node->mMessage);
                        node->mMessage = NULL;
                        /* Unlink the node from the queue */
                        rcv->mHead = node->mNext;
                        if (rcv->mHead == NULL)
                        {
                                rcv->mTail = NULL;
                        }
                        free(node);
                }
                else
                {
                        /* Can only transfer part of the message */
                        memcpy(Data, node->mMessage + node->mUsedBytes, *Length);
                        node->mUsedBytes -= *Length;
                        rc = 0;
                }
                unlock_receiver(rcv);
        }
        else
        {
                unlock_receiver(rcv);
                /* Sleep for wait time */
#define NANOSECONDS_PER_SECOND 1000000000
#define MICROSECONDS_PER_SECOND 1000000
#define MILLISECONDS_PER_SECOND 1000
#define NANOSECONDS_PER_MILLISECOND (NANOSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#define MICROSECONDS_PER_MILLISECOND (MICROSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#if defined(_WIN32)
                Sleep(TimeoutMS);
#elif defined(__TANDEM)
                sleep_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
                sleep_usec = (TimeoutMS % MILLISECONDS_PER_SECOND) * MICROSECONDS_PER_MILLISECOND;
                if (sleep_usec > 0)
                {
                        usleep(sleep_usec);
                }
                if (sleep_sec > 0)
                {
                        sleep(sleep_sec);
                }
#else
                ivl.tv_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
                ivl.tv_nsec = (TimeoutMS % MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND;
                nanosleep(&ivl, NULL);
#endif
                rc = 1;
        }
        return (rc);
}

int
lbmmon_transport_udp_src_finish(void * TransportClientData)
{
        lbmmon_transport_udp_src_t * src;

        if (TransportClientData == NULL)
        {
                strncpy(ErrorString, "Invalid parameter", sizeof(ErrorString));
                return (-1);
        }
        src = (lbmmon_transport_udp_src_t *) TransportClientData;

#ifdef _WIN32
        closesocket(src->mSocket);
#else
        close(src->mSocket);
#endif
        /* Clean up our data */
        free(TransportClientData);
        return (0);
}

int
lbmmon_transport_udp_rcv_finish(void * TransportClientData)
{
        lbmmon_transport_udp_rcv_t * rcv = (lbmmon_transport_udp_rcv_t *) TransportClientData;
        lbmmon_transport_udp_rcv_node_t * node;
        lbmmon_transport_udp_rcv_node_t * next;
        int rc;

        /* Stop the thread to prevent any more incoming messages */
        rcv->mTerminateThread = 1;

        /* Lock the receiver */
        lock_receiver(rcv);

        /* Clean out the queue */
        node = rcv->mHead;
        while (node != NULL)
        {
                /* Let LBM know we're done with the message */
                free(node->mMessage);
                next = node->mNext;
                free(node);
                node = next;
        }

        unlock_receiver(rcv);

        /* If multicast, drop membership. */
        if (rcv->mMode == MODE_MULTICAST)
        {
                rc = setsockopt(rcv->mSocket, IPPROTO_IP, IP_DROP_MEMBERSHIP, (void *) &(rcv->mMulticastMembership), sizeof(rcv->mMulticastMembership));
        }
#ifdef _WIN32
        closesocket(rcv->mSocket);
#else
        close(rcv->mSocket);
#endif
#ifdef _WIN32
        DeleteCriticalSection(&(rcv->mLock));
#else
        pthread_mutex_destroy(&(rcv->mLock));
#endif

        free(TransportClientData);
        return (0);
}

void
lock_receiver(lbmmon_transport_udp_rcv_t * Receiver)
{
#ifdef _WIN32
        EnterCriticalSection(&(Receiver->mLock));
#else
        pthread_mutex_lock(&(Receiver->mLock));
#endif
}

void
unlock_receiver(lbmmon_transport_udp_rcv_t * Receiver)
{
#ifdef _WIN32
        LeaveCriticalSection(&(Receiver->mLock));
#else
        pthread_mutex_unlock(&(Receiver->mLock));
#endif
}

const char *
lbmmon_transport_udp_errmsg(void)
{
        return (ErrorString);
}


Generated on Thu Mar 6 13:11:14 2014 for LBM API by  doxygen 1.5.2