umeblocksrc.h

Go to the documentation of this file.
00001 /**     \file umeblocksrc.h
00002         \brief UME Blocking API
00003 
00004         The Ultra Messaging Enterprise (UME) API Description.
00005 
00006   Copyright (c) 2005-2014 Informatica Corporation  Permission is granted to licensees to use
00007   or alter this software for any purpose, including commercial applications,
00008   according to the terms laid out in the Software License Agreement.
00009 
00010   This source code example is provided by Informatica for educational
00011   and evaluation purposes only.
00012 
00013   THE SOFTWARE IS PROVIDED "AS IS" AND INFORMATICA DISCLAIMS ALL WARRANTIES
00014   EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION, ANY IMPLIED WARRANTIES OF
00015   NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR
00016   PURPOSE.  INFORMATICA DOES NOT WARRANT THAT USE OF THE SOFTWARE WILL BE
00017   UNINTERRUPTED OR ERROR-FREE.  INFORMATICA SHALL NOT, UNDER ANY CIRCUMSTANCES, BE
00018   LIABLE TO LICENSEE FOR LOST PROFITS, CONSEQUENTIAL, INCIDENTAL, SPECIAL OR
00019   INDIRECT DAMAGES ARISING OUT OF OR RELATED TO THIS AGREEMENT OR THE
00020   TRANSACTIONS CONTEMPLATED HEREUNDER, EVEN IF INFORMATICA HAS BEEN APPRISED OF
00021   THE LIKELIHOOD OF SUCH DAMAGES.
00022 
00023 */
00024 #ifndef LBMAUX_H
00025 #define LBMAUX_H
00026 
/* Portable sleep helpers.
 *
 *   SLEEP_SEC(x)  - sleep for x seconds.
 *   SLEEP_MSEC(x) - sleep for x milliseconds.
 *
 * On Windows both map onto Sleep(), which takes milliseconds.  Elsewhere
 * they use sleep()/usleep(); the millisecond form hands whole seconds to
 * sleep() so the value passed to usleep() always stays below one second.
 * Each macro evaluates its argument exactly once, so an expression with
 * side effects may be passed safely.
 */
#if defined(_WIN32)
#define SLEEP_SEC(x) Sleep((x)*1000)
#define SLEEP_MSEC(x) Sleep(x)
#else
#define SLEEP_SEC(x) sleep(x)
#define SLEEP_MSEC(x) \
        do { \
                unsigned long ume_sleep_ms_ = (unsigned long)(x); \
                if (ume_sleep_ms_ >= 1000UL) { \
                        sleep((unsigned int)(ume_sleep_ms_ / 1000UL)); \
                } \
                usleep((unsigned int)((ume_sleep_ms_ % 1000UL) * 1000UL)); \
        } while (0)
#endif /* _WIN32 */
00043 
/* Diagnostic output helpers.
 *
 * UME_BLOCK_DEBUG selects whether UME_BLOCK_DEBUG_PRINT produces output:
 * 0 (the default) silences it, any non-zero value enables it.  The default
 * is only applied when the macro is not already defined, so a build can
 * switch tracing on with -DUME_BLOCK_DEBUG=1 without editing this header.
 */
#ifndef UME_BLOCK_DEBUG
#define UME_BLOCK_DEBUG 0
#endif

/* Write a printf-style message followed by a newline to stderr, but only
   when UME_BLOCK_DEBUG is non-zero.  t is the format string and the
   remaining arguments are its values.  ", ##__VA_ARGS__" is the GNU
   extension that drops the comma when no values are supplied. */
#define UME_BLOCK_DEBUG_PRINT(t, ...) do { \
        if (UME_BLOCK_DEBUG) { \
                fprintf(stderr, t, ##__VA_ARGS__); \
                fprintf(stderr, "\n"); \
        } \
} while (0)

/* Unconditionally write a printf-style message followed by a newline to
   stderr.  Arguments are as for UME_BLOCK_DEBUG_PRINT. */
