#include "GLTB/workqueue.h"

namespace gltb {

// NOTE(review): the copy of this file that was reviewed had every `<...>` span
// stripped, so all template arguments below (RefPtr<WorkItem>, RefPtr<Worker>)
// are reconstructed from how the values are used in this file. A wrong
// reconstruction will fail to compile against the declarations in
// GLTB/workqueue.h -- confirm against that header.

// Creates the two mutexes and the two condition variables used by the queue.
WorkQueue::WorkQueue()
{
    queueMutex = Mutex::createMutex();
    queueEmpty = new ConditionVariable();
    activeWorkMutex = Mutex::createMutex();
    activeWorkEmpty = new ConditionVariable();
}

// Appends an item to the pending queue and, if any worker is registered as
// idle, dequeues the first idle worker and wakes it.
void WorkQueue::pushWorkItem(gltb::RefPtr<WorkItem> item)
{
    MutexGuard guard(queueMutex);

    workItems.push(item);

    if(idleWorkerQueue.empty()) {
        return;
    }

    // Copy the reference before popping so the worker stays alive for wakeUp().
    RefPtr<Worker> worker = idleWorkerQueue.front();
    idleWorkerQueue.pop_front();
    worker->wakeUp();
}

// Returns true if at least one item is waiting in the pending queue.
bool WorkQueue::hasWorkAvailable()
{
    MutexGuard guard(queueMutex);
    return !workItems.empty();
}

// Returns true if there is neither pending nor in-progress work.
bool WorkQueue::isEmpty()
{
    // Lock order is queue first, then active work, as in finishWorkItem().
    MutexGuard queueGuard(queueMutex);
    MutexGuard activeWorkGuard(activeWorkMutex);

    return workItems.empty() && activeWorkItems.empty();
}

// Removes the next pending item and records it as active.
//
// If no item is pending, the worker is registered as idle and waits on its own
// condition (optionally with a timeout) until it is woken.
//
//   worker       the calling worker; used for waiting and idle bookkeeping
//   withTimeout  if true, wait at most `timeout`; otherwise wait until woken
//   timeout      passed through to Worker::_waitForWork (units are defined
//                there, not visible in this file)
//
// Returns the item, or nullptr if the wait ended without any work being
// available (timeout, or a wake-up for another reason such as termination).
gltb::RefPtr<WorkItem> WorkQueue::popWorkItem(gltb::RefPtr<Worker> worker, bool withTimeout, int timeout)
{
    // Scoped guard instead of manual lock()/unlock(): every return path
    // releases the mutex. Waiting on a condition while a MutexGuard is held is
    // the same pattern waitForEmptyQueue() uses.
    MutexGuard guard(queueMutex);

    if(workItems.empty()) {
        idleWorkerQueue.push_back(worker);

        bool wokenInTime = true;
        if(withTimeout) {
            wokenInTime = worker->_waitForWork(queueMutex, timeout);
        } else {
            worker->_waitForWork(queueMutex);
        }

        // We are no longer idle, whatever the reason for waking up. If
        // pushWorkItem() already dequeued us this is a no-op; if we were woken
        // by wakeAllWaitingThreads() or timed out, it drops the stale entry so
        // the worker is never registered twice.
        idleWorkerQueue.remove(worker);

        // A timeout can race with pushWorkItem(): the item may have been
        // queued (and this worker signalled) just before the timed wait
        // reported a timeout. Only give up if there really is nothing to do;
        // otherwise fall through and take the item so it is not stranded.
        if(!wokenInTime && workItems.empty()) {
            return nullptr;
        }
    }

    // If this happens the thread was awakened for some other reason than
    // available work, for example a termination request.
    if(workItems.empty()) {
        queueEmpty->signalAll();
        return nullptr;
    }

    gltb::RefPtr<WorkItem> item = workItems.front();
    workItems.pop();

    {
        MutexGuard activeWorkGuard(activeWorkMutex);
        activeWorkItems.push_back(item);
    }

    if(workItems.empty()) {
        queueEmpty->signalAll();
    }

    return item;
}

// Removes `item` from the active list and, if no pending or active work
// remains, wakes the threads waiting in waitForCompletedWork().
void WorkQueue::finishWorkItem(RefPtr<WorkItem> item)
{
    // Need to lock the queue first in order to avoid deadlocks with popWorkItem.
    MutexGuard queueGuard(queueMutex);
    MutexGuard activeWorkGuard(activeWorkMutex);

    for(auto iter = activeWorkItems.begin(); iter != activeWorkItems.end(); ++iter) {
        if(*iter == item) {
            activeWorkItems.erase(iter);
            break;
        }
    }

    if(activeWorkItems.empty() && workItems.empty()) {
        // Wake every waiter: more than one thread may be blocked in
        // waitForCompletedWork(), and a single signal would release only one.
        activeWorkEmpty->signalAll();
    }
}

// Wakes every worker currently registered as idle. The workers stay in the
// idle list here; each one removes itself in popWorkItem() once it runs.
void WorkQueue::wakeAllWaitingThreads()
{
    MutexGuard queueGuard(queueMutex);

    for(auto iter = idleWorkerQueue.begin(); iter != idleWorkerQueue.end(); ++iter) {
        (*iter)->wakeUp();
    }
}

// Blocks until the pending queue is empty. Items may still be in progress.
void WorkQueue::waitForEmptyQueue()
{
    MutexGuard guard(queueMutex);

    while(!workItems.empty()) {
        queueEmpty->wait(queueMutex);
    }
}

// Waits once, for at most `timeout`, for the pending queue to drain.
// Returns true if the pending queue is empty when the wait ends.
// NOTE(review): the wait is not retried, so a wake-up that arrives while items
// are still pending makes this return false before `timeout` has elapsed.
bool WorkQueue::waitForEmptyQueue(int timeout)
{
    MutexGuard guard(queueMutex);

    if(workItems.empty()) {
        return true;
    }

    queueEmpty->wait(queueMutex, timeout);

    return workItems.empty();
}

// Blocks until the pending queue has drained and no item is in progress.
void WorkQueue::waitForCompletedWork()
{
    waitForEmptyQueue();

    MutexGuard activeWorkGuard(activeWorkMutex);
    while(!activeWorkItems.empty()) {
        activeWorkEmpty->wait(activeWorkMutex);
    }
}

// Timed variant of waitForCompletedWork(). `timeout` is applied separately to
// the pending-queue wait and to the active-work wait, so the call can take up
// to twice `timeout`. Returns true if no active work remains at the end.
bool WorkQueue::waitForCompletedWork(int timeout)
{
    if(!waitForEmptyQueue(timeout)) {
        return false;
    }

    MutexGuard activeWorkGuard(activeWorkMutex);

    if(activeWorkItems.empty()) {
        return true;
    }

    activeWorkEmpty->wait(activeWorkMutex, timeout);

    return activeWorkItems.empty();
}

Worker::Worker(gltb::RefPtr queue) : queue(queue) { workAvailable=new ConditionVariable(); } void Worker::processOneJob(bool block) { gltb::RefPtr workItem; if(block) { workItem=queue->popWorkItem(this); } else { workItem=queue->popWorkItem(this,0); } if(workItem!=nullptr) { workItem->run(); queue->finishWorkItem(workItem); } } WorkerPool::WorkerPool(RefPtr queue, const unsigned int size, RefPtr perThreadInit) : workQueue(queue), alreadyShutDown(false) { initWorkerPool(size,perThreadInit); } WorkerPool::WorkerPool(RefPtr queue, RefPtr perThreadInit) : workQueue(queue), alreadyShutDown(false) { initWorkerPool(numberOfProcessors(),perThreadInit); } WorkerPool::~WorkerPool() { quit(); } void WorkerPool::initWorkerPool(const unsigned int size, RefPtr perThreadInit) { workerRunnables.resize(size); workerThreads.resize(size); std::vector> startupSemaphores(size); for(unsigned int i = 0;i < size; i++) { startupSemaphores[i] = Semaphore::createSemaphore(); workerRunnables[i]=new WorkerRunnable(workQueue, startupSemaphores[i], perThreadInit); workerThreads[i] = Thread::createThread(workerRunnables[i].getNakedPointer()); workerThreads[i]->start(); } for(unsigned int i = 0; i wait(); } } void WorkerPool::assignThreadsToProcessors() { size_t numThreads = workerThreads.size(); if(numThreads > numberOfProcessors()) { numThreads = numberOfProcessors(); } for(size_t i = 0; i < numThreads; i++) { workerThreads[i]->assignToProcessor(i); } } void WorkerPool::assignThreadsToProcessors(const std::vector &processors) { size_t numThreads = workerThreads.size(); if(numThreads > processors.size()) { numThreads = processors.size(); } for(size_t i = 0; i < numThreads; i++) { workerThreads[i]->assignToProcessor(processors[i]); } } void WorkerPool::quit(bool immediately) { if(alreadyShutDown) { return; } if(!immediately) { workQueue->waitForEmptyQueue(); } for(unsigned int i=0;iquit(); } workQueue->wakeAllWaitingThreads(); for(unsigned int i=0;ijoin(); } workerThreads.clear(); 
workerRunnables.clear(); alreadyShutDown=true; } std::vector WorkerPool::getWorkerThreadIds() { std::vector threadIds(workerThreads.size()); for(size_t i = 0; i < workerThreads.size(); i++) { threadIds[i] = workerThreads[i]->getUniqueID(); } return threadIds; } }