Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Elasticsearch Translator
4 : * Purpose:
5 : * Author:
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2021, Even Rouault <even dot rouault at spatialys.com>
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #include "ogr_elastic.h"
14 : #include "ogrlibjsonutils.h"
15 : #include "cpl_json.h"
16 :
17 : #include <algorithm>
18 : #include <set>
19 :
20 : /************************************************************************/
21 : /* OGRElasticAggregationLayer() */
22 : /************************************************************************/
23 :
24 3 : OGRElasticAggregationLayer::OGRElasticAggregationLayer(
25 3 : OGRElasticDataSource *poDS)
26 3 : : m_poDS(poDS)
27 : {
28 3 : m_poFeatureDefn = new OGRFeatureDefn("aggregation");
29 3 : m_poFeatureDefn->SetGeomType(wkbPoint);
30 3 : m_poFeatureDefn->Reference();
31 3 : SetDescription(m_poFeatureDefn->GetName());
32 :
33 3 : OGRSpatialReference *poSRS_WGS84 = new OGRSpatialReference();
34 3 : poSRS_WGS84->SetFromUserInput(SRS_WKT_WGS84_LAT_LONG);
35 3 : poSRS_WGS84->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
36 3 : m_poFeatureDefn->GetGeomFieldDefn(0)->SetSpatialRef(poSRS_WGS84);
37 3 : poSRS_WGS84->Dereference();
38 :
39 6 : OGRFieldDefn oKey("key", OFTString);
40 3 : m_poFeatureDefn->AddFieldDefn(&oKey);
41 :
42 6 : OGRFieldDefn oDocCount("doc_count", OFTInteger64);
43 3 : m_poFeatureDefn->AddFieldDefn(&oDocCount);
44 3 : }
45 :
46 : /************************************************************************/
47 : /* ~OGRElasticAggregationLayer() */
48 : /************************************************************************/
49 :
50 6 : OGRElasticAggregationLayer::~OGRElasticAggregationLayer()
51 : {
52 3 : m_poFeatureDefn->Release();
53 6 : }
54 :
55 : /************************************************************************/
56 : /* Build() */
57 : /************************************************************************/
58 :
59 : std::unique_ptr<OGRElasticAggregationLayer>
60 3 : OGRElasticAggregationLayer::Build(OGRElasticDataSource *poDS,
61 : const char *pszAggregation)
62 : {
63 6 : CPLJSONDocument oDoc;
64 3 : if (!oDoc.LoadMemory(pszAggregation))
65 0 : return nullptr;
66 6 : const auto oRoot = oDoc.GetRoot();
67 9 : const auto osIndex = oRoot.GetString("index");
68 3 : if (osIndex.empty())
69 : {
70 0 : CPLError(CE_Failure, CPLE_AppDefined,
71 : "Missing 'index' member in AGGREGATION");
72 0 : return nullptr;
73 : }
74 :
75 9 : auto osGeometryField = oRoot.GetString("geometry_field");
76 3 : if (osGeometryField.empty())
77 : {
78 3 : std::set<CPLString> oSetLayers;
79 3 : std::vector<std::unique_ptr<OGRElasticLayer>> apoLayers;
80 3 : poDS->FetchMapping(osIndex.c_str(), oSetLayers, apoLayers);
81 3 : if (apoLayers.size() == 1)
82 : {
83 3 : apoLayers[0]->SetFeatureDefnFinalized();
84 : const int nGeomFieldCount =
85 3 : apoLayers[0]->GetLayerDefn()->GetGeomFieldCount();
86 3 : if (nGeomFieldCount == 1)
87 : {
88 6 : std::vector<CPLString> aosPath;
89 3 : bool bIsGeoPoint = false;
90 3 : apoLayers[0]->GetGeomFieldProperties(0, aosPath, bIsGeoPoint);
91 9 : for (const auto &osPart : aosPath)
92 : {
93 6 : if (!osGeometryField.empty())
94 3 : osGeometryField += '.';
95 6 : osGeometryField += osPart;
96 : }
97 : }
98 0 : else if (nGeomFieldCount == 0)
99 : {
100 0 : CPLError(
101 : CE_Failure, CPLE_AppDefined,
102 : "No geometry field found upon which to build aggregation");
103 0 : return nullptr;
104 : }
105 : else
106 : {
107 0 : CPLError(CE_Failure, CPLE_AppDefined,
108 : "Multiple geometry fields exist in the index. "
109 : "Specify one with the 'geometry_field' member in "
110 : "AGGREGATION");
111 0 : return nullptr;
112 : }
113 : }
114 : else
115 : {
116 0 : CPLError(CE_Failure, CPLE_AppDefined,
117 : "Missing 'geometry_field' member in AGGREGATION");
118 0 : return nullptr;
119 : }
120 : }
121 :
122 : auto poLayer = std::unique_ptr<OGRElasticAggregationLayer>(
123 6 : new OGRElasticAggregationLayer(poDS));
124 3 : poLayer->m_osIndexName = osIndex;
125 3 : poLayer->m_osGeometryField = std::move(osGeometryField);
126 :
127 : // Parse geohash_grid options
128 9 : auto oGeohashGrid = oRoot["geohash_grid"];
129 4 : if (oGeohashGrid.IsValid() &&
130 1 : oGeohashGrid.GetType() == CPLJSONObject::Type::Object)
131 : {
132 1 : const int nPrecision = oGeohashGrid.GetInteger("precision");
133 1 : if (nPrecision > 0)
134 1 : poLayer->m_nGeohashGridPrecision = nPrecision;
135 :
136 1 : const int nMaxSize = oGeohashGrid.GetInteger("size");
137 1 : if (nMaxSize > 0)
138 1 : poLayer->m_nGeohashGridMaxSize = nMaxSize;
139 : }
140 :
141 : // Parse additional fields that correspond to statistical operations on
142 : // fields
143 3 : poLayer->m_oFieldDef = oRoot["fields"];
144 4 : if (poLayer->m_oFieldDef.IsValid() &&
145 1 : poLayer->m_oFieldDef.GetType() == CPLJSONObject::Type::Object)
146 : {
147 : // Start with stats, to keep track of the created columns, and
148 : // avoid duplicating them if a users ask for stats and min/max/etc.
149 : // on the same property.
150 : {
151 3 : auto oOp = poLayer->m_oFieldDef["stats"];
152 1 : if (oOp.IsValid() && oOp.GetType() == CPLJSONObject::Type::Array)
153 : {
154 2 : for (const auto &oField : oOp.ToArray())
155 : {
156 1 : if (oField.GetType() == CPLJSONObject::Type::String)
157 : {
158 5 : for (const char *pszOp :
159 6 : {"min", "max", "avg", "sum", "count"})
160 : {
161 : OGRFieldDefn oFieldDefn(
162 10 : CPLSPrintf("%s_%s", oField.ToString().c_str(),
163 : pszOp),
164 5 : strcmp(pszOp, "count") == 0 ? OFTInteger64
165 15 : : OFTReal);
166 5 : poLayer->m_poFeatureDefn->AddFieldDefn(&oFieldDefn);
167 : }
168 :
169 2 : CPLJSONObject oAgg;
170 1 : CPLJSONObject oFieldAgg;
171 1 : oFieldAgg.Add("field", oField.ToString());
172 1 : oAgg.Add("stats", oFieldAgg);
173 3 : poLayer->m_oAggregatedFieldsRequest.Add(
174 2 : CPLSPrintf("%s_stats", oField.ToString().c_str()),
175 : oAgg);
176 : }
177 : }
178 : }
179 : }
180 :
181 6 : for (const char *pszOp : {"min", "max", "avg", "sum", "count"})
182 : {
183 15 : auto oOp = poLayer->m_oFieldDef[pszOp];
184 5 : if (oOp.IsValid() && oOp.GetType() == CPLJSONObject::Type::Array)
185 : {
186 11 : for (const auto &oField : oOp.ToArray())
187 : {
188 6 : if (oField.GetType() == CPLJSONObject::Type::String)
189 : {
190 6 : const char *pszFieldName = CPLSPrintf(
191 12 : "%s_%s", oField.ToString().c_str(), pszOp);
192 6 : if (poLayer->m_poFeatureDefn->GetFieldIndex(
193 6 : pszFieldName) < 0)
194 : {
195 : OGRFieldDefn oFieldDefn(pszFieldName,
196 5 : strcmp(pszOp, "count") == 0
197 : ? OFTInteger64
198 10 : : OFTReal);
199 5 : poLayer->m_poFeatureDefn->AddFieldDefn(&oFieldDefn);
200 :
201 10 : CPLJSONObject oAgg;
202 5 : CPLJSONObject oFieldAgg;
203 5 : oFieldAgg.Add("field", oField.ToString());
204 5 : oAgg.Add(strcmp(pszOp, "count") == 0 ? "value_count"
205 : : pszOp,
206 : oFieldAgg);
207 5 : poLayer->m_oAggregatedFieldsRequest.Add(
208 : oFieldDefn.GetNameRef(), oAgg);
209 : }
210 : }
211 : }
212 : }
213 : }
214 : }
215 :
216 3 : return poLayer;
217 : }
218 :
219 : /************************************************************************/
220 : /* ResetReading() */
221 : /************************************************************************/
222 :
223 6 : void OGRElasticAggregationLayer::ResetReading()
224 : {
225 6 : m_iCurFeature = 0;
226 6 : }
227 :
228 : /************************************************************************/
229 : /* SetSpatialFilter() */
230 : /************************************************************************/
231 :
232 2 : void OGRElasticAggregationLayer::SetSpatialFilter(OGRGeometry *poGeom)
233 :
234 : {
235 2 : OGRLayer::SetSpatialFilter(poGeom);
236 2 : m_bFeaturesRequested = false;
237 2 : m_apoCachedFeatures.clear();
238 2 : }
239 :
240 : /************************************************************************/
241 : /* BuildRequest() */
242 : /************************************************************************/
243 :
244 : #define FILTERED_STR "filtered"
245 : #define GRID_STR "grid"
246 : #define CENTROID_STR "centroid"
247 :
248 : // Return a JSON serialized document that is the payload to POST for a /_search
249 : // request
250 5 : std::string OGRElasticAggregationLayer::BuildRequest()
251 : {
252 10 : CPLJSONDocument oDoc;
253 10 : auto oRoot = oDoc.GetRoot();
254 5 : oRoot.Set("size", 0);
255 :
256 10 : auto aggs = CPLJSONObject();
257 :
258 : // Clamp spatial filter if needed
259 5 : m_bRequestHasSpatialFilter = false;
260 5 : OGREnvelope sEnvelope;
261 5 : if (m_poFilterGeom)
262 : {
263 2 : m_poFilterGeom->getEnvelope(&sEnvelope);
264 :
265 2 : OGRElasticLayer::ClampEnvelope(sEnvelope);
266 2 : if (!(sEnvelope.MinX == -180 && sEnvelope.MinY == -90 &&
267 1 : sEnvelope.MaxX == 180 && sEnvelope.MaxY == 90))
268 : {
269 1 : m_bRequestHasSpatialFilter = true;
270 : }
271 : }
272 :
273 5 : if (m_bRequestHasSpatialFilter)
274 : {
275 : // Add spatial filtering
276 2 : auto top_aggs = CPLJSONObject();
277 1 : oRoot.Add("aggs", top_aggs);
278 :
279 2 : auto filtered = CPLJSONObject();
280 1 : top_aggs.Add(FILTERED_STR, filtered);
281 :
282 2 : auto filter = CPLJSONObject();
283 1 : filtered.Add("filter", filter);
284 1 : filtered.Add("aggs", aggs);
285 :
286 2 : auto geo_bounding_box = CPLJSONObject();
287 1 : filter.Add("geo_bounding_box", geo_bounding_box);
288 :
289 2 : auto coordinates = CPLJSONObject();
290 1 : geo_bounding_box.Add(m_osGeometryField, coordinates);
291 :
292 2 : auto top_left = CPLJSONObject();
293 1 : coordinates.Add("top_left", top_left);
294 1 : top_left.Add("lat", sEnvelope.MaxY);
295 1 : top_left.Add("lon", sEnvelope.MinX);
296 :
297 1 : auto bottom_right = CPLJSONObject();
298 1 : coordinates.Add("bottom_right", bottom_right);
299 1 : bottom_right.Add("lat", sEnvelope.MinY);
300 1 : bottom_right.Add("lon", sEnvelope.MaxX);
301 : }
302 : else
303 : {
304 4 : oRoot.Add("aggs", aggs);
305 : }
306 :
307 10 : auto grid = CPLJSONObject();
308 5 : aggs.Add(GRID_STR, grid);
309 :
310 : // Build geohash_grid aggregation object
311 10 : auto geohash_grid = CPLJSONObject();
312 5 : grid.Add("geohash_grid", geohash_grid);
313 5 : geohash_grid.Set("field", m_osGeometryField);
314 :
315 5 : if (m_nGeohashGridPrecision >= 1)
316 : {
317 1 : geohash_grid.Set("precision", m_nGeohashGridPrecision);
318 : }
319 4 : else if (!m_bRequestHasSpatialFilter || (sEnvelope.MinX < sEnvelope.MaxX &&
320 1 : sEnvelope.MinY < sEnvelope.MaxY))
321 : {
322 4 : const double dfSpatialRatio =
323 4 : m_bRequestHasSpatialFilter
324 4 : ? (sEnvelope.MaxX - sEnvelope.MinX) / 360. *
325 1 : (sEnvelope.MaxY - sEnvelope.MinY) / 180.
326 : : 1.0;
327 :
328 : // A geohash of size 1 can encode up to 32 positions, size 2 up to 32*32
329 : // etc.
330 4 : const int geohashSize = static_cast<int>(std::min(
331 12 : 12.0, std::max(1.0, log(m_nGeohashGridMaxSize / dfSpatialRatio) /
332 4 : log(32.0))));
333 :
334 4 : geohash_grid.Set("precision", geohashSize);
335 : }
336 5 : geohash_grid.Set("size", m_nGeohashGridMaxSize);
337 :
338 10 : auto subaggs = CPLJSONObject();
339 5 : grid.Add("aggs", subaggs);
340 :
341 10 : auto centroid = CPLJSONObject();
342 5 : subaggs.Add(CENTROID_STR, centroid);
343 :
344 10 : auto geo_centroid = CPLJSONObject();
345 5 : centroid.Add("geo_centroid", geo_centroid);
346 :
347 5 : geo_centroid.Set("field", m_osGeometryField);
348 :
349 : // Add extra fields
350 11 : for (const auto &oChild : m_oAggregatedFieldsRequest.GetChildren())
351 : {
352 6 : subaggs.Add(oChild.GetName(), oChild);
353 : }
354 :
355 10 : return oRoot.Format(CPLJSONObject::PrettyFormat::Plain);
356 : }
357 :
358 : /************************************************************************/
359 : /* IssueAggregationRequest() */
360 : /************************************************************************/
361 :
362 5 : void OGRElasticAggregationLayer::IssueAggregationRequest()
363 : {
364 26 : const auto IsNumericJsonType = [](json_type type)
365 26 : { return type == json_type_int || type == json_type_double; };
366 :
367 5 : m_apoCachedFeatures.clear();
368 :
369 15 : json_object *poResponse = m_poDS->RunRequest(
370 10 : (std::string(m_poDS->GetURL()) + "/" + m_osIndexName + "/_search")
371 : .c_str(),
372 10 : BuildRequest().c_str());
373 5 : if (!poResponse)
374 0 : return;
375 5 : json_object *poBuckets = json_ex_get_object_by_path(
376 5 : poResponse, m_bRequestHasSpatialFilter
377 : ? "aggregations." FILTERED_STR "." GRID_STR ".buckets"
378 : : "aggregations." GRID_STR ".buckets");
379 5 : if (poBuckets && json_object_get_type(poBuckets) == json_type_array)
380 : {
381 5 : const auto nBuckets = json_object_array_length(poBuckets);
382 13 : for (auto i = decltype(nBuckets){0}; i < nBuckets; i++)
383 : {
384 8 : json_object *poBucket = json_object_array_get_idx(poBuckets, i);
385 8 : if (poBucket && json_object_get_type(poBucket) == json_type_object)
386 : {
387 8 : OGRFeature *poFeature = new OGRFeature(m_poFeatureDefn);
388 8 : poFeature->SetFID(i);
389 :
390 : json_object *poKey =
391 8 : CPL_json_object_object_get(poBucket, "key");
392 8 : if (poKey && json_object_get_type(poKey) == json_type_string)
393 : {
394 8 : poFeature->SetField("key", json_object_get_string(poKey));
395 : }
396 :
397 : json_object *poDocCount =
398 8 : CPL_json_object_object_get(poBucket, "doc_count");
399 16 : if (poDocCount &&
400 8 : json_object_get_type(poDocCount) == json_type_int)
401 : {
402 16 : poFeature->SetField("doc_count",
403 : static_cast<GIntBig>(
404 8 : json_object_get_int64(poDocCount)));
405 : }
406 :
407 8 : json_object *poLocation = json_ex_get_object_by_path(
408 : poBucket, CENTROID_STR ".location");
409 16 : if (poLocation &&
410 8 : json_object_get_type(poLocation) == json_type_object)
411 : {
412 : json_object *poLat =
413 8 : CPL_json_object_object_get(poLocation, "lat");
414 : json_object *poLon =
415 8 : CPL_json_object_object_get(poLocation, "lon");
416 8 : if (poLat &&
417 16 : IsNumericJsonType(json_object_get_type(poLat)) &&
418 16 : poLon && IsNumericJsonType(json_object_get_type(poLon)))
419 : {
420 : OGRPoint *poPoint =
421 8 : new OGRPoint(json_object_get_double(poLon),
422 8 : json_object_get_double(poLat));
423 8 : poPoint->assignSpatialReference(
424 16 : m_poFeatureDefn->GetGeomFieldDefn(0)
425 8 : ->GetSpatialRef());
426 8 : poFeature->SetGeometryDirectly(poPoint);
427 : }
428 : }
429 :
430 9 : if (m_oFieldDef.IsValid() &&
431 1 : m_oFieldDef.GetType() == CPLJSONObject::Type::Object)
432 : {
433 5 : for (const char *pszOp :
434 6 : {"min", "max", "avg", "sum", "count"})
435 : {
436 15 : auto oOp = m_oFieldDef[pszOp];
437 10 : if (oOp.IsValid() &&
438 5 : oOp.GetType() == CPLJSONObject::Type::Array)
439 : {
440 11 : for (const auto &oField : oOp.ToArray())
441 : {
442 6 : if (oField.GetType() ==
443 : CPLJSONObject::Type::String)
444 : {
445 : json_object *poField =
446 6 : json_ex_get_object_by_path(
447 : poBucket,
448 : CPLSPrintf(
449 : "%s_%s.value",
450 12 : oField.ToString().c_str(),
451 : pszOp));
452 11 : if (poField &&
453 5 : IsNumericJsonType(
454 : json_object_get_type(poField)))
455 : {
456 5 : const char *pszFieldName = CPLSPrintf(
457 10 : "%s_%s", oField.ToString().c_str(),
458 : pszOp);
459 5 : if (strcmp(pszOp, "count") == 0)
460 : {
461 2 : poFeature->SetField(
462 : pszFieldName,
463 : static_cast<GIntBig>(
464 1 : json_object_get_int64(
465 : poField)));
466 : }
467 : else
468 : {
469 4 : poFeature->SetField(
470 : pszFieldName,
471 : json_object_get_double(
472 : poField));
473 : }
474 : }
475 : }
476 : }
477 : }
478 : }
479 :
480 3 : auto oOp = m_oFieldDef["stats"];
481 2 : if (oOp.IsValid() &&
482 1 : oOp.GetType() == CPLJSONObject::Type::Array)
483 : {
484 2 : for (const auto &oField : oOp.ToArray())
485 : {
486 1 : if (oField.GetType() == CPLJSONObject::Type::String)
487 : {
488 5 : for (const char *pszOp :
489 6 : {"min", "max", "avg", "sum", "count"})
490 : {
491 : json_object *poField =
492 5 : json_ex_get_object_by_path(
493 : poBucket,
494 : CPLSPrintf(
495 : "%s_stats.%s",
496 10 : oField.ToString().c_str(),
497 : pszOp));
498 10 : if (poField &&
499 5 : IsNumericJsonType(
500 : json_object_get_type(poField)))
501 : {
502 5 : const char *pszFieldName = CPLSPrintf(
503 10 : "%s_%s", oField.ToString().c_str(),
504 : pszOp);
505 5 : if (strcmp(pszOp, "count") == 0)
506 : {
507 2 : poFeature->SetField(
508 : pszFieldName,
509 : static_cast<GIntBig>(
510 1 : json_object_get_int64(
511 : poField)));
512 : }
513 : else
514 : {
515 4 : poFeature->SetField(
516 : pszFieldName,
517 : json_object_get_double(
518 : poField));
519 : }
520 : }
521 : }
522 : }
523 : }
524 : }
525 : }
526 :
527 8 : m_apoCachedFeatures.emplace_back(poFeature);
528 : }
529 : }
530 : }
531 5 : json_object_put(poResponse);
532 : }
533 :
534 : /************************************************************************/
535 : /* GetNextRawFeature() */
536 : /************************************************************************/
537 :
538 7 : OGRFeature *OGRElasticAggregationLayer::GetNextRawFeature()
539 : {
540 7 : if (!m_bFeaturesRequested)
541 : {
542 4 : m_bFeaturesRequested = true;
543 4 : IssueAggregationRequest();
544 : }
545 7 : if (m_iCurFeature < static_cast<int>(m_apoCachedFeatures.size()))
546 : {
547 6 : auto poFeature = m_apoCachedFeatures[m_iCurFeature]->Clone();
548 6 : ++m_iCurFeature;
549 6 : return poFeature;
550 : }
551 :
552 1 : return nullptr;
553 : }
554 :
555 : /************************************************************************/
556 : /* GetFeatureCount() */
557 : /************************************************************************/
558 :
559 3 : GIntBig OGRElasticAggregationLayer::GetFeatureCount(int bForce)
560 : {
561 3 : if (m_poFilterGeom == nullptr && m_poAttrQuery == nullptr)
562 : {
563 2 : if (!m_bFeaturesRequested)
564 : {
565 1 : m_bFeaturesRequested = true;
566 1 : IssueAggregationRequest();
567 : }
568 2 : return static_cast<int>(m_apoCachedFeatures.size());
569 : }
570 1 : return OGRLayer::GetFeatureCount(bForce);
571 : }
572 :
573 : /************************************************************************/
574 : /* TestCapability() */
575 : /************************************************************************/
576 :
577 1 : int OGRElasticAggregationLayer::TestCapability(const char *pszCap)
578 : {
579 1 : return EQUAL(pszCap, OLCStringsAsUTF8);
580 : }
581 :
582 : /************************************************************************/
583 : /* GetDataset() */
584 : /************************************************************************/
585 :
586 1 : GDALDataset *OGRElasticAggregationLayer::GetDataset()
587 : {
588 1 : return m_poDS;
589 : }
|