Line data Source code
1 : /********************************************************************** 2 : * $Id$ 3 : * 4 : * Project: CPL - Common Portability Library 5 : * Purpose: CPL worker thread pool 6 : * Author: Even Rouault, <even dot rouault at spatialys dot com> 7 : * 8 : ********************************************************************** 9 : * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com> 10 : * 11 : * Permission is hereby granted, free of charge, to any person obtaining a 12 : * copy of this software and associated documentation files (the "Software"), 13 : * to deal in the Software without restriction, including without limitation 14 : * the rights to use, copy, modify, merge, publish, distribute, sublicense, 15 : * and/or sell copies of the Software, and to permit persons to whom the 16 : * Software is furnished to do so, subject to the following conditions: 17 : * 18 : * The above copyright notice and this permission notice shall be included 19 : * in all copies or substantial portions of the Software. 20 : * 21 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 22 : * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 23 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 24 : * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 25 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 26 : * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 27 : * DEALINGS IN THE SOFTWARE. 28 : ****************************************************************************/ 29 : 30 : #ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_ 31 : #define CPL_WORKER_THREAD_POOL_H_INCLUDED_ 32 : 33 : #include "cpl_multiproc.h" 34 : #include "cpl_list.h" 35 : 36 : #include <condition_variable> 37 : #include <memory> 38 : #include <mutex> 39 : #include <vector> 40 : 41 : /** 42 : * \file cpl_worker_thread_pool.h 43 : * 44 : * Class to manage a pool of worker threads. 45 : * @since GDAL 2.1 46 : */ 47 : 48 : #ifndef DOXYGEN_SKIP 49 : struct CPLWorkerThreadJob; 50 : class CPLWorkerThreadPool; 51 : 52 : struct CPLWorkerThread 53 : { 54 : CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread) 55 1138 : CPLWorkerThread() = default; 56 : 57 : CPLThreadFunc pfnInitFunc = nullptr; 58 : void *pInitData = nullptr; 59 : CPLWorkerThreadPool *poTP = nullptr; 60 : CPLJoinableThread *hThread = nullptr; 61 : bool bMarkedAsWaiting = false; 62 : 63 : std::mutex m_mutex{}; 64 : std::condition_variable m_cv{}; 65 : }; 66 : 67 : typedef enum 68 : { 69 : CPLWTS_OK, 70 : CPLWTS_STOP, 71 : CPLWTS_ERROR 72 : } CPLWorkerThreadState; 73 : #endif // ndef DOXYGEN_SKIP 74 : 75 : class CPLJobQueue; 76 : 77 : /** Pool of worker threads */ 78 : class CPL_DLL CPLWorkerThreadPool 79 : { 80 : CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThreadPool) 81 : 82 : std::vector<std::unique_ptr<CPLWorkerThread>> aWT{}; 83 : std::mutex m_mutex{}; 84 : std::condition_variable m_cv{}; 85 : volatile CPLWorkerThreadState eState = CPLWTS_OK; 86 : CPLList *psJobQueue = nullptr; 87 : int nPendingJobs = 0; 88 : 89 : CPLList *psWaitingWorkerThreadsList = nullptr; 90 : int nWaitingWorkerThreads = 0; 91 : 92 : int m_nMaxThreads = 0; 93 : 94 : static void WorkerThreadFunction(void *user_data); 95 : 96 : void DeclareJobFinished(); 97 : CPLWorkerThreadJob *GetNextJob(CPLWorkerThread *psWorkerThread); 98 : 99 : public: 100 : CPLWorkerThreadPool(); 101 : ~CPLWorkerThreadPool(); 102 : 103 : bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData); 104 : bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData, 105 : bool bWaitallStarted); 106 : 107 : std::unique_ptr<CPLJobQueue> CreateJobQueue(); 108 : 109 : bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); 110 : bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void *> &apData); 111 : void WaitCompletion(int nMaxRemainingJobs = 0); 112 : void WaitEvent(); 113 : 114 : /** Return the number of threads setup */ 115 975 : int GetThreadCount() const 116 : { 117 975 : return m_nMaxThreads; 118 : } 119 : }; 120 : 121 : /** Job queue */ 122 : class CPL_DLL CPLJobQueue 123 : { 124 : CPL_DISALLOW_COPY_ASSIGN(CPLJobQueue) 125 : CPLWorkerThreadPool *m_poPool = nullptr; 126 : std::mutex m_mutex{}; 127 : std::condition_variable m_cv{}; 128 : int m_nPendingJobs = 0; 129 : 130 : static void JobQueueFunction(void *); 131 : void DeclareJobFinished(); 132 : 133 : //! @cond Doxygen_Suppress 134 : protected: 135 : friend class CPLWorkerThreadPool; 136 : explicit CPLJobQueue(CPLWorkerThreadPool *poPool); 137 : //! @endcond 138 : 139 : public: 140 : ~CPLJobQueue(); 141 : 142 : /** Return the owning worker thread pool */ 143 554 : CPLWorkerThreadPool *GetPool() 144 : { 145 554 : return m_poPool; 146 : } 147 : 148 : bool SubmitJob(CPLThreadFunc pfnFunc, void *pData); 149 : void WaitCompletion(int nMaxRemainingJobs = 0); 150 : }; 151 : 152 : #endif // CPL_WORKER_THREAD_POOL_H_INCLUDED_