LCOV - code coverage report
Current view: top level - port - cpl_worker_thread_pool.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 199 220 90.5 %
Date: 2024-11-21 22:18:42 Functions: 30 30 100.0 %

          Line data    Source code
       1             : /**********************************************************************
       2             :  *
       3             :  * Project:  CPL - Common Portability Library
       4             :  * Purpose:  CPL worker thread pool
       5             :  * Author:   Even Rouault, <even dot rouault at spatialys dot com>
       6             :  *
       7             :  **********************************************************************
       8             :  * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "cpl_port.h"
      14             : #include "cpl_worker_thread_pool.h"
      15             : 
      16             : #include <cstddef>
      17             : #include <memory>
      18             : 
      19             : #include "cpl_conv.h"
      20             : #include "cpl_error.h"
      21             : #include "cpl_vsi.h"
      22             : 
      23             : static thread_local CPLWorkerThreadPool *threadLocalCurrentThreadPool = nullptr;
      24             : 
      25             : /************************************************************************/
      26             : /*                         CPLWorkerThreadPool()                        */
      27             : /************************************************************************/
      28             : 
      29             : /** Instantiate a new pool of worker threads.
      30             :  *
      31             :  * The pool is in an uninitialized state after this call. The Setup() method
      32             :  * must be called.
      33             :  */
      34         598 : CPLWorkerThreadPool::CPLWorkerThreadPool() : jobQueue{}
      35             : {
      36         598 : }
      37             : 
      38             : /** Instantiate a new pool of worker threads.
      39             :  *
      40             :  * \param nThreads  Number of threads in the pool.
      41             :  */
      42         625 : CPLWorkerThreadPool::CPLWorkerThreadPool(int nThreads) : jobQueue{}
      43             : {
      44         624 :     Setup(nThreads, nullptr, nullptr);
      45         625 : }
      46             : 
      47             : /************************************************************************/
      48             : /*                          ~CPLWorkerThreadPool()                      */
      49             : /************************************************************************/
      50             : 
      51             : /** Destroys a pool of worker threads.
      52             :  *
      53             :  * Any still pending job will be completed before the destructor returns.
      54             :  */
      55        1218 : CPLWorkerThreadPool::~CPLWorkerThreadPool()
      56             : {
      57        1218 :     WaitCompletion();
      58             : 
      59             :     {
      60        1218 :         std::lock_guard<std::mutex> oGuard(m_mutex);
      61        1218 :         eState = CPLWTS_STOP;
      62             :     }
      63             : 
      64        4838 :     for (auto &wt : aWT)
      65             :     {
      66             :         {
      67        7239 :             std::lock_guard<std::mutex> oGuard(wt->m_mutex);
      68        3619 :             wt->m_cv.notify_one();
      69             :         }
      70        3619 :         CPLJoinThread(wt->hThread);
      71             :     }
      72             : 
      73        1218 :     CPLListDestroy(psWaitingWorkerThreadsList);
      74        1218 : }
      75             : 
      76             : /************************************************************************/
      77             : /*                        GetThreadCount()                              */
      78             : /************************************************************************/
      79             : 
      80        1038 : int CPLWorkerThreadPool::GetThreadCount() const
      81             : {
      82        1038 :     std::unique_lock<std::mutex> oGuard(m_mutex);
      83        2076 :     return m_nMaxThreads;
      84             : }
      85             : 
      86             : /************************************************************************/
      87             : /*                       WorkerThreadFunction()                         */
      88             : /************************************************************************/
      89             : 
      90        3642 : void CPLWorkerThreadPool::WorkerThreadFunction(void *user_data)
      91             : {
      92        3642 :     CPLWorkerThread *psWT = static_cast<CPLWorkerThread *>(user_data);
      93        3642 :     CPLWorkerThreadPool *poTP = psWT->poTP;
      94             : 
      95        3642 :     threadLocalCurrentThreadPool = poTP;
      96             : 
      97        3642 :     if (psWT->pfnInitFunc)
      98           0 :         psWT->pfnInitFunc(psWT->pInitData);
      99             : 
     100             :     while (true)
     101             :     {
     102      121772 :         std::function<void()> task = poTP->GetNextJob(psWT);
     103      121864 :         if (!task)
     104        3619 :             break;
     105             : 
     106      118005 :         task();
     107             : #if DEBUG_VERBOSE
     108             :         CPLDebug("JOB", "%p finished a job", psWT);
     109             : #endif
     110      118225 :         poTP->DeclareJobFinished();
     111      118130 :     }
     112        3697 : }
     113             : 
     114             : /************************************************************************/
     115             : /*                             SubmitJob()                              */
     116             : /************************************************************************/
     117             : 
     118             : /** Queue a new job.
     119             :  *
     120             :  * @param pfnFunc Function to run for the job.
     121             :  * @param pData User data to pass to the job function.
     122             :  * @return true in case of success.
     123             :  */
     124        6905 : bool CPLWorkerThreadPool::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
     125             : {
     126       13805 :     return SubmitJob([=] { pfnFunc(pData); });
     127             : }
     128             : 
     129             : /** Queue a new job.
     130             :  *
     131             :  * @param task  Void function to execute.
     132             :  * @return true in case of success.
     133             :  */
     134      190014 : bool CPLWorkerThreadPool::SubmitJob(std::function<void()> task)
     135             : {
     136             : #ifdef DEBUG
     137             :     {
     138      379993 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     139      189979 :         CPLAssert(m_nMaxThreads > 0);
     140             :     }
     141             : #endif
     142             : 
     143      189927 :     bool bMustIncrementWaitingWorkerThreadsAfterSubmission = false;
     144      189927 :     if (threadLocalCurrentThreadPool == this)
     145             :     {
     146             :         // If there are waiting threads or we have not started all allowed
     147             :         // threads, we can submit this job asynchronously
     148             :         {
     149      331625 :             std::unique_lock<std::mutex> oGuard(m_mutex);
     150      239171 :             if (nWaitingWorkerThreads > 0 ||
     151       73309 :                 static_cast<int>(aWT.size()) < m_nMaxThreads)
     152             :             {
     153       92636 :                 bMustIncrementWaitingWorkerThreadsAfterSubmission = true;
     154       92636 :                 nWaitingWorkerThreads--;
     155             :             }
     156             :         }
     157      165873 :         if (!bMustIncrementWaitingWorkerThreadsAfterSubmission)
     158             :         {
     159             :             // otherwise there is a risk of deadlock, so execute synchronously.
     160       73305 :             task();
     161       73306 :             return true;
     162             :         }
     163             :     }
     164             : 
     165      116732 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     166             : 
     167      116869 :     if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
     168       92726 :         nWaitingWorkerThreads++;
     169             : 
     170      116869 :     if (static_cast<int>(aWT.size()) < m_nMaxThreads)
     171             :     {
     172             :         // CPLDebug("CPL", "Starting new thread...");
     173         144 :         auto wt = std::make_unique<CPLWorkerThread>();
     174          72 :         wt->poTP = this;
     175             :         //ABELL - Why should this fail? And this is a *pool* thread, not necessarily
     176             :         //  tied to the submitted job. The submitted job still needs to run, even if
     177             :         //  this fails. If we can't create a thread, should the entire pool become invalid?
     178          72 :         wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     179             :         /**
     180             :         if (!wt->hThread)
     181             :         {
     182             :             VSIFree(psJob);
     183             :             VSIFree(psItem);
     184             :             return false;
     185             :         }
     186             :         **/
     187          72 :         if (wt->hThread)
     188          72 :             aWT.emplace_back(std::move(wt));
     189             :     }
     190             : 
     191      116768 :     jobQueue.emplace(task);
     192      116760 :     nPendingJobs++;
     193             : 
     194      116760 :     if (psWaitingWorkerThreadsList)
     195             :     {
     196      107765 :         CPLWorkerThread *psWorkerThread =
     197      107765 :             static_cast<CPLWorkerThread *>(psWaitingWorkerThreadsList->pData);
     198             : 
     199      107765 :         CPLAssert(psWorkerThread->bMarkedAsWaiting);
     200      107765 :         psWorkerThread->bMarkedAsWaiting = false;
     201             : 
     202      107765 :         CPLList *psNext = psWaitingWorkerThreadsList->psNext;
     203      107765 :         CPLList *psToFree = psWaitingWorkerThreadsList;
     204      107765 :         psWaitingWorkerThreadsList = psNext;
     205      107765 :         nWaitingWorkerThreads--;
     206             : 
     207             : #if DEBUG_VERBOSE
     208             :         CPLDebug("JOB", "Waking up %p", psWorkerThread);
     209             : #endif
     210             : 
     211             :         {
     212      215734 :             std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
     213             :             // coverity[ uninit_use_in_call]
     214      107967 :             oGuard.unlock();
     215      107948 :             psWorkerThread->m_cv.notify_one();
     216             :         }
     217             : 
     218      107843 :         CPLFree(psToFree);
     219             :     }
     220             : 
     221      117054 :     return true;
     222             : }
     223             : 
     224             : /************************************************************************/
     225             : /*                             SubmitJobs()                              */
     226             : /************************************************************************/
     227             : 
     228             : /** Queue several jobs
     229             :  *
     230             :  * @param pfnFunc Function to run for the job.
     231             :  * @param apData User data instances to pass to the job function.
     232             :  * @return true in case of success.
     233             :  */
     234         132 : bool CPLWorkerThreadPool::SubmitJobs(CPLThreadFunc pfnFunc,
     235             :                                      const std::vector<void *> &apData)
     236             : {
     237         132 :     if (apData.empty())
     238           0 :         return false;
     239             : 
     240             : #ifdef DEBUG
     241             :     {
     242         264 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     243         132 :         CPLAssert(m_nMaxThreads > 0);
     244             :     }
     245             : #endif
     246             : 
     247         132 :     if (threadLocalCurrentThreadPool == this)
     248             :     {
     249             :         // If SubmitJobs() is called from a worker thread of this pool,
     250             :         // then synchronously run the tasks to avoid deadlock.
     251           0 :         for (void *pData : apData)
     252           0 :             pfnFunc(pData);
     253           0 :         return true;
     254             :     }
     255             : 
     256         264 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     257             : 
     258        1462 :     for (void *pData : apData)
     259             :     {
     260        1330 :         if (static_cast<int>(aWT.size()) < m_nMaxThreads)
     261             :         {
     262           0 :             std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
     263           0 :             wt->poTP = this;
     264           0 :             wt->hThread =
     265           0 :                 CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     266           0 :             if (wt->hThread == nullptr)
     267             :             {
     268           0 :                 if (aWT.empty())
     269           0 :                     return false;
     270             :             }
     271             :             else
     272             :             {
     273           0 :                 aWT.emplace_back(std::move(wt));
     274             :             }
     275             :         }
     276             : 
     277        2660 :         jobQueue.emplace([=] { pfnFunc(pData); });
     278        1330 :         nPendingJobs++;
     279             :     }
     280             : 
     281         462 :     for (size_t i = 0; i < apData.size(); i++)
     282             :     {
     283         332 :         if (psWaitingWorkerThreadsList)
     284             :         {
     285             :             CPLWorkerThread *psWorkerThread;
     286             : 
     287         330 :             psWorkerThread = static_cast<CPLWorkerThread *>(
     288         330 :                 psWaitingWorkerThreadsList->pData);
     289             : 
     290         330 :             CPLAssert(psWorkerThread->bMarkedAsWaiting);
     291         330 :             psWorkerThread->bMarkedAsWaiting = false;
     292             : 
     293         330 :             CPLList *psNext = psWaitingWorkerThreadsList->psNext;
     294         330 :             CPLList *psToFree = psWaitingWorkerThreadsList;
     295         330 :             psWaitingWorkerThreadsList = psNext;
     296         330 :             nWaitingWorkerThreads--;
     297             : 
     298             : #if DEBUG_VERBOSE
     299             :             CPLDebug("JOB", "Waking up %p", psWorkerThread);
     300             : #endif
     301             :             {
     302         660 :                 std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
     303             :                 // coverity[ uninit_use_in_call]
     304         330 :                 oGuard.unlock();
     305         330 :                 psWorkerThread->m_cv.notify_one();
     306             :             }
     307             : 
     308         330 :             CPLFree(psToFree);
     309         330 :             oGuard.lock();
     310             :         }
     311             :         else
     312             :         {
     313           2 :             break;
     314             :         }
     315             :     }
     316             : 
     317         132 :     return true;
     318             : }
     319             : 
     320             : /************************************************************************/
     321             : /*                            WaitCompletion()                          */
     322             : /************************************************************************/
     323             : 
     324             : /** Wait for completion of some or all jobs.
     325             :  *
     326             :  * @param nMaxRemainingJobs Maximum number of pending jobs that are allowed
     327             :  *                          in the queue after this method has completed. Might
     328             :  *                          be 0 to wait for all jobs.
     329             :  */
     330        5797 : void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
     331             : {
     332        5797 :     if (nMaxRemainingJobs < 0)
     333           0 :         nMaxRemainingJobs = 0;
     334       11594 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     335        5797 :     m_cv.wait(oGuard, [this, nMaxRemainingJobs]
     336        7145 :               { return nPendingJobs <= nMaxRemainingJobs; });
     337        5797 : }
     338             : 
     339             : /************************************************************************/
     340             : /*                            WaitEvent()                               */
     341             : /************************************************************************/
     342             : 
     343             : /** Wait for completion of at least one job, if there are any remaining
     344             :  */
     345        1396 : void CPLWorkerThreadPool::WaitEvent()
     346             : {
     347             :     // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
     348             :     // a notification occurs, jobs could be submitted which would increase
     349             :     // nPendingJobs, so a job completion may look like a spurious wakeup.
     350        1396 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     351        1396 :     if (nPendingJobs == 0)
     352          27 :         return;
     353        1369 :     const int nPendingJobsBefore = nPendingJobs;
     354        1369 :     m_cv.wait(oGuard, [this, nPendingJobsBefore]
     355        2913 :               { return nPendingJobs < nPendingJobsBefore; });
     356             : }
     357             : 
     358             : /************************************************************************/
     359             : /*                                Setup()                               */
     360             : /************************************************************************/
     361             : 
     362             : /** Setup the pool.
     363             :  *
     364             :  * @param nThreads Number of threads to launch
     365             :  * @param pfnInitFunc Initialization function to run in each thread. May be NULL
     366             :  * @param pasInitData Array of initialization data. Its length must be nThreads,
     367             :  *                    or it should be NULL.
     368             :  * @return true if initialization was successful.
     369             :  */
     370         900 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
     371             :                                 void **pasInitData)
     372             : {
     373         900 :     return Setup(nThreads, pfnInitFunc, pasInitData, true);
     374             : }
     375             : 
     376             : /** Setup the pool.
     377             :  *
     378             :  * @param nThreads Number of threads to launch
     379             :  * @param pfnInitFunc Initialization function to run in each thread. May be NULL
     380             :  * @param pasInitData Array of initialization data. Its length must be nThreads,
     381             :  *                    or it should be NULL.
     382             :  * @param bWaitallStarted Whether to wait for all threads to be fully started.
     383             :  * @return true if initialization was successful.
     384             :  */
     385         921 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
     386             :                                 void **pasInitData, bool bWaitallStarted)
     387             : {
     388         921 :     CPLAssert(nThreads > 0);
     389             : 
     390        1841 :     if (nThreads > static_cast<int>(aWT.size()) && pfnInitFunc == nullptr &&
     391        1840 :         pasInitData == nullptr && !bWaitallStarted)
     392             :     {
     393          20 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     394          20 :         if (nThreads > m_nMaxThreads)
     395          20 :             m_nMaxThreads = nThreads;
     396          20 :         return true;
     397             :     }
     398             : 
     399         900 :     bool bRet = true;
     400        4470 :     for (int i = static_cast<int>(aWT.size()); i < nThreads; i++)
     401             :     {
     402        3569 :         auto wt = std::make_unique<CPLWorkerThread>();
     403        3569 :         wt->pfnInitFunc = pfnInitFunc;
     404        3570 :         wt->pInitData = pasInitData ? pasInitData[i] : nullptr;
     405        3570 :         wt->poTP = this;
     406        3570 :         wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     407        3570 :         if (wt->hThread == nullptr)
     408             :         {
     409           0 :             nThreads = i;
     410           0 :             bRet = false;
     411           0 :             break;
     412             :         }
     413        3570 :         aWT.emplace_back(std::move(wt));
     414             :     }
     415             : 
     416             :     {
     417        1803 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     418         901 :         if (nThreads > m_nMaxThreads)
     419         901 :             m_nMaxThreads = nThreads;
     420             :     }
     421             : 
     422         901 :     if (bWaitallStarted)
     423             :     {
     424             :         // Wait for all threads to be started
     425        1802 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     426        3698 :         while (nWaitingWorkerThreads < nThreads)
     427             :         {
     428        2797 :             m_cv.wait(oGuard);
     429             :         }
     430             :     }
     431             : 
     432         901 :     if (eState == CPLWTS_ERROR)
     433           0 :         bRet = false;
     434             : 
     435         901 :     return bRet;
     436             : }
     437             : 
     438             : /************************************************************************/
     439             : /*                          DeclareJobFinished()                        */
     440             : /************************************************************************/
     441             : 
     442      118156 : void CPLWorkerThreadPool::DeclareJobFinished()
     443             : {
     444      236333 :     std::lock_guard<std::mutex> oGuard(m_mutex);
     445      118181 :     nPendingJobs--;
     446      118181 :     m_cv.notify_one();
     447      118207 : }
     448             : 
     449             : /************************************************************************/
     450             : /*                             GetNextJob()                             */
     451             : /************************************************************************/
     452             : 
     453             : std::function<void()>
     454      232745 : CPLWorkerThreadPool::GetNextJob(CPLWorkerThread *psWorkerThread)
     455             : {
     456             :     while (true)
     457             :     {
     458      232745 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     459      232814 :         if (eState == CPLWTS_STOP)
     460        3620 :             return std::function<void()>();
     461             : 
     462      229194 :         if (jobQueue.size())
     463             :         {
     464             : #if DEBUG_VERBOSE
     465             :             CPLDebug("JOB", "%p got a job", psWorkerThread);
     466             : #endif
     467      236362 :             auto task = std::move(jobQueue.front());
     468      118078 :             jobQueue.pop();
     469      117986 :             return task;
     470             :         }
     471             : 
     472      111300 :         if (!psWorkerThread->bMarkedAsWaiting)
     473             :         {
     474      111183 :             psWorkerThread->bMarkedAsWaiting = true;
     475      111183 :             nWaitingWorkerThreads++;
     476             : 
     477             :             CPLList *psItem =
     478      111183 :                 static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
     479      111212 :             if (psItem == nullptr)
     480             :             {
     481           0 :                 eState = CPLWTS_ERROR;
     482           0 :                 m_cv.notify_one();
     483             : 
     484           0 :                 return nullptr;
     485             :             }
     486             : 
     487      111212 :             psItem->pData = psWorkerThread;
     488      111212 :             psItem->psNext = psWaitingWorkerThreadsList;
     489      111212 :             psWaitingWorkerThreadsList = psItem;
     490             : 
     491             : #if DEBUG_VERBOSE
     492             :             CPLAssert(CPLListCount(psWaitingWorkerThreadsList) ==
     493             :                       nWaitingWorkerThreads);
     494             : #endif
     495             :         }
     496             : 
     497      111329 :         m_cv.notify_one();
     498             : 
     499             : #if DEBUG_VERBOSE
     500             :         CPLDebug("JOB", "%p sleeping", psWorkerThread);
     501             : #endif
     502             : 
     503      222318 :         std::unique_lock<std::mutex> oGuardThisThread(psWorkerThread->m_mutex);
     504             :         // coverity[uninit_use_in_call]
     505      111241 :         oGuard.unlock();
     506      111198 :         psWorkerThread->m_cv.wait(oGuardThisThread);
     507      110906 :     }
     508             : }
     509             : 
     510             : /************************************************************************/
     511             : /*                         CreateJobQueue()                             */
     512             : /************************************************************************/
     513             : 
     514             : /** Create a new job queue based on this worker thread pool.
     515             :  *
     516             :  * The worker thread pool must remain alive while the returned object is
     517             :  * itself alive.
     518             :  *
     519             :  * @since GDAL 3.2
     520             :  */
     521       84616 : std::unique_ptr<CPLJobQueue> CPLWorkerThreadPool::CreateJobQueue()
     522             : {
     523       84616 :     return std::unique_ptr<CPLJobQueue>(new CPLJobQueue(this));
     524             : }
     525             : 
     526             : /************************************************************************/
     527             : /*                            CPLJobQueue()                             */
     528             : /************************************************************************/
     529             : 
     530             : //! @cond Doxygen_Suppress
     531       84511 : CPLJobQueue::CPLJobQueue(CPLWorkerThreadPool *poPool) : m_poPool(poPool)
     532             : {
     533       84592 : }
     534             : 
     535             : //! @endcond
     536             : 
     537             : /************************************************************************/
     538             : /*                           ~CPLJobQueue()                             */
     539             : /************************************************************************/
     540             : 
     541       84709 : CPLJobQueue::~CPLJobQueue()
     542             : {
     543       84706 :     WaitCompletion();
     544       84710 : }
     545             : 
     546             : /************************************************************************/
     547             : /*                          DeclareJobFinished()                        */
     548             : /************************************************************************/
     549             : 
     550      183270 : void CPLJobQueue::DeclareJobFinished()
     551             : {
     552      366583 :     std::lock_guard<std::mutex> oGuard(m_mutex);
     553      183232 :     m_nPendingJobs--;
     554      183232 :     m_cv.notify_one();
     555      183175 : }
     556             : 
     557             : /************************************************************************/
     558             : /*                             SubmitJob()                              */
     559             : /************************************************************************/
     560             : 
     561             : /** Queue a new job.
     562             :  *
     563             :  * @param pfnFunc Function to run for the job.
     564             :  * @param pData User data to pass to the job function.
     565             :  * @return true in case of success.
     566             :  */
     567       14778 : bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
     568             : {
     569       29550 :     return SubmitJob([=] { pfnFunc(pData); });
     570             : }
     571             : 
     572             : /** Queue a new job.
     573             :  *
     574             :  * @param task  Task to execute.
     575             :  * @return true in case of success.
     576             :  */
     577      183194 : bool CPLJobQueue::SubmitJob(std::function<void()> task)
     578             : {
     579             :     {
     580      183194 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     581      183098 :         m_nPendingJobs++;
     582             :     }
     583             : 
     584             :     // coverity[uninit_member,copy_constructor_call]
     585      366114 :     const auto lambda = [this, task]
     586             :     {
     587      182965 :         task();
     588      183149 :         DeclareJobFinished();
     589      183060 :     };
     590             :     // cppcheck-suppress knownConditionTrueFalse
     591      366238 :     return m_poPool->SubmitJob(lambda);
     592             : }
     593             : 
     594             : /************************************************************************/
     595             : /*                            WaitCompletion()                          */
     596             : /************************************************************************/
     597             : 
     598             : /** Wait for completion of some or all jobs.
     599             :  *
     600             :  * @param nMaxRemainingJobs Maximum number of pending jobs that are allowed
     601             :  *                          in the queue after this method has completed. Might
     602             :  *                          be 0 to wait for all jobs.
     603             :  */
     604      168758 : void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
     605             : {
     606      337379 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     607      168742 :     m_cv.wait(oGuard, [this, nMaxRemainingJobs]
     608      207579 :               { return m_nPendingJobs <= nMaxRemainingJobs; });
     609      168721 : }
     610             : 
     611             : /************************************************************************/
     612             : /*                             WaitEvent()                              */
     613             : /************************************************************************/
     614             : 
     615             : /** Wait for completion of at least one job.
     616             :  *
     617             :  * @return true if there are remaining jobs.
     618             :  */
     619         221 : bool CPLJobQueue::WaitEvent()
     620             : {
     621             :     // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
     622             :     // a notification occurs, jobs could be submitted which would increase
     623             :     // m_nPendingJobs, so a job completion may look like a spurious wakeup.
     624         442 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     625         221 :     if (m_nPendingJobs == 0)
     626           3 :         return false;
     627             : 
     628         218 :     const int nPendingJobsBefore = m_nPendingJobs;
     629         218 :     m_cv.wait(oGuard, [this, nPendingJobsBefore]
     630         436 :               { return m_nPendingJobs < nPendingJobsBefore; });
     631         218 :     return m_nPendingJobs > 0;
     632             : }

Generated by: LCOV version 1.14