--- ./action.h.ORIG 2008-07-15 23:32:02.000000000 -0700 +++ ./action.h 2008-07-20 14:03:36.336332045 -0700 @@ -61,7 +61,7 @@ * content later). This is preserved after the message has been * processed - it is also used to detect duplicates. */ - queue_t *pQueue; /* action queue */ + qqueue_t *pQueue; /* action queue */ SYNC_OBJ_TOOL; /* required for mutex support */ pthread_mutex_t mutActExec; /* mutex to guard actual execution of doAction for single-threaded modules */ }; --- ./config.h.in.ORIG 2008-07-20 12:01:35.634014851 -0700 +++ ./config.h.in 2008-07-20 16:48:17.361347497 -0700 @@ -260,6 +260,9 @@ /* Indicator for a BSD OS */ #undef OS_BSD +/* Indicator for a Solaris OS */ +#undef OS_SOLARIS + /* Name of package */ #undef PACKAGE @@ -332,6 +335,9 @@ # undef _GNU_SOURCE #endif +/* Use POSIX pthread semantics */ +#undef _POSIX_PTHREAD_SEMANTICS + /* Define for Solaris 2.5.1 so the uint8_t typedef from , , or is not used. If the typedef was allowed, the #define below would cause a syntax error. */ --- ./runtime/rsyslog.c.ORIG 2008-07-18 05:45:32.000000000 -0700 +++ ./runtime/rsyslog.c 2008-07-20 16:23:45.132238685 -0700 @@ -157,7 +157,7 @@ if(ppErrObj != NULL) *ppErrObj = "wtp"; CHKiRet(wtpClassInit(NULL)); if(ppErrObj != NULL) *ppErrObj = "queue"; - CHKiRet(queueClassInit(NULL)); + CHKiRet(qqueueClassInit(NULL)); if(ppErrObj != NULL) *ppErrObj = "vmstk"; CHKiRet(vmstkClassInit(NULL)); if(ppErrObj != NULL) *ppErrObj = "sysvar"; --- ./runtime/wtp.c.ORIG 2008-07-18 05:45:32.000000000 -0700 +++ ./runtime/wtp.c 2008-07-20 14:38:15.294797193 -0700 @@ -40,6 +40,11 @@ #include #include +#ifdef OS_SOLARIS +# include +# define pthread_yield() sched_yield() +#endif + #include "rsyslog.h" #include "stringbuf.h" #include "srUtils.h" --- ./runtime/net.c.ORIG 2008-07-18 05:45:32.000000000 -0700 +++ ./runtime/net.c 2008-07-20 15:56:33.217345120 -0700 @@ -63,6 +63,11 @@ #include "errmsg.h" #include "net.h" +#ifdef OS_SOLARIS +# define s6_addr32 _S6_un._S6_u32 + typedef unsigned int u_int32_t; +#endif + MODULE_TYPE_LIB /* static data */ --- ./runtime/queue.h.ORIG 2008-07-18 05:45:32.000000000 -0700 +++ ./runtime/queue.h 2008-07-20 15:16:44.133289131 -0700 @@ -159,7 +159,7 @@ strm_t *pRead; /* current file to be read */ } disk; } tVars; -} queue_t; +} qqueue_t; /* some symbolic constants for easier reference */ #define QUEUE_MODE_ENQDEQ 0 @@ -176,30 +176,30 @@ #define QUEUE_TIMEOUT_ETERNAL 24 * 60 * 60 * 1000 /* prototypes */ -rsRetVal queueDestruct(queue_t **ppThis); -rsRetVal queueEnqObj(queue_t *pThis, flowControl_t flwCtlType, void *pUsr); -rsRetVal queueStart(queue_t *pThis); -rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize); -rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); -rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, +rsRetVal qqueueDestruct(qqueue_t **ppThis); +rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); +rsRetVal qqueueStart(qqueue_t *pThis); +rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); +rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); +rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); PROTOTYPEObjClassInit(queue); -PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int); -PROTOTYPEpropSetMeth(queue, iDeqtWinFromHr, int); -PROTOTYPEpropSetMeth(queue, iDeqtWinToHr, int); -PROTOTYPEpropSetMeth(queue, toQShutdown, long); -PROTOTYPEpropSetMeth(queue, toActShutdown, long); -PROTOTYPEpropSetMeth(queue, toWrkShutdown, long); -PROTOTYPEpropSetMeth(queue, toEnq, long); -PROTOTYPEpropSetMeth(queue, iHighWtrMrk, int); -PROTOTYPEpropSetMeth(queue, iLowWtrMrk, int); -PROTOTYPEpropSetMeth(queue, iDiscardMrk, int); -PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int); -PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int); -PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int); -PROTOTYPEpropSetMeth(queue, pUsr, void*); -PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int); -PROTOTYPEpropSetMeth(queue, sizeOnDiskMax, int64); -#define queueGetID(pThis) ((unsigned long) pThis) +PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); +PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); +PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int); +PROTOTYPEpropSetMeth(qqueue, toQShutdown, long); +PROTOTYPEpropSetMeth(qqueue, toActShutdown, long); +PROTOTYPEpropSetMeth(qqueue, toWrkShutdown, long); +PROTOTYPEpropSetMeth(qqueue, toEnq, long); +PROTOTYPEpropSetMeth(qqueue, iHighWtrMrk, int); +PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int); +PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int); +PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int); +PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int); +PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); +PROTOTYPEpropSetMeth(qqueue, pUsr, void*); +PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); +PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64); +#define qqueueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ --- ./runtime/modules.c.ORIG 2008-07-18 05:45:32.000000000 -0700 +++ ./runtime/modules.c 2008-07-20 14:32:09.484674784 -0700 @@ -49,6 +49,10 @@ #include #include +#ifdef OS_SOLARIS +# define PATH_MAX MAXPATHLEN +#endif + #include "cfsysline.h" #include "modules.h" #include "errmsg.h" --- ./runtime/conf.c.ORIG 2008-07-18 05:45:32.000000000 -0700 +++ ./runtime/conf.c 2008-07-20 14:32:06.892510173 -0700 @@ -46,7 +46,9 @@ #include #include #ifdef HAVE_LIBGEN_H -# include +# ifndef OS_SOLARIS +# include +# endif #endif #include "rsyslog.h" @@ -68,6 +70,9 @@ #include "ctok.h" #include "ctok_token.h" +#ifdef OS_SOLARIS +# define NAME_MAX MAXNAMELEN +#endif /* forward definitions */ static rsRetVal cfline(uchar *line, selector_t **pfCurr); --- ./runtime/queue.c.ORIG 2008-07-18 05:45:32.000000000 -0700 +++ ./runtime/queue.c 2008-07-20 15:41:04.324598040 -0700 @@ -55,14 +55,14 @@ DEFobjCurrIf(glbl) /* forward-definitions */ -rsRetVal queueChkPersist(queue_t *pThis); -static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex); -static rsRetVal queueRateLimiter(queue_t *pThis); -static int queueChkStopWrkrDA(queue_t *pThis); -static int queueIsIdleDA(queue_t *pThis); -static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); -static rsRetVal queueConsumerCancelCleanup(void *arg1, void *arg2); -static rsRetVal queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex); +rsRetVal qqueueChkPersist(qqueue_t *pThis); +static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); +static rsRetVal qqueueRateLimiter(qqueue_t *pThis); +static int qqueueChkStopWrkrDA(qqueue_t *pThis); +static int qqueueIsIdleDA(qqueue_t *pThis); +static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2); +static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -76,7 +76,7 @@ * rgerhards, 2008-01-29 */ static inline int -queueGetOverallQueueSize(queue_t *pThis) +qqueueGetOverallQueueSize(qqueue_t *pThis) { #if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */ BEGINfunc @@ -94,26 +94,26 @@ * this point in time. The mutex must be locked when * ths function is called. -- rgerhards, 2008-01-25 */ -static inline rsRetVal queueAdviseMaxWorkers(queue_t *pThis) +static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) { DEFiRet; int iMaxWorkers; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(!pThis->bEnqOnly) { if(pThis->bRunsDA) { /* if we have not yet reached the high water mark, there is no need to start a * worker. -- rgerhards, 2008-01-26 */ - if(queueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { + if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } } else { if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { iMaxWorkers = 1; } else { - iMaxWorkers = queueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ } @@ -128,11 +128,11 @@ * rgerhards, 2008-02-27 */ static rsRetVal -queueWaitDAModeInitialized(queue_t *pThis) +qqueueWaitDAModeInitialized(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); while(pThis->bRunsDA != 2) { @@ -154,17 +154,17 @@ * rgerhards, 2008-01-15 */ static rsRetVal -queueTurnOffDAMode(queue_t *pThis) +qqueueTurnOffDAMode(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 */ - queueWaitDAModeInitialized(pThis); + qqueueWaitDAModeInitialized(pThis); /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to @@ -183,15 +183,15 @@ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. */ - queueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ + qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", iRet); /* now we need to check if the regular queue has some messages. This may be the case * when it is waiting that the high water mark is reached again. If so, we need to start up * a regular worker. -- rgerhards, 2008-01-26 */ - if(queueGetOverallQueueSize(pThis) > 0) { - queueAdviseMaxWorkers(pThis); + if(qqueueGetOverallQueueSize(pThis) > 0) { + qqueueAdviseMaxWorkers(pThis); } } @@ -215,11 +215,11 @@ * rgerhards, 2008-01-14 */ static rsRetVal -queueChkIsDA(queue_t *pThis) +qqueueChkIsDA(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pszFilePrefix != NULL) { pThis->bIsDA = 1; dbgoprint((obj_t*) pThis, "is disk-assisted, disk will be used on demand\n"); @@ -243,18 +243,18 @@ * rgerhards, 2008-01-15 */ static rsRetVal -queueStartDA(queue_t *pThis) +qqueueStartDA(qqueue_t *pThis) { DEFiRet; uchar pszDAQName[128]; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->bRunsDA == 2) /* check if already in (fully initialized) DA mode... */ FINALIZE; /* ... then we are already done! */ /* create message queue */ - CHKiRet(queueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); + CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK , 1, 0, pThis->pConsumer)); /* give it a name */ snprintf((char*) pszDAQName, sizeof(pszDAQName)/sizeof(uchar), "%s[DA]", obj.GetName((obj_t*) pThis)); @@ -265,30 +265,30 @@ */ pThis->pqDA->pqParent = pThis; - CHKiRet(queueSetpUsr(pThis->pqDA, pThis->pUsr)); - CHKiRet(queueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); - CHKiRet(queueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); - CHKiRet(queueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); - CHKiRet(queueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); - CHKiRet(queueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); - CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); - CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); - CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); - CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); - CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); - CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); - CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); + CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr)); + CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); + CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); + CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); + CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); + CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); + CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); + CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); + CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); + CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); + CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); + CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0)); if(pThis->toQShutdown == 0) { - CHKiRet(queueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ + CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ } else { /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND * have an obviously large backlog, we can't finish it in any case. So there is no point * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15 */ - CHKiRet(queueSettoQShutdown(pThis->pqDA, 1)); + CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1)); } - iRet = queueStart(pThis->pqDA); + iRet = qqueueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ @@ -306,12 +306,12 @@ pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ dbgoprint((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", - queueGetID(pThis->pqDA)); + qqueueGetID(pThis->pqDA)); finalize_it: if(iRet != RS_RET_OK) { if(pThis->pqDA != NULL) { - queueDestruct(&pThis->pqDA); + qqueueDestruct(&pThis->pqDA); } dbgoprint((obj_t*) pThis, "error %d creating disk queue - giving up.\n", iRet); pThis->bIsDA = 0; @@ -328,7 +328,7 @@ * rgerhards, 2008-01-16 */ static inline rsRetVal -queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) +qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -346,12 +346,12 @@ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) queueConsumerCancelCleanup)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueStartDA)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueTurnOffDAMode)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); @@ -384,14 +384,14 @@ * rgerhards, 2008-01-14 */ static inline rsRetVal -queueChkStrtDA(queue_t *pThis) +qqueueChkStrtDA(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* if we do not hit the high water mark, we have nothing to do */ - if(queueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk) + if(qqueueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); if(pThis->bRunsDA) { @@ -405,15 +405,15 @@ * we need at least one). */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", - queueGetOverallQueueSize(pThis)); - queueAdviseMaxWorkers(pThis); + qqueueGetOverallQueueSize(pThis)); + qqueueAdviseMaxWorkers(pThis); } else { /* this is the case when we are currently not running in DA mode. So it is time * to turn it back on. */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", - queueGetOverallQueueSize(pThis)); - queueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ + qqueueGetOverallQueueSize(pThis)); + qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } finalize_it: @@ -431,7 +431,7 @@ */ /* -------------------- fixed array -------------------- */ -static rsRetVal qConstructFixedArray(queue_t *pThis) +static rsRetVal qConstructFixedArray(qqueue_t *pThis) { DEFiRet; @@ -447,14 +447,14 @@ pThis->tVars.farray.head = 0; pThis->tVars.farray.tail = 0; - queueChkIsDA(pThis); + qqueueChkIsDA(pThis); finalize_it: RETiRet; } -static rsRetVal qDestructFixedArray(queue_t *pThis) +static rsRetVal qDestructFixedArray(qqueue_t *pThis) { DEFiRet; @@ -466,7 +466,7 @@ RETiRet; } -static rsRetVal qAddFixedArray(queue_t *pThis, void* in) +static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) { DEFiRet; @@ -479,7 +479,7 @@ RETiRet; } -static rsRetVal qDelFixedArray(queue_t *pThis, void **out) +static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out) { DEFiRet; @@ -498,7 +498,7 @@ /* first some generic functions which are also used for the unget linked list */ -static inline rsRetVal queueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr) +static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr) { DEFiRet; qLinkedList_t *pEntry; @@ -524,7 +524,7 @@ RETiRet; } -static inline rsRetVal queueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr) +static inline rsRetVal qqueueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr) { DEFiRet; qLinkedList_t *pEntry; @@ -551,7 +551,7 @@ /* end generic functions which are also used for the unget linked list */ -static rsRetVal qConstructLinkedList(queue_t *pThis) +static rsRetVal qConstructLinkedList(qqueue_t *pThis) { DEFiRet; @@ -560,13 +560,13 @@ pThis->tVars.linklist.pRoot = 0; pThis->tVars.linklist.pLast = 0; - queueChkIsDA(pThis); + qqueueChkIsDA(pThis); RETiRet; } -static rsRetVal qDestructLinkedList(queue_t __attribute__((unused)) *pThis) +static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) { DEFiRet; @@ -579,11 +579,11 @@ RETiRet; } -static rsRetVal qAddLinkedList(queue_t *pThis, void* pUsr) +static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) { DEFiRet; - iRet = queueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); + iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); #if 0 qLinkedList_t *pEntry; @@ -607,10 +607,10 @@ RETiRet; } -static rsRetVal qDelLinkedList(queue_t *pThis, obj_t **ppUsr) +static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; - iRet = queueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); + iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); #if 0 qLinkedList_t *pEntry; @@ -637,11 +637,11 @@ static rsRetVal -queueLoadPersStrmInfoFixup(strm_t *pStrm, queue_t __attribute__((unused)) *pThis) +qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pThis) { DEFiRet; ISOBJ_TYPE_assert(pStrm, strm); - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); CHKiRet(strmSetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); finalize_it: RETiRet; @@ -653,14 +653,14 @@ * rgerhards, 2008-01-15 */ static rsRetVal -queueHaveQIF(queue_t *pThis) +qqueueHaveQIF(qqueue_t *pThis) { DEFiRet; uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; struct stat stat_buf; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pszFilePrefix == NULL) ABORT_FINALIZE(RS_RET_NO_FILEPREFIX); @@ -690,7 +690,7 @@ * rgerhards, 2008-01-11 */ static rsRetVal -queueTryLoadPersistedInfo(queue_t *pThis) +qqueueTryLoadPersistedInfo(qqueue_t *pThis) { DEFiRet; strm_t *psQIF = NULL; @@ -700,7 +700,7 @@ int iUngottenObjs; obj_t *pUsr; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", @@ -735,15 +735,15 @@ while(iUngottenObjs > 0) { /* fill the queue from disk */ CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL)); - queueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); + qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); --iUngottenObjs; /* one less */ } /* and now the stream objects (some order as when persisted!) */ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF, - (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); + (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF, - (rsRetVal(*)(obj_t*,void*))queueLoadPersStrmInfoFixup, pThis)); + (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead)); @@ -773,7 +773,7 @@ * allowed file size at this point - that should be a config setting... * rgerhards, 2008-01-10 */ -static rsRetVal qConstructDisk(queue_t *pThis) +static rsRetVal qConstructDisk(qqueue_t *pThis) { DEFiRet; int bRestarted = 0; @@ -781,7 +781,7 @@ ASSERT(pThis != NULL); /* and now check if there is some persistent information that needs to be read in */ - iRet = queueTryLoadPersistedInfo(pThis); + iRet = qqueueTryLoadPersistedInfo(pThis); if(iRet == RS_RET_OK) bRestarted = 1; else if(iRet != RS_RET_FILE_NOT_FOUND) @@ -823,7 +823,7 @@ } -static rsRetVal qDestructDisk(queue_t *pThis) +static rsRetVal qDestructDisk(qqueue_t *pThis) { DEFiRet; @@ -835,7 +835,7 @@ RETiRet; } -static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) +static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) { DEFiRet; number_t nWriteCount; @@ -862,7 +862,7 @@ RETiRet; } -static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) +static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr) { DEFiRet; @@ -893,18 +893,18 @@ } /* -------------------- direct (no queueing) -------------------- */ -static rsRetVal qConstructDirect(queue_t __attribute__((unused)) *pThis) +static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) { return RS_RET_OK; } -static rsRetVal qDestructDirect(queue_t __attribute__((unused)) *pThis) +static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) { return RS_RET_OK; } -static rsRetVal qAddDirect(queue_t *pThis, void* pUsr) +static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { DEFiRet; @@ -921,7 +921,7 @@ RETiRet; } -static rsRetVal qDelDirect(queue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) +static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) { return RS_RET_OK; } @@ -936,12 +936,12 @@ * rgerhards, 2008-01-20 */ static rsRetVal -queueUngetObj(queue_t *pThis, obj_t *pUsr, int bLockMutex) +qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15 The second time I noticed it the queue was in destruction with NO worker threads running. The pUsr ptr was totally off and provided no clue what it may be pointing @@ -950,7 +950,7 @@ dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr)); BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); - iRet = queueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr); + iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr); ++pThis->iUngottenObjs; /* indicate one more */ END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -966,14 +966,14 @@ * rgerhards, 2008-01-29 */ static rsRetVal -queueGetUngottenObj(queue_t *pThis, obj_t **ppUsr) +qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(ppUsr != NULL); - iRet = queueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr); + iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr); --pThis->iUngottenObjs; /* indicate one less */ dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr)); @@ -987,7 +987,7 @@ * things truely different. -- rgerhards, 2008-02-12 */ static rsRetVal -queueAdd(queue_t *pThis, void *pUsr) +qqueueAdd(qqueue_t *pThis, void *pUsr) { DEFiRet; @@ -1010,7 +1010,7 @@ * ungotten list and, if so, dequeue it first. */ static rsRetVal -queueDel(queue_t *pThis, void *pUsr) +qqueueDel(qqueue_t *pThis, void *pUsr) { DEFiRet; @@ -1022,7 +1022,7 @@ * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ if(pThis->iUngottenObjs > 0) { - iRet = queueGetUngottenObj(pThis, (obj_t**) pUsr); + iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); --pThis->iQueueSize; @@ -1046,14 +1046,14 @@ * complex) if each would have its own shutdown. The function does not self check * this condition - the caller must make sure it is not called with a parent. */ -static rsRetVal queueShutdownWorkers(queue_t *pThis) +static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) { DEFiRet; DEFVARS_mutexProtection; struct timespec tTimeout; rsRetVal iRetLocal; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); @@ -1067,7 +1067,7 @@ /* first try to shutdown the queue within the regular shutdown period */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(queueGetOverallQueueSize(pThis) > 0) { + if(qqueueGetOverallQueueSize(pThis) > 0) { if(pThis->bRunsDA) { /* We may have waited on the low water mark. As it may have changed, we * see if we reactivate the worker. @@ -1105,7 +1105,7 @@ if(pThis->bRunsDA) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", - queueGetID(pThis->pqDA)); + qqueueGetID(pThis->pqDA)); /* we use the same absolute timeout as above, so we do not use more than the configured * timeout interval! */ @@ -1134,19 +1134,19 @@ /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ if(pThis->bRunsDA) - queueWaitDAModeInitialized(pThis); + qqueueWaitDAModeInitialized(pThis); BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ - if(pThis->bIsDA && queueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { + if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { /* switch to enqueue-only mode so that no more actions happen */ if(pThis->bRunsDA == 0) { - queueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ + qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ } else { /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1) * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27 */ - queueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ + qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ } END_MTX_PROTECTED_OPERATIONS(pThis->mut); /* make sure we do not timeout before we are done */ @@ -1168,7 +1168,7 @@ * they will automatically terminate as there no longer is any message left to process. */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(queueGetOverallQueueSize(pThis) > 0) { + if(qqueueGetOverallQueueSize(pThis) > 0) { timeoutComp(&tTimeout, pThis->toActShutdown); if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1237,7 +1237,7 @@ * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. */ - dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", queueGetOverallQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", qqueueGetOverallQueueSize(pThis)); RETiRet; } @@ -1249,17 +1249,17 @@ * is done by queueStart(). The reason is that we want to give the caller a chance * to modify some parameters before the queue is actually started. */ -rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, +rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)) { DEFiRet; - queue_t *pThis; + qqueue_t *pThis; ASSERT(ppThis != NULL); ASSERT(pConsumer != NULL); ASSERT(iWorkerThreads >= 0); - if((pThis = (queue_t *)calloc(1, sizeof(queue_t))) == NULL) { + if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } @@ -1295,7 +1295,7 @@ pThis->qConstruct = qConstructLinkedList; pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; - pThis->qDel = (rsRetVal (*)(queue_t*,void**)) qDelLinkedList; + pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1322,25 +1322,25 @@ /* cancellation cleanup handler for queueWorker () * Updates admin structure and frees ressources. * Params: - * arg1 - user pointer (in this case a queue_t) + * arg1 - user pointer (in this case a qqueue_t) * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!]) * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists! * rgerhards, 2008-01-16 */ static rsRetVal -queueConsumerCancelCleanup(void *arg1, void *arg2) +qqueueConsumerCancelCleanup(void *arg1, void *arg2) { DEFiRet; - queue_t *pThis = (queue_t*) arg1; + qqueue_t *pThis = (qqueue_t*) arg1; obj_t *pUsr = (obj_t*) arg2; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pUsr != NULL) { /* make sure the data element is not lost */ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); - CHKiRet(queueUngetObj(pThis, pUsr, LOCK_MUTEX)); + CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX)); } finalize_it: @@ -1362,13 +1362,13 @@ * the return state! * rgerhards, 2008-01-24 */ -static int queueChkDiscardMsg(queue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr) { DEFiRet; rsRetVal iRetLocal; int iSeverity; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_assert(pUsr); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) { @@ -1393,7 +1393,7 @@ * rgerhards, 2008-10-21 */ static rsRetVal -queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; void *pUsr; @@ -1401,9 +1401,9 @@ int bRunsDA; /* cache for early mutex release */ /* dequeue element (still protected from mutex) */ - iRet = queueDel(pThis, &pUsr); - queueChkPersist(pThis); - iQueueSize = queueGetOverallQueueSize(pThis); /* cache this for after mutex release */ + iRet = qqueueDel(pThis, &pUsr); + qqueueChkPersist(pThis); + iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */ bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY @@ -1449,7 +1449,7 @@ * provide real-time creation of spool files. * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong. */ - CHKiRet(queueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr)); finalize_it: if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { @@ -1498,7 +1498,7 @@ * but you get the idea from the code above. */ static rsRetVal -queueRateLimiter(queue_t *pThis) +qqueueRateLimiter(qqueue_t *pThis) { DEFiRet; int iDelay; @@ -1506,7 +1506,7 @@ time_t tCurr; struct tm m; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); dbgoprint((obj_t*) pThis, "entering rate limiter\n"); @@ -1563,14 +1563,14 @@ * rgerhards, 2008-01-21 */ static rsRetVal -queueConsumerReg(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp)); /* we now need to check if we should deliberately delay processing a bit @@ -1597,15 +1597,15 @@ * rgerhards, 2008-01-14 */ static rsRetVal -queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave) +qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(queueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(queueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp)); + CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp)); finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -1621,7 +1621,7 @@ * the DA queue */ static int -queueChkStopWrkrDA(queue_t *pThis) +qqueueChkStopWrkrDA(qqueue_t *pThis) { /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate * until we have done so. @@ -1640,7 +1640,7 @@ && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... */ bStopWrkr = 1; - } else if(queueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { + } else if(qqueueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { bStopWrkr = 1; } else { bStopWrkr = 0; @@ -1663,9 +1663,9 @@ * the DA queue */ static int -queueChkStopWrkrReg(queue_t *pThis) +qqueueChkStopWrkrReg(qqueue_t *pThis) { - return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && queueGetOverallQueueSize(pThis) == 0); + return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0); } @@ -1673,26 +1673,26 @@ * are not stable! DA queue version */ static int -queueIsIdleDA(queue_t *pThis) +qqueueIsIdleDA(qqueue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ - return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results * are not stable! Regular queue version */ static int -queueIsIdleReg(queue_t *pThis) +qqueueIsIdleReg(qqueue_t *pThis) { #if 0 /* enable for performance testing */ int ret; - ret = queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk); + ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk); if(ret) fprintf(stderr, "queue is idle\n"); return ret; #else /* regular code! */ - return(queueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && queueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); #endif } @@ -1711,11 +1711,11 @@ * I am telling this, because I, too, always get confused by those... */ static rsRetVal -queueRegOnWrkrShutdown(queue_t *pThis) +qqueueRegOnWrkrShutdown(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pqParent != NULL) { pThis->pqParent->bChildIsDone = 1; /* indicate we are done */ @@ -1732,11 +1732,11 @@ * hook to indicate in the parent queue (if we are a child) that we are not done yet. */ static rsRetVal -queueRegOnWrkrStartup(queue_t *pThis) +qqueueRegOnWrkrStartup(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->pqParent != NULL) { pThis->pqParent->bChildIsDone = 0; @@ -1749,7 +1749,7 @@ /* start up the queue - it must have been constructed and parameters defined * before. */ -rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ +rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; rsRetVal iRetLocal; @@ -1787,7 +1787,7 @@ dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, - queueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1); + qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -1798,13 +1798,13 @@ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); - CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))queueConsumerCancelCleanup)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrStartup)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRegOnWrkrShutdown)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); @@ -1817,10 +1817,10 @@ /* If we are disk-assisted, we need to check if there is a QIF file * which we need to load. -- rgerhards, 2008-01-15 */ - iRetLocal = queueHaveQIF(pThis); + iRetLocal = qqueueHaveQIF(pThis); if(iRetLocal == RS_RET_OK) { dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n"); - queueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ + qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ bInitialized = 1; /* we are done */ } else { /* TODO: use logerror? -- rgerhards, 2008-01-16 */ @@ -1837,7 +1837,7 @@ /* if the queue already contains data, we need to start the correct number of worker threads. This can be * the case when a disk queue has been loaded. If we did not start it here, it would never start. */ - queueAdviseMaxWorkers(pThis); + qqueueAdviseMaxWorkers(pThis); pThis->bQueueStarted = 1; finalize_it: @@ -1852,7 +1852,7 @@ * and 0 otherwise. * rgerhards, 2008-01-10 */ -static rsRetVal queuePersist(queue_t *pThis, int bIsCheckpoint) +static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) { DEFiRet; strm_t *psQIF = NULL; /* Queue Info File */ @@ -1863,7 +1863,7 @@ ASSERT(pThis != NULL); if(pThis->qType != QUEUETYPE_DISK) { - if(queueGetOverallQueueSize(pThis) > 0) { + if(qqueueGetOverallQueueSize(pThis) > 0) { /* This error code is OK, but we will probably not implement this any time * The reason is that persistence happens via DA queues. But I would like to * leave the code as is, as we so have a hook in case we need one. @@ -1874,13 +1874,13 @@ FINALIZE; /* if the queue is empty, we are happy and done... */ } - dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", queueGetOverallQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis)); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - if((bIsCheckpoint != QUEUE_CHECKPOINT) && (queueGetOverallQueueSize(pThis) == 0)) { + if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) { if(pThis->bNeedDelQIF) { unlink((char*)pszQIFNam); pThis->bNeedDelQIF = 0; @@ -1914,7 +1914,7 @@ * to the regular files. -- rgerhards, 2008-01-29 */ while(pThis->iUngottenObjs > 0) { - CHKiRet(queueGetUngottenObj(pThis, &pUsr)); + CHKiRet(qqueueGetUngottenObj(pThis, &pUsr)); CHKiRet((objSerialize(pUsr))(pUsr, psQIF)); objDestruct(pUsr); } @@ -1948,14 +1948,14 @@ * abide to our regular call interface)... * rgerhards, 2008-01-13 */ -rsRetVal queueChkPersist(queue_t *pThis) +rsRetVal qqueueChkPersist(qqueue_t *pThis) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { - queuePersist(pThis, QUEUE_CHECKPOINT); + qqueuePersist(pThis, QUEUE_CHECKPOINT); pThis->iUpdsSincePersist = 0; } @@ -1964,8 +1964,8 @@ /* destructor for the queue object */ -BEGINobjDestruct(queue) /* be sure to specify the object type also in END and CODESTART macros! */ -CODESTARTobjDestruct(queue) +BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ +CODESTARTobjDestruct(qqueue) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ /* shut down all workers (handles *all* of the persistence logic) @@ -1975,7 +1975,7 @@ * with a child! -- rgerhards, 2008-01-28 */ if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL) - queueShutdownWorkers(pThis); + qqueueShutdownWorkers(pThis); /* finally destruct our (regular) worker thread pool * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, @@ -2000,7 +2000,7 @@ wtpDestruct(&pThis->pWtpDA); } if(pThis->pqDA != NULL) { - queueDestruct(&pThis->pqDA); + qqueueDestruct(&pThis->pqDA); } /* persist the queue (we always do that - queuePersits() does cleanup if the queue is empty) @@ -2010,7 +2010,7 @@ * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here * if need arises (what I doubt...) -- rgerhards, 2008-01-25 */ - CHKiRet_Hdlr(queuePersist(pThis, QUEUE_NO_CHECKPOINT)) { + CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) { dbgoprint((obj_t*) pThis, "error %d persisting queue - data lost!\n", iRet); } @@ -2035,7 +2035,7 @@ if(pThis->pszSpoolDir != NULL) free(pThis->pszSpoolDir); -ENDobjDestruct(queue) +ENDobjDestruct(qqueue) /* set the queue's file prefix @@ -2044,7 +2044,7 @@ * rgerhards, 2008-01-09 */ rsRetVal -queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) +qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) { DEFiRet; @@ -2067,11 +2067,11 @@ * rgerhards, 2008-01-09 */ rsRetVal -queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize) +qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); if(iMaxFileSize < 1024) { ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW); @@ -2088,14 +2088,14 @@ * Enqueues the new element and awakes worker thread. */ rsRetVal -queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) +qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) { DEFiRet; int iCancelStateSave; int i; struct timespec t; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE @@ -2109,11 +2109,11 @@ } /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ - CHKiRet(queueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) - CHKiRet(queueChkStrtDA(pThis)); + CHKiRet(qqueueChkStrtDA(pThis)); /* handle flow control @@ -2167,8 +2167,8 @@ } /* and finally enqueue the message */ - CHKiRet(queueAdd(pThis, pUsr)); - queueChkPersist(pThis); + CHKiRet(qqueueAdd(pThis, pUsr)); + qqueueChkPersist(pThis); finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { @@ -2180,7 +2180,7 @@ /* make sure at least one worker is running. */ if(pThis->qType != QUEUETYPE_DIRECT) { - queueAdviseMaxWorkers(pThis); + qqueueAdviseMaxWorkers(pThis); } RETiRet; @@ -2196,12 +2196,12 @@ * rgerhards, 2008-01-16 */ static rsRetVal -queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex) +qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); /* for simplicity, we do one big mutex lock. This method is extremely seldom * called, so that doesn't matter... -- rgerhards, 2008-01-16 @@ -2240,24 +2240,24 @@ /* some simple object access methods */ -DEFpropSetMeth(queue, iPersistUpdCnt, int) -DEFpropSetMeth(queue, iDeqtWinFromHr, int) -DEFpropSetMeth(queue, iDeqtWinToHr, int) -DEFpropSetMeth(queue, toQShutdown, long) -DEFpropSetMeth(queue, toActShutdown, long) -DEFpropSetMeth(queue, toWrkShutdown, long) -DEFpropSetMeth(queue, toEnq, long) -DEFpropSetMeth(queue, iHighWtrMrk, int) -DEFpropSetMeth(queue, iLowWtrMrk, int) -DEFpropSetMeth(queue, iDiscardMrk, int) -DEFpropSetMeth(queue, iFullDlyMrk, int) -DEFpropSetMeth(queue, iDiscardSeverity, int) -DEFpropSetMeth(queue, bIsDA, int) -DEFpropSetMeth(queue, iMinMsgsPerWrkr, int) -DEFpropSetMeth(queue, bSaveOnShutdown, int) -DEFpropSetMeth(queue, pUsr, void*) -DEFpropSetMeth(queue, iDeqSlowdown, int) -DEFpropSetMeth(queue, sizeOnDiskMax, int64) +DEFpropSetMeth(qqueue, iPersistUpdCnt, int) +DEFpropSetMeth(qqueue, iDeqtWinFromHr, int) +DEFpropSetMeth(qqueue, iDeqtWinToHr, int) +DEFpropSetMeth(qqueue, toQShutdown, long) +DEFpropSetMeth(qqueue, toActShutdown, long) +DEFpropSetMeth(qqueue, toWrkShutdown, long) +DEFpropSetMeth(qqueue, toEnq, long) +DEFpropSetMeth(qqueue, iHighWtrMrk, int) +DEFpropSetMeth(qqueue, iLowWtrMrk, int) +DEFpropSetMeth(qqueue, iDiscardMrk, int) +DEFpropSetMeth(qqueue, iFullDlyMrk, int) +DEFpropSetMeth(qqueue, iDiscardSeverity, int) +DEFpropSetMeth(qqueue, bIsDA, int) +DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) +DEFpropSetMeth(qqueue, bSaveOnShutdown, int) +DEFpropSetMeth(qqueue, pUsr, void*) +DEFpropSetMeth(qqueue, iDeqSlowdown, int) +DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) /* This function can be used as a generic way to set properties. Only the subset @@ -2266,11 +2266,11 @@ * rgerhards, 2008-01-11 */ #define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1) -static rsRetVal queueSetProperty(queue_t *pThis, var_t *pProp) +static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp) { DEFiRet; - ISOBJ_TYPE_assert(pThis, queue); + ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pProp != NULL); if(isProp("iQueueSize")) { @@ -2292,19 +2292,19 @@ #undef isProp /* dummy */ -rsRetVal queueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } +rsRetVal qqueueQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } /* Initialize the stream class. Must be called as the very first method * before anything else is called inside this class. * rgerhards, 2008-01-09 */ -BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE) +BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); /* now set our own handlers */ - OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); -ENDObjClassInit(queue) + OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty); +ENDObjClassInit(qqueue) /* vi:set ai: */ --- ./runtime/wti.c.ORIG 2008-07-18 05:45:32.000000000 -0700 +++ ./runtime/wti.c 2008-07-20 16:52:56.701362246 -0700 @@ -39,6 +39,11 @@ #include #include +#ifdef OS_SOLARIS +# include +# define pthread_yield() sched_yield() +#endif + #include "rsyslog.h" #include "stringbuf.h" #include "srUtils.h" --- ./action.c.ORIG 2008-07-18 05:45:31.000000000 -0700 +++ ./action.c 2008-07-20 16:22:22.636730766 -0700 @@ -139,7 +139,7 @@ ASSERT(pThis != NULL); if(pThis->pQueue != NULL) { - queueDestruct(&pThis->pQueue); + qqueueDestruct(&pThis->pQueue); } if(pThis->pMod != NULL) @@ -213,7 +213,7 @@ * to be run on multiple threads. So far, this is forbidden by the interface * spec. -- rgerhards, 2008-01-30 */ - CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); + CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -226,24 +226,24 @@ errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ } - queueSetpUsr(pThis->pQueue, pThis); - setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); - setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); - setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); - setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); - setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); - setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); - setQPROP(queueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown); - setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq); - setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark); - setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark); - setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark); - setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); - setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); - setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); - setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); - setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr); - setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr); + qqueueSetpUsr(pThis->pQueue, pThis); + setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); + setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); + setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); + setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr); # undef setQPROP # undef setQPROPstr @@ -252,7 +252,7 @@ bActionQSaveOnShutdown, iActionQueMaxDiskSpace); - CHKiRet(queueStart(pThis->pQueue)); + CHKiRet(qqueueStart(pThis->pQueue)); dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue); /* and now reset the queue params (see comment in its function header!) */ @@ -563,7 +563,7 @@ /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = queueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); + iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); if(iRet == RS_RET_OK) pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ --- ./configure.ac.ORIG 2008-07-18 06:46:00.000000000 -0700 +++ ./configure.ac 2008-07-20 16:46:45.785038441 -0700 @@ -32,6 +32,13 @@ # do not DEFINE OS_BSD os_type="bsd" ;; + *-*-solaris*) + os_type="solaris" + AC_DEFINE([OS_SOLARIS], [1], [Indicator for a Solaris OS]) + AC_DEFINE([_POSIX_PTHREAD_SEMANTICS], [1], [Use POSIX pthread semantics]) + sol_libs="-lsocket -lnsl" + AC_SUBST(sol_libs) + ;; esac AC_DEFINE_UNQUOTED([HOSTENV], "$host", [the host environment, can be queried via a system variable]) @@ -203,7 +210,10 @@ [ AC_DEFINE([USE_PTHREADS], [1], [Multithreading support enabled.]) pthreads_libs="-lpthread" - pthreads_cflags="-pthread" + case "${os_type}" in + solaris) pthreads_cflags="-pthreads" ;; + *) pthreads_cflags="-pthread" ;; + esac AC_SUBST(pthreads_libs) AC_SUBST(pthreads_cflags) ], --- ./tools/Makefile.am.ORIG 2008-07-18 06:11:20.000000000 -0700 +++ ./tools/Makefile.am 2008-07-20 17:21:25.664538887 -0700 @@ -22,7 +22,7 @@ \ ../dirty.h rsyslogd_CPPFLAGS = $(pthreads_cflags) $(rsrt_cflags) -rsyslogd_LDADD = $(zlib_libs) $(pthreads_libs) $(rsrt_libs) +rsyslogd_LDADD = $(zlib_libs) $(pthreads_libs) $(rsrt_libs) $(sol_libs) rsyslogd_LDFLAGS = -export-dynamic if ENABLE_DIAGTOOLS --- ./tools/syslogd.c.ORIG 2008-07-18 06:46:00.000000000 -0700 +++ ./tools/syslogd.c 2008-07-20 16:11:54.822375391 -0700 @@ -100,13 +100,18 @@ #include #include #include -#include -#ifdef __sun +#ifdef OS_SOLARIS # include +# include +# include +# include +# include #else +# include # include #endif + #include #include #include @@ -301,7 +306,7 @@ extern int errno; /* main message queue and its configuration parameters */ -static queue_t *pMsgQueue = NULL; /* the main message queue */ +static qqueue_t *pMsgQueue = NULL; /* the main message queue */ static int iMainMsgQueueSize = 10000; /* size of the main message queue above */ static int iMainMsgQHighWtrMark = 8000; /* high water mark for disk-assisted queues */ static int iMainMsgQLowWtrMark = 2000; /* low water mark for disk-assisted queues */ @@ -1568,7 +1573,7 @@ ISOBJ_TYPE_assert(pMsg, msg); MsgPrepareEnqueue(pMsg); - queueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg); + qqueueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg); RETiRet; } @@ -1629,7 +1634,7 @@ /* now submit the message to the main queue - then we are done */ pMsg->msgFlags = flags; MsgPrepareEnqueue(pMsg); - queueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg); + qqueueEnqObj(pMsgQueue, pMsg->flowCtlType, (void*) pMsg); ENDfunc } @@ -1915,7 +1920,7 @@ /* drain queue (if configured so) and stop main queue worker thread pool */ dbgprintf("Terminating main queue...\n"); - queueDestruct(&pMsgQueue); + qqueueDestruct(&pMsgQueue); pMsgQueue = NULL; /* Free ressources and close connections. This includes flushing any remaining @@ -2151,9 +2156,9 @@ static int iMainMsgQtoWrkMinMsgs = 100; static int iMainMsgQbSaveOnShutdown = 1; iMainMsgQueMaxDiskSpace = 0; - setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", 5000); - setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100); - setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1); + setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", 5000); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100); + setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1); */ dbgprintf("Work Directory: '%s'.\n", glbl.GetWorkDir()); } @@ -2211,7 +2216,7 @@ /* delete the message queue, which also flushes all messages left over */ if(pMsgQueue != NULL) { dbgprintf("deleting main message queue\n"); - queueDestruct(&pMsgQueue); /* delete pThis here! */ + qqueueDestruct(&pMsgQueue); /* delete pThis here! */ pMsgQueue = NULL; } @@ -2296,7 +2301,7 @@ } /* create message queue */ - CHKiRet_Hdlr(queueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) { + CHKiRet_Hdlr(qqueueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueNumWorkers, iMainMsgQueueSize, msgConsumer)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); exit(1); @@ -2314,29 +2319,29 @@ errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ } - setQPROP(queueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize); - setQPROP(queueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace); - setQPROPstr(queueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName); - setQPROP(queueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); - setQPROP(queueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown ); - setQPROP(queueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown); - setQPROP(queueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown); - setQPROP(queueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq); - setQPROP(queueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark); - setQPROP(queueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark); - setQPROP(queueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark); - setQPROP(queueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity); - setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs); - setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown); - setQPROP(queueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown); - setQPROP(queueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr); - setQPROP(queueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr); + setQPROP(qqueueSetMaxFileSize, "$MainMsgQueueFileSize", iMainMsgQueMaxFileSize); + setQPROP(qqueueSetsizeOnDiskMax, "$MainMsgQueueMaxDiskSpace", iMainMsgQueMaxDiskSpace); + setQPROPstr(qqueueSetFilePrefix, "$MainMsgQueueFileName", pszMainMsgQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$MainMsgQueueCheckpointInterval", iMainMsgQPersistUpdCnt); + setQPROP(qqueueSettoQShutdown, "$MainMsgQueueTimeoutShutdown", iMainMsgQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$MainMsgQueueTimeoutActionCompletion", iMainMsgQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$MainMsgQueueWorkerTimeoutThreadShutdown", iMainMsgQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$MainMsgQueueTimeoutEnqueue", iMainMsgQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$MainMsgQueueHighWaterMark", iMainMsgQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$MainMsgQueueLowWaterMark", iMainMsgQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$MainMsgQueueDiscardMark", iMainMsgQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$MainMsgQueueDiscardSeverity", iMainMsgQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", iMainMsgQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", bMainMsgQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$MainMsgQueueDequeueSlowdown", iMainMsgQDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$MainMsgQueueDequeueTimeBegin", iMainMsgQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$MainMsgQueueDequeueTimeEnd", iMainMsgQueueDeqtWinToHr); # undef setQPROP # undef setQPROPstr /* ... and finally start the queue! */ - CHKiRet_Hdlr(queueStart(pMsgQueue)) { + CHKiRet_Hdlr(qqueueStart(pMsgQueue)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not start message queue - rsyslogd can not run!\n", iRet); exit(1); @@ -2871,7 +2876,7 @@ CHKiRet(strmClassInit(NULL)); CHKiRet(wtiClassInit(NULL)); CHKiRet(wtpClassInit(NULL)); - CHKiRet(queueClassInit(NULL)); + CHKiRet(qqueueClassInit(NULL)); CHKiRet(vmstkClassInit(NULL)); CHKiRet(sysvarClassInit(NULL)); CHKiRet(vmClassInit(NULL)); --- ./tools/omfile.c.ORIG 2008-07-18 05:45:33.000000000 -0700 +++ ./tools/omfile.c 2008-07-20 16:13:00.150603127 -0700 @@ -44,6 +44,10 @@ #include #include +#ifdef OS_SOLARIS +# include +#endif + #include "syslogd.h" #include "syslogd-types.h" #include "srUtils.h"