00001
00002
00003
00004
00005
00006 #ifndef LBMAUX_H
00007 #define LBMAUX_H
00008
/*
 * Portable sleep helpers.
 *
 *   SLEEP_SEC(x)  - suspend the caller for x seconds.
 *   SLEEP_MSEC(x) - suspend the caller for x milliseconds.
 *
 * Win32 maps both onto Sleep(), which takes milliseconds.  Every other
 * platform uses sleep()/usleep(); requests of one second or more are split
 * into a whole-second sleep() plus a usleep() for the remainder.
 *
 * The non-Windows SLEEP_MSEC expands its argument several times, so do not
 * pass an expression with side effects.
 */
#ifdef _WIN32
#  define SLEEP_SEC(x)  Sleep((x)*1000)
#  define SLEEP_MSEC(x) Sleep(x)
#else
#  define SLEEP_SEC(x)  sleep(x)
#  define SLEEP_MSEC(x) \
    do { \
        if ((x) < 1000) { \
            usleep((x)*1000); \
        } else { \
            sleep((x) / 1000); \
            usleep((x) % 1000 * 1000); \
        } \
    } while (0)
#endif
00025
/*
 * Compile-time switch for debug tracing: 0 disables, non-zero enables.
 * May be pre-defined (e.g. on the compiler command line) to override the
 * default of 0.
 */
#ifndef UME_BLOCK_DEBUG
#define UME_BLOCK_DEBUG 0
#endif

/*
 * UME_BLOCK_DEBUG_PRINT(fmt, ...) - printf-style trace to stderr followed by
 * a newline, emitted only when UME_BLOCK_DEBUG is non-zero.
 *
 * The format string is taken as the first variadic argument so that the
 * macro is plain C99 and does not rely on the GNU ", ##__VA_ARGS__"
 * extension; call sites are unchanged.  The arguments are still compiled
 * (and therefore type-checked) when tracing is disabled, but not evaluated.
 */
#define UME_BLOCK_DEBUG_PRINT(...) \
    do { \
        if (UME_BLOCK_DEBUG) { \
            fprintf(stderr, __VA_ARGS__); \
            fprintf(stderr, "\n"); \
        } \
    } while (0)
00033
/*
 * UME_BLOCK_PRINT_ERROR(fmt, ...) - unconditionally write a printf-style
 * message to stderr, followed by a newline.
 *
 * The format string is taken as the first variadic argument so the macro is
 * standard C99 (no GNU ", ##__VA_ARGS__" extension); call sites are
 * unchanged.  Each argument is evaluated exactly once.
 */
#define UME_BLOCK_PRINT_ERROR(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
        fprintf(stderr, "\n"); \
    } while (0)
00038
00039
00040
00041 #if defined(_WIN32)
00042 #define UME_BLOCKING_TYPE "Win32 Semaphore"
00043 typedef struct ume_sem_t_stct
00044 {
00045 HANDLE hSem;
00046 int max;
00047 int state;
00048 } ume_sem_t;
00049
/*
 * UME_SEM_INIT(sem, len, ret) - create the Win32 semaphore with both its
 * initial and maximum count set to len, and record len in sem.max/sem.state.
 * ret is set to 0 on success or -1 if CreateSemaphore() returned no handle.
 * len is expanded more than once.
 */
#define UME_SEM_INIT(sem,len,ret) \
    do { \
        (sem).hSem = CreateSemaphore(NULL, len, len, NULL); \
        (sem).max = len; \
        (sem).state = len; \
        ret = ((sem).hSem) ? 0 : -1; \
    } while (0)
00058
/* UME_SEM_DESTROY(sem) - close the underlying Win32 semaphore handle. */
#define UME_SEM_DESTROY(sem) CloseHandle((sem).hSem)
/*
 * UME_SEM_POST(sem) - release one unit of the Win32 semaphore, then bump the
 * shadow counter sem.state.  The result of ReleaseSemaphore() is not checked,
 * so state is incremented even if the release failed.
 */
#define UME_SEM_POST(sem) \
    do { \
        ReleaseSemaphore((sem).hSem, 1, NULL); \
        InterlockedIncrement(&((sem).state)); \
    } while (0)
00065
/*
 * UME_SEM_WAIT(sem) - wait on the Win32 semaphore with an INFINITE timeout,
 * then decrement the shadow counter sem.state.  The wait result is not
 * inspected.
 */
#define UME_SEM_WAIT(sem) \
    do { \
        WaitForSingleObject((sem).hSem, INFINITE); \
        InterlockedDecrement(&((sem).state)); \
    } while (0)