#define UME_BLOCK_PRINT_ERROR(t, ...) do { \
        fprintf(stderr, t, ##__VA_ARGS__); \
        fprintf(stderr, "\n"); \
} while (0)
00056         
00057 
00058 /* Semaphore Helper macros */
00059 #if defined(_WIN32) /* Win32*/
00060         #define UME_BLOCKING_TYPE "Win32 Semaphore"
00061         typedef struct ume_sem_t_stct
00062         {
00063                 HANDLE hSem;
00064                 int max;
00065                 int state;
00066         } ume_sem_t;
00067 
00068         #define UME_SEM_INIT(sem,len,ret) \
00069                 do { \
00070                         ret=0; \
00071                         sem.hSem  = CreateSemaphore(NULL, len, len, NULL); \
00072                         sem.max   = len; \
00073                         sem.state = len; \
00074                         if(!sem.hSem) { ret = -1; } \
00075                 } while(0)
00076 
00077         #define UME_SEM_DESTROY(sem)    CloseHandle(sem.hSem)
00078         #define UME_SEM_POST(sem) \
00079                 do { \
00080                         ReleaseSemaphore(sem.hSem, 1, NULL); \
00081                         InterlockedIncrement(&(sem.state)); \
00082                 } while(0)
00083                 
00084         #define UME_SEM_WAIT(sem) \
00085         do { \
00086                 WaitForSingleObject(sem.hSem, INFINITE); \
00087                 InterlockedDecrement(&(sem.state)); \
00088         } while(0)
00089 
00090         #define UME_SEM_GETVALUE(sem,val)            val=sem.state;
00091 
00092         #define UME_SEM_TIMEDWAIT(sem,time,ret) \
00093                 do { \
00094                         ret=WaitForSingleObject(sem.hSem, time); \
00095                         if(ret!=WAIT_TIMEOUT && ret==WAIT_OBJECT_0) { InterlockedDecrement(&(sem.state)); } \
00096                 } while(0)
00097                 
00098         #define UME_SEM_TIMEDOUT(v)           v == WAIT_TIMEOUT
00099         #define UME_SEM_TIMEDOK(v)            v == WAIT_OBJECT_0
00100 
00101 /* #elif defined(__linux) */ /* POSIX Linux Systems */
00102 #elif defined(__linux)   /* POSIX Linux Systems */
#include <errno.h>      /* errno, EINTR (used by UME_SEM_WAIT) */

#define UME_BLOCKING_TYPE "Posix Semaphore"
typedef sem_t ume_sem_t;

/* Initialise an unnamed, process-private semaphore.  ret receives the
   sem_init() result (0 on success, -1 on failure).
   NOTE(review): the count always starts at 0 and len is ignored here,
   whereas the Win32 and pthread_cond variants in this header start at
   len -- confirm which behaviour callers rely on before changing it. */
#define UME_SEM_INIT(sem, len, ret)   ((ret) = sem_init(&(sem), 0, 0))

/* Increment the semaphore. */
#define UME_SEM_POST(sem)             sem_post(&(sem))

/* Block until the semaphore has been decremented.  The wait is restarted
   only when it was interrupted by a signal (EINTR); any other failure
   ends the loop instead of spinning forever. */
#define UME_SEM_WAIT(sem) \
        do { \
                while (sem_wait(&(sem)) != 0 && errno == EINTR) { } \
        } while (0)

/* Store the current count in value (which must be an int lvalue). */
#define UME_SEM_GETVALUE(sem, value)     sem_getvalue(&(sem), &(value))

/* Destroy the semaphore. */
#define UME_SEM_DESTROY(sem)             sem_destroy(&(sem))
00115 
/* Wait on sem for at most mstime milliseconds.
 *
 * sem_timedwait() takes an absolute deadline, so the current wall-clock
 * time is read with gettimeofday() into a real struct timeval and then
 * converted to a struct timespec.  tv_nsec is normalised into
 * [0, 1e9) because sem_timedwait() rejects larger values with EINVAL.
 *
 * On return ret is 0 if the semaphore was decremented, otherwise the
 * errno value of the failed wait (ETIMEDOUT when the deadline passed).
 * A wait interrupted by a signal (EINTR) is restarted with the same
 * deadline.  Use UME_SEM_TIMEDOUT()/UME_SEM_TIMEDOK() to classify ret.
 */
#define UME_SEM_TIMEDWAIT(sem, mstime, ret) \
        do { \
                struct timeval ume_now_; \
                struct timespec ume_abs_; \
                long ume_ms_ = (long)(mstime); \
                gettimeofday(&ume_now_, NULL); \
                ume_abs_.tv_sec  = ume_now_.tv_sec + ume_ms_ / 1000; \
                ume_abs_.tv_nsec = (long)ume_now_.tv_usec * 1000L \
                                 + (ume_ms_ % 1000) * 1000000L; \
                if (ume_abs_.tv_nsec >= 1000000000L) { \
                        ume_abs_.tv_sec  += 1; \
                        ume_abs_.tv_nsec -= 1000000000L; \
                } \
                do { \
                        (ret) = sem_timedwait(&(sem), &ume_abs_); \
                } while ((ret) != 0 && errno == EINTR); \
                if ((ret) != 0) { (ret) = errno; } \
        } while (0)

