Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Elasticsearch Translator
4 : * Purpose:
5 : * Author:
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2021, Even Rouault <even dot rouault at spatialys.com>
9 : *
10 : * Permission is hereby granted, free of charge, to any person obtaining a
11 : * copy of this software and associated documentation files (the "Software"),
12 : * to deal in the Software without restriction, including without limitation
13 : * the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 : * and/or sell copies of the Software, and to permit persons to whom the
15 : * Software is furnished to do so, subject to the following conditions:
16 : *
17 : * The above copyright notice and this permission notice shall be included
18 : * in all copies or substantial portions of the Software.
19 : *
20 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 : * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 : * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25 : * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26 : * DEALINGS IN THE SOFTWARE.
27 : ****************************************************************************/
28 :
29 : #include "ogr_elastic.h"
30 : #include "ogrgeojsonreader.h"
31 : #include "cpl_json.h"
32 :
33 : #include <algorithm>
34 : #include <set>
35 :
36 : /************************************************************************/
37 : /* OGRElasticAggregationLayer() */
38 : /************************************************************************/
39 :
40 3 : OGRElasticAggregationLayer::OGRElasticAggregationLayer(
41 3 : OGRElasticDataSource *poDS)
42 3 : : m_poDS(poDS)
43 : {
44 3 : m_poFeatureDefn = new OGRFeatureDefn("aggregation");
45 3 : m_poFeatureDefn->SetGeomType(wkbPoint);
46 3 : m_poFeatureDefn->Reference();
47 3 : SetDescription(m_poFeatureDefn->GetName());
48 :
49 3 : OGRSpatialReference *poSRS_WGS84 = new OGRSpatialReference();
50 3 : poSRS_WGS84->SetFromUserInput(SRS_WKT_WGS84_LAT_LONG);
51 3 : poSRS_WGS84->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
52 3 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS_WGS84);
53 3 : poSRS_WGS84->Dereference();
54 :
55 6 : OGRFieldDefn oKey("key", OFTString);
56 3 : m_poFeatureDefn->AddFieldDefn(&oKey);
57 :
58 6 : OGRFieldDefn oDocCount("doc_count", OFTInteger64);
59 3 : m_poFeatureDefn->AddFieldDefn(&oDocCount);
60 3 : }
61 :
62 : /************************************************************************/
63 : /* ~OGRElasticAggregationLayer() */
64 : /************************************************************************/
65 :
66 6 : OGRElasticAggregationLayer::~OGRElasticAggregationLayer()
67 : {
68 3 : m_poFeatureDefn->Release();
69 6 : }
70 :
71 : /************************************************************************/
72 : /* Build() */
73 : /************************************************************************/
74 :
75 : std::unique_ptr<OGRElasticAggregationLayer>
76 3 : OGRElasticAggregationLayer::Build(OGRElasticDataSource *poDS,
77 : const char *pszAggregation)
78 : {
79 6 : CPLJSONDocument oDoc;
80 3 : if (!oDoc.LoadMemory(pszAggregation))
81 0 : return nullptr;
82 6 : const auto oRoot = oDoc.GetRoot();
83 9 : const auto osIndex = oRoot.GetString("index");
84 3 : if (osIndex.empty())
85 : {
86 0 : CPLError(CE_Failure, CPLE_AppDefined,
87 : "Missing 'index' member in AGGREGATION");
88 0 : return nullptr;
89 : }
90 :
91 9 : auto osGeometryField = oRoot.GetString("geometry_field");
92 3 : if (osGeometryField.empty())
93 : {
94 3 : std::set<CPLString> oSetLayers;
95 3 : std::vector<std::unique_ptr<OGRElasticLayer>> apoLayers;
96 3 : poDS->FetchMapping(osIndex.c_str(), oSetLayers, apoLayers);
97 3 : if (apoLayers.size() == 1)
98 : {
99 3 : apoLayers[0]->SetFeatureDefnFinalized();
100 : const int nGeomFieldCount =
101 3 : apoLayers[0]->GetLayerDefn()->GetGeomFieldCount();
102 3 : if (nGeomFieldCount == 1)
103 : {
104 6 : std::vector<CPLString> aosPath;
105 3 : bool bIsGeoPoint = false;
106 3 : apoLayers[0]->GetGeomFieldProperties(0, aosPath, bIsGeoPoint);
107 9 : for (const auto &osPart : aosPath)
108 : {
109 6 : if (!osGeometryField.empty())
110 3 : osGeometryField += '.';
111 6 : osGeometryField += osPart;
112 : }
113 : }
114 0 : else if (nGeomFieldCount == 0)
115 : {
116 0 : CPLError(
117 : CE_Failure, CPLE_AppDefined,
118 : "No geometry field found upon which to build aggregation");
119 0 : return nullptr;
120 : }
121 : else
122 : {
123 0 : CPLError(CE_Failure, CPLE_AppDefined,
124 : "Multiple geometry fields exist in the index. "
125 : "Specify one with the 'geometry_field' member in "
126 : "AGGREGATION");
127 0 : return nullptr;
128 : }
129 : }
130 : else
131 : {
132 0 : CPLError(CE_Failure, CPLE_AppDefined,
133 : "Missing 'geometry_field' member in AGGREGATION");
134 0 : return nullptr;
135 : }
136 : }
137 :
138 : auto poLayer = std::unique_ptr<OGRElasticAggregationLayer>(
139 6 : new OGRElasticAggregationLayer(poDS));
140 3 : poLayer->m_osIndexName = osIndex;
141 3 : poLayer->m_osGeometryField = std::move(osGeometryField);
142 :
143 : // Parse geohash_grid options
144 9 : auto oGeohashGrid = oRoot["geohash_grid"];
145 4 : if (oGeohashGrid.IsValid() &&
146 1 : oGeohashGrid.GetType() == CPLJSONObject::Type::Object)
147 : {
148 1 : const int nPrecision = oGeohashGrid.GetInteger("precision");
149 1 : if (nPrecision > 0)
150 1 : poLayer->m_nGeohashGridPrecision = nPrecision;
151 :
152 1 : const int nMaxSize = oGeohashGrid.GetInteger("size");
153 1 : if (nMaxSize > 0)
154 1 : poLayer->m_nGeohashGridMaxSize = nMaxSize;
155 : }
156 :
157 : // Parse additional fields that correspond to statistical operations on
158 : // fields
159 3 : poLayer->m_oFieldDef = oRoot["fields"];
160 4 : if (poLayer->m_oFieldDef.IsValid() &&
161 1 : poLayer->m_oFieldDef.GetType() == CPLJSONObject::Type::Object)
162 : {
163 : // Start with stats, to keep track of the created columns, and
164 : // avoid duplicating them if a users ask for stats and min/max/etc.
165 : // on the same property.
166 : {
167 3 : auto oOp = poLayer->m_oFieldDef["stats"];
168 1 : if (oOp.IsValid() && oOp.GetType() == CPLJSONObject::Type::Array)
169 : {
170 2 : for (const auto &oField : oOp.ToArray())
171 : {
172 1 : if (oField.GetType() == CPLJSONObject::Type::String)
173 : {
174 5 : for (const char *pszOp :
175 6 : {"min", "max", "avg", "sum", "count"})
176 : {
177 : OGRFieldDefn oFieldDefn(
178 10 : CPLSPrintf("%s_%s", oField.ToString().c_str(),
179 : pszOp),
180 5 : strcmp(pszOp, "count") == 0 ? OFTInteger64
181 15 : : OFTReal);
182 5 : poLayer->m_poFeatureDefn->AddFieldDefn(&oFieldDefn);
183 : }
184 :
185 2 : CPLJSONObject oAgg;
186 1 : CPLJSONObject oFieldAgg;
187 1 : oFieldAgg.Add("field", oField.ToString());
188 1 : oAgg.Add("stats", oFieldAgg);
189 3 : poLayer->m_oAggregatedFieldsRequest.Add(
190 2 : CPLSPrintf("%s_stats", oField.ToString().c_str()),
191 : oAgg);
192 : }
193 : }
194 : }
195 : }
196 :
197 6 : for (const char *pszOp : {"min", "max", "avg", "sum", "count"})
198 : {
199 15 : auto oOp = poLayer->m_oFieldDef[pszOp];
200 5 : if (oOp.IsValid() && oOp.GetType() == CPLJSONObject::Type::Array)
201 : {
202 11 : for (const auto &oField : oOp.ToArray())
203 : {
204 6 : if (oField.GetType() == CPLJSONObject::Type::String)
205 : {
206 6 : const char *pszFieldName = CPLSPrintf(
207 12 : "%s_%s", oField.ToString().c_str(), pszOp);
208 6 : if (poLayer->m_poFeatureDefn->GetFieldIndex(
209 6 : pszFieldName) < 0)
210 : {
211 : OGRFieldDefn oFieldDefn(pszFieldName,
212 5 : strcmp(pszOp, "count") == 0
213 : ? OFTInteger64
214 10 : : OFTReal);
215 5 : poLayer->m_poFeatureDefn->AddFieldDefn(&oFieldDefn);
216 :
217 10 : CPLJSONObject oAgg;
218 5 : CPLJSONObject oFieldAgg;
219 5 : oFieldAgg.Add("field", oField.ToString());
220 5 : oAgg.Add(strcmp(pszOp, "count") == 0 ? "value_count"
221 : : pszOp,
222 : oFieldAgg);
223 5 : poLayer->m_oAggregatedFieldsRequest.Add(
224 : oFieldDefn.GetNameRef(), oAgg);
225 : }
226 : }
227 : }
228 : }
229 : }
230 : }
231 :
232 3 : return poLayer;
233 : }
234 :
235 : /************************************************************************/
236 : /* ResetReading() */
237 : /************************************************************************/
238 :
239 6 : void OGRElasticAggregationLayer::ResetReading()
240 : {
241 6 : m_iCurFeature = 0;
242 6 : }
243 :
244 : /************************************************************************/
245 : /* SetSpatialFilter() */
246 : /************************************************************************/
247 :
248 2 : void OGRElasticAggregationLayer::SetSpatialFilter(OGRGeometry *poGeom)
249 :
250 : {
251 2 : OGRLayer::SetSpatialFilter(poGeom);
252 2 : m_bFeaturesRequested = false;
253 2 : m_apoCachedFeatures.clear();
254 2 : }
255 :
256 : /************************************************************************/
257 : /* BuildRequest() */
258 : /************************************************************************/
259 :
260 : #define FILTERED_STR "filtered"
261 : #define GRID_STR "grid"
262 : #define CENTROID_STR "centroid"
263 :
264 : // Return a JSON serialized document that is the payload to POST for a /_search
265 : // request
266 5 : std::string OGRElasticAggregationLayer::BuildRequest()
267 : {
268 10 : CPLJSONDocument oDoc;
269 10 : auto oRoot = oDoc.GetRoot();
270 5 : oRoot.Set("size", 0);
271 :
272 10 : auto aggs = CPLJSONObject();
273 :
274 : // Clamp spatial filter if needed
275 5 : m_bRequestHasSpatialFilter = false;
276 5 : OGREnvelope sEnvelope;
277 5 : if (m_poFilterGeom)
278 : {
279 2 : m_poFilterGeom->getEnvelope(&sEnvelope);
280 :
281 2 : OGRElasticLayer::ClampEnvelope(sEnvelope);
282 2 : if (!(sEnvelope.MinX == -180 && sEnvelope.MinY == -90 &&
283 1 : sEnvelope.MaxX == 180 && sEnvelope.MaxY == 90))
284 : {
285 1 : m_bRequestHasSpatialFilter = true;
286 : }
287 : }
288 :
289 5 : if (m_bRequestHasSpatialFilter)
290 : {
291 : // Add spatial filtering
292 2 : auto top_aggs = CPLJSONObject();
293 1 : oRoot.Add("aggs", top_aggs);
294 :
295 2 : auto filtered = CPLJSONObject();
296 1 : top_aggs.Add(FILTERED_STR, filtered);
297 :
298 2 : auto filter = CPLJSONObject();
299 1 : filtered.Add("filter", filter);
300 1 : filtered.Add("aggs", aggs);
301 :
302 2 : auto geo_bounding_box = CPLJSONObject();
303 1 : filter.Add("geo_bounding_box", geo_bounding_box);
304 :
305 2 : auto coordinates = CPLJSONObject();
306 1 : geo_bounding_box.Add(m_osGeometryField, coordinates);
307 :
308 2 : auto top_left = CPLJSONObject();
309 1 : coordinates.Add("top_left", top_left);
310 1 : top_left.Add("lat", sEnvelope.MaxY);
311 1 : top_left.Add("lon", sEnvelope.MinX);
312 :
313 1 : auto bottom_right = CPLJSONObject();
314 1 : coordinates.Add("bottom_right", bottom_right);
315 1 : bottom_right.Add("lat", sEnvelope.MinY);
316 1 : bottom_right.Add("lon", sEnvelope.MaxX);
317 : }
318 : else
319 : {
320 4 : oRoot.Add("aggs", aggs);
321 : }
322 :
323 10 : auto grid = CPLJSONObject();
324 5 : aggs.Add(GRID_STR, grid);
325 :
326 : // Build geohash_grid aggregation object
327 10 : auto geohash_grid = CPLJSONObject();
328 5 : grid.Add("geohash_grid", geohash_grid);
329 5 : geohash_grid.Set("field", m_osGeometryField);
330 :
331 5 : if (m_nGeohashGridPrecision >= 1)
332 : {
333 1 : geohash_grid.Set("precision", m_nGeohashGridPrecision);
334 : }
335 4 : else if (!m_bRequestHasSpatialFilter || (sEnvelope.MinX < sEnvelope.MaxX &&
336 1 : sEnvelope.MinY < sEnvelope.MaxY))
337 : {
338 4 : const double dfSpatialRatio =
339 4 : m_bRequestHasSpatialFilter
340 4 : ? (sEnvelope.MaxX - sEnvelope.MinX) / 360. *
341 1 : (sEnvelope.MaxY - sEnvelope.MinY) / 180.
342 : : 1.0;
343 :
344 : // A geohash of size 1 can encode up to 32 positions, size 2 up to 32*32
345 : // etc.
346 4 : const int geohashSize = static_cast<int>(std::min(
347 12 : 12.0, std::max(1.0, log(m_nGeohashGridMaxSize / dfSpatialRatio) /
348 4 : log(32.0))));
349 :
350 4 : geohash_grid.Set("precision", geohashSize);
351 : }
352 5 : geohash_grid.Set("size", m_nGeohashGridMaxSize);
353 :
354 10 : auto subaggs = CPLJSONObject();
355 5 : grid.Add("aggs", subaggs);
356 :
357 10 : auto centroid = CPLJSONObject();
358 5 : subaggs.Add(CENTROID_STR, centroid);
359 :
360 10 : auto geo_centroid = CPLJSONObject();
361 5 : centroid.Add("geo_centroid", geo_centroid);
362 :
363 5 : geo_centroid.Set("field", m_osGeometryField);
364 :
365 : // Add extra fields
366 11 : for (const auto &oChild : m_oAggregatedFieldsRequest.GetChildren())
367 : {
368 6 : subaggs.Add(oChild.GetName(), oChild);
369 : }
370 :
371 10 : return oRoot.Format(CPLJSONObject::PrettyFormat::Plain);
372 : }
373 :
374 : /************************************************************************/
375 : /* IssueAggregationRequest() */
376 : /************************************************************************/
377 :
378 5 : void OGRElasticAggregationLayer::IssueAggregationRequest()
379 : {
380 26 : const auto IsNumericJsonType = [](json_type type)
381 26 : { return type == json_type_int || type == json_type_double; };
382 :
383 5 : m_apoCachedFeatures.clear();
384 :
385 15 : json_object *poResponse = m_poDS->RunRequest(
386 10 : (std::string(m_poDS->GetURL()) + "/" + m_osIndexName + "/_search")
387 : .c_str(),
388 10 : BuildRequest().c_str());
389 5 : if (!poResponse)
390 0 : return;
391 5 : json_object *poBuckets = json_ex_get_object_by_path(
392 5 : poResponse, m_bRequestHasSpatialFilter
393 : ? "aggregations." FILTERED_STR "." GRID_STR ".buckets"
394 : : "aggregations." GRID_STR ".buckets");
395 5 : if (poBuckets && json_object_get_type(poBuckets) == json_type_array)
396 : {
397 5 : const auto nBuckets = json_object_array_length(poBuckets);
398 13 : for (auto i = decltype(nBuckets){0}; i < nBuckets; i++)
399 : {
400 8 : json_object *poBucket = json_object_array_get_idx(poBuckets, i);
401 8 : if (poBucket && json_object_get_type(poBucket) == json_type_object)
402 : {
403 8 : OGRFeature *poFeature = new OGRFeature(m_poFeatureDefn);
404 8 : poFeature->SetFID(i);
405 :
406 : json_object *poKey =
407 8 : CPL_json_object_object_get(poBucket, "key");
408 8 : if (poKey && json_object_get_type(poKey) == json_type_string)
409 : {
410 8 : poFeature->SetField("key", json_object_get_string(poKey));
411 : }
412 :
413 : json_object *poDocCount =
414 8 : CPL_json_object_object_get(poBucket, "doc_count");
415 16 : if (poDocCount &&
416 8 : json_object_get_type(poDocCount) == json_type_int)
417 : {
418 16 : poFeature->SetField("doc_count",
419 : static_cast<GIntBig>(
420 8 : json_object_get_int64(poDocCount)));
421 : }
422 :
423 8 : json_object *poLocation = json_ex_get_object_by_path(
424 : poBucket, CENTROID_STR ".location");
425 16 : if (poLocation &&
426 8 : json_object_get_type(poLocation) == json_type_object)
427 : {
428 : json_object *poLat =
429 8 : CPL_json_object_object_get(poLocation, "lat");
430 : json_object *poLon =
431 8 : CPL_json_object_object_get(poLocation, "lon");
432 8 : if (poLat &&
433 16 : IsNumericJsonType(json_object_get_type(poLat)) &&
434 16 : poLon && IsNumericJsonType(json_object_get_type(poLon)))
435 : {
436 : OGRPoint *poPoint =
437 8 : new OGRPoint(json_object_get_double(poLon),
438 8 : json_object_get_double(poLat));
439 8 : poPoint->assignSpatialReference(
440 16 : m_poFeatureDefn->GetGeomFieldDefn(0)
441 8 : ->GetSpatialRef());
442 8 : poFeature->SetGeometryDirectly(poPoint);
443 : }
444 : }
445 :
446 9 : if (m_oFieldDef.IsValid() &&
447 1 : m_oFieldDef.GetType() == CPLJSONObject::Type::Object)
448 : {
449 5 : for (const char *pszOp :
450 6 : {"min", "max", "avg", "sum", "count"})
451 : {
452 15 : auto oOp = m_oFieldDef[pszOp];
453 10 : if (oOp.IsValid() &&
454 5 : oOp.GetType() == CPLJSONObject::Type::Array)
455 : {
456 11 : for (const auto &oField : oOp.ToArray())
457 : {
458 6 : if (oField.GetType() ==
459 : CPLJSONObject::Type::String)
460 : {
461 : json_object *poField =
462 6 : json_ex_get_object_by_path(
463 : poBucket,
464 : CPLSPrintf(
465 : "%s_%s.value",
466 12 : oField.ToString().c_str(),
467 : pszOp));
468 11 : if (poField &&
469 5 : IsNumericJsonType(
470 : json_object_get_type(poField)))
471 : {
472 5 : const char *pszFieldName = CPLSPrintf(
473 10 : "%s_%s", oField.ToString().c_str(),
474 : pszOp);
475 5 : if (strcmp(pszOp, "count") == 0)
476 : {
477 2 : poFeature->SetField(
478 : pszFieldName,
479 : static_cast<GIntBig>(
480 1 : json_object_get_int64(
481 : poField)));
482 : }
483 : else
484 : {
485 4 : poFeature->SetField(
486 : pszFieldName,
487 : json_object_get_double(
488 : poField));
489 : }
490 : }
491 : }
492 : }
493 : }
494 : }
495 :
496 3 : auto oOp = m_oFieldDef["stats"];
497 2 : if (oOp.IsValid() &&
498 1 : oOp.GetType() == CPLJSONObject::Type::Array)
499 : {
500 2 : for (const auto &oField : oOp.ToArray())
501 : {
502 1 : if (oField.GetType() == CPLJSONObject::Type::String)
503 : {
504 5 : for (const char *pszOp :
505 6 : {"min", "max", "avg", "sum", "count"})
506 : {
507 : json_object *poField =
508 5 : json_ex_get_object_by_path(
509 : poBucket,
510 : CPLSPrintf(
511 : "%s_stats.%s",
512 10 : oField.ToString().c_str(),
513 : pszOp));
514 10 : if (poField &&
515 5 : IsNumericJsonType(
516 : json_object_get_type(poField)))
517 : {
518 5 : const char *pszFieldName = CPLSPrintf(
519 10 : "%s_%s", oField.ToString().c_str(),
520 : pszOp);
521 5 : if (strcmp(pszOp, "count") == 0)
522 : {
523 2 : poFeature->SetField(
524 : pszFieldName,
525 : static_cast<GIntBig>(
526 1 : json_object_get_int64(
527 : poField)));
528 : }
529 : else
530 : {
531 4 : poFeature->SetField(
532 : pszFieldName,
533 : json_object_get_double(
534 : poField));
535 : }
536 : }
537 : }
538 : }
539 : }
540 : }
541 : }
542 :
543 8 : m_apoCachedFeatures.emplace_back(poFeature);
544 : }
545 : }
546 : }
547 5 : json_object_put(poResponse);
548 : }
549 :
550 : /************************************************************************/
551 : /* GetNextRawFeature() */
552 : /************************************************************************/
553 :
554 7 : OGRFeature *OGRElasticAggregationLayer::GetNextRawFeature()
555 : {
556 7 : if (!m_bFeaturesRequested)
557 : {
558 4 : m_bFeaturesRequested = true;
559 4 : IssueAggregationRequest();
560 : }
561 7 : if (m_iCurFeature < static_cast<int>(m_apoCachedFeatures.size()))
562 : {
563 6 : auto poFeature = m_apoCachedFeatures[m_iCurFeature]->Clone();
564 6 : ++m_iCurFeature;
565 6 : return poFeature;
566 : }
567 :
568 1 : return nullptr;
569 : }
570 :
571 : /************************************************************************/
572 : /* GetFeatureCount() */
573 : /************************************************************************/
574 :
575 3 : GIntBig OGRElasticAggregationLayer::GetFeatureCount(int bForce)
576 : {
577 3 : if (m_poFilterGeom == nullptr && m_poAttrQuery == nullptr)
578 : {
579 2 : if (!m_bFeaturesRequested)
580 : {
581 1 : m_bFeaturesRequested = true;
582 1 : IssueAggregationRequest();
583 : }
584 2 : return static_cast<int>(m_apoCachedFeatures.size());
585 : }
586 1 : return OGRLayer::GetFeatureCount(bForce);
587 : }
588 :
589 : /************************************************************************/
590 : /* TestCapability() */
591 : /************************************************************************/
592 :
593 1 : int OGRElasticAggregationLayer::TestCapability(const char *pszCap)
594 : {
595 1 : return EQUAL(pszCap, OLCStringsAsUTF8);
596 : }
597 :
598 : /************************************************************************/
599 : /* GetDataset() */
600 : /************************************************************************/
601 :
602 1 : GDALDataset *OGRElasticAggregationLayer::GetDataset()
603 : {
604 1 : return m_poDS;
605 : }
|