#ifdef __VOS__
#define _POSIX_C_SOURCE 200112L
#include <sys/time.h>
#endif
#include <stdio.h>
#include <time.h>
#include <string.h>
#include <stdlib.h>
#include <limits.h>
#include <errno.h>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#define strcasecmp stricmp
#define snprintf _snprintf
typedef int ssize_t;
#else
#include "config.h"
#include <unistd.h>
#if defined(__TANDEM)
#if defined(HAVE_TANDEM_SPT)
#include <ktdmtyp.h>
#include <spthread.h>
#else
#include <pthread.h>
#endif
#else
#include <pthread.h>
#endif
#include <strings.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#endif
#if defined(__VMS)
typedef int socklen_t;
#endif
#include <lbm/lbmmon.h>
#include <lbm/lbmmontrudp.h>
#ifdef _WIN32
#define LBMMON_INVALID_HANDLE INVALID_SOCKET
#define LBMMON_SOCKET_ERROR SOCKET_ERROR
#else
#define LBMMON_INVALID_HANDLE -1
#define LBMMON_SOCKET_ERROR -1
#endif
#ifndef INADDR_NONE
#define INADDR_NONE ((in_addr_t) 0xffffffff)
#endif
static const lbmmon_transport_func_t LBMMON_TRANSPORT_UDP =
{
lbmmon_transport_udp_initsrc,
lbmmon_transport_udp_initrcv,
lbmmon_transport_udp_send,
lbmmon_transport_udp_receive,
lbmmon_transport_udp_src_finish,
lbmmon_transport_udp_rcv_finish,
lbmmon_transport_udp_errmsg
};
typedef struct
{
#ifdef _WIN32
SOCKET mSocket;
#else
int mSocket;
#endif
struct sockaddr_in mPeer;
unsigned char mMode;
} lbmmon_transport_udp_src_t;
#define MODE_UNICAST 0
#define MODE_BROADCAST 1
#define MODE_MULTICAST 2
struct lbmmon_transport_udp_rcv_node_t_stct
{
unsigned char * mMessage;
size_t mMessageLength;
size_t mUsedBytes;
struct lbmmon_transport_udp_rcv_node_t_stct * mNext;
};
typedef struct lbmmon_transport_udp_rcv_node_t_stct lbmmon_transport_udp_rcv_node_t;
typedef struct
{
#ifdef _WIN32
CRITICAL_SECTION mLock;
#else
pthread_mutex_t mLock;
#endif
#ifdef _WIN32
SOCKET mSocket;
#else
int mSocket;
#endif
struct sockaddr_in mPeer;
struct sockaddr_in mInterface;
struct ip_mreq mMulticastMembership;
unsigned char mMode;
lbmmon_transport_udp_rcv_node_t * mHead;
lbmmon_transport_udp_rcv_node_t * mTail;
#ifdef _WIN32
HANDLE mThread;
#else
pthread_t mThread;
#endif
unsigned char mTerminateThread;
} lbmmon_transport_udp_rcv_t;
#define DEFAULT_INTERFACE "0.0.0.0"
#define DEFAULT_PORT "2933"
#define DEFAULT_TTL "16"
#define LBMMONTRUDP_ERR_INVALID_OPTION 1
#define LBMMONTRUDP_ERR_SOCKET 2
#define LBMMONTRUDP_ERR_SEND 3
#define LBMMONTRUDP_ERR_THREAD 4
static void lock_receiver(lbmmon_transport_udp_rcv_t * Receiver);
static void unlock_receiver(lbmmon_transport_udp_rcv_t * Receiver);
#ifdef _WIN32
static DWORD WINAPI receive_thread_proc(void * Arg);
#else
static void * receive_thread_proc(void * Arg);
#endif
static char ErrorString[1024];
static const char *
last_socket_error(void)
{
static char message[512];
#ifdef _WIN32
snprintf(message, sizeof(message), "error %d", WSAGetLastError());
#else
snprintf(message,
sizeof(message),
"error %d, %s",
errno,
strerror(errno));
#endif
return (message);
}
const lbmmon_transport_func_t *
lbmmon_transport_udp_module(void)
{
return (&LBMMON_TRANSPORT_UDP);
}
int
lbmmon_transport_udp_initsrc(void * * TransportClientData, const void * TransportOptions)
{
lbmmon_transport_udp_src_t * data;
int rc;
const char * ptr = (const char *) TransportOptions;
char key[512];
char value[512];
char address[512];
char port[512];
char interface[512];
char mcgroup[512];
char bcaddress[512];
char ttl[512];
unsigned long port_value;
struct in_addr multicast_group;
struct in_addr multicast_interface;
struct in_addr broadcast_address;
struct in_addr host_address;
unsigned long ttl_value = 0;
memset(ErrorString, 0, sizeof(ErrorString));
data = malloc(sizeof(lbmmon_transport_udp_src_t));
multicast_group.s_addr = 0;
multicast_interface.s_addr = 0;
broadcast_address.s_addr = 0;
host_address.s_addr = 0;
memset(address, 0, sizeof(address));
memset(port, 0, sizeof(port));
strcpy(port, DEFAULT_PORT);
memset(interface, 0, sizeof(interface));
strcpy(interface, DEFAULT_INTERFACE);
memset(mcgroup, 0, sizeof(mcgroup));
memset(bcaddress, 0, sizeof(bcaddress));
memset(ttl, 0, sizeof(ttl));
strcpy(ttl, DEFAULT_TTL);
data->mMode = MODE_UNICAST;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (strcasecmp(key, "address") == 0)
{
strncpy(address, value, sizeof(address));
}
else if (strcasecmp(key, "port") == 0)
{
strncpy(port, value, sizeof(port));
}
else if (strcasecmp(key, "interface") == 0)
{
strncpy(interface, value, sizeof(interface));
}
else if (strcasecmp(key, "mcgroup") == 0)
{
strncpy(mcgroup, value, sizeof(mcgroup));
}
else if (strcasecmp(key, "bcaddress") == 0)
{
strncpy(bcaddress, value, sizeof(bcaddress));
}
else if (strcasecmp(key, "ttl") == 0)
{
strncpy(ttl, value, sizeof(ttl));
}
}
port_value = strtoul(port, NULL, 0);
if ((port_value == ULONG_MAX) && (errno == ERANGE))
{
strncpy(ErrorString, "Invalid port value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
else if (port_value > USHRT_MAX)
{
strncpy(ErrorString, "Invalid port value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
if (mcgroup[0] != '\0')
{
data->mMode = MODE_MULTICAST;
multicast_group.s_addr = inet_addr(mcgroup);
if (multicast_group.s_addr == INADDR_NONE)
{
strncpy(ErrorString, "Invalid mcgroup value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
if (!IN_MULTICAST(ntohl(multicast_group.s_addr)))
{
strncpy(ErrorString, "Invalid mcgroup value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
multicast_interface.s_addr = inet_addr(interface);
if (multicast_interface.s_addr == INADDR_NONE)
{
strncpy(ErrorString, "Invalid interface value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
ttl_value = strtoul(ttl, NULL, 0);
if ((ttl_value == ULONG_MAX) && (errno == ERANGE))
{
strncpy(ErrorString, "Invalid ttl value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
else if (ttl_value > UCHAR_MAX)
{
strncpy(ErrorString, "Invalid ttl value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
}
else if (bcaddress[0] != '\0')
{
data->mMode = MODE_BROADCAST;
broadcast_address.s_addr = inet_addr(bcaddress);
if (broadcast_address.s_addr == INADDR_NONE)
{
strncpy(ErrorString, "Invalid bcaddress value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
}
else
{
host_address.s_addr = inet_addr(address);
if (host_address.s_addr == INADDR_NONE)
{
strncpy(ErrorString, "Invalid address value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
}
data->mSocket = socket(PF_INET, SOCK_DGRAM, 0);
if (data->mSocket == LBMMON_INVALID_HANDLE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"socket() failed, %s",
last_socket_error());
free(data);
return (LBMMONTRUDP_ERR_SOCKET);
}
if (data->mMode == MODE_BROADCAST)
{
int option = 1;
socklen_t len = sizeof(option);
rc = setsockopt(data->mSocket, SOL_SOCKET, SO_BROADCAST, (void *) &option, len);
if (rc == LBMMON_SOCKET_ERROR)
{
snprintf(ErrorString,
sizeof(ErrorString),
"setsockopt(...,SO_BROADCAST,...) failed, %s",
last_socket_error());
#ifdef _WIN32
closesocket(data->mSocket);
#else
close(data->mSocket);
#endif
free(data);
return (LBMMONTRUDP_ERR_SOCKET);
}
}
if (data->mMode == MODE_MULTICAST)
{
unsigned char optval = (unsigned char) ttl_value;
struct in_addr ifc_addr;
rc = setsockopt(data->mSocket, IPPROTO_IP, IP_MULTICAST_TTL, (void *) &optval, sizeof(optval));
if (rc != LBMMON_SOCKET_ERROR)
{
ifc_addr.s_addr = multicast_interface.s_addr;
rc = setsockopt(data->mSocket, IPPROTO_IP, IP_MULTICAST_IF, (void *) &ifc_addr, sizeof(ifc_addr));
if (rc == LBMMON_SOCKET_ERROR)
{
snprintf(ErrorString,
sizeof(ErrorString),
"setsockopt(...,IP_MULTICAST_IF,...) failed, %s",
last_socket_error());
}
}
else
{
snprintf(ErrorString,
sizeof(ErrorString),
"setsockopt(...,IP_MULTICAST_TTL,...) failed, %s",
last_socket_error());
}
if (rc == LBMMON_SOCKET_ERROR)
{
#ifdef _WIN32
closesocket(data->mSocket);
#else
close(data->mSocket);
#endif
free(data);
return (LBMMONTRUDP_ERR_SOCKET);
}
}
data->mPeer.sin_family = AF_INET;
data->mPeer.sin_port = htons((unsigned short) port_value);
switch (data->mMode)
{
case MODE_UNICAST:
default:
data->mPeer.sin_addr.s_addr = host_address.s_addr;
break;
case MODE_BROADCAST:
data->mPeer.sin_addr.s_addr = broadcast_address.s_addr;
break;
case MODE_MULTICAST:
data->mPeer.sin_addr.s_addr = multicast_group.s_addr;
break;
}
*TransportClientData = data;
return (0);
}
int
lbmmon_transport_udp_initrcv(void * * TransportClientData, const void * TransportOptions)
{
lbmmon_transport_udp_rcv_t * data;
int rc;
const char * ptr = (const char *) TransportOptions;
char key[512];
char value[512];
char port[512];
char interface[512];
char mcgroup[512];
unsigned long port_value;
struct in_addr multicast_group;
struct in_addr multicast_interface;
memset(ErrorString, 0, sizeof(ErrorString));
data = malloc(sizeof(lbmmon_transport_udp_rcv_t));
multicast_group.s_addr = 0;
multicast_interface.s_addr = 0;
data->mHead = NULL;
data->mTail = NULL;
data->mTerminateThread = 0;
memset(port, 0, sizeof(port));
strcpy(port, DEFAULT_PORT);
memset(interface, 0, sizeof(interface));
strcpy(interface, DEFAULT_INTERFACE);
memset(mcgroup, 0, sizeof(mcgroup));
data->mMode = MODE_UNICAST;
while ((ptr = lbmmon_next_key_value_pair(ptr, key, sizeof(key), value, sizeof(value))) != NULL)
{
if (strcasecmp(key, "port") == 0)
{
strncpy(port, value, sizeof(port));
}
else if (strcasecmp(key, "interface") == 0)
{
strncpy(interface, value, sizeof(interface));
}
else if (strcasecmp(key, "mcgroup") == 0)
{
strncpy(mcgroup, value, sizeof(mcgroup));
}
}
port_value = strtoul(port, NULL, 0);
if ((port_value == ULONG_MAX) && (errno == ERANGE))
{
strncpy(ErrorString, "Invalid port value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
else if (port_value > USHRT_MAX)
{
strncpy(ErrorString, "Invalid port value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
if (mcgroup[0] != '\0')
{
data->mMode = MODE_MULTICAST;
multicast_group.s_addr = inet_addr(mcgroup);
if (multicast_group.s_addr == INADDR_NONE)
{
strncpy(ErrorString, "Invalid mcgroup value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
if (!IN_MULTICAST(ntohl(multicast_group.s_addr)))
{
strncpy(ErrorString, "Invalid mcgroup value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
multicast_interface.s_addr = inet_addr(interface);
if (multicast_interface.s_addr == INADDR_NONE)
{
strncpy(ErrorString, "Invalid interface value", sizeof(ErrorString));
free(data);
return (LBMMONTRUDP_ERR_INVALID_OPTION);
}
}
data->mSocket = socket(PF_INET, SOCK_DGRAM, 0);
if (data->mSocket == LBMMON_INVALID_HANDLE)
{
snprintf(ErrorString,
sizeof(ErrorString),
"socket() failed, %s",
last_socket_error());
free(data);
return (LBMMONTRUDP_ERR_SOCKET);
}
memset(&(data->mInterface), 0, sizeof(data->mInterface));
data->mInterface.sin_family = AF_INET;
data->mInterface.sin_port = htons((unsigned short) port_value);
data->mInterface.sin_addr.s_addr = INADDR_ANY;
rc = bind(data->mSocket, (struct sockaddr *) &(data->mInterface), sizeof(data->mInterface));
if (rc == LBMMON_SOCKET_ERROR)
{
snprintf(ErrorString,
sizeof(ErrorString),
"bind() failed, %s",
last_socket_error());
#ifdef _WIN32
closesocket(data->mSocket);
#else
close(data->mSocket);
#endif
return (LBMMONTRUDP_ERR_SOCKET);
}
if (data->mMode == MODE_MULTICAST)
{
data->mMulticastMembership.imr_interface.s_addr = multicast_interface.s_addr;
data->mMulticastMembership.imr_multiaddr.s_addr = multicast_group.s_addr;
rc = setsockopt(data->mSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void *) &(data->mMulticastMembership), sizeof(data->mMulticastMembership));
if (rc == LBMMON_SOCKET_ERROR)
{
snprintf(ErrorString,
sizeof(ErrorString),
"setsockopt(...,IP_ADD_MEMBERSHIP,...) failed, %s",
last_socket_error());
#ifdef _WIN32
closesocket(data->mSocket);
#else
close(data->mSocket);
#endif
free(data);
return (LBMMONTRUDP_ERR_SOCKET);
}
}
data->mPeer.sin_family = AF_INET;
data->mPeer.sin_port = htons((unsigned short) port_value);
data->mPeer.sin_addr.s_addr = INADDR_ANY;
#ifdef _WIN32
InitializeCriticalSection(&(data->mLock));
#else
pthread_mutex_init(&(data->mLock), NULL);
#endif
#ifdef _WIN32
data->mThread = CreateThread(NULL, 0, receive_thread_proc, data, 0, NULL);
if (data->mThread == NULL)
{
snprintf(ErrorString,
sizeof(ErrorString),
"CreateThread() failed, error %d",
GetLastError());
closesocket(data->mSocket);
free(data);
return (LBMMONTRUDP_ERR_THREAD);
}
#else
#ifdef __VOS__
{
pthread_attr_t pth_attr;
pthread_attr_init (&pth_attr);
pthread_attr_setschedpolicy (&pth_attr, SCHED_RR);
rc = pthread_create(&(data->mThread), &pth_attr, receive_thread_proc, data);
}
#else
rc = pthread_create(&(data->mThread), NULL, receive_thread_proc, data);
#endif
if (rc != 0)
{
snprintf(ErrorString,
sizeof(ErrorString),
"pthread_create() failed, error %d, %s",
rc,
strerror(rc));
close(data->mSocket);
free(data);
return (LBMMONTRUDP_ERR_THREAD);
}
#endif
*TransportClientData = data;
return (0);
}
#ifdef _WIN32
DWORD WINAPI
receive_thread_proc(void * Arg)
#else
void *
receive_thread_proc(void * Arg)
#endif
{
lbmmon_transport_udp_rcv_t * rcv = (lbmmon_transport_udp_rcv_t *) Arg;
unsigned char buffer[8192];
struct timeval timeout;
fd_set readfds;
int rc;
ssize_t bytes_read;
lbmmon_transport_udp_rcv_node_t * node;
while (rcv->mTerminateThread == 0)
{
FD_ZERO(&readfds);
FD_SET(rcv->mSocket, &readfds);
timeout.tv_sec = 0;
timeout.tv_usec = 500000;
rc = select(rcv->mSocket + 1, &readfds, NULL, NULL, &timeout);
if (rc <= 0)
{
continue;
}
bytes_read = recvfrom(rcv->mSocket, buffer, sizeof(buffer), 0, NULL, NULL);
if (bytes_read == LBMMON_SOCKET_ERROR)
{
continue;
}
lock_receiver(rcv);
node = malloc(sizeof(lbmmon_transport_udp_rcv_node_t));
node->mMessage = malloc((size_t) bytes_read);
memcpy(node->mMessage, buffer, (size_t) bytes_read);
node->mMessageLength = (size_t) bytes_read;
node->mUsedBytes = 0;
node->mNext = NULL;
if (rcv->mTail != NULL)
{
rcv->mTail->mNext = node;
}
else
{
rcv->mHead = node;
}
rcv->mTail = node;
unlock_receiver(rcv);
}
#ifdef _WIN32
return (0);
#else
return (NULL);
#endif
}
int
lbmmon_transport_udp_send(const char * Data, size_t Length, void * TransportClientData)
{
lbmmon_transport_udp_src_t * src;
int rc;
if ((Data == NULL) || (TransportClientData == NULL))
{
return (-1);
}
src = (lbmmon_transport_udp_src_t *) TransportClientData;
rc = sendto(src->mSocket, Data, Length, 0, (struct sockaddr *) &(src->mPeer), sizeof(src->mPeer));
if (rc == LBMMON_SOCKET_ERROR)
{
snprintf(ErrorString,
sizeof(ErrorString),
"sendto() failed, %s",
last_socket_error());
return (LBMMONTRUDP_ERR_SEND);
}
return (0);
}
int
lbmmon_transport_udp_receive(char * Data, size_t * Length, unsigned int TimeoutMS, void * TransportClientData)
{
lbmmon_transport_udp_rcv_t * rcv = (lbmmon_transport_udp_rcv_t *) TransportClientData;
lbmmon_transport_udp_rcv_node_t * node;
int rc = 0;
size_t length_remaining;
#if defined(_WIN32)
#elif defined(__TANDEM)
unsigned int sleep_sec;
unsigned int sleep_usec;
#else
struct timespec ivl;
#endif
if ((Data == NULL) || (Length == NULL) || (TransportClientData == NULL))
{
return (-1);
}
if (*Length == 0)
{
return (0);
}
lock_receiver(rcv);
if (rcv->mHead != NULL)
{
node = rcv->mHead;
length_remaining = node->mMessageLength - node->mUsedBytes;
if (*Length >= length_remaining)
{
memcpy(Data, node->mMessage + node->mUsedBytes, length_remaining);
*Length = length_remaining;
rc = 0;
free(node->mMessage);
node->mMessage = NULL;
rcv->mHead = node->mNext;
if (rcv->mHead == NULL)
{
rcv->mTail = NULL;
}
free(node);
}
else
{
memcpy(Data, node->mMessage + node->mUsedBytes, *Length);
node->mUsedBytes -= *Length;
rc = 0;
}
unlock_receiver(rcv);
}
else
{
unlock_receiver(rcv);
#define NANOSECONDS_PER_SECOND 1000000000
#define MICROSECONDS_PER_SECOND 1000000
#define MILLISECONDS_PER_SECOND 1000
#define NANOSECONDS_PER_MILLISECOND (NANOSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#define MICROSECONDS_PER_MILLISECOND (MICROSECONDS_PER_SECOND / MILLISECONDS_PER_SECOND)
#if defined(_WIN32)
Sleep(TimeoutMS);
#elif defined(__TANDEM)
sleep_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
sleep_usec = (TimeoutMS % MILLISECONDS_PER_SECOND) * MICROSECONDS_PER_MILLISECOND;
if (sleep_usec > 0)
{
usleep(sleep_usec);
}
if (sleep_sec > 0)
{
sleep(sleep_sec);
}
#else
ivl.tv_sec = TimeoutMS / MILLISECONDS_PER_SECOND;
ivl.tv_nsec = (TimeoutMS % MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND;
nanosleep(&ivl, NULL);
#endif
rc = 1;
}
return (rc);
}
int
lbmmon_transport_udp_src_finish(void * TransportClientData)
{
lbmmon_transport_udp_src_t * src;
if (TransportClientData == NULL)
{
strncpy(ErrorString, "Invalid parameter", sizeof(ErrorString));
return (-1);
}
src = (lbmmon_transport_udp_src_t *) TransportClientData;
#ifdef _WIN32
closesocket(src->mSocket);
#else
close(src->mSocket);
#endif
free(TransportClientData);
return (0);
}
int
lbmmon_transport_udp_rcv_finish(void * TransportClientData)
{
lbmmon_transport_udp_rcv_t * rcv = (lbmmon_transport_udp_rcv_t *) TransportClientData;
lbmmon_transport_udp_rcv_node_t * node;
lbmmon_transport_udp_rcv_node_t * next;
int rc;
rcv->mTerminateThread = 1;
lock_receiver(rcv);
node = rcv->mHead;
while (node != NULL)
{
free(node->mMessage);
next = node->mNext;
free(node);
node = next;
}
unlock_receiver(rcv);
if (rcv->mMode == MODE_MULTICAST)
{
rc = setsockopt(rcv->mSocket, IPPROTO_IP, IP_DROP_MEMBERSHIP, (void *) &(rcv->mMulticastMembership), sizeof(rcv->mMulticastMembership));
}
#ifdef _WIN32
closesocket(rcv->mSocket);
#else
close(rcv->mSocket);
#endif
#ifdef _WIN32
DeleteCriticalSection(&(rcv->mLock));
#else
pthread_mutex_destroy(&(rcv->mLock));
#endif
free(TransportClientData);
return (0);
}
void
lock_receiver(lbmmon_transport_udp_rcv_t * Receiver)
{
#ifdef _WIN32
EnterCriticalSection(&(Receiver->mLock));
#else
pthread_mutex_lock(&(Receiver->mLock));
#endif
}
void
unlock_receiver(lbmmon_transport_udp_rcv_t * Receiver)
{
#ifdef _WIN32
LeaveCriticalSection(&(Receiver->mLock));
#else
pthread_mutex_unlock(&(Receiver->mLock));
#endif
}
const char *
lbmmon_transport_udp_errmsg(void)
{
return (ErrorString);
}