umeblocksrc.h

Go to the documentation of this file.
00001 /**     \file umeblocksrc.h
00002         \brief UME Blocking API
00003 
00004         The Ultra Messaging Enterprise (UME) API Description.
00005 */
00006 #ifndef LBMAUX_H
00007 #define LBMAUX_H
00008 
/*
 * Portable sleep helpers.
 *
 *   SLEEP_SEC(x)   suspend the caller for x seconds.
 *   SLEEP_MSEC(x)  suspend the caller for x milliseconds.
 *
 * Win32 maps both onto Sleep(), which takes milliseconds.  Elsewhere
 * sleep() and usleep() are used; waits of one second or more are split
 * so that usleep() is never asked for a full second or longer.
 *
 * SLEEP_MSEC copies its argument into a local first, so an argument with
 * side effects (e.g. SLEEP_MSEC(delay++)) is evaluated exactly once.
 */
#if defined(_WIN32)
#define SLEEP_SEC(x)  Sleep((x) * 1000)
#define SLEEP_MSEC(x) Sleep(x)
#else
#define SLEEP_SEC(x)  sleep(x)
#define SLEEP_MSEC(x) \
    do { \
        long ume_sleep_ms_ = (long)(x); \
        if (ume_sleep_ms_ >= 1000) { \
            sleep((unsigned int)(ume_sleep_ms_ / 1000)); \
            usleep((unsigned int)(ume_sleep_ms_ % 1000 * 1000)); \
        } else { \
            usleep((unsigned int)(ume_sleep_ms_ * 1000)); \
        } \
    } while (0)
#endif /* _WIN32 */
00025 
/* Compile-time switch: a non-zero value enables UME_BLOCK_DEBUG_PRINT output. */
#define UME_BLOCK_DEBUG 0

/*
 * Write a printf-style message to stderr, followed by a newline.
 * `t` is the format string; any further arguments are its parameters.
 */
