Line data Source code
1 : /****************************************************************************** 2 : * (c) 2024 info@hobu.co 3 : * 4 : * SPDX-License-Identifier: MIT 5 : ****************************************************************************/ 6 : 7 : #ifndef VIEWSHED_NOTIFYQUEUE_H_INCLUDED 8 : #define VIEWSHED_NOTIFYQUEUE_H_INCLUDED 9 : 10 : #include "cpl_port.h" 11 : 12 : #include <condition_variable> 13 : #include <mutex> 14 : #include <queue> 15 : 16 : namespace gdal 17 : { 18 : namespace viewshed 19 : { 20 : 21 : /// This is a thread-safe queue. Things placed in the queue must be move-constructible. 22 : /// Readers will wait until there is something in the queue or the queue is empty or stopped. 23 : /// If the queue is stopped (error), it will never be in the done state. If in the 24 : /// done state (all writers have finished), it will never be in the error state. 25 : template <class T> class NotifyQueue 26 : { 27 : public: 28 : /// Destructor 29 3 : ~NotifyQueue() 30 : { 31 3 : done(); 32 3 : } 33 : 34 : /// Push an object on the queue and notify readers. 35 : /// \param t Object to be moved onto the queue. 36 1179 : void push(T &&t) 37 : { 38 : { 39 2358 : std::lock_guard<std::mutex> lock(m_mutex); 40 1179 : m_queue.push(std::move(t)); 41 : } 42 1179 : m_cv.notify_all(); 43 1179 : } 44 : 45 : /// Get an item from the queue. 46 : /// \param t Reference to an item to to which a queued item will be moved. 47 : /// \return True if an item was popped. False otherwise. Use isStopped() or isDone() 48 : /// to determine the state if you care when false is returned. 49 1186 : bool pop(T &t) 50 : { 51 2372 : std::unique_lock<std::mutex> lock(m_mutex); 52 1186 : m_cv.wait(lock, 53 2923 : [this] { return !m_queue.empty() || m_done || m_stop; }); 54 : 55 1186 : if (m_stop) 56 0 : return false; 57 : 58 1186 : if (m_queue.size()) 59 : { 60 1179 : t = std::move(m_queue.front()); 61 1179 : m_queue.pop(); 62 1179 : return true; 63 : } 64 : 65 : // m_done must be true and the queue is empty. 66 7 : return false; 67 : } 68 : 69 : /// When we're done putting things in the queue, set the end condition. 70 6 : void done() 71 : { 72 : { 73 6 : std::lock_guard<std::mutex> lock(m_mutex); 74 6 : m_done = !m_stop; // If we're already stopped, we can't be done. 75 : } 76 6 : m_cv.notify_all(); 77 6 : } 78 : 79 : /// Unblock all readers regardless of queue state. 80 0 : void stop() 81 : { 82 : { 83 0 : std::lock_guard<std::mutex> lock(m_mutex); 84 0 : m_stop = !m_done; // If we're already done, we can't be stopped. 85 : } 86 0 : m_cv.notify_all(); 87 0 : } 88 : 89 : /// Determine if the queue was emptied completely. Call after pop() returns false 90 : /// to check queue state. 91 : /// \return Whether the queue was emptied completely. 92 : bool isDone() 93 : { 94 : std::lock_guard<std::mutex> lock(m_mutex); 95 : return m_done; 96 : } 97 : 98 : /// Determine if the queue was stopped. Call after pop() returns false 99 : /// to check queue state. 100 : /// \return Whether the queue was stopped. 101 1 : bool isStopped() 102 : { 103 1 : std::lock_guard<std::mutex> lock(m_mutex); 104 2 : return m_stop; 105 : } 106 : 107 : /// Get the current size of the queue. 108 : /// \return Current queue size. 109 1 : size_t size() const 110 : { 111 2 : std::lock_guard<std::mutex> lock(m_mutex); 112 2 : return m_queue.size(); 113 : } 114 : 115 : private: 116 : std::queue<T> m_queue{}; 117 : mutable std::mutex m_mutex{}; 118 : std::condition_variable m_cv{}; 119 : bool m_done{false}; 120 : bool m_stop{false}; 121 : }; 122 : 123 : } // namespace viewshed 124 : } // namespace gdal 125 : 126 : #endif