Line data Source code
1 : /**********************************************************************
2 : *
3 : * Project: CPL - Common Portability Library
4 : * Purpose: CPL worker thread pool
5 : * Author: Even Rouault, <even dot rouault at spatialys dot com>
6 : *
7 : **********************************************************************
8 : * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "cpl_port.h"
14 : #include "cpl_worker_thread_pool.h"
15 :
16 : #include <cstddef>
17 : #include <memory>
18 :
19 : #include "cpl_conv.h"
20 : #include "cpl_error.h"
21 : #include "cpl_vsi.h"
22 :
23 : static thread_local CPLWorkerThreadPool *threadLocalCurrentThreadPool = nullptr;
24 :
25 : /************************************************************************/
26 : /* CPLWorkerThreadPool() */
27 : /************************************************************************/
28 :
29 : /** Instantiate a new pool of worker threads.
30 : *
31 : * The pool is in an uninitialized state after this call. The Setup() method
32 : * must be called.
33 : */
34 600 : CPLWorkerThreadPool::CPLWorkerThreadPool() : jobQueue{}
35 : {
36 600 : }
37 :
38 : /** Instantiate a new pool of worker threads.
39 : *
40 : * \param nThreads Number of threads in the pool.
41 : */
42 625 : CPLWorkerThreadPool::CPLWorkerThreadPool(int nThreads) : jobQueue{}
43 : {
44 625 : Setup(nThreads, nullptr, nullptr);
45 625 : }
46 :
47 : /************************************************************************/
48 : /* ~CPLWorkerThreadPool() */
49 : /************************************************************************/
50 :
51 : /** Destroys a pool of worker threads.
52 : *
53 : * Any still pending job will be completed before the destructor returns.
54 : */
55 1220 : CPLWorkerThreadPool::~CPLWorkerThreadPool()
56 : {
57 1220 : WaitCompletion();
58 :
59 : {
60 1220 : std::lock_guard<std::mutex> oGuard(m_mutex);
61 1220 : eState = CPLWTS_STOP;
62 : }
63 :
64 4844 : for (auto &wt : aWT)
65 : {
66 : {
67 7248 : std::lock_guard<std::mutex> oGuard(wt->m_mutex);
68 3624 : wt->m_cv.notify_one();
69 : }
70 3624 : CPLJoinThread(wt->hThread);
71 : }
72 :
73 1220 : CPLListDestroy(psWaitingWorkerThreadsList);
74 1220 : }
75 :
76 : /************************************************************************/
77 : /* GetThreadCount() */
78 : /************************************************************************/
79 :
80 1136 : int CPLWorkerThreadPool::GetThreadCount() const
81 : {
82 1136 : std::unique_lock<std::mutex> oGuard(m_mutex);
83 2272 : return m_nMaxThreads;
84 : }
85 :
86 : /************************************************************************/
87 : /* WorkerThreadFunction() */
88 : /************************************************************************/
89 :
90 4662 : void CPLWorkerThreadPool::WorkerThreadFunction(void *user_data)
91 : {
92 4662 : CPLWorkerThread *psWT = static_cast<CPLWorkerThread *>(user_data);
93 4662 : CPLWorkerThreadPool *poTP = psWT->poTP;
94 :
95 4662 : threadLocalCurrentThreadPool = poTP;
96 :
97 4662 : if (psWT->pfnInitFunc)
98 0 : psWT->pfnInitFunc(psWT->pInitData);
99 :
100 : while (true)
101 : {
102 120678 : std::function<void()> task = poTP->GetNextJob(psWT);
103 119707 : if (!task)
104 3624 : break;
105 :
106 115954 : task();
107 : #if DEBUG_VERBOSE
108 : CPLDebug("JOB", "%p finished a job", psWT);
109 : #endif
110 115955 : poTP->DeclareJobFinished();
111 116016 : }
112 3695 : }
113 :
114 : /************************************************************************/
115 : /* SubmitJob() */
116 : /************************************************************************/
117 :
118 : /** Queue a new job.
119 : *
120 : * @param pfnFunc Function to run for the job.
121 : * @param pData User data to pass to the job function.
122 : * @return true in case of success.
123 : */
124 6898 : bool CPLWorkerThreadPool::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
125 : {
126 13781 : return SubmitJob([=] { pfnFunc(pData); });
127 : }
128 :
129 : /** Queue a new job.
130 : *
131 : * @param task Void function to execute.
132 : * @return true in case of success.
133 : */
134 191128 : bool CPLWorkerThreadPool::SubmitJob(std::function<void()> task)
135 : {
136 : #ifdef DEBUG
137 : {
138 382262 : std::unique_lock<std::mutex> oGuard(m_mutex);
139 191134 : CPLAssert(m_nMaxThreads > 0);
140 : }
141 : #endif
142 :
143 191081 : bool bMustIncrementWaitingWorkerThreadsAfterSubmission = false;
144 191081 : if (threadLocalCurrentThreadPool == this)
145 : {
146 : // If there are waiting threads or we have not started all allowed
147 : // threads, we can submit this job asynchronously
148 : {
149 331936 : std::unique_lock<std::mutex> oGuard(m_mutex);
150 242470 : if (nWaitingWorkerThreads > 0 ||
151 76490 : static_cast<int>(aWT.size()) < m_nMaxThreads)
152 : {
153 89541 : bMustIncrementWaitingWorkerThreadsAfterSubmission = true;
154 89541 : nWaitingWorkerThreads--;
155 : }
156 : }
157 165991 : if (!bMustIncrementWaitingWorkerThreadsAfterSubmission)
158 : {
159 : // otherwise there is a risk of deadlock, so execute synchronously.
160 76498 : task();
161 76470 : return true;
162 : }
163 : }
164 :
165 114618 : std::unique_lock<std::mutex> oGuard(m_mutex);
166 :
167 114750 : if (bMustIncrementWaitingWorkerThreadsAfterSubmission)
168 89550 : nWaitingWorkerThreads++;
169 :
170 114750 : if (static_cast<int>(aWT.size()) < m_nMaxThreads)
171 : {
172 : // CPLDebug("CPL", "Starting new thread...");
173 2176 : auto wt = std::make_unique<CPLWorkerThread>();
174 1088 : wt->poTP = this;
175 : //ABELL - Why should this fail? And this is a *pool* thread, not necessarily
176 : // tied to the submitted job. The submitted job still needs to run, even if
177 : // this fails. If we can't create a thread, should the entire pool become invalid?
178 1088 : wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
179 : /**
180 : if (!wt->hThread)
181 : {
182 : VSIFree(psJob);
183 : VSIFree(psItem);
184 : return false;
185 : }
186 : **/
187 1088 : if (wt->hThread)
188 1088 : aWT.emplace_back(std::move(wt));
189 : }
190 :
191 114739 : jobQueue.emplace(task);
192 114687 : nPendingJobs++;
193 :
194 114687 : if (psWaitingWorkerThreadsList)
195 : {
196 110066 : CPLWorkerThread *psWorkerThread =
197 110066 : static_cast<CPLWorkerThread *>(psWaitingWorkerThreadsList->pData);
198 :
199 110066 : CPLAssert(psWorkerThread->bMarkedAsWaiting);
200 110066 : psWorkerThread->bMarkedAsWaiting = false;
201 :
202 110066 : CPLList *psNext = psWaitingWorkerThreadsList->psNext;
203 110066 : CPLList *psToFree = psWaitingWorkerThreadsList;
204 110066 : psWaitingWorkerThreadsList = psNext;
205 110066 : nWaitingWorkerThreads--;
206 :
207 : #if DEBUG_VERBOSE
208 : CPLDebug("JOB", "Waking up %p", psWorkerThread);
209 : #endif
210 :
211 : {
212 220211 : std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
213 : // coverity[ uninit_use_in_call]
214 110207 : oGuard.unlock();
215 110157 : psWorkerThread->m_cv.notify_one();
216 : }
217 :
218 110128 : CPLFree(psToFree);
219 : }
220 :
221 114861 : return true;
222 : }
223 :
224 : /************************************************************************/
225 : /* SubmitJobs() */
226 : /************************************************************************/
227 :
228 : /** Queue several jobs
229 : *
230 : * @param pfnFunc Function to run for the job.
231 : * @param apData User data instances to pass to the job function.
232 : * @return true in case of success.
233 : */
234 132 : bool CPLWorkerThreadPool::SubmitJobs(CPLThreadFunc pfnFunc,
235 : const std::vector<void *> &apData)
236 : {
237 132 : if (apData.empty())
238 0 : return false;
239 :
240 : #ifdef DEBUG
241 : {
242 264 : std::unique_lock<std::mutex> oGuard(m_mutex);
243 132 : CPLAssert(m_nMaxThreads > 0);
244 : }
245 : #endif
246 :
247 132 : if (threadLocalCurrentThreadPool == this)
248 : {
249 : // If SubmitJob() is called from a worker thread of this queue,
250 : // then synchronously run the task to avoid deadlock.
251 0 : for (void *pData : apData)
252 0 : pfnFunc(pData);
253 0 : return true;
254 : }
255 :
256 264 : std::unique_lock<std::mutex> oGuard(m_mutex);
257 :
258 1462 : for (void *pData : apData)
259 : {
260 1330 : if (static_cast<int>(aWT.size()) < m_nMaxThreads)
261 : {
262 0 : std::unique_ptr<CPLWorkerThread> wt(new CPLWorkerThread);
263 0 : wt->poTP = this;
264 0 : wt->hThread =
265 0 : CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
266 0 : if (wt->hThread == nullptr)
267 : {
268 0 : if (aWT.empty())
269 0 : return false;
270 : }
271 : else
272 : {
273 0 : aWT.emplace_back(std::move(wt));
274 : }
275 : }
276 :
277 2660 : jobQueue.emplace([=] { pfnFunc(pData); });
278 1330 : nPendingJobs++;
279 : }
280 :
281 461 : for (size_t i = 0; i < apData.size(); i++)
282 : {
283 333 : if (psWaitingWorkerThreadsList)
284 : {
285 : CPLWorkerThread *psWorkerThread;
286 :
287 329 : psWorkerThread = static_cast<CPLWorkerThread *>(
288 329 : psWaitingWorkerThreadsList->pData);
289 :
290 329 : CPLAssert(psWorkerThread->bMarkedAsWaiting);
291 329 : psWorkerThread->bMarkedAsWaiting = false;
292 :
293 329 : CPLList *psNext = psWaitingWorkerThreadsList->psNext;
294 329 : CPLList *psToFree = psWaitingWorkerThreadsList;
295 329 : psWaitingWorkerThreadsList = psNext;
296 329 : nWaitingWorkerThreads--;
297 :
298 : #if DEBUG_VERBOSE
299 : CPLDebug("JOB", "Waking up %p", psWorkerThread);
300 : #endif
301 : {
302 658 : std::lock_guard<std::mutex> oGuardWT(psWorkerThread->m_mutex);
303 : // coverity[ uninit_use_in_call]
304 329 : oGuard.unlock();
305 329 : psWorkerThread->m_cv.notify_one();
306 : }
307 :
308 329 : CPLFree(psToFree);
309 329 : oGuard.lock();
310 : }
311 : else
312 : {
313 4 : break;
314 : }
315 : }
316 :
317 132 : return true;
318 : }
319 :
320 : /************************************************************************/
321 : /* WaitCompletion() */
322 : /************************************************************************/
323 :
324 : /** Wait for completion of part or whole jobs.
325 : *
326 : * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
327 : * in the queue after this method has completed. Might
328 : * be 0 to wait for all jobs.
329 : */
330 5793 : void CPLWorkerThreadPool::WaitCompletion(int nMaxRemainingJobs)
331 : {
332 5793 : if (nMaxRemainingJobs < 0)
333 0 : nMaxRemainingJobs = 0;
334 11586 : std::unique_lock<std::mutex> oGuard(m_mutex);
335 5793 : m_cv.wait(oGuard, [this, nMaxRemainingJobs]
336 8585 : { return nPendingJobs <= nMaxRemainingJobs; });
337 5793 : }
338 :
339 : /************************************************************************/
340 : /* WaitEvent() */
341 : /************************************************************************/
342 :
343 : /** Wait for completion of at least one job, if there are any remaining
344 : */
345 1460 : void CPLWorkerThreadPool::WaitEvent()
346 : {
347 : // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
348 : // a notification occurs, jobs could be submitted which would increase
349 : // nPendingJobs, so a job completion may looks like a spurious wakeup.
350 1460 : std::unique_lock<std::mutex> oGuard(m_mutex);
351 1460 : if (nPendingJobs == 0)
352 9 : return;
353 1451 : const int nPendingJobsBefore = nPendingJobs;
354 1451 : m_cv.wait(oGuard, [this, nPendingJobsBefore]
355 3028 : { return nPendingJobs < nPendingJobsBefore; });
356 : }
357 :
358 : /************************************************************************/
359 : /* Setup() */
360 : /************************************************************************/
361 :
362 : /** Setup the pool.
363 : *
364 : * @param nThreads Number of threads to launch
365 : * @param pfnInitFunc Initialization function to run in each thread. May be NULL
366 : * @param pasInitData Array of initialization data. Its length must be nThreads,
367 : * or it should be NULL.
368 : * @return true if initialization was successful.
369 : */
370 901 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
371 : void **pasInitData)
372 : {
373 901 : return Setup(nThreads, pfnInitFunc, pasInitData, true);
374 : }
375 :
376 : /** Setup the pool.
377 : *
378 : * @param nThreads Number of threads to launch
379 : * @param pfnInitFunc Initialization function to run in each thread. May be NULL
380 : * @param pasInitData Array of initialization data. Its length must be nThreads,
381 : * or it should be NULL.
382 : * @param bWaitallStarted Whether to wait for all threads to be fully started.
383 : * @return true if initialization was successful.
384 : */
385 923 : bool CPLWorkerThreadPool::Setup(int nThreads, CPLThreadFunc pfnInitFunc,
386 : void **pasInitData, bool bWaitallStarted)
387 : {
388 923 : CPLAssert(nThreads > 0);
389 :
390 1846 : if (nThreads > static_cast<int>(aWT.size()) && pfnInitFunc == nullptr &&
391 1846 : pasInitData == nullptr && !bWaitallStarted)
392 : {
393 21 : std::lock_guard<std::mutex> oGuard(m_mutex);
394 21 : if (nThreads > m_nMaxThreads)
395 21 : m_nMaxThreads = nThreads;
396 21 : return true;
397 : }
398 :
399 902 : bool bRet = true;
400 4476 : for (int i = static_cast<int>(aWT.size()); i < nThreads; i++)
401 : {
402 3574 : auto wt = std::make_unique<CPLWorkerThread>();
403 3574 : wt->pfnInitFunc = pfnInitFunc;
404 3574 : wt->pInitData = pasInitData ? pasInitData[i] : nullptr;
405 3574 : wt->poTP = this;
406 3574 : wt->hThread = CPLCreateJoinableThread(WorkerThreadFunction, wt.get());
407 3574 : if (wt->hThread == nullptr)
408 : {
409 0 : nThreads = i;
410 0 : bRet = false;
411 0 : break;
412 : }
413 3574 : aWT.emplace_back(std::move(wt));
414 : }
415 :
416 : {
417 1804 : std::lock_guard<std::mutex> oGuard(m_mutex);
418 902 : if (nThreads > m_nMaxThreads)
419 902 : m_nMaxThreads = nThreads;
420 : }
421 :
422 902 : if (bWaitallStarted)
423 : {
424 : // Wait all threads to be started
425 1804 : std::unique_lock<std::mutex> oGuard(m_mutex);
426 3804 : while (nWaitingWorkerThreads < nThreads)
427 : {
428 2902 : m_cv.wait(oGuard);
429 : }
430 : }
431 :
432 902 : if (eState == CPLWTS_ERROR)
433 0 : bRet = false;
434 :
435 902 : return bRet;
436 : }
437 :
438 : /************************************************************************/
439 : /* DeclareJobFinished() */
440 : /************************************************************************/
441 :
442 115979 : void CPLWorkerThreadPool::DeclareJobFinished()
443 : {
444 232070 : std::lock_guard<std::mutex> oGuard(m_mutex);
445 116081 : nPendingJobs--;
446 116081 : m_cv.notify_one();
447 116114 : }
448 :
449 : /************************************************************************/
450 : /* GetNextJob() */
451 : /************************************************************************/
452 :
453 : std::function<void()>
454 233903 : CPLWorkerThreadPool::GetNextJob(CPLWorkerThread *psWorkerThread)
455 : {
456 : while (true)
457 : {
458 233903 : std::unique_lock<std::mutex> oGuard(m_mutex);
459 233963 : if (eState == CPLWTS_STOP)
460 3624 : return std::function<void()>();
461 :
462 230339 : if (jobQueue.size())
463 : {
464 : #if DEBUG_VERBOSE
465 : CPLDebug("JOB", "%p got a job", psWorkerThread);
466 : #endif
467 231950 : auto task = std::move(jobQueue.front());
468 115956 : jobQueue.pop();
469 115953 : return task;
470 : }
471 :
472 114619 : if (!psWorkerThread->bMarkedAsWaiting)
473 : {
474 114397 : psWorkerThread->bMarkedAsWaiting = true;
475 114397 : nWaitingWorkerThreads++;
476 :
477 : CPLList *psItem =
478 114397 : static_cast<CPLList *>(VSI_MALLOC_VERBOSE(sizeof(CPLList)));
479 114464 : if (psItem == nullptr)
480 : {
481 0 : eState = CPLWTS_ERROR;
482 0 : m_cv.notify_one();
483 :
484 0 : return nullptr;
485 : }
486 :
487 114464 : psItem->pData = psWorkerThread;
488 114464 : psItem->psNext = psWaitingWorkerThreadsList;
489 114464 : psWaitingWorkerThreadsList = psItem;
490 :
491 : #if DEBUG_VERBOSE
492 : CPLAssert(CPLListCount(psWaitingWorkerThreadsList) ==
493 : nWaitingWorkerThreads);
494 : #endif
495 : }
496 :
497 114686 : m_cv.notify_one();
498 :
499 : #if DEBUG_VERBOSE
500 : CPLDebug("JOB", "%p sleeping", psWorkerThread);
501 : #endif
502 :
503 227844 : std::unique_lock<std::mutex> oGuardThisThread(psWorkerThread->m_mutex);
504 : // coverity[uninit_use_in_call]
505 114479 : oGuard.unlock();
506 : // coverity[wait_not_in_locked_loop]
507 114454 : psWorkerThread->m_cv.wait(oGuardThisThread);
508 113249 : }
509 : }
510 :
511 : /************************************************************************/
512 : /* CreateJobQueue() */
513 : /************************************************************************/
514 :
515 : /** Create a new job queue based on this worker thread pool.
516 : *
517 : * The worker thread pool must remain alive while the returned object is
518 : * itself alive.
519 : *
520 : * @since GDAL 3.2
521 : */
522 84866 : std::unique_ptr<CPLJobQueue> CPLWorkerThreadPool::CreateJobQueue()
523 : {
524 84866 : return std::unique_ptr<CPLJobQueue>(new CPLJobQueue(this));
525 : }
526 :
527 : /************************************************************************/
528 : /* CPLJobQueue() */
529 : /************************************************************************/
530 :
531 : //! @cond Doxygen_Suppress
532 84835 : CPLJobQueue::CPLJobQueue(CPLWorkerThreadPool *poPool) : m_poPool(poPool)
533 : {
534 84910 : }
535 :
536 : //! @endcond
537 :
538 : /************************************************************************/
539 : /* ~CPLJobQueue() */
540 : /************************************************************************/
541 :
542 84971 : CPLJobQueue::~CPLJobQueue()
543 : {
544 84974 : WaitCompletion();
545 84972 : }
546 :
547 : /************************************************************************/
548 : /* DeclareJobFinished() */
549 : /************************************************************************/
550 :
551 184226 : void CPLJobQueue::DeclareJobFinished()
552 : {
553 368638 : std::lock_guard<std::mutex> oGuard(m_mutex);
554 184309 : m_nPendingJobs--;
555 184309 : m_cv.notify_one();
556 184257 : }
557 :
558 : /************************************************************************/
559 : /* SubmitJob() */
560 : /************************************************************************/
561 :
562 : /** Queue a new job.
563 : *
564 : * @param pfnFunc Function to run for the job.
565 : * @param pData User data to pass to the job function.
566 : * @return true in case of success.
567 : */
568 14778 : bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
569 : {
570 29550 : return SubmitJob([=] { pfnFunc(pData); });
571 : }
572 :
573 : /** Queue a new job.
574 : *
575 : * @param task Task to execute.
576 : * @return true in case of success.
577 : */
578 184271 : bool CPLJobQueue::SubmitJob(std::function<void()> task)
579 : {
580 : {
581 184271 : std::lock_guard<std::mutex> oGuard(m_mutex);
582 184302 : m_nPendingJobs++;
583 : }
584 :
585 : // coverity[uninit_member,copy_constructor_call]
586 368305 : const auto lambda = [this, task]
587 : {
588 184044 : task();
589 184261 : DeclareJobFinished();
590 184197 : };
591 : // cppcheck-suppress knownConditionTrueFalse
592 368420 : return m_poPool->SubmitJob(lambda);
593 : }
594 :
595 : /************************************************************************/
596 : /* WaitCompletion() */
597 : /************************************************************************/
598 :
599 : /** Wait for completion of part or whole jobs.
600 : *
601 : * @param nMaxRemainingJobs Maximum number of pendings jobs that are allowed
602 : * in the queue after this method has completed. Might
603 : * be 0 to wait for all jobs.
604 : */
605 169262 : void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
606 : {
607 338538 : std::unique_lock<std::mutex> oGuard(m_mutex);
608 169277 : m_cv.wait(oGuard, [this, nMaxRemainingJobs]
609 209514 : { return m_nPendingJobs <= nMaxRemainingJobs; });
610 169308 : }
611 :
612 : /************************************************************************/
613 : /* WaitEvent() */
614 : /************************************************************************/
615 :
616 : /** Wait for completion for at least one job.
617 : *
618 : * @return true if there are remaining jobs.
619 : */
620 195 : bool CPLJobQueue::WaitEvent()
621 : {
622 : // NOTE - This isn't quite right. After nPendingJobsBefore is set but before
623 : // a notification occurs, jobs could be submitted which would increase
624 : // nPendingJobs, so a job completion may looks like a spurious wakeup.
625 390 : std::unique_lock<std::mutex> oGuard(m_mutex);
626 195 : if (m_nPendingJobs == 0)
627 2 : return false;
628 :
629 193 : const int nPendingJobsBefore = m_nPendingJobs;
630 193 : m_cv.wait(oGuard, [this, nPendingJobsBefore]
631 386 : { return m_nPendingJobs < nPendingJobsBefore; });
632 193 : return m_nPendingJobs > 0;
633 : }
|