Line data Source code
1 : /********************************************************************** 2 : * 3 : * Project: CPL - Common Portability Library 4 : * Purpose: CPL worker thread pool 5 : * Author: Even Rouault, <even dot rouault at spatialys dot com> 6 : * 7 : ********************************************************************** 8 : * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com> 9 : * 10 : * SPDX-License-Identifier: MIT 11 : ****************************************************************************/ 12 : 13 : #ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_ 14 : #define CPL_WORKER_THREAD_POOL_H_INCLUDED_ 15 : 16 : #include "cpl_multiproc.h" 17 : #include "cpl_list.h" 18 : 19 : #include <condition_variable> 20 : #include <functional> 21 : #include <memory> 22 : #include <mutex> 23 : #include <queue> 24 : #include <vector> 25 : 26 : /** 27 : * \file cpl_worker_thread_pool.h 28 : * 29 : * Class to manage a pool of worker threads. 30 : * @since GDAL 2.1 31 : */ 32 : 33 : #ifndef DOXYGEN_SKIP 34 : class CPLWorkerThreadPool; 35 : 36 : struct CPLWorkerThread 37 : { 38 : CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread) 39 3532 : CPLWorkerThread() = default; 40 : 41 : CPLThreadFunc pfnInitFunc = nullptr; 42 : void *pInitData = nullptr; 43 : CPLWorkerThreadPool *poTP = nullptr; 44 : CPLJoinableThread *hThread = nullptr; 45 : bool bMarkedAsWaiting = false; 46 : 47 : std::mutex m_mutex{}; 48 : std::condition_variable m_cv{}; 49 : }; 50 : 51 : typedef enum 52 : { 53 : CPLWTS_OK, 54 : CPLWTS_STOP, 55 : CPLWTS_ERROR 56 : } CPLWorkerThreadState; 57 : #endif // ndef DOXYGEN_SKIP 58 : 59 : class CPLJobQueue; 60 : /// Unique pointer to a job queue. 61 : using CPLJobQueuePtr = std::unique_ptr<CPLJobQueue>; 62 : 63 : /** Pool of worker threads */ 64 : class CPL_DLL CPLWorkerThreadPool 65 : { 66 : CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThreadPool) 67 : 68 : std::vector<std::unique_ptr<CPLWorkerThread>> aWT{}; 69 : mutable std::mutex m_mutex{}; 70 : std::condition_variable m_cv{}; 71 : volatile CPLWorkerThreadState eState = CPLWTS_OK; 72 : std::queue<std::function<void()>> jobQueue; 73 : int nPendingJobs = 0; 74 : bool m_bNotifyEvent = false; 75 : 76 : CPLList *psWaitingWorkerThreadsList = nullptr; 77 : int nWaitingWorkerThreads = 0; 78 : 79 : int m_nMaxThreads = 0; 80 : 81 : static void WorkerThreadFunction(void *user_data); 82 : 83 : void DeclareJobFinished(); 84 : std::function<void()> GetNextJob(CPLWorkerThread *psWorkerThread); 85 : 86 : public: 87 : CPLWorkerThreadPool(); 88 : explicit CPLWorkerThreadPool(int nThreads); 89 : ~CPLWorkerThreadPool(); 90 : 91 : bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData); 92 : bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData, 93 : bool bWaitallStarted); 94 : 95 : CPLJobQueuePtr CreateJobQueue(); 96 : 97 : bool SubmitJob(std::function<void()> task); 98 : bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); 99 : bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void *> &apData); 100 : void WaitCompletion(int nMaxRemainingJobs = 0); 101 : void WaitEvent(); 102 : void WakeUpWaitEvent(); 103 : 104 : /** Return the number of threads setup */ 105 : int GetThreadCount() const; 106 : }; 107 : 108 : /** Job queue */ 109 : class CPL_DLL CPLJobQueue 110 : { 111 : CPL_DISALLOW_COPY_ASSIGN(CPLJobQueue) 112 : CPLWorkerThreadPool *m_poPool = nullptr; 113 : std::mutex m_mutex{}; 114 : std::condition_variable m_cv{}; 115 : int m_nPendingJobs = 0; 116 : 117 : void DeclareJobFinished(); 118 : 119 : //! @cond Doxygen_Suppress 120 : protected: 121 : friend class CPLWorkerThreadPool; 122 : explicit CPLJobQueue(CPLWorkerThreadPool *poPool); 123 : //! @endcond 124 : 125 : public: 126 : ~CPLJobQueue(); 127 : 128 : /** Return the owning worker thread pool */ 129 727 : CPLWorkerThreadPool *GetPool() 130 : { 131 727 : return m_poPool; 132 : } 133 : 134 : bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); 135 : bool SubmitJob(std::function<void()> task); 136 : void WaitCompletion(int nMaxRemainingJobs = 0); 137 : bool WaitEvent(); 138 : }; 139 : 140 : #endif // CPL_WORKER_THREAD_POOL_H_INCLUDED_