00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef LBMAUX_H
00025 #define LBMAUX_H
00026
00027 #if defined(_WIN32)
00028 #define SLEEP_SEC(x) Sleep((x)*1000)
00029 #define SLEEP_MSEC(x) Sleep(x)
00030 #else
00031 # define SLEEP_SEC(x) sleep(x)
00032 # define SLEEP_MSEC(x) \
00033 do{ \
00034 if ((x) >= 1000){ \
00035 sleep((x) / 1000); \
00036 usleep((x) % 1000 * 1000); \
00037 } \
00038 else{ \
00039 usleep((x)*1000); \
00040 } \
00041 }while (0)
00042 #endif
00043
00044 #define UME_BLOCK_DEBUG 0
00045 #define UME_BLOCK_DEBUG_PRINT(t, ...) do { \
00046 if(UME_BLOCK_DEBUG) { \
00047 fprintf(stderr, t, ##__VA_ARGS__); \
00048 fprintf(stderr, "\n"); \
00049 } \
00050 } while(0)
00051
00052 #define UME_BLOCK_PRINT_ERROR(t, ...) do { \
00053 fprintf(stderr, t, ##__VA_ARGS__); \
00054 fprintf(stderr, "\n"); \
00055 } while(0)
00056
00057
00058
00059 #if defined(_WIN32)
00060 #define UME_BLOCKING_TYPE "Win32 Semaphore"
00061 typedef struct ume_sem_t_stct
00062 {
00063 HANDLE hSem;
00064 int max;
00065 int state;
00066 } ume_sem_t;
00067
00068 #define UME_SEM_INIT(sem,len,ret) \
00069 do { \
00070 ret=0; \
00071 sem.hSem = CreateSemaphore(NULL, len, len, NULL); \
00072 sem.max = len; \
00073 sem.state = len; \
00074 if(!sem.hSem) { ret = -1; } \
00075 } while(0)
00076
00077 #define UME_SEM_DESTROY(sem) CloseHandle(sem.hSem)
00078 #define UME_SEM_POST(sem) \
00079 do { \
00080 ReleaseSemaphore(sem.hSem, 1, NULL); \
00081 InterlockedIncrement(&(sem.state)); \
00082 } while(0)
00083
00084 #define UME_SEM_WAIT(sem) \
00085 do { \
00086 WaitForSingleObject(sem.hSem, INFINITE); \
00087 InterlockedDecrement(&(sem.state)); \
00088 } while(0)
00089
00090 #define UME_SEM_GETVALUE(sem,val) val=sem.state;
00091
00092 #define UME_SEM_TIMEDWAIT(sem,time,ret) \
00093 do { \
00094 ret=WaitForSingleObject(sem.hSem, time); \
00095 if(ret!=WAIT_TIMEOUT && ret==WAIT_OBJECT_0) { InterlockedDecrement(&(sem.state)); } \
00096 } while(0)
00097
00098 #define UME_SEM_TIMEDOUT(v) v == WAIT_TIMEOUT
00099 #define UME_SEM_TIMEDOK(v) v == WAIT_OBJECT_0
00100
00101
00102 #elif defined(__linux)
00103 #define UME_BLOCKING_TYPE "Posix Semaphore"
00104 typedef sem_t ume_sem_t;
00105
00106 #define UME_SEM_INIT(sem, len, ret) ret = sem_init(&sem, 0, 0)
00107 #define UME_SEM_POST(sem) sem_post(&sem)
00108 #define UME_SEM_WAIT(sem) \
00109 do { \
00110 while(sem_wait(&sem) < 0) {} \
00111 } while(0)
00112
00113 #define UME_SEM_GETVALUE(sem, value) sem_getvalue(&(sem), &(value))
00114 #define UME_SEM_DESTROY(sem) sem_destroy(&sem)
00115
00116
00117
00118 #define UME_SEM_TIMEDWAIT(sem,mstime,ret) \
00119 do { \
00120 struct timespec ts; \
00121 gettimeofday((struct timeval*) &ts, NULL); \
00122 ts.tv_sec += (mstime/1000); \
00123 ts.tv_nsec = (ts.tv_nsec*1000) + ((mstime%1000)*1000000); \
00124 ret = sem_timedwait(&sem, &ts); \
00125 } while(0)
00126
00127 #define UME_SEM_TIMEDOUT(v) v == ETIMEDOUT
00128 #define UME_SEM_TIMEDOK(v) v == 0
00129
00130 #else
00131 #define UME_BLOCKING_TYPE "Posix pthread_cond"
00132 typedef struct ume_sem_t_stct
00133 {
00134 pthread_mutex_t mtx;
00135 pthread_cond_t cond;
00136 int max;
00137 int state;
00138 } ume_sem_t;
00139
00140
00141 #define UME_SEM_INIT(sem,len,ret) \
00142 do { \
00143 pthread_mutex_init(&(sem.mtx), NULL); \
00144 pthread_cond_init(&(sem.cond), NULL); \
00145 sem.max = len; \
00146 sem.state = len; \
00147 } while(0)
00148
00149 #define UME_SEM_POST(sem) \
00150 do { \
00151 pthread_mutex_lock(&(sem.mtx)); \
00152 if(sem.state < sem.max) { sem.state++; } \
00153 pthread_cond_broadcast(&(sem.cond)); \
00154 pthread_mutex_unlock(&(sem.mtx)); \
00155 } while(0)
00156
00157 #define UME_SEM_WAIT(sem) \
00158 do { \
00159 pthread_mutex_lock(&(sem.mtx)); \
00160 while(sem.state == 0) { \
00161 pthread_cond_wait(&(sem.cond), &(sem.mtx)); } \
00162 sem.state--; \
00163 pthread_mutex_unlock(&(sem.mtx)); \
00164 } while(0)
00165
00166 #define UME_SEM_GETVALUE(sem, value) value = sem.state
00167
00168 #define UME_SEM_DESTROY(sem) \
00169 do { \
00170 pthread_cond_destroy(&(sem.cond)); \
00171 pthread_mutex_destroy(&(sem.mtx)); \
00172 } while (0)
00173
00174
00175
00176 #define UME_SEM_TIMEDWAIT(sem, mstime, ret) \
00177 do { \
00178 struct timespec ts; \
00179 gettimeofday((struct timeval*) &ts, NULL); \
00180 ts.tv_sec += (mstime/1000); \
00181 ts.tv_nsec = (ts.tv_nsec*1000) + ((mstime%1000)*1000000); \
00182 pthread_mutex_lock(&(sem.mtx)); \
00183 ret = pthread_cond_timedwait(&(sem.cond), &(sem.mtx), &ts); \
00184 if(ret == 0) { sem.state--; } \
00185 pthread_mutex_unlock(&(sem.mtx)); \
00186 } while (0)
00187
00188 #define UME_SEM_TIMEDOUT(v) 0
00189 #define UME_SEM_TIMEDOK(v) 0
00190 #define UME_TIMESPEC_MSSET(t,s,n) 0
00191 #endif
00192
00193 #define UME_MALLOC_RETURN(e,s,r) do { \
00194 if((e = malloc(s)) == NULL) { \
00195 return r; \
00196 } \
00197 } while(0)
00198
00199 #define UME_TIME_OUT 5000
00200 #define UME_RETRY_COUNT 10
00201
00202
00203 struct ume_block_bitmap_t_stct;
00204 typedef struct ume_block_bitmap_t_stct ume_block_bitmap_t;
00205
00206
00207 typedef struct ume_block_src_t_stct
00208 {
00209 lbm_src_t *src;
00210 lbm_src_cb_proc appproc;
00211 ume_sem_t stablelock;
00212 ume_block_bitmap_t *bitmap;
00213 void *clientd;
00214 int maxretentionsz;
00215
00216 int err;
00217 unsigned int last;
00218 unsigned int first;
00219 } ume_block_src_t;
00220
00221
00222
00223
00224
00225
00226 int ume_block_src_delete(ume_block_src_t *asrc);
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242 int ume_block_src_create(ume_block_src_t **srcp, lbm_context_t *ctx, lbm_topic_t *topic, lbm_src_topic_attr_t *tattr, lbm_src_cb_proc proc, void *clientd, lbm_event_queue_t *evq);
00243
00244
00245
00246
00247
00248
00249
00250
00251 int ume_block_src_send_ex(ume_block_src_t *asrc, const char *msg, size_t len, int flags, lbm_src_send_ex_info_t *info);
00252 #endif
00253