Line data Source code
1 : /******************************************************************************
2 : * (c) 2024 info@hobu.co
3 : *
4 : * SPDX-License-Identifier: MIT
5 : ****************************************************************************/
6 :
7 : #include <algorithm>
8 : #include <limits>
9 : #include <thread>
10 :
11 : #include "cpl_worker_thread_pool.h"
12 : #include "memdataset.h"
13 :
14 : #include "combiner.h"
15 : #include "cumulative.h"
16 : #include "notifyqueue.h"
17 : #include "util.h"
18 : #include "viewshed_executor.h"
19 :
20 : namespace gdal
21 : {
22 : namespace viewshed
23 : {
24 :
25 : /// Constructor
26 : ///
27 : /// @param opts Options for viewshed generation.
28 2 : Cumulative::Cumulative(const Options &opts) : m_opts(opts)
29 : {
30 2 : }
31 :
32 : /// Destructor
33 : ///
34 : Cumulative::~Cumulative() = default;
35 :
36 : /// Compute the cumulative viewshed of a raster band.
37 : ///
38 : /// @param srcFilename Source filename.
39 : /// @param pfnProgress Pointer to the progress function. Can be null.
40 : /// @param pProgressArg Argument passed to the progress function
41 : /// @return True on success, false otherwise.
42 2 : bool Cumulative::run(const std::string &srcFilename,
43 : GDALProgressFunc pfnProgress, void *pProgressArg)
44 : {
45 : // In cumulative mode, we run the executors in normal mode and want "1" where things
46 : // are visible.
47 2 : m_opts.outputMode = OutputMode::Normal;
48 2 : m_opts.visibleVal = 1;
49 :
50 : DatasetPtr srcDS(
51 4 : GDALDataset::FromHandle(GDALOpen(srcFilename.c_str(), GA_ReadOnly)));
52 2 : if (!srcDS)
53 : {
54 0 : CPLError(CE_Failure, CPLE_AppDefined, "Unable open source file.");
55 0 : return false;
56 : }
57 :
58 2 : GDALRasterBand *pSrcBand = srcDS->GetRasterBand(1);
59 :
60 : // In cumulative mode, the output extent is always the entire source raster.
61 2 : m_extent.xStop = GDALGetRasterBandXSize(pSrcBand);
62 2 : m_extent.yStop = GDALGetRasterBandYSize(pSrcBand);
63 :
64 : // Make a bunch of observer locations based on the spacing and stick them on a queue
65 : // to be handled by viewshed executors.
66 19 : for (int x = 0; x < m_extent.xStop; x += m_opts.observerSpacing)
67 213 : for (int y = 0; y < m_extent.yStop; y += m_opts.observerSpacing)
68 196 : m_observerQueue.push({x, y});
69 2 : m_observerQueue.done();
70 :
71 : // Run executors.
72 2 : const int numThreads = m_opts.numJobs;
73 2 : std::atomic<bool> err = false;
74 2 : std::atomic<int> running = numThreads;
75 2 : std::atomic<bool> hasFoundNoData = false;
76 : Progress progress(pfnProgress, pProgressArg,
77 4 : m_observerQueue.size() * m_extent.ySize());
78 4 : CPLWorkerThreadPool executorPool(numThreads);
79 8 : for (int i = 0; i < numThreads; ++i)
80 6 : executorPool.SubmitJob(
81 6 : [this, &srcFilename, &progress, &err, &running, &hasFoundNoData]
82 : {
83 6 : runExecutor(srcFilename, progress, err, running,
84 : hasFoundNoData);
85 6 : });
86 :
87 : // Run combiners that create 8-bit sums of executor jobs.
88 4 : CPLWorkerThreadPool combinerPool(numThreads);
89 : std::vector<Combiner> combiners(numThreads,
90 6 : Combiner(m_datasetQueue, m_rollupQueue));
91 8 : for (Combiner &c : combiners)
92 12 : combinerPool.SubmitJob([&c] { c.run(); });
93 :
94 : // Run 32-bit rollup job that combines the 8-bit results from the combiners.
95 6 : std::thread sum([this] { rollupRasters(); });
96 :
97 : // When the combiner jobs are done, all the data is in the rollup queue.
98 2 : combinerPool.WaitCompletion();
99 2 : if (m_datasetQueue.isStopped())
100 0 : return false;
101 2 : m_rollupQueue.done();
102 :
103 : // Wait for finalBuf to be fully filled.
104 2 : sum.join();
105 : // The executors should exit naturally, but we wait here so that we don't outrun their
106 : // completion and exit with outstanding threads.
107 2 : executorPool.WaitCompletion();
108 :
109 2 : if (hasFoundNoData)
110 : {
111 0 : CPLError(
112 : CE_Warning, CPLE_AppDefined,
113 : "Nodata value found in input DEM. Output will be likely incorrect");
114 : }
115 :
116 : // Scale the data so that we can write an 8-bit raster output.
117 2 : scaleOutput();
118 2 : if (!writeOutput(createOutputDataset(*pSrcBand, m_opts, m_extent)))
119 : {
120 0 : CPLError(CE_Failure, CPLE_AppDefined,
121 : "Unable to write to output file.");
122 0 : return false;
123 : }
124 2 : progress.emit(1);
125 :
126 2 : return true;
127 : }
128 :
129 : /// Run an executor (single viewshed)
130 : /// @param srcFilename Source filename
131 : /// @param progress Progress supporting support.
132 : /// @param err Shared error flag.
133 : /// @param running Shared count of number of executors running.
134 : /// @param hasFoundNoData Shared flag to indicate if a point at nodata has been encountered.
135 6 : void Cumulative::runExecutor(const std::string &srcFilename, Progress &progress,
136 : std::atomic<bool> &err, std::atomic<int> &running,
137 : std::atomic<bool> &hasFoundNoData)
138 : {
139 12 : DatasetPtr srcDs(GDALDataset::Open(srcFilename.c_str(), GA_ReadOnly));
140 6 : if (!srcDs)
141 : {
142 0 : err = true;
143 : }
144 : else
145 : {
146 : Location loc;
147 202 : while (!err && m_observerQueue.pop(loc))
148 : {
149 196 : DatasetPtr dstDs(MEMDataset::Create(
150 392 : "", m_extent.xSize(), m_extent.ySize(), 1, GDT_UInt8, nullptr));
151 196 : if (!dstDs)
152 : {
153 0 : err = true;
154 : }
155 : else
156 : {
157 : ViewshedExecutor executor(
158 196 : *srcDs->GetRasterBand(1), *dstDs->GetRasterBand(1), loc.x,
159 196 : loc.y, m_extent, m_extent, m_opts, progress,
160 588 : /* emitWarningIfNoData = */ false);
161 196 : err = !executor.run();
162 196 : if (!err)
163 196 : m_datasetQueue.push(std::move(dstDs));
164 196 : if (executor.hasFoundNoData())
165 : {
166 0 : hasFoundNoData = true;
167 : }
168 : }
169 : }
170 : }
171 :
172 : // Job done. Set the output queue state. If all the executor jobs have completed,
173 : // set the dataset output queue done.
174 6 : if (err)
175 0 : m_datasetQueue.stop();
176 : else
177 : {
178 6 : running--;
179 6 : if (!running)
180 2 : m_datasetQueue.done();
181 : }
182 6 : }
183 :
184 : // Add 8-bit rasters into the 32-bit raster buffer.
185 2 : void Cumulative::rollupRasters()
186 : {
187 2 : DatasetPtr pDS;
188 :
189 2 : m_finalBuf.resize(m_extent.size());
190 8 : while (m_rollupQueue.pop(pDS))
191 : {
192 : uint8_t *srcP =
193 6 : static_cast<uint8_t *>(pDS->GetInternalHandle("MEMORY1"));
194 86526 : for (size_t i = 0; i < m_extent.size(); ++i)
195 86520 : m_finalBuf[i] += srcP[i];
196 : }
197 2 : }
198 :
199 : /// Scale the output so that it's fully spread in 8 bits. Perhaps this shouldn't happen if
200 : /// the max is less than 255?
201 2 : void Cumulative::scaleOutput()
202 : {
203 2 : uint32_t m = 0; // This gathers all the bits set.
204 28842 : for (uint32_t &val : m_finalBuf)
205 28840 : m = std::max(val, m);
206 :
207 2 : if (m == 0)
208 0 : return;
209 :
210 : double factor =
211 2 : std::numeric_limits<uint8_t>::max() / static_cast<double>(m);
212 28842 : for (uint32_t &val : m_finalBuf)
213 28840 : val = static_cast<uint32_t>(std::floor(factor * val));
214 : }
215 :
216 : /// Write the output dataset.
217 : /// @param pDstDS Pointer to the destination dataset.
218 : /// @return True if the write was successful, false otherwise.
219 2 : bool Cumulative::writeOutput(DatasetPtr pDstDS)
220 : {
221 2 : if (!pDstDS)
222 0 : return false;
223 :
224 2 : GDALRasterBand *pDstBand = pDstDS->GetRasterBand(1);
225 4 : return (pDstBand->RasterIO(GF_Write, 0, 0, m_extent.xSize(),
226 2 : m_extent.ySize(), m_finalBuf.data(),
227 : m_extent.xSize(), m_extent.ySize(), GDT_UInt32,
228 2 : 0, 0, nullptr) == 0);
229 : }
230 :
231 : } // namespace viewshed
232 : } // namespace gdal
|