LCOV - code coverage report
Current view: top level - port - cpl_worker_thread_pool.cpp (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 215 263 81.7 %
Date: 2024-04-28 18:08:58 Functions: 18 18 100.0 %

          Line data    Source code
       1             : /**********************************************************************
       2             :  *
       3             :  * Project:  CPL - Common Portability Library
       4             :  * Purpose:  CPL worker thread pool
       5             :  * Author:   Even Rouault, <even dot rouault at spatialys dot com>
       6             :  *
       7             :  **********************************************************************
       8             :  * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
       9             :  *
      10             :  * Permission is hereby granted, free of charge, to any person obtaining a
      11             :  * copy of this software and associated documentation files (the "Software"),
      12             :  * to deal in the Software without restriction, including without limitation
      13             :  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
      14             :  * and/or sell copies of the Software, and to permit persons to whom the
      15             :  * Software is furnished to do so, subject to the following conditions:
      16             :  *
      17             :  * The above copyright notice and this permission notice shall be included
      18             :  * in all copies or substantial portions of the Software.
      19             :  *
      20             :  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
      21             :  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
      22             :  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
      23             :  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
      24             :  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
      25             :  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
      26             :  * DEALINGS IN THE SOFTWARE.
      27             :  ****************************************************************************/
      28             : 
      29             : #include "cpl_port.h"
      30             : #include "cpl_worker_thread_pool.h"
      31             : 
      32             : #include <cstddef>
      33             : #include <memory>
      34             : 
      35             : #include "cpl_conv.h"
      36             : #include "cpl_error.h"
      37             : #include "cpl_vsi.h"
      38             : 
      39             : struct CPLWorkerThreadJob
      40             : {
      41             :     CPLThreadFunc pfnFunc;
      42             :     void *pData;
      43             : };
      44             : 
      45             : static thread_local CPLWorkerThreadPool *threadLocalCurrentThreadPool = nullptr;
      46             : 
      47             : /************************************************************************/
      48             : /*                         CPLWorkerThreadPool()                        */
      49             : /************************************************************************/
      50             : 
      51             : /** Instantiate a new pool of worker threads.
      52             :  *
      53             :  * The pool is in an uninitialized state after this call. The Setup() method
      54             :  * must be called.
      55             :  */
      56         592 : CPLWorkerThreadPool::CPLWorkerThreadPool()
      57             : {
      58         592 : }
      59             : 
      60             : /************************************************************************/
      61             : /*                          ~CPLWorkerThreadPool()                      */
      62             : /************************************************************************/
      63             : 
      64             : /** Destroys a pool of worker threads.
      65             :  *
      66             :  * Any still pending job will be completed before the destructor returns.
      67             :  */
      68         588 : CPLWorkerThreadPool::~CPLWorkerThreadPool()
      69             : {
      70         588 :     WaitCompletion();
      71             : 
      72             :     {
      73         588 :         std::lock_guard<std::mutex> oGuard(m_mutex);
      74         588 :         eState = CPLWTS_STOP;
      75             :     }
      76             : 
      77        1706 :     for (auto &wt : aWT)
      78             :     {
      79             :         {
      80        2236 :             std::lock_guard<std::mutex> oGuard(wt->m_mutex);
      81        1118 :             wt->m_cv.notify_one();
      82             :         }
      83        1118 :         CPLJoinThread(wt->hThread);
      84             :     }
      85             : 
      86         588 :     CPLListDestroy(psWaitingWorkerThreadsList);
      87         588 : }
      88             : 
      89             : /************************************************************************/
      90             : /*                       WorkerThreadFunction()                         */
      91             : /************************************************************************/
      92             : 
      93        1138 : void CPLWorkerThreadPool::WorkerThreadFunction(void *user_data)
      94             : {
      95        1138 :     CPLWorkerThread *psWT = static_cast<CPLWorkerThread *>(user_data);
      96        1138 :     CPLWorkerThreadPool *poTP = psWT->poTP;
      97             : 
      98        1138 :     threadLocalCurrentThreadPool = poTP;
      99             : 
     100        1138 :     if (psWT->pfnInitFunc)
     101           0 :         psWT->pfnInitFunc(psWT->pInitData);
     102             : 
     103             :     while (true)
     104             :     {
     105       15721 :         CPLWorkerThreadJob *psJob = poTP->GetNextJob(psWT);
     106       15701 :         if (psJob == nullptr)
     107        1118 :             break;
     108             : 
     109       14583 :         if (psJob->pfnFunc)
     110             :         {
     111       14583 :             psJob->pfnFunc(psJob->pData);
     112             :         }
     113       14581 :         CPLFree(psJob);
     114             : #if DEBUG_VERBOSE
     115             :         CPLDebug("JOB", "%p finished a job", psWT);
     116             : #endif
     117       14576 :         poTP->DeclareJobFinished();
     118       14583 :     }
     119        1118 : }
     120             : 
     121             : /************************************************************************/
     122             : /*                             SubmitJob()                              */
     123             : /************************************************************************/
     124             : 
     125             : /** Queue a new job.
     126             :  *
     127             :  * @param pfnFunc Function to run for the job.
     128             :  * @param pData User data to pass to the job function.
     129             :  * @return true in case of success.
     130             :  */
     131       13261 : bool CPLWorkerThreadPool::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
     132             : {
     133       13261 :     CPLAssert(m_nMaxThreads > 0);
     134             : 
     135       13261 :     bool bMustIncrementWaitingWorkerThreadsAfterSubmission = false;
     136       13261 :     if (threadLocalCurrentThreadPool == this)
     137             :     {
     138             :         // If there are waiting threads or we have not started all allowed
     139             :         // threads, we can submit this job asynchronously
     140             :         {
     141          18 :             std::unique_lock<std::mutex> oGuard(m_mutex);
     142          17 :             if (nWaitingWorkerThreads > 0 ||
     143           8 :                 static_cast<int>(aWT.size()) < m_nMaxThreads)
     144             :             {
     145           1 :                 bMustIncrementWaitingWorkerThreadsAfterSubmission = true;
     146           1 :                 nWaitingWorkerThreads--;
     147             :             }
     148             :         }
     149           9 :         if (!bMustIncrementWaitingWorkerThreadsAfterSubmission)
     150             :         {
     151             :             // otherwise there is a risk of deadlock, so execute synchronously.
     152           8 :             pfnFunc(pData);
     153           8 :             return true;
     154             :         }
     155             :     }
     156             : 
     157             :     CPLWorkerThreadJob *psJob = static_cast<CPLWorkerThreadJob *>(
     158       13253 :         VSI_MALLOC_VERBOSE(sizeof(CPLWorkerThreadJob)));
     159       13253 :     if (psJob == nullptr)
     160             :     {
     161           0 :         if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
     162             :         {
     163           0 :             std::unique_lock<std::mutex> oGuard(m_mutex);
     164           0 :             nWaitingWorkerThreads++;
     165             :         }
     166           0 :         return false;
     167             :     }
     168       13253 :     psJob->pfnFunc = pfnFunc;
     169       13253 :     psJob->pData = pData;
     170             : 
     171             :     CPLList *psItem =
     172       13253 :         static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
     173       13253 :     if (psItem == nullptr)
     174             :     {
     175           0 :         VSIFree(psJob);
     176           0 :         if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
     177             :         {
     178           0 :             std::unique_lock<std::mutex> oGuard(m_mutex);
     179           0 :             nWaitingWorkerThreads++;
     180             :         }
     181           0 :         return false;
     182             :     }
     183       13253 :     psItem->pData = psJob;
     184             : 
     185       26506 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     186             : 
     187       13253 :     if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
     188           1 :         nWaitingWorkerThreads++;
     189             : 
     190       13253 :     if (static_cast<int>(aWT.size()) < m_nMaxThreads)
     191             :     {
     192             :         // CPLDebug("CPL", "Starting new thread...");
     193          70 :         std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
     194          70 :         wt->pfnInitFunc = nullptr;
     195          70 :         wt->pInitData = nullptr;
     196          70 :         wt->poTP = this;
     197          70 :         wt->bMarkedAsWaiting = false;
     198          70 :         wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     199          70 :         if (wt->hThread == nullptr)
     200             :         {
     201           0 :             VSIFree(psJob);
     202           0 :             VSIFree(psItem);
     203           0 :             return false;
     204             :         }
     205             :         else
     206             :         {
     207          70 :             aWT.emplace_back(std::move(wt));
     208             :         }
     209             :     }
     210             : 
     211       13253 :     psItem->psNext = psJobQueue;
     212       13253 :     psJobQueue = psItem;
     213       13253 :     nPendingJobs++;
     214             : 
     215       13253 :     if (psWaitingWorkerThreadsList)
     216             :     {
     217        5353 :         CPLWorkerThread *psWorkerThread =
     218        5353 :             static_cast<CPLWorkerThread *>(psWaitingWorkerThreadsList->pData);
     219             : 
     220        5353 :         CPLAssert(psWorkerThread->bMarkedAsWaiting);
     221        5353 :         psWorkerThread->bMarkedAsWaiting = false;
     222             : 
     223        5353 :         CPLList *psNext = psWaitingWorkerThreadsList->psNext;
     224        5353 :         CPLList *psToFree = psWaitingWorkerThreadsList;
     225        5353 :         psWaitingWorkerThreadsList = psNext;
     226        5353 :         nWaitingWorkerThreads--;
     227             : 
     228             :         // CPLAssert(
     229             :         //   CPLListCount(psWaitingWorkerThreadsList) == nWaitingWorkerThreads);
     230             : 
     231             : #if DEBUG_VERBOSE
     232             :         CPLDebug("JOB", "Waking up %p", psWorkerThread);
     233             : #endif
     234             : 
     235             :         {
     236       10706 :             std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
     237        5353 :             oGuard.unlock();
     238        5353 :             psWorkerThread->m_cv.notify_one();
     239             :         }
     240             : 
     241        5353 :         CPLFree(psToFree);
     242             :     }
     243             : 
     244       13253 :     return true;
     245             : }
     246             : 
     247             : /************************************************************************/
     248             : /*                             SubmitJobs()                              */
     249             : /************************************************************************/
     250             : 
     251             : /** Queue several jobs
     252             :  *
     253             :  * @param pfnFunc Function to run for the job.
     254             :  * @param apData User data instances to pass to the job function.
     255             :  * @return true in case of success.
     256             :  */
     257         132 : bool CPLWorkerThreadPool::SubmitJobs(CPLThreadFunc pfnFunc,
     258             :                                      const std::vector<void *> &apData)
     259             : {
     260         132 :     CPLAssert(m_nMaxThreads > 0);
     261             : 
     262         132 :     if (threadLocalCurrentThreadPool == this)
     263             :     {
     264             :         // If SubmitJob() is called from a worker thread of this queue,
     265             :         // then synchronously run the task to avoid deadlock.
     266           0 :         for (size_t i = 0; i < apData.size(); i++)
     267             :         {
     268           0 :             pfnFunc(apData[i]);
     269             :         }
     270           0 :         return true;
     271             :     }
     272             : 
     273         264 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     274             : 
     275         132 :     CPLList *psJobQueueInit = psJobQueue;
     276         132 :     bool bRet = true;
     277             : 
     278        1462 :     for (size_t i = 0; i < apData.size(); i++)
     279             :     {
     280        1330 :         if (static_cast<int>(aWT.size()) < m_nMaxThreads)
     281             :         {
     282           0 :             std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
     283           0 :             wt->pfnInitFunc = nullptr;
     284           0 :             wt->pInitData = nullptr;
     285           0 :             wt->poTP = this;
     286           0 :             wt->bMarkedAsWaiting = false;
     287           0 :             wt->hThread =
     288           0 :                 CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     289           0 :             if (wt->hThread == nullptr)
     290             :             {
     291           0 :                 if (aWT.empty())
     292           0 :                     return false;
     293             :             }
     294             :             else
     295             :             {
     296           0 :                 aWT.emplace_back(std::move(wt));
     297             :             }
     298             :         }
     299             : 
     300             :         CPLWorkerThreadJob *psJob = static_cast<CPLWorkerThreadJob *>(
     301        1330 :             VSI_MALLOC_VERBOSE(sizeof(CPLWorkerThreadJob)));
     302        1330 :         if (psJob == nullptr)
     303             :         {
     304           0 :             bRet = false;
     305           0 :             break;
     306             :         }
     307        1330 :         psJob->pfnFunc = pfnFunc;
     308        1330 :         psJob->pData = apData[i];
     309             : 
     310             :         CPLList *psItem =
     311        1330 :             static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
     312        1330 :         if (psItem == nullptr)
     313             :         {
     314           0 :             VSIFree(psJob);
     315           0 :             bRet = false;
     316           0 :             break;
     317             :         }
     318        1330 :         psItem->pData = psJob;
     319             : 
     320        1330 :         psItem->psNext = psJobQueue;
     321        1330 :         psJobQueue = psItem;
     322        1330 :         nPendingJobs++;
     323             :     }
     324             : 
     325         132 :     if (!bRet)
     326             :     {
     327           0 :         for (CPLList *psIter = psJobQueue; psIter != psJobQueueInit;)
     328             :         {
     329           0 :             CPLList *psNext = psIter->psNext;
     330           0 :             VSIFree(psIter->pData);
     331           0 :             VSIFree(psIter);
     332           0 :             nPendingJobs--;
     333           0 :             psIter = psNext;
     334             :         }
     335           0 :         return false;
     336             :     }
     337             : 
     338         457 :     for (size_t i = 0; i < apData.size(); i++)
     339             :     {
     340         331 :         if (psWaitingWorkerThreadsList && psJobQueue)
     341             :         {
     342             :             CPLWorkerThread *psWorkerThread;
     343             : 
     344         325 :             psWorkerThread = static_cast<CPLWorkerThread *>(
     345         325 :                 psWaitingWorkerThreadsList->pData);
     346             : 
     347         325 :             CPLAssert(psWorkerThread->bMarkedAsWaiting);
     348         325 :             psWorkerThread->bMarkedAsWaiting = false;
     349             : 
     350         325 :             CPLList *psNext = psWaitingWorkerThreadsList->psNext;
     351         325 :             CPLList *psToFree = psWaitingWorkerThreadsList;
     352         325 :             psWaitingWorkerThreadsList = psNext;
     353         325 :             nWaitingWorkerThreads--;
     354             : 
     355             :             // CPLAssert(
     356             :             //    CPLListCount(psWaitingWorkerThreadsList) ==
     357             :             //    nWaitingWorkerThreads);
     358             : 
     359             : #if DEBUG_VERBOSE
     360             :             CPLDebug("JOB", "Waking up %p", psWorkerThread);
     361             : #endif
     362             :             {
     363         650 :                 std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
     364         325 :                 oGuard.unlock();
     365         325 :                 psWorkerThread->m_cv.notify_one();
     366             :             }
     367             : 
     368         325 :             CPLFree(psToFree);
     369         325 :             oGuard.lock();
     370             :         }
     371             :         else
     372             :         {
     373             :             break;
     374             :         }
     375             :     }
     376             : 
     377         132 :     return true;
     378             : }
     379             : 
     380             : /************************************************************************/
     381             : /*                            WaitCompletion()                          */
     382             : /************************************************************************/
     383             : 
     384             : /** Wait for completion of part or whole jobs.
     385             :  *
     386             :  * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
     387             :  *                          in the queue after this method has completed. Might
     388             :  * be 0 to wait for all jobs.
     389             :  */
     390        5165 : void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
     391             : {
     392        5165 :     if (nMaxRemainingJobs < 0)
     393           0 :         nMaxRemainingJobs = 0;
     394       10330 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     395        8140 :     while (nPendingJobs > nMaxRemainingJobs)
     396             :     {
     397        2975 :         m_cv.wait(oGuard);
     398             :     }
     399        5165 : }
     400             : 
     401             : /************************************************************************/
     402             : /*                            WaitEvent()                               */
     403             : /************************************************************************/
     404             : 
     405             : /** Wait for completion of at least one job, if there are any remaining
     406             :  */
     407        1388 : void CPLWorkerThreadPool::WaitEvent()
     408             : {
     409        2776 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     410             :     while (true)
     411             :     {
     412        1604 :         const int nPendingJobsBefore = nPendingJobs;
     413        1604 :         if (nPendingJobsBefore == 0)
     414             :         {
     415           6 :             break;
     416             :         }
     417        1598 :         m_cv.wait(oGuard);
     418             :         // cppcheck-suppress knownConditionTrueFalse
     419        1598 :         if (nPendingJobs < nPendingJobsBefore)
     420             :         {
     421        1382 :             break;
     422             :         }
     423         216 :     }
     424        1388 : }
     425             : 
     426             : /************************************************************************/
     427             : /*                                Setup()                               */
     428             : /************************************************************************/
     429             : 
     430             : /** Setup the pool.
     431             :  *
     432             :  * @param nThreads Number of threads to launch
     433             :  * @param pfnInitFunc Initialization function to run in each thread. May be NULL
     434             :  * @param pasInitData Array of initialization data. Its length must be nThreads,
     435             :  *                    or it should be NULL.
     436             :  * @return true if initialization was successful.
     437             :  */
     438         274 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
     439             :                                 void **pasInitData)
     440             : {
     441         274 :     return Setup(nThreads, pfnInitFunc, pasInitData, true);
     442             : }
     443             : 
     444             : /** Setup the pool.
     445             :  *
     446             :  * @param nThreads Number of threads to launch
     447             :  * @param pfnInitFunc Initialization function to run in each thread. May be NULL
     448             :  * @param pasInitData Array of initialization data. Its length must be nThreads,
     449             :  *                    or it should be NULL.
     450             :  * @param bWaitallStarted Whether to wait for all threads to be fully started.
     451             :  * @return true if initialization was successful.
     452             :  */
     453         293 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
     454             :                                 void **pasInitData, bool bWaitallStarted)
     455             : {
     456         293 :     CPLAssert(nThreads > 0);
     457             : 
     458         586 :     if (nThreads > static_cast<int>(aWT.size()) && pfnInitFunc == nullptr &&
     459         586 :         pasInitData == nullptr && !bWaitallStarted)
     460             :     {
     461          18 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     462          18 :         if (nThreads > m_nMaxThreads)
     463          18 :             m_nMaxThreads = nThreads;
     464          18 :         return true;
     465             :     }
     466             : 
     467         275 :     bool bRet = true;
     468        1343 :     for (int i = static_cast<int>(aWT.size()); i < nThreads; i++)
     469             :     {
     470        1068 :         std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
     471        1068 :         wt->pfnInitFunc = pfnInitFunc;
     472        1068 :         wt->pInitData = pasInitData ? pasInitData[i] : nullptr;
     473        1068 :         wt->poTP = this;
     474        1068 :         wt->bMarkedAsWaiting = false;
     475        1068 :         wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
     476        1068 :         if (wt->hThread == nullptr)
     477             :         {
     478           0 :             nThreads = i;
     479           0 :             bRet = false;
     480           0 :             break;
     481             :         }
     482        1068 :         aWT.emplace_back(std::move(wt));
     483             :     }
     484             : 
     485             :     {
     486         550 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     487         275 :         if (nThreads > m_nMaxThreads)
     488         275 :             m_nMaxThreads = nThreads;
     489             :     }
     490             : 
     491         275 :     if (bWaitallStarted)
     492             :     {
     493             :         // Wait all threads to be started
     494         550 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     495         732 :         while (nWaitingWorkerThreads < nThreads)
     496             :         {
     497         457 :             m_cv.wait(oGuard);
     498             :         }
     499             :     }
     500             : 
     501         275 :     if (eState == CPLWTS_ERROR)
     502           0 :         bRet = false;
     503             : 
     504         275 :     return bRet;
     505             : }
     506             : 
     507             : /************************************************************************/
     508             : /*                          DeclareJobFinished()                        */
     509             : /************************************************************************/
     510             : 
     511       14579 : void CPLWorkerThreadPool::DeclareJobFinished()
     512             : {
     513       29162 :     std::lock_guard<std::mutex> oGuard(m_mutex);
     514       14583 :     nPendingJobs--;
     515       14583 :     m_cv.notify_one();
     516       14583 : }
     517             : 
     518             : /************************************************************************/
     519             : /*                             GetNextJob()                             */
     520             : /************************************************************************/
     521             : 
     522             : CPLWorkerThreadJob *
     523       22445 : CPLWorkerThreadPool::GetNextJob(CPLWorkerThread *psWorkerThread)
     524             : {
     525             :     while (true)
     526             :     {
     527       22445 :         std::unique_lock<std::mutex> oGuard(m_mutex);
     528       22453 :         if (eState == CPLWTS_STOP)
     529             :         {
     530        1118 :             return nullptr;
     531             :         }
     532       21335 :         CPLList *psTopJobIter = psJobQueue;
     533       21335 :         if (psTopJobIter)
     534             :         {
     535       14583 :             psJobQueue = psTopJobIter->psNext;
     536             : 
     537             : #if DEBUG_VERBOSE
     538             :             CPLDebug("JOB", "%p got a job", psWorkerThread);
     539             : #endif
     540       14583 :             CPLWorkerThreadJob *psJob =
     541             :                 static_cast<CPLWorkerThreadJob *>(psTopJobIter->pData);
     542       14583 :             CPLFree(psTopJobIter);
     543       14583 :             return psJob;
     544             :         }
     545             : 
     546        6752 :         if (!psWorkerThread->bMarkedAsWaiting)
     547             :         {
     548        6752 :             psWorkerThread->bMarkedAsWaiting = true;
     549        6752 :             nWaitingWorkerThreads++;
     550             : 
     551             :             CPLList *psItem =
     552        6752 :                 static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
     553        6752 :             if (psItem == nullptr)
     554             :             {
     555           0 :                 eState = CPLWTS_ERROR;
     556           0 :                 m_cv.notify_one();
     557             : 
     558           0 :                 return nullptr;
     559             :             }
     560             : 
     561        6752 :             psItem->pData = psWorkerThread;
     562        6752 :             psItem->psNext = psWaitingWorkerThreadsList;
     563        6752 :             psWaitingWorkerThreadsList = psItem;
     564             : 
     565             : #if DEBUG_VERBOSE
     566             :             CPLAssert(CPLListCount(psWaitingWorkerThreadsList) ==
     567             :                       nWaitingWorkerThreads);
     568             : #endif
     569             :         }
     570             : 
     571        6752 :         m_cv.notify_one();
     572             : 
     573             : #if DEBUG_VERBOSE
     574             :         CPLDebug("JOB", "%p sleeping", psWorkerThread);
     575             : #endif
     576             : 
     577       13481 :         std::unique_lock<std::mutex> oGuardThisThread(psWorkerThread->m_mutex);
     578        6752 :         oGuard.unlock();
     579        6752 :         psWorkerThread->m_cv.wait(oGuardThisThread);
     580        6729 :     }
     581             : }
     582             : 
     583             : /************************************************************************/
     584             : /*                         CreateJobQueue()                             */
     585             : /************************************************************************/
     586             : 
     587             : /** Create a new job queue based on this worker thread pool.
     588             :  *
     589             :  * The worker thread pool must remain alive while the returned object is
     590             :  * itself alive.
     591             :  *
     592             :  * @since GDAL 3.2
     593             :  */
     594         316 : std::unique_ptr<CPLJobQueue> CPLWorkerThreadPool::CreateJobQueue()
     595             : {
     596         316 :     return std::unique_ptr<CPLJobQueue>(new CPLJobQueue(this));
     597             : }
     598             : 
     599             : /************************************************************************/
     600             : /*                            CPLJobQueue()                             */
     601             : /************************************************************************/
     602             : 
     603             : //! @cond Doxygen_Suppress
     604         316 : CPLJobQueue::CPLJobQueue(CPLWorkerThreadPool *poPool) : m_poPool(poPool)
     605             : {
     606         316 : }
     607             : 
     608             : //! @endcond
     609             : 
     610             : /************************************************************************/
     611             : /*                           ~CPLJobQueue()                             */
     612             : /************************************************************************/
     613             : 
     614         316 : CPLJobQueue::~CPLJobQueue()
     615             : {
     616         316 :     WaitCompletion();
     617         316 : }
     618             : 
     619             : /************************************************************************/
     620             : /*                           JobQueueJob                                */
     621             : /************************************************************************/
     622             : 
     623             : struct JobQueueJob
     624             : {
     625             :     CPLJobQueue *poQueue = nullptr;
     626             :     CPLThreadFunc pfnFunc = nullptr;
     627             :     void *pData = nullptr;
     628             : };
     629             : 
     630             : /************************************************************************/
     631             : /*                          JobQueueFunction()                          */
     632             : /************************************************************************/
     633             : 
     634        6356 : void CPLJobQueue::JobQueueFunction(void *pData)
     635             : {
     636        6356 :     JobQueueJob *poJob = static_cast<JobQueueJob *>(pData);
     637        6356 :     poJob->pfnFunc(poJob->pData);
     638        6354 :     poJob->poQueue->DeclareJobFinished();
     639        6355 :     delete poJob;
     640        6355 : }
     641             : 
     642             : /************************************************************************/
     643             : /*                          DeclareJobFinished()                        */
     644             : /************************************************************************/
     645             : 
     646        6350 : void CPLJobQueue::DeclareJobFinished()
     647             : {
     648       12706 :     std::lock_guard<std::mutex> oGuard(m_mutex);
     649        6356 :     m_nPendingJobs--;
     650        6356 :     m_cv.notify_one();
     651        6355 : }
     652             : 
     653             : /************************************************************************/
     654             : /*                             SubmitJob()                              */
     655             : /************************************************************************/
     656             : 
     657             : /** Queue a new job.
     658             :  *
     659             :  * @param pfnFunc Function to run for the job.
     660             :  * @param pData User data to pass to the job function.
     661             :  * @return true in case of success.
     662             :  */
     663        6356 : bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
     664             : {
     665        6356 :     JobQueueJob *poJob = new JobQueueJob;
     666        6356 :     poJob->poQueue = this;
     667        6356 :     poJob->pfnFunc = pfnFunc;
     668        6356 :     poJob->pData = pData;
     669             :     {
     670        6356 :         std::lock_guard<std::mutex> oGuard(m_mutex);
     671        6356 :         m_nPendingJobs++;
     672             :     }
     673        6356 :     bool bRet = m_poPool->SubmitJob(JobQueueFunction, poJob);
     674        6356 :     if (!bRet)
     675             :     {
     676           0 :         delete poJob;
     677             :     }
     678        6356 :     return bRet;
     679             : }
     680             : 
     681             : /************************************************************************/
     682             : /*                            WaitCompletion()                          */
     683             : /************************************************************************/
     684             : 
     685             : /** Wait for completion of part or whole jobs.
     686             :  *
     687             :  * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
     688             :  *                          in the queue after this method has completed. Might
     689             :  * be 0 to wait for all jobs.
     690             :  */
     691         763 : void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
     692             : {
     693        1526 :     std::unique_lock<std::mutex> oGuard(m_mutex);
     694             :     // coverity[missing_lock:FALSE]
     695        1939 :     while (m_nPendingJobs > nMaxRemainingJobs)
     696             :     {
     697        1176 :         m_cv.wait(oGuard);
     698             :     }
     699         763 : }

Generated by: LCOV version 1.14