LCOV - code coverage report
Current view: top level - port - cpl_worker_thread_pool.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 206 227 90.7 %
Date: 2025-07-08 21:33:46 Functions: 31 31 100.0 %

          Line data    Source code
       1             : /**********************************************************************
       2             :  *
       3             :  * Project:  CPL - Common Portability Library
       4             :  * Purpose:  CPL worker thread pool
       5             :  * Author:   Even Rouault, <even dot rouault at spatialys dot com>
       6             :  *
       7             :  **********************************************************************
       8             :  * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "cpl_port.h"
      14             : #include "cpl_worker_thread_pool.h"
      15             : 
      16             : #include <cstddef>
      17             : #include <memory>
      18             : 
      19             : #include "cpl_conv.h"
      20             : #include "cpl_error.h"
      21             : #include "cpl_vsi.h"
      22             : 
      23             : static thread_local CPLWorkerThreadPool *threadLocalCurrentThreadPool = nullptr;
      24             : 
      25             : /************************************************************************/
      26             : /*                         CPLWorkerThreadPool()                        */
      27             : /************************************************************************/
      28             : 
      29             : /** Instantiate a new pool of worker threads.
      30             :  *
      31             :  * The pool is in an uninitialized state after this call. The Setup() method
      32             :  * must be called.
      33             :  */
      34         778 : CPLWorkerThreadPool::CPLWorkerThreadPool() : jobQueue{}
      35             : {
      36         778 : }
      37             : 
      38             : /** Instantiate a new pool of worker threads.
      39             :  *
      40             :  * \param nThreads  Number of threads in the pool.
      41             :  */
      42         238 : CPLWorkerThreadPool::CPLWorkerThreadPool(int nThreads) : jobQueue{}
      43             : {
      44         239 :     Setup(nThreads, nullptr, nullptr);
      45         239 : }
      46             : 
      47             : /************************************************************************/
      48             : /*                          ~CPLWorkerThreadPool()                      */
      49             : /************************************************************************/
      50             : 
      51             : /** Destroys a pool of worker threads.
      52             :  *
      53             :  * Any still pending job will be completed before the destructor returns.
      54             :  */
      55        1012 : CPLWorkerThreadPool::~CPLWorkerThreadPool()
      56             : {
      57        1012 :     WaitCompletion();
      58             : 
      59             :     {
      60        1012 :         std::lock_guard<std::mutex> oGuard(m_mutex);
      61        1012 :         eState = CPLWTS_STOP;
      62             :     }
      63             : 
      64        3504 :     for (auto &wt : aWT)
      65             :     {
      66             :         {
      67        4984 :             std::lock_guard<std::mutex> oGuard(wt->m_mutex);
      68        2492 :             wt->m_cv.notify_one();
      69             :         }
      70        2492 :         CPLJoinThread(wt->hThread);
      71             :     }
      72             : 
      73        1012 :     CPLListDestroy(psWaitingWorkerThreadsList);
      74        1012 : }
      75             : 
      76             : /************************************************************************/
      77             : /*                        GetThreadCount()                              */
      78             : /************************************************************************/
      79             : 
      80        1244 : int CPLWorkerThreadPool::GetThreadCount() const
      81             : {
      82        1244 :     std::unique_lock<std::mutex> oGuard(m_mutex);
      83        2488 :     return m_nMaxThreads;
      84             : }
      85             : 
      86             : /************************************************************************/
      87             : /*                       WorkerThreadFunction()                         */
      88             : /************************************************************************/
      89             : 
      90        3532 : void CPLWorkerThreadPool::WorkerThreadFunction(void *user_data)
      91             : {
      92        3532 :     CPLWorkerThread *psWT = static_cast<CPLWorkerThread *>(user_data);
      93        3532 :     CPLWorkerThreadPool *poTP = psWT->poTP;
      94             : 
      95        3532 :     threadLocalCurrentThreadPool = poTP;
      96             : 
      97        3532 :     if (psWT->pfnInitFunc)
      98           0 :         psWT->pfnInitFunc(psWT->pInitData);
      99             : 
     100             :     while (true)
     101             :     {
     102       60317 :         std::function<void()> task = poTP->GetNextJob(psWT);
     103       59266 :         if (!task)
     104        2492 :             break;
     105             : 
     106       56745 :         task();
     107             : #if DEBUG_VERBOSE
     108             :         CPLDebug("JOB", "%p finished a job", psWT);
     109             : #endif
     110       56833 :         poTP->DeclareJobFinished();
     111       56785 :     }
     112        2503 : }
     113             : 
     114             : /************************************************************************/
     115             : /*                             SubmitJob()                              */
     116             : /************************************************************************/
     117             : 
     118             : /** Queue a new job.
     119             :  *
     120             :  * @param pfnFunc Function to run for the job.
     121             :  * @param pData User data to pass to the job function.
     122             :  * @return true in case of success.
     123             :  */
     124        7355 : bool CPLWorkerThreadPool::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
     125             : {
     126       14704 :     return SubmitJob([=] { pfnFunc(pData); });
     127             : }
     128             : 
     129             : /** Queue a new job.
     130             :  *
     131             :  * @param task  Void function to execute.
     132             :  * @return true in case of success.
     133             :  */
     134       73442 : bool CPLWorkerThreadPool::SubmitJob(std::function<void()> task)
     135             : {
     136             : #ifdef DEBUG
     137             :     {
     138      146781 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     139       73339 :         CPLAssert(m_nMaxThreads > 0);
     140             :     }
     141             : #endif
     142             : 
     143       73363 :     bool bMustIncrementWaitingWorkerThreadsAfterSubmission = false;
     144       73363 :     if (threadLocalCurrentThreadPool == this)
     145             :     {
     146             :         // If there are waiting threads or we have not started all allowed
     147             :         // threads, we can submit this job asynchronously
     148             :         {
     149      114593 :             std::unique_lock<std::mutex> oGuard(m_mutex);
     150       75420 :             if (nWaitingWorkerThreads > 0 ||
     151       18107 :                 static_cast<int>(aWT.size()) < m_nMaxThreads)
     152             :             {
     153       39205 :                 bMustIncrementWaitingWorkerThreadsAfterSubmission = true;
     154       39205 :                 nWaitingWorkerThreads--;
     155             :             }
     156             :         }
     157       57253 :         if (!bMustIncrementWaitingWorkerThreadsAfterSubmission)
     158             :         {
     159             :             // otherwise there is a risk of deadlock, so execute synchronously.
     160       18107 :             task();
     161       18099 :             return true;
     162             :         }
     163             :     }
     164             : 
     165       55229 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     166             : 
     167       55350 :     if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
     168       39223 :         nWaitingWorkerThreads++;
     169             : 
     170       55350 :     if (static_cast<int>(aWT.size()) < m_nMaxThreads)
     171             :     {
     172             :         // CPLDebug("CPL", "Starting new thread...");
     173        2208 :         auto wt = std::make_unique<CPLWorkerThread>();
     174        1104 :         wt->poTP = this;
     175             :         //ABELL - Why should this fail? And this is a *pool* thread, not necessarily
     176             :         //  tied to the submitted job. The submitted job still needs to run, even if
     177             :         //  this fails. If we can't create a thread, should the entire pool become invalid?
     178        1104 :         wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     179             :         /**
     180             :         if (!wt->hThread)
     181             :         {
     182             :             VSIFree(psJob);
     183             :             VSIFree(psItem);
     184             :             return false;
     185             :         }
     186             :         **/
     187        1104 :         if (wt->hThread)
     188        1104 :             aWT.emplace_back(std::move(wt));
     189             :     }
     190             : 
     191       55268 :     jobQueue.emplace(task);
     192       55308 :     nPendingJobs++;
     193             : 
     194       55308 :     if (psWaitingWorkerThreadsList)
     195             :     {
     196       49104 :         CPLWorkerThread *psWorkerThread =
     197       49104 :             static_cast<CPLWorkerThread *>(psWaitingWorkerThreadsList->pData);
     198             : 
     199       49104 :         CPLAssert(psWorkerThread->bMarkedAsWaiting);
     200       49104 :         psWorkerThread->bMarkedAsWaiting = false;
     201             : 
     202       49104 :         CPLList *psNext = psWaitingWorkerThreadsList->psNext;
     203       49104 :         CPLList *psToFree = psWaitingWorkerThreadsList;
     204       49104 :         psWaitingWorkerThreadsList = psNext;
     205       49104 :         nWaitingWorkerThreads--;
     206             : 
     207             : #if DEBUG_VERBOSE
     208             :         CPLDebug("JOB", "Waking up %p", psWorkerThread);
     209             : #endif
     210             : 
     211             : #ifdef __COVERITY__
     212             :         CPLError(CE_Failure, CPLE_AppDefined, "Not implemented");
     213             : #else
     214             :         {
     215       98318 :             std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
     216             :             // coverity[uninit_use_in_call]
     217       49173 :             oGuard.unlock();
     218       49170 :             psWorkerThread->m_cv.notify_one();
     219             :         }
     220             : #endif
     221             : 
     222       49206 :         CPLFree(psToFree);
     223             :     }
     224             : 
     225             :     // coverity[double_unlock]
     226       55444 :     return true;
     227             : }
     228             : 
     229             : /************************************************************************/
     230             : /*                             SubmitJobs()                              */
     231             : /************************************************************************/
     232             : 
     233             : /** Queue several jobs
     234             :  *
     235             :  * @param pfnFunc Function to run for the job.
     236             :  * @param apData User data instances to pass to the job function.
     237             :  * @return true in case of success.
     238             :  */
     239         170 : bool CPLWorkerThreadPool::SubmitJobs(CPLThreadFunc pfnFunc,
     240             :                                      const std::vector<void *> &apData)
     241             : {
     242         170 :     if (apData.empty())
     243           0 :         return false;
     244             : 
     245             : #ifdef DEBUG
     246             :     {
     247         340 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     248         170 :         CPLAssert(m_nMaxThreads > 0);
     249             :     }
     250             : #endif
     251             : 
     252         170 :     if (threadLocalCurrentThreadPool == this)
     253             :     {
     254             :         // If SubmitJob() is called from a worker thread of this queue,
     255             :         // then synchronously run the task to avoid deadlock.
     256           0 :         for (void *pData : apData)
     257           0 :             pfnFunc(pData);
     258           0 :         return true;
     259             :     }
     260             : 
     261         340 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     262             : 
     263        1604 :     for (void *pData : apData)
     264             :     {
     265        1434 :         if (static_cast<int>(aWT.size()) < m_nMaxThreads)
     266             :         {
     267           0 :             std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
     268           0 :             wt->poTP = this;
     269           0 :             wt->hThread =
     270           0 :                 CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     271           0 :             if (wt->hThread == nullptr)
     272             :             {
     273           0 :                 if (aWT.empty())
     274           0 :                     return false;
     275             :             }
     276             :             else
     277             :             {
     278           0 :                 aWT.emplace_back(std::move(wt));
     279             :             }
     280             :         }
     281             : 
     282        2863 :         jobQueue.emplace([=] { pfnFunc(pData); });
     283        1434 :         nPendingJobs++;
     284             :     }
     285             : 
     286         598 :     for (size_t i = 0; i < apData.size(); i++)
     287             :     {
     288         436 :         if (psWaitingWorkerThreadsList)
     289             :         {
     290             :             CPLWorkerThread *psWorkerThread;
     291             : 
     292         428 :             psWorkerThread = static_cast<CPLWorkerThread *>(
     293         428 :                 psWaitingWorkerThreadsList->pData);
     294             : 
     295         428 :             CPLAssert(psWorkerThread->bMarkedAsWaiting);
     296         428 :             psWorkerThread->bMarkedAsWaiting = false;
     297             : 
     298         428 :             CPLList *psNext = psWaitingWorkerThreadsList->psNext;
     299         428 :             CPLList *psToFree = psWaitingWorkerThreadsList;
     300         428 :             psWaitingWorkerThreadsList = psNext;
     301         428 :             nWaitingWorkerThreads--;
     302             : 
     303             : #if DEBUG_VERBOSE
     304             :             CPLDebug("JOB", "Waking up %p", psWorkerThread);
     305             : #endif
     306             :             {
     307         856 :                 std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
     308             :                 // coverity[uninit_use_in_call]
     309         428 :                 oGuard.unlock();
     310         428 :                 psWorkerThread->m_cv.notify_one();
     311             :             }
     312             : 
     313         428 :             CPLFree(psToFree);
     314         428 :             oGuard.lock();
     315             :         }
     316             :         else
     317             :         {
     318           8 :             break;
     319             :         }
     320             :     }
     321             : 
     322         170 :     return true;
     323             : }
     324             : 
     325             : /************************************************************************/
     326             : /*                            WaitCompletion()                          */
     327             : /************************************************************************/
     328             : 
     329             : /** Wait for completion of part or whole jobs.
     330             :  *
     331             :  * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
     332             :  *                          in the queue after this method has completed. Might
     333             :  * be 0 to wait for all jobs.
     334             :  */
     335        5726 : void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
     336             : {
     337        5726 :     if (nMaxRemainingJobs < 0)
     338           0 :         nMaxRemainingJobs = 0;
     339       11452 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     340        5726 :     m_cv.wait(oGuard, [this, nMaxRemainingJobs]
     341        7352 :               { return nPendingJobs <= nMaxRemainingJobs; });
     342        5726 : }
     343             : 
     344             : /************************************************************************/
     345             : /*                            WaitEvent()                               */
     346             : /************************************************************************/
     347             : 
     348             : /** Wait for completion of at least one job, if there are any remaining,
     349             :  * or for WakeUpWaitEvent() to have been called.
     350             :  */
     351        1414 : void CPLWorkerThreadPool::WaitEvent()
     352             : {
     353             :     // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
     354             :     // a notification occurs, jobs could be submitted which would increase
     355             :     // nPendingJobs, so a job completion may looks like a spurious wakeup.
     356        1414 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     357        1414 :     if (nPendingJobs == 0)
     358          45 :         return;
     359        1369 :     const int nPendingJobsBefore = nPendingJobs;
     360        1369 :     m_cv.wait(oGuard, [this, nPendingJobsBefore]
     361        2934 :               { return nPendingJobs < nPendingJobsBefore || m_bNotifyEvent; });
     362        1369 :     m_bNotifyEvent = false;
     363             : }
     364             : 
     365             : /************************************************************************/
     366             : /*                          WakeUpWaitEvent()                           */
     367             : /************************************************************************/
     368             : 
     369             : /** Wake-up WaitEvent().
     370             :  *
     371             :  * This method is thread-safe.
     372             :  *
     373             :  * @since GDAL 3.12
     374             :  */
     375         168 : void CPLWorkerThreadPool::WakeUpWaitEvent()
     376             : {
     377         336 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     378         168 :     m_bNotifyEvent = true;
     379         168 :     m_cv.notify_one();
     380         168 : }
     381             : 
     382             : /************************************************************************/
     383             : /*                                Setup()                               */
     384             : /************************************************************************/
     385             : 
     386             : /** Setup the pool.
     387             :  *
     388             :  * @param nThreads Number of threads to launch
     389             :  * @param pfnInitFunc Initialization function to run in each thread. May be NULL
     390             :  * @param pasInitData Array of initialization data. Its length must be nThreads,
     391             :  *                    or it should be NULL.
     392             :  * @return true if initialization was successful.
     393             :  */
     394         615 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
     395             :                                 void **pasInitData)
     396             : {
     397         615 :     return Setup(nThreads, pfnInitFunc, pasInitData, true);
     398             : }
     399             : 
     400             : /** Setup the pool.
     401             :  *
     402             :  * @param nThreads Number of threads to launch
     403             :  * @param pfnInitFunc Initialization function to run in each thread. May be NULL
     404             :  * @param pasInitData Array of initialization data. Its length must be nThreads,
     405             :  *                    or it should be NULL.
     406             :  * @param bWaitallStarted Whether to wait for all threads to be fully started.
     407             :  * @return true if initialization was successful.
     408             :  */
     409         640 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
     410             :                                 void **pasInitData, bool bWaitallStarted)
     411             : {
     412         640 :     CPLAssert(nThreads > 0);
     413             : 
     414        1281 :     if (nThreads > static_cast<int>(aWT.size()) && pfnInitFunc == nullptr &&
     415        1282 :         pasInitData == nullptr && !bWaitallStarted)
     416             :     {
     417          25 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     418          25 :         if (nThreads > m_nMaxThreads)
     419          25 :             m_nMaxThreads = nThreads;
     420          25 :         return true;
     421             :     }
     422             : 
     423         616 :     bool bRet = true;
     424        3044 :     for (int i = static_cast<int>(aWT.size()); i < nThreads; i++)
     425             :     {
     426        2428 :         auto wt = std::make_unique<CPLWorkerThread>();
     427        2427 :         wt->pfnInitFunc = pfnInitFunc;
     428        2428 :         wt->pInitData = pasInitData ? pasInitData[i] : nullptr;
     429        2428 :         wt->poTP = this;
     430        2428 :         wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     431        2428 :         if (wt->hThread == nullptr)
     432             :         {
     433           0 :             nThreads = i;
     434           0 :             bRet = false;
     435           0 :             break;
     436             :         }
     437        2428 :         aWT.emplace_back(std::move(wt));
     438             :     }
     439             : 
     440             :     {
     441        1232 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     442         616 :         if (nThreads > m_nMaxThreads)
     443         616 :             m_nMaxThreads = nThreads;
     444             :     }
     445             : 
     446         616 :     if (bWaitallStarted)
     447             :     {
     448             :         // Wait all threads to be started
     449        1232 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     450        1713 :         while (nWaitingWorkerThreads < nThreads)
     451             :         {
     452        1097 :             m_cv.wait(oGuard);
     453             :         }
     454             :     }
     455             : 
     456         616 :     if (eState == CPLWTS_ERROR)
     457           0 :         bRet = false;
     458             : 
     459         616 :     return bRet;
     460             : }
     461             : 
     462             : /************************************************************************/
     463             : /*                          DeclareJobFinished()                        */
     464             : /************************************************************************/
     465             : 
     466       56762 : void CPLWorkerThreadPool::DeclareJobFinished()
     467             : {
     468      113603 :     std::lock_guard<std::mutex> oGuard(m_mutex);
     469       56824 :     nPendingJobs--;
     470       56824 :     m_cv.notify_one();
     471       56843 : }
     472             : 
     473             : /************************************************************************/
     474             : /*                             GetNextJob()                             */
     475             : /************************************************************************/
     476             : 
     477             : std::function<void()>
     478       60356 : CPLWorkerThreadPool::GetNextJob(CPLWorkerThread *psWorkerThread)
     479             : {
     480       60356 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     481             :     while (true)
     482             :     {
     483      112333 :         if (eState == CPLWTS_STOP)
     484       59336 :             return std::function<void()>();
     485             : 
     486      109841 :         if (jobQueue.size())
     487             :         {
     488             : #if DEBUG_VERBOSE
     489             :             CPLDebug("JOB", "%p got a job", psWorkerThread);
     490             : #endif
     491      113622 :             auto task = std::move(jobQueue.front());
     492       56777 :             jobQueue.pop();
     493       56746 :             return task;
     494             :         }
     495             : 
     496       53067 :         if (!psWorkerThread->bMarkedAsWaiting)
     497             :         {
     498       53052 :             psWorkerThread->bMarkedAsWaiting = true;
     499       53052 :             nWaitingWorkerThreads++;
     500             : 
     501             :             CPLList *psItem =
     502       53052 :                 static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
     503       53076 :             if (psItem == nullptr)
     504             :             {
     505           0 :                 eState = CPLWTS_ERROR;
     506           0 :                 m_cv.notify_one();
     507             : 
     508           0 :                 return nullptr;
     509             :             }
     510             : 
     511       53076 :             psItem->pData = psWorkerThread;
     512       53076 :             psItem->psNext = psWaitingWorkerThreadsList;
     513       53076 :             psWaitingWorkerThreadsList = psItem;
     514             : 
     515             : #if DEBUG_VERBOSE
     516             :             CPLAssert(CPLListCount(psWaitingWorkerThreadsList) ==
     517             :                       nWaitingWorkerThreads);
     518             : #endif
     519             :         }
     520             : 
     521       53091 :         m_cv.notify_one();
     522             : 
     523             : #if DEBUG_VERBOSE
     524             :         CPLDebug("JOB", "%p sleeping", psWorkerThread);
     525             : #endif
     526             : 
     527             : #ifdef __COVERITY__
     528             :         CPLError(CE_Failure, CPLE_AppDefined, "Not implemented");
     529             : #else
     530      105046 :         std::unique_lock<std::mutex> oGuardThisThread(psWorkerThread->m_mutex);
     531             :         // coverity[uninit_use_in_call]
     532       53060 :         oGuard.unlock();
     533             :         // coverity[wait_not_in_locked_loop]
     534       53072 :         psWorkerThread->m_cv.wait(oGuardThisThread);
     535             :         // coverity[lock_order]
     536       51970 :         oGuard.lock();
     537             : #endif
     538       51973 :     }
     539             : }
     540             : 
     541             : /************************************************************************/
     542             : /*                         CreateJobQueue()                             */
     543             : /************************************************************************/
     544             : 
     545             : /** Create a new job queue based on this worker thread pool.
     546             :  *
     547             :  * The worker thread pool must remain alive while the returned object is
     548             :  * itself alive.
     549             :  *
     550             :  * @since GDAL 3.2
     551             :  */
     552       29776 : std::unique_ptr<CPLJobQueue> CPLWorkerThreadPool::CreateJobQueue()
     553             : {
     554       29776 :     return std::unique_ptr<CPLJobQueue>(new CPLJobQueue(this));
     555             : }
     556             : 
     557             : /************************************************************************/
     558             : /*                            CPLJobQueue()                             */
     559             : /************************************************************************/
     560             : 
     561             : //! @cond Doxygen_Suppress
     562       29756 : CPLJobQueue::CPLJobQueue(CPLWorkerThreadPool *poPool) : m_poPool(poPool)
     563             : {
     564       29793 : }
     565             : 
     566             : //! @endcond
     567             : 
     568             : /************************************************************************/
     569             : /*                           ~CPLJobQueue()                             */
     570             : /************************************************************************/
     571             : 
     572       29823 : CPLJobQueue::~CPLJobQueue()
     573             : {
     574       29824 :     WaitCompletion();
     575       29818 : }
     576             : 
     577             : /************************************************************************/
     578             : /*                          DeclareJobFinished()                        */
     579             : /************************************************************************/
     580             : 
     581       66074 : void CPLJobQueue::DeclareJobFinished()
     582             : {
     583      132205 :     std::lock_guard<std::mutex> oGuard(m_mutex);
     584       66044 :     m_nPendingJobs--;
     585       66044 :     m_cv.notify_one();
     586       66080 : }
     587             : 
     588             : /************************************************************************/
     589             : /*                             SubmitJob()                              */
     590             : /************************************************************************/
     591             : 
     592             : /** Queue a new job given as a callback function plus user data.
     593             :  *
     594             :  * @param pfnFunc Function to run for the job.
     595             :  * @param pData User data to pass to the job function.
     596             :  * @return true in case of success (result of the std::function overload).
     597             :  */
     598        6765 : bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
     599             : {  // The closure copies pfnFunc and the pData pointer value (not what it points to).
     600       13526 :     return SubmitJob([=] { pfnFunc(pData); });
     601             : }
     602             : 
     603             : /** Queue a new job, counting it as pending until it has finished running.
     604             :  *
     605             :  * @param task  Task to execute.
     606             :  * @return true in case of success (value returned by the pool's SubmitJob()).
     607             :  */
     608       65980 : bool CPLJobQueue::SubmitJob(std::function<void()> task)
     609             : {
     610             :     {
     611       65980 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     612       65980 :         m_nPendingJobs++;  // counted before submission, so the matching decrement cannot come first
     613             :     }  // m_mutex is released here; the pool is called without holding it
     614             : 
     615             :     // coverity[uninit_member,copy_constructor_call]
     616      197995 :     const auto lambda = [this, capturedTask = std::move(task)]
     617             :     {  // Wrapper handed to the pool: run the user task, then report completion.
     618       65973 :         capturedTask();
     619       66044 :         DeclareJobFinished();
     620       65978 :     };
     621             :     // cppcheck-suppress knownConditionTrueFalse
     622      132025 :     return m_poPool->SubmitJob(std::move(lambda));  // NOTE(review): lambda is const, so std::move() cannot move it and the closure is copied; dropping const would allow a move.
     623             : }  // NOTE(review): if the pool returns false without running the wrapper, m_nPendingJobs stays incremented - confirm against CPLWorkerThreadPool::SubmitJob().
     624             : 
     625             : /************************************************************************/
     626             : /*                            WaitCompletion()                          */
     627             : /************************************************************************/
     628             : 
     629             : /** Wait for completion of some or all jobs.
     630             :  *
     631             :  * @param nMaxRemainingJobs Maximum number of pending jobs that are allowed
     632             :  *                          in the queue after this method has completed. Might
     633             :  *                          be 0 to wait for all jobs.
     634             :  */
     635       59462 : void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
     636             : {
     637      118908 :     std::unique_lock<std::mutex> oGuard(m_mutex);  // unique_lock so that the condition variable can release it while blocked
     638       59467 :     m_cv.wait(oGuard, [this, nMaxRemainingJobs]  // returns at once if the predicate already holds
     639       80461 :               { return m_nPendingJobs <= nMaxRemainingJobs; });  // predicate is re-checked under the lock on every wakeup
     640       59461 : }
     641             : 
     642             : /************************************************************************/
     643             : /*                             WaitEvent()                              */
     644             : /************************************************************************/
     645             : 
     646             : /** Wait for the completion of at least one job.
     647             :  *
     648             :  * @return true if there are remaining jobs; false if none is pending, either on entry or once the wait is over.
     649             :  */
     650         242 : bool CPLJobQueue::WaitEvent()
     651             : {
     652             :     // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
     653             :     // a notification occurs, jobs could be submitted which would increase
     654             :     // m_nPendingJobs, so a job completion may look like a spurious wakeup.
     655         484 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     656         242 :     if (m_nPendingJobs == 0)
     657           2 :         return false;  // with nothing pending, the predicate below could never become true
     658             : 
     659         240 :     const int nPendingJobsBefore = m_nPendingJobs;  // snapshot taken while holding the lock
     660         240 :     m_cv.wait(oGuard, [this, nPendingJobsBefore]
     661         480 :               { return m_nPendingJobs < nPendingJobsBefore; });  // only a count below the snapshot ends the wait
     662         240 :     return m_nPendingJobs > 0;
     663             : }

Generated by: LCOV version 1.14