#include "GLTB/workqueue.h" namespace gltb { WorkQueue::WorkQueue() { queueMutex = Mutex::createMutex(); queueEmpty = new ConditionalVariable(); idleWorkerMutex = Mutex::createMutex(); activeWorkMutex = Mutex::createMutex(); activeWorkEmpty = new ConditionalVariable(); } void WorkQueue::pushWorkItem(gltb::RefPtr item) { MutexGuard guard(queueMutex); MutexGuard idleWorkerGuard(idleWorkerMutex); workItems.push(item); if(!idleWorkerQueue.empty()) { RefPtr worker=idleWorkerQueue.front(); idleWorkerQueue.pop_front(); worker->wakeUp(); return; } } bool WorkQueue::hasWorkAvailable() { MutexGuard guard(queueMutex); return !workItems.empty(); } bool WorkQueue::isEmpty() { MutexGuard queueGuard(queueMutex); MutexGuard activeWorkGuard(activeWorkMutex); if(workItems.empty() && activeWorkItems.empty()) { return true; } return false; } gltb::RefPtr WorkQueue::popWorkItem(gltb::RefPtr worker, bool withTimeout, int timeout) { queueMutex->lock(); idleWorkerMutex->lock(); if(workItems.empty()) { idleWorkerQueue.push_back(worker); // need to temporarily release mutexes to avoid blocking insertion of new work item queueMutex->unlock(); idleWorkerMutex->unlock(); if(withTimeout) { if(!worker->_waitForWork(timeout)) { // remove from idle worker queue as we're no longer idle /* * NOTE: is this a race condition leading to a missed wakeup * for another worker if/when item signal on this worker is * raised after timeout? */ idleWorkerMutex->lock(); idleWorkerQueue.remove(worker); idleWorkerMutex->unlock(); return nullptr; } } else { worker->_waitForWork(); } queueMutex->lock(); } else { idleWorkerMutex->unlock(); } if(workItems.empty()) { queueEmpty->signalAll(); queueMutex->unlock(); return nullptr; } gltb::RefPtr item=workItems.front(); workItems.pop(); { MutexGuard activeWorkGuard(activeWorkMutex); activeWorkItems.push_back(item); } if(workItems.empty()) { queueEmpty->signalAll(); } queueMutex->unlock(); return item; } void WorkQueue::finishWorkItem(RefPtr item) { // need to lock queue first in order to avoid deadlocks with popWorkItem MutexGuard queueGuard(queueMutex); MutexGuard activeWorkGuard(activeWorkMutex); for(auto iter = activeWorkItems.begin(); iter != activeWorkItems.end(); ++iter) { if(*iter == item) { activeWorkItems.erase(iter); break; } } if(activeWorkItems.empty() && workItems.empty()) { activeWorkEmpty->signal(); } } void WorkQueue::wakeAllWaitingThreads() { MutexGuard idleWorkerGuard(idleWorkerMutex); for(auto iter=idleWorkerQueue.begin();iter!=idleWorkerQueue.end();++iter) { (*iter)->wakeUp(); } } void WorkQueue::waitForEmptyQueue() { { MutexGuard guard(queueMutex); if(workItems.empty()) { return; } } queueEmpty->wait(); } void WorkQueue::waitForEmptyQueue(int timeout) { { MutexGuard guard(queueMutex); if(workItems.empty()) { return; } } queueEmpty->wait(timeout); } void WorkQueue::waitForCompletedWork() { { MutexGuard queueGuard(queueMutex); MutexGuard activeWorkGuard(activeWorkMutex); if(workItems.empty() && activeWorkItems.empty()) { return; } } activeWorkEmpty->wait(); } void WorkQueue::waitForCompletedWork(int timeout) { { MutexGuard queueGuard(queueMutex); MutexGuard activeWorkGuard(activeWorkMutex); if(workItems.empty() && activeWorkItems.empty()) { return; } } activeWorkEmpty->wait(timeout); } Worker::Worker(gltb::RefPtr queue) : queue(queue) { workAvailable=new ConditionalVariable(); } void Worker::processOneJob(bool block) { gltb::RefPtr workItem; if(block) { workItem=queue->popWorkItem(this); } else { workItem=queue->popWorkItem(this,0); } if(workItem!=nullptr) { workItem->run(); queue->finishWorkItem(workItem); } } WorkerPool::WorkerPool(RefPtr queue, const unsigned int size, RefPtr perThreadInit) : workQueue(queue), alreadyShutDown(false) { initWorkerPool(size,perThreadInit); } WorkerPool::WorkerPool(RefPtr queue, RefPtr perThreadInit) : workQueue(queue), alreadyShutDown(false) { initWorkerPool(numberOfProcessors(),perThreadInit); } WorkerPool::~WorkerPool() { quit(); } void WorkerPool::initWorkerPool(const unsigned int size, RefPtr perThreadInit) { workerRunnables.resize(size); workerThreads.resize(size); for(unsigned int i=0;istart(); } } void WorkerPool::assignThreadsToProcessors() { size_t numThreads = workerThreads.size(); if(numThreads > numberOfProcessors()) { numThreads = numberOfProcessors(); } for(size_t i = 0; i < numThreads; i++) { workerThreads[i]->assignToProcessor(i); } } void WorkerPool::quit(bool immediately) { if(alreadyShutDown) { return; } if(!immediately) { workQueue->waitForEmptyQueue(); } for(unsigned int i=0;iquit(); } workQueue->wakeAllWaitingThreads(); for(unsigned int i=0;ijoin(); } workerThreads.clear(); workerRunnables.clear(); alreadyShutDown=true; } }