Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Elasticsearch Translator
4 : * Purpose:
5 : * Author:
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2011, Adam Estrada
9 : * Copyright (c) 2012, Even Rouault <even dot rouault at spatialys.com>
10 : *
11 : * SPDX-License-Identifier: MIT
12 : ****************************************************************************/
13 :
14 : #include "ogr_elastic.h"
15 : #include "cpl_conv.h"
16 : #include "cpl_string.h"
17 : #include "cpl_csv.h"
18 : #include "cpl_http.h"
19 : #include "ogrlibjsonutils.h"
20 : #include "ogr_swq.h"
21 :
22 : /************************************************************************/
23 : /* OGRElasticDataSource() */
24 : /************************************************************************/
25 :
26 40 : OGRElasticDataSource::OGRElasticDataSource()
27 : : m_pszName(nullptr), m_bOverwrite(false), m_nBulkUpload(0),
28 : m_pszWriteMap(nullptr), m_pszMapping(nullptr), m_nBatchSize(100),
29 : m_nFeatureCountToEstablishFeatureDefn(100), m_bJSonField(false),
30 40 : m_bFlattenNestedAttributes(true)
31 : {
32 40 : const char *pszWriteMapIn = CPLGetConfigOption("ES_WRITEMAP", nullptr);
33 40 : if (pszWriteMapIn != nullptr)
34 : {
35 0 : m_pszWriteMap = CPLStrdup(pszWriteMapIn);
36 : }
37 40 : }
38 :
39 : /************************************************************************/
40 : /* ~OGRElasticDataSource() */
41 : /************************************************************************/
42 :
43 80 : OGRElasticDataSource::~OGRElasticDataSource()
44 : {
45 40 : m_apoLayers.clear();
46 40 : CPLFree(m_pszName);
47 40 : CPLFree(m_pszMapping);
48 40 : CPLFree(m_pszWriteMap);
49 80 : }
50 :
51 : /************************************************************************/
52 : /* TestCapability() */
53 : /************************************************************************/
54 :
55 3 : int OGRElasticDataSource::TestCapability(const char *pszCap) const
56 : {
57 3 : if (EQUAL(pszCap, ODsCCreateLayer) || EQUAL(pszCap, ODsCDeleteLayer) ||
58 1 : EQUAL(pszCap, ODsCCreateGeomFieldAfterCreateLayer))
59 : {
60 3 : return GetAccess() == GA_Update;
61 : }
62 :
63 0 : return FALSE;
64 : }
65 :
66 : /************************************************************************/
67 : /* GetIndexList() */
68 : /************************************************************************/
69 :
70 : std::vector<std::string>
71 17 : OGRElasticDataSource::GetIndexList(const char *pszQueriedIndexName)
72 : {
73 17 : std::vector<std::string> aosList;
74 34 : std::string osURL(m_osURL);
75 17 : osURL += "/_cat/indices";
76 17 : if (pszQueriedIndexName)
77 : {
78 2 : osURL += '/';
79 2 : osURL += pszQueriedIndexName;
80 : }
81 17 : osURL += "?h=i";
82 17 : CPLHTTPResult *psResult = HTTPFetch(osURL.c_str(), nullptr);
83 17 : if (psResult == nullptr || psResult->pszErrBuf != nullptr ||
84 17 : psResult->pabyData == nullptr)
85 : {
86 2 : CPLHTTPDestroyResult(psResult);
87 2 : return aosList;
88 : }
89 :
90 15 : char *pszCur = (char *)psResult->pabyData;
91 15 : char *pszNextEOL = strchr(pszCur, '\n');
92 30 : while (pszNextEOL && pszNextEOL > pszCur)
93 : {
94 15 : *pszNextEOL = '\0';
95 :
96 15 : char *pszBeforeEOL = pszNextEOL - 1;
97 25 : while (*pszBeforeEOL == ' ')
98 : {
99 10 : *pszBeforeEOL = '\0';
100 10 : pszBeforeEOL--;
101 : }
102 :
103 15 : const char *pszIndexName = pszCur;
104 :
105 15 : pszCur = pszNextEOL + 1;
106 15 : pszNextEOL = strchr(pszCur, '\n');
107 :
108 15 : if (STARTS_WITH(pszIndexName, ".security") ||
109 15 : STARTS_WITH(pszIndexName, ".monitoring") ||
110 15 : STARTS_WITH(pszIndexName, ".geoip_databases"))
111 : {
112 0 : continue;
113 : }
114 :
115 15 : aosList.push_back(pszIndexName);
116 : }
117 15 : CPLHTTPDestroyResult(psResult);
118 :
119 15 : return aosList;
120 : }
121 :
122 : /************************************************************************/
123 : /* GetLayerCount() */
124 : /************************************************************************/
125 :
126 64 : int OGRElasticDataSource::GetLayerCount() const
127 : {
128 64 : if (m_bAllLayersListed)
129 : {
130 49 : if (m_poAggregationLayer)
131 3 : return 1;
132 46 : return static_cast<int>(m_apoLayers.size());
133 : }
134 15 : m_bAllLayersListed = true;
135 :
136 : const auto aosList =
137 15 : const_cast<OGRElasticDataSource *>(this)->GetIndexList(nullptr);
138 26 : for (const std::string &osIndexName : aosList)
139 : {
140 11 : const_cast<OGRElasticDataSource *>(this)->FetchMapping(
141 : osIndexName.c_str());
142 : }
143 :
144 15 : return static_cast<int>(m_apoLayers.size());
145 : }
146 :
147 : /************************************************************************/
148 : /* FetchMapping() */
149 : /************************************************************************/
150 :
151 23 : void OGRElasticDataSource::FetchMapping(
152 : const char *pszIndexName, std::set<CPLString> &oSetLayers,
153 : std::vector<std::unique_ptr<OGRElasticLayer>> &apoLayers)
154 : {
155 23 : if (oSetLayers.find(pszIndexName) != oSetLayers.end())
156 0 : return;
157 :
158 46 : CPLString osURL(m_osURL + CPLString("/") + pszIndexName +
159 69 : CPLString("/_mapping?pretty"));
160 23 : json_object *poRes = RunRequest(osURL, nullptr, std::vector<int>({403}));
161 23 : if (poRes)
162 : {
163 : json_object *poLayerObj =
164 19 : CPL_json_object_object_get(poRes, pszIndexName);
165 19 : json_object *poMappings = nullptr;
166 19 : if (poLayerObj && json_object_get_type(poLayerObj) == json_type_object)
167 19 : poMappings = CPL_json_object_object_get(poLayerObj, "mappings");
168 19 : if (poMappings && json_object_get_type(poMappings) == json_type_object)
169 : {
170 38 : std::vector<CPLString> aosMappings;
171 19 : if (m_nMajorVersion < 7)
172 : {
173 : json_object_iter it;
174 18 : it.key = nullptr;
175 18 : it.val = nullptr;
176 18 : it.entry = nullptr;
177 36 : json_object_object_foreachC(poMappings, it)
178 : {
179 18 : aosMappings.push_back(it.key);
180 : }
181 :
182 46 : if (aosMappings.size() == 1 &&
183 28 : (aosMappings[0] == "FeatureCollection" ||
184 10 : aosMappings[0] == "default"))
185 : {
186 13 : oSetLayers.insert(pszIndexName);
187 : OGRElasticLayer *poLayer = new OGRElasticLayer(
188 13 : pszIndexName, pszIndexName, aosMappings[0], this,
189 13 : papszOpenOptions);
190 13 : poLayer->InitFeatureDefnFromMapping(
191 13 : CPL_json_object_object_get(poMappings, aosMappings[0]),
192 26 : "", std::vector<CPLString>());
193 13 : apoLayers.push_back(
194 26 : std::unique_ptr<OGRElasticLayer>(poLayer));
195 : }
196 : else
197 : {
198 10 : for (size_t i = 0; i < aosMappings.size(); i++)
199 : {
200 10 : CPLString osLayerName(pszIndexName + CPLString("_") +
201 15 : aosMappings[i]);
202 5 : if (oSetLayers.find(osLayerName) == oSetLayers.end())
203 : {
204 5 : oSetLayers.insert(osLayerName);
205 : OGRElasticLayer *poLayer = new OGRElasticLayer(
206 5 : osLayerName, pszIndexName, aosMappings[i], this,
207 10 : papszOpenOptions);
208 5 : poLayer->InitFeatureDefnFromMapping(
209 : CPL_json_object_object_get(poMappings,
210 5 : aosMappings[i]),
211 10 : "", std::vector<CPLString>());
212 :
213 5 : apoLayers.push_back(
214 10 : std::unique_ptr<OGRElasticLayer>(poLayer));
215 : }
216 : }
217 : }
218 : }
219 : else
220 : {
221 1 : oSetLayers.insert(pszIndexName);
222 : OGRElasticLayer *poLayer = new OGRElasticLayer(
223 1 : pszIndexName, pszIndexName, "", this, papszOpenOptions);
224 1 : poLayer->InitFeatureDefnFromMapping(poMappings, "",
225 2 : std::vector<CPLString>());
226 1 : apoLayers.push_back(std::unique_ptr<OGRElasticLayer>(poLayer));
227 : }
228 : }
229 :
230 19 : json_object_put(poRes);
231 : }
232 : }
233 :
234 20 : void OGRElasticDataSource::FetchMapping(const char *pszIndexName)
235 : {
236 20 : FetchMapping(pszIndexName, m_oSetLayers, m_apoLayers);
237 20 : }
238 :
239 : /************************************************************************/
240 : /* GetLayerByName() */
241 : /************************************************************************/
242 :
243 27 : OGRLayer *OGRElasticDataSource::GetLayerByName(const char *pszName)
244 : {
245 27 : const bool bIsMultipleTargetName =
246 27 : strchr(pszName, '*') != nullptr || strchr(pszName, ',') != nullptr;
247 27 : if (!m_bAllLayersListed)
248 : {
249 12 : for (auto &poLayer : m_apoLayers)
250 : {
251 3 : if (EQUAL(poLayer->GetName(), pszName))
252 : {
253 2 : return poLayer.get();
254 : }
255 : }
256 9 : if (!bIsMultipleTargetName)
257 : {
258 7 : size_t nSizeBefore = m_apoLayers.size();
259 7 : FetchMapping(pszName);
260 7 : const char *pszLastUnderscore = strrchr(pszName, '_');
261 7 : if (pszLastUnderscore && m_apoLayers.size() == nSizeBefore)
262 : {
263 4 : CPLString osIndexName(pszName);
264 2 : osIndexName.resize(pszLastUnderscore - pszName);
265 2 : FetchMapping(osIndexName);
266 : }
267 8 : for (auto &poLayer : m_apoLayers)
268 : {
269 6 : if (EQUAL(poLayer->GetIndexName(), pszName))
270 : {
271 5 : return poLayer.get();
272 : }
273 : }
274 : }
275 : }
276 : else
277 : {
278 16 : auto poLayer = GDALDataset::GetLayerByName(pszName);
279 16 : if (poLayer)
280 16 : return poLayer;
281 : }
282 :
283 4 : if (!bIsMultipleTargetName)
284 : {
285 2 : return nullptr;
286 : }
287 :
288 : // Deal with wildcard layer names
289 4 : std::string osSanitizedName(pszName);
290 2 : const auto nPos = osSanitizedName.find(",-");
291 2 : if (nPos != std::string::npos)
292 2 : osSanitizedName.resize(nPos);
293 4 : const auto aosList = GetIndexList(osSanitizedName.c_str());
294 4 : if (aosList.empty() || aosList[0].find('*') != std::string::npos ||
295 2 : aosList[0].find(',') != std::string::npos)
296 : {
297 0 : return nullptr;
298 : }
299 :
300 : // For the sake of simplicity, take the schema of one the layers/indices
301 : // that match the wildcard.
302 : // We could potentially issue a /wildcard*/_mapping request and build a
303 : // schema that merges all mappings, but that would be more involved.
304 : auto poReferenceLayer =
305 2 : dynamic_cast<OGRElasticLayer *>(GetLayerByName(aosList[0].c_str()));
306 2 : if (poReferenceLayer == nullptr)
307 0 : return nullptr;
308 :
309 2 : m_apoLayers.push_back(
310 4 : std::make_unique<OGRElasticLayer>(pszName, poReferenceLayer));
311 2 : return m_apoLayers.back().get();
312 : }
313 :
314 : /************************************************************************/
315 : /* GetLayer() */
316 : /************************************************************************/
317 :
318 30 : const OGRLayer *OGRElasticDataSource::GetLayer(int iLayer) const
319 : {
320 30 : const int nLayers = GetLayerCount();
321 30 : if (iLayer < 0 || iLayer >= nLayers)
322 0 : return nullptr;
323 : else
324 : {
325 30 : if (m_poAggregationLayer)
326 3 : return m_poAggregationLayer.get();
327 27 : return m_apoLayers[iLayer].get();
328 : }
329 : }
330 :
331 : /************************************************************************/
332 : /* DeleteLayer() */
333 : /************************************************************************/
334 :
335 4 : OGRErr OGRElasticDataSource::DeleteLayer(int iLayer)
336 :
337 : {
338 4 : if (eAccess != GA_Update)
339 : {
340 1 : CPLError(CE_Failure, CPLE_AppDefined,
341 : "Dataset opened in read-only mode");
342 1 : return OGRERR_FAILURE;
343 : }
344 :
345 3 : GetLayerCount();
346 3 : if (iLayer < 0 || iLayer >= static_cast<int>(m_apoLayers.size()))
347 2 : return OGRERR_FAILURE;
348 :
349 : /* -------------------------------------------------------------------- */
350 : /* Blow away our OGR structures related to the layer. This is */
351 : /* pretty dangerous if anything has a reference to this layer! */
352 : /* -------------------------------------------------------------------- */
353 2 : CPLString osLayerName = m_apoLayers[iLayer]->GetName();
354 2 : CPLString osIndex = m_apoLayers[iLayer]->GetIndexName();
355 2 : CPLString osMapping = m_apoLayers[iLayer]->GetMappingName();
356 :
357 1 : bool bSeveralMappings = false;
358 : json_object *poIndexResponse =
359 1 : RunRequest(CPLSPrintf("%s/%s", GetURL(), osIndex.c_str()), nullptr);
360 1 : if (poIndexResponse)
361 : {
362 : json_object *poIndex =
363 1 : CPL_json_object_object_get(poIndexResponse, osMapping);
364 1 : if (poIndex != nullptr)
365 : {
366 : json_object *poMappings =
367 0 : CPL_json_object_object_get(poIndex, "mappings");
368 0 : if (poMappings != nullptr)
369 : {
370 0 : bSeveralMappings = json_object_object_length(poMappings) > 1;
371 : }
372 : }
373 1 : json_object_put(poIndexResponse);
374 : }
375 : // Deletion of one mapping in an index was supported in ES 1.X, but
376 : // considered unsafe and removed in later versions
377 1 : if (bSeveralMappings)
378 : {
379 0 : CPLError(CE_Failure, CPLE_AppDefined,
380 : "%s/%s already exists, but other mappings also exist in "
381 : "this index. "
382 : "You have to delete the whole index.",
383 : osIndex.c_str(), osMapping.c_str());
384 0 : return OGRERR_FAILURE;
385 : }
386 :
387 1 : CPLDebug("ES", "DeleteLayer(%s)", osLayerName.c_str());
388 :
389 1 : m_oSetLayers.erase(osLayerName);
390 1 : m_apoLayers.erase(m_apoLayers.begin() + iLayer);
391 :
392 1 : Delete(CPLSPrintf("%s/%s", GetURL(), osIndex.c_str()));
393 :
394 1 : return OGRERR_NONE;
395 : }
396 :
397 : /************************************************************************/
398 : /* ICreateLayer() */
399 : /************************************************************************/
400 :
401 : OGRLayer *
402 25 : OGRElasticDataSource::ICreateLayer(const char *pszLayerName,
403 : const OGRGeomFieldDefn *poGeomFieldDefn,
404 : CSLConstList papszOptions)
405 : {
406 25 : if (eAccess != GA_Update)
407 : {
408 1 : CPLError(CE_Failure, CPLE_AppDefined,
409 : "Dataset opened in read-only mode");
410 1 : return nullptr;
411 : }
412 :
413 24 : const auto eGType = poGeomFieldDefn ? poGeomFieldDefn->GetType() : wkbNone;
414 : const auto poSRS =
415 24 : poGeomFieldDefn ? poGeomFieldDefn->GetSpatialRef() : nullptr;
416 :
417 48 : CPLString osLaunderedName(pszLayerName);
418 :
419 24 : const char *pszIndexName = CSLFetchNameValue(papszOptions, "INDEX_NAME");
420 24 : if (pszIndexName != nullptr)
421 0 : osLaunderedName = pszIndexName;
422 :
423 173 : for (size_t i = 0; i < osLaunderedName.size(); i++)
424 : {
425 149 : if (osLaunderedName[i] >= 'A' && osLaunderedName[i] <= 'Z')
426 4 : osLaunderedName[i] += 'a' - 'A';
427 145 : else if (osLaunderedName[i] == '/' || osLaunderedName[i] == '?')
428 1 : osLaunderedName[i] = '_';
429 : }
430 24 : if (strcmp(osLaunderedName.c_str(), pszLayerName) != 0)
431 1 : CPLDebug("ES", "Laundered layer name to %s", osLaunderedName.c_str());
432 :
433 : // Backup error state
434 24 : CPLErr eLastErrorType = CPLGetLastErrorType();
435 24 : CPLErrorNum nLastErrorNo = CPLGetLastErrorNo();
436 48 : CPLString osLastErrorMsg = CPLGetLastErrorMsg();
437 :
438 : const char *pszMappingName =
439 24 : m_nMajorVersion < 7 ? CSLFetchNameValueDef(papszOptions, "MAPPING_NAME",
440 : "FeatureCollection")
441 24 : : nullptr;
442 :
443 : // Check if the index and mapping exists
444 24 : bool bIndexExists = false;
445 24 : bool bMappingExists = false;
446 24 : bool bSeveralMappings = false;
447 24 : CPLPushErrorHandler(CPLQuietErrorHandler);
448 24 : json_object *poIndexResponse = RunRequest(
449 : CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()), nullptr);
450 24 : CPLPopErrorHandler();
451 :
452 : // Restore error state
453 24 : CPLErrorSetState(eLastErrorType, nLastErrorNo, osLastErrorMsg);
454 :
455 24 : if (poIndexResponse)
456 : {
457 5 : bIndexExists = true;
458 : json_object *poIndex =
459 5 : CPL_json_object_object_get(poIndexResponse, osLaunderedName);
460 5 : if (m_nMajorVersion < 7)
461 : {
462 5 : if (poIndex != nullptr)
463 : {
464 : json_object *poMappings =
465 3 : CPL_json_object_object_get(poIndex, "mappings");
466 3 : if (poMappings != nullptr)
467 : {
468 3 : bMappingExists = CPL_json_object_object_get(
469 : poMappings, pszMappingName) != nullptr;
470 3 : bSeveralMappings =
471 3 : json_object_object_length(poMappings) > 1;
472 : }
473 : }
474 : }
475 : else
476 : {
477 : // Indexes in Elasticsearch 7+ can not have multiple types,
478 : // so essentially this will always be true.
479 0 : bMappingExists = true;
480 : }
481 5 : json_object_put(poIndexResponse);
482 : }
483 :
484 24 : if (bMappingExists)
485 : {
486 3 : if (CPLFetchBool(papszOptions, "OVERWRITE_INDEX", false))
487 : {
488 0 : Delete(CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()));
489 0 : bIndexExists = false;
490 : }
491 3 : else if (m_bOverwrite || CPLFetchBool(papszOptions, "OVERWRITE", false))
492 : {
493 : // Deletion of one mapping in an index was supported in ES 1.X, but
494 : // considered unsafe and removed in later versions
495 2 : if (m_nMajorVersion >= 7)
496 : {
497 0 : CPLError(CE_Failure, CPLE_AppDefined,
498 : "The index %s already exists. "
499 : "You have to delete the whole index. You can do that "
500 : "with OVERWRITE_INDEX=YES",
501 : osLaunderedName.c_str());
502 0 : return nullptr;
503 : }
504 2 : else if (bSeveralMappings)
505 : {
506 0 : CPLError(CE_Failure, CPLE_AppDefined,
507 : "%s/%s already exists, but other mappings also exist "
508 : "in this index. "
509 : "You have to delete the whole index. You can do that "
510 : "with OVERWRITE_INDEX=YES",
511 : osLaunderedName.c_str(), pszMappingName);
512 0 : return nullptr;
513 : }
514 2 : Delete(CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()));
515 2 : bIndexExists = false;
516 : }
517 : else
518 : {
519 1 : if (m_nMajorVersion < 7)
520 : {
521 1 : CPLError(CE_Failure, CPLE_AppDefined, "%s/%s already exists",
522 : osLaunderedName.c_str(), pszMappingName);
523 : }
524 : else
525 : {
526 0 : CPLError(CE_Failure, CPLE_AppDefined, "%s already exists",
527 : osLaunderedName.c_str());
528 : }
529 1 : return nullptr;
530 : }
531 : }
532 :
533 : // Create the index
534 23 : if (!bIndexExists)
535 : {
536 : CPLString osIndexURL(
537 21 : CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()));
538 :
539 : // If we have a user specified index definition, use it
540 : const char *pszDef =
541 21 : CSLFetchNameValue(papszOptions, "INDEX_DEFINITION");
542 21 : CPLString osDef;
543 21 : if (pszDef != nullptr)
544 : {
545 2 : osDef = pszDef;
546 2 : if (strchr(pszDef, '{') == nullptr)
547 : {
548 1 : VSILFILE *fp = VSIFOpenL(pszDef, "rb");
549 1 : if (fp)
550 : {
551 1 : GByte *pabyRet = nullptr;
552 1 : CPL_IGNORE_RET_VAL(
553 1 : VSIIngestFile(fp, pszDef, &pabyRet, nullptr, -1));
554 1 : if (pabyRet)
555 : {
556 1 : osDef = reinterpret_cast<char *>(pabyRet);
557 1 : VSIFree(pabyRet);
558 : }
559 1 : VSIFCloseL(fp);
560 : }
561 : }
562 : }
563 21 : if (!UploadFile(osIndexURL, osDef.c_str(), "PUT"))
564 1 : return nullptr;
565 : }
566 :
567 : // If we have a user specified mapping, then go ahead and update it now
568 : const char *pszLayerMapping =
569 22 : CSLFetchNameValueDef(papszOptions, "MAPPING", m_pszMapping);
570 22 : if (pszLayerMapping != nullptr)
571 : {
572 7 : CPLString osLayerMapping(pszLayerMapping);
573 7 : if (strchr(pszLayerMapping, '{') == nullptr)
574 : {
575 1 : VSILFILE *fp = VSIFOpenL(pszLayerMapping, "rb");
576 1 : if (fp)
577 : {
578 1 : GByte *pabyRet = nullptr;
579 1 : CPL_IGNORE_RET_VAL(
580 1 : VSIIngestFile(fp, pszLayerMapping, &pabyRet, nullptr, -1));
581 1 : if (pabyRet)
582 : {
583 1 : osLayerMapping = reinterpret_cast<char *>(pabyRet);
584 1 : VSIFree(pabyRet);
585 : }
586 1 : VSIFCloseL(fp);
587 : }
588 : }
589 :
590 : CPLString osMappingURL =
591 7 : CPLSPrintf("%s/%s/_mapping", GetURL(), osLaunderedName.c_str());
592 7 : if (m_nMajorVersion < 7)
593 7 : osMappingURL += CPLSPrintf("/%s", pszMappingName);
594 7 : if (!UploadFile(osMappingURL, osLayerMapping.c_str()))
595 : {
596 1 : return nullptr;
597 : }
598 : }
599 :
600 : OGRElasticLayer *poLayer =
601 21 : new OGRElasticLayer(osLaunderedName.c_str(), osLaunderedName.c_str(),
602 21 : pszMappingName, this, papszOptions);
603 21 : poLayer->FinalizeFeatureDefn(false);
604 :
605 21 : if (eGType != wkbNone)
606 : {
607 : const char *pszGeometryName =
608 18 : CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry");
609 36 : OGRGeomFieldDefn oFieldDefn(pszGeometryName, eGType);
610 18 : if (poSRS)
611 : {
612 17 : OGRSpatialReference *poSRSClone = poSRS->Clone();
613 17 : poSRSClone->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
614 17 : oFieldDefn.SetSpatialRef(poSRSClone);
615 17 : poSRSClone->Release();
616 : }
617 18 : poLayer->CreateGeomField(&oFieldDefn, FALSE);
618 : }
619 21 : if (pszLayerMapping)
620 6 : poLayer->SetManualMapping();
621 :
622 21 : poLayer->SetIgnoreSourceID(
623 21 : CPLFetchBool(papszOptions, "IGNORE_SOURCE_ID", false));
624 21 : poLayer->SetDotAsNestedField(
625 21 : CPLFetchBool(papszOptions, "DOT_AS_NESTED_FIELD", true));
626 21 : poLayer->SetFID(CSLFetchNameValueDef(papszOptions, "FID", "ogc_fid"));
627 21 : poLayer->SetNextFID(0);
628 :
629 21 : m_oSetLayers.insert(poLayer->GetName());
630 21 : m_apoLayers.push_back(std::unique_ptr<OGRElasticLayer>(poLayer));
631 :
632 21 : return poLayer;
633 : }
634 :
635 : /************************************************************************/
636 : /* HTTPFetch() */
637 : /************************************************************************/
638 :
639 317 : CPLHTTPResult *OGRElasticDataSource::HTTPFetch(const char *pszURL,
640 : CSLConstList papszOptions)
641 : {
642 634 : CPLStringList aosOptions(papszOptions);
643 317 : if (!m_osUserPwd.empty())
644 1 : aosOptions.SetNameValue("USERPWD", m_osUserPwd.c_str());
645 317 : if (!m_oMapHeadersFromEnv.empty())
646 : {
647 5 : const char *pszExistingHeaders = aosOptions.FetchNameValue("HEADERS");
648 10 : std::string osHeaders;
649 5 : if (pszExistingHeaders)
650 : {
651 2 : osHeaders += pszExistingHeaders;
652 2 : osHeaders += '\n';
653 : }
654 20 : for (const auto &kv : m_oMapHeadersFromEnv)
655 : {
656 : const char *pszValueFromEnv =
657 15 : CPLGetConfigOption(kv.second.c_str(), nullptr);
658 15 : if (pszValueFromEnv)
659 : {
660 10 : osHeaders += kv.first;
661 10 : osHeaders += ": ";
662 10 : osHeaders += pszValueFromEnv;
663 10 : osHeaders += '\n';
664 : }
665 : }
666 5 : aosOptions.SetNameValue("HEADERS", osHeaders.c_str());
667 : }
668 :
669 634 : return CPLHTTPFetch(pszURL, aosOptions);
670 : }
671 :
672 : /************************************************************************/
673 : /* RunRequest() */
674 : /************************************************************************/
675 :
676 : json_object *
677 208 : OGRElasticDataSource::RunRequest(const char *pszURL, const char *pszPostContent,
678 : const std::vector<int> &anSilentedHTTPErrors)
679 : {
680 208 : char **papszOptions = nullptr;
681 :
682 208 : if (pszPostContent && pszPostContent[0])
683 : {
684 : papszOptions =
685 63 : CSLSetNameValue(papszOptions, "POSTFIELDS", pszPostContent);
686 : papszOptions =
687 63 : CSLAddNameValue(papszOptions, "HEADERS",
688 : "Content-Type: application/json; charset=UTF-8");
689 : }
690 :
691 208 : CPLPushErrorHandler(CPLQuietErrorHandler);
692 208 : CPLHTTPResult *psResult = HTTPFetch(pszURL, papszOptions);
693 208 : CPLPopErrorHandler();
694 208 : CSLDestroy(papszOptions);
695 :
696 208 : if (psResult->pszErrBuf != nullptr)
697 : {
698 35 : CPLString osErrorMsg(psResult->pabyData
699 : ? (const char *)psResult->pabyData
700 35 : : psResult->pszErrBuf);
701 35 : bool bSilence = false;
702 39 : for (auto nCode : anSilentedHTTPErrors)
703 : {
704 4 : if (strstr(psResult->pszErrBuf, CPLSPrintf("%d", nCode)))
705 : {
706 0 : bSilence = true;
707 0 : break;
708 : }
709 : }
710 35 : if (bSilence)
711 : {
712 0 : CPLDebug("ES", "%s", osErrorMsg.c_str());
713 : }
714 : else
715 : {
716 35 : CPLError(CE_Failure, CPLE_AppDefined, "%s", osErrorMsg.c_str());
717 : }
718 35 : CPLHTTPDestroyResult(psResult);
719 35 : return nullptr;
720 : }
721 :
722 173 : if (psResult->pabyData == nullptr)
723 : {
724 0 : CPLError(CE_Failure, CPLE_AppDefined,
725 : "Empty content returned by server");
726 0 : CPLHTTPDestroyResult(psResult);
727 0 : return nullptr;
728 : }
729 :
730 173 : if (STARTS_WITH((const char *)psResult->pabyData, "{\"error\":"))
731 : {
732 0 : CPLError(CE_Failure, CPLE_AppDefined, "%s",
733 0 : (const char *)psResult->pabyData);
734 0 : CPLHTTPDestroyResult(psResult);
735 0 : return nullptr;
736 : }
737 :
738 173 : json_object *poObj = nullptr;
739 173 : const char *pszText = reinterpret_cast<const char *>(psResult->pabyData);
740 173 : if (!OGRJSonParse(pszText, &poObj, true))
741 : {
742 0 : CPLHTTPDestroyResult(psResult);
743 0 : return nullptr;
744 : }
745 :
746 173 : CPLHTTPDestroyResult(psResult);
747 :
748 173 : if (json_object_get_type(poObj) != json_type_object)
749 : {
750 0 : CPLError(CE_Failure, CPLE_AppDefined,
751 : "Return is not a JSON dictionary");
752 0 : json_object_put(poObj);
753 0 : poObj = nullptr;
754 : }
755 :
756 173 : return poObj;
757 : }
758 :
759 : /************************************************************************/
760 : /* CheckVersion() */
761 : /************************************************************************/
762 :
763 40 : bool OGRElasticDataSource::CheckVersion()
764 : {
765 40 : json_object *poMainInfo = RunRequest(m_osURL);
766 40 : if (poMainInfo == nullptr)
767 3 : return false;
768 37 : bool bVersionFound = false;
769 37 : json_object *poVersion = CPL_json_object_object_get(poMainInfo, "version");
770 37 : if (poVersion != nullptr)
771 : {
772 33 : json_object *poNumber = CPL_json_object_object_get(poVersion, "number");
773 66 : if (poNumber != nullptr &&
774 33 : json_object_get_type(poNumber) == json_type_string)
775 : {
776 33 : bVersionFound = true;
777 33 : const char *pszVersion = json_object_get_string(poNumber);
778 33 : CPLDebug("ES", "Server version: %s", pszVersion);
779 33 : m_nMajorVersion = atoi(pszVersion);
780 33 : const char *pszDot = strchr(pszVersion, '.');
781 33 : if (pszDot)
782 33 : m_nMinorVersion = atoi(pszDot + 1);
783 : }
784 : }
785 37 : json_object_put(poMainInfo);
786 37 : if (!bVersionFound)
787 : {
788 4 : CPLError(CE_Failure, CPLE_AppDefined, "Server version not found");
789 4 : return false;
790 : }
791 33 : if (m_nMajorVersion < 1 || m_nMajorVersion > 7)
792 : {
793 0 : CPLDebug("ES", "Server version untested with current driver");
794 : }
795 33 : return true;
796 : }
797 :
798 : /************************************************************************/
799 : /* OpenAggregation() */
800 : /************************************************************************/
801 :
802 3 : bool OGRElasticDataSource::OpenAggregation(const char *pszAggregation)
803 : {
804 3 : m_bAllLayersListed = true;
805 : m_poAggregationLayer =
806 3 : OGRElasticAggregationLayer::Build(this, pszAggregation);
807 3 : return m_poAggregationLayer != nullptr;
808 : }
809 :
810 : /************************************************************************/
811 : /* Open() */
812 : /************************************************************************/
813 :
814 30 : bool OGRElasticDataSource::Open(GDALOpenInfo *poOpenInfo)
815 : {
816 30 : eAccess = poOpenInfo->eAccess;
817 30 : m_pszName = CPLStrdup(poOpenInfo->pszFilename);
818 30 : m_osURL = (STARTS_WITH_CI(m_pszName, "ES:")) ? m_pszName + 3 : m_pszName;
819 30 : if (m_osURL.empty())
820 : {
821 0 : const char *pszHost = CSLFetchNameValueDef(poOpenInfo->papszOpenOptions,
822 : "HOST", "localhost");
823 0 : m_osURL = pszHost;
824 0 : m_osURL += ":";
825 : const char *pszPort =
826 0 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions, "PORT", "9200");
827 0 : m_osURL += pszPort;
828 : }
829 : m_osUserPwd =
830 30 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions, "USERPWD", "");
831 30 : m_nBatchSize = atoi(CSLFetchNameValueDef(poOpenInfo->papszOpenOptions,
832 : "BATCH_SIZE", "100"));
833 60 : m_nFeatureCountToEstablishFeatureDefn = atoi(
834 30 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions,
835 : "FEATURE_COUNT_TO_ESTABLISH_FEATURE_DEFN", "100"));
836 30 : m_bJSonField =
837 30 : CPLFetchBool(poOpenInfo->papszOpenOptions, "JSON_FIELD", false);
838 60 : m_bFlattenNestedAttributes = CPLFetchBool(
839 30 : poOpenInfo->papszOpenOptions, "FLATTEN_NESTED_ATTRIBUTES", true);
840 : m_osFID =
841 30 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions, "FID", "ogc_fid");
842 :
843 : // Only used for wildcard layers
844 30 : m_bAddSourceIndexName = CPLFetchBool(poOpenInfo->papszOpenOptions,
845 : "ADD_SOURCE_INDEX_NAME", false);
846 :
847 : const char *pszHeadersFromEnv =
848 30 : CPLGetConfigOption("ES_FORWARD_HTTP_HEADERS_FROM_ENV",
849 30 : CSLFetchNameValue(poOpenInfo->papszOpenOptions,
850 : "FORWARD_HTTP_HEADERS_FROM_ENV"));
851 30 : if (pszHeadersFromEnv)
852 : {
853 2 : CPLStringList aosTokens(CSLTokenizeString2(pszHeadersFromEnv, ",", 0));
854 4 : for (int i = 0; i < aosTokens.size(); ++i)
855 : {
856 3 : char *pszKey = nullptr;
857 3 : const char *pszValue = CPLParseNameValue(aosTokens[i], &pszKey);
858 3 : if (pszKey && pszValue)
859 : {
860 3 : m_oMapHeadersFromEnv[pszKey] = pszValue;
861 : }
862 3 : CPLFree(pszKey);
863 : }
864 : }
865 :
866 30 : if (!CheckVersion())
867 5 : return false;
868 :
869 : const char *pszLayerName =
870 25 : CSLFetchNameValue(poOpenInfo->papszOpenOptions, "LAYER");
871 : const char *pszAggregation =
872 25 : CSLFetchNameValue(poOpenInfo->papszOpenOptions, "AGGREGATION");
873 25 : if (pszLayerName && pszAggregation)
874 : {
875 0 : CPLError(CE_Failure, CPLE_NotSupported,
876 : "LAYER and AGGREGATION open options are mutually exclusive");
877 0 : return false;
878 : }
879 :
880 25 : if (pszLayerName)
881 : {
882 2 : bool bFound = GetLayerByName(pszLayerName) != nullptr;
883 2 : m_bAllLayersListed = true;
884 2 : return bFound;
885 : }
886 :
887 23 : if (pszAggregation)
888 3 : return OpenAggregation(pszAggregation);
889 :
890 20 : return true;
891 : }
892 :
893 : /************************************************************************/
894 : /* Delete() */
895 : /************************************************************************/
896 :
897 3 : void OGRElasticDataSource::Delete(const CPLString &url)
898 : {
899 3 : char **papszOptions = nullptr;
900 3 : papszOptions = CSLAddNameValue(papszOptions, "CUSTOMREQUEST", "DELETE");
901 3 : CPLHTTPResult *psResult = HTTPFetch(url, papszOptions);
902 3 : CSLDestroy(papszOptions);
903 3 : if (psResult)
904 : {
905 3 : CPLHTTPDestroyResult(psResult);
906 : }
907 3 : }
908 :
909 : /************************************************************************/
910 : /* UploadFile() */
911 : /************************************************************************/
912 :
913 48 : bool OGRElasticDataSource::UploadFile(const CPLString &url,
914 : const CPLString &data,
915 : const CPLString &osVerb)
916 : {
917 48 : bool bRet = true;
918 48 : char **papszOptions = nullptr;
919 48 : if (!osVerb.empty())
920 : {
921 : papszOptions =
922 23 : CSLAddNameValue(papszOptions, "CUSTOMREQUEST", osVerb.c_str());
923 : }
924 48 : if (data.empty())
925 : {
926 19 : if (osVerb.empty())
927 : {
928 : papszOptions =
929 0 : CSLAddNameValue(papszOptions, "CUSTOMREQUEST", "PUT");
930 : }
931 : }
932 : else
933 : {
934 : papszOptions =
935 29 : CSLAddNameValue(papszOptions, "POSTFIELDS", data.c_str());
936 : papszOptions =
937 29 : CSLAddNameValue(papszOptions, "HEADERS",
938 : "Content-Type: application/json; charset=UTF-8");
939 : }
940 :
941 48 : CPLHTTPResult *psResult = HTTPFetch(url, papszOptions);
942 48 : CSLDestroy(papszOptions);
943 48 : if (psResult)
944 : {
945 48 : if (psResult->pszErrBuf != nullptr ||
946 44 : (psResult->pabyData &&
947 42 : STARTS_WITH((const char *)psResult->pabyData, "{\"error\":")) ||
948 44 : (psResult->pabyData && strstr((const char *)psResult->pabyData,
949 : "\"errors\":true,") != nullptr))
950 : {
951 4 : bRet = false;
952 4 : CPLError(CE_Failure, CPLE_AppDefined, "%s",
953 4 : psResult->pabyData ? (const char *)psResult->pabyData
954 : : psResult->pszErrBuf);
955 : }
956 48 : CPLHTTPDestroyResult(psResult);
957 : }
958 48 : return bRet;
959 : }
960 :
961 : /************************************************************************/
962 : /* Create() */
963 : /************************************************************************/
964 :
965 10 : int OGRElasticDataSource::Create(const char *pszFilename,
966 : CPL_UNUSED char **papszOptions)
967 : {
968 10 : eAccess = GA_Update;
969 10 : m_pszName = CPLStrdup(pszFilename);
970 : m_osURL =
971 10 : (STARTS_WITH_CI(pszFilename, "ES:")) ? pszFilename + 3 : pszFilename;
972 10 : if (m_osURL.empty())
973 0 : m_osURL = "localhost:9200";
974 :
975 10 : const char *pszMetaFile = CPLGetConfigOption("ES_META", nullptr);
976 10 : m_bOverwrite = CPLTestBool(CPLGetConfigOption("ES_OVERWRITE", "0"));
977 : // coverity[tainted_data]
978 10 : m_nBulkUpload = (int)CPLAtof(CPLGetConfigOption("ES_BULK", "0"));
979 :
980 : // Read in the meta file from disk
981 10 : if (pszMetaFile != nullptr)
982 : {
983 0 : VSILFILE *fp = VSIFOpenL(pszMetaFile, "rb");
984 0 : if (fp)
985 : {
986 0 : GByte *pabyRet = nullptr;
987 0 : CPL_IGNORE_RET_VAL(
988 0 : VSIIngestFile(fp, pszMetaFile, &pabyRet, nullptr, -1));
989 0 : if (pabyRet)
990 : {
991 0 : m_pszMapping = (char *)pabyRet;
992 : }
993 0 : VSIFCloseL(fp);
994 : }
995 : }
996 :
997 10 : return CheckVersion();
998 : }
999 :
1000 : /************************************************************************/
1001 : /* GetLayerIndex() */
1002 : /************************************************************************/
1003 :
1004 4 : int OGRElasticDataSource::GetLayerIndex(const char *pszName)
1005 : {
1006 4 : GetLayerCount();
1007 4 : for (int i = 0; i < static_cast<int>(m_apoLayers.size()); ++i)
1008 : {
1009 4 : if (strcmp(m_apoLayers[i]->GetName(), pszName) == 0)
1010 4 : return i;
1011 : }
1012 0 : for (int i = 0; i < static_cast<int>(m_apoLayers.size()); ++i)
1013 : {
1014 0 : if (EQUAL(m_apoLayers[i]->GetName(), pszName))
1015 0 : return i;
1016 : }
1017 0 : return -1;
1018 : }
1019 :
1020 : /************************************************************************/
1021 : /* ExecuteSQL() */
1022 : /************************************************************************/
1023 :
1024 7 : OGRLayer *OGRElasticDataSource::ExecuteSQL(const char *pszSQLCommand,
1025 : OGRGeometry *poSpatialFilter,
1026 : const char *pszDialect)
1027 : {
1028 7 : GetLayerCount();
1029 13 : for (auto &poLayer : m_apoLayers)
1030 : {
1031 6 : poLayer->SyncToDisk();
1032 : }
1033 :
1034 : /* -------------------------------------------------------------------- */
1035 : /* Special case DELLAYER: command. */
1036 : /* -------------------------------------------------------------------- */
1037 7 : if (STARTS_WITH_CI(pszSQLCommand, "DELLAYER:"))
1038 : {
1039 0 : const char *pszLayerName = pszSQLCommand + 9;
1040 :
1041 0 : while (*pszLayerName == ' ')
1042 0 : pszLayerName++;
1043 :
1044 0 : for (int iLayer = 0; iLayer < static_cast<int>(m_apoLayers.size());
1045 : iLayer++)
1046 : {
1047 0 : if (EQUAL(m_apoLayers[iLayer]->GetName(), pszLayerName))
1048 : {
1049 0 : DeleteLayer(iLayer);
1050 0 : break;
1051 : }
1052 : }
1053 0 : return nullptr;
1054 : }
1055 :
1056 7 : if (pszDialect != nullptr && EQUAL(pszDialect, "ES"))
1057 : {
1058 : return new OGRElasticLayer("RESULT", nullptr, nullptr, this,
1059 3 : papszOpenOptions, pszSQLCommand);
1060 : }
1061 :
1062 : /* -------------------------------------------------------------------- */
1063 : /* Deal with "SELECT xxxx ORDER BY" statement */
1064 : /* -------------------------------------------------------------------- */
1065 4 : if (STARTS_WITH_CI(pszSQLCommand, "SELECT"))
1066 : {
1067 4 : swq_select *psSelectInfo = new swq_select();
1068 4 : if (psSelectInfo->preparse(pszSQLCommand, TRUE) != CE_None)
1069 : {
1070 0 : delete psSelectInfo;
1071 0 : return nullptr;
1072 : }
1073 :
1074 4 : int iLayer = 0;
1075 12 : if (psSelectInfo->table_count == 1 &&
1076 4 : psSelectInfo->table_defs[0].data_source == nullptr &&
1077 4 : (iLayer = GetLayerIndex(psSelectInfo->table_defs[0].table_name)) >=
1078 4 : 0 &&
1079 11 : psSelectInfo->join_count == 0 && psSelectInfo->order_specs > 0 &&
1080 3 : psSelectInfo->poOtherSelect == nullptr)
1081 : {
1082 3 : OGRElasticLayer *poSrcLayer = m_apoLayers[iLayer].get();
1083 3 : std::vector<OGRESSortDesc> aoSortColumns;
1084 3 : int i = 0; // Used after for.
1085 8 : for (; i < psSelectInfo->order_specs; i++)
1086 : {
1087 5 : int nFieldIndex = poSrcLayer->GetLayerDefn()->GetFieldIndex(
1088 5 : psSelectInfo->order_defs[i].field_name);
1089 5 : if (nFieldIndex < 0)
1090 0 : break;
1091 :
1092 : /* Make sure to have the right case */
1093 5 : const char *pszFieldName = poSrcLayer->GetLayerDefn()
1094 5 : ->GetFieldDefn(nFieldIndex)
1095 5 : ->GetNameRef();
1096 :
1097 : aoSortColumns.emplace_back(
1098 : pszFieldName,
1099 5 : CPL_TO_BOOL(psSelectInfo->order_defs[i].ascending_flag));
1100 : }
1101 :
1102 3 : if (i == psSelectInfo->order_specs)
1103 : {
1104 3 : OGRElasticLayer *poDupLayer = poSrcLayer->Clone();
1105 :
1106 3 : poDupLayer->SetOrderBy(aoSortColumns);
1107 3 : int nBackup = psSelectInfo->order_specs;
1108 3 : psSelectInfo->order_specs = 0;
1109 3 : char *pszSQLWithoutOrderBy = psSelectInfo->Unparse();
1110 3 : CPLDebug("ES", "SQL without ORDER BY: %s",
1111 : pszSQLWithoutOrderBy);
1112 3 : psSelectInfo->order_specs = nBackup;
1113 3 : delete psSelectInfo;
1114 3 : psSelectInfo = nullptr;
1115 :
1116 : /* Just set poDupLayer in the papoLayers for the time of the */
1117 : /* base ExecuteSQL(), so that the OGRGenSQLResultsLayer */
1118 : /* references that temporary layer */
1119 3 : m_apoLayers[iLayer].release();
1120 3 : m_apoLayers[iLayer].reset(poDupLayer);
1121 :
1122 3 : OGRLayer *poResLayer = GDALDataset::ExecuteSQL(
1123 3 : pszSQLWithoutOrderBy, poSpatialFilter, pszDialect);
1124 3 : m_apoLayers[iLayer].release();
1125 3 : m_apoLayers[iLayer].reset(poSrcLayer);
1126 :
1127 3 : CPLFree(pszSQLWithoutOrderBy);
1128 :
1129 3 : if (poResLayer != nullptr)
1130 3 : m_oMapResultSet[poResLayer] = poDupLayer;
1131 : else
1132 0 : delete poDupLayer;
1133 3 : return poResLayer;
1134 : }
1135 : }
1136 1 : delete psSelectInfo;
1137 : }
1138 :
1139 1 : return GDALDataset::ExecuteSQL(pszSQLCommand, poSpatialFilter, pszDialect);
1140 : }
1141 :
1142 : /************************************************************************/
1143 : /* ReleaseResultSet() */
1144 : /************************************************************************/
1145 :
1146 7 : void OGRElasticDataSource::ReleaseResultSet(OGRLayer *poResultsSet)
1147 : {
1148 7 : if (poResultsSet == nullptr)
1149 0 : return;
1150 :
1151 : std::map<OGRLayer *, OGRLayer *>::iterator oIter =
1152 7 : m_oMapResultSet.find(poResultsSet);
1153 7 : if (oIter != m_oMapResultSet.end())
1154 : {
1155 : /* Destroy first the result layer, because it still references */
1156 : /* the poDupLayer (oIter->second) */
1157 3 : delete poResultsSet;
1158 :
1159 3 : delete oIter->second;
1160 3 : m_oMapResultSet.erase(oIter);
1161 : }
1162 : else
1163 : {
1164 4 : delete poResultsSet;
1165 : }
1166 : }
|