/* UM C API 6.16.1 -- Source code for lbmmontrudp.c */
/*
All of the documentation and software included in this and any
other Informatica Inc. Ultra Messaging Releases
Copyright (C) Informatica Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted only as covered by the terms of a
valid software license agreement with Informatica Inc.
(C) Copyright 2004,2023 Informatica Inc. All Rights Reserved.
THE SOFTWARE IS PROVIDED "AS IS" AND INFORMATICA DISCLAIMS ALL WARRANTIES
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION, ANY IMPLIED WARRANTIES OF
NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR
PURPOSE. INFORMATICA DOES NOT WARRANT THAT USE OF THE SOFTWARE WILL BE
UNINTERRUPTED OR ERROR-FREE. INFORMATICA SHALL NOT, UNDER ANY CIRCUMSTANCES, BE
LIABLE TO LICENSEE FOR LOST PROFITS, CONSEQUENTIAL, INCIDENTAL, SPECIAL OR
INDIRECT DAMAGES ARISING OUT OF OR RELATED TO THIS AGREEMENT OR THE
TRANSACTIONS CONTEMPLATED HEREUNDER, EVEN IF INFORMATICA HAS BEEN APPRISED OF
THE LIKELIHOOD OF SUCH DAMAGES.
*/
#ifdef __VOS__
#define _POSIX_C_SOURCE 200112L
#include <sys/time.h>
#endif
#include <stdio.h>
#include <time.h>
#include <string.h>
#include <stdlib.h>
#include <limits.h>
#include <errno.h>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#define strcasecmp stricmp
#define snprintf _snprintf
typedef int ssize_t;
#else
#include "config.h"
#include <unistd.h>
#if defined(__TANDEM)
#if defined(HAVE_TANDEM_SPT)
#include <ktdmtyp.h>
#include <spthread.h>
#else
#include <pthread.h>
#endif
#else
#include <pthread.h>
#endif
#include <strings.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#endif
#if defined(__VMS)
typedef int socklen_t;
#endif
#include <lbm/lbmmon.h>
#include <lbm/lbmmontrudp.h>
/*
Platform-neutral sentinels for socket calls. LBMMON_INVALID_HANDLE is what
the result of socket() is compared against, and LBMMON_SOCKET_ERROR is what
the results of setsockopt(), bind(), sendto() and recvfrom() are compared
against in this file.
*/
#ifdef _WIN32
#define LBMMON_INVALID_HANDLE INVALID_SOCKET
#define LBMMON_SOCKET_ERROR SOCKET_ERROR
#else
#define LBMMON_INVALID_HANDLE -1
#define LBMMON_SOCKET_ERROR -1
#endif
/* Fallback for platforms whose headers do not define INADDR_NONE, the value
this file compares inet_addr() results against to detect a bad address. */
#ifndef INADDR_NONE
#define INADDR_NONE ((in_addr_t) 0xffffffff)
#endif
/*
Package all of the needed function pointers for this module into a
lbmmon_transport_func_t structure.
*/
/* Function table handed out by lbmmon_transport_udp_module(). The entries are
positional; the slot layout is defined by lbmmon_transport_func_t, whose
declaration is not visible in this file. */
static const lbmmon_transport_func_t LBMMON_TRANSPORT_UDP =
{
lbmmon_transport_udp_initsrc, /* set up a statistics source (sender) */
lbmmon_transport_udp_initrcv, /* set up a statistics receiver */
lbmmon_transport_udp_send, /* send one statistics packet */
lbmmon_transport_udp_receive, /* fetch one queued statistics packet */
lbmmon_transport_udp_src_finish, /* tear down a source */
lbmmon_transport_udp_rcv_finish, /* tear down a receiver */
lbmmon_transport_udp_errmsg, /* text of the most recent error */
/* The remaining four slots are not provided by this module.
NOTE(review): their meaning depends on lbmmon_transport_func_t -- confirm
against lbm/lbmmon.h. */
NULL,
NULL,
NULL,
NULL
};
/*
For a statistics source, one of these gets returned as the TransportClientData.
*/
/* Per-source state. Allocated by lbmmon_transport_udp_initsrc(), used by
lbmmon_transport_udp_send() and released by lbmmon_transport_udp_src_finish(). */
typedef struct
{
/* Socket used to send a statistics packet */
#ifdef _WIN32
SOCKET mSocket;
#else
int mSocket;
#endif
/* Destination passed to sendto(): the unicast host, the broadcast address
or the multicast group, depending on mMode, plus the configured port */
struct sockaddr_in mPeer;
/* One of MODE_UNICAST, MODE_BROADCAST or MODE_MULTICAST */
unsigned char mMode;
} lbmmon_transport_udp_src_t;
/* Values stored in the mMode field of the source and receiver structures.
When initializing a source, an mcgroup option selects multicast, otherwise a
bcaddress option selects broadcast, otherwise unicast is used. */
#define MODE_UNICAST 0
#define MODE_BROADCAST 1
#define MODE_MULTICAST 2
/*
A queue of incoming statistics packets is maintained. This describes each
entry in the queue.
*/
/* One received datagram, held in a singly linked FIFO. Nodes are appended by
the receive thread and removed by lbmmon_transport_udp_receive(). */
struct lbmmon_transport_udp_rcv_node_t_stct
{
/* Heap copy of the datagram payload; freed together with the node */
unsigned char * mMessage;
/* Length of the message in bytes */
size_t mMessageLength;
/* Number of bytes of the message already returned to the caller. It is set
to 0 when the node is created and is not advanced anywhere in this file. */
size_t mUsedBytes;
/* Next entry in the queue, or NULL for the last entry */
struct lbmmon_transport_udp_rcv_node_t_stct * mNext;
};
typedef struct lbmmon_transport_udp_rcv_node_t_stct lbmmon_transport_udp_rcv_node_t;
/*
For a statistics receiver, one of these gets returned as the TransportClientData.
*/
/* Per-receiver state. Allocated by lbmmon_transport_udp_initrcv(), shared
with the receive thread, and released by lbmmon_transport_udp_rcv_finish(). */
typedef struct
{
/* Lock taken around accesses to the message queue (mHead/mTail) */
#ifdef _WIN32
CRITICAL_SECTION mLock;
#else
pthread_mutex_t mLock;
#endif
/* Socket used to receive packets */
#ifdef _WIN32
SOCKET mSocket;
#else
int mSocket;
#endif
/* Peer socket address. Filled in with INADDR_ANY and the configured port
during initialization; not otherwise read in this file. */
struct sockaddr_in mPeer;
/* Local address the socket is bound to (INADDR_ANY and the configured port) */
struct sockaddr_in mInterface;
/* Group/interface pair used for IP_ADD_MEMBERSHIP and IP_DROP_MEMBERSHIP;
only filled in when mMode is MODE_MULTICAST */
struct ip_mreq mMulticastMembership;
/* MODE_UNICAST, or MODE_MULTICAST when an mcgroup option was given */
unsigned char mMode;
/* Head of the message queue (oldest message), NULL when empty */
lbmmon_transport_udp_rcv_node_t * mHead;
/* Tail of the message queue (newest message), NULL when empty */
lbmmon_transport_udp_rcv_node_t * mTail;
/* Receiving thread */
#ifdef _WIN32
HANDLE mThread;
#else
pthread_t mThread;
#endif
/* Set to non-zero to ask the receive thread to leave its loop.
NOTE(review): written by rcv_finish and polled by the receive thread without
holding mLock and without an atomic/volatile qualifier -- confirm this is
acceptable on all supported platforms. */
unsigned char mTerminateThread;
} lbmmon_transport_udp_rcv_t;
/* Option defaults, kept as strings because they are copied into the same
text buffers that hold user-supplied option values and parsed afterwards. */
#define DEFAULT_INTERFACE "0.0.0.0"
#define DEFAULT_PORT "2933"
#define DEFAULT_TTL "16"
/* Error codes returned by the transport entry points (0 means success; some
paths also return -1, e.g. allocation failure or a NULL argument). */
#define LBMMONTRUDP_ERR_INVALID_OPTION 1
#define LBMMONTRUDP_ERR_SOCKET 2
#define LBMMONTRUDP_ERR_SEND 3
#define LBMMONTRUDP_ERR_THREAD 4
/* Helpers that acquire/release the receiver's queue lock. */
static void lock_receiver(lbmmon_transport_udp_rcv_t * Receiver);
static void unlock_receiver(lbmmon_transport_udp_rcv_t * Receiver);
/* Entry point of the background thread that reads datagrams and queues them. */
#ifdef _WIN32
static DWORD WINAPI receive_thread_proc(void * Arg);
#else
static void * receive_thread_proc(void * Arg);
#endif
/* Text of the most recent error, returned by lbmmon_transport_udp_errmsg().
A single buffer is shared by every source, receiver and thread in the process. */
static char ErrorString[1024];
/*
	Describe the most recent socket-level failure.
	On Windows the text is "error <WSAGetLastError()>"; elsewhere it is
	"error <errno>, <strerror(errno)>".
	Returns a pointer to a function-local static buffer, so the text is
	overwritten by the next call.
*/
static const char *
last_socket_error(void)
{
	static char text[512];
#ifdef _WIN32
	const int code = WSAGetLastError();

	snprintf(text, sizeof(text), "error %d", code);
#else
	const int code = errno;

	snprintf(text, sizeof(text), "error %d, %s", code, strerror(code));
#endif
	return text;
}
/*
	Record an out-of-memory condition in ErrorString.
	Size is the number of bytes that could not be allocated; it is narrowed
	to unsigned for formatting.
*/
static void lbmmon_transport_udp_report_allocation_error(size_t Size)
{
	const unsigned requested = (unsigned) Size;

	snprintf(ErrorString,
			 sizeof(ErrorString),
			 "Unable to allocate %u bytes",
			 requested);
}
lbmmon_transport_udp_module(void)
{
return (&LBMMON_TRANSPORT_UDP);
}
/*
	Initialize the UDP transport for a statistics source.

	TransportOptions is an option string parsed with
	lbmmon_next_key_value_pair(). Recognized keys (case-insensitive):
	  address    unicast destination address
	  port       destination port            (default DEFAULT_PORT)
	  interface  outgoing multicast interface (default DEFAULT_INTERFACE)
	  mcgroup    multicast group; selects multicast mode
	  bcaddress  broadcast address; selects broadcast mode
	  ttl        multicast TTL                (default DEFAULT_TTL)
	mcgroup takes precedence over bcaddress, which takes precedence over
	address. interface and ttl are only examined in multicast mode.
	Unrecognized keys are ignored.

	On success returns 0 and stores a newly allocated
	lbmmon_transport_udp_src_t in *TransportClientData (release it with
	lbmmon_transport_udp_src_finish()). On failure nothing is stored, all
	resources are released, ErrorString describes the problem, and the return
	value is:
	  -1                              NULL TransportClientData or out of memory
	  LBMMONTRUDP_ERR_INVALID_OPTION  an option value failed validation
	  LBMMONTRUDP_ERR_SOCKET          socket() or setsockopt() failed

	Numeric options must start with a number; a value from which strtoul()
	cannot convert anything (e.g. "abc" or an empty value) is rejected
	rather than being treated as 0.
*/
int
lbmmon_transport_udp_initsrc(void * * TransportClientData, const void * TransportOptions)
{
	lbmmon_transport_udp_src_t * data;
	int rc;
	int result = LBMMONTRUDP_ERR_SOCKET;
	const char * ptr = (const char *) TransportOptions;
	const char * problem = NULL;
	char * end = NULL;
	char key[512];
	char value[512];
	char address[512];
	char port[512];
	char interface_text[512];
	char mcgroup[512];
	char bcaddress[512];
	char ttl[512];
	unsigned long port_value;
	unsigned long ttl_value = 0;
	struct in_addr peer_address;
	struct in_addr multicast_interface;

	memset(ErrorString, 0, sizeof(ErrorString));
	if (TransportClientData == NULL)
	{
		strncpy(ErrorString, "Invalid parameter", sizeof(ErrorString) - 1);
		return (-1);
	}

	data = malloc(sizeof(*data));
	if (data == NULL)
	{
		lbmmon_transport_udp_report_allocation_error(sizeof(*data));
		return (-1);
	}
	/* Zero everything so that mPeer (including sin_zero) has no stray bytes. */
	memset(data, 0, sizeof(*data));
	data->mSocket = LBMMON_INVALID_HANDLE;
	data->mMode = MODE_UNICAST;
	peer_address.s_addr = 0;
	multicast_interface.s_addr = 0;

	/* Process any options. Every buffer is zero-filled and at most
	   sizeof - 1 bytes are ever copied in, so each stays NUL-terminated. */
	memset(address, 0, sizeof(address));
	memset(port, 0, sizeof(port));
	strcpy(port, DEFAULT_PORT);
	memset(interface_text, 0, sizeof(interface_text));
	strcpy(interface_text, DEFAULT_INTERFACE);
	memset(mcgroup, 0, sizeof(mcgroup));
	memset(bcaddress, 0, sizeof(bcaddress));
	memset(ttl, 0, sizeof(ttl));
	strcpy(ttl, DEFAULT_TTL);
	while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
	{
		if (strcasecmp(key, "address") == 0)
		{
			strncpy(address, value, sizeof(address) - 1);
		}
		else if (strcasecmp(key, "port") == 0)
		{
			strncpy(port, value, sizeof(port) - 1);
		}
		else if (strcasecmp(key, "interface") == 0)
		{
			strncpy(interface_text, value, sizeof(interface_text) - 1);
		}
		else if (strcasecmp(key, "mcgroup") == 0)
		{
			strncpy(mcgroup, value, sizeof(mcgroup) - 1);
		}
		else if (strcasecmp(key, "bcaddress") == 0)
		{
			strncpy(bcaddress, value, sizeof(bcaddress) - 1);
		}
		else if (strcasecmp(key, "ttl") == 0)
		{
			strncpy(ttl, value, sizeof(ttl) - 1);
		}
	}

	/* Validate the options
	   Note the following:
	   - interface and ttl only apply to mcgroup
	   - mcgroup (and thus multicast) takes precedence over bcaddress (and thus broadcast)
	   - bcaddress takes precedence over address.
	*/
	errno = 0; /* strtoul() only sets errno on failure, never clears it */
	port_value = strtoul(port, &end, 0);
	if ((end == port) || (errno == ERANGE) || (port_value > USHRT_MAX))
	{
		problem = "Invalid port value";
		goto invalid_option;
	}

	if (mcgroup[0] != '\0')
	{
		data->mMode = MODE_MULTICAST;
		peer_address.s_addr = inet_addr(mcgroup);
		if ((peer_address.s_addr == INADDR_NONE) || !IN_MULTICAST(ntohl(peer_address.s_addr)))
		{
			problem = "Invalid mcgroup value";
			goto invalid_option;
		}
		multicast_interface.s_addr = inet_addr(interface_text);
		if (multicast_interface.s_addr == INADDR_NONE)
		{
			problem = "Invalid interface value";
			goto invalid_option;
		}
		errno = 0;
		ttl_value = strtoul(ttl, &end, 0);
		if ((end == ttl) || (errno == ERANGE) || (ttl_value > UCHAR_MAX))
		{
			problem = "Invalid ttl value";
			goto invalid_option;
		}
	}
	else if (bcaddress[0] != '\0')
	{
		data->mMode = MODE_BROADCAST;
		/* NOTE: inet_addr() reports failure with INADDR_NONE, which is also
		   the encoding of 255.255.255.255, so that address is rejected here. */
		peer_address.s_addr = inet_addr(bcaddress);
		if (peer_address.s_addr == INADDR_NONE)
		{
			problem = "Invalid bcaddress value";
			goto invalid_option;
		}
	}
	else
	{
		peer_address.s_addr = inet_addr(address);
		if (peer_address.s_addr == INADDR_NONE)
		{
			problem = "Invalid address value";
			goto invalid_option;
		}
	}

	/* Create the socket */
	data->mSocket = socket(PF_INET, SOCK_DGRAM, 0);
	if (data->mSocket == LBMMON_INVALID_HANDLE)
	{
		snprintf(ErrorString,
				 sizeof(ErrorString),
				 "socket() failed, %s",
				 last_socket_error());
		goto failed;
	}

	/* If broadcast mode, enable broadcast on the socket */
	if (data->mMode == MODE_BROADCAST)
	{
		int option = 1;
		socklen_t len = sizeof(option);

		rc = setsockopt(data->mSocket, SOL_SOCKET, SO_BROADCAST, (void *) &option, len);
		if (rc == LBMMON_SOCKET_ERROR)
		{
			snprintf(ErrorString,
					 sizeof(ErrorString),
					 "setsockopt(...,SO_BROADCAST,...) failed, %s",
					 last_socket_error());
			goto failed;
		}
	}

	/* For multicast, set the TTL and then the outgoing interface. */
	if (data->mMode == MODE_MULTICAST)
	{
		unsigned char optval = (unsigned char) ttl_value;
		struct in_addr ifc_addr;

		rc = setsockopt(data->mSocket, IPPROTO_IP, IP_MULTICAST_TTL, (void *) &optval, sizeof(optval));
		if (rc == LBMMON_SOCKET_ERROR)
		{
			snprintf(ErrorString,
					 sizeof(ErrorString),
					 "setsockopt(...,IP_MULTICAST_TTL,...) failed, %s",
					 last_socket_error());
			goto failed;
		}
		ifc_addr.s_addr = multicast_interface.s_addr;
		rc = setsockopt(data->mSocket, IPPROTO_IP, IP_MULTICAST_IF, (void *) &ifc_addr, sizeof(ifc_addr));
		if (rc == LBMMON_SOCKET_ERROR)
		{
			snprintf(ErrorString,
					 sizeof(ErrorString),
					 "setsockopt(...,IP_MULTICAST_IF,...) failed, %s",
					 last_socket_error());
			goto failed;
		}
	}

	/* Build the peer sockaddr_in; peer_address already holds the address
	   that matches the selected mode. */
	data->mPeer.sin_family = AF_INET;
	data->mPeer.sin_port = htons((unsigned short) port_value);
	data->mPeer.sin_addr.s_addr = peer_address.s_addr;

	/* Pass back the lbmmon_transport_udp_src_t created */
	*TransportClientData = data;
	return (0);

invalid_option:
	strncpy(ErrorString, problem, sizeof(ErrorString) - 1);
	result = LBMMONTRUDP_ERR_INVALID_OPTION;
failed:
	/* Single exit path for every failure: the error text has already been
	   captured, so it is safe to make calls that may disturb errno. */
	if (data->mSocket != LBMMON_INVALID_HANDLE)
	{
#ifdef _WIN32
		closesocket(data->mSocket);
#else
		close(data->mSocket);
#endif
	}
	free(data);
	return (result);
}
int
lbmmon_transport_udp_initrcv(void * * TransportClientData, const void * TransportOptions)
{
lbmmon_transport_udp_rcv_t * data;
int rc;
const char * ptr = (const char *) TransportOptions;
char key[512];
char value[512];
char port[512];
char interface[512];
char mcgroup[512];
unsigned long port_value;
struct in_addr multicast_group;
struct in_addr multicast_interface;
memset(ErrorString, 0, sizeof(ErrorString));
data = malloc(sizeof(lbmmon_transport_udp_rcv_t));
if (data == NULL)
{
lbmmon_transport_udp_report_allocation_error(sizeof(lbmmon_transport_udp_rcv_t));
return (-1);
}
multicast_group.s_addr = 0;
multicast_interface.s_addr = 0;
data->mHead = NULL;
data->mTail = NULL;
data->mTerminateThread = 0;
/* Process any options */
memset(port, 0, sizeof(port));
strcpy(port, DEFAULT_PORT);
memset(interface, 0, sizeof(interface));
strcpy(interface, DEFAULT_INTERFACE);
memset(mcgroup, 0, sizeof(mcgroup));
data->mMode = MODE_UNICAST;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (strcasecmp(key, "port") == 0)
{
strncpy(port, value, sizeof(port));
}
else if (strcasecmp(key, "interface") == 0)
{
strncpy(interface, value, sizeof(interface));
}
else if (strcasecmp(key, "mcgroup") == 0)
{
strncpy(mcgroup, value, sizeof(mcgroup));
}
}
/* Validate the options
Note the following:
- interface only applies to mcgroup
- mcgroup (and thus multicast) takes precedence over broadcast/unicast
*/
port_value = strtoul(port, NULL, 0);
if ((port_value == ULONG_MAX) && (errno == ERANGE))
{
strncpy(ErrorString, "Invalid port value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
else if (port_value > USHRT_MAX)
{
strncpy(ErrorString, "Invalid port value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
if (mcgroup[0] != '\0')
{
data->mMode = MODE_MULTICAST;
multicast_group.s_addr = inet_addr(mcgroup);
if (multicast_group.s_addr == INADDR_NONE)
{
strncpy(ErrorString, "Invalid mcgroup value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
if (!IN_MULTICAST(ntohl(multicast_group.s_addr)))
{
strncpy(ErrorString, "Invalid mcgroup value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
multicast_interface.s_addr = inet_addr(interface);
if (multicast_interface.s_addr == INADDR_NONE)
{
strncpy(ErrorString, "Invalid interface value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
}
/* Create the socket */
data->mSocket = socket(PF_INET, SOCK_DGRAM, 0);
if (data->mSocket == LBMMON_INVALID_HANDLE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"socket() failed, %s",
last_socket_error());
free(data);
return (LBMMONTRUDP_ERR_SOCKET);
}
/* Build the interface sockaddr_in. */
memset(&(data->mInterface), 0, sizeof(data->mInterface));
data->mInterface.sin_family = AF_INET;
data->mInterface.sin_port = htons((unsigned short) port_value);
data->mInterface.sin_addr.s_addr = INADDR_ANY;
/* Bind the socket. */
rc = bind(data->mSocket, (struct sockaddr *) &(data->mInterface), sizeof(data->mInterface));
if (rc == LBMMON_SOCKET_ERROR)
{
snprintf(ErrorString,
sizeof(ErrorString),
"bind() failed, %s",
last_socket_error());
#ifdef _WIN32
closesocket(data->mSocket);
#else
close(data->mSocket);
#endif
return (LBMMONTRUDP_ERR_SOCKET);
}
/* For multicast, join the group. */
if (data->mMode == MODE_MULTICAST)
{
data->mMulticastMembership.imr_interface.s_addr = multicast_interface.s_addr;
data->mMulticastMembership.imr_multiaddr.s_addr = multicast_group.s_addr;
rc = setsockopt(data->mSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void *) &(data->mMulticastMembership), sizeof(data->mMulticastMembership));
if (rc == LBMMON_SOCKET_ERROR)
{
snprintf(ErrorString,
sizeof(ErrorString),
"setsockopt(...,IP_ADD_MEMBERSHIP,...) failed, %s",
last_socket_error());
#ifdef _WIN32
closesocket(data->mSocket);
#else
close(data->mSocket);
#endif
free(data);
return (LBMMONTRUDP_ERR_SOCKET);
}
}
/* Build the peer sockaddr_in. */
data->mPeer.sin_family = AF_INET;
data->mPeer.sin_port = htons((unsigned short) port_value);
data->mPeer.sin_addr.s_addr = INADDR_ANY;
#ifdef _WIN32
InitializeCriticalSection(&(data->mLock));
#else
pthread_mutex_init(&(data->mLock), NULL);
#endif
/* Start the receive thread */
#ifdef _WIN32
data->mThread = CreateThread(NULL, 0, receive_thread_proc, data, 0, NULL);
if (data->mThread == NULL)
{
snprintf(ErrorString,
sizeof(ErrorString),
"CreateThread() failed, error %d",
GetLastError());
closesocket(data->mSocket);
free(data);
return (LBMMONTRUDP_ERR_THREAD);
}
#else
#ifdef __VOS__
{
pthread_attr_t pth_attr;
pthread_attr_init (&pth_attr);
pthread_attr_setschedpolicy (&pth_attr, SCHED_RR);
rc = pthread_create(&(data->mThread), &pth_attr, receive_thread_proc, data);
}
#else
rc = pthread_create(&(data->mThread), NULL, receive_thread_proc, data);
#endif
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"pthread_create() failed, error %d, %s",
rc,
strerror(rc));
close(data->mSocket);
free(data);
return (LBMMONTRUDP_ERR_THREAD);
}
#endif
/* Pass back the lbmmon_transport_udp_rcv_t created */
*TransportClientData = data;
return (0);
}
/*
	Body of the background receive thread.

	Arg is the lbmmon_transport_udp_rcv_t created by
	lbmmon_transport_udp_initrcv(). The thread waits for the socket to become
	readable with a 500 ms select() timeout, so that it re-checks
	mTerminateThread at least that often. Each datagram read (up to 8192
	bytes) is copied into a new queue node and appended to the receiver's
	queue under the receiver lock.

	If memory for a node or its payload cannot be allocated, the error is
	recorded in ErrorString and the thread exits. Allocation is done before
	the lock is taken, so the thread never exits while holding the lock and
	never leaves a half-built node behind.

	The return value carries no information (0 on Windows, NULL elsewhere).
*/
#ifdef _WIN32
DWORD WINAPI
receive_thread_proc(void * Arg)
#else
void *
receive_thread_proc(void * Arg)
#endif
{
	lbmmon_transport_udp_rcv_t * rcv = (lbmmon_transport_udp_rcv_t *) Arg;
	unsigned char buffer[8192];
	struct timeval timeout;
	fd_set readfds;
	int rc;
	ssize_t bytes_read;
	size_t payload_length;
	size_t alloc_length;
	lbmmon_transport_udp_rcv_node_t * node;

	while (rcv->mTerminateThread == 0)
	{
		/* select() may modify both the set and the timeout, so rebuild them. */
		FD_ZERO(&readfds);
		FD_SET(rcv->mSocket, &readfds);
		timeout.tv_sec = 0;
		timeout.tv_usec = 500000;
		rc = select(rcv->mSocket + 1, &readfds, NULL, NULL, &timeout);
		if (rc <= 0)
		{
			/* Timeout or error: go back and re-check the terminate flag. */
			continue;
		}
		bytes_read = recvfrom(rcv->mSocket, (void *) buffer, sizeof(buffer), 0, NULL, NULL);
		if (bytes_read == LBMMON_SOCKET_ERROR)
		{
			continue;
		}
		payload_length = (size_t) bytes_read;

		/* A data message. Build the queue node before taking the lock. */
		node = malloc(sizeof(*node));
		if (node == NULL)
		{
			lbmmon_transport_udp_report_allocation_error(sizeof(*node));
			break;
		}
		/* malloc(0) may legitimately return NULL; never ask for 0 bytes so an
		   empty datagram is not mistaken for an out-of-memory condition. */
		alloc_length = (payload_length > 0) ? payload_length : 1;
		node->mMessage = malloc(alloc_length);
		if (node->mMessage == NULL)
		{
			lbmmon_transport_udp_report_allocation_error(alloc_length);
			free(node);
			break;
		}
		memcpy(node->mMessage, buffer, payload_length);
		node->mMessageLength = payload_length;
		node->mUsedBytes = 0; /* No data returned as yet */
		node->mNext = NULL;

		/* Link the message onto the tail of the queue */
		lock_receiver(rcv);
		if (rcv->mTail != NULL)
		{
			rcv->mTail->mNext = node;
		}
		else
		{
			rcv->mHead = node;
		}
		rcv->mTail = node;
		unlock_receiver(rcv);
	}
#ifdef _WIN32
	return (0);
#else
	return (NULL);
#endif
}
/*
	Send one statistics packet to the peer configured for this source.

	Data / Length        the bytes to send
	TransportClientData  the lbmmon_transport_udp_src_t produced by
	                     lbmmon_transport_udp_initsrc()

	Returns 0 on success, -1 if Data or TransportClientData is NULL, or
	LBMMONTRUDP_ERR_SEND (with ErrorString set) if sendto() fails.
*/
int
lbmmon_transport_udp_send(const char * Data, size_t Length, void * TransportClientData)
{
	lbmmon_transport_udp_src_t * source = (lbmmon_transport_udp_src_t *) TransportClientData;
	int sent;

	if ((TransportClientData == NULL) || (Data == NULL))
	{
		return (-1);
	}

	sent = sendto(source->mSocket,
				  Data,
				  Length,
				  0,
				  (struct sockaddr *) &(source->mPeer),
				  sizeof(source->mPeer));
	if (sent != LBMMON_SOCKET_ERROR)
	{
		return (0);
	}

	snprintf(ErrorString,
			 sizeof(ErrorString),
			 "sendto() failed, %s",
			 last_socket_error());
	return (LBMMONTRUDP_ERR_SEND);
}
/*
	Hand the oldest queued statistics packet to the caller.

	Data                 destination buffer
	Length               in: capacity of Data in bytes;
	                     out: number of bytes copied (only written when a
	                     message is returned)
	TimeoutMS            how long to sleep when the queue is empty
	TransportClientData  the lbmmon_transport_udp_rcv_t produced by
	                     lbmmon_transport_udp_initrcv()

	Returns:
	  -1  Data, Length or TransportClientData is NULL
	   0  a message was copied into Data (or *Length was 0 on entry, in
	      which case nothing is done)
	   1  no message was returned: either the queue was empty (the call
	      sleeps for TimeoutMS first) or the oldest message was larger than
	      *Length and was logged and discarded

	The queue is only touched while the receiver lock is held; the lock is
	released before sleeping.
*/
int
lbmmon_transport_udp_receive(char * Data, size_t * Length, unsigned int TimeoutMS, void * TransportClientData)
{
	lbmmon_transport_udp_rcv_t * rcv = (lbmmon_transport_udp_rcv_t *) TransportClientData;
	lbmmon_transport_udp_rcv_node_t * node;
	int rc = 0;
	size_t length_remaining;
#if defined(_WIN32)
#elif defined(__TANDEM)
	unsigned int sleep_sec;
	unsigned int sleep_usec;
#else
	struct timespec ivl;
#endif

	if ((Data == NULL) || (Length == NULL) || (TransportClientData == NULL))
	{
		return (-1);
	}
	if (*Length == 0)
	{
		return (0);
	}

	lock_receiver(rcv);
	node = rcv->mHead;
	if (node == NULL)
	{
		/* Queue is empty. Do not hold the lock while sleeping. */
		unlock_receiver(rcv);
		/* Sleep for wait time */
#define NANOSECONDS_PER_SECOND 1000000000
#define MICROSECONDS_PER_SECOND 1000000
#define MILLISECONDS_PER_SECOND 1000
#define NANOSECONDS_PER_MILLISECOND (NANOSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#define MICROSECONDS_PER_MILLISECOND (MICROSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#if defined(_WIN32)
		Sleep(TimeoutMS);
#elif defined(__TANDEM)
		sleep_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
		sleep_usec = (TimeoutMS % MILLISECONDS_PER_SECOND) * MICROSECONDS_PER_MILLISECOND;
		if (sleep_usec > 0)
		{
			usleep(sleep_usec);
		}
		if (sleep_sec > 0)
		{
			sleep(sleep_sec);
		}
#else
		ivl.tv_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
		ivl.tv_nsec = (TimeoutMS % MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND;
		nanosleep(&ivl, NULL);
#endif
		return (1);
	}

	/* Queue is non-empty. Work on the first message in the queue. */
	length_remaining = node->mMessageLength - node->mUsedBytes;
	if (*Length >= length_remaining)
	{
		/* We can transfer the rest of the message */
		memcpy(Data, node->mMessage + node->mUsedBytes, length_remaining);
		*Length = length_remaining;
		rc = 0;
	}
	else
	{
		/* MSGDESC: The monitoring message received is larger than the maximum allowed size given.
		 * MSGRES: This is a hard coded maximum. */
		/* The arguments are size_t; convert them so they match the %d
		   conversions in the (unchanged) message text. */
		lbm_logf(LBM_LOG_ERR, "Core-8034-2: [LBMMON] Dropping monitoring message that is larger than the maximum allowed size of %d (size=%d)",
				 (int) *Length, (int) node->mMessageLength);
		rc = 1; /* Positive number prevents caller from logging message too */
	}

	/* Either way we're done with the message: free it and unlink the node. */
	free(node->mMessage);
	node->mMessage = NULL;
	rcv->mHead = node->mNext;
	if (rcv->mHead == NULL)
	{
		rcv->mTail = NULL;
	}
	free(node);
	unlock_receiver(rcv);
	return (rc);
}
/*
	Release a statistics source.

	TransportClientData is the lbmmon_transport_udp_src_t produced by
	lbmmon_transport_udp_initsrc(); its socket is closed and the structure
	is freed, so it must not be used afterwards.

	Returns 0 on success, or -1 (with ErrorString set) when
	TransportClientData is NULL.
*/
int
lbmmon_transport_udp_src_finish(void * TransportClientData)
{
	lbmmon_transport_udp_src_t * source = (lbmmon_transport_udp_src_t *) TransportClientData;

	if (source != NULL)
	{
#ifdef _WIN32
		closesocket(source->mSocket);
#else
		close(source->mSocket);
#endif
		/* Clean up our data */
		free(source);
		return (0);
	}

	strncpy(ErrorString, "Invalid parameter", sizeof(ErrorString));
	return (-1);
}
int
lbmmon_transport_udp_rcv_finish(void * TransportClientData)
{
lbmmon_transport_udp_rcv_t * rcv = (lbmmon_transport_udp_rcv_t *) TransportClientData;
lbmmon_transport_udp_rcv_node_t * node;
lbmmon_transport_udp_rcv_node_t * next;
/* Stop the thread to prevent any more incoming messages */
rcv->mTerminateThread = 1;
/* Lock the receiver */
lock_receiver(rcv);
/* Clean out the queue */
node = rcv->mHead;
while (node != NULL)
{
/* Let LBM know we're done with the message */
free(node->mMessage);
next = node->mNext;
free(node);
node = next;
}
unlock_receiver(rcv);
/* If multicast, drop membership. */
if (rcv->mMode == MODE_MULTICAST)
{
(void) setsockopt(rcv->mSocket, IPPROTO_IP, IP_DROP_MEMBERSHIP, (void *) &(rcv->mMulticastMembership), sizeof(rcv->mMulticastMembership));
}
#ifdef _WIN32
closesocket(rcv->mSocket);
#else
close(rcv->mSocket);
#endif
#ifdef _WIN32
DeleteCriticalSection(&(rcv->mLock));
#else
pthread_mutex_destroy(&(rcv->mLock));
#endif
free(TransportClientData);
return (0);
}
/*
	Acquire the lock of the given receiver. Blocks until the lock is
	available. Receiver must not be NULL.
*/
void
lock_receiver(lbmmon_transport_udp_rcv_t * Receiver)
{
#ifdef _WIN32
	CRITICAL_SECTION * guard = &(Receiver->mLock);

	EnterCriticalSection(guard);
#else
	pthread_mutex_t * guard = &(Receiver->mLock);

	pthread_mutex_lock(guard);
#endif
}
/*
	Release the lock of the given receiver, previously taken with
	lock_receiver(). Receiver must not be NULL.
*/
void
unlock_receiver(lbmmon_transport_udp_rcv_t * Receiver)
{
#ifdef _WIN32
	CRITICAL_SECTION * guard = &(Receiver->mLock);

	LeaveCriticalSection(guard);
#else
	pthread_mutex_t * guard = &(Receiver->mLock);

	pthread_mutex_unlock(guard);
#endif
}
/*
	Return the text of the most recent error recorded by this module.
	The pointer refers to the module's shared ErrorString buffer; it must not
	be freed, and its contents change when a later call records an error.
*/
const char *
lbmmon_transport_udp_errmsg(void)
{
	const char * latest = ErrorString;

	return latest;
}