Line data Source code
1 : /**********************************************************************
2 : *
3 : * Project: CPL - Common Portability Library
4 : * Purpose: CPL worker thread pool
5 : * Author: Even Rouault, <even dot rouault at spatialys dot com>
6 : *
7 : **********************************************************************
8 : * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
9 : *
10 : * Permission is hereby granted, free of charge, to any person obtaining a
11 : * copy of this software and associated documentation files (the "Software"),
12 : * to deal in the Software without restriction, including without limitation
13 : * the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 : * and/or sell copies of the Software, and to permit persons to whom the
15 : * Software is furnished to do so, subject to the following conditions:
16 : *
17 : * The above copyright notice and this permission notice shall be included
18 : * in all copies or substantial portions of the Software.
19 : *
20 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 : * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 : * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25 : * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26 : * DEALINGS IN THE SOFTWARE.
27 : ****************************************************************************/
28 :
29 : #include "cpl_port.h"
30 : #include "cpl_worker_thread_pool.h"
31 :
32 : #include <cstddef>
33 : #include <memory>
34 :
35 : #include "cpl_conv.h"
36 : #include "cpl_error.h"
37 : #include "cpl_vsi.h"
38 :
39 : struct CPLWorkerThreadJob
40 : {
41 : CPLThreadFunc pfnFunc;
42 : void *pData;
43 : };
44 :
45 : static thread_local CPLWorkerThreadPool *threadLocalCurrentThreadPool = nullptr;
46 :
47 : /************************************************************************/
48 : /* CPLWorkerThreadPool() */
49 : /************************************************************************/
50 :
51 : /** Instantiate a new pool of worker threads.
52 : *
53 : * The pool is in an uninitialized state after this call. The Setup() method
54 : * must be called.
55 : */
56 592 : CPLWorkerThreadPool::CPLWorkerThreadPool()
57 : {
58 592 : }
59 :
60 : /************************************************************************/
61 : /* ~CPLWorkerThreadPool() */
62 : /************************************************************************/
63 :
64 : /** Destroys a pool of worker threads.
65 : *
66 : * Any still pending job will be completed before the destructor returns.
67 : */
68 588 : CPLWorkerThreadPool::~CPLWorkerThreadPool()
69 : {
70 588 : WaitCompletion();
71 :
72 : {
73 588 : std::lock_guard<std::mutex> oGuard(m_mutex);
74 588 : eState = CPLWTS_STOP;
75 : }
76 :
77 1706 : for (auto &wt : aWT)
78 : {
79 : {
80 2236 : std::lock_guard<std::mutex> oGuard(wt->m_mutex);
81 1118 : wt->m_cv.notify_one();
82 : }
83 1118 : CPLJoinThread(wt->hThread);
84 : }
85 :
86 588 : CPLListDestroy(psWaitingWorkerThreadsList);
87 588 : }
88 :
89 : /************************************************************************/
90 : /* WorkerThreadFunction() */
91 : /************************************************************************/
92 :
93 1138 : void CPLWorkerThreadPool::WorkerThreadFunction(void *user_data)
94 : {
95 1138 : CPLWorkerThread *psWT = static_cast<CPLWorkerThread *>(user_data);
96 1138 : CPLWorkerThreadPool *poTP = psWT->poTP;
97 :
98 1138 : threadLocalCurrentThreadPool = poTP;
99 :
100 1138 : if (psWT->pfnInitFunc)
101 0 : psWT->pfnInitFunc(psWT->pInitData);
102 :
103 : while (true)
104 : {
105 15721 : CPLWorkerThreadJob *psJob = poTP->GetNextJob(psWT);
106 15701 : if (psJob == nullptr)
107 1118 : break;
108 :
109 14583 : if (psJob->pfnFunc)
110 : {
111 14583 : psJob->pfnFunc(psJob->pData);
112 : }
113 14581 : CPLFree(psJob);
114 : #if DEBUG_VERBOSE
115 : CPLDebug("JOB", "%p finished a job", psWT);
116 : #endif
117 14576 : poTP->DeclareJobFinished();
118 14583 : }
119 1118 : }
120 :
121 : /************************************************************************/
122 : /* SubmitJob() */
123 : /************************************************************************/
124 :
125 : /** Queue a new job.
126 : *
127 : * @param pfnFunc Function to run for the job.
128 : * @param pData User data to pass to the job function.
129 : * @return true in case of success.
130 : */
131 13261 : bool CPLWorkerThreadPool::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
132 : {
133 13261 : CPLAssert(m_nMaxThreads > 0);
134 :
135 13261 : bool bMustIncrementWaitingWorkerThreadsAfterSubmission = false;
136 13261 : if (threadLocalCurrentThreadPool == this)
137 : {
138 : // If there are waiting threads or we have not started all allowed
139 : // threads, we can submit this job asynchronously
140 : {
141 18 : std::unique_lock<std::mutex> oGuard(m_mutex);
142 17 : if (nWaitingWorkerThreads > 0 ||
143 8 : static_cast<int>(aWT.size()) < m_nMaxThreads)
144 : {
145 1 : bMustIncrementWaitingWorkerThreadsAfterSubmission = true;
146 1 : nWaitingWorkerThreads--;
147 : }
148 : }
149 9 : if (!bMustIncrementWaitingWorkerThreadsAfterSubmission)
150 : {
151 : // otherwise there is a risk of deadlock, so execute synchronously.
152 8 : pfnFunc(pData);
153 8 : return true;
154 : }
155 : }
156 :
157 : CPLWorkerThreadJob *psJob = static_cast<CPLWorkerThreadJob *>(
158 13253 : VSI_MALLOC_VERBOSE(sizeof(CPLWorkerThreadJob)));
159 13253 : if (psJob == nullptr)
160 : {
161 0 : if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
162 : {
163 0 : std::unique_lock<std::mutex> oGuard(m_mutex);
164 0 : nWaitingWorkerThreads++;
165 : }
166 0 : return false;
167 : }
168 13253 : psJob->pfnFunc = pfnFunc;
169 13253 : psJob->pData = pData;
170 :
171 : CPLList *psItem =
172 13253 : static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
173 13253 : if (psItem == nullptr)
174 : {
175 0 : VSIFree(psJob);
176 0 : if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
177 : {
178 0 : std::unique_lock<std::mutex> oGuard(m_mutex);
179 0 : nWaitingWorkerThreads++;
180 : }
181 0 : return false;
182 : }
183 13253 : psItem->pData = psJob;
184 :
185 26506 : std::unique_lock<std::mutex> oGuard(m_mutex);
186 :
187 13253 : if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
188 1 : nWaitingWorkerThreads++;
189 :
190 13253 : if (static_cast<int>(aWT.size()) < m_nMaxThreads)
191 : {
192 : // CPLDebug("CPL", "Starting new thread...");
193 70 : std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
194 70 : wt->pfnInitFunc = nullptr;
195 70 : wt->pInitData = nullptr;
196 70 : wt->poTP = this;
197 70 : wt->bMarkedAsWaiting = false;
198 70 : wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
199 70 : if (wt->hThread == nullptr)
200 : {
201 0 : VSIFree(psJob);
202 0 : VSIFree(psItem);
203 0 : return false;
204 : }
205 : else
206 : {
207 70 : aWT.emplace_back(std::move(wt));
208 : }
209 : }
210 :
211 13253 : psItem->psNext = psJobQueue;
212 13253 : psJobQueue = psItem;
213 13253 : nPendingJobs++;
214 :
215 13253 : if (psWaitingWorkerThreadsList)
216 : {
217 5353 : CPLWorkerThread *psWorkerThread =
218 5353 : static_cast<CPLWorkerThread *>(psWaitingWorkerThreadsList->pData);
219 :
220 5353 : CPLAssert(psWorkerThread->bMarkedAsWaiting);
221 5353 : psWorkerThread->bMarkedAsWaiting = false;
222 :
223 5353 : CPLList *psNext = psWaitingWorkerThreadsList->psNext;
224 5353 : CPLList *psToFree = psWaitingWorkerThreadsList;
225 5353 : psWaitingWorkerThreadsList = psNext;
226 5353 : nWaitingWorkerThreads--;
227 :
228 : // CPLAssert(
229 : // CPLListCount(psWaitingWorkerThreadsList) == nWaitingWorkerThreads);
230 :
231 : #if DEBUG_VERBOSE
232 : CPLDebug("JOB", "Waking up %p", psWorkerThread);
233 : #endif
234 :
235 : {
236 10706 : std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
237 5353 : oGuard.unlock();
238 5353 : psWorkerThread->m_cv.notify_one();
239 : }
240 :
241 5353 : CPLFree(psToFree);
242 : }
243 :
244 13253 : return true;
245 : }
246 :
247 : /************************************************************************/
248 : /* SubmitJobs() */
249 : /************************************************************************/
250 :
251 : /** Queue several jobs
252 : *
253 : * @param pfnFunc Function to run for the job.
254 : * @param apData User data instances to pass to the job function.
255 : * @return true in case of success.
256 : */
257 132 : bool CPLWorkerThreadPool::SubmitJobs(CPLThreadFunc pfnFunc,
258 : const std::vector<void *> &apData)
259 : {
260 132 : CPLAssert(m_nMaxThreads > 0);
261 :
262 132 : if (threadLocalCurrentThreadPool == this)
263 : {
264 : // If SubmitJob() is called from a worker thread of this queue,
265 : // then synchronously run the task to avoid deadlock.
266 0 : for (size_t i = 0; i < apData.size(); i++)
267 : {
268 0 : pfnFunc(apData[i]);
269 : }
270 0 : return true;
271 : }
272 :
273 264 : std::unique_lock<std::mutex> oGuard(m_mutex);
274 :
275 132 : CPLList *psJobQueueInit = psJobQueue;
276 132 : bool bRet = true;
277 :
278 1462 : for (size_t i = 0; i < apData.size(); i++)
279 : {
280 1330 : if (static_cast<int>(aWT.size()) < m_nMaxThreads)
281 : {
282 0 : std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
283 0 : wt->pfnInitFunc = nullptr;
284 0 : wt->pInitData = nullptr;
285 0 : wt->poTP = this;
286 0 : wt->bMarkedAsWaiting = false;
287 0 : wt->hThread =
288 0 : CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
289 0 : if (wt->hThread == nullptr)
290 : {
291 0 : if (aWT.empty())
292 0 : return false;
293 : }
294 : else
295 : {
296 0 : aWT.emplace_back(std::move(wt));
297 : }
298 : }
299 :
300 : CPLWorkerThreadJob *psJob = static_cast<CPLWorkerThreadJob *>(
301 1330 : VSI_MALLOC_VERBOSE(sizeof(CPLWorkerThreadJob)));
302 1330 : if (psJob == nullptr)
303 : {
304 0 : bRet = false;
305 0 : break;
306 : }
307 1330 : psJob->pfnFunc = pfnFunc;
308 1330 : psJob->pData = apData[i];
309 :
310 : CPLList *psItem =
311 1330 : static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
312 1330 : if (psItem == nullptr)
313 : {
314 0 : VSIFree(psJob);
315 0 : bRet = false;
316 0 : break;
317 : }
318 1330 : psItem->pData = psJob;
319 :
320 1330 : psItem->psNext = psJobQueue;
321 1330 : psJobQueue = psItem;
322 1330 : nPendingJobs++;
323 : }
324 :
325 132 : if (!bRet)
326 : {
327 0 : for (CPLList *psIter = psJobQueue; psIter != psJobQueueInit;)
328 : {
329 0 : CPLList *psNext = psIter->psNext;
330 0 : VSIFree(psIter->pData);
331 0 : VSIFree(psIter);
332 0 : nPendingJobs--;
333 0 : psIter = psNext;
334 : }
335 0 : return false;
336 : }
337 :
338 457 : for (size_t i = 0; i < apData.size(); i++)
339 : {
340 331 : if (psWaitingWorkerThreadsList && psJobQueue)
341 : {
342 : CPLWorkerThread *psWorkerThread;
343 :
344 325 : psWorkerThread = static_cast<CPLWorkerThread *>(
345 325 : psWaitingWorkerThreadsList->pData);
346 :
347 325 : CPLAssert(psWorkerThread->bMarkedAsWaiting);
348 325 : psWorkerThread->bMarkedAsWaiting = false;
349 :
350 325 : CPLList *psNext = psWaitingWorkerThreadsList->psNext;
351 325 : CPLList *psToFree = psWaitingWorkerThreadsList;
352 325 : psWaitingWorkerThreadsList = psNext;
353 325 : nWaitingWorkerThreads--;
354 :
355 : // CPLAssert(
356 : // CPLListCount(psWaitingWorkerThreadsList) ==
357 : // nWaitingWorkerThreads);
358 :
359 : #if DEBUG_VERBOSE
360 : CPLDebug("JOB", "Waking up %p", psWorkerThread);
361 : #endif
362 : {
363 650 : std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
364 325 : oGuard.unlock();
365 325 : psWorkerThread->m_cv.notify_one();
366 : }
367 :
368 325 : CPLFree(psToFree);
369 325 : oGuard.lock();
370 : }
371 : else
372 : {
373 : break;
374 : }
375 : }
376 :
377 132 : return true;
378 : }
379 :
380 : /************************************************************************/
381 : /* WaitCompletion() */
382 : /************************************************************************/
383 :
384 : /** Wait for completion of part or whole jobs.
385 : *
386 : * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
387 : * in the queue after this method has completed. Might
388 : * be 0 to wait for all jobs.
389 : */
390 5165 : void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
391 : {
392 5165 : if (nMaxRemainingJobs < 0)
393 0 : nMaxRemainingJobs = 0;
394 10330 : std::unique_lock<std::mutex> oGuard(m_mutex);
395 8140 : while (nPendingJobs > nMaxRemainingJobs)
396 : {
397 2975 : m_cv.wait(oGuard);
398 : }
399 5165 : }
400 :
401 : /************************************************************************/
402 : /* WaitEvent() */
403 : /************************************************************************/
404 :
405 : /** Wait for completion of at least one job, if there are any remaining
406 : */
407 1388 : void CPLWorkerThreadPool::WaitEvent()
408 : {
409 2776 : std::unique_lock<std::mutex> oGuard(m_mutex);
410 : while (true)
411 : {
412 1604 : const int nPendingJobsBefore = nPendingJobs;
413 1604 : if (nPendingJobsBefore == 0)
414 : {
415 6 : break;
416 : }
417 1598 : m_cv.wait(oGuard);
418 : // cppcheck-suppress knownConditionTrueFalse
419 1598 : if (nPendingJobs < nPendingJobsBefore)
420 : {
421 1382 : break;
422 : }
423 216 : }
424 1388 : }
425 :
426 : /************************************************************************/
427 : /* Setup() */
428 : /************************************************************************/
429 :
430 : /** Setup the pool.
431 : *
432 : * @param nThreads Number of threads to launch
433 : * @param pfnInitFunc Initialization function to run in each thread. May be NULL
434 : * @param pasInitData Array of initialization data. Its length must be nThreads,
435 : * or it should be NULL.
436 : * @return true if initialization was successful.
437 : */
438 274 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
439 : void **pasInitData)
440 : {
441 274 : return Setup(nThreads, pfnInitFunc, pasInitData, true);
442 : }
443 :
444 : /** Setup the pool.
445 : *
446 : * @param nThreads Number of threads to launch
447 : * @param pfnInitFunc Initialization function to run in each thread. May be NULL
448 : * @param pasInitData Array of initialization data. Its length must be nThreads,
449 : * or it should be NULL.
450 : * @param bWaitallStarted Whether to wait for all threads to be fully started.
451 : * @return true if initialization was successful.
452 : */
453 293 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
454 : void **pasInitData, bool bWaitallStarted)
455 : {
456 293 : CPLAssert(nThreads > 0);
457 :
458 586 : if (nThreads > static_cast<int>(aWT.size()) && pfnInitFunc == nullptr &&
459 586 : pasInitData == nullptr && !bWaitallStarted)
460 : {
461 18 : std::lock_guard<std::mutex> oGuard(m_mutex);
462 18 : if (nThreads > m_nMaxThreads)
463 18 : m_nMaxThreads = nThreads;
464 18 : return true;
465 : }
466 :
467 275 : bool bRet = true;
468 1343 : for (int i = static_cast<int>(aWT.size()); i < nThreads; i++)
469 : {
470 1068 : std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
471 1068 : wt->pfnInitFunc = pfnInitFunc;
472 1068 : wt->pInitData = pasInitData ? pasInitData[i] : nullptr;
473 1068 : wt->poTP = this;
474 1068 : wt->bMarkedAsWaiting = false;
475 1068 : wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
476 1068 : if (wt->hThread == nullptr)
477 : {
478 0 : nThreads = i;
479 0 : bRet = false;
480 0 : break;
481 : }
482 1068 : aWT.emplace_back(std::move(wt));
483 : }
484 :
485 : {
486 550 : std::lock_guard<std::mutex> oGuard(m_mutex);
487 275 : if (nThreads > m_nMaxThreads)
488 275 : m_nMaxThreads = nThreads;
489 : }
490 :
491 275 : if (bWaitallStarted)
492 : {
493 : // Wait all threads to be started
494 550 : std::unique_lock<std::mutex> oGuard(m_mutex);
495 732 : while (nWaitingWorkerThreads < nThreads)
496 : {
497 457 : m_cv.wait(oGuard);
498 : }
499 : }
500 :
501 275 : if (eState == CPLWTS_ERROR)
502 0 : bRet = false;
503 :
504 275 : return bRet;
505 : }
506 :
507 : /************************************************************************/
508 : /* DeclareJobFinished() */
509 : /************************************************************************/
510 :
511 14579 : void CPLWorkerThreadPool::DeclareJobFinished()
512 : {
513 29162 : std::lock_guard<std::mutex> oGuard(m_mutex);
514 14583 : nPendingJobs--;
515 14583 : m_cv.notify_one();
516 14583 : }
517 :
518 : /************************************************************************/
519 : /* GetNextJob() */
520 : /************************************************************************/
521 :
522 : CPLWorkerThreadJob *
523 22445 : CPLWorkerThreadPool::GetNextJob(CPLWorkerThread *psWorkerThread)
524 : {
525 : while (true)
526 : {
527 22445 : std::unique_lock<std::mutex> oGuard(m_mutex);
528 22453 : if (eState == CPLWTS_STOP)
529 : {
530 1118 : return nullptr;
531 : }
532 21335 : CPLList *psTopJobIter = psJobQueue;
533 21335 : if (psTopJobIter)
534 : {
535 14583 : psJobQueue = psTopJobIter->psNext;
536 :
537 : #if DEBUG_VERBOSE
538 : CPLDebug("JOB", "%p got a job", psWorkerThread);
539 : #endif
540 14583 : CPLWorkerThreadJob *psJob =
541 : static_cast<CPLWorkerThreadJob *>(psTopJobIter->pData);
542 14583 : CPLFree(psTopJobIter);
543 14583 : return psJob;
544 : }
545 :
546 6752 : if (!psWorkerThread->bMarkedAsWaiting)
547 : {
548 6752 : psWorkerThread->bMarkedAsWaiting = true;
549 6752 : nWaitingWorkerThreads++;
550 :
551 : CPLList *psItem =
552 6752 : static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
553 6752 : if (psItem == nullptr)
554 : {
555 0 : eState = CPLWTS_ERROR;
556 0 : m_cv.notify_one();
557 :
558 0 : return nullptr;
559 : }
560 :
561 6752 : psItem->pData = psWorkerThread;
562 6752 : psItem->psNext = psWaitingWorkerThreadsList;
563 6752 : psWaitingWorkerThreadsList = psItem;
564 :
565 : #if DEBUG_VERBOSE
566 : CPLAssert(CPLListCount(psWaitingWorkerThreadsList) ==
567 : nWaitingWorkerThreads);
568 : #endif
569 : }
570 :
571 6752 : m_cv.notify_one();
572 :
573 : #if DEBUG_VERBOSE
574 : CPLDebug("JOB", "%p sleeping", psWorkerThread);
575 : #endif
576 :
577 13481 : std::unique_lock<std::mutex> oGuardThisThread(psWorkerThread->m_mutex);
578 6752 : oGuard.unlock();
579 6752 : psWorkerThread->m_cv.wait(oGuardThisThread);
580 6729 : }
581 : }
582 :
583 : /************************************************************************/
584 : /* CreateJobQueue() */
585 : /************************************************************************/
586 :
587 : /** Create a new job queue based on this worker thread pool.
588 : *
589 : * The worker thread pool must remain alive while the returned object is
590 : * itself alive.
591 : *
592 : * @since GDAL 3.2
593 : */
594 316 : std::unique_ptr<CPLJobQueue> CPLWorkerThreadPool::CreateJobQueue()
595 : {
596 316 : return std::unique_ptr<CPLJobQueue>(new CPLJobQueue(this));
597 : }
598 :
599 : /************************************************************************/
600 : /* CPLJobQueue() */
601 : /************************************************************************/
602 :
603 : //! @cond Doxygen_Suppress
604 316 : CPLJobQueue::CPLJobQueue(CPLWorkerThreadPool *poPool) : m_poPool(poPool)
605 : {
606 316 : }
607 :
608 : //! @endcond
609 :
610 : /************************************************************************/
611 : /* ~CPLJobQueue() */
612 : /************************************************************************/
613 :
614 316 : CPLJobQueue::~CPLJobQueue()
615 : {
616 316 : WaitCompletion();
617 316 : }
618 :
619 : /************************************************************************/
620 : /* JobQueueJob */
621 : /************************************************************************/
622 :
623 : struct JobQueueJob
624 : {
625 : CPLJobQueue *poQueue = nullptr;
626 : CPLThreadFunc pfnFunc = nullptr;
627 : void *pData = nullptr;
628 : };
629 :
630 : /************************************************************************/
631 : /* JobQueueFunction() */
632 : /************************************************************************/
633 :
634 6356 : void CPLJobQueue::JobQueueFunction(void *pData)
635 : {
636 6356 : JobQueueJob *poJob = static_cast<JobQueueJob *>(pData);
637 6356 : poJob->pfnFunc(poJob->pData);
638 6354 : poJob->poQueue->DeclareJobFinished();
639 6355 : delete poJob;
640 6355 : }
641 :
642 : /************************************************************************/
643 : /* DeclareJobFinished() */
644 : /************************************************************************/
645 :
646 6350 : void CPLJobQueue::DeclareJobFinished()
647 : {
648 12706 : std::lock_guard<std::mutex> oGuard(m_mutex);
649 6356 : m_nPendingJobs--;
650 6356 : m_cv.notify_one();
651 6355 : }
652 :
653 : /************************************************************************/
654 : /* SubmitJob() */
655 : /************************************************************************/
656 :
657 : /** Queue a new job.
658 : *
659 : * @param pfnFunc Function to run for the job.
660 : * @param pData User data to pass to the job function.
661 : * @return true in case of success.
662 : */
663 6356 : bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
664 : {
665 6356 : JobQueueJob *poJob = new JobQueueJob;
666 6356 : poJob->poQueue = this;
667 6356 : poJob->pfnFunc = pfnFunc;
668 6356 : poJob->pData = pData;
669 : {
670 6356 : std::lock_guard<std::mutex> oGuard(m_mutex);
671 6356 : m_nPendingJobs++;
672 : }
673 6356 : bool bRet = m_poPool->SubmitJob(JobQueueFunction, poJob);
674 6356 : if (!bRet)
675 : {
676 0 : delete poJob;
677 : }
678 6356 : return bRet;
679 : }
680 :
681 : /************************************************************************/
682 : /* WaitCompletion() */
683 : /************************************************************************/
684 :
685 : /** Wait for completion of part or whole jobs.
686 : *
687 : * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
688 : * in the queue after this method has completed. Might
689 : * be 0 to wait for all jobs.
690 : */
691 763 : void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
692 : {
693 1526 : std::unique_lock<std::mutex> oGuard(m_mutex);
694 : // coverity[missing_lock:FALSE]
695 1939 : while (m_nPendingJobs > nMaxRemainingJobs)
696 : {
697 1176 : m_cv.wait(oGuard);
698 : }
699 763 : }
|