Line data Source code
1 : /********************************************************************** 2 : * $Id$ 3 : * 4 : * Project: CPL - Common Portability Library 5 : * Purpose: CPL worker thread pool 6 : * Author: Even Rouault, <even dot rouault at spatialys dot com> 7 : * 8 : ********************************************************************** 9 : * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com> 10 : * 11 : * SPDX-License-Identifier: MIT 12 : ****************************************************************************/ 13 : 14 : #ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_ 15 : #define CPL_WORKER_THREAD_POOL_H_INCLUDED_ 16 : 17 : #include "cpl_multiproc.h" 18 : #include "cpl_list.h" 19 : 20 : #include <condition_variable> 21 : #include <functional> 22 : #include <memory> 23 : #include <mutex> 24 : #include <queue> 25 : #include <vector> 26 : 27 : /** 28 : * \file cpl_worker_thread_pool.h 29 : * 30 : * Class to manage a pool of worker threads. 31 : * @since GDAL 2.1 32 : */ 33 : 34 : #ifndef DOXYGEN_SKIP 35 : class CPLWorkerThreadPool; 36 : 37 : struct CPLWorkerThread 38 : { 39 : CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread) 40 3642 : CPLWorkerThread() = default; 41 : 42 : CPLThreadFunc pfnInitFunc = nullptr; 43 : void *pInitData = nullptr; 44 : CPLWorkerThreadPool *poTP = nullptr; 45 : CPLJoinableThread *hThread = nullptr; 46 : bool bMarkedAsWaiting = false; 47 : 48 : std::mutex m_mutex{}; 49 : std::condition_variable m_cv{}; 50 : }; 51 : 52 : typedef enum 53 : { 54 : CPLWTS_OK, 55 : CPLWTS_STOP, 56 : CPLWTS_ERROR 57 : } CPLWorkerThreadState; 58 : #endif // ndef DOXYGEN_SKIP 59 : 60 : class CPLJobQueue; 61 : /// Unique pointer to a job queue. 62 : using CPLJobQueuePtr = std::unique_ptr<CPLJobQueue>; 63 : 64 : /** Pool of worker threads */ 65 : class CPL_DLL CPLWorkerThreadPool 66 : { 67 : CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThreadPool) 68 : 69 : std::vector<std::unique_ptr<CPLWorkerThread>> aWT{}; 70 : mutable std::mutex m_mutex{}; 71 : std::condition_variable m_cv{}; 72 : volatile CPLWorkerThreadState eState = CPLWTS_OK; 73 : std::queue<std::function<void()>> jobQueue; 74 : int nPendingJobs = 0; 75 : 76 : CPLList *psWaitingWorkerThreadsList = nullptr; 77 : int nWaitingWorkerThreads = 0; 78 : 79 : int m_nMaxThreads = 0; 80 : 81 : static void WorkerThreadFunction(void *user_data); 82 : 83 : void DeclareJobFinished(); 84 : std::function<void()> GetNextJob(CPLWorkerThread *psWorkerThread); 85 : 86 : public: 87 : CPLWorkerThreadPool(); 88 : explicit CPLWorkerThreadPool(int nThreads); 89 : ~CPLWorkerThreadPool(); 90 : 91 : bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData); 92 : bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData, 93 : bool bWaitallStarted); 94 : 95 : CPLJobQueuePtr CreateJobQueue(); 96 : 97 : bool SubmitJob(std::function<void()> task); 98 : bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); 99 : bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void *> &apData); 100 : void WaitCompletion(int nMaxRemainingJobs = 0); 101 : void WaitEvent(); 102 : 103 : /** Return the number of threads setup */ 104 : int GetThreadCount() const; 105 : }; 106 : 107 : /** Job queue */ 108 : class CPL_DLL CPLJobQueue 109 : { 110 : CPL_DISALLOW_COPY_ASSIGN(CPLJobQueue) 111 : CPLWorkerThreadPool *m_poPool = nullptr; 112 : std::mutex m_mutex{}; 113 : std::condition_variable m_cv{}; 114 : int m_nPendingJobs = 0; 115 : 116 : void DeclareJobFinished(); 117 : 118 : //! @cond Doxygen_Suppress 119 : protected: 120 : friend class CPLWorkerThreadPool; 121 : explicit CPLJobQueue(CPLWorkerThreadPool *poPool); 122 : //! @endcond 123 : 124 : public: 125 : ~CPLJobQueue(); 126 : 127 : /** Return the owning worker thread pool */ 128 593 : CPLWorkerThreadPool *GetPool() 129 : { 130 593 : return m_poPool; 131 : } 132 : 133 : bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); 134 : bool SubmitJob(std::function<void()> task); 135 : void WaitCompletion(int nMaxRemainingJobs = 0); 136 : bool WaitEvent(); 137 : }; 138 : 139 : #endif // CPL_WORKER_THREAD_POOL_H_INCLUDED_