Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Parquet Translator
4 : * Purpose: Implements OGRParquetDriver.
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * Permission is hereby granted, free of charge, to any person obtaining a
11 : * copy of this software and associated documentation files (the "Software"),
12 : * to deal in the Software without restriction, including without limitation
13 : * the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 : * and/or sell copies of the Software, and to permit persons to whom the
15 : * Software is furnished to do so, subject to the following conditions:
16 : *
17 : * The above copyright notice and this permission notice shall be included
18 : * in all copies or substantial portions of the Software.
19 : *
20 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 : * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 : * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25 : * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26 : * DEALINGS IN THE SOFTWARE.
27 : ****************************************************************************/
28 :
29 : #include "ogrsf_frmts.h"
30 :
31 : #include <algorithm>
32 : #include <map>
33 : #include <set>
34 : #include <utility>
35 :
36 : #include "ogr_parquet.h"
37 :
38 : #include "../arrow_common/ograrrowlayer.hpp"
39 : #include "../arrow_common/ograrrowdataset.hpp"
40 :
41 : /************************************************************************/
42 : /* OGRParquetDatasetLayer() */
43 : /************************************************************************/
44 :
45 21 : OGRParquetDatasetLayer::OGRParquetDatasetLayer(
46 : OGRParquetDataset *poDS, const char *pszLayerName,
47 : const std::shared_ptr<arrow::dataset::Scanner> &scanner,
48 21 : const std::shared_ptr<arrow::Schema> &schema, CSLConstList papszOpenOptions)
49 : : OGRParquetLayerBase(poDS, pszLayerName, papszOpenOptions),
50 21 : m_poScanner(scanner)
51 : {
52 21 : m_poSchema = schema;
53 21 : EstablishFeatureDefn();
54 21 : CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
55 : m_poFeatureDefn->GetGeomFieldCount());
56 21 : }
57 :
58 : /************************************************************************/
59 : /* EstablishFeatureDefn() */
60 : /************************************************************************/
61 :
62 21 : void OGRParquetDatasetLayer::EstablishFeatureDefn()
63 : {
64 21 : const auto &kv_metadata = m_poSchema->metadata();
65 :
66 21 : LoadGeoMetadata(kv_metadata);
67 : const auto oMapFieldNameToGDALSchemaFieldDefn =
68 42 : LoadGDALSchema(kv_metadata.get());
69 :
70 21 : LoadGDALMetadata(kv_metadata.get());
71 :
72 21 : const auto &fields = m_poSchema->fields();
73 149 : for (int i = 0; i < m_poSchema->num_fields(); ++i)
74 : {
75 128 : const auto &field = fields[i];
76 :
77 128 : if (!m_osFIDColumn.empty() && field->name() == m_osFIDColumn)
78 : {
79 0 : m_iFIDArrowColumn = i;
80 0 : continue;
81 : }
82 :
83 : const bool bGeometryField =
84 129 : DealWithGeometryColumn(i, field, []() { return wkbUnknown; });
85 128 : if (!bGeometryField)
86 : {
87 125 : CreateFieldFromSchema(field, {i},
88 : oMapFieldNameToGDALSchemaFieldDefn);
89 : }
90 : }
91 :
92 21 : CPLAssert(static_cast<int>(m_anMapFieldIndexToArrowColumn.size()) ==
93 : m_poFeatureDefn->GetFieldCount());
94 21 : CPLAssert(static_cast<int>(m_anMapGeomFieldIndexToArrowColumn.size()) ==
95 : m_poFeatureDefn->GetGeomFieldCount());
96 21 : }
97 :
98 : /************************************************************************/
99 : /* ReadNextBatch() */
100 : /************************************************************************/
101 :
102 231 : bool OGRParquetDatasetLayer::ReadNextBatch()
103 : {
104 231 : m_nIdxInBatch = 0;
105 :
106 231 : if (m_poRecordBatchReader == nullptr)
107 : {
108 81 : auto result = m_poScanner->ToRecordBatchReader();
109 81 : if (!result.ok())
110 : {
111 0 : CPLError(CE_Failure, CPLE_AppDefined,
112 : "ToRecordBatchReader() failed: %s",
113 0 : result.status().message().c_str());
114 0 : return false;
115 : }
116 81 : m_poRecordBatchReader = *result;
117 81 : if (m_poRecordBatchReader == nullptr)
118 0 : return false;
119 : }
120 :
121 462 : std::shared_ptr<arrow::RecordBatch> poNextBatch;
122 2 : do
123 : {
124 233 : ++m_iRecordBatch;
125 :
126 233 : poNextBatch.reset();
127 233 : auto status = m_poRecordBatchReader->ReadNext(&poNextBatch);
128 233 : if (!status.ok())
129 : {
130 0 : CPLError(CE_Failure, CPLE_AppDefined, "ReadNext() failed: %s",
131 0 : status.message().c_str());
132 0 : poNextBatch.reset();
133 : }
134 233 : if (poNextBatch == nullptr)
135 : {
136 72 : m_poBatch.reset();
137 72 : return false;
138 : }
139 161 : } while (poNextBatch->num_rows() == 0);
140 :
141 159 : SetBatch(poNextBatch);
142 :
143 159 : return true;
144 : }
145 :
146 : /************************************************************************/
147 : /* InvalidateCachedBatches() */
148 : /************************************************************************/
149 :
150 32 : void OGRParquetDatasetLayer::InvalidateCachedBatches()
151 : {
152 32 : ResetReading();
153 32 : }
154 :
155 : /************************************************************************/
156 : /* GetFeatureCount() */
157 : /************************************************************************/
158 :
159 36 : GIntBig OGRParquetDatasetLayer::GetFeatureCount(int bForce)
160 : {
161 36 : if (m_poAttrQuery == nullptr && m_poFilterGeom == nullptr)
162 : {
163 29 : auto status = m_poScanner->CountRows();
164 29 : if (status.ok())
165 29 : return *status;
166 : }
167 7 : return OGRLayer::GetFeatureCount(bForce);
168 : }
169 :
170 : /************************************************************************/
171 : /* GetExtent() */
172 : /************************************************************************/
173 :
174 2 : OGRErr OGRParquetDatasetLayer::GetExtent(OGREnvelope *psExtent, int bForce)
175 : {
176 2 : return GetExtent(0, psExtent, bForce);
177 : }
178 :
179 : /************************************************************************/
180 : /* FastGetExtent() */
181 : /************************************************************************/
182 :
183 19 : bool OGRParquetDatasetLayer::FastGetExtent(int iGeomField,
184 : OGREnvelope *psExtent) const
185 : {
186 19 : const auto oIter = m_oMapExtents.find(iGeomField);
187 19 : if (oIter != m_oMapExtents.end())
188 : {
189 9 : *psExtent = oIter->second;
190 9 : return true;
191 : }
192 :
193 10 : return false;
194 : }
195 :
196 : /************************************************************************/
197 : /* GetExtent() */
198 : /************************************************************************/
199 :
200 10 : OGRErr OGRParquetDatasetLayer::GetExtent(int iGeomField, OGREnvelope *psExtent,
201 : int bForce)
202 : {
203 10 : if (iGeomField < 0 || iGeomField >= m_poFeatureDefn->GetGeomFieldCount())
204 : {
205 4 : if (iGeomField != 0)
206 : {
207 3 : CPLError(CE_Failure, CPLE_AppDefined,
208 : "Invalid geometry field index : %d", iGeomField);
209 : }
210 4 : return OGRERR_FAILURE;
211 : }
212 :
213 6 : if (FastGetExtent(iGeomField, psExtent))
214 : {
215 3 : return OGRERR_NONE;
216 : }
217 :
218 : // bbox in general m_oMapGeometryColumns can not be trusted (at least at
219 : // time of writing), so we have to iterate over each fragment.
220 : const char *pszGeomFieldName =
221 3 : m_poFeatureDefn->GetGeomFieldDefn(iGeomField)->GetNameRef();
222 3 : auto oIter = m_oMapGeometryColumns.find(pszGeomFieldName);
223 3 : if (oIter != m_oMapGeometryColumns.end())
224 : {
225 3 : auto statusFragments = m_poScanner->dataset()->GetFragments();
226 3 : if (statusFragments.ok())
227 : {
228 3 : *psExtent = OGREnvelope();
229 3 : int nFragmentCount = 0;
230 3 : int nBBoxFragmentCount = 0;
231 8 : for (const auto &oFragmentStatus : *statusFragments)
232 : {
233 5 : if (oFragmentStatus.ok())
234 : {
235 : auto statusSchema =
236 5 : (*oFragmentStatus)->ReadPhysicalSchema();
237 5 : if (statusSchema.ok())
238 : {
239 5 : nFragmentCount++;
240 5 : const auto &kv_metadata = (*statusSchema)->metadata();
241 5 : if (kv_metadata && kv_metadata->Contains("geo"))
242 : {
243 10 : auto geo = kv_metadata->Get("geo");
244 10 : CPLJSONDocument oDoc;
245 5 : if (geo.ok() && oDoc.LoadMemory(*geo))
246 : {
247 10 : auto oRoot = oDoc.GetRoot();
248 15 : auto oColumns = oRoot.GetObj("columns");
249 15 : auto oCol = oColumns.GetObj(pszGeomFieldName);
250 5 : OGREnvelope3D sFragmentExtent;
251 10 : if (oCol.IsValid() &&
252 5 : GetExtentFromMetadata(
253 : oCol, &sFragmentExtent) == OGRERR_NONE)
254 : {
255 4 : nBBoxFragmentCount++;
256 4 : psExtent->Merge(sFragmentExtent);
257 : }
258 : }
259 : }
260 5 : if (nFragmentCount != nBBoxFragmentCount)
261 1 : break;
262 : }
263 : }
264 : }
265 3 : if (nFragmentCount == nBBoxFragmentCount)
266 : {
267 2 : m_oMapExtents[iGeomField] = *psExtent;
268 2 : return OGRERR_NONE;
269 : }
270 : }
271 : }
272 :
273 1 : return OGRParquetLayerBase::GetExtent(iGeomField, psExtent, bForce);
274 : }
|