#ifndef GLTB_WORKQUEUE_H #define GLTB_WORKQUEUE_H #include #include #include #include #include "GLTB/conditionvariable.h" #include "GLTB/mutex.h" #include "GLTB/referencedobject.h" #include "GLTB/semaphore.h" #include "GLTB/systeminfo.h" #include "GLTB/thread.h" namespace gltb { class WorkItem : public ReferencedObject { public: virtual void run() = 0; }; template class CallableWorkItem : public WorkItem { public: CallableWorkItem(callable c) : c(c) {}; void run() override { c(); } private: callable c; }; class Worker; class WorkQueue : public ReferencedObject { std::queue > workItems; RefPtr queueMutex; RefPtr queueEmpty; std::list > idleWorkerQueue; std::vector > activeWorkItems; RefPtr activeWorkMutex; RefPtr activeWorkEmpty; public: WorkQueue(); void pushWorkItem(gltb::RefPtr item); // MSVC and GCC have different opinions on what to call if this was called pushWorkItem as well template void pushCallable(callable c) { gltb::RefPtr item=new CallableWorkItem(c); pushWorkItem(item); } bool hasWorkAvailable(); bool isEmpty(); private: gltb::RefPtr popWorkItem(gltb::RefPtr worker, bool withTimeout, int timeout); public: /** * Pop a work item from the queue. When the work queue is empty, this * function blocks until a new item is added or wakeAllWaitingThreads() * is called. It does not time out. * * @param worker worker that requests the next job * @return next work item to process or nullptr if queue is empty */ gltb::RefPtr popWorkItem(gltb::RefPtr worker) { return popWorkItem(worker,false,0); } /** * Pop a work item from the queue. When the work queue is empty, this * function waits for a certain time and gives up unless a new work * item is added or wakeAllWaitingThreads() is called. * * @param worker worker that requests the next job * @return next work item to process or nullptr on timeout/empty queue */ gltb::RefPtr popWorkItem(gltb::RefPtr worker, int timeout) { return popWorkItem(worker,true,timeout); } void finishWorkItem(gltb::RefPtr item); /** * Signal all threads waiting for work. Use this with care as it may * have sideeffects. It is intended to be used to terminate worker * threads that wait for work indefinitely. */ void wakeAllWaitingThreads(); /** * Wait until all jobs have been started, that is, until the queue of waiting jobs * is empty. */ void waitForEmptyQueue(); /** * Wait until all jobs have been started, that is, until the queue of waiting jobs * is empty, or until the given time in milliseconds has passed. Note that this * function is prone to spurious wakeups and may return early. * * @return true if queue is empty, false if timeout or spurious wakeup occurred */ bool waitForEmptyQueue(int timeout); /** * Wait until all jobs in the queue have been finished and all workers are idle. */ void waitForCompletedWork(); /** * Wait until all jobs in the queue have been finished and all workers are idle. * Note that this function is prone to spurious wakeups and may return early. * * @return true if queue is empty, false if timeout or spurious wakeup occurred */ bool waitForCompletedWork(int timeout); }; class Worker : public ReferencedObject { public: Worker(gltb::RefPtr queue); void processOneJob(bool block=false); // FIXME these two functions and the worker argument to WorkQueue::popWorkItem produce a tangled mess void _waitForWork(gltb::RefPtr mutex) { workAvailable->wait(mutex); } bool _waitForWork(gltb::RefPtr mutex, int timeout) { return workAvailable->wait(mutex, timeout); }; void wakeUp() { workAvailable->signal(); } protected: RefPtr workAvailable; gltb::RefPtr queue; }; class WorkerRunnable : public Worker, public Runnable { public: WorkerRunnable(gltb::RefPtr queue, RefPtr startupSemaphore, RefPtr perThreadInit = nullptr) : Worker(queue), Runnable(), keepRunning(true), startupSemaphore(startupSemaphore), perThreadInit(perThreadInit) { } void run() override { startupSemaphore->signal(); startupSemaphore = nullptr; // don't need this anymore - forget reference if(perThreadInit!=nullptr) { perThreadInit->run(); perThreadInit=nullptr; } while(keepRunning) { processOneJob(true); } } void quit() { keepRunning=false; wakeUp(); } private: bool keepRunning; RefPtr startupSemaphore; RefPtr perThreadInit; }; class WorkerPool : public ReferencedObject { public: WorkerPool(RefPtr queue, const unsigned int size, RefPtr perThreadInit=nullptr); template WorkerPool(RefPtr queue, const unsigned int size, callable perThreadInit) : workQueue(queue), alreadyShutDown(false) { initWorkerPool(size,new CallableWorkItem(perThreadInit)); } WorkerPool(RefPtr queue, RefPtr perThreadInit=nullptr); ~WorkerPool(); /** * Assign worker threads to one processor each to reduce overhead caused by * OS scheduling. If the number of threads is higher than the number of * processors, only as many threads as there are processors are assigned. * * This only works if Thread::assignToProcessor() is implemented * for the target platform. Otherwise this method does nothing. */ void assignThreadsToProcessors(); /** * Assign worker threads to the given processors to prevent the OS from * scheduling them to random processors. If the number of threads is higher * than the number of processors, only as many threads as there are * processors are assigned. * * This only works if Thread::assignToProcessor() is implemented * for the target platform. Otherwise this method does nothing. */ void assignThreadsToProcessors(const std::vector &processors); /** * Make the worker threads quit work and exit. By default, this function will wait until all * pending work items are processed. * * @param immediately if set, don't wait until work queue is empty before terminating workers */ void quit(bool immediately=false); /** * Get the number of worker threads in this worker pool. */ size_t getNumWorkers() const { return workerThreads.size(); }; std::vector getWorkerThreadIds(); private: RefPtr workQueue; std::vector > workerRunnables; std::vector > workerThreads; bool alreadyShutDown; void initWorkerPool(const unsigned int size, RefPtr perThreadInit); }; } #endif