Line data Source code
1 : /********************************************************************** 2 : * 3 : * Project: CPL - Common Portability Library 4 : * Purpose: CPL worker thread pool 5 : * Author: Even Rouault, <even dot rouault at spatialys dot com> 6 : * 7 : ********************************************************************** 8 : * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com> 9 : * 10 : * SPDX-License-Identifier: MIT 11 : ****************************************************************************/ 12 : 13 : #ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_ 14 : #define CPL_WORKER_THREAD_POOL_H_INCLUDED_ 15 : 16 : #include "cpl_multiproc.h" 17 : #include "cpl_list.h" 18 : 19 : #include <condition_variable> 20 : #include <functional> 21 : #include <memory> 22 : #include <mutex> 23 : #include <queue> 24 : #include <vector> 25 : 26 : /** 27 : * \file cpl_worker_thread_pool.h 28 : * 29 : * Class to manage a pool of worker threads. 30 : * @since GDAL 2.1 31 : */ 32 : 33 : #ifndef DOXYGEN_SKIP 34 : class CPLWorkerThreadPool; 35 : 36 : struct CPLWorkerThread 37 : { 38 : CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread) 39 4698 : CPLWorkerThread() = default; 40 : 41 : CPLThreadFunc pfnInitFunc = nullptr; 42 : void *pInitData = nullptr; 43 : CPLWorkerThreadPool *poTP = nullptr; 44 : CPLJoinableThread *hThread = nullptr; 45 : bool bMarkedAsWaiting = false; 46 : 47 : std::mutex m_mutex{}; 48 : std::condition_variable m_cv{}; 49 : }; 50 : 51 : typedef enum 52 : { 53 : CPLWTS_OK, 54 : CPLWTS_STOP, 55 : CPLWTS_ERROR 56 : } CPLWorkerThreadState; 57 : #endif // ndef DOXYGEN_SKIP 58 : 59 : class CPLJobQueue; 60 : /// Unique pointer to a job queue. 61 : using CPLJobQueuePtr = std::unique_ptr<CPLJobQueue>; 62 : 63 : /** Pool of worker threads */ 64 : class CPL_DLL CPLWorkerThreadPool 65 : { 66 : CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThreadPool) 67 : 68 : std::vector<std::unique_ptr<CPLWorkerThread>> aWT{}; 69 : mutable std::mutex m_mutex{}; 70 : std::condition_variable m_cv{}; 71 : volatile CPLWorkerThreadState eState = CPLWTS_OK; 72 : std::queue<std::function<void()>> jobQueue; 73 : int nPendingJobs = 0; 74 : 75 : CPLList *psWaitingWorkerThreadsList = nullptr; 76 : int nWaitingWorkerThreads = 0; 77 : 78 : int m_nMaxThreads = 0; 79 : 80 : static void WorkerThreadFunction(void *user_data); 81 : 82 : void DeclareJobFinished(); 83 : std::function<void()> GetNextJob(CPLWorkerThread *psWorkerThread); 84 : 85 : public: 86 : CPLWorkerThreadPool(); 87 : explicit CPLWorkerThreadPool(int nThreads); 88 : ~CPLWorkerThreadPool(); 89 : 90 : bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData); 91 : bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData, 92 : bool bWaitallStarted); 93 : 94 : CPLJobQueuePtr CreateJobQueue(); 95 : 96 : bool SubmitJob(std::function<void()> task); 97 : bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); 98 : bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void *> &apData); 99 : void WaitCompletion(int nMaxRemainingJobs = 0); 100 : void WaitEvent(); 101 : 102 : /** Return the number of threads setup */ 103 : int GetThreadCount() const; 104 : }; 105 : 106 : /** Job queue */ 107 : class CPL_DLL CPLJobQueue 108 : { 109 : CPL_DISALLOW_COPY_ASSIGN(CPLJobQueue) 110 : CPLWorkerThreadPool *m_poPool = nullptr; 111 : std::mutex m_mutex{}; 112 : std::condition_variable m_cv{}; 113 : int m_nPendingJobs = 0; 114 : 115 : void DeclareJobFinished(); 116 : 117 : //! @cond Doxygen_Suppress 118 : protected: 119 : friend class CPLWorkerThreadPool; 120 : explicit CPLJobQueue(CPLWorkerThreadPool *poPool); 121 : //! @endcond 122 : 123 : public: 124 : ~CPLJobQueue(); 125 : 126 : /** Return the owning worker thread pool */ 127 587 : CPLWorkerThreadPool *GetPool() 128 : { 129 587 : return m_poPool; 130 : } 131 : 132 : bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); 133 : bool SubmitJob(std::function<void()> task); 134 : void WaitCompletion(int nMaxRemainingJobs = 0); 135 : bool WaitEvent(); 136 : }; 137 : 138 : #endif // CPL_WORKER_THREAD_POOL_H_INCLUDED_