00071
/*
 * UME_SEM_GETVALUE(sem, val) - copy the shadow counter sem.state into val.
 *
 * The expansion is a single parenthesised expression with no trailing
 * semicolon, so it can be used as the body of an if/else without braces.
 */
#define UME_SEM_GETVALUE(sem,val) ((val) = (sem).state)
00073
/*
 * UME_SEM_TIMEDWAIT(sem, time, ret) - wait on the Win32 semaphore for at most
 * `time` (passed straight to WaitForSingleObject) and store the wait result
 * in ret.  The shadow counter sem.state is decremented only when the wait
 * returned WAIT_OBJECT_0.
 */
#define UME_SEM_TIMEDWAIT(sem,time,ret) \
    do { \
        ret = WaitForSingleObject((sem).hSem, time); \
        if (ret == WAIT_OBJECT_0) { \
            InterlockedDecrement(&((sem).state)); \
        } \
    } while (0)
00079
/*
 * Classify the result stored by UME_SEM_TIMEDWAIT.
 *   UME_SEM_TIMEDOUT(v) - non-zero when v is WAIT_TIMEOUT.
 *   UME_SEM_TIMEDOK(v)  - non-zero when v is WAIT_OBJECT_0.
 * Argument and expansion are fully parenthesised so the macros compose
 * correctly with !, && and || at the call site.
 */
#define UME_SEM_TIMEDOUT(v) ((v) == WAIT_TIMEOUT)
#define UME_SEM_TIMEDOK(v) ((v) == WAIT_OBJECT_0)
00082
00083
00084 #elif defined(__linux)
/* Linux build: the blocking primitive is a POSIX semaphore. */
#define UME_BLOCKING_TYPE "Posix Semaphore"

/* On this variant the UME_SEM_* macros operate directly on a sem_t. */
typedef sem_t ume_sem_t;
00087
/*
 * UME_SEM_INIT(sem, len, ret) - initialise a process-private POSIX semaphore
 * with an initial count of 0 and store sem_init()'s return value in ret.
 * len is accepted for signature parity but is not used here.
 * NOTE(review): the Win32 and pthread_cond variants seed the count with len;
 * confirm that starting at 0 is intended on this platform.
 */
#define UME_SEM_INIT(sem, len, ret) ((ret) = sem_init(&(sem), 0, 0))
/* UME_SEM_POST(sem) - increment the semaphore; evaluates to sem_post()'s result. */
#define UME_SEM_POST(sem) sem_post(&(sem))
/*
 * UME_SEM_WAIT(sem) - block until the semaphore can be decremented.
 *
 * sem_wait() is retried only when it was interrupted by a signal (EINTR).
 * Any other failure (for example EINVAL on an invalid semaphore) ends the
 * wait instead of spinning forever; errno is left set for the caller.
 */
#define UME_SEM_WAIT(sem) \
    do { \
        while (sem_wait(&(sem)) != 0 && errno == EINTR) { \
            /* interrupted by a signal: try again */ \
        } \
    } while (0)
00094
/* UME_SEM_GETVALUE(sem, value) - read the current count into value via sem_getvalue(). */
#define UME_SEM_GETVALUE(sem, value) sem_getvalue(&(sem), &(value))

/* UME_SEM_DESTROY(sem) - release the semaphore's resources via sem_destroy(). */
#define UME_SEM_DESTROY(sem) sem_destroy(&(sem))
00097
00098
00099
/*
 * UME_SEM_TIMEDWAIT(sem, mstime, ret) - wait on the POSIX semaphore for at
 * most mstime milliseconds.
 *
 * ret is set to 0 when the semaphore was acquired, otherwise to the errno
 * value of the failed sem_timedwait() (ETIMEDOUT on timeout), which is the
 * form UME_SEM_TIMEDOUT()/UME_SEM_TIMEDOK() compare against.
 *
 * The absolute deadline is built from gettimeofday() through a real
 * struct timeval (no timespec/timeval pointer punning), and tv_nsec is
 * normalised into [0, 1e9) because sem_timedwait() rejects larger values
 * with EINVAL.  A negative mstime is treated as 0.  Waits interrupted by a
 * signal (EINTR) are retried against the same deadline.  mstime is evaluated
 * once.
 */
#define UME_SEM_TIMEDWAIT(sem,mstime,ret) \
    do { \
        struct timeval ume_now_; \
        struct timespec ume_deadline_; \
        long ume_ms_ = (long)(mstime); \
        if (ume_ms_ < 0) { \
            ume_ms_ = 0; \
        } \
        gettimeofday(&ume_now_, NULL); \
        ume_deadline_.tv_sec = ume_now_.tv_sec + ume_ms_ / 1000; \
        ume_deadline_.tv_nsec = (long)ume_now_.tv_usec * 1000L \
                              + (ume_ms_ % 1000) * 1000000L; \
        if (ume_deadline_.tv_nsec >= 1000000000L) { \
            ume_deadline_.tv_sec += 1; \
            ume_deadline_.tv_nsec -= 1000000000L; \
        } \
        do { \
            (ret) = sem_timedwait(&(sem), &ume_deadline_); \
        } while ((ret) != 0 && errno == EINTR); \
        if ((ret) != 0) { \
            (ret) = errno; \
        } \
    } while (0)
