Line data Source code
1 : /********************************************************************** 2 : * 3 : * Project: CPL - Common Portability Library 4 : * Purpose: CPL worker thread pool 5 : * Author: Even Rouault, <even dot rouault at spatialys dot com> 6 : * 7 : ********************************************************************** 8 : * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com> 9 : * 10 : * SPDX-License-Identifier: MIT 11 : ****************************************************************************/ 12 : 13 : #ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_ 14 : #define CPL_WORKER_THREAD_POOL_H_INCLUDED_ 15 : 16 : #include "cpl_multiproc.h" 17 : #include "cpl_list.h" 18 : 19 : #include <condition_variable> 20 : #include <functional> 21 : #include <memory> 22 : #include <mutex> 23 : #include <queue> 24 : #include <vector> 25 : 26 : /** 27 : * \file cpl_worker_thread_pool.h 28 : * 29 : * Class to manage a pool of worker threads. 30 : */ 31 : 32 : #ifndef DOXYGEN_SKIP 33 : class CPLWorkerThreadPool; 34 : 35 : struct CPLWorkerThread 36 : { 37 : CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread) 38 3548 : CPLWorkerThread() = default; 39 : 40 : CPLThreadFunc pfnInitFunc = nullptr; 41 : void *pInitData = nullptr; 42 : CPLWorkerThreadPool *poTP = nullptr; 43 : CPLJoinableThread *hThread = nullptr; 44 : bool bMarkedAsWaiting = false; 45 : 46 : std::mutex m_mutex{}; 47 : std::condition_variable m_cv{}; 48 : }; 49 : 50 : typedef enum 51 : { 52 : CPLWTS_OK, 53 : CPLWTS_STOP, 54 : CPLWTS_ERROR 55 : } CPLWorkerThreadState; 56 : #endif // ndef DOXYGEN_SKIP 57 : 58 : class CPLJobQueue; 59 : /// Unique pointer to a job queue. 60 : using CPLJobQueuePtr = std::unique_ptr<CPLJobQueue>; 61 : 62 : /** Pool of worker threads */ 63 : class CPL_DLL CPLWorkerThreadPool 64 : { 65 : CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThreadPool) 66 : 67 : std::vector<std::unique_ptr<CPLWorkerThread>> aWT{}; 68 : mutable std::mutex m_mutex{}; 69 : std::condition_variable m_cv{}; 70 : volatile CPLWorkerThreadState eState = CPLWTS_OK; 71 : std::queue<std::function<void()>> jobQueue; 72 : int nPendingJobs = 0; 73 : bool m_bNotifyEvent = false; 74 : 75 : CPLList *psWaitingWorkerThreadsList = nullptr; 76 : int nWaitingWorkerThreads = 0; 77 : 78 : int m_nMaxThreads = 0; 79 : 80 : static void WorkerThreadFunction(void *user_data); 81 : 82 : void DeclareJobFinished(); 83 : std::function<void()> GetNextJob(CPLWorkerThread *psWorkerThread); 84 : 85 : public: 86 : CPLWorkerThreadPool(); 87 : explicit CPLWorkerThreadPool(int nThreads); 88 : ~CPLWorkerThreadPool(); 89 : 90 : bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData); 91 : bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData, 92 : bool bWaitallStarted); 93 : 94 : CPLJobQueuePtr CreateJobQueue(); 95 : 96 : bool SubmitJob(std::function<void()> task); 97 : bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); 98 : bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void *> &apData); 99 : void WaitCompletion(int nMaxRemainingJobs = 0); 100 : void WaitEvent(); 101 : void WakeUpWaitEvent(); 102 : 103 : /** Return the number of threads setup */ 104 : int GetThreadCount() const; 105 : }; 106 : 107 : /** Job queue */ 108 : class CPL_DLL CPLJobQueue 109 : { 110 : CPL_DISALLOW_COPY_ASSIGN(CPLJobQueue) 111 : CPLWorkerThreadPool *m_poPool = nullptr; 112 : std::mutex m_mutex{}; 113 : std::condition_variable m_cv{}; 114 : int m_nPendingJobs = 0; 115 : 116 : void DeclareJobFinished(); 117 : 118 : //! @cond Doxygen_Suppress 119 : protected: 120 : friend class CPLWorkerThreadPool; 121 : explicit CPLJobQueue(CPLWorkerThreadPool *poPool); 122 : //! @endcond 123 : 124 : public: 125 : ~CPLJobQueue(); 126 : 127 : /** Return the owning worker thread pool */ 128 696 : CPLWorkerThreadPool *GetPool() 129 : { 130 696 : return m_poPool; 131 : } 132 : 133 : bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); 134 : bool SubmitJob(std::function<void()> task); 135 : void WaitCompletion(int nMaxRemainingJobs = 0); 136 : bool WaitEvent(); 137 : }; 138 : 139 : #endif // CPL_WORKER_THREAD_POOL_H_INCLUDED_