Line data Source code
1 : /**********************************************************************
2 : *
3 : * Project: CPL - Common Portability Library
4 : * Purpose: CPL worker thread pool
5 : * Author: Even Rouault, <even dot rouault at spatialys dot com>
6 : *
7 : **********************************************************************
8 : * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "cpl_port.h"
14 : #include "cpl_worker_thread_pool.h"
15 :
16 : #include <cstddef>
17 : #include <memory>
18 :
19 : #include "cpl_conv.h"
20 : #include "cpl_error.h"
21 : #include "cpl_vsi.h"
22 :
23 : static thread_local CPLWorkerThreadPool *threadLocalCurrentThreadPool = nullptr;
24 :
25 : /************************************************************************/
26 : /* CPLWorkerThreadPool() */
27 : /************************************************************************/
28 :
29 : /** Instantiate a new pool of worker threads.
30 : *
31 : * The pool is in an uninitialized state after this call. The Setup() method
32 : * must be called.
33 : */
34 747 : CPLWorkerThreadPool::CPLWorkerThreadPool() : jobQueue{}
35 : {
36 747 : }
37 :
38 : /** Instantiate a new pool of worker threads.
39 : *
40 : * \param nThreads Number of threads in the pool.
41 : */
42 234 : CPLWorkerThreadPool::CPLWorkerThreadPool(int nThreads) : jobQueue{}
43 : {
44 234 : Setup(nThreads, nullptr, nullptr);
45 234 : }
46 :
47 : /************************************************************************/
48 : /* ~CPLWorkerThreadPool() */
49 : /************************************************************************/
50 :
51 : /** Destroys a pool of worker threads.
52 : *
53 : * Any still pending job will be completed before the destructor returns.
54 : */
55 976 : CPLWorkerThreadPool::~CPLWorkerThreadPool()
56 : {
57 976 : WaitCompletion();
58 :
59 : {
60 976 : std::lock_guard<std::mutex> oGuard(m_mutex);
61 976 : eState = CPLWTS_STOP;
62 : }
63 :
64 3484 : for (auto &wt : aWT)
65 : {
66 : {
67 5016 : std::lock_guard<std::mutex> oGuard(wt->m_mutex);
68 2508 : wt->m_cv.notify_one();
69 : }
70 2508 : CPLJoinThread(wt->hThread);
71 : }
72 :
73 976 : CPLListDestroy(psWaitingWorkerThreadsList);
74 976 : }
75 :
76 : /************************************************************************/
77 : /* GetThreadCount() */
78 : /************************************************************************/
79 :
80 1234 : int CPLWorkerThreadPool::GetThreadCount() const
81 : {
82 1234 : std::unique_lock<std::mutex> oGuard(m_mutex);
83 2468 : return m_nMaxThreads;
84 : }
85 :
86 : /************************************************************************/
87 : /* WorkerThreadFunction() */
88 : /************************************************************************/
89 :
90 3546 : void CPLWorkerThreadPool::WorkerThreadFunction(void *user_data)
91 : {
92 3546 : CPLWorkerThread *psWT = static_cast<CPLWorkerThread *>(user_data);
93 3546 : CPLWorkerThreadPool *poTP = psWT->poTP;
94 :
95 3546 : threadLocalCurrentThreadPool = poTP;
96 :
97 3546 : if (psWT->pfnInitFunc)
98 0 : psWT->pfnInitFunc(psWT->pInitData);
99 :
100 : while (true)
101 : {
102 65719 : std::function<void()> task = poTP->GetNextJob(psWT);
103 64728 : if (!task)
104 2508 : break;
105 :
106 62100 : task();
107 : #if DEBUG_VERBOSE
108 : CPLDebug("JOB", "%p finished a job", psWT);
109 : #endif
110 62234 : poTP->DeclareJobFinished();
111 62173 : }
112 2540 : }
113 :
114 : /************************************************************************/
115 : /* SubmitJob() */
116 : /************************************************************************/
117 :
118 : /** Queue a new job.
119 : *
120 : * @param pfnFunc Function to run for the job.
121 : * @param pData User data to pass to the job function.
122 : * @return true in case of success.
123 : */
124 7358 : bool CPLWorkerThreadPool::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
125 : {
126 14709 : return SubmitJob([=] { pfnFunc(pData); });
127 : }
128 :
129 : /** Queue a new job.
130 : *
131 : * @param task Void function to execute.
132 : * @return true in case of success.
133 : */
134 73537 : bool CPLWorkerThreadPool::SubmitJob(std::function<void()> task)
135 : {
136 : #ifdef DEBUG
137 : {
138 146968 : std::unique_lock<std::mutex> oGuard(m_mutex);
139 73431 : CPLAssert(m_nMaxThreads > 0);
140 : }
141 : #endif
142 :
143 73454 : bool bMustIncrementWaitingWorkerThreadsAfterSubmission = false;
144 73454 : if (threadLocalCurrentThreadPool == this)
145 : {
146 : // If there are waiting threads or we have not started all allowed
147 : // threads, we can submit this job asynchronously
148 : {
149 114273 : std::unique_lock<std::mutex> oGuard(m_mutex);
150 69889 : if (nWaitingWorkerThreads > 0 ||
151 12738 : static_cast<int>(aWT.size()) < m_nMaxThreads)
152 : {
153 44413 : bMustIncrementWaitingWorkerThreadsAfterSubmission = true;
154 44413 : nWaitingWorkerThreads--;
155 : }
156 : }
157 57156 : if (!bMustIncrementWaitingWorkerThreadsAfterSubmission)
158 : {
159 : // otherwise there is a risk of deadlock, so execute synchronously.
160 12742 : task();
161 12738 : return true;
162 : }
163 : }
164 :
165 60746 : std::unique_lock<std::mutex> oGuard(m_mutex);
166 :
167 60833 : if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
168 44488 : nWaitingWorkerThreads++;
169 :
170 60833 : if (static_cast<int>(aWT.size()) < m_nMaxThreads)
171 : {
172 : // CPLDebug("CPL", "Starting new thread...");
173 2208 : auto wt = std::make_unique<CPLWorkerThread>();
174 1104 : wt->poTP = this;
175 : //ABELL - Why should this fail? And this is a *pool* thread, not necessarily
176 : // tied to the submitted job. The submitted job still needs to run, even if
177 : // this fails. If we can't create a thread, should the entire pool become invalid?
178 1104 : wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
179 : /**
180 : if (!wt->hThread)
181 : {
182 : VSIFree(psJob);
183 : VSIFree(psItem);
184 : return false;
185 : }
186 : **/
187 1104 : if (wt->hThread)
188 1104 : aWT.emplace_back(std::move(wt));
189 : }
190 :
191 60753 : jobQueue.emplace(task);
192 60736 : nPendingJobs++;
193 :
194 60736 : if (psWaitingWorkerThreadsList)
195 : {
196 54155 : CPLWorkerThread *psWorkerThread =
197 54155 : static_cast<CPLWorkerThread *>(psWaitingWorkerThreadsList->pData);
198 :
199 54155 : CPLAssert(psWorkerThread->bMarkedAsWaiting);
200 54155 : psWorkerThread->bMarkedAsWaiting = false;
201 :
202 54155 : CPLList *psNext = psWaitingWorkerThreadsList->psNext;
203 54155 : CPLList *psToFree = psWaitingWorkerThreadsList;
204 54155 : psWaitingWorkerThreadsList = psNext;
205 54155 : nWaitingWorkerThreads--;
206 :
207 : #if DEBUG_VERBOSE
208 : CPLDebug("JOB", "Waking up %p", psWorkerThread);
209 : #endif
210 :
211 : {
212 108420 : std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
213 : // coverity[uninit_use_in_call]
214 54282 : oGuard.unlock();
215 54276 : psWorkerThread->m_cv.notify_one();
216 : }
217 :
218 54279 : CPLFree(psToFree);
219 : }
220 :
221 : // coverity[double_unlock]
222 60942 : return true;
223 : }
224 :
225 : /************************************************************************/
226 : /* SubmitJobs() */
227 : /************************************************************************/
228 :
229 : /** Queue several jobs
230 : *
231 : * @param pfnFunc Function to run for the job.
232 : * @param apData User data instances to pass to the job function.
233 : * @return true in case of success.
234 : */
235 156 : bool CPLWorkerThreadPool::SubmitJobs(CPLThreadFunc pfnFunc,
236 : const std::vector<void *> &apData)
237 : {
238 156 : if (apData.empty())
239 0 : return false;
240 :
241 : #ifdef DEBUG
242 : {
243 312 : std::unique_lock<std::mutex> oGuard(m_mutex);
244 156 : CPLAssert(m_nMaxThreads > 0);
245 : }
246 : #endif
247 :
248 156 : if (threadLocalCurrentThreadPool == this)
249 : {
250 : // If SubmitJob() is called from a worker thread of this queue,
251 : // then synchronously run the task to avoid deadlock.
252 0 : for (void *pData : apData)
253 0 : pfnFunc(pData);
254 0 : return true;
255 : }
256 :
257 312 : std::unique_lock<std::mutex> oGuard(m_mutex);
258 :
259 1534 : for (void *pData : apData)
260 : {
261 1378 : if (static_cast<int>(aWT.size()) < m_nMaxThreads)
262 : {
263 0 : std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
264 0 : wt->poTP = this;
265 0 : wt->hThread =
266 0 : CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
267 0 : if (wt->hThread == nullptr)
268 : {
269 0 : if (aWT.empty())
270 0 : return false;
271 : }
272 : else
273 : {
274 0 : aWT.emplace_back(std::move(wt));
275 : }
276 : }
277 :
278 2753 : jobQueue.emplace([=] { pfnFunc(pData); });
279 1378 : nPendingJobs++;
280 : }
281 :
282 534 : for (size_t i = 0; i < apData.size(); i++)
283 : {
284 382 : if (psWaitingWorkerThreadsList)
285 : {
286 : CPLWorkerThread *psWorkerThread;
287 :
288 378 : psWorkerThread = static_cast<CPLWorkerThread *>(
289 378 : psWaitingWorkerThreadsList->pData);
290 :
291 378 : CPLAssert(psWorkerThread->bMarkedAsWaiting);
292 378 : psWorkerThread->bMarkedAsWaiting = false;
293 :
294 378 : CPLList *psNext = psWaitingWorkerThreadsList->psNext;
295 378 : CPLList *psToFree = psWaitingWorkerThreadsList;
296 378 : psWaitingWorkerThreadsList = psNext;
297 378 : nWaitingWorkerThreads--;
298 :
299 : #if DEBUG_VERBOSE
300 : CPLDebug("JOB", "Waking up %p", psWorkerThread);
301 : #endif
302 : {
303 756 : std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
304 : // coverity[uninit_use_in_call]
305 378 : oGuard.unlock();
306 378 : psWorkerThread->m_cv.notify_one();
307 : }
308 :
309 378 : CPLFree(psToFree);
310 378 : oGuard.lock();
311 : }
312 : else
313 : {
314 4 : break;
315 : }
316 : }
317 :
318 156 : return true;
319 : }
320 :
321 : /************************************************************************/
322 : /* WaitCompletion() */
323 : /************************************************************************/
324 :
325 : /** Wait for completion of part or whole jobs.
326 : *
327 : * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
328 : * in the queue after this method has completed. Might
329 : * be 0 to wait for all jobs.
330 : */
331 5708 : void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
332 : {
333 5708 : if (nMaxRemainingJobs < 0)
334 0 : nMaxRemainingJobs = 0;
335 11416 : std::unique_lock<std::mutex> oGuard(m_mutex);
336 5708 : m_cv.wait(oGuard, [this, nMaxRemainingJobs]
337 7039 : { return nPendingJobs <= nMaxRemainingJobs; });
338 5708 : }
339 :
340 : /************************************************************************/
341 : /* WaitEvent() */
342 : /************************************************************************/
343 :
344 : /** Wait for completion of at least one job, if there are any remaining
345 : */
346 1396 : void CPLWorkerThreadPool::WaitEvent()
347 : {
348 : // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
349 : // a notification occurs, jobs could be submitted which would increase
350 : // nPendingJobs, so a job completion may looks like a spurious wakeup.
351 1396 : std::unique_lock<std::mutex> oGuard(m_mutex);
352 1396 : if (nPendingJobs == 0)
353 35 : return;
354 1361 : const int nPendingJobsBefore = nPendingJobs;
355 1361 : m_cv.wait(oGuard, [this, nPendingJobsBefore]
356 2908 : { return nPendingJobs < nPendingJobsBefore; });
357 : }
358 :
359 : /************************************************************************/
360 : /* Setup() */
361 : /************************************************************************/
362 :
363 : /** Setup the pool.
364 : *
365 : * @param nThreads Number of threads to launch
366 : * @param pfnInitFunc Initialization function to run in each thread. May be NULL
367 : * @param pasInitData Array of initialization data. Its length must be nThreads,
368 : * or it should be NULL.
369 : * @return true if initialization was successful.
370 : */
371 622 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
372 : void **pasInitData)
373 : {
374 622 : return Setup(nThreads, pfnInitFunc, pasInitData, true);
375 : }
376 :
377 : /** Setup the pool.
378 : *
379 : * @param nThreads Number of threads to launch
380 : * @param pfnInitFunc Initialization function to run in each thread. May be NULL
381 : * @param pasInitData Array of initialization data. Its length must be nThreads,
382 : * or it should be NULL.
383 : * @param bWaitallStarted Whether to wait for all threads to be fully started.
384 : * @return true if initialization was successful.
385 : */
386 647 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
387 : void **pasInitData, bool bWaitallStarted)
388 : {
389 647 : CPLAssert(nThreads > 0);
390 :
391 1295 : if (nThreads > static_cast<int>(aWT.size()) && pfnInitFunc == nullptr &&
392 1295 : pasInitData == nullptr && !bWaitallStarted)
393 : {
394 25 : std::lock_guard<std::mutex> oGuard(m_mutex);
395 25 : if (nThreads > m_nMaxThreads)
396 25 : m_nMaxThreads = nThreads;
397 25 : return true;
398 : }
399 :
400 622 : bool bRet = true;
401 3066 : for (int i = static_cast<int>(aWT.size()); i < nThreads; i++)
402 : {
403 2443 : auto wt = std::make_unique<CPLWorkerThread>();
404 2442 : wt->pfnInitFunc = pfnInitFunc;
405 2443 : wt->pInitData = pasInitData ? pasInitData[i] : nullptr;
406 2442 : wt->poTP = this;
407 2443 : wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
408 2443 : if (wt->hThread == nullptr)
409 : {
410 0 : nThreads = i;
411 0 : bRet = false;
412 0 : break;
413 : }
414 2443 : aWT.emplace_back(std::move(wt));
415 : }
416 :
417 : {
418 1246 : std::lock_guard<std::mutex> oGuard(m_mutex);
419 623 : if (nThreads > m_nMaxThreads)
420 623 : m_nMaxThreads = nThreads;
421 : }
422 :
423 623 : if (bWaitallStarted)
424 : {
425 : // Wait all threads to be started
426 1246 : std::unique_lock<std::mutex> oGuard(m_mutex);
427 1494 : while (nWaitingWorkerThreads < nThreads)
428 : {
429 871 : m_cv.wait(oGuard);
430 : }
431 : }
432 :
433 623 : if (eState == CPLWTS_ERROR)
434 0 : bRet = false;
435 :
436 623 : return bRet;
437 : }
438 :
439 : /************************************************************************/
440 : /* DeclareJobFinished() */
441 : /************************************************************************/
442 :
443 62227 : void CPLWorkerThreadPool::DeclareJobFinished()
444 : {
445 124486 : std::lock_guard<std::mutex> oGuard(m_mutex);
446 62241 : nPendingJobs--;
447 62241 : m_cv.notify_one();
448 62258 : }
449 :
450 : /************************************************************************/
451 : /* GetNextJob() */
452 : /************************************************************************/
453 :
454 : std::function<void()>
455 65748 : CPLWorkerThreadPool::GetNextJob(CPLWorkerThread *psWorkerThread)
456 : {
457 65748 : std::unique_lock<std::mutex> oGuard(m_mutex);
458 : while (true)
459 : {
460 122846 : if (eState == CPLWTS_STOP)
461 64772 : return std::function<void()>();
462 :
463 120338 : if (jobQueue.size())
464 : {
465 : #if DEBUG_VERBOSE
466 : CPLDebug("JOB", "%p got a job", psWorkerThread);
467 : #endif
468 124403 : auto task = std::move(jobQueue.front());
469 62193 : jobQueue.pop();
470 62128 : return task;
471 : }
472 :
473 58223 : if (!psWorkerThread->bMarkedAsWaiting)
474 : {
475 58152 : psWorkerThread->bMarkedAsWaiting = true;
476 58152 : nWaitingWorkerThreads++;
477 :
478 : CPLList *psItem =
479 58152 : static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
480 58187 : if (psItem == nullptr)
481 : {
482 0 : eState = CPLWTS_ERROR;
483 0 : m_cv.notify_one();
484 :
485 0 : return nullptr;
486 : }
487 :
488 58187 : psItem->pData = psWorkerThread;
489 58187 : psItem->psNext = psWaitingWorkerThreadsList;
490 58187 : psWaitingWorkerThreadsList = psItem;
491 :
492 : #if DEBUG_VERBOSE
493 : CPLAssert(CPLListCount(psWaitingWorkerThreadsList) ==
494 : nWaitingWorkerThreads);
495 : #endif
496 : }
497 :
498 58258 : m_cv.notify_one();
499 :
500 : #if DEBUG_VERBOSE
501 : CPLDebug("JOB", "%p sleeping", psWorkerThread);
502 : #endif
503 :
504 115256 : std::unique_lock<std::mutex> oGuardThisThread(psWorkerThread->m_mutex);
505 : // coverity[uninit_use_in_call]
506 58190 : oGuard.unlock();
507 : // coverity[wait_not_in_locked_loop]
508 58149 : psWorkerThread->m_cv.wait(oGuardThisThread);
509 57098 : oGuard.lock();
510 57105 : }
511 : }
512 :
513 : /************************************************************************/
514 : /* CreateJobQueue() */
515 : /************************************************************************/
516 :
517 : /** Create a new job queue based on this worker thread pool.
518 : *
519 : * The worker thread pool must remain alive while the returned object is
520 : * itself alive.
521 : *
522 : * @since GDAL 3.2
523 : */
524 29704 : std::unique_ptr<CPLJobQueue> CPLWorkerThreadPool::CreateJobQueue()
525 : {
526 29704 : return std::unique_ptr<CPLJobQueue>(new CPLJobQueue(this));
527 : }
528 :
529 : /************************************************************************/
530 : /* CPLJobQueue() */
531 : /************************************************************************/
532 :
533 : //! @cond Doxygen_Suppress
534 29693 : CPLJobQueue::CPLJobQueue(CPLWorkerThreadPool *poPool) : m_poPool(poPool)
535 : {
536 29682 : }
537 :
538 : //! @endcond
539 :
540 : /************************************************************************/
541 : /* ~CPLJobQueue() */
542 : /************************************************************************/
543 :
544 29764 : CPLJobQueue::~CPLJobQueue()
545 : {
546 29768 : WaitCompletion();
547 29764 : }
548 :
549 : /************************************************************************/
550 : /* DeclareJobFinished() */
551 : /************************************************************************/
552 :
553 65895 : void CPLJobQueue::DeclareJobFinished()
554 : {
555 131904 : std::lock_guard<std::mutex> oGuard(m_mutex);
556 65894 : m_nPendingJobs--;
557 65894 : m_cv.notify_one();
558 65928 : }
559 :
560 : /************************************************************************/
561 : /* SubmitJob() */
562 : /************************************************************************/
563 :
564 : /** Queue a new job.
565 : *
566 : * @param pfnFunc Function to run for the job.
567 : * @param pData User data to pass to the job function.
568 : * @return true in case of success.
569 : */
570 6765 : bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
571 : {
572 13522 : return SubmitJob([=] { pfnFunc(pData); });
573 : }
574 :
575 : /** Queue a new job.
576 : *
577 : * @param task Task to execute.
578 : * @return true in case of success.
579 : */
580 65881 : bool CPLJobQueue::SubmitJob(std::function<void()> task)
581 : {
582 : {
583 65881 : std::lock_guard<std::mutex> oGuard(m_mutex);
584 65812 : m_nPendingJobs++;
585 : }
586 :
587 : // coverity[uninit_member,copy_constructor_call]
588 131612 : const auto lambda = [this, task]
589 : {
590 65773 : task();
591 65839 : DeclareJobFinished();
592 65801 : };
593 : // cppcheck-suppress knownConditionTrueFalse
594 131695 : return m_poPool->SubmitJob(lambda);
595 : }
596 :
597 : /************************************************************************/
598 : /* WaitCompletion() */
599 : /************************************************************************/
600 :
601 : /** Wait for completion of part or whole jobs.
602 : *
603 : * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
604 : * in the queue after this method has completed. Might
605 : * be 0 to wait for all jobs.
606 : */
607 59342 : void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
608 : {
609 118652 : std::unique_lock<std::mutex> oGuard(m_mutex);
610 59334 : m_cv.wait(oGuard, [this, nMaxRemainingJobs]
611 77607 : { return m_nPendingJobs <= nMaxRemainingJobs; });
612 59340 : }
613 :
614 : /************************************************************************/
615 : /* WaitEvent() */
616 : /************************************************************************/
617 :
618 : /** Wait for completion for at least one job.
619 : *
620 : * @return true if there are remaining jobs.
621 : */
622 216 : bool CPLJobQueue::WaitEvent()
623 : {
624 : // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
625 : // a notification occurs, jobs could be submitted which would increase
626 : // nPendingJobs, so a job completion may looks like a spurious wakeup.
627 432 : std::unique_lock<std::mutex> oGuard(m_mutex);
628 216 : if (m_nPendingJobs == 0)
629 1 : return false;
630 :
631 215 : const int nPendingJobsBefore = m_nPendingJobs;
632 215 : m_cv.wait(oGuard, [this, nPendingJobsBefore]
633 430 : { return m_nPendingJobs < nPendingJobsBefore; });
634 215 : return m_nPendingJobs > 0;
635 : }
|