Line data Source code
1 : /**********************************************************************
2 : *
3 : * Project: CPL - Common Portability Library
4 : * Purpose: CPL worker thread pool
5 : * Author: Even Rouault, <even dot rouault at spatialys dot com>
6 : *
7 : **********************************************************************
8 : * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "cpl_port.h"
14 : #include "cpl_worker_thread_pool.h"
15 :
16 : #include <cstddef>
17 : #include <memory>
18 :
19 : #include "cpl_conv.h"
20 : #include "cpl_error.h"
21 : #include "cpl_vsi.h"
22 :
23 : static thread_local CPLWorkerThreadPool *threadLocalCurrentThreadPool = nullptr;
24 :
25 : /************************************************************************/
26 : /* CPLWorkerThreadPool() */
27 : /************************************************************************/
28 :
29 : /** Instantiate a new pool of worker threads.
30 : *
31 : * The pool is in an uninitialized state after this call. The Setup() method
32 : * must be called.
33 : */
34 846 : CPLWorkerThreadPool::CPLWorkerThreadPool() : jobQueue{}
35 : {
36 846 : }
37 :
38 : /** Instantiate a new pool of worker threads.
39 : *
40 : * \param nThreads Number of threads in the pool.
41 : */
42 248 : CPLWorkerThreadPool::CPLWorkerThreadPool(int nThreads) : jobQueue{}
43 : {
44 247 : Setup(nThreads, nullptr, nullptr);
45 248 : }
46 :
47 : /************************************************************************/
48 : /* ~CPLWorkerThreadPool() */
49 : /************************************************************************/
50 :
51 : /** Destroys a pool of worker threads.
52 : *
53 : * Any still pending job will be completed before the destructor returns.
54 : */
55 1089 : CPLWorkerThreadPool::~CPLWorkerThreadPool()
56 : {
57 1089 : WaitCompletion();
58 :
59 : {
60 1089 : std::lock_guard<std::mutex> oGuard(m_mutex);
61 1089 : eState = CPLWTS_STOP;
62 : }
63 :
64 3661 : for (auto &wt : aWT)
65 : {
66 : {
67 5144 : std::lock_guard<std::mutex> oGuard(wt->m_mutex);
68 2572 : wt->m_cv.notify_one();
69 : }
70 2572 : CPLJoinThread(wt->hThread);
71 : }
72 :
73 1089 : CPLListDestroy(psWaitingWorkerThreadsList);
74 1089 : }
75 :
76 : /************************************************************************/
77 : /* GetThreadCount() */
78 : /************************************************************************/
79 :
80 1248 : int CPLWorkerThreadPool::GetThreadCount() const
81 : {
82 1248 : std::unique_lock<std::mutex> oGuard(m_mutex);
83 2496 : return m_nMaxThreads;
84 : }
85 :
86 : /************************************************************************/
87 : /* WorkerThreadFunction() */
88 : /************************************************************************/
89 :
90 3610 : void CPLWorkerThreadPool::WorkerThreadFunction(void *user_data)
91 : {
92 3610 : CPLWorkerThread *psWT = static_cast<CPLWorkerThread *>(user_data);
93 3610 : CPLWorkerThreadPool *poTP = psWT->poTP;
94 :
95 3610 : threadLocalCurrentThreadPool = poTP;
96 :
97 3610 : if (psWT->pfnInitFunc)
98 0 : psWT->pfnInitFunc(psWT->pInitData);
99 :
100 : while (true)
101 : {
102 64066 : std::function<void()> task = poTP->GetNextJob(psWT);
103 62994 : if (!task)
104 2572 : break;
105 :
106 60329 : task();
107 : #if DEBUG_VERBOSE
108 : CPLDebug("JOB", "%p finished a job", psWT);
109 : #endif
110 60452 : poTP->DeclareJobFinished();
111 60456 : }
112 2580 : }
113 :
114 : /************************************************************************/
115 : /* SubmitJob() */
116 : /************************************************************************/
117 :
118 : /** Queue a new job.
119 : *
120 : * @param pfnFunc Function to run for the job.
121 : * @param pData User data to pass to the job function.
122 : * @return true in case of success.
123 : */
124 7372 : bool CPLWorkerThreadPool::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
125 : {
126 14729 : return SubmitJob([=] { pfnFunc(pData); });
127 : }
128 :
129 : /** Queue a new job.
130 : *
131 : * @param task Void function to execute.
132 : * @return true in case of success.
133 : */
134 74184 : bool CPLWorkerThreadPool::SubmitJob(std::function<void()> task)
135 : {
136 : #ifdef DEBUG
137 : {
138 148295 : std::unique_lock<std::mutex> oGuard(m_mutex);
139 74111 : CPLAssert(m_nMaxThreads > 0);
140 : }
141 : #endif
142 :
143 74073 : bool bMustIncrementWaitingWorkerThreadsAfterSubmission = false;
144 74073 : if (threadLocalCurrentThreadPool == this)
145 : {
146 : // If there are waiting threads or we have not started all allowed
147 : // threads, we can submit this job asynchronously
148 : {
149 115788 : std::unique_lock<std::mutex> oGuard(m_mutex);
150 73122 : if (nWaitingWorkerThreads > 0 ||
151 15214 : static_cast<int>(aWT.size()) < m_nMaxThreads)
152 : {
153 42717 : bMustIncrementWaitingWorkerThreadsAfterSubmission = true;
154 42717 : nWaitingWorkerThreads--;
155 : }
156 : }
157 57934 : if (!bMustIncrementWaitingWorkerThreadsAfterSubmission)
158 : {
159 : // otherwise there is a risk of deadlock, so execute synchronously.
160 15220 : task();
161 15221 : return true;
162 : }
163 : }
164 :
165 58907 : std::unique_lock<std::mutex> oGuard(m_mutex);
166 :
167 58955 : if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
168 42736 : nWaitingWorkerThreads++;
169 :
170 58955 : if (static_cast<int>(aWT.size()) < m_nMaxThreads)
171 : {
172 : // CPLDebug("CPL", "Starting new thread...");
173 2208 : auto wt = std::make_unique<CPLWorkerThread>();
174 1104 : wt->poTP = this;
175 : //ABELL - Why should this fail? And this is a *pool* thread, not necessarily
176 : // tied to the submitted job. The submitted job still needs to run, even if
177 : // this fails. If we can't create a thread, should the entire pool become invalid?
178 1104 : wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
179 : /**
180 : if (!wt->hThread)
181 : {
182 : VSIFree(psJob);
183 : VSIFree(psItem);
184 : return false;
185 : }
186 : **/
187 1104 : if (wt->hThread)
188 1104 : aWT.emplace_back(std::move(wt));
189 : }
190 :
191 58892 : jobQueue.emplace(task);
192 58931 : nPendingJobs++;
193 :
194 58931 : if (psWaitingWorkerThreadsList)
195 : {
196 52522 : CPLWorkerThread *psWorkerThread =
197 52522 : static_cast<CPLWorkerThread *>(psWaitingWorkerThreadsList->pData);
198 :
199 52522 : CPLAssert(psWorkerThread->bMarkedAsWaiting);
200 52522 : psWorkerThread->bMarkedAsWaiting = false;
201 :
202 52522 : CPLList *psNext = psWaitingWorkerThreadsList->psNext;
203 52522 : CPLList *psToFree = psWaitingWorkerThreadsList;
204 52522 : psWaitingWorkerThreadsList = psNext;
205 52522 : nWaitingWorkerThreads--;
206 :
207 : #if DEBUG_VERBOSE
208 : CPLDebug("JOB", "Waking up %p", psWorkerThread);
209 : #endif
210 :
211 : #ifdef __COVERITY__
212 : CPLError(CE_Failure, CPLE_AppDefined, "Not implemented");
213 : #else
214 : {
215 105121 : std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
216 : // coverity[uninit_use_in_call]
217 52564 : oGuard.unlock();
218 52543 : psWorkerThread->m_cv.notify_one();
219 : }
220 : #endif
221 :
222 52587 : CPLFree(psToFree);
223 : }
224 :
225 : // coverity[double_unlock]
226 59051 : return true;
227 : }
228 :
229 : /************************************************************************/
230 : /* SubmitJobs() */
231 : /************************************************************************/
232 :
233 : /** Queue several jobs
234 : *
235 : * @param pfnFunc Function to run for the job.
236 : * @param apData User data instances to pass to the job function.
237 : * @return true in case of success.
238 : */
239 175 : bool CPLWorkerThreadPool::SubmitJobs(CPLThreadFunc pfnFunc,
240 : const std::vector<void *> &apData)
241 : {
242 175 : if (apData.empty())
243 0 : return false;
244 :
245 : #ifdef DEBUG
246 : {
247 350 : std::unique_lock<std::mutex> oGuard(m_mutex);
248 175 : CPLAssert(m_nMaxThreads > 0);
249 : }
250 : #endif
251 :
252 175 : if (threadLocalCurrentThreadPool == this)
253 : {
254 : // If SubmitJob() is called from a worker thread of this queue,
255 : // then synchronously run the task to avoid deadlock.
256 0 : for (void *pData : apData)
257 0 : pfnFunc(pData);
258 0 : return true;
259 : }
260 :
261 350 : std::unique_lock<std::mutex> oGuard(m_mutex);
262 :
263 1614 : for (void *pData : apData)
264 : {
265 1439 : if (static_cast<int>(aWT.size()) < m_nMaxThreads)
266 : {
267 0 : std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
268 0 : wt->poTP = this;
269 0 : wt->hThread =
270 0 : CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
271 0 : if (wt->hThread == nullptr)
272 : {
273 0 : if (aWT.empty())
274 0 : return false;
275 : }
276 : else
277 : {
278 0 : aWT.emplace_back(std::move(wt));
279 : }
280 : }
281 :
282 2877 : jobQueue.emplace([=] { pfnFunc(pData); });
283 1439 : nPendingJobs++;
284 : }
285 :
286 789 : for (size_t i = 0; i < apData.size(); i++)
287 : {
288 639 : if (psWaitingWorkerThreadsList)
289 : {
290 : CPLWorkerThread *psWorkerThread;
291 :
292 614 : psWorkerThread = static_cast<CPLWorkerThread *>(
293 614 : psWaitingWorkerThreadsList->pData);
294 :
295 614 : CPLAssert(psWorkerThread->bMarkedAsWaiting);
296 614 : psWorkerThread->bMarkedAsWaiting = false;
297 :
298 614 : CPLList *psNext = psWaitingWorkerThreadsList->psNext;
299 614 : CPLList *psToFree = psWaitingWorkerThreadsList;
300 614 : psWaitingWorkerThreadsList = psNext;
301 614 : nWaitingWorkerThreads--;
302 :
303 : #if DEBUG_VERBOSE
304 : CPLDebug("JOB", "Waking up %p", psWorkerThread);
305 : #endif
306 : {
307 1228 : std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
308 : // coverity[uninit_use_in_call]
309 614 : oGuard.unlock();
310 614 : psWorkerThread->m_cv.notify_one();
311 : }
312 :
313 614 : CPLFree(psToFree);
314 614 : oGuard.lock();
315 : }
316 : else
317 : {
318 25 : break;
319 : }
320 : }
321 :
322 175 : return true;
323 : }
324 :
325 : /************************************************************************/
326 : /* WaitCompletion() */
327 : /************************************************************************/
328 :
329 : /** Wait for completion of part or whole jobs.
330 : *
331 : * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
332 : * in the queue after this method has completed. Might
333 : * be 0 to wait for all jobs.
334 : */
335 5829 : void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
336 : {
337 5829 : if (nMaxRemainingJobs < 0)
338 0 : nMaxRemainingJobs = 0;
339 11658 : std::unique_lock<std::mutex> oGuard(m_mutex);
340 5829 : m_cv.wait(oGuard, [this, nMaxRemainingJobs]
341 7351 : { return nPendingJobs <= nMaxRemainingJobs; });
342 5829 : }
343 :
344 : /************************************************************************/
345 : /* WaitEvent() */
346 : /************************************************************************/
347 :
348 : /** Wait for completion of at least one job, if there are any remaining,
349 : * or for WakeUpWaitEvent() to have been called.
350 : */
351 1471 : void CPLWorkerThreadPool::WaitEvent()
352 : {
353 : // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
354 : // a notification occurs, jobs could be submitted which would increase
355 : // nPendingJobs, so a job completion may looks like a spurious wakeup.
356 1471 : std::unique_lock<std::mutex> oGuard(m_mutex);
357 1471 : if (nPendingJobs == 0)
358 41 : return;
359 1430 : const int nPendingJobsBefore = nPendingJobs;
360 1430 : m_cv.wait(oGuard, [this, nPendingJobsBefore]
361 3031 : { return nPendingJobs < nPendingJobsBefore || m_bNotifyEvent; });
362 1430 : m_bNotifyEvent = false;
363 : }
364 :
365 : /************************************************************************/
366 : /* WakeUpWaitEvent() */
367 : /************************************************************************/
368 :
369 : /** Wake-up WaitEvent().
370 : *
371 : * This method is thread-safe.
372 : *
373 : * @since GDAL 3.12
374 : */
375 267 : void CPLWorkerThreadPool::WakeUpWaitEvent()
376 : {
377 534 : std::unique_lock<std::mutex> oGuard(m_mutex);
378 267 : m_bNotifyEvent = true;
379 267 : m_cv.notify_one();
380 267 : }
381 :
382 : /************************************************************************/
383 : /* Setup() */
384 : /************************************************************************/
385 :
386 : /** Setup the pool.
387 : *
388 : * @param nThreads Number of threads to launch
389 : * @param pfnInitFunc Initialization function to run in each thread. May be NULL
390 : * @param pasInitData Array of initialization data. Its length must be nThreads,
391 : * or it should be NULL.
392 : * @return true if initialization was successful.
393 : */
394 634 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
395 : void **pasInitData)
396 : {
397 634 : return Setup(nThreads, pfnInitFunc, pasInitData, true);
398 : }
399 :
400 : /** Setup the pool.
401 : *
402 : * @param nThreads Number of threads to launch
403 : * @param pfnInitFunc Initialization function to run in each thread. May be NULL
404 : * @param pasInitData Array of initialization data. Its length must be nThreads,
405 : * or it should be NULL.
406 : * @param bWaitallStarted Whether to wait for all threads to be fully started.
407 : * @return true if initialization was successful.
408 : */
409 660 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
410 : void **pasInitData, bool bWaitallStarted)
411 : {
412 660 : CPLAssert(nThreads > 0);
413 :
414 1321 : if (nThreads > static_cast<int>(aWT.size()) && pfnInitFunc == nullptr &&
415 1322 : pasInitData == nullptr && !bWaitallStarted)
416 : {
417 25 : std::lock_guard<std::mutex> oGuard(m_mutex);
418 25 : if (nThreads > m_nMaxThreads)
419 25 : m_nMaxThreads = nThreads;
420 25 : return true;
421 : }
422 :
423 636 : bool bRet = true;
424 3144 : for (int i = static_cast<int>(aWT.size()); i < nThreads; i++)
425 : {
426 2504 : auto wt = std::make_unique<CPLWorkerThread>();
427 2506 : wt->pfnInitFunc = pfnInitFunc;
428 2506 : wt->pInitData = pasInitData ? pasInitData[i] : nullptr;
429 2507 : wt->poTP = this;
430 2508 : wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
431 2505 : if (wt->hThread == nullptr)
432 : {
433 0 : nThreads = i;
434 0 : bRet = false;
435 0 : break;
436 : }
437 2507 : aWT.emplace_back(std::move(wt));
438 : }
439 :
440 : {
441 1274 : std::lock_guard<std::mutex> oGuard(m_mutex);
442 636 : if (nThreads > m_nMaxThreads)
443 636 : m_nMaxThreads = nThreads;
444 : }
445 :
446 636 : if (bWaitallStarted)
447 : {
448 : // Wait all threads to be started
449 1272 : std::unique_lock<std::mutex> oGuard(m_mutex);
450 1766 : while (nWaitingWorkerThreads < nThreads)
451 : {
452 1130 : m_cv.wait(oGuard);
453 : }
454 : }
455 :
456 636 : if (eState == CPLWTS_ERROR)
457 0 : bRet = false;
458 :
459 636 : return bRet;
460 : }
461 :
462 : /************************************************************************/
463 : /* DeclareJobFinished() */
464 : /************************************************************************/
465 :
466 60439 : void CPLWorkerThreadPool::DeclareJobFinished()
467 : {
468 120898 : std::lock_guard<std::mutex> oGuard(m_mutex);
469 60442 : nPendingJobs--;
470 60442 : m_cv.notify_one();
471 60434 : }
472 :
473 : /************************************************************************/
474 : /* GetNextJob() */
475 : /************************************************************************/
476 :
477 : std::function<void()>
478 64062 : CPLWorkerThreadPool::GetNextJob(CPLWorkerThread *psWorkerThread)
479 : {
480 64062 : std::unique_lock<std::mutex> oGuard(m_mutex);
481 : while (true)
482 : {
483 119653 : if (eState == CPLWTS_STOP)
484 62979 : return std::function<void()>();
485 :
486 117081 : if (jobQueue.size())
487 : {
488 : #if DEBUG_VERBOSE
489 : CPLDebug("JOB", "%p got a job", psWorkerThread);
490 : #endif
491 120746 : auto task = std::move(jobQueue.front());
492 60348 : jobQueue.pop();
493 60304 : return task;
494 : }
495 :
496 56667 : if (!psWorkerThread->bMarkedAsWaiting)
497 : {
498 56682 : psWorkerThread->bMarkedAsWaiting = true;
499 56682 : nWaitingWorkerThreads++;
500 :
501 : CPLList *psItem =
502 56682 : static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
503 56728 : if (psItem == nullptr)
504 : {
505 0 : eState = CPLWTS_ERROR;
506 0 : m_cv.notify_one();
507 :
508 0 : return nullptr;
509 : }
510 :
511 56728 : psItem->pData = psWorkerThread;
512 56728 : psItem->psNext = psWaitingWorkerThreadsList;
513 56728 : psWaitingWorkerThreadsList = psItem;
514 :
515 : #if DEBUG_VERBOSE
516 : CPLAssert(CPLListCount(psWaitingWorkerThreadsList) ==
517 : nWaitingWorkerThreads);
518 : #endif
519 : }
520 :
521 56713 : m_cv.notify_one();
522 :
523 : #if DEBUG_VERBOSE
524 : CPLDebug("JOB", "%p sleeping", psWorkerThread);
525 : #endif
526 :
527 : #ifdef __COVERITY__
528 : CPLError(CE_Failure, CPLE_AppDefined, "Not implemented");
529 : #else
530 112333 : std::unique_lock<std::mutex> oGuardThisThread(psWorkerThread->m_mutex);
531 : // coverity[uninit_use_in_call]
532 56704 : oGuard.unlock();
533 : // coverity[wait_not_in_locked_loop]
534 56710 : psWorkerThread->m_cv.wait(oGuardThisThread);
535 : // coverity[lock_order]
536 55597 : oGuard.lock();
537 : #endif
538 55611 : }
539 : }
540 :
541 : /************************************************************************/
542 : /* CreateJobQueue() */
543 : /************************************************************************/
544 :
545 : /** Create a new job queue based on this worker thread pool.
546 : *
547 : * The worker thread pool must remain alive while the returned object is
548 : * itself alive.
549 : *
550 : * @since GDAL 3.2
551 : */
552 30133 : std::unique_ptr<CPLJobQueue> CPLWorkerThreadPool::CreateJobQueue()
553 : {
554 30133 : return std::unique_ptr<CPLJobQueue>(new CPLJobQueue(this));
555 : }
556 :
557 : /************************************************************************/
558 : /* CPLJobQueue() */
559 : /************************************************************************/
560 :
561 : //! @cond Doxygen_Suppress
562 30137 : CPLJobQueue::CPLJobQueue(CPLWorkerThreadPool *poPool) : m_poPool(poPool)
563 : {
564 30148 : }
565 :
566 : //! @endcond
567 :
568 : /************************************************************************/
569 : /* ~CPLJobQueue() */
570 : /************************************************************************/
571 :
572 30099 : CPLJobQueue::~CPLJobQueue()
573 : {
574 30124 : WaitCompletion();
575 30131 : }
576 :
577 : /************************************************************************/
578 : /* DeclareJobFinished() */
579 : /************************************************************************/
580 :
581 66780 : void CPLJobQueue::DeclareJobFinished()
582 : {
583 133603 : std::lock_guard<std::mutex> oGuard(m_mutex);
584 66761 : m_nPendingJobs--;
585 66761 : m_cv.notify_one();
586 66731 : }
587 :
588 : /************************************************************************/
589 : /* SubmitJob() */
590 : /************************************************************************/
591 :
592 : /** Queue a new job.
593 : *
594 : * @param pfnFunc Function to run for the job.
595 : * @param pData User data to pass to the job function.
596 : * @return true in case of success.
597 : */
598 6775 : bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
599 : {
600 13542 : return SubmitJob([=] { pfnFunc(pData); });
601 : }
602 :
603 : /** Queue a new job.
604 : *
605 : * @param task Task to execute.
606 : * @return true in case of success.
607 : */
608 66743 : bool CPLJobQueue::SubmitJob(std::function<void()> task)
609 : {
610 : {
611 66743 : std::lock_guard<std::mutex> oGuard(m_mutex);
612 66737 : m_nPendingJobs++;
613 : }
614 :
615 : // coverity[uninit_member,copy_constructor_call]
616 200027 : const auto lambda = [this, capturedTask = std::move(task)]
617 : {
618 66600 : capturedTask();
619 66786 : DeclareJobFinished();
620 66641 : };
621 : // cppcheck-suppress knownConditionTrueFalse
622 133393 : return m_poPool->SubmitJob(std::move(lambda));
623 : }
624 :
625 : /************************************************************************/
626 : /* WaitCompletion() */
627 : /************************************************************************/
628 :
629 : /** Wait for completion of part or whole jobs.
630 : *
631 : * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
632 : * in the queue after this method has completed. Might
633 : * be 0 to wait for all jobs.
634 : */
635 60138 : void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
636 : {
637 120201 : std::unique_lock<std::mutex> oGuard(m_mutex);
638 60081 : m_cv.wait(oGuard, [this, nMaxRemainingJobs]
639 79177 : { return m_nPendingJobs <= nMaxRemainingJobs; });
640 60071 : }
641 :
642 : /************************************************************************/
643 : /* WaitEvent() */
644 : /************************************************************************/
645 :
646 : /** Wait for completion for at least one job.
647 : *
648 : * @return true if there are remaining jobs.
649 : */
650 204 : bool CPLJobQueue::WaitEvent()
651 : {
652 : // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
653 : // a notification occurs, jobs could be submitted which would increase
654 : // nPendingJobs, so a job completion may looks like a spurious wakeup.
655 408 : std::unique_lock<std::mutex> oGuard(m_mutex);
656 204 : if (m_nPendingJobs == 0)
657 3 : return false;
658 :
659 201 : const int nPendingJobsBefore = m_nPendingJobs;
660 201 : m_cv.wait(oGuard, [this, nPendingJobsBefore]
661 402 : { return m_nPendingJobs < nPendingJobsBefore; });
662 201 : return m_nPendingJobs > 0;
663 : }
|