#define UME_BLOCK_PRINT_ERROR(t, ...) \
    do { \
        fprintf(stderr, t, ##__VA_ARGS__); \
        fprintf(stderr, "\n"); \
    } while (0)

/*
 * Same output as UME_BLOCK_PRINT_ERROR, but only when UME_BLOCK_DEBUG is
 * non-zero.  When it is zero the arguments are not evaluated at run time.
 */
#define UME_BLOCK_DEBUG_PRINT(t, ...) \
    do { \
        if (UME_BLOCK_DEBUG) { \
            UME_BLOCK_PRINT_ERROR(t, ##__VA_ARGS__); \
        } \
    } while (0)
00038         
00039 
00040 /* Semaphore Helper macros */
00041 #if defined(_WIN32) /* Win32*/
00042         #define UME_BLOCKING_TYPE "Win32 Semaphore"
00043         typedef struct ume_sem_t_stct
00044         {
00045                 HANDLE hSem;
00046                 int max;
00047                 int state;
00048         } ume_sem_t;
00049 
/*
 * Win32 implementation of the UME_SEM_* interface.
 *
 * Every macro takes the semaphore object itself (not a pointer).  All
 * arguments are parenthesised, so expressions such as `*p` or
 * `obj->stablelock` are accepted, and every macro expands to a single
 * statement or expression that is safe inside if/else.
 */

/* Create the semaphore with both initial and maximum count `len`.
   `ret` receives 0 on success and -1 if CreateSemaphore() failed. */
#define UME_SEM_INIT(sem, len, ret) \
    do { \
        (sem).hSem  = CreateSemaphore(NULL, (len), (len), NULL); \
        (sem).max   = (len); \
        (sem).state = (len); \
        (ret) = ((sem).hSem != NULL) ? 0 : -1; \
    } while (0)

/* Release the underlying Win32 handle. */
#define UME_SEM_DESTROY(sem)    CloseHandle((sem).hSem)

/* Release one unit.  The shadow count is only bumped when the release
   actually succeeded, so it cannot drift from the kernel object's count. */
#define UME_SEM_POST(sem) \
    do { \
        if (ReleaseSemaphore((sem).hSem, 1, NULL)) { \
            InterlockedIncrement(&((sem).state)); \
        } \
    } while (0)

/* Block until one unit is available.  The shadow count is only lowered
   when the wait really acquired the semaphore. */
#define UME_SEM_WAIT(sem) \
    do { \
        if (WaitForSingleObject((sem).hSem, INFINITE) == WAIT_OBJECT_0) { \
            InterlockedDecrement(&((sem).state)); \
        } \
    } while (0)

/* Store the shadow count in `val`. */
#define UME_SEM_GETVALUE(sem, val)    ((val) = (sem).state)

/* Wait up to `time` milliseconds.  `ret` receives the WaitForSingleObject()
   result; test it with UME_SEM_TIMEDOK / UME_SEM_TIMEDOUT. */
#define UME_SEM_TIMEDWAIT(sem, time, ret) \
    do { \
        (ret) = WaitForSingleObject((sem).hSem, (time)); \
        if ((ret) == WAIT_OBJECT_0) { \
            InterlockedDecrement(&((sem).state)); \
        } \
    } while (0)

/* Classify the `ret` value produced by UME_SEM_TIMEDWAIT. */
#define UME_SEM_TIMEDOUT(v)           ((v) == WAIT_TIMEOUT)
#define UME_SEM_TIMEDOK(v)            ((v) == WAIT_OBJECT_0)
00082 
00083 /* #elif defined(__linux) */ /* POSIX Linux Systems */
00084 #elif defined(__linux)   /* POSIX Linux Systems */
/* Human-readable name of the blocking primitive used on this platform. */
#define UME_BLOCKING_TYPE "Posix Semaphore"

/* On Linux the blocking primitive is a plain POSIX unnamed semaphore. */
typedef sem_t ume_sem_t;

/*
 * Initialise `sem` (not shared between processes) with a count of 0 and
 * store sem_init()'s result in `ret`.
 * NOTE(review): `len` is accepted but unused here, whereas the Win32 and
 * pthread_cond variants in this header start the count at `len` - confirm
 * which behaviour is intended.
 */
#define UME_SEM_INIT(sem, len, ret)   ((ret) = sem_init(&(sem), 0, 0))

/* Release one unit. */
#define UME_SEM_POST(sem)             sem_post(&(sem))

/*
 * Block until one unit is available.  The wait is restarted only when it
 * was interrupted by a signal (EINTR); any other failure ends the wait
 * instead of spinning forever.
 */
#define UME_SEM_WAIT(sem) \
    do { \
        while (sem_wait(&(sem)) < 0 && errno == EINTR) { \
        } \
    } while (0)

/* Store the current count in `value` (which must be an int lvalue). */
#define UME_SEM_GETVALUE(sem, value)     sem_getvalue(&(sem), &(value))

/* Destroy the semaphore. */
#define UME_SEM_DESTROY(sem)             sem_destroy(&(sem))
00097 
/*
 * Wait on `sem` for at most `mstime` milliseconds.
 *
 * The absolute deadline is built from gettimeofday() through a real
 * struct timeval (no pointer punning), and the nanosecond field is
 * normalised into [0, 1e9) so that sem_timedwait() does not reject it.
 * A negative `mstime` is treated as 0.  Waits interrupted by a signal
 * (EINTR) are restarted against the same deadline.
 *
 * `ret` receives 0 when the semaphore was acquired, otherwise the errno
 * value reported by sem_timedwait() (ETIMEDOUT on timeout).  Test it with
 * UME_SEM_TIMEDOK / UME_SEM_TIMEDOUT.
 */
#define UME_SEM_TIMEDWAIT(sem, mstime, ret) \
    do { \
        struct timeval ume_now_; \
        struct timespec ume_deadline_; \
        long ume_ms_ = (long)(mstime); \
        if (ume_ms_ < 0) { \
            ume_ms_ = 0; \
        } \
        gettimeofday(&ume_now_, NULL); \
        ume_deadline_.tv_sec  = ume_now_.tv_sec + ume_ms_ / 1000; \
        ume_deadline_.tv_nsec = (long)ume_now_.tv_usec * 1000L \
                              + (ume_ms_ % 1000) * 1000000L; \
        if (ume_deadline_.tv_nsec >= 1000000000L) { \
            ume_deadline_.tv_sec  += 1; \
            ume_deadline_.tv_nsec -= 1000000000L; \
        } \
        (ret) = 0; \
        while (sem_timedwait(&(sem), &ume_deadline_) != 0) { \
            if (errno != EINTR) { \
                (ret) = errno; \
                break; \
            } \
        } \
    } while (0)

/* Classify the `ret` value produced by UME_SEM_TIMEDWAIT. */
#define UME_SEM_TIMEDOUT(v)              ((v) == ETIMEDOUT)
#define UME_SEM_TIMEDOK(v)               ((v) == 0)
00111 
00112 #else /* Unix type systems without sem_timedwait */
/* Human-readable name of the blocking primitive used on this platform. */
#define UME_BLOCKING_TYPE "Posix pthread_cond"

/* Counting semaphore emulated with a mutex and a condition variable. */
typedef struct ume_sem_t_stct
{
    pthread_mutex_t mtx;   /* Protects max and state */
    pthread_cond_t cond;   /* Signalled whenever a unit is posted */
    int max;               /* Upper bound for state */
    int state;             /* Units currently available */
} ume_sem_t;

/*
 * Initialise `sem` with `len` available units (which is also its maximum).
 * `ret` receives 0 on success and -1 if the mutex or the condition variable
 * could not be initialised; on failure nothing is left initialised.
 */
#define UME_SEM_INIT(sem, len, ret) \
    do { \
        (ret) = 0; \
        if (pthread_mutex_init(&((sem).mtx), NULL) != 0) { \
            (ret) = -1; \
        } else if (pthread_cond_init(&((sem).cond), NULL) != 0) { \
            pthread_mutex_destroy(&((sem).mtx)); \
            (ret) = -1; \
        } else { \
            (sem).max   = (len); \
            (sem).state = (len); \
        } \
    } while (0)
00130 
/*
 * Make one more unit available (never exceeding `max`) and wake every
 * thread blocked on the condition variable.
 */
#define UME_SEM_POST(sem) \
    do { \
        pthread_mutex_lock(&((sem).mtx)); \
        (sem).state += ((sem).state < (sem).max) ? 1 : 0; \
        pthread_cond_broadcast(&((sem).cond)); \
        pthread_mutex_unlock(&((sem).mtx)); \
    } while (0)

/*
 * Block until at least one unit is available, then take it.  The count is
 * re-checked after every wake-up while the mutex is held.
 */
#define UME_SEM_WAIT(sem) \
    do { \
        pthread_mutex_lock(&((sem).mtx)); \
        for (;;) { \
            if ((sem).state != 0) { \
                break; \
            } \
            pthread_cond_wait(&((sem).cond), &((sem).mtx)); \
        } \
        --(sem).state; \
        pthread_mutex_unlock(&((sem).mtx)); \
    } while (0)

/* Copy the current count into `value`; the mutex is not taken. */
#define UME_SEM_GETVALUE(sem, value)  (value) = (sem).state

/* Destroy the condition variable first, then the mutex. */
#define UME_SEM_DESTROY(sem) \
    do { \
        pthread_cond_destroy(&((sem).cond)); \
        pthread_mutex_destroy(&((sem).mtx)); \
    } while (0)
00155 
/*
 * Wait for a unit of `sem` for at most `mstime` milliseconds.
 *
 * The absolute deadline is built from gettimeofday() through a real
 * struct timeval (no pointer punning), with the nanosecond field
 * normalised into [0, 1e9).  A negative `mstime` is treated as 0.
 *
 * The count is checked before and after every wake-up: the call returns at
 * once when a unit is already available, it never takes a unit that is not
 * there, and a unit posted right at the deadline is still taken.
 *
 * `ret` receives 0 when a unit was taken, otherwise the error number
 * returned by pthread_cond_timedwait() (ETIMEDOUT on timeout).  Test it
 * with UME_SEM_TIMEDOK / UME_SEM_TIMEDOUT.
 */
#define UME_SEM_TIMEDWAIT(sem, mstime, ret) \
    do { \
        struct timeval ume_now_; \
        struct timespec ume_deadline_; \
        long ume_ms_ = (long)(mstime); \
        if (ume_ms_ < 0) { \
            ume_ms_ = 0; \
        } \
        gettimeofday(&ume_now_, NULL); \
        ume_deadline_.tv_sec  = ume_now_.tv_sec + ume_ms_ / 1000; \
        ume_deadline_.tv_nsec = (long)ume_now_.tv_usec * 1000L \
                              + (ume_ms_ % 1000) * 1000000L; \
        if (ume_deadline_.tv_nsec >= 1000000000L) { \
            ume_deadline_.tv_sec  += 1; \
            ume_deadline_.tv_nsec -= 1000000000L; \
        } \
        (ret) = 0; \
        pthread_mutex_lock(&((sem).mtx)); \
        while ((sem).state <= 0 && (ret) == 0) { \
            (ret) = pthread_cond_timedwait(&((sem).cond), &((sem).mtx), \
                                           &ume_deadline_); \
        } \
        if ((sem).state > 0) { \
            (sem).state--; \
            (ret) = 0; \
        } \
        pthread_mutex_unlock(&((sem).mtx)); \
    } while (0)

/* Classify the `ret` value produced by UME_SEM_TIMEDWAIT. */
#define UME_SEM_TIMEDOUT(v)           ((v) == ETIMEDOUT)
#define UME_SEM_TIMEDOK(v)            ((v) == 0)

/* Placeholder that expands to 0; nothing in this header uses it. */
#define UME_TIMESPEC_MSSET(t,s,n)     0
00173 #endif
00174 
/*
 * Allocate `s` bytes with malloc() and store the pointer in `e`.
 * If the allocation fails, the ENCLOSING function returns `r` immediately.
 */
#define UME_MALLOC_RETURN(e,s,r) \
    do { \
        if (!(e = malloc(s))) { \
            return r; \
        } \
    } while (0)

/* Timeout and retry limits; not referenced elsewhere in this header.
   NOTE(review): UME_TIME_OUT is presumably in milliseconds - confirm at the
   call sites. */
#define UME_TIME_OUT    5000
#define UME_RETRY_COUNT 10
00183 
00184 /*! \brief Structure that holds a bitmap for sequence numbers (opaque). */
00185 struct ume_block_bitmap_t_stct;
00186 typedef struct ume_block_bitmap_t_stct ume_block_bitmap_t;
00187 
00188 /*! \brief Structure used to designate an UME Block source. */
00189 typedef struct ume_block_src_t_stct
00190 {
00191         lbm_src_t *src;                      /* Pointer to the actual LBM Source */
00192         lbm_src_cb_proc appproc;             /* Callback function for event callbacks */
00193         ume_sem_t stablelock;                /* Locking mechanism for blocking */
00194         ume_block_bitmap_t *bitmap;          /* Stability check mechanism */
00195         void *clientd;                       /* Client provided object */
00196         int maxretentionsz;                  /* Maximum retention buffer size */
00197         /* int seq; */
00198         int err;                             /* Error holder */
00199         unsigned int last;                   /* Last sequence number */
00200         unsigned int first;                  /* First sequence number */
00201 } ume_block_src_t;
00202 
00203 
00204 /*! \brief Delete an UMEBlock Source object
00205     \param asrc Pointero to an UMEBlock Source object to delete.
00206     \return 0 for Success and -1 for Failure.
00207 */
00208 int ume_block_src_delete(ume_block_src_t *asrc);
00209 
00210 /*! \brief Create an UMEBlock Source that will send messages to a given topic.
00211     \param srcp A pointer to a pointer to a UMEBlock source object. Will be filled
00212                             in by this function to point to a newly created ume_block_src_t object.
00213                 \param ctx Pointer to the LBM context object associated with the sender.
00214                 \param topic Pointer to the LBM topic object associated with the destination
00215                             of messages sent by the source.
00216     \param tattr Pointer to an LBM topic attribute object. The passed object CANNOT be NULL.
00217                 \param proc Pointer to a function to call when events occur related to the source.
00218                 If NULL, then events are not delivered to the source.
00219     \param clientd Pointer to tclient data that is passed when \a proc is called.
00220                 \param evq Optional Event Queue to place events on when they occur.
00221                             If NULL causes \a proc to be called from context thread.
00222     \return 0 for Success and -1 for Failure.
00223 */
00224 int ume_block_src_create(ume_block_src_t **srcp, lbm_context_t *ctx, lbm_topic_t *topic, lbm_src_topic_attr_t *tattr, lbm_src_cb_proc proc, void *clientd, lbm_event_queue_t *evq);
00225 
00226 /*! \brief Extended send of a message to the topic associated with an UMBlock source.
00227     \param asrc Pointer to the UMBlock source to send from.
00228                 \param msg Pointer to the data to send in this message.
00229                 \param len Length (in bytes) of the data to send in this message.
00230                 \param info Pointer to lbm_src_send_ex_info_t options.
00231     \return 0 for Success and -1 for Failure.
00232 */
00233 int ume_block_src_send_ex(ume_block_src_t *asrc, const char *msg, size_t len, int flags, lbm_src_send_ex_info_t *info);
00234 #endif
00235 

Generated on Thu Mar 6 13:11:09 2014 for LBM API by  doxygen 1.5.2