/* Classify the value produced by UME_SEM_TIMEDWAIT. */
#define UME_SEM_TIMEDOUT(v)              ((v) == ETIMEDOUT)
#define UME_SEM_TIMEDOK(v)               ((v) == 0)
00129 
00130 #else /* Unix type systems without sem_timedwait */
#define UME_BLOCKING_TYPE "Posix pthread_cond"

/* Counting semaphore emulated with a mutex and a condition variable. */
typedef struct ume_sem_t_stct
{
        pthread_mutex_t mtx;    /* held while state is read or modified by POST/WAIT */
        pthread_cond_t cond;    /* broadcast by UME_SEM_POST */
        int max;                /* upper bound applied to state by UME_SEM_POST */
        int state;              /* current count */
} ume_sem_t;

/* Initialise the semaphore with both its count and its maximum set to len.
   ret is set to 0 on success and to -1 if the mutex or the condition
   variable cannot be initialised; in the latter case the already created
   mutex is destroyed again so nothing is leaked.  len is evaluated once. */
#define UME_SEM_INIT(sem, len, ret) \
        do { \
                int ume_len_ = (len); \
                (ret) = 0; \
                (sem).max = ume_len_; \
                (sem).state = ume_len_; \
                if (pthread_mutex_init(&((sem).mtx), NULL) != 0) { \
                        (ret) = -1; \
                } else if (pthread_cond_init(&((sem).cond), NULL) != 0) { \
                        pthread_mutex_destroy(&((sem).mtx)); \
                        (ret) = -1; \
                } \
        } while (0)
00148 
/* The sem argument of every macro below is parenthesised so that any
   lvalue expression (for example *ptr) can be passed, not only a plain
   variable or a member access. */

/* Increment the count, but never beyond max, and wake every waiter so
   each can re-check the count. */
#define UME_SEM_POST(sem) \
        do { \
                pthread_mutex_lock(&((sem).mtx)); \
                if ((sem).state < (sem).max) { (sem).state++; } \
                pthread_cond_broadcast(&((sem).cond)); \
                pthread_mutex_unlock(&((sem).mtx)); \
        } while (0)

/* Block until the count is non-zero, then decrement it.  The count is
   re-tested after every wake-up because the broadcast in UME_SEM_POST
   wakes all waiters. */
#define UME_SEM_WAIT(sem) \
        do { \
                pthread_mutex_lock(&((sem).mtx)); \
                while ((sem).state == 0) { \
                        pthread_cond_wait(&((sem).cond), &((sem).mtx)); \
                } \
                (sem).state--; \
                pthread_mutex_unlock(&((sem).mtx)); \
        } while (0)

/* Store the current count in value.  The read is done without taking the
   mutex. */
#define UME_SEM_GETVALUE(sem, value)  ((value) = (sem).state)

/* Destroy the condition variable and the mutex. */
#define UME_SEM_DESTROY(sem) \
        do { \
                pthread_cond_destroy(&((sem).cond)); \
                pthread_mutex_destroy(&((sem).mtx)); \
        } while (0)
00173 
#include <errno.h>      /* ETIMEDOUT (used by UME_SEM_TIMEDOUT) */

/* Wait for the count to become positive for at most mstime milliseconds,
 * then decrement it.
 *
 * The absolute deadline is built from gettimeofday() using a real
 * struct timeval and converted to a struct timespec whose tv_nsec is
 * normalised into [0, 1e9).  The count is tested before waiting and again
 * after every wake-up, so an already-available count is taken immediately
 * and a wake-up that finds the count at zero keeps waiting instead of
 * driving it negative.
 *
 * On return ret is 0 if the count was decremented, otherwise the error
 * number returned by pthread_cond_timedwait() (ETIMEDOUT when the
 * deadline passed).  Use UME_SEM_TIMEDOUT()/UME_SEM_TIMEDOK() on ret.
 */
#define UME_SEM_TIMEDWAIT(sem, mstime, ret) \
        do { \
                struct timeval ume_now_; \
                struct timespec ume_abs_; \
                long ume_ms_ = (long)(mstime); \
                gettimeofday(&ume_now_, NULL); \
                ume_abs_.tv_sec  = ume_now_.tv_sec + ume_ms_ / 1000; \
                ume_abs_.tv_nsec = (long)ume_now_.tv_usec * 1000L \
                                 + (ume_ms_ % 1000) * 1000000L; \
                if (ume_abs_.tv_nsec >= 1000000000L) { \
                        ume_abs_.tv_sec  += 1; \
                        ume_abs_.tv_nsec -= 1000000000L; \
                } \
                pthread_mutex_lock(&((sem).mtx)); \
                (ret) = 0; \
                while ((sem).state <= 0 && (ret) == 0) { \
                        (ret) = pthread_cond_timedwait(&((sem).cond), &((sem).mtx), &ume_abs_); \
                } \
                if ((sem).state > 0) { \
                        (sem).state--; \
                        (ret) = 0; \
                } \
                pthread_mutex_unlock(&((sem).mtx)); \
        } while (0)

