Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Elasticsearch Translator
4 : * Purpose:
5 : * Author:
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2021, Even Rouault <even dot rouault at spatialys.com>
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "ogr_elastic.h"
14 : #include "ogrlibjsonutils.h"
15 : #include "cpl_json.h"
16 :
17 : #include <algorithm>
18 : #include <set>
19 :
20 : /************************************************************************/
21 : /* OGRElasticAggregationLayer() */
22 : /************************************************************************/
23 :
24 3 : OGRElasticAggregationLayer::OGRElasticAggregationLayer(
25 3 : OGRElasticDataSource *poDS)
26 3 : : m_poDS(poDS)
27 : {
28 3 : m_poFeatureDefn = new OGRFeatureDefn("aggregation");
29 3 : m_poFeatureDefn->SetGeomType(wkbPoint);
30 3 : m_poFeatureDefn->Reference();
31 3 : SetDescription(m_poFeatureDefn->GetName());
32 :
33 3 : OGRSpatialReference *poSRS_WGS84 = new OGRSpatialReference();
34 3 : poSRS_WGS84->SetFromUserInput(SRS_WKT_WGS84_LAT_LONG);
35 3 : poSRS_WGS84->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
36 3 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS_WGS84);
37 3 : poSRS_WGS84->Dereference();
38 :
39 6 : OGRFieldDefn oKey("key", OFTString);
40 3 : m_poFeatureDefn->AddFieldDefn(&oKey);
41 :
42 6 : OGRFieldDefn oDocCount("doc_count", OFTInteger64);
43 3 : m_poFeatureDefn->AddFieldDefn(&oDocCount);
44 3 : }
45 :
46 : /************************************************************************/
47 : /* ~OGRElasticAggregationLayer() */
48 : /************************************************************************/
49 :
50 6 : OGRElasticAggregationLayer::~OGRElasticAggregationLayer()
51 : {
52 3 : m_poFeatureDefn->Release();
53 6 : }
54 :
55 : /************************************************************************/
56 : /* Build() */
57 : /************************************************************************/
58 :
59 : std::unique_ptr<OGRElasticAggregationLayer>
60 3 : OGRElasticAggregationLayer::Build(OGRElasticDataSource *poDS,
61 : const char *pszAggregation)
62 : {
63 6 : CPLJSONDocument oDoc;
64 3 : if (!oDoc.LoadMemory(pszAggregation))
65 0 : return nullptr;
66 6 : const auto oRoot = oDoc.GetRoot();
67 9 : std::string osIndex = oRoot.GetString("index");
68 3 : if (osIndex.empty())
69 : {
70 0 : CPLError(CE_Failure, CPLE_AppDefined,
71 : "Missing 'index' member in AGGREGATION");
72 0 : return nullptr;
73 : }
74 :
75 9 : auto osGeometryField = oRoot.GetString("geometry_field");
76 3 : if (osGeometryField.empty())
77 : {
78 3 : std::set<CPLString> oSetLayers;
79 3 : std::vector<std::unique_ptr<OGRElasticLayer>> apoLayers;
80 3 : poDS->FetchMapping(osIndex.c_str(), oSetLayers, apoLayers);
81 3 : if (apoLayers.size() == 1)
82 : {
83 3 : apoLayers[0]->SetFeatureDefnFinalized();
84 : const int nGeomFieldCount =
85 3 : apoLayers[0]->GetLayerDefn()->GetGeomFieldCount();
86 3 : if (nGeomFieldCount == 1)
87 : {
88 6 : std::vector<CPLString> aosPath;
89 3 : bool bIsGeoPoint = false;
90 3 : apoLayers[0]->GetGeomFieldProperties(0, aosPath, bIsGeoPoint);
91 9 : for (const auto &osPart : aosPath)
92 : {
93 6 : if (!osGeometryField.empty())
94 3 : osGeometryField += '.';
95 6 : osGeometryField += osPart;
96 : }
97 : }
98 0 : else if (nGeomFieldCount == 0)
99 : {
100 0 : CPLError(
101 : CE_Failure, CPLE_AppDefined,
102 : "No geometry field found upon which to build aggregation");
103 0 : return nullptr;
104 : }
105 : else
106 : {
107 0 : CPLError(CE_Failure, CPLE_AppDefined,
108 : "Multiple geometry fields exist in the index. "
109 : "Specify one with the 'geometry_field' member in "
110 : "AGGREGATION");
111 0 : return nullptr;
112 : }
113 : }
114 : else
115 : {
116 0 : CPLError(CE_Failure, CPLE_AppDefined,
117 : "Missing 'geometry_field' member in AGGREGATION");
118 0 : return nullptr;
119 : }
120 : }
121 :
122 6 : auto poLayer = std::make_unique<OGRElasticAggregationLayer>(poDS);
123 3 : poLayer->m_osIndexName = std::move(osIndex);
124 3 : poLayer->m_osGeometryField = std::move(osGeometryField);
125 :
126 : // Parse geohash_grid options
127 9 : auto oGeohashGrid = oRoot["geohash_grid"];
128 4 : if (oGeohashGrid.IsValid() &&
129 1 : oGeohashGrid.GetType() == CPLJSONObject::Type::Object)
130 : {
131 1 : const int nPrecision = oGeohashGrid.GetInteger("precision");
132 1 : if (nPrecision > 0)
133 1 : poLayer->m_nGeohashGridPrecision = nPrecision;
134 :
135 1 : const int nMaxSize = oGeohashGrid.GetInteger("size");
136 1 : if (nMaxSize > 0)
137 1 : poLayer->m_nGeohashGridMaxSize = nMaxSize;
138 : }
139 :
140 : // Parse additional fields that correspond to statistical operations on
141 : // fields
142 3 : poLayer->m_oFieldDef = oRoot["fields"];
143 4 : if (poLayer->m_oFieldDef.IsValid() &&
144 1 : poLayer->m_oFieldDef.GetType() == CPLJSONObject::Type::Object)
145 : {
146 : // Start with stats, to keep track of the created columns, and
147 : // avoid duplicating them if a users ask for stats and min/max/etc.
148 : // on the same property.
149 : {
150 3 : auto oOp = poLayer->m_oFieldDef["stats"];
151 1 : if (oOp.IsValid() && oOp.GetType() == CPLJSONObject::Type::Array)
152 : {
153 2 : for (const auto &oField : oOp.ToArray())
154 : {
155 1 : if (oField.GetType() == CPLJSONObject::Type::String)
156 : {
157 5 : for (const char *pszOp :
158 6 : {"min", "max", "avg", "sum", "count"})
159 : {
160 : OGRFieldDefn oFieldDefn(
161 10 : CPLSPrintf("%s_%s", oField.ToString().c_str(),
162 : pszOp),
163 5 : strcmp(pszOp, "count") == 0 ? OFTInteger64
164 15 : : OFTReal);
165 5 : poLayer->m_poFeatureDefn->AddFieldDefn(&oFieldDefn);
166 : }
167 :
168 2 : CPLJSONObject oAgg;
169 1 : CPLJSONObject oFieldAgg;
170 1 : oFieldAgg.Add("field", oField.ToString());
171 1 : oAgg.Add("stats", oFieldAgg);
172 3 : poLayer->m_oAggregatedFieldsRequest.Add(
173 2 : CPLSPrintf("%s_stats", oField.ToString().c_str()),
174 : oAgg);
175 : }
176 : }
177 : }
178 : }
179 :
180 6 : for (const char *pszOp : {"min", "max", "avg", "sum", "count"})
181 : {
182 15 : auto oOp = poLayer->m_oFieldDef[pszOp];
183 5 : if (oOp.IsValid() && oOp.GetType() == CPLJSONObject::Type::Array)
184 : {
185 11 : for (const auto &oField : oOp.ToArray())
186 : {
187 6 : if (oField.GetType() == CPLJSONObject::Type::String)
188 : {
189 6 : const char *pszFieldName = CPLSPrintf(
190 12 : "%s_%s", oField.ToString().c_str(), pszOp);
191 6 : if (poLayer->m_poFeatureDefn->GetFieldIndex(
192 6 : pszFieldName) < 0)
193 : {
194 : OGRFieldDefn oFieldDefn(pszFieldName,
195 5 : strcmp(pszOp, "count") == 0
196 : ? OFTInteger64
197 10 : : OFTReal);
198 5 : poLayer->m_poFeatureDefn->AddFieldDefn(&oFieldDefn);
199 :
200 10 : CPLJSONObject oAgg;
201 5 : CPLJSONObject oFieldAgg;
202 5 : oFieldAgg.Add("field", oField.ToString());
203 5 : oAgg.Add(strcmp(pszOp, "count") == 0 ? "value_count"
204 : : pszOp,
205 : oFieldAgg);
206 5 : poLayer->m_oAggregatedFieldsRequest.Add(
207 : oFieldDefn.GetNameRef(), oAgg);
208 : }
209 : }
210 : }
211 : }
212 : }
213 : }
214 :
215 3 : return poLayer;
216 : }
217 :
218 : /************************************************************************/
219 : /* ResetReading() */
220 : /************************************************************************/
221 :
222 6 : void OGRElasticAggregationLayer::ResetReading()
223 : {
224 6 : m_iCurFeature = 0;
225 6 : }
226 :
227 : /************************************************************************/
228 : /* ISetSpatialFilter() */
229 : /************************************************************************/
230 :
231 2 : OGRErr OGRElasticAggregationLayer::ISetSpatialFilter(int iGeomField,
232 : const OGRGeometry *poGeom)
233 :
234 : {
235 2 : const OGRErr eErr = OGRLayer::ISetSpatialFilter(iGeomField, poGeom);
236 2 : if (eErr == OGRERR_NONE)
237 : {
238 2 : m_bFeaturesRequested = false;
239 2 : m_apoCachedFeatures.clear();
240 : }
241 2 : return eErr;
242 : }
243 :
244 : /************************************************************************/
245 : /* BuildRequest() */
246 : /************************************************************************/
247 :
248 : #define FILTERED_STR "filtered"
249 : #define GRID_STR "grid"
250 : #define CENTROID_STR "centroid"
251 :
252 : // Return a JSON serialized document that is the payload to POST for a /_search
253 : // request
254 5 : std::string OGRElasticAggregationLayer::BuildRequest()
255 : {
256 10 : CPLJSONDocument oDoc;
257 10 : auto oRoot = oDoc.GetRoot();
258 5 : oRoot.Set("size", 0);
259 :
260 10 : auto aggs = CPLJSONObject();
261 :
262 : // Clamp spatial filter if needed
263 5 : m_bRequestHasSpatialFilter = false;
264 5 : OGREnvelope sEnvelope;
265 5 : if (m_poFilterGeom)
266 : {
267 2 : m_poFilterGeom->getEnvelope(&sEnvelope);
268 :
269 2 : OGRElasticLayer::ClampEnvelope(sEnvelope);
270 2 : if (!(sEnvelope.MinX == -180 && sEnvelope.MinY == -90 &&
271 1 : sEnvelope.MaxX == 180 && sEnvelope.MaxY == 90))
272 : {
273 1 : m_bRequestHasSpatialFilter = true;
274 : }
275 : }
276 :
277 5 : if (m_bRequestHasSpatialFilter)
278 : {
279 : // Add spatial filtering
280 2 : auto top_aggs = CPLJSONObject();
281 1 : oRoot.Add("aggs", top_aggs);
282 :
283 2 : auto filtered = CPLJSONObject();
284 1 : top_aggs.Add(FILTERED_STR, filtered);
285 :
286 2 : auto filter = CPLJSONObject();
287 1 : filtered.Add("filter", filter);
288 1 : filtered.Add("aggs", aggs);
289 :
290 2 : auto geo_bounding_box = CPLJSONObject();
291 1 : filter.Add("geo_bounding_box", geo_bounding_box);
292 :
293 2 : auto coordinates = CPLJSONObject();
294 1 : geo_bounding_box.Add(m_osGeometryField, coordinates);
295 :
296 2 : auto top_left = CPLJSONObject();
297 1 : coordinates.Add("top_left", top_left);
298 1 : top_left.Add("lat", sEnvelope.MaxY);
299 1 : top_left.Add("lon", sEnvelope.MinX);
300 :
301 1 : auto bottom_right = CPLJSONObject();
302 1 : coordinates.Add("bottom_right", bottom_right);
303 1 : bottom_right.Add("lat", sEnvelope.MinY);
304 1 : bottom_right.Add("lon", sEnvelope.MaxX);
305 : }
306 : else
307 : {
308 4 : oRoot.Add("aggs", aggs);
309 : }
310 :
311 10 : auto grid = CPLJSONObject();
312 5 : aggs.Add(GRID_STR, grid);
313 :
314 : // Build geohash_grid aggregation object
315 10 : auto geohash_grid = CPLJSONObject();
316 5 : grid.Add("geohash_grid", geohash_grid);
317 5 : geohash_grid.Set("field", m_osGeometryField);
318 :
319 5 : if (m_nGeohashGridPrecision >= 1)
320 : {
321 1 : geohash_grid.Set("precision", m_nGeohashGridPrecision);
322 : }
323 4 : else if (!m_bRequestHasSpatialFilter || (sEnvelope.MinX < sEnvelope.MaxX &&
324 1 : sEnvelope.MinY < sEnvelope.MaxY))
325 : {
326 4 : const double dfSpatialRatio =
327 4 : m_bRequestHasSpatialFilter
328 4 : ? (sEnvelope.MaxX - sEnvelope.MinX) / 360. *
329 1 : (sEnvelope.MaxY - sEnvelope.MinY) / 180.
330 : : 1.0;
331 :
332 : // A geohash of size 1 can encode up to 32 positions, size 2 up to 32*32
333 : // etc.
334 4 : const int geohashSize = static_cast<int>(std::min(
335 12 : 12.0, std::max(1.0, log(m_nGeohashGridMaxSize / dfSpatialRatio) /
336 4 : log(32.0))));
337 :
338 4 : geohash_grid.Set("precision", geohashSize);
339 : }
340 5 : geohash_grid.Set("size", m_nGeohashGridMaxSize);
341 :
342 10 : auto subaggs = CPLJSONObject();
343 5 : grid.Add("aggs", subaggs);
344 :
345 10 : auto centroid = CPLJSONObject();
346 5 : subaggs.Add(CENTROID_STR, centroid);
347 :
348 10 : auto geo_centroid = CPLJSONObject();
349 5 : centroid.Add("geo_centroid", geo_centroid);
350 :
351 5 : geo_centroid.Set("field", m_osGeometryField);
352 :
353 : // Add extra fields
354 11 : for (const auto &oChild : m_oAggregatedFieldsRequest.GetChildren())
355 : {
356 6 : subaggs.Add(oChild.GetName(), oChild);
357 : }
358 :
359 10 : return oRoot.Format(CPLJSONObject::PrettyFormat::Plain);
360 : }
361 :
362 : /************************************************************************/
363 : /* IssueAggregationRequest() */
364 : /************************************************************************/
365 :
366 5 : void OGRElasticAggregationLayer::IssueAggregationRequest()
367 : {
368 26 : const auto IsNumericJsonType = [](json_type type)
369 26 : { return type == json_type_int || type == json_type_double; };
370 :
371 5 : m_apoCachedFeatures.clear();
372 :
373 15 : json_object *poResponse = m_poDS->RunRequest(
374 10 : (std::string(m_poDS->GetURL()) + "/" + m_osIndexName + "/_search")
375 : .c_str(),
376 10 : BuildRequest().c_str());
377 5 : if (!poResponse)
378 0 : return;
379 5 : json_object *poBuckets = json_ex_get_object_by_path(
380 5 : poResponse, m_bRequestHasSpatialFilter
381 : ? "aggregations." FILTERED_STR "." GRID_STR ".buckets"
382 : : "aggregations." GRID_STR ".buckets");
383 5 : if (poBuckets && json_object_get_type(poBuckets) == json_type_array)
384 : {
385 5 : const auto nBuckets = json_object_array_length(poBuckets);
386 13 : for (auto i = decltype(nBuckets){0}; i < nBuckets; i++)
387 : {
388 8 : json_object *poBucket = json_object_array_get_idx(poBuckets, i);
389 8 : if (poBucket && json_object_get_type(poBucket) == json_type_object)
390 : {
391 8 : OGRFeature *poFeature = new OGRFeature(m_poFeatureDefn);
392 8 : poFeature->SetFID(i);
393 :
394 : json_object *poKey =
395 8 : CPL_json_object_object_get(poBucket, "key");
396 8 : if (poKey && json_object_get_type(poKey) == json_type_string)
397 : {
398 8 : poFeature->SetField("key", json_object_get_string(poKey));
399 : }
400 :
401 : json_object *poDocCount =
402 8 : CPL_json_object_object_get(poBucket, "doc_count");
403 16 : if (poDocCount &&
404 8 : json_object_get_type(poDocCount) == json_type_int)
405 : {
406 16 : poFeature->SetField("doc_count",
407 : static_cast<GIntBig>(
408 8 : json_object_get_int64(poDocCount)));
409 : }
410 :
411 8 : json_object *poLocation = json_ex_get_object_by_path(
412 : poBucket, CENTROID_STR ".location");
413 16 : if (poLocation &&
414 8 : json_object_get_type(poLocation) == json_type_object)
415 : {
416 : json_object *poLat =
417 8 : CPL_json_object_object_get(poLocation, "lat");
418 : json_object *poLon =
419 8 : CPL_json_object_object_get(poLocation, "lon");
420 8 : if (poLat &&
421 16 : IsNumericJsonType(json_object_get_type(poLat)) &&
422 16 : poLon && IsNumericJsonType(json_object_get_type(poLon)))
423 : {
424 : OGRPoint *poPoint =
425 8 : new OGRPoint(json_object_get_double(poLon),
426 8 : json_object_get_double(poLat));
427 8 : poPoint->assignSpatialReference(
428 16 : m_poFeatureDefn->GetGeomFieldDefn(0)
429 8 : ->GetSpatialRef());
430 8 : poFeature->SetGeometryDirectly(poPoint);
431 : }
432 : }
433 :
434 9 : if (m_oFieldDef.IsValid() &&
435 1 : m_oFieldDef.GetType() == CPLJSONObject::Type::Object)
436 : {
437 5 : for (const char *pszOp :
438 6 : {"min", "max", "avg", "sum", "count"})
439 : {
440 15 : auto oOp = m_oFieldDef[pszOp];
441 10 : if (oOp.IsValid() &&
442 5 : oOp.GetType() == CPLJSONObject::Type::Array)
443 : {
444 11 : for (const auto &oField : oOp.ToArray())
445 : {
446 6 : if (oField.GetType() ==
447 : CPLJSONObject::Type::String)
448 : {
449 : json_object *poField =
450 6 : json_ex_get_object_by_path(
451 : poBucket,
452 : CPLSPrintf(
453 : "%s_%s.value",
454 12 : oField.ToString().c_str(),
455 : pszOp));
456 11 : if (poField &&
457 5 : IsNumericJsonType(
458 : json_object_get_type(poField)))
459 : {
460 5 : const char *pszFieldName = CPLSPrintf(
461 10 : "%s_%s", oField.ToString().c_str(),
462 : pszOp);
463 5 : if (strcmp(pszOp, "count") == 0)
464 : {
465 2 : poFeature->SetField(
466 : pszFieldName,
467 : static_cast<GIntBig>(
468 1 : json_object_get_int64(
469 : poField)));
470 : }
471 : else
472 : {
473 4 : poFeature->SetField(
474 : pszFieldName,
475 : json_object_get_double(
476 : poField));
477 : }
478 : }
479 : }
480 : }
481 : }
482 : }
483 :
484 3 : auto oOp = m_oFieldDef["stats"];
485 2 : if (oOp.IsValid() &&
486 1 : oOp.GetType() == CPLJSONObject::Type::Array)
487 : {
488 2 : for (const auto &oField : oOp.ToArray())
489 : {
490 1 : if (oField.GetType() == CPLJSONObject::Type::String)
491 : {
492 5 : for (const char *pszOp :
493 6 : {"min", "max", "avg", "sum", "count"})
494 : {
495 : json_object *poField =
496 5 : json_ex_get_object_by_path(
497 : poBucket,
498 : CPLSPrintf(
499 : "%s_stats.%s",
500 10 : oField.ToString().c_str(),
501 : pszOp));
502 10 : if (poField &&
503 5 : IsNumericJsonType(
504 : json_object_get_type(poField)))
505 : {
506 5 : const char *pszFieldName = CPLSPrintf(
507 10 : "%s_%s", oField.ToString().c_str(),
508 : pszOp);
509 5 : if (strcmp(pszOp, "count") == 0)
510 : {
511 2 : poFeature->SetField(
512 : pszFieldName,
513 : static_cast<GIntBig>(
514 1 : json_object_get_int64(
515 : poField)));
516 : }
517 : else
518 : {
519 4 : poFeature->SetField(
520 : pszFieldName,
521 : json_object_get_double(
522 : poField));
523 : }
524 : }
525 : }
526 : }
527 : }
528 : }
529 : }
530 :
531 8 : m_apoCachedFeatures.emplace_back(poFeature);
532 : }
533 : }
534 : }
535 5 : json_object_put(poResponse);
536 : }
537 :
538 : /************************************************************************/
539 : /* GetNextRawFeature() */
540 : /************************************************************************/
541 :
542 7 : OGRFeature *OGRElasticAggregationLayer::GetNextRawFeature()
543 : {
544 7 : if (!m_bFeaturesRequested)
545 : {
546 4 : m_bFeaturesRequested = true;
547 4 : IssueAggregationRequest();
548 : }
549 7 : if (m_iCurFeature < static_cast<int>(m_apoCachedFeatures.size()))
550 : {
551 6 : auto poFeature = m_apoCachedFeatures[m_iCurFeature]->Clone();
552 6 : ++m_iCurFeature;
553 6 : return poFeature;
554 : }
555 :
556 1 : return nullptr;
557 : }
558 :
559 : /************************************************************************/
560 : /* GetFeatureCount() */
561 : /************************************************************************/
562 :
563 3 : GIntBig OGRElasticAggregationLayer::GetFeatureCount(int bForce)
564 : {
565 3 : if (m_poFilterGeom == nullptr && m_poAttrQuery == nullptr)
566 : {
567 2 : if (!m_bFeaturesRequested)
568 : {
569 1 : m_bFeaturesRequested = true;
570 1 : IssueAggregationRequest();
571 : }
572 2 : return static_cast<int>(m_apoCachedFeatures.size());
573 : }
574 1 : return OGRLayer::GetFeatureCount(bForce);
575 : }
576 :
577 : /************************************************************************/
578 : /* TestCapability() */
579 : /************************************************************************/
580 :
581 1 : int OGRElasticAggregationLayer::TestCapability(const char *pszCap)
582 : {
583 1 : return EQUAL(pszCap, OLCStringsAsUTF8);
584 : }
585 :
586 : /************************************************************************/
587 : /* GetDataset() */
588 : /************************************************************************/
589 :
590 1 : GDALDataset *OGRElasticAggregationLayer::GetDataset()
591 : {
592 1 : return m_poDS;
593 : }
|