Line data Source code
1 : /******************************************************************************
2 : * (c) 2024 info@hobu.co
3 : *
4 : * SPDX-License-Identifier: MIT
5 : ****************************************************************************/
6 :
7 : #include <algorithm>
8 : #include <limits>
9 : #include <thread>
10 :
11 : #include "cpl_worker_thread_pool.h"
12 :
13 : #include "combiner.h"
14 : #include "cumulative.h"
15 : #include "notifyqueue.h"
16 : #include "util.h"
17 : #include "viewshed_executor.h"
18 :
19 : namespace gdal
20 : {
21 : namespace viewshed
22 : {
23 :
24 : /// Constructor
25 : ///
26 : /// @param opts Options for viewshed generation.
27 1 : Cumulative::Cumulative(const Options &opts) : m_opts(opts)
28 : {
29 1 : }
30 :
31 : /// Destructor
32 : ///
33 : Cumulative::~Cumulative() = default;
34 :
35 : /// Compute the cumulative viewshed of a raster band.
36 : ///
37 : /// @param srcFilename Source filename.
38 : /// @param pfnProgress Pointer to the progress function. Can be null.
39 : /// @param pProgressArg Argument passed to the progress function
40 : /// @return True on success, false otherwise.
41 1 : bool Cumulative::run(const std::string &srcFilename,
42 : GDALProgressFunc pfnProgress, void *pProgressArg)
43 : {
44 : // In cumulative mode, we run the executors in normal mode and want "1" where things
45 : // are visible.
46 1 : m_opts.outputMode = OutputMode::Normal;
47 1 : m_opts.visibleVal = 1;
48 :
49 : DatasetPtr srcDS(
50 2 : GDALDataset::FromHandle(GDALOpen(srcFilename.c_str(), GA_ReadOnly)));
51 1 : if (!srcDS)
52 : {
53 0 : CPLError(CE_Failure, CPLE_AppDefined, "Unable open source file.");
54 0 : return false;
55 : }
56 :
57 1 : GDALRasterBand *pSrcBand = srcDS->GetRasterBand(1);
58 :
59 : // In cumulative mode, the output extent is always the entire source raster.
60 1 : m_extent.xStop = GDALGetRasterBandXSize(pSrcBand);
61 1 : m_extent.yStop = GDALGetRasterBandYSize(pSrcBand);
62 :
63 : // Make a bunch of observer locations based on the spacing and stick them on a queue
64 : // to be handled by viewshed executors.
65 22 : for (int x = 0; x < m_extent.xStop; x += m_opts.observerSpacing)
66 609 : for (int y = 0; y < m_extent.yStop; y += m_opts.observerSpacing)
67 588 : m_observerQueue.push({x, y});
68 1 : m_observerQueue.done();
69 :
70 : // Run executors.
71 1 : const int numThreads = m_opts.numJobs;
72 1 : std::atomic<bool> err = false;
73 1 : std::atomic<int> running = numThreads;
74 : Progress progress(pfnProgress, pProgressArg,
75 2 : m_observerQueue.size() * m_extent.ySize());
76 2 : CPLWorkerThreadPool executorPool(numThreads);
77 4 : for (int i = 0; i < numThreads; ++i)
78 3 : executorPool.SubmitJob(
79 3 : [this, &srcFilename, &progress, &err, &running]
80 3 : { runExecutor(srcFilename, progress, err, running); });
81 :
82 : // Run combiners that create 8-bit sums of executor jobs.
83 2 : CPLWorkerThreadPool combinerPool(numThreads);
84 : std::vector<Combiner> combiners(numThreads,
85 3 : Combiner(m_datasetQueue, m_rollupQueue));
86 4 : for (Combiner &c : combiners)
87 6 : combinerPool.SubmitJob([&c] { c.run(); });
88 :
89 : // Run 32-bit rollup job that combines the 8-bit results from the combiners.
90 3 : std::thread sum([this] { rollupRasters(); });
91 :
92 : // When the combiner jobs are done, all the data is in the rollup queue.
93 1 : combinerPool.WaitCompletion();
94 1 : if (m_datasetQueue.isStopped())
95 0 : return false;
96 1 : m_rollupQueue.done();
97 :
98 : // Wait for finalBuf to be fully filled.
99 1 : sum.join();
100 : // The executors should exit naturally, but we wait here so that we don't outrun their
101 : // completion and exit with outstanding threads.
102 1 : executorPool.WaitCompletion();
103 :
104 : // Scale the data so that we can write an 8-bit raster output.
105 1 : scaleOutput();
106 1 : if (!writeOutput(createOutputDataset(*pSrcBand, m_opts, m_extent)))
107 : {
108 0 : CPLError(CE_Failure, CPLE_AppDefined,
109 : "Unable to write to output file.");
110 0 : return false;
111 : }
112 1 : progress.emit(1);
113 :
114 1 : return true;
115 : }
116 :
117 : /// Run an executor (single viewshed)
118 : /// @param srcFilename Source filename
119 : /// @param progress Progress supporting support.
120 : /// @param err Shared error flag.
121 : /// @param running Shared count of number of executors running.
122 3 : void Cumulative::runExecutor(const std::string &srcFilename, Progress &progress,
123 : std::atomic<bool> &err, std::atomic<int> &running)
124 : {
125 6 : DatasetPtr srcDs(GDALDataset::Open(srcFilename.c_str(), GA_ReadOnly));
126 3 : if (!srcDs)
127 : {
128 0 : err = true;
129 : }
130 : else
131 : {
132 : Location loc;
133 591 : while (!err && m_observerQueue.pop(loc))
134 : {
135 : GDALDriver *memDriver =
136 588 : GetGDALDriverManager()->GetDriverByName("MEM");
137 588 : DatasetPtr dstDs(memDriver ? memDriver->Create("", m_extent.xSize(),
138 : m_extent.ySize(), 1,
139 : GDT_Byte, nullptr)
140 1764 : : nullptr);
141 588 : if (!dstDs)
142 : {
143 0 : err = true;
144 : }
145 : else
146 : {
147 : ViewshedExecutor executor(
148 588 : *srcDs->GetRasterBand(1), *dstDs->GetRasterBand(1), loc.x,
149 1764 : loc.y, m_extent, m_extent, m_opts, progress);
150 588 : err = !executor.run();
151 588 : if (!err)
152 588 : m_datasetQueue.push(std::move(dstDs));
153 : }
154 : }
155 : }
156 :
157 : // Job done. Set the output queue state. If all the executor jobs have completed,
158 : // set the dataset output queue done.
159 3 : if (err)
160 0 : m_datasetQueue.stop();
161 : else
162 : {
163 3 : running--;
164 3 : if (!running)
165 1 : m_datasetQueue.done();
166 : }
167 3 : }
168 :
169 : // Add 8-bit rasters into the 32-bit raster buffer.
170 1 : void Cumulative::rollupRasters()
171 : {
172 1 : DatasetPtr pDS;
173 :
174 1 : m_finalBuf.resize(m_extent.size());
175 4 : while (m_rollupQueue.pop(pDS))
176 : {
177 : uint8_t *srcP =
178 3 : static_cast<uint8_t *>(pDS->GetInternalHandle("MEMORY1"));
179 43263 : for (size_t i = 0; i < m_extent.size(); ++i)
180 43260 : m_finalBuf[i] += srcP[i];
181 : }
182 1 : }
183 :
184 : /// Scale the output so that it's fully spread in 8 bits. Perhaps this shouldn't happen if
185 : /// the max is less than 255?
186 1 : void Cumulative::scaleOutput()
187 : {
188 1 : uint32_t m = 0; // This gathers all the bits set.
189 14421 : for (uint32_t &val : m_finalBuf)
190 14420 : m = std::max(val, m);
191 :
192 1 : if (m == 0)
193 0 : return;
194 :
195 : double factor =
196 1 : std::numeric_limits<uint8_t>::max() / static_cast<double>(m);
197 14421 : for (uint32_t &val : m_finalBuf)
198 14420 : val = static_cast<uint32_t>(std::floor(factor * val));
199 : }
200 :
201 : /// Write the output dataset.
202 : /// @param pDstDS Pointer to the destination dataset.
203 : /// @return True if the write was successful, false otherwise.
204 1 : bool Cumulative::writeOutput(DatasetPtr pDstDS)
205 : {
206 1 : if (!pDstDS)
207 0 : return false;
208 :
209 1 : GDALRasterBand *pDstBand = pDstDS->GetRasterBand(1);
210 2 : return (pDstBand->RasterIO(GF_Write, 0, 0, m_extent.xSize(),
211 1 : m_extent.ySize(), m_finalBuf.data(),
212 : m_extent.xSize(), m_extent.ySize(), GDT_UInt32,
213 1 : 0, 0, nullptr) == 0);
214 : }
215 :
216 : } // namespace viewshed
217 : } // namespace gdal
|