Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Elasticsearch Translator
4 : * Purpose:
5 : * Author:
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2021, Even Rouault <even dot rouault at spatialys.com>
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "ogr_elastic.h"
14 : #include "ogrlibjsonutils.h"
15 : #include "cpl_json.h"
16 :
17 : #include <algorithm>
18 : #include <set>
19 :
20 : /************************************************************************/
21 : /* OGRElasticAggregationLayer() */
22 : /************************************************************************/
23 :
24 3 : OGRElasticAggregationLayer::OGRElasticAggregationLayer(
25 3 : OGRElasticDataSource *poDS)
26 3 : : m_poDS(poDS)
27 : {
28 3 : m_poFeatureDefn = new OGRFeatureDefn("aggregation");
29 3 : m_poFeatureDefn->SetGeomType(wkbPoint);
30 3 : m_poFeatureDefn->Reference();
31 3 : SetDescription(m_poFeatureDefn->GetName());
32 :
33 3 : OGRSpatialReference *poSRS_WGS84 = new OGRSpatialReference();
34 3 : poSRS_WGS84->SetFromUserInput(SRS_WKT_WGS84_LAT_LONG);
35 3 : poSRS_WGS84->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
36 3 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS_WGS84);
37 3 : poSRS_WGS84->Dereference();
38 :
39 6 : OGRFieldDefn oKey("key", OFTString);
40 3 : m_poFeatureDefn->AddFieldDefn(&oKey);
41 :
42 6 : OGRFieldDefn oDocCount("doc_count", OFTInteger64);
43 3 : m_poFeatureDefn->AddFieldDefn(&oDocCount);
44 3 : }
45 :
46 : /************************************************************************/
47 : /* ~OGRElasticAggregationLayer() */
48 : /************************************************************************/
49 :
50 6 : OGRElasticAggregationLayer::~OGRElasticAggregationLayer()
51 : {
52 3 : m_poFeatureDefn->Release();
53 6 : }
54 :
55 : /************************************************************************/
56 : /* Build() */
57 : /************************************************************************/
58 :
59 : std::unique_ptr<OGRElasticAggregationLayer>
60 3 : OGRElasticAggregationLayer::Build(OGRElasticDataSource *poDS,
61 : const char *pszAggregation)
62 : {
63 6 : CPLJSONDocument oDoc;
64 3 : if (!oDoc.LoadMemory(pszAggregation))
65 0 : return nullptr;
66 6 : const auto oRoot = oDoc.GetRoot();
67 9 : const auto osIndex = oRoot.GetString("index");
68 3 : if (osIndex.empty())
69 : {
70 0 : CPLError(CE_Failure, CPLE_AppDefined,
71 : "Missing 'index' member in AGGREGATION");
72 0 : return nullptr;
73 : }
74 :
75 9 : auto osGeometryField = oRoot.GetString("geometry_field");
76 3 : if (osGeometryField.empty())
77 : {
78 3 : std::set<CPLString> oSetLayers;
79 3 : std::vector<std::unique_ptr<OGRElasticLayer>> apoLayers;
80 3 : poDS->FetchMapping(osIndex.c_str(), oSetLayers, apoLayers);
81 3 : if (apoLayers.size() == 1)
82 : {
83 3 : apoLayers[0]->SetFeatureDefnFinalized();
84 : const int nGeomFieldCount =
85 3 : apoLayers[0]->GetLayerDefn()->GetGeomFieldCount();
86 3 : if (nGeomFieldCount == 1)
87 : {
88 6 : std::vector<CPLString> aosPath;
89 3 : bool bIsGeoPoint = false;
90 3 : apoLayers[0]->GetGeomFieldProperties(0, aosPath, bIsGeoPoint);
91 9 : for (const auto &osPart : aosPath)
92 : {
93 6 : if (!osGeometryField.empty())
94 3 : osGeometryField += '.';
95 6 : osGeometryField += osPart;
96 : }
97 : }
98 0 : else if (nGeomFieldCount == 0)
99 : {
100 0 : CPLError(
101 : CE_Failure, CPLE_AppDefined,
102 : "No geometry field found upon which to build aggregation");
103 0 : return nullptr;
104 : }
105 : else
106 : {
107 0 : CPLError(CE_Failure, CPLE_AppDefined,
108 : "Multiple geometry fields exist in the index. "
109 : "Specify one with the 'geometry_field' member in "
110 : "AGGREGATION");
111 0 : return nullptr;
112 : }
113 : }
114 : else
115 : {
116 0 : CPLError(CE_Failure, CPLE_AppDefined,
117 : "Missing 'geometry_field' member in AGGREGATION");
118 0 : return nullptr;
119 : }
120 : }
121 :
122 : auto poLayer = std::unique_ptr<OGRElasticAggregationLayer>(
123 6 : new OGRElasticAggregationLayer(poDS));
124 3 : poLayer->m_osIndexName = osIndex;
125 3 : poLayer->m_osGeometryField = std::move(osGeometryField);
126 :
127 : // Parse geohash_grid options
128 9 : auto oGeohashGrid = oRoot["geohash_grid"];
129 4 : if (oGeohashGrid.IsValid() &&
130 1 : oGeohashGrid.GetType() == CPLJSONObject::Type::Object)
131 : {
132 1 : const int nPrecision = oGeohashGrid.GetInteger("precision");
133 1 : if (nPrecision > 0)
134 1 : poLayer->m_nGeohashGridPrecision = nPrecision;
135 :
136 1 : const int nMaxSize = oGeohashGrid.GetInteger("size");
137 1 : if (nMaxSize > 0)
138 1 : poLayer->m_nGeohashGridMaxSize = nMaxSize;
139 : }
140 :
141 : // Parse additional fields that correspond to statistical operations on
142 : // fields
143 3 : poLayer->m_oFieldDef = oRoot["fields"];
144 4 : if (poLayer->m_oFieldDef.IsValid() &&
145 1 : poLayer->m_oFieldDef.GetType() == CPLJSONObject::Type::Object)
146 : {
147 : // Start with stats, to keep track of the created columns, and
148 : // avoid duplicating them if a users ask for stats and min/max/etc.
149 : // on the same property.
150 : {
151 3 : auto oOp = poLayer->m_oFieldDef["stats"];
152 1 : if (oOp.IsValid() && oOp.GetType() == CPLJSONObject::Type::Array)
153 : {
154 2 : for (const auto &oField : oOp.ToArray())
155 : {
156 1 : if (oField.GetType() == CPLJSONObject::Type::String)
157 : {
158 5 : for (const char *pszOp :
159 6 : {"min", "max", "avg", "sum", "count"})
160 : {
161 : OGRFieldDefn oFieldDefn(
162 10 : CPLSPrintf("%s_%s", oField.ToString().c_str(),
163 : pszOp),
164 5 : strcmp(pszOp, "count") == 0 ? OFTInteger64
165 15 : : OFTReal);
166 5 : poLayer->m_poFeatureDefn->AddFieldDefn(&oFieldDefn);
167 : }
168 :
169 2 : CPLJSONObject oAgg;
170 1 : CPLJSONObject oFieldAgg;
171 1 : oFieldAgg.Add("field", oField.ToString());
172 1 : oAgg.Add("stats", oFieldAgg);
173 3 : poLayer->m_oAggregatedFieldsRequest.Add(
174 2 : CPLSPrintf("%s_stats", oField.ToString().c_str()),
175 : oAgg);
176 : }
177 : }
178 : }
179 : }
180 :
181 6 : for (const char *pszOp : {"min", "max", "avg", "sum", "count"})
182 : {
183 15 : auto oOp = poLayer->m_oFieldDef[pszOp];
184 5 : if (oOp.IsValid() && oOp.GetType() == CPLJSONObject::Type::Array)
185 : {
186 11 : for (const auto &oField : oOp.ToArray())
187 : {
188 6 : if (oField.GetType() == CPLJSONObject::Type::String)
189 : {
190 6 : const char *pszFieldName = CPLSPrintf(
191 12 : "%s_%s", oField.ToString().c_str(), pszOp);
192 6 : if (poLayer->m_poFeatureDefn->GetFieldIndex(
193 6 : pszFieldName) < 0)
194 : {
195 : OGRFieldDefn oFieldDefn(pszFieldName,
196 5 : strcmp(pszOp, "count") == 0
197 : ? OFTInteger64
198 10 : : OFTReal);
199 5 : poLayer->m_poFeatureDefn->AddFieldDefn(&oFieldDefn);
200 :
201 10 : CPLJSONObject oAgg;
202 5 : CPLJSONObject oFieldAgg;
203 5 : oFieldAgg.Add("field", oField.ToString());
204 5 : oAgg.Add(strcmp(pszOp, "count") == 0 ? "value_count"
205 : : pszOp,
206 : oFieldAgg);
207 5 : poLayer->m_oAggregatedFieldsRequest.Add(
208 : oFieldDefn.GetNameRef(), oAgg);
209 : }
210 : }
211 : }
212 : }
213 : }
214 : }
215 :
216 3 : return poLayer;
217 : }
218 :
219 : /************************************************************************/
220 : /* ResetReading() */
221 : /************************************************************************/
222 :
223 6 : void OGRElasticAggregationLayer::ResetReading()
224 : {
225 6 : m_iCurFeature = 0;
226 6 : }
227 :
228 : /************************************************************************/
229 : /* ISetSpatialFilter() */
230 : /************************************************************************/
231 :
232 2 : OGRErr OGRElasticAggregationLayer::ISetSpatialFilter(int iGeomField,
233 : const OGRGeometry *poGeom)
234 :
235 : {
236 2 : const OGRErr eErr = OGRLayer::ISetSpatialFilter(iGeomField, poGeom);
237 2 : if (eErr == OGRERR_NONE)
238 : {
239 2 : m_bFeaturesRequested = false;
240 2 : m_apoCachedFeatures.clear();
241 : }
242 2 : return eErr;
243 : }
244 :
245 : /************************************************************************/
246 : /* BuildRequest() */
247 : /************************************************************************/
248 :
249 : #define FILTERED_STR "filtered"
250 : #define GRID_STR "grid"
251 : #define CENTROID_STR "centroid"
252 :
253 : // Return a JSON serialized document that is the payload to POST for a /_search
254 : // request
255 5 : std::string OGRElasticAggregationLayer::BuildRequest()
256 : {
257 10 : CPLJSONDocument oDoc;
258 10 : auto oRoot = oDoc.GetRoot();
259 5 : oRoot.Set("size", 0);
260 :
261 10 : auto aggs = CPLJSONObject();
262 :
263 : // Clamp spatial filter if needed
264 5 : m_bRequestHasSpatialFilter = false;
265 5 : OGREnvelope sEnvelope;
266 5 : if (m_poFilterGeom)
267 : {
268 2 : m_poFilterGeom->getEnvelope(&sEnvelope);
269 :
270 2 : OGRElasticLayer::ClampEnvelope(sEnvelope);
271 2 : if (!(sEnvelope.MinX == -180 && sEnvelope.MinY == -90 &&
272 1 : sEnvelope.MaxX == 180 && sEnvelope.MaxY == 90))
273 : {
274 1 : m_bRequestHasSpatialFilter = true;
275 : }
276 : }
277 :
278 5 : if (m_bRequestHasSpatialFilter)
279 : {
280 : // Add spatial filtering
281 2 : auto top_aggs = CPLJSONObject();
282 1 : oRoot.Add("aggs", top_aggs);
283 :
284 2 : auto filtered = CPLJSONObject();
285 1 : top_aggs.Add(FILTERED_STR, filtered);
286 :
287 2 : auto filter = CPLJSONObject();
288 1 : filtered.Add("filter", filter);
289 1 : filtered.Add("aggs", aggs);
290 :
291 2 : auto geo_bounding_box = CPLJSONObject();
292 1 : filter.Add("geo_bounding_box", geo_bounding_box);
293 :
294 2 : auto coordinates = CPLJSONObject();
295 1 : geo_bounding_box.Add(m_osGeometryField, coordinates);
296 :
297 2 : auto top_left = CPLJSONObject();
298 1 : coordinates.Add("top_left", top_left);
299 1 : top_left.Add("lat", sEnvelope.MaxY);
300 1 : top_left.Add("lon", sEnvelope.MinX);
301 :
302 1 : auto bottom_right = CPLJSONObject();
303 1 : coordinates.Add("bottom_right", bottom_right);
304 1 : bottom_right.Add("lat", sEnvelope.MinY);
305 1 : bottom_right.Add("lon", sEnvelope.MaxX);
306 : }
307 : else
308 : {
309 4 : oRoot.Add("aggs", aggs);
310 : }
311 :
312 10 : auto grid = CPLJSONObject();
313 5 : aggs.Add(GRID_STR, grid);
314 :
315 : // Build geohash_grid aggregation object
316 10 : auto geohash_grid = CPLJSONObject();
317 5 : grid.Add("geohash_grid", geohash_grid);
318 5 : geohash_grid.Set("field", m_osGeometryField);
319 :
320 5 : if (m_nGeohashGridPrecision >= 1)
321 : {
322 1 : geohash_grid.Set("precision", m_nGeohashGridPrecision);
323 : }
324 4 : else if (!m_bRequestHasSpatialFilter || (sEnvelope.MinX < sEnvelope.MaxX &&
325 1 : sEnvelope.MinY < sEnvelope.MaxY))
326 : {
327 4 : const double dfSpatialRatio =
328 4 : m_bRequestHasSpatialFilter
329 4 : ? (sEnvelope.MaxX - sEnvelope.MinX) / 360. *
330 1 : (sEnvelope.MaxY - sEnvelope.MinY) / 180.
331 : : 1.0;
332 :
333 : // A geohash of size 1 can encode up to 32 positions, size 2 up to 32*32
334 : // etc.
335 4 : const int geohashSize = static_cast<int>(std::min(
336 12 : 12.0, std::max(1.0, log(m_nGeohashGridMaxSize / dfSpatialRatio) /
337 4 : log(32.0))));
338 :
339 4 : geohash_grid.Set("precision", geohashSize);
340 : }
341 5 : geohash_grid.Set("size", m_nGeohashGridMaxSize);
342 :
343 10 : auto subaggs = CPLJSONObject();
344 5 : grid.Add("aggs", subaggs);
345 :
346 10 : auto centroid = CPLJSONObject();
347 5 : subaggs.Add(CENTROID_STR, centroid);
348 :
349 10 : auto geo_centroid = CPLJSONObject();
350 5 : centroid.Add("geo_centroid", geo_centroid);
351 :
352 5 : geo_centroid.Set("field", m_osGeometryField);
353 :
354 : // Add extra fields
355 11 : for (const auto &oChild : m_oAggregatedFieldsRequest.GetChildren())
356 : {
357 6 : subaggs.Add(oChild.GetName(), oChild);
358 : }
359 :
360 10 : return oRoot.Format(CPLJSONObject::PrettyFormat::Plain);
361 : }
362 :
363 : /************************************************************************/
364 : /* IssueAggregationRequest() */
365 : /************************************************************************/
366 :
367 5 : void OGRElasticAggregationLayer::IssueAggregationRequest()
368 : {
369 26 : const auto IsNumericJsonType = [](json_type type)
370 26 : { return type == json_type_int || type == json_type_double; };
371 :
372 5 : m_apoCachedFeatures.clear();
373 :
374 15 : json_object *poResponse = m_poDS->RunRequest(
375 10 : (std::string(m_poDS->GetURL()) + "/" + m_osIndexName + "/_search")
376 : .c_str(),
377 10 : BuildRequest().c_str());
378 5 : if (!poResponse)
379 0 : return;
380 5 : json_object *poBuckets = json_ex_get_object_by_path(
381 5 : poResponse, m_bRequestHasSpatialFilter
382 : ? "aggregations." FILTERED_STR "." GRID_STR ".buckets"
383 : : "aggregations." GRID_STR ".buckets");
384 5 : if (poBuckets && json_object_get_type(poBuckets) == json_type_array)
385 : {
386 5 : const auto nBuckets = json_object_array_length(poBuckets);
387 13 : for (auto i = decltype(nBuckets){0}; i < nBuckets; i++)
388 : {
389 8 : json_object *poBucket = json_object_array_get_idx(poBuckets, i);
390 8 : if (poBucket && json_object_get_type(poBucket) == json_type_object)
391 : {
392 8 : OGRFeature *poFeature = new OGRFeature(m_poFeatureDefn);
393 8 : poFeature->SetFID(i);
394 :
395 : json_object *poKey =
396 8 : CPL_json_object_object_get(poBucket, "key");
397 8 : if (poKey && json_object_get_type(poKey) == json_type_string)
398 : {
399 8 : poFeature->SetField("key", json_object_get_string(poKey));
400 : }
401 :
402 : json_object *poDocCount =
403 8 : CPL_json_object_object_get(poBucket, "doc_count");
404 16 : if (poDocCount &&
405 8 : json_object_get_type(poDocCount) == json_type_int)
406 : {
407 16 : poFeature->SetField("doc_count",
408 : static_cast<GIntBig>(
409 8 : json_object_get_int64(poDocCount)));
410 : }
411 :
412 8 : json_object *poLocation = json_ex_get_object_by_path(
413 : poBucket, CENTROID_STR ".location");
414 16 : if (poLocation &&
415 8 : json_object_get_type(poLocation) == json_type_object)
416 : {
417 : json_object *poLat =
418 8 : CPL_json_object_object_get(poLocation, "lat");
419 : json_object *poLon =
420 8 : CPL_json_object_object_get(poLocation, "lon");
421 8 : if (poLat &&
422 16 : IsNumericJsonType(json_object_get_type(poLat)) &&
423 16 : poLon && IsNumericJsonType(json_object_get_type(poLon)))
424 : {
425 : OGRPoint *poPoint =
426 8 : new OGRPoint(json_object_get_double(poLon),
427 8 : json_object_get_double(poLat));
428 8 : poPoint->assignSpatialReference(
429 16 : m_poFeatureDefn->GetGeomFieldDefn(0)
430 8 : ->GetSpatialRef());
431 8 : poFeature->SetGeometryDirectly(poPoint);
432 : }
433 : }
434 :
435 9 : if (m_oFieldDef.IsValid() &&
436 1 : m_oFieldDef.GetType() == CPLJSONObject::Type::Object)
437 : {
438 5 : for (const char *pszOp :
439 6 : {"min", "max", "avg", "sum", "count"})
440 : {
441 15 : auto oOp = m_oFieldDef[pszOp];
442 10 : if (oOp.IsValid() &&
443 5 : oOp.GetType() == CPLJSONObject::Type::Array)
444 : {
445 11 : for (const auto &oField : oOp.ToArray())
446 : {
447 6 : if (oField.GetType() ==
448 : CPLJSONObject::Type::String)
449 : {
450 : json_object *poField =
451 6 : json_ex_get_object_by_path(
452 : poBucket,
453 : CPLSPrintf(
454 : "%s_%s.value",
455 12 : oField.ToString().c_str(),
456 : pszOp));
457 11 : if (poField &&
458 5 : IsNumericJsonType(
459 : json_object_get_type(poField)))
460 : {
461 5 : const char *pszFieldName = CPLSPrintf(
462 10 : "%s_%s", oField.ToString().c_str(),
463 : pszOp);
464 5 : if (strcmp(pszOp, "count") == 0)
465 : {
466 2 : poFeature->SetField(
467 : pszFieldName,
468 : static_cast<GIntBig>(
469 1 : json_object_get_int64(
470 : poField)));
471 : }
472 : else
473 : {
474 4 : poFeature->SetField(
475 : pszFieldName,
476 : json_object_get_double(
477 : poField));
478 : }
479 : }
480 : }
481 : }
482 : }
483 : }
484 :
485 3 : auto oOp = m_oFieldDef["stats"];
486 2 : if (oOp.IsValid() &&
487 1 : oOp.GetType() == CPLJSONObject::Type::Array)
488 : {
489 2 : for (const auto &oField : oOp.ToArray())
490 : {
491 1 : if (oField.GetType() == CPLJSONObject::Type::String)
492 : {
493 5 : for (const char *pszOp :
494 6 : {"min", "max", "avg", "sum", "count"})
495 : {
496 : json_object *poField =
497 5 : json_ex_get_object_by_path(
498 : poBucket,
499 : CPLSPrintf(
500 : "%s_stats.%s",
501 10 : oField.ToString().c_str(),
502 : pszOp));
503 10 : if (poField &&
504 5 : IsNumericJsonType(
505 : json_object_get_type(poField)))
506 : {
507 5 : const char *pszFieldName = CPLSPrintf(
508 10 : "%s_%s", oField.ToString().c_str(),
509 : pszOp);
510 5 : if (strcmp(pszOp, "count") == 0)
511 : {
512 2 : poFeature->SetField(
513 : pszFieldName,
514 : static_cast<GIntBig>(
515 1 : json_object_get_int64(
516 : poField)));
517 : }
518 : else
519 : {
520 4 : poFeature->SetField(
521 : pszFieldName,
522 : json_object_get_double(
523 : poField));
524 : }
525 : }
526 : }
527 : }
528 : }
529 : }
530 : }
531 :
532 8 : m_apoCachedFeatures.emplace_back(poFeature);
533 : }
534 : }
535 : }
536 5 : json_object_put(poResponse);
537 : }
538 :
539 : /************************************************************************/
540 : /* GetNextRawFeature() */
541 : /************************************************************************/
542 :
543 7 : OGRFeature *OGRElasticAggregationLayer::GetNextRawFeature()
544 : {
545 7 : if (!m_bFeaturesRequested)
546 : {
547 4 : m_bFeaturesRequested = true;
548 4 : IssueAggregationRequest();
549 : }
550 7 : if (m_iCurFeature < static_cast<int>(m_apoCachedFeatures.size()))
551 : {
552 6 : auto poFeature = m_apoCachedFeatures[m_iCurFeature]->Clone();
553 6 : ++m_iCurFeature;
554 6 : return poFeature;
555 : }
556 :
557 1 : return nullptr;
558 : }
559 :
560 : /************************************************************************/
561 : /* GetFeatureCount() */
562 : /************************************************************************/
563 :
564 3 : GIntBig OGRElasticAggregationLayer::GetFeatureCount(int bForce)
565 : {
566 3 : if (m_poFilterGeom == nullptr && m_poAttrQuery == nullptr)
567 : {
568 2 : if (!m_bFeaturesRequested)
569 : {
570 1 : m_bFeaturesRequested = true;
571 1 : IssueAggregationRequest();
572 : }
573 2 : return static_cast<int>(m_apoCachedFeatures.size());
574 : }
575 1 : return OGRLayer::GetFeatureCount(bForce);
576 : }
577 :
578 : /************************************************************************/
579 : /* TestCapability() */
580 : /************************************************************************/
581 :
582 1 : int OGRElasticAggregationLayer::TestCapability(const char *pszCap)
583 : {
584 1 : return EQUAL(pszCap, OLCStringsAsUTF8);
585 : }
586 :
587 : /************************************************************************/
588 : /* GetDataset() */
589 : /************************************************************************/
590 :
591 1 : GDALDataset *OGRElasticAggregationLayer::GetDataset()
592 : {
593 1 : return m_poDS;
594 : }
|