00108
/*
 * Classify the result stored by UME_SEM_TIMEDWAIT.
 *   UME_SEM_TIMEDOUT(v) - non-zero when v equals ETIMEDOUT.
 *   UME_SEM_TIMEDOK(v)  - non-zero when v equals 0.
 * Argument and expansion are fully parenthesised so the macros compose
 * correctly with !, && and || at the call site.
 */
#define UME_SEM_TIMEDOUT(v) ((v) == ETIMEDOUT)
#define UME_SEM_TIMEDOK(v) ((v) == 0)
00111
00112 #else
/* Fallback build: a counting semaphore emulated with a mutex + condition variable. */
#define UME_BLOCKING_TYPE "Posix pthread_cond"

/* State operated on by the UME_SEM_* macros of this variant. */
typedef struct ume_sem_t_stct
{
    pthread_mutex_t mtx;   /* locked around accesses to state in POST/WAIT/TIMEDWAIT */
    pthread_cond_t cond;   /* broadcast by UME_SEM_POST, waited on by the WAIT macros */
    int max;               /* upper bound applied to state by UME_SEM_POST */
    int state;             /* current count */
} ume_sem_t;
00121
00122
/*
 * UME_SEM_INIT(sem, len, ret) - initialise the mutex/condvar pair and set
 * both sem.max and sem.state to len.
 *
 * ret is set to 0 on success and to -1 if either pthread initialiser fails,
 * matching the 0 / -1 convention of the other platform variants.  If the
 * condition variable cannot be created, the already-initialised mutex is
 * destroyed so nothing is leaked.  len is expanded more than once.
 */
#define UME_SEM_INIT(sem,len,ret) \
    do { \
        (ret) = 0; \
        if (pthread_mutex_init(&((sem).mtx), NULL) != 0) { \
            (ret) = -1; \
        } else if (pthread_cond_init(&((sem).cond), NULL) != 0) { \
            pthread_mutex_destroy(&((sem).mtx)); \
            (ret) = -1; \
        } \
        (sem).max = (len); \
        (sem).state = (len); \
    } while (0)
00130
/*
 * UME_SEM_POST(sem) - under the mutex, raise sem.state by one unless it has
 * already reached sem.max, then wake every waiter on the condition variable.
 * The broadcast is issued even when the count was already at its maximum.
 */
#define UME_SEM_POST(sem) \
    do { \
        pthread_mutex_lock(&((sem).mtx)); \
        if ((sem).max > (sem).state) { \
            ++(sem).state; \
        } \
        pthread_cond_broadcast(&((sem).cond)); \
        pthread_mutex_unlock(&((sem).mtx)); \
    } while (0)
00138
/*
 * UME_SEM_WAIT(sem) - under the mutex, sleep on the condition variable for
 * as long as sem.state is 0, then take one unit by decrementing sem.state.
 */
#define UME_SEM_WAIT(sem) \
    do { \
        pthread_mutex_lock(&((sem).mtx)); \
        while (!(sem).state) { \
            pthread_cond_wait(&((sem).cond), &((sem).mtx)); \
        } \
        (sem).state -= 1; \
        pthread_mutex_unlock(&((sem).mtx)); \
    } while (0)
00147
/*
 * UME_SEM_GETVALUE(sem, value) - copy sem.state into value.  The read is
 * performed without taking sem.mtx.
 */
#define UME_SEM_GETVALUE(sem, value) (value) = (sem).state
00149
/*
 * UME_SEM_DESTROY(sem) - tear down the emulated semaphore: the condition
 * variable is destroyed first, then the mutex.  Return codes are ignored.
 */
#define UME_SEM_DESTROY(sem) \
    do { \
        pthread_cond_destroy(&((sem).cond)); \
        pthread_mutex_destroy(&((sem).mtx)); \
    } while (0)
