LCOV - code coverage report
Current view: top level - port - cpl_worker_thread_pool.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 199 220 90.5 %
Date: 2025-01-18 12:42:00 Functions: 30 30 100.0 %

          Line data    Source code
       1             : /**********************************************************************
       2             :  *
       3             :  * Project:  CPL - Common Portability Library
       4             :  * Purpose:  CPL worker thread pool
       5             :  * Author:   Even Rouault, <even dot rouault at spatialys dot com>
       6             :  *
       7             :  **********************************************************************
       8             :  * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #include "cpl_port.h"
      14             : #include "cpl_worker_thread_pool.h"
      15             : 
      16             : #include <cstddef>
      17             : #include <memory>
      18             : 
      19             : #include "cpl_conv.h"
      20             : #include "cpl_error.h"
      21             : #include "cpl_vsi.h"
      22             : 
      23             : static thread_local CPLWorkerThreadPool *threadLocalCurrentThreadPool = nullptr;
      24             : 
      25             : /************************************************************************/
      26             : /*                         CPLWorkerThreadPool()                        */
      27             : /************************************************************************/
      28             : 
      29             : /** Instantiate a new pool of worker threads.
      30             :  *
      31             :  * The pool is in an uninitialized state after this call. The Setup() method
      32             :  * must be called.
      33             :  */
      34         600 : CPLWorkerThreadPool::CPLWorkerThreadPool() : jobQueue{}
      35             : {
      36         600 : }
      37             : 
      38             : /** Instantiate a new pool of worker threads.
      39             :  *
      40             :  * \param nThreads  Number of threads in the pool.
      41             :  */
      42         625 : CPLWorkerThreadPool::CPLWorkerThreadPool(int nThreads) : jobQueue{}
      43             : {
      44         625 :     Setup(nThreads, nullptr, nullptr);
      45         625 : }
      46             : 
      47             : /************************************************************************/
      48             : /*                          ~CPLWorkerThreadPool()                      */
      49             : /************************************************************************/
      50             : 
      51             : /** Destroys a pool of worker threads.
      52             :  *
      53             :  * Any still pending job will be completed before the destructor returns.
      54             :  */
      55        1220 : CPLWorkerThreadPool::~CPLWorkerThreadPool()
      56             : {
      57        1220 :     WaitCompletion();
      58             : 
      59             :     {
      60        1220 :         std::lock_guard<std::mutex> oGuard(m_mutex);
      61        1220 :         eState = CPLWTS_STOP;
      62             :     }
      63             : 
      64        4844 :     for (auto &wt : aWT)
      65             :     {
      66             :         {
      67        7248 :             std::lock_guard<std::mutex> oGuard(wt->m_mutex);
      68        3624 :             wt->m_cv.notify_one();
      69             :         }
      70        3624 :         CPLJoinThread(wt->hThread);
      71             :     }
      72             : 
      73        1220 :     CPLListDestroy(psWaitingWorkerThreadsList);
      74        1220 : }
      75             : 
      76             : /************************************************************************/
      77             : /*                        GetThreadCount()                              */
      78             : /************************************************************************/
      79             : 
      80        1136 : int CPLWorkerThreadPool::GetThreadCount() const
      81             : {
      82        1136 :     std::unique_lock<std::mutex> oGuard(m_mutex);
      83        2272 :     return m_nMaxThreads;
      84             : }
      85             : 
      86             : /************************************************************************/
      87             : /*                       WorkerThreadFunction()                         */
      88             : /************************************************************************/
      89             : 
      90        4662 : void CPLWorkerThreadPool::WorkerThreadFunction(void *user_data)
      91             : {
      92        4662 :     CPLWorkerThread *psWT = static_cast<CPLWorkerThread *>(user_data);
      93        4662 :     CPLWorkerThreadPool *poTP = psWT->poTP;
      94             : 
      95        4662 :     threadLocalCurrentThreadPool = poTP;
      96             : 
      97        4662 :     if (psWT->pfnInitFunc)
      98           0 :         psWT->pfnInitFunc(psWT->pInitData);
      99             : 
     100             :     while (true)
     101             :     {
     102      120678 :         std::function<void()> task = poTP->GetNextJob(psWT);
     103      119707 :         if (!task)
     104        3624 :             break;
     105             : 
     106      115954 :         task();
     107             : #if DEBUG_VERBOSE
     108             :         CPLDebug("JOB", "%p finished a job", psWT);
     109             : #endif
     110      115955 :         poTP->DeclareJobFinished();
     111      116016 :     }
     112        3695 : }
     113             : 
     114             : /************************************************************************/
     115             : /*                             SubmitJob()                              */
     116             : /************************************************************************/
     117             : 
     118             : /** Queue a new job.
     119             :  *
     120             :  * @param pfnFunc Function to run for the job.
     121             :  * @param pData User data to pass to the job function.
     122             :  * @return true in case of success.
     123             :  */
     124        6898 : bool CPLWorkerThreadPool::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
     125             : {
     126       13781 :     return SubmitJob([=] { pfnFunc(pData); });
     127             : }
     128             : 
     129             : /** Queue a new job.
     130             :  *
     131             :  * @param task  Void function to execute.
     132             :  * @return true in case of success.
     133             :  */
     134      191128 : bool CPLWorkerThreadPool::SubmitJob(std::function<void()> task)
     135             : {
     136             : #ifdef DEBUG
     137             :     {
     138      382262 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     139      191134 :         CPLAssert(m_nMaxThreads > 0);
     140             :     }
     141             : #endif
     142             : 
     143      191081 :     bool bMustIncrementWaitingWorkerThreadsAfterSubmission = false;
     144      191081 :     if (threadLocalCurrentThreadPool == this)
     145             :     {
     146             :         // If there are waiting threads or we have not started all allowed
     147             :         // threads, we can submit this job asynchronously
     148             :         {
     149      331936 :             std::unique_lock<std::mutex> oGuard(m_mutex);
     150      242470 :             if (nWaitingWorkerThreads > 0 ||
     151       76490 :                 static_cast<int>(aWT.size()) < m_nMaxThreads)
     152             :             {
     153       89541 :                 bMustIncrementWaitingWorkerThreadsAfterSubmission = true;
     154       89541 :                 nWaitingWorkerThreads--;
     155             :             }
     156             :         }
     157      165991 :         if (!bMustIncrementWaitingWorkerThreadsAfterSubmission)
     158             :         {
     159             :             // otherwise there is a risk of deadlock, so execute synchronously.
     160       76498 :             task();
     161       76470 :             return true;
     162             :         }
     163             :     }
     164             : 
     165      114618 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     166             : 
     167      114750 :     if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
     168       89550 :         nWaitingWorkerThreads++;
     169             : 
     170      114750 :     if (static_cast<int>(aWT.size()) < m_nMaxThreads)
     171             :     {
     172             :         // CPLDebug("CPL", "Starting new thread...");
     173        2176 :         auto wt = std::make_unique<CPLWorkerThread>();
     174        1088 :         wt->poTP = this;
     175             :         //ABELL - Why should this fail? And this is a *pool* thread, not necessarily
     176             :         //  tied to the submitted job. The submitted job still needs to run, even if
     177             :         //  this fails. If we can't create a thread, should the entire pool become invalid?
     178        1088 :         wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     179             :         /**
     180             :         if (!wt->hThread)
     181             :         {
     182             :             VSIFree(psJob);
     183             :             VSIFree(psItem);
     184             :             return false;
     185             :         }
     186             :         **/
     187        1088 :         if (wt->hThread)
     188        1088 :             aWT.emplace_back(std::move(wt));
     189             :     }
     190             : 
     191      114739 :     jobQueue.emplace(task);
     192      114687 :     nPendingJobs++;
     193             : 
     194      114687 :     if (psWaitingWorkerThreadsList)
     195             :     {
     196      110066 :         CPLWorkerThread *psWorkerThread =
     197      110066 :             static_cast<CPLWorkerThread *>(psWaitingWorkerThreadsList->pData);
     198             : 
     199      110066 :         CPLAssert(psWorkerThread->bMarkedAsWaiting);
     200      110066 :         psWorkerThread->bMarkedAsWaiting = false;
     201             : 
     202      110066 :         CPLList *psNext = psWaitingWorkerThreadsList->psNext;
     203      110066 :         CPLList *psToFree = psWaitingWorkerThreadsList;
     204      110066 :         psWaitingWorkerThreadsList = psNext;
     205      110066 :         nWaitingWorkerThreads--;
     206             : 
     207             : #if DEBUG_VERBOSE
     208             :         CPLDebug("JOB", "Waking up %p", psWorkerThread);
     209             : #endif
     210             : 
     211             :         {
     212      220211 :             std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
     213             :             // coverity[ uninit_use_in_call]
     214      110207 :             oGuard.unlock();
     215      110157 :             psWorkerThread->m_cv.notify_one();
     216             :         }
     217             : 
     218      110128 :         CPLFree(psToFree);
     219             :     }
     220             : 
     221      114861 :     return true;
     222             : }
     223             : 
     224             : /************************************************************************/
     225             : /*                             SubmitJobs()                              */
     226             : /************************************************************************/
     227             : 
     228             : /** Queue several jobs
     229             :  *
     230             :  * @param pfnFunc Function to run for the job.
     231             :  * @param apData User data instances to pass to the job function.
     232             :  * @return true in case of success.
     233             :  */
     234         132 : bool CPLWorkerThreadPool::SubmitJobs(CPLThreadFunc pfnFunc,
     235             :                                      const std::vector<void *> &apData)
     236             : {
     237         132 :     if (apData.empty())
     238           0 :         return false;
     239             : 
     240             : #ifdef DEBUG
     241             :     {
     242         264 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     243         132 :         CPLAssert(m_nMaxThreads > 0);
     244             :     }
     245             : #endif
     246             : 
     247         132 :     if (threadLocalCurrentThreadPool == this)
     248             :     {
     249             :         // If SubmitJob() is called from a worker thread of this queue,
     250             :         // then synchronously run the task to avoid deadlock.
     251           0 :         for (void *pData : apData)
     252           0 :             pfnFunc(pData);
     253           0 :         return true;
     254             :     }
     255             : 
     256         264 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     257             : 
     258        1462 :     for (void *pData : apData)
     259             :     {
     260        1330 :         if (static_cast<int>(aWT.size()) < m_nMaxThreads)
     261             :         {
     262           0 :             std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
     263           0 :             wt->poTP = this;
     264           0 :             wt->hThread =
     265           0 :                 CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     266           0 :             if (wt->hThread == nullptr)
     267             :             {
     268           0 :                 if (aWT.empty())
     269           0 :                     return false;
     270             :             }
     271             :             else
     272             :             {
     273           0 :                 aWT.emplace_back(std::move(wt));
     274             :             }
     275             :         }
     276             : 
     277        2660 :         jobQueue.emplace([=] { pfnFunc(pData); });
     278        1330 :         nPendingJobs++;
     279             :     }
     280             : 
     281         461 :     for (size_t i = 0; i < apData.size(); i++)
     282             :     {
     283         333 :         if (psWaitingWorkerThreadsList)
     284             :         {
     285             :             CPLWorkerThread *psWorkerThread;
     286             : 
     287         329 :             psWorkerThread = static_cast<CPLWorkerThread *>(
     288         329 :                 psWaitingWorkerThreadsList->pData);
     289             : 
     290         329 :             CPLAssert(psWorkerThread->bMarkedAsWaiting);
     291         329 :             psWorkerThread->bMarkedAsWaiting = false;
     292             : 
     293         329 :             CPLList *psNext = psWaitingWorkerThreadsList->psNext;
     294         329 :             CPLList *psToFree = psWaitingWorkerThreadsList;
     295         329 :             psWaitingWorkerThreadsList = psNext;
     296         329 :             nWaitingWorkerThreads--;
     297             : 
     298             : #if DEBUG_VERBOSE
     299             :             CPLDebug("JOB", "Waking up %p", psWorkerThread);
     300             : #endif
     301             :             {
     302         658 :                 std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
     303             :                 // coverity[ uninit_use_in_call]
     304         329 :                 oGuard.unlock();
     305         329 :                 psWorkerThread->m_cv.notify_one();
     306             :             }
     307             : 
     308         329 :             CPLFree(psToFree);
     309         329 :             oGuard.lock();
     310             :         }
     311             :         else
     312             :         {
     313           4 :             break;
     314             :         }
     315             :     }
     316             : 
     317         132 :     return true;
     318             : }
     319             : 
     320             : /************************************************************************/
     321             : /*                            WaitCompletion()                          */
     322             : /************************************************************************/
     323             : 
     324             : /** Wait for completion of part or whole jobs.
     325             :  *
     326             :  * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
     327             :  *                          in the queue after this method has completed. Might
     328             :  * be 0 to wait for all jobs.
     329             :  */
     330        5793 : void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
     331             : {
     332        5793 :     if (nMaxRemainingJobs < 0)
     333           0 :         nMaxRemainingJobs = 0;
     334       11586 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     335        5793 :     m_cv.wait(oGuard, [this, nMaxRemainingJobs]
     336        8585 :               { return nPendingJobs <= nMaxRemainingJobs; });
     337        5793 : }
     338             : 
     339             : /************************************************************************/
     340             : /*                            WaitEvent()                               */
     341             : /************************************************************************/
     342             : 
     343             : /** Wait for completion of at least one job, if there are any remaining
     344             :  */
     345        1460 : void CPLWorkerThreadPool::WaitEvent()
     346             : {
     347             :     // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
     348             :     // a notification occurs, jobs could be submitted which would increase
     349             :     // nPendingJobs, so a job completion may looks like a spurious wakeup.
     350        1460 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     351        1460 :     if (nPendingJobs == 0)
     352           9 :         return;
     353        1451 :     const int nPendingJobsBefore = nPendingJobs;
     354        1451 :     m_cv.wait(oGuard, [this, nPendingJobsBefore]
     355        3028 :               { return nPendingJobs < nPendingJobsBefore; });
     356             : }
     357             : 
     358             : /************************************************************************/
     359             : /*                                Setup()                               */
     360             : /************************************************************************/
     361             : 
     362             : /** Setup the pool.
     363             :  *
     364             :  * @param nThreads Number of threads to launch
     365             :  * @param pfnInitFunc Initialization function to run in each thread. May be NULL
     366             :  * @param pasInitData Array of initialization data. Its length must be nThreads,
     367             :  *                    or it should be NULL.
     368             :  * @return true if initialization was successful.
     369             :  */
     370         901 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
     371             :                                 void **pasInitData)
     372             : {
     373         901 :     return Setup(nThreads, pfnInitFunc, pasInitData, true);
     374             : }
     375             : 
     376             : /** Setup the pool.
     377             :  *
     378             :  * @param nThreads Number of threads to launch
     379             :  * @param pfnInitFunc Initialization function to run in each thread. May be NULL
     380             :  * @param pasInitData Array of initialization data. Its length must be nThreads,
     381             :  *                    or it should be NULL.
     382             :  * @param bWaitallStarted Whether to wait for all threads to be fully started.
     383             :  * @return true if initialization was successful.
     384             :  */
     385         923 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
     386             :                                 void **pasInitData, bool bWaitallStarted)
     387             : {
     388         923 :     CPLAssert(nThreads > 0);
     389             : 
     390        1846 :     if (nThreads > static_cast<int>(aWT.size()) && pfnInitFunc == nullptr &&
     391        1846 :         pasInitData == nullptr && !bWaitallStarted)
     392             :     {
     393          21 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     394          21 :         if (nThreads > m_nMaxThreads)
     395          21 :             m_nMaxThreads = nThreads;
     396          21 :         return true;
     397             :     }
     398             : 
     399         902 :     bool bRet = true;
     400        4476 :     for (int i = static_cast<int>(aWT.size()); i < nThreads; i++)
     401             :     {
     402        3574 :         auto wt = std::make_unique<CPLWorkerThread>();
     403        3574 :         wt->pfnInitFunc = pfnInitFunc;
     404        3574 :         wt->pInitData = pasInitData ? pasInitData[i] : nullptr;
     405        3574 :         wt->poTP = this;
     406        3574 :         wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     407        3574 :         if (wt->hThread == nullptr)
     408             :         {
     409           0 :             nThreads = i;
     410           0 :             bRet = false;
     411           0 :             break;
     412             :         }
     413        3574 :         aWT.emplace_back(std::move(wt));
     414             :     }
     415             : 
     416             :     {
     417        1804 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     418         902 :         if (nThreads > m_nMaxThreads)
     419         902 :             m_nMaxThreads = nThreads;
     420             :     }
     421             : 
     422         902 :     if (bWaitallStarted)
     423             :     {
     424             :         // Wait all threads to be started
     425        1804 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     426        3804 :         while (nWaitingWorkerThreads < nThreads)
     427             :         {
     428        2902 :             m_cv.wait(oGuard);
     429             :         }
     430             :     }
     431             : 
     432         902 :     if (eState == CPLWTS_ERROR)
     433           0 :         bRet = false;
     434             : 
     435         902 :     return bRet;
     436             : }
     437             : 
     438             : /************************************************************************/
     439             : /*                          DeclareJobFinished()                        */
     440             : /************************************************************************/
     441             : 
     442      115979 : void CPLWorkerThreadPool::DeclareJobFinished()
     443             : {
     444      232070 :     std::lock_guard<std::mutex> oGuard(m_mutex);
     445      116081 :     nPendingJobs--;
     446      116081 :     m_cv.notify_one();
     447      116114 : }
     448             : 
     449             : /************************************************************************/
     450             : /*                             GetNextJob()                             */
     451             : /************************************************************************/
     452             : 
     453             : std::function<void()>
     454      233903 : CPLWorkerThreadPool::GetNextJob(CPLWorkerThread *psWorkerThread)
     455             : {
     456             :     while (true)
     457             :     {
     458      233903 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     459      233963 :         if (eState == CPLWTS_STOP)
     460        3624 :             return std::function<void()>();
     461             : 
     462      230339 :         if (jobQueue.size())
     463             :         {
     464             : #if DEBUG_VERBOSE
     465             :             CPLDebug("JOB", "%p got a job", psWorkerThread);
     466             : #endif
     467      231950 :             auto task = std::move(jobQueue.front());
     468      115956 :             jobQueue.pop();
     469      115953 :             return task;
     470             :         }
     471             : 
     472      114619 :         if (!psWorkerThread->bMarkedAsWaiting)
     473             :         {
     474      114397 :             psWorkerThread->bMarkedAsWaiting = true;
     475      114397 :             nWaitingWorkerThreads++;
     476             : 
     477             :             CPLList *psItem =
     478      114397 :                 static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
     479      114464 :             if (psItem == nullptr)
     480             :             {
     481           0 :                 eState = CPLWTS_ERROR;
     482           0 :                 m_cv.notify_one();
     483             : 
     484           0 :                 return nullptr;
     485             :             }
     486             : 
     487      114464 :             psItem->pData = psWorkerThread;
     488      114464 :             psItem->psNext = psWaitingWorkerThreadsList;
     489      114464 :             psWaitingWorkerThreadsList = psItem;
     490             : 
     491             : #if DEBUG_VERBOSE
     492             :             CPLAssert(CPLListCount(psWaitingWorkerThreadsList) ==
     493             :                       nWaitingWorkerThreads);
     494             : #endif
     495             :         }
     496             : 
     497      114686 :         m_cv.notify_one();
     498             : 
     499             : #if DEBUG_VERBOSE
     500             :         CPLDebug("JOB", "%p sleeping", psWorkerThread);
     501             : #endif
     502             : 
     503      227844 :         std::unique_lock<std::mutex> oGuardThisThread(psWorkerThread->m_mutex);
     504             :         // coverity[uninit_use_in_call]
     505      114479 :         oGuard.unlock();
     506             :         // coverity[wait_not_in_locked_loop]
     507      114454 :         psWorkerThread->m_cv.wait(oGuardThisThread);
     508      113249 :     }
     509             : }
     510             : 
     511             : /************************************************************************/
     512             : /*                         CreateJobQueue()                             */
     513             : /************************************************************************/
     514             : 
     515             : /** Create a new job queue based on this worker thread pool.
     516             :  *
     517             :  * The worker thread pool must remain alive while the returned object is
     518             :  * itself alive.
     519             :  *
     520             :  * @since GDAL 3.2
     521             :  */
     522       84866 : std::unique_ptr<CPLJobQueue> CPLWorkerThreadPool::CreateJobQueue()
     523             : {
     524       84866 :     return std::unique_ptr<CPLJobQueue>(new CPLJobQueue(this));
     525             : }
     526             : 
     527             : /************************************************************************/
     528             : /*                            CPLJobQueue()                             */
     529             : /************************************************************************/
     530             : 
     531             : //! @cond Doxygen_Suppress
     532       84835 : CPLJobQueue::CPLJobQueue(CPLWorkerThreadPool *poPool) : m_poPool(poPool)
     533             : {
     534       84910 : }
     535             : 
     536             : //! @endcond
     537             : 
     538             : /************************************************************************/
     539             : /*                           ~CPLJobQueue()                             */
     540             : /************************************************************************/
     541             : 
     542       84971 : CPLJobQueue::~CPLJobQueue()
     543             : {
     544       84974 :     WaitCompletion();
     545       84972 : }
     546             : 
     547             : /************************************************************************/
     548             : /*                          DeclareJobFinished()                        */
     549             : /************************************************************************/
     550             : 
     551      184226 : void CPLJobQueue::DeclareJobFinished()
     552             : {
     553      368638 :     std::lock_guard<std::mutex> oGuard(m_mutex);
     554      184309 :     m_nPendingJobs--;
     555      184309 :     m_cv.notify_one();
     556      184257 : }
     557             : 
     558             : /************************************************************************/
     559             : /*                             SubmitJob()                              */
     560             : /************************************************************************/
     561             : 
     562             : /** Queue a new job.
     563             :  *
     564             :  * @param pfnFunc Function to run for the job.
     565             :  * @param pData User data to pass to the job function.
     566             :  * @return true in case of success.
     567             :  */
     568       14778 : bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
     569             : {
     570       29550 :     return SubmitJob([=] { pfnFunc(pData); });
     571             : }
     572             : 
     573             : /** Queue a new job.
     574             :  *
     575             :  * @param task  Task to execute.
     576             :  * @return true in case of success.
     577             :  */
     578      184271 : bool CPLJobQueue::SubmitJob(std::function<void()> task)
     579             : {
     580             :     {
     581      184271 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     582      184302 :         m_nPendingJobs++;
     583             :     }
     584             : 
     585             :     // coverity[uninit_member,copy_constructor_call]
     586      368305 :     const auto lambda = [this, task]
     587             :     {
     588      184044 :         task();
     589      184261 :         DeclareJobFinished();
     590      184197 :     };
     591             :     // cppcheck-suppress knownConditionTrueFalse
     592      368420 :     return m_poPool->SubmitJob(lambda);
     593             : }
     594             : 
     595             : /************************************************************************/
     596             : /*                            WaitCompletion()                          */
     597             : /************************************************************************/
     598             : 
     599             : /** Wait for completion of part or whole jobs.
     600             :  *
     601             :  * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
     602             :  *                          in the queue after this method has completed. Might
     603             :  * be 0 to wait for all jobs.
     604             :  */
     605      169262 : void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
     606             : {
     607      338538 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     608      169277 :     m_cv.wait(oGuard, [this, nMaxRemainingJobs]
     609      209514 :               { return m_nPendingJobs <= nMaxRemainingJobs; });
     610      169308 : }
     611             : 
     612             : /************************************************************************/
     613             : /*                             WaitEvent()                              */
     614             : /************************************************************************/
     615             : 
     616             : /** Wait for completion for at least one job.
     617             :  *
     618             :  * @return true if there are remaining jobs.
     619             :  */
     620         195 : bool CPLJobQueue::WaitEvent()
     621             : {
     622             :     // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
     623             :     // a notification occurs, jobs could be submitted which would increase
     624             :     // nPendingJobs, so a job completion may looks like a spurious wakeup.
     625         390 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     626         195 :     if (m_nPendingJobs == 0)
     627           2 :         return false;
     628             : 
     629         193 :     const int nPendingJobsBefore = m_nPendingJobs;
     630         193 :     m_cv.wait(oGuard, [this, nPendingJobsBefore]
     631         386 :               { return m_nPendingJobs < nPendingJobsBefore; });
     632         193 :     return m_nPendingJobs > 0;
     633             : }

Generated by: LCOV version 1.14