/* Classify the value produced by UME_SEM_TIMEDWAIT. */
#define UME_SEM_TIMEDOUT(v)           ((v) == ETIMEDOUT)
#define UME_SEM_TIMEDOK(v)            ((v) == 0)
/* Placeholder that always expands to 0; it is only defined for this
   implementation. */
#define UME_TIMESPEC_MSSET(t,s,n)     0
00191 #endif
00192 
/* Allocate s bytes with malloc() and store the result in e.  When the
   allocation fails, the function that contains the macro invocation
   returns r straight away.  r is pasted after `return` verbatim. */
#define UME_MALLOC_RETURN(e,s,r) \
        do { \
                if (NULL == (e = malloc(s))) \
                        return r; \
        } while (0)

/* Default time-out -- presumably milliseconds, matching the mstime
   argument of UME_SEM_TIMEDWAIT; confirm against the callers. */
#define UME_TIME_OUT 5000
/* Default retry count; how it is applied is decided by the callers. */
#define UME_RETRY_COUNT 10
00201 
/*! \brief Opaque bitmap of sequence numbers.

    Only the struct tag is declared here, so code including this header
    can hold pointers to the type but cannot access its members.
*/
typedef struct ume_block_bitmap_t_stct ume_block_bitmap_t;
00205 
00206 /*! \brief Structure used to designate an UME Block source. */
00207 typedef struct ume_block_src_t_stct
00208 {
00209         lbm_src_t *src;                      /* Pointer to the actual LBM Source */
00210         lbm_src_cb_proc appproc;             /* Callback function for event callbacks */
00211         ume_sem_t stablelock;                /* Locking mechanism for blocking */
00212         ume_block_bitmap_t *bitmap;          /* Stability check mechanism */
00213         void *clientd;                       /* Client provided object */
00214         int maxretentionsz;                  /* Maximum retention buffer size */
00215         /* int seq; */
00216         int err;                             /* Error holder */
00217         unsigned int last;                   /* Last sequence number */
00218         unsigned int first;                  /* First sequence number */
00219 } ume_block_src_t;
00220 
00221 
00222 /*! \brief Delete an UMEBlock Source object
00223     \param asrc Pointero to an UMEBlock Source object to delete.
00224     \return 0 for Success and -1 for Failure.
00225 */
00226 int ume_block_src_delete(ume_block_src_t *asrc);
00227 
00228 /*! \brief Create an UMEBlock Source that will send messages to a given topic.
00229     \param srcp A pointer to a pointer to a UMEBlock source object. Will be filled
00230                             in by this function to point to a newly created ume_block_src_t object.
00231                 \param ctx Pointer to the LBM context object associated with the sender.
00232                 \param topic Pointer to the LBM topic object associated with the destination
00233                             of messages sent by the source.
00234     \param tattr Pointer to an LBM topic attribute object. The passed object CANNOT be NULL.
00235                 \param proc Pointer to a function to call when events occur related to the source.
00236                 If NULL, then events are not delivered to the source.
00237     \param clientd Pointer to tclient data that is passed when \a proc is called.
00238                 \param evq Optional Event Queue to place events on when they occur.
00239                             If NULL causes \a proc to be called from context thread.
00240     \return 0 for Success and -1 for Failure.
00241 */
00242 int ume_block_src_create(ume_block_src_t **srcp, lbm_context_t *ctx, lbm_topic_t *topic, lbm_src_topic_attr_t *tattr, lbm_src_cb_proc proc, void *clientd, lbm_event_queue_t *evq);
00243 
00244 /*! \brief Extended send of a message to the topic associated with an UMBlock source.
00245     \param asrc Pointer to the UMBlock source to send from.
00246                 \param msg Pointer to the data to send in this message.
00247                 \param len Length (in bytes) of the data to send in this message.
00248                 \param info Pointer to lbm_src_send_ex_info_t options.
00249     \return 0 for Success and -1 for Failure.
00250 */
00251 int ume_block_src_send_ex(ume_block_src_t *asrc, const char *msg, size_t len, int flags, lbm_src_send_ex_info_t *info);
00252 #endif
00253 

Generated on Wed Jul 16 15:57:56 2014 for LBM API by  doxygen 1.4.7