00155
00156
00157
/*
 * UME_SEM_TIMEDWAIT(sem, mstime, ret) - take one unit from the emulated
 * semaphore, waiting at most mstime milliseconds for one to become
 * available.
 *
 * ret is 0 when a unit was taken (sem.state has been decremented), otherwise
 * the error code returned by pthread_cond_timedwait() (ETIMEDOUT on timeout).
 *
 * - The absolute deadline is built from gettimeofday() through a real
 *   struct timeval (no timespec/timeval pointer punning) and tv_nsec is
 *   normalised into [0, 1e9), as pthread_cond_timedwait() requires.
 *   A negative mstime is treated as 0.
 * - The count is checked before waiting, so an already-available unit is
 *   taken immediately, and re-checked in a loop so a spurious wakeup or a
 *   unit consumed by another waiter never drives sem.state below zero.
 * - mstime is evaluated once.
 */
#define UME_SEM_TIMEDWAIT(sem, mstime, ret) \
    do { \
        struct timeval ume_now_; \
        struct timespec ume_deadline_; \
        long ume_ms_ = (long)(mstime); \
        if (ume_ms_ < 0) { \
            ume_ms_ = 0; \
        } \
        gettimeofday(&ume_now_, NULL); \
        ume_deadline_.tv_sec = ume_now_.tv_sec + ume_ms_ / 1000; \
        ume_deadline_.tv_nsec = (long)ume_now_.tv_usec * 1000L \
                              + (ume_ms_ % 1000) * 1000000L; \
        if (ume_deadline_.tv_nsec >= 1000000000L) { \
            ume_deadline_.tv_sec += 1; \
            ume_deadline_.tv_nsec -= 1000000000L; \
        } \
        (ret) = 0; \
        pthread_mutex_lock(&((sem).mtx)); \
        while ((sem).state == 0 && (ret) == 0) { \
            (ret) = pthread_cond_timedwait(&((sem).cond), &((sem).mtx), \
                                           &ume_deadline_); \
        } \
        if ((ret) == 0) { \
            (sem).state--; \
        } \
        pthread_mutex_unlock(&((sem).mtx)); \
    } while (0)
00169
/*
 * Classify the result stored by UME_SEM_TIMEDWAIT.  pthread_cond_timedwait()
 * reports its outcome as a return code, so:
 *   UME_SEM_TIMEDOUT(v) - non-zero when v equals ETIMEDOUT.
 *   UME_SEM_TIMEDOK(v)  - non-zero when v equals 0.
 * (Both were previously hard-wired to 0, which made success and timeout
 * indistinguishable on this variant.)
 */
#define UME_SEM_TIMEDOUT(v) ((v) == ETIMEDOUT)
#define UME_SEM_TIMEDOK(v) ((v) == 0)

/* Placeholder that ignores its arguments and evaluates to 0; defined only on this variant. */
#define UME_TIMESPEC_MSSET(t,s,n) 0
00173 #endif
00174
/*
 * UME_MALLOC_RETURN(e, s, r) - allocate s bytes with malloc() into e; if the
 * allocation fails, return r from the *enclosing function*.  Because of the
 * embedded return, use it only where returning r at that point is safe
 * (nothing acquired earlier is released).
 */
#define UME_MALLOC_RETURN(e,s,r) \
    do { \
        if (!(e = malloc(s))) { \
            return r; \
        } \
    } while (0)
00180
/*
 * Tuning constants; their users are not visible in this header.
 * NOTE(review): UME_TIME_OUT is presumably in milliseconds (the unit taken by
 * UME_SEM_TIMEDWAIT) and UME_RETRY_COUNT presumably bounds retries - confirm
 * against the implementation file.
 */
#define UME_TIME_OUT    5000
#define UME_RETRY_COUNT 10
00183
00184
/*
 * Opaque bitmap type: only the tag is declared here, so this header can hold
 * pointers to it without exposing its layout.
 */
typedef struct ume_block_bitmap_t_stct ume_block_bitmap_t;
00187
00188
00189 typedef struct ume_block_src_t_stct
00190 {
00191 lbm_src_t *src;
00192 lbm_src_cb_proc appproc;
00193 ume_sem_t stablelock;
00194 ume_block_bitmap_t *bitmap;
00195 void *clientd;
00196 int maxretentionsz;
00197
00198 int err;
00199 unsigned int last;
00200 unsigned int first;
00201 } ume_block_src_t;
00202
00203
00204
00205
00206
00207
00208 int ume_block_src_delete(ume_block_src_t *asrc);
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224 int ume_block_src_create(ume_block_src_t **srcp, lbm_context_t *ctx, lbm_topic_t *topic, lbm_src_topic_attr_t *tattr, lbm_src_cb_proc proc, void *clientd, lbm_event_queue_t *evq);
00225
00226
00227
00228
00229
00230
00231
00232
00233 int ume_block_src_send_ex(ume_block_src_t *asrc, const char *msg, size_t len, int flags, lbm_src_send_ex_info_t *info);
00234 #endif
00235