Line data Source code
1 : /******************************************************************************
2 : * (c) 2024 info@hobu.co
3 : *
4 : * SPDX-License-Identifier: MIT
5 : ****************************************************************************/
6 :
7 : #include <algorithm>
8 : #include <limits>
9 : #include <thread>
10 :
11 : #include "cpl_worker_thread_pool.h"
12 : #include "memdataset.h"
13 :
14 : #include "combiner.h"
15 : #include "cumulative.h"
16 : #include "notifyqueue.h"
17 : #include "util.h"
18 : #include "viewshed_executor.h"
19 :
20 : namespace gdal
21 : {
22 : namespace viewshed
23 : {
24 :
25 : /// Constructor
26 : ///
27 : /// @param opts Options for viewshed generation.
28 2 : Cumulative::Cumulative(const Options &opts) : m_opts(opts)
29 : {
30 2 : }
31 :
32 : /// Destructor
33 : ///
34 : Cumulative::~Cumulative() = default;
35 :
36 : /// Compute the cumulative viewshed of a raster band.
37 : ///
38 : /// @param srcFilename Source filename.
39 : /// @param pfnProgress Pointer to the progress function. Can be null.
40 : /// @param pProgressArg Argument passed to the progress function
41 : /// @return True on success, false otherwise.
42 2 : bool Cumulative::run(const std::string &srcFilename,
43 : GDALProgressFunc pfnProgress, void *pProgressArg)
44 : {
45 : // In cumulative mode, we run the executors in normal mode and want "1" where things
46 : // are visible.
47 2 : m_opts.outputMode = OutputMode::Normal;
48 2 : m_opts.visibleVal = 1;
49 :
50 : DatasetPtr srcDS(
51 4 : GDALDataset::FromHandle(GDALOpen(srcFilename.c_str(), GA_ReadOnly)));
52 2 : if (!srcDS)
53 : {
54 0 : CPLError(CE_Failure, CPLE_AppDefined, "Unable open source file.");
55 0 : return false;
56 : }
57 :
58 2 : GDALRasterBand *pSrcBand = srcDS->GetRasterBand(1);
59 :
60 : // In cumulative mode, the output extent is always the entire source raster.
61 2 : m_extent.xStop = GDALGetRasterBandXSize(pSrcBand);
62 2 : m_extent.yStop = GDALGetRasterBandYSize(pSrcBand);
63 :
64 : // Make a bunch of observer locations based on the spacing and stick them on a queue
65 : // to be handled by viewshed executors.
66 19 : for (int x = 0; x < m_extent.xStop; x += m_opts.observerSpacing)
67 213 : for (int y = 0; y < m_extent.yStop; y += m_opts.observerSpacing)
68 196 : m_observerQueue.push({x, y});
69 2 : m_observerQueue.done();
70 :
71 : // Run executors.
72 2 : const int numThreads = m_opts.numJobs;
73 2 : std::atomic<bool> err = false;
74 2 : std::atomic<int> running = numThreads;
75 2 : std::atomic<bool> hasFoundNoData = false;
76 : Progress progress(pfnProgress, pProgressArg,
77 4 : m_observerQueue.size() * m_extent.ySize());
78 4 : CPLWorkerThreadPool executorPool(numThreads);
79 8 : for (int i = 0; i < numThreads; ++i)
80 6 : executorPool.SubmitJob(
81 6 : [this, &srcFilename, &progress, &err, &running, &hasFoundNoData] {
82 6 : runExecutor(srcFilename, progress, err, running,
83 : hasFoundNoData);
84 6 : });
85 :
86 : // Run combiners that create 8-bit sums of executor jobs.
87 4 : CPLWorkerThreadPool combinerPool(numThreads);
88 : std::vector<Combiner> combiners(numThreads,
89 6 : Combiner(m_datasetQueue, m_rollupQueue));
90 8 : for (Combiner &c : combiners)
91 12 : combinerPool.SubmitJob([&c] { c.run(); });
92 :
93 : // Run 32-bit rollup job that combines the 8-bit results from the combiners.
94 6 : std::thread sum([this] { rollupRasters(); });
95 :
96 : // When the combiner jobs are done, all the data is in the rollup queue.
97 2 : combinerPool.WaitCompletion();
98 2 : if (m_datasetQueue.isStopped())
99 0 : return false;
100 2 : m_rollupQueue.done();
101 :
102 : // Wait for finalBuf to be fully filled.
103 2 : sum.join();
104 : // The executors should exit naturally, but we wait here so that we don't outrun their
105 : // completion and exit with outstanding threads.
106 2 : executorPool.WaitCompletion();
107 :
108 2 : if (hasFoundNoData)
109 : {
110 0 : CPLError(
111 : CE_Warning, CPLE_AppDefined,
112 : "Nodata value found in input DEM. Output will be likely incorrect");
113 : }
114 :
115 : // Scale the data so that we can write an 8-bit raster output.
116 2 : scaleOutput();
117 2 : if (!writeOutput(createOutputDataset(*pSrcBand, m_opts, m_extent)))
118 : {
119 0 : CPLError(CE_Failure, CPLE_AppDefined,
120 : "Unable to write to output file.");
121 0 : return false;
122 : }
123 2 : progress.emit(1);
124 :
125 2 : return true;
126 : }
127 :
128 : /// Run an executor (single viewshed)
129 : /// @param srcFilename Source filename
130 : /// @param progress Progress supporting support.
131 : /// @param err Shared error flag.
132 : /// @param running Shared count of number of executors running.
133 : /// @param hasFoundNoData Shared flag to indicate if a point at nodata has been encountered.
134 6 : void Cumulative::runExecutor(const std::string &srcFilename, Progress &progress,
135 : std::atomic<bool> &err, std::atomic<int> &running,
136 : std::atomic<bool> &hasFoundNoData)
137 : {
138 12 : DatasetPtr srcDs(GDALDataset::Open(srcFilename.c_str(), GA_ReadOnly));
139 6 : if (!srcDs)
140 : {
141 0 : err = true;
142 : }
143 : else
144 : {
145 : Location loc;
146 202 : while (!err && m_observerQueue.pop(loc))
147 : {
148 196 : DatasetPtr dstDs(MEMDataset::Create(
149 392 : "", m_extent.xSize(), m_extent.ySize(), 1, GDT_Byte, nullptr));
150 196 : if (!dstDs)
151 : {
152 0 : err = true;
153 : }
154 : else
155 : {
156 : ViewshedExecutor executor(
157 196 : *srcDs->GetRasterBand(1), *dstDs->GetRasterBand(1), loc.x,
158 195 : loc.y, m_extent, m_extent, m_opts, progress,
159 587 : /* emitWarningIfNoData = */ false);
160 196 : err = !executor.run();
161 196 : if (!err)
162 196 : m_datasetQueue.push(std::move(dstDs));
163 196 : if (executor.hasFoundNoData())
164 : {
165 0 : hasFoundNoData = true;
166 : }
167 : }
168 : }
169 : }
170 :
171 : // Job done. Set the output queue state. If all the executor jobs have completed,
172 : // set the dataset output queue done.
173 6 : if (err)
174 0 : m_datasetQueue.stop();
175 : else
176 : {
177 6 : running--;
178 6 : if (!running)
179 2 : m_datasetQueue.done();
180 : }
181 6 : }
182 :
183 : // Add 8-bit rasters into the 32-bit raster buffer.
184 2 : void Cumulative::rollupRasters()
185 : {
186 2 : DatasetPtr pDS;
187 :
188 2 : m_finalBuf.resize(m_extent.size());
189 8 : while (m_rollupQueue.pop(pDS))
190 : {
191 : uint8_t *srcP =
192 6 : static_cast<uint8_t *>(pDS->GetInternalHandle("MEMORY1"));
193 86526 : for (size_t i = 0; i < m_extent.size(); ++i)
194 86520 : m_finalBuf[i] += srcP[i];
195 : }
196 2 : }
197 :
198 : /// Scale the output so that it's fully spread in 8 bits. Perhaps this shouldn't happen if
199 : /// the max is less than 255?
200 2 : void Cumulative::scaleOutput()
201 : {
202 2 : uint32_t m = 0; // This gathers all the bits set.
203 28842 : for (uint32_t &val : m_finalBuf)
204 28840 : m = std::max(val, m);
205 :
206 2 : if (m == 0)
207 0 : return;
208 :
209 : double factor =
210 2 : std::numeric_limits<uint8_t>::max() / static_cast<double>(m);
211 28842 : for (uint32_t &val : m_finalBuf)
212 28840 : val = static_cast<uint32_t>(std::floor(factor * val));
213 : }
214 :
215 : /// Write the output dataset.
216 : /// @param pDstDS Pointer to the destination dataset.
217 : /// @return True if the write was successful, false otherwise.
218 2 : bool Cumulative::writeOutput(DatasetPtr pDstDS)
219 : {
220 2 : if (!pDstDS)
221 0 : return false;
222 :
223 2 : GDALRasterBand *pDstBand = pDstDS->GetRasterBand(1);
224 4 : return (pDstBand->RasterIO(GF_Write, 0, 0, m_extent.xSize(),
225 2 : m_extent.ySize(), m_finalBuf.data(),
226 : m_extent.xSize(), m_extent.ySize(), GDT_UInt32,
227 2 : 0, 0, nullptr) == 0);
228 : }
229 :
230 : } // namespace viewshed
231 : } // namespace gdal
|