LCOV - code coverage report
Current view: top level - port - cpl_worker_thread_pool.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 200 221 90.5 %
Date: 2025-05-24 03:54:53 Functions: 30 30 100.0 %

          Line data    Source code
       1             : /**********************************************************************
       2             :  *
       3             :  * Project:  CPL - Common Portability Library
       4             :  * Purpose:  CPL worker thread pool
       5             :  * Author:   Even Rouault, <even dot rouault at spatialys dot com>
       6             :  *
       7             :  **********************************************************************
       8             :  * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "cpl_port.h"
      14             : #include "cpl_worker_thread_pool.h"
      15             : 
      16             : #include <cstddef>
      17             : #include <memory>
      18             : 
      19             : #include "cpl_conv.h"
      20             : #include "cpl_error.h"
      21             : #include "cpl_vsi.h"
      22             : 
      23             : static thread_local CPLWorkerThreadPool *threadLocalCurrentThreadPool = nullptr;
      24             : 
      25             : /************************************************************************/
      26             : /*                         CPLWorkerThreadPool()                        */
      27             : /************************************************************************/
      28             : 
      29             : /** Instantiate a new pool of worker threads.
      30             :  *
      31             :  * The pool is in an uninitialized state after this call. The Setup() method
      32             :  * must be called.
      33             :  */
      34         747 : CPLWorkerThreadPool::CPLWorkerThreadPool() : jobQueue{}
      35             : {
      36         747 : }
      37             : 
      38             : /** Instantiate a new pool of worker threads.
      39             :  *
      40             :  * \param nThreads  Number of threads in the pool.
      41             :  */
      42         234 : CPLWorkerThreadPool::CPLWorkerThreadPool(int nThreads) : jobQueue{}
      43             : {
      44         234 :     Setup(nThreads, nullptr, nullptr);
      45         234 : }
      46             : 
      47             : /************************************************************************/
      48             : /*                          ~CPLWorkerThreadPool()                      */
      49             : /************************************************************************/
      50             : 
      51             : /** Destroys a pool of worker threads.
      52             :  *
      53             :  * Any still pending job will be completed before the destructor returns.
      54             :  */
      55         976 : CPLWorkerThreadPool::~CPLWorkerThreadPool()
      56             : {
      57         976 :     WaitCompletion();
      58             : 
      59             :     {
      60         976 :         std::lock_guard<std::mutex> oGuard(m_mutex);
      61         976 :         eState = CPLWTS_STOP;
      62             :     }
      63             : 
      64        3484 :     for (auto &wt : aWT)
      65             :     {
      66             :         {
      67        5016 :             std::lock_guard<std::mutex> oGuard(wt->m_mutex);
      68        2508 :             wt->m_cv.notify_one();
      69             :         }
      70        2508 :         CPLJoinThread(wt->hThread);
      71             :     }
      72             : 
      73         976 :     CPLListDestroy(psWaitingWorkerThreadsList);
      74         976 : }
      75             : 
      76             : /************************************************************************/
      77             : /*                        GetThreadCount()                              */
      78             : /************************************************************************/
      79             : 
      80        1234 : int CPLWorkerThreadPool::GetThreadCount() const
      81             : {
      82        1234 :     std::unique_lock<std::mutex> oGuard(m_mutex);
      83        2468 :     return m_nMaxThreads;
      84             : }
      85             : 
      86             : /************************************************************************/
      87             : /*                       WorkerThreadFunction()                         */
      88             : /************************************************************************/
      89             : 
      90        3546 : void CPLWorkerThreadPool::WorkerThreadFunction(void *user_data)
      91             : {
      92        3546 :     CPLWorkerThread *psWT = static_cast<CPLWorkerThread *>(user_data);
      93        3546 :     CPLWorkerThreadPool *poTP = psWT->poTP;
      94             : 
      95        3546 :     threadLocalCurrentThreadPool = poTP;
      96             : 
      97        3546 :     if (psWT->pfnInitFunc)
      98           0 :         psWT->pfnInitFunc(psWT->pInitData);
      99             : 
     100             :     while (true)
     101             :     {
     102       65719 :         std::function<void()> task = poTP->GetNextJob(psWT);
     103       64728 :         if (!task)
     104        2508 :             break;
     105             : 
     106       62100 :         task();
     107             : #if DEBUG_VERBOSE
     108             :         CPLDebug("JOB", "%p finished a job", psWT);
     109             : #endif
     110       62234 :         poTP->DeclareJobFinished();
     111       62173 :     }
     112        2540 : }
     113             : 
     114             : /************************************************************************/
     115             : /*                             SubmitJob()                              */
     116             : /************************************************************************/
     117             : 
     118             : /** Queue a new job.
     119             :  *
     120             :  * @param pfnFunc Function to run for the job.
     121             :  * @param pData User data to pass to the job function.
     122             :  * @return true in case of success.
     123             :  */
     124        7358 : bool CPLWorkerThreadPool::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
     125             : {
     126       14709 :     return SubmitJob([=] { pfnFunc(pData); });
     127             : }
     128             : 
     129             : /** Queue a new job.
     130             :  *
     131             :  * @param task  Void function to execute.
     132             :  * @return true in case of success.
     133             :  */
     134       73537 : bool CPLWorkerThreadPool::SubmitJob(std::function<void()> task)
     135             : {
     136             : #ifdef DEBUG
     137             :     {
     138      146968 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     139       73431 :         CPLAssert(m_nMaxThreads > 0);
     140             :     }
     141             : #endif
     142             : 
     143       73454 :     bool bMustIncrementWaitingWorkerThreadsAfterSubmission = false;
     144       73454 :     if (threadLocalCurrentThreadPool == this)
     145             :     {
     146             :         // If there are waiting threads or we have not started all allowed
     147             :         // threads, we can submit this job asynchronously
     148             :         {
     149      114273 :             std::unique_lock<std::mutex> oGuard(m_mutex);
     150       69889 :             if (nWaitingWorkerThreads > 0 ||
     151       12738 :                 static_cast<int>(aWT.size()) < m_nMaxThreads)
     152             :             {
     153       44413 :                 bMustIncrementWaitingWorkerThreadsAfterSubmission = true;
     154       44413 :                 nWaitingWorkerThreads--;
     155             :             }
     156             :         }
     157       57156 :         if (!bMustIncrementWaitingWorkerThreadsAfterSubmission)
     158             :         {
     159             :             // otherwise there is a risk of deadlock, so execute synchronously.
     160       12742 :             task();
     161       12738 :             return true;
     162             :         }
     163             :     }
     164             : 
     165       60746 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     166             : 
     167       60833 :     if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
     168       44488 :         nWaitingWorkerThreads++;
     169             : 
     170       60833 :     if (static_cast<int>(aWT.size()) < m_nMaxThreads)
     171             :     {
     172             :         // CPLDebug("CPL", "Starting new thread...");
     173        2208 :         auto wt = std::make_unique<CPLWorkerThread>();
     174        1104 :         wt->poTP = this;
     175             :         //ABELL - Why should this fail? And this is a *pool* thread, not necessarily
     176             :         //  tied to the submitted job. The submitted job still needs to run, even if
     177             :         //  this fails. If we can't create a thread, should the entire pool become invalid?
     178        1104 :         wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     179             :         /**
     180             :         if (!wt->hThread)
     181             :         {
     182             :             VSIFree(psJob);
     183             :             VSIFree(psItem);
     184             :             return false;
     185             :         }
     186             :         **/
     187        1104 :         if (wt->hThread)
     188        1104 :             aWT.emplace_back(std::move(wt));
     189             :     }
     190             : 
     191       60753 :     jobQueue.emplace(task);
     192       60736 :     nPendingJobs++;
     193             : 
     194       60736 :     if (psWaitingWorkerThreadsList)
     195             :     {
     196       54155 :         CPLWorkerThread *psWorkerThread =
     197       54155 :             static_cast<CPLWorkerThread *>(psWaitingWorkerThreadsList->pData);
     198             : 
     199       54155 :         CPLAssert(psWorkerThread->bMarkedAsWaiting);
     200       54155 :         psWorkerThread->bMarkedAsWaiting = false;
     201             : 
     202       54155 :         CPLList *psNext = psWaitingWorkerThreadsList->psNext;
     203       54155 :         CPLList *psToFree = psWaitingWorkerThreadsList;
     204       54155 :         psWaitingWorkerThreadsList = psNext;
     205       54155 :         nWaitingWorkerThreads--;
     206             : 
     207             : #if DEBUG_VERBOSE
     208             :         CPLDebug("JOB", "Waking up %p", psWorkerThread);
     209             : #endif
     210             : 
     211             :         {
     212      108420 :             std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
     213             :             // coverity[uninit_use_in_call]
     214       54282 :             oGuard.unlock();
     215       54276 :             psWorkerThread->m_cv.notify_one();
     216             :         }
     217             : 
     218       54279 :         CPLFree(psToFree);
     219             :     }
     220             : 
     221             :     // coverity[double_unlock]
     222       60942 :     return true;
     223             : }
     224             : 
     225             : /************************************************************************/
     226             : /*                             SubmitJobs()                              */
     227             : /************************************************************************/
     228             : 
     229             : /** Queue several jobs
     230             :  *
     231             :  * @param pfnFunc Function to run for the job.
     232             :  * @param apData User data instances to pass to the job function.
     233             :  * @return true in case of success.
     234             :  */
     235         156 : bool CPLWorkerThreadPool::SubmitJobs(CPLThreadFunc pfnFunc,
     236             :                                      const std::vector<void *> &apData)
     237             : {
     238         156 :     if (apData.empty())
     239           0 :         return false;
     240             : 
     241             : #ifdef DEBUG
     242             :     {
     243         312 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     244         156 :         CPLAssert(m_nMaxThreads > 0);
     245             :     }
     246             : #endif
     247             : 
     248         156 :     if (threadLocalCurrentThreadPool == this)
     249             :     {
     250             :         // If SubmitJob() is called from a worker thread of this queue,
     251             :         // then synchronously run the task to avoid deadlock.
     252           0 :         for (void *pData : apData)
     253           0 :             pfnFunc(pData);
     254           0 :         return true;
     255             :     }
     256             : 
     257         312 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     258             : 
     259        1534 :     for (void *pData : apData)
     260             :     {
     261        1378 :         if (static_cast<int>(aWT.size()) < m_nMaxThreads)
     262             :         {
     263           0 :             std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
     264           0 :             wt->poTP = this;
     265           0 :             wt->hThread =
     266           0 :                 CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     267           0 :             if (wt->hThread == nullptr)
     268             :             {
     269           0 :                 if (aWT.empty())
     270           0 :                     return false;
     271             :             }
     272             :             else
     273             :             {
     274           0 :                 aWT.emplace_back(std::move(wt));
     275             :             }
     276             :         }
     277             : 
     278        2753 :         jobQueue.emplace([=] { pfnFunc(pData); });
     279        1378 :         nPendingJobs++;
     280             :     }
     281             : 
     282         534 :     for (size_t i = 0; i < apData.size(); i++)
     283             :     {
     284         382 :         if (psWaitingWorkerThreadsList)
     285             :         {
     286             :             CPLWorkerThread *psWorkerThread;
     287             : 
     288         378 :             psWorkerThread = static_cast<CPLWorkerThread *>(
     289         378 :                 psWaitingWorkerThreadsList->pData);
     290             : 
     291         378 :             CPLAssert(psWorkerThread->bMarkedAsWaiting);
     292         378 :             psWorkerThread->bMarkedAsWaiting = false;
     293             : 
     294         378 :             CPLList *psNext = psWaitingWorkerThreadsList->psNext;
     295         378 :             CPLList *psToFree = psWaitingWorkerThreadsList;
     296         378 :             psWaitingWorkerThreadsList = psNext;
     297         378 :             nWaitingWorkerThreads--;
     298             : 
     299             : #if DEBUG_VERBOSE
     300             :             CPLDebug("JOB", "Waking up %p", psWorkerThread);
     301             : #endif
     302             :             {
     303         756 :                 std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
     304             :                 // coverity[uninit_use_in_call]
     305         378 :                 oGuard.unlock();
     306         378 :                 psWorkerThread->m_cv.notify_one();
     307             :             }
     308             : 
     309         378 :             CPLFree(psToFree);
     310         378 :             oGuard.lock();
     311             :         }
     312             :         else
     313             :         {
     314           4 :             break;
     315             :         }
     316             :     }
     317             : 
     318         156 :     return true;
     319             : }
     320             : 
     321             : /************************************************************************/
     322             : /*                            WaitCompletion()                          */
     323             : /************************************************************************/
     324             : 
     325             : /** Wait for completion of part or whole jobs.
     326             :  *
     327             :  * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
     328             :  *                          in the queue after this method has completed. Might
     329             :  * be 0 to wait for all jobs.
     330             :  */
     331        5708 : void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
     332             : {
     333        5708 :     if (nMaxRemainingJobs < 0)
     334           0 :         nMaxRemainingJobs = 0;
     335       11416 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     336        5708 :     m_cv.wait(oGuard, [this, nMaxRemainingJobs]
     337        7039 :               { return nPendingJobs <= nMaxRemainingJobs; });
     338        5708 : }
     339             : 
     340             : /************************************************************************/
     341             : /*                            WaitEvent()                               */
     342             : /************************************************************************/
     343             : 
     344             : /** Wait for completion of at least one job, if there are any remaining
     345             :  */
     346        1396 : void CPLWorkerThreadPool::WaitEvent()
     347             : {
     348             :     // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
     349             :     // a notification occurs, jobs could be submitted which would increase
     350             :     // nPendingJobs, so a job completion may looks like a spurious wakeup.
     351        1396 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     352        1396 :     if (nPendingJobs == 0)
     353          35 :         return;
     354        1361 :     const int nPendingJobsBefore = nPendingJobs;
     355        1361 :     m_cv.wait(oGuard, [this, nPendingJobsBefore]
     356        2908 :               { return nPendingJobs < nPendingJobsBefore; });
     357             : }
     358             : 
     359             : /************************************************************************/
     360             : /*                                Setup()                               */
     361             : /************************************************************************/
     362             : 
     363             : /** Setup the pool.
     364             :  *
     365             :  * @param nThreads Number of threads to launch
     366             :  * @param pfnInitFunc Initialization function to run in each thread. May be NULL
     367             :  * @param pasInitData Array of initialization data. Its length must be nThreads,
     368             :  *                    or it should be NULL.
     369             :  * @return true if initialization was successful.
     370             :  */
     371         622 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
     372             :                                 void **pasInitData)
     373             : {
     374         622 :     return Setup(nThreads, pfnInitFunc, pasInitData, true);
     375             : }
     376             : 
     377             : /** Setup the pool.
     378             :  *
     379             :  * @param nThreads Number of threads to launch
     380             :  * @param pfnInitFunc Initialization function to run in each thread. May be NULL
     381             :  * @param pasInitData Array of initialization data. Its length must be nThreads,
     382             :  *                    or it should be NULL.
     383             :  * @param bWaitallStarted Whether to wait for all threads to be fully started.
     384             :  * @return true if initialization was successful.
     385             :  */
     386         647 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
     387             :                                 void **pasInitData, bool bWaitallStarted)
     388             : {
     389         647 :     CPLAssert(nThreads > 0);
     390             : 
     391        1295 :     if (nThreads > static_cast<int>(aWT.size()) && pfnInitFunc == nullptr &&
     392        1295 :         pasInitData == nullptr && !bWaitallStarted)
     393             :     {
     394          25 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     395          25 :         if (nThreads > m_nMaxThreads)
     396          25 :             m_nMaxThreads = nThreads;
     397          25 :         return true;
     398             :     }
     399             : 
     400         622 :     bool bRet = true;
     401        3066 :     for (int i = static_cast<int>(aWT.size()); i < nThreads; i++)
     402             :     {
     403        2443 :         auto wt = std::make_unique<CPLWorkerThread>();
     404        2442 :         wt->pfnInitFunc = pfnInitFunc;
     405        2443 :         wt->pInitData = pasInitData ? pasInitData[i] : nullptr;
     406        2442 :         wt->poTP = this;
     407        2443 :         wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     408        2443 :         if (wt->hThread == nullptr)
     409             :         {
     410           0 :             nThreads = i;
     411           0 :             bRet = false;
     412           0 :             break;
     413             :         }
     414        2443 :         aWT.emplace_back(std::move(wt));
     415             :     }
     416             : 
     417             :     {
     418        1246 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     419         623 :         if (nThreads > m_nMaxThreads)
     420         623 :             m_nMaxThreads = nThreads;
     421             :     }
     422             : 
     423         623 :     if (bWaitallStarted)
     424             :     {
     425             :         // Wait all threads to be started
     426        1246 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     427        1494 :         while (nWaitingWorkerThreads < nThreads)
     428             :         {
     429         871 :             m_cv.wait(oGuard);
     430             :         }
     431             :     }
     432             : 
     433         623 :     if (eState == CPLWTS_ERROR)
     434           0 :         bRet = false;
     435             : 
     436         623 :     return bRet;
     437             : }
     438             : 
     439             : /************************************************************************/
     440             : /*                          DeclareJobFinished()                        */
     441             : /************************************************************************/
     442             : 
     443       62227 : void CPLWorkerThreadPool::DeclareJobFinished()
     444             : {
     445      124486 :     std::lock_guard<std::mutex> oGuard(m_mutex);
     446       62241 :     nPendingJobs--;
     447       62241 :     m_cv.notify_one();
     448       62258 : }
     449             : 
     450             : /************************************************************************/
     451             : /*                             GetNextJob()                             */
     452             : /************************************************************************/
     453             : 
     454             : std::function<void()>
     455       65748 : CPLWorkerThreadPool::GetNextJob(CPLWorkerThread *psWorkerThread)
     456             : {
     457       65748 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     458             :     while (true)
     459             :     {
     460      122846 :         if (eState == CPLWTS_STOP)
     461       64772 :             return std::function<void()>();
     462             : 
     463      120338 :         if (jobQueue.size())
     464             :         {
     465             : #if DEBUG_VERBOSE
     466             :             CPLDebug("JOB", "%p got a job", psWorkerThread);
     467             : #endif
     468      124403 :             auto task = std::move(jobQueue.front());
     469       62193 :             jobQueue.pop();
     470       62128 :             return task;
     471             :         }
     472             : 
     473       58223 :         if (!psWorkerThread->bMarkedAsWaiting)
     474             :         {
     475       58152 :             psWorkerThread->bMarkedAsWaiting = true;
     476       58152 :             nWaitingWorkerThreads++;
     477             : 
     478             :             CPLList *psItem =
     479       58152 :                 static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
     480       58187 :             if (psItem == nullptr)
     481             :             {
     482           0 :                 eState = CPLWTS_ERROR;
     483           0 :                 m_cv.notify_one();
     484             : 
     485           0 :                 return nullptr;
     486             :             }
     487             : 
     488       58187 :             psItem->pData = psWorkerThread;
     489       58187 :             psItem->psNext = psWaitingWorkerThreadsList;
     490       58187 :             psWaitingWorkerThreadsList = psItem;
     491             : 
     492             : #if DEBUG_VERBOSE
     493             :             CPLAssert(CPLListCount(psWaitingWorkerThreadsList) ==
     494             :                       nWaitingWorkerThreads);
     495             : #endif
     496             :         }
     497             : 
     498       58258 :         m_cv.notify_one();
     499             : 
     500             : #if DEBUG_VERBOSE
     501             :         CPLDebug("JOB", "%p sleeping", psWorkerThread);
     502             : #endif
     503             : 
     504      115256 :         std::unique_lock<std::mutex> oGuardThisThread(psWorkerThread->m_mutex);
     505             :         // coverity[uninit_use_in_call]
     506       58190 :         oGuard.unlock();
     507             :         // coverity[wait_not_in_locked_loop]
     508       58149 :         psWorkerThread->m_cv.wait(oGuardThisThread);
     509       57098 :         oGuard.lock();
     510       57105 :     }
     511             : }
     512             : 
     513             : /************************************************************************/
     514             : /*                         CreateJobQueue()                             */
     515             : /************************************************************************/
     516             : 
     517             : /** Create a new job queue based on this worker thread pool.
     518             :  *
     519             :  * The worker thread pool must remain alive while the returned object is
     520             :  * itself alive.
     521             :  *
     522             :  * @since GDAL 3.2
     523             :  */
     524       29704 : std::unique_ptr<CPLJobQueue> CPLWorkerThreadPool::CreateJobQueue()
     525             : {
     526       29704 :     return std::unique_ptr<CPLJobQueue>(new CPLJobQueue(this));
     527             : }
     528             : 
     529             : /************************************************************************/
     530             : /*                            CPLJobQueue()                             */
     531             : /************************************************************************/
     532             : 
     533             : //! @cond Doxygen_Suppress
     534       29693 : CPLJobQueue::CPLJobQueue(CPLWorkerThreadPool *poPool) : m_poPool(poPool)
     535             : {
     536       29682 : }
     537             : 
     538             : //! @endcond
     539             : 
     540             : /************************************************************************/
     541             : /*                           ~CPLJobQueue()                             */
     542             : /************************************************************************/
     543             : 
     544       29764 : CPLJobQueue::~CPLJobQueue()
     545             : {
     546       29768 :     WaitCompletion();
     547       29764 : }
     548             : 
     549             : /************************************************************************/
     550             : /*                          DeclareJobFinished()                        */
     551             : /************************************************************************/
     552             : 
     553       65895 : void CPLJobQueue::DeclareJobFinished()
     554             : {
     555      131904 :     std::lock_guard<std::mutex> oGuard(m_mutex);
     556       65894 :     m_nPendingJobs--;
     557       65894 :     m_cv.notify_one();
     558       65928 : }
     559             : 
     560             : /************************************************************************/
     561             : /*                             SubmitJob()                              */
     562             : /************************************************************************/
     563             : 
     564             : /** Queue a new job.
     565             :  *
     566             :  * @param pfnFunc Function to run for the job.
     567             :  * @param pData User data to pass to the job function.
     568             :  * @return true in case of success.
     569             :  */
     570        6765 : bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
     571             : {
     572       13522 :     return SubmitJob([=] { pfnFunc(pData); });
     573             : }
     574             : 
     575             : /** Queue a new job.
     576             :  *
     577             :  * @param task  Task to execute.
     578             :  * @return true in case of success.
     579             :  */
     580       65881 : bool CPLJobQueue::SubmitJob(std::function<void()> task)
     581             : {
     582             :     {
     583       65881 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     584       65812 :         m_nPendingJobs++;
     585             :     }
     586             : 
     587             :     // coverity[uninit_member,copy_constructor_call]
     588      131612 :     const auto lambda = [this, task]
     589             :     {
     590       65773 :         task();
     591       65839 :         DeclareJobFinished();
     592       65801 :     };
     593             :     // cppcheck-suppress knownConditionTrueFalse
     594      131695 :     return m_poPool->SubmitJob(lambda);
     595             : }
     596             : 
     597             : /************************************************************************/
     598             : /*                            WaitCompletion()                          */
     599             : /************************************************************************/
     600             : 
     601             : /** Wait for completion of part or whole jobs.
     602             :  *
     603             :  * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
     604             :  *                          in the queue after this method has completed. Might
     605             :  * be 0 to wait for all jobs.
     606             :  */
     607       59342 : void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
     608             : {
     609      118652 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     610       59334 :     m_cv.wait(oGuard, [this, nMaxRemainingJobs]
     611       77607 :               { return m_nPendingJobs <= nMaxRemainingJobs; });
     612       59340 : }
     613             : 
     614             : /************************************************************************/
     615             : /*                             WaitEvent()                              */
     616             : /************************************************************************/
     617             : 
     618             : /** Wait for completion for at least one job.
     619             :  *
     620             :  * @return true if there are remaining jobs.
     621             :  */
     622         216 : bool CPLJobQueue::WaitEvent()
     623             : {
     624             :     // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
     625             :     // a notification occurs, jobs could be submitted which would increase
     626             :     // nPendingJobs, so a job completion may looks like a spurious wakeup.
     627         432 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     628         216 :     if (m_nPendingJobs == 0)
     629           1 :         return false;
     630             : 
     631         215 :     const int nPendingJobsBefore = m_nPendingJobs;
     632         215 :     m_cv.wait(oGuard, [this, nPendingJobsBefore]
     633         430 :               { return m_nPendingJobs < nPendingJobsBefore; });
     634         215 :     return m_nPendingJobs > 0;
     635             : }

Generated by: LCOV version 1.14