Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Elasticsearch Translator
4 : * Purpose:
5 : * Author:
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2011, Adam Estrada
9 : * Copyright (c) 2012, Even Rouault <even dot rouault at spatialys.com>
10 : *
11 : * SPDX-License-Identifier: MIT
12 : ****************************************************************************/
13 :
14 : #include "ogr_elastic.h"
15 : #include "cpl_conv.h"
16 : #include "cpl_string.h"
17 : #include "cpl_csv.h"
18 : #include "cpl_http.h"
19 : #include "ogrlibjsonutils.h"
20 : #include "ogr_swq.h"
21 :
22 : /************************************************************************/
23 : /* OGRElasticDataSource() */
24 : /************************************************************************/
25 :
26 40 : OGRElasticDataSource::OGRElasticDataSource()
27 : : m_pszName(nullptr), m_bOverwrite(false), m_nBulkUpload(0),
28 : m_pszWriteMap(nullptr), m_pszMapping(nullptr), m_nBatchSize(100),
29 : m_nFeatureCountToEstablishFeatureDefn(100), m_bJSonField(false),
30 40 : m_bFlattenNestedAttributes(true)
31 : {
32 40 : const char *pszWriteMapIn = CPLGetConfigOption("ES_WRITEMAP", nullptr);
33 40 : if (pszWriteMapIn != nullptr)
34 : {
35 0 : m_pszWriteMap = CPLStrdup(pszWriteMapIn);
36 : }
37 40 : }
38 :
39 : /************************************************************************/
40 : /* ~OGRElasticDataSource() */
41 : /************************************************************************/
42 :
43 80 : OGRElasticDataSource::~OGRElasticDataSource()
44 : {
45 40 : m_apoLayers.clear();
46 40 : CPLFree(m_pszName);
47 40 : CPLFree(m_pszMapping);
48 40 : CPLFree(m_pszWriteMap);
49 80 : }
50 :
51 : /************************************************************************/
52 : /* TestCapability() */
53 : /************************************************************************/
54 :
55 3 : int OGRElasticDataSource::TestCapability(const char *pszCap)
56 : {
57 3 : if (EQUAL(pszCap, ODsCCreateLayer) || EQUAL(pszCap, ODsCDeleteLayer) ||
58 1 : EQUAL(pszCap, ODsCCreateGeomFieldAfterCreateLayer))
59 : {
60 3 : return GetAccess() == GA_Update;
61 : }
62 :
63 0 : return FALSE;
64 : }
65 :
66 : /************************************************************************/
67 : /* GetIndexList() */
68 : /************************************************************************/
69 :
70 : std::vector<std::string>
71 17 : OGRElasticDataSource::GetIndexList(const char *pszQueriedIndexName)
72 : {
73 17 : std::vector<std::string> aosList;
74 34 : std::string osURL(m_osURL);
75 17 : osURL += "/_cat/indices";
76 17 : if (pszQueriedIndexName)
77 : {
78 2 : osURL += '/';
79 2 : osURL += pszQueriedIndexName;
80 : }
81 17 : osURL += "?h=i";
82 17 : CPLHTTPResult *psResult = HTTPFetch(osURL.c_str(), nullptr);
83 17 : if (psResult == nullptr || psResult->pszErrBuf != nullptr ||
84 17 : psResult->pabyData == nullptr)
85 : {
86 2 : CPLHTTPDestroyResult(psResult);
87 2 : return aosList;
88 : }
89 :
90 15 : char *pszCur = (char *)psResult->pabyData;
91 15 : char *pszNextEOL = strchr(pszCur, '\n');
92 30 : while (pszNextEOL && pszNextEOL > pszCur)
93 : {
94 15 : *pszNextEOL = '\0';
95 :
96 15 : char *pszBeforeEOL = pszNextEOL - 1;
97 25 : while (*pszBeforeEOL == ' ')
98 : {
99 10 : *pszBeforeEOL = '\0';
100 10 : pszBeforeEOL--;
101 : }
102 :
103 15 : const char *pszIndexName = pszCur;
104 :
105 15 : pszCur = pszNextEOL + 1;
106 15 : pszNextEOL = strchr(pszCur, '\n');
107 :
108 15 : if (STARTS_WITH(pszIndexName, ".security") ||
109 15 : STARTS_WITH(pszIndexName, ".monitoring") ||
110 15 : STARTS_WITH(pszIndexName, ".geoip_databases"))
111 : {
112 0 : continue;
113 : }
114 :
115 15 : aosList.push_back(pszIndexName);
116 : }
117 15 : CPLHTTPDestroyResult(psResult);
118 :
119 15 : return aosList;
120 : }
121 :
122 : /************************************************************************/
123 : /* GetLayerCount() */
124 : /************************************************************************/
125 :
126 64 : int OGRElasticDataSource::GetLayerCount()
127 : {
128 64 : if (m_bAllLayersListed)
129 : {
130 49 : if (m_poAggregationLayer)
131 3 : return 1;
132 46 : return static_cast<int>(m_apoLayers.size());
133 : }
134 15 : m_bAllLayersListed = true;
135 :
136 15 : const auto aosList = GetIndexList(nullptr);
137 26 : for (const std::string &osIndexName : aosList)
138 : {
139 11 : FetchMapping(osIndexName.c_str());
140 : }
141 :
142 15 : return static_cast<int>(m_apoLayers.size());
143 : }
144 :
145 : /************************************************************************/
146 : /* FetchMapping() */
147 : /************************************************************************/
148 :
149 23 : void OGRElasticDataSource::FetchMapping(
150 : const char *pszIndexName, std::set<CPLString> &oSetLayers,
151 : std::vector<std::unique_ptr<OGRElasticLayer>> &apoLayers)
152 : {
153 23 : if (oSetLayers.find(pszIndexName) != oSetLayers.end())
154 0 : return;
155 :
156 46 : CPLString osURL(m_osURL + CPLString("/") + pszIndexName +
157 69 : CPLString("/_mapping?pretty"));
158 23 : json_object *poRes = RunRequest(osURL, nullptr, std::vector<int>({403}));
159 23 : if (poRes)
160 : {
161 : json_object *poLayerObj =
162 19 : CPL_json_object_object_get(poRes, pszIndexName);
163 19 : json_object *poMappings = nullptr;
164 19 : if (poLayerObj && json_object_get_type(poLayerObj) == json_type_object)
165 19 : poMappings = CPL_json_object_object_get(poLayerObj, "mappings");
166 19 : if (poMappings && json_object_get_type(poMappings) == json_type_object)
167 : {
168 38 : std::vector<CPLString> aosMappings;
169 19 : if (m_nMajorVersion < 7)
170 : {
171 : json_object_iter it;
172 18 : it.key = nullptr;
173 18 : it.val = nullptr;
174 18 : it.entry = nullptr;
175 36 : json_object_object_foreachC(poMappings, it)
176 : {
177 18 : aosMappings.push_back(it.key);
178 : }
179 :
180 46 : if (aosMappings.size() == 1 &&
181 28 : (aosMappings[0] == "FeatureCollection" ||
182 10 : aosMappings[0] == "default"))
183 : {
184 13 : oSetLayers.insert(pszIndexName);
185 : OGRElasticLayer *poLayer = new OGRElasticLayer(
186 13 : pszIndexName, pszIndexName, aosMappings[0], this,
187 13 : papszOpenOptions);
188 13 : poLayer->InitFeatureDefnFromMapping(
189 13 : CPL_json_object_object_get(poMappings, aosMappings[0]),
190 26 : "", std::vector<CPLString>());
191 13 : apoLayers.push_back(
192 26 : std::unique_ptr<OGRElasticLayer>(poLayer));
193 : }
194 : else
195 : {
196 10 : for (size_t i = 0; i < aosMappings.size(); i++)
197 : {
198 10 : CPLString osLayerName(pszIndexName + CPLString("_") +
199 15 : aosMappings[i]);
200 5 : if (oSetLayers.find(osLayerName) == oSetLayers.end())
201 : {
202 5 : oSetLayers.insert(osLayerName);
203 : OGRElasticLayer *poLayer = new OGRElasticLayer(
204 5 : osLayerName, pszIndexName, aosMappings[i], this,
205 10 : papszOpenOptions);
206 5 : poLayer->InitFeatureDefnFromMapping(
207 : CPL_json_object_object_get(poMappings,
208 5 : aosMappings[i]),
209 10 : "", std::vector<CPLString>());
210 :
211 5 : apoLayers.push_back(
212 10 : std::unique_ptr<OGRElasticLayer>(poLayer));
213 : }
214 : }
215 : }
216 : }
217 : else
218 : {
219 1 : oSetLayers.insert(pszIndexName);
220 : OGRElasticLayer *poLayer = new OGRElasticLayer(
221 1 : pszIndexName, pszIndexName, "", this, papszOpenOptions);
222 1 : poLayer->InitFeatureDefnFromMapping(poMappings, "",
223 2 : std::vector<CPLString>());
224 1 : apoLayers.push_back(std::unique_ptr<OGRElasticLayer>(poLayer));
225 : }
226 : }
227 :
228 19 : json_object_put(poRes);
229 : }
230 : }
231 :
232 20 : void OGRElasticDataSource::FetchMapping(const char *pszIndexName)
233 : {
234 20 : FetchMapping(pszIndexName, m_oSetLayers, m_apoLayers);
235 20 : }
236 :
237 : /************************************************************************/
238 : /* GetLayerByName() */
239 : /************************************************************************/
240 :
241 27 : OGRLayer *OGRElasticDataSource::GetLayerByName(const char *pszName)
242 : {
243 27 : const bool bIsMultipleTargetName =
244 27 : strchr(pszName, '*') != nullptr || strchr(pszName, ',') != nullptr;
245 27 : if (!m_bAllLayersListed)
246 : {
247 12 : for (auto &poLayer : m_apoLayers)
248 : {
249 3 : if (EQUAL(poLayer->GetName(), pszName))
250 : {
251 2 : return poLayer.get();
252 : }
253 : }
254 9 : if (!bIsMultipleTargetName)
255 : {
256 7 : size_t nSizeBefore = m_apoLayers.size();
257 7 : FetchMapping(pszName);
258 7 : const char *pszLastUnderscore = strrchr(pszName, '_');
259 7 : if (pszLastUnderscore && m_apoLayers.size() == nSizeBefore)
260 : {
261 4 : CPLString osIndexName(pszName);
262 2 : osIndexName.resize(pszLastUnderscore - pszName);
263 2 : FetchMapping(osIndexName);
264 : }
265 8 : for (auto &poLayer : m_apoLayers)
266 : {
267 6 : if (EQUAL(poLayer->GetIndexName(), pszName))
268 : {
269 5 : return poLayer.get();
270 : }
271 : }
272 : }
273 : }
274 : else
275 : {
276 16 : auto poLayer = GDALDataset::GetLayerByName(pszName);
277 16 : if (poLayer)
278 16 : return poLayer;
279 : }
280 :
281 4 : if (!bIsMultipleTargetName)
282 : {
283 2 : return nullptr;
284 : }
285 :
286 : // Deal with wildcard layer names
287 4 : std::string osSanitizedName(pszName);
288 2 : const auto nPos = osSanitizedName.find(",-");
289 2 : if (nPos != std::string::npos)
290 2 : osSanitizedName.resize(nPos);
291 4 : const auto aosList = GetIndexList(osSanitizedName.c_str());
292 4 : if (aosList.empty() || aosList[0].find('*') != std::string::npos ||
293 2 : aosList[0].find(',') != std::string::npos)
294 : {
295 0 : return nullptr;
296 : }
297 :
298 : // For the sake of simplicity, take the schema of one the layers/indices
299 : // that match the wildcard.
300 : // We could potentially issue a /wildcard*/_mapping request and build a
301 : // schema that merges all mappings, but that would be more involved.
302 : auto poReferenceLayer =
303 2 : dynamic_cast<OGRElasticLayer *>(GetLayerByName(aosList[0].c_str()));
304 2 : if (poReferenceLayer == nullptr)
305 0 : return nullptr;
306 :
307 2 : m_apoLayers.push_back(
308 4 : std::make_unique<OGRElasticLayer>(pszName, poReferenceLayer));
309 2 : return m_apoLayers.back().get();
310 : }
311 :
312 : /************************************************************************/
313 : /* GetLayer() */
314 : /************************************************************************/
315 :
316 30 : OGRLayer *OGRElasticDataSource::GetLayer(int iLayer)
317 : {
318 30 : const int nLayers = GetLayerCount();
319 30 : if (iLayer < 0 || iLayer >= nLayers)
320 0 : return nullptr;
321 : else
322 : {
323 30 : if (m_poAggregationLayer)
324 3 : return m_poAggregationLayer.get();
325 27 : return m_apoLayers[iLayer].get();
326 : }
327 : }
328 :
329 : /************************************************************************/
330 : /* DeleteLayer() */
331 : /************************************************************************/
332 :
333 4 : OGRErr OGRElasticDataSource::DeleteLayer(int iLayer)
334 :
335 : {
336 4 : if (eAccess != GA_Update)
337 : {
338 1 : CPLError(CE_Failure, CPLE_AppDefined,
339 : "Dataset opened in read-only mode");
340 1 : return OGRERR_FAILURE;
341 : }
342 :
343 3 : GetLayerCount();
344 3 : if (iLayer < 0 || iLayer >= static_cast<int>(m_apoLayers.size()))
345 2 : return OGRERR_FAILURE;
346 :
347 : /* -------------------------------------------------------------------- */
348 : /* Blow away our OGR structures related to the layer. This is */
349 : /* pretty dangerous if anything has a reference to this layer! */
350 : /* -------------------------------------------------------------------- */
351 2 : CPLString osLayerName = m_apoLayers[iLayer]->GetName();
352 2 : CPLString osIndex = m_apoLayers[iLayer]->GetIndexName();
353 2 : CPLString osMapping = m_apoLayers[iLayer]->GetMappingName();
354 :
355 1 : bool bSeveralMappings = false;
356 : json_object *poIndexResponse =
357 1 : RunRequest(CPLSPrintf("%s/%s", GetURL(), osIndex.c_str()), nullptr);
358 1 : if (poIndexResponse)
359 : {
360 : json_object *poIndex =
361 1 : CPL_json_object_object_get(poIndexResponse, osMapping);
362 1 : if (poIndex != nullptr)
363 : {
364 : json_object *poMappings =
365 0 : CPL_json_object_object_get(poIndex, "mappings");
366 0 : if (poMappings != nullptr)
367 : {
368 0 : bSeveralMappings = json_object_object_length(poMappings) > 1;
369 : }
370 : }
371 1 : json_object_put(poIndexResponse);
372 : }
373 : // Deletion of one mapping in an index was supported in ES 1.X, but
374 : // considered unsafe and removed in later versions
375 1 : if (bSeveralMappings)
376 : {
377 0 : CPLError(CE_Failure, CPLE_AppDefined,
378 : "%s/%s already exists, but other mappings also exist in "
379 : "this index. "
380 : "You have to delete the whole index.",
381 : osIndex.c_str(), osMapping.c_str());
382 0 : return OGRERR_FAILURE;
383 : }
384 :
385 1 : CPLDebug("ES", "DeleteLayer(%s)", osLayerName.c_str());
386 :
387 1 : m_oSetLayers.erase(osLayerName);
388 1 : m_apoLayers.erase(m_apoLayers.begin() + iLayer);
389 :
390 1 : Delete(CPLSPrintf("%s/%s", GetURL(), osIndex.c_str()));
391 :
392 1 : return OGRERR_NONE;
393 : }
394 :
395 : /************************************************************************/
396 : /* ICreateLayer() */
397 : /************************************************************************/
398 :
399 : OGRLayer *
400 25 : OGRElasticDataSource::ICreateLayer(const char *pszLayerName,
401 : const OGRGeomFieldDefn *poGeomFieldDefn,
402 : CSLConstList papszOptions)
403 : {
404 25 : if (eAccess != GA_Update)
405 : {
406 1 : CPLError(CE_Failure, CPLE_AppDefined,
407 : "Dataset opened in read-only mode");
408 1 : return nullptr;
409 : }
410 :
411 24 : const auto eGType = poGeomFieldDefn ? poGeomFieldDefn->GetType() : wkbNone;
412 : const auto poSRS =
413 24 : poGeomFieldDefn ? poGeomFieldDefn->GetSpatialRef() : nullptr;
414 :
415 48 : CPLString osLaunderedName(pszLayerName);
416 :
417 24 : const char *pszIndexName = CSLFetchNameValue(papszOptions, "INDEX_NAME");
418 24 : if (pszIndexName != nullptr)
419 0 : osLaunderedName = pszIndexName;
420 :
421 173 : for (size_t i = 0; i < osLaunderedName.size(); i++)
422 : {
423 149 : if (osLaunderedName[i] >= 'A' && osLaunderedName[i] <= 'Z')
424 4 : osLaunderedName[i] += 'a' - 'A';
425 145 : else if (osLaunderedName[i] == '/' || osLaunderedName[i] == '?')
426 1 : osLaunderedName[i] = '_';
427 : }
428 24 : if (strcmp(osLaunderedName.c_str(), pszLayerName) != 0)
429 1 : CPLDebug("ES", "Laundered layer name to %s", osLaunderedName.c_str());
430 :
431 : // Backup error state
432 24 : CPLErr eLastErrorType = CPLGetLastErrorType();
433 24 : CPLErrorNum nLastErrorNo = CPLGetLastErrorNo();
434 48 : CPLString osLastErrorMsg = CPLGetLastErrorMsg();
435 :
436 : const char *pszMappingName =
437 24 : m_nMajorVersion < 7 ? CSLFetchNameValueDef(papszOptions, "MAPPING_NAME",
438 : "FeatureCollection")
439 24 : : nullptr;
440 :
441 : // Check if the index and mapping exists
442 24 : bool bIndexExists = false;
443 24 : bool bMappingExists = false;
444 24 : bool bSeveralMappings = false;
445 24 : CPLPushErrorHandler(CPLQuietErrorHandler);
446 24 : json_object *poIndexResponse = RunRequest(
447 : CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()), nullptr);
448 24 : CPLPopErrorHandler();
449 :
450 : // Restore error state
451 24 : CPLErrorSetState(eLastErrorType, nLastErrorNo, osLastErrorMsg);
452 :
453 24 : if (poIndexResponse)
454 : {
455 5 : bIndexExists = true;
456 : json_object *poIndex =
457 5 : CPL_json_object_object_get(poIndexResponse, osLaunderedName);
458 5 : if (m_nMajorVersion < 7)
459 : {
460 5 : if (poIndex != nullptr)
461 : {
462 : json_object *poMappings =
463 3 : CPL_json_object_object_get(poIndex, "mappings");
464 3 : if (poMappings != nullptr)
465 : {
466 3 : bMappingExists = CPL_json_object_object_get(
467 : poMappings, pszMappingName) != nullptr;
468 3 : bSeveralMappings =
469 3 : json_object_object_length(poMappings) > 1;
470 : }
471 : }
472 : }
473 : else
474 : {
475 : // Indexes in Elasticsearch 7+ can not have multiple types,
476 : // so essentially this will always be true.
477 0 : bMappingExists = true;
478 : }
479 5 : json_object_put(poIndexResponse);
480 : }
481 :
482 24 : if (bMappingExists)
483 : {
484 3 : if (CPLFetchBool(papszOptions, "OVERWRITE_INDEX", false))
485 : {
486 0 : Delete(CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()));
487 0 : bIndexExists = false;
488 : }
489 3 : else if (m_bOverwrite || CPLFetchBool(papszOptions, "OVERWRITE", false))
490 : {
491 : // Deletion of one mapping in an index was supported in ES 1.X, but
492 : // considered unsafe and removed in later versions
493 2 : if (m_nMajorVersion >= 7)
494 : {
495 0 : CPLError(CE_Failure, CPLE_AppDefined,
496 : "The index %s already exists. "
497 : "You have to delete the whole index. You can do that "
498 : "with OVERWRITE_INDEX=YES",
499 : osLaunderedName.c_str());
500 0 : return nullptr;
501 : }
502 2 : else if (bSeveralMappings)
503 : {
504 0 : CPLError(CE_Failure, CPLE_AppDefined,
505 : "%s/%s already exists, but other mappings also exist "
506 : "in this index. "
507 : "You have to delete the whole index. You can do that "
508 : "with OVERWRITE_INDEX=YES",
509 : osLaunderedName.c_str(), pszMappingName);
510 0 : return nullptr;
511 : }
512 2 : Delete(CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()));
513 2 : bIndexExists = false;
514 : }
515 : else
516 : {
517 1 : if (m_nMajorVersion < 7)
518 : {
519 1 : CPLError(CE_Failure, CPLE_AppDefined, "%s/%s already exists",
520 : osLaunderedName.c_str(), pszMappingName);
521 : }
522 : else
523 : {
524 0 : CPLError(CE_Failure, CPLE_AppDefined, "%s already exists",
525 : osLaunderedName.c_str());
526 : }
527 1 : return nullptr;
528 : }
529 : }
530 :
531 : // Create the index
532 23 : if (!bIndexExists)
533 : {
534 : CPLString osIndexURL(
535 21 : CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()));
536 :
537 : // If we have a user specified index definition, use it
538 : const char *pszDef =
539 21 : CSLFetchNameValue(papszOptions, "INDEX_DEFINITION");
540 21 : CPLString osDef;
541 21 : if (pszDef != nullptr)
542 : {
543 2 : osDef = pszDef;
544 2 : if (strchr(pszDef, '{') == nullptr)
545 : {
546 1 : VSILFILE *fp = VSIFOpenL(pszDef, "rb");
547 1 : if (fp)
548 : {
549 1 : GByte *pabyRet = nullptr;
550 1 : CPL_IGNORE_RET_VAL(
551 1 : VSIIngestFile(fp, pszDef, &pabyRet, nullptr, -1));
552 1 : if (pabyRet)
553 : {
554 1 : osDef = reinterpret_cast<char *>(pabyRet);
555 1 : VSIFree(pabyRet);
556 : }
557 1 : VSIFCloseL(fp);
558 : }
559 : }
560 : }
561 21 : if (!UploadFile(osIndexURL, osDef.c_str(), "PUT"))
562 1 : return nullptr;
563 : }
564 :
565 : // If we have a user specified mapping, then go ahead and update it now
566 : const char *pszLayerMapping =
567 22 : CSLFetchNameValueDef(papszOptions, "MAPPING", m_pszMapping);
568 22 : if (pszLayerMapping != nullptr)
569 : {
570 7 : CPLString osLayerMapping(pszLayerMapping);
571 7 : if (strchr(pszLayerMapping, '{') == nullptr)
572 : {
573 1 : VSILFILE *fp = VSIFOpenL(pszLayerMapping, "rb");
574 1 : if (fp)
575 : {
576 1 : GByte *pabyRet = nullptr;
577 1 : CPL_IGNORE_RET_VAL(
578 1 : VSIIngestFile(fp, pszLayerMapping, &pabyRet, nullptr, -1));
579 1 : if (pabyRet)
580 : {
581 1 : osLayerMapping = reinterpret_cast<char *>(pabyRet);
582 1 : VSIFree(pabyRet);
583 : }
584 1 : VSIFCloseL(fp);
585 : }
586 : }
587 :
588 : CPLString osMappingURL =
589 7 : CPLSPrintf("%s/%s/_mapping", GetURL(), osLaunderedName.c_str());
590 7 : if (m_nMajorVersion < 7)
591 7 : osMappingURL += CPLSPrintf("/%s", pszMappingName);
592 7 : if (!UploadFile(osMappingURL, osLayerMapping.c_str()))
593 : {
594 1 : return nullptr;
595 : }
596 : }
597 :
598 : OGRElasticLayer *poLayer =
599 21 : new OGRElasticLayer(osLaunderedName.c_str(), osLaunderedName.c_str(),
600 21 : pszMappingName, this, papszOptions);
601 21 : poLayer->FinalizeFeatureDefn(false);
602 :
603 21 : if (eGType != wkbNone)
604 : {
605 : const char *pszGeometryName =
606 18 : CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry");
607 36 : OGRGeomFieldDefn oFieldDefn(pszGeometryName, eGType);
608 18 : if (poSRS)
609 : {
610 17 : OGRSpatialReference *poSRSClone = poSRS->Clone();
611 17 : poSRSClone->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
612 17 : oFieldDefn.SetSpatialRef(poSRSClone);
613 17 : poSRSClone->Release();
614 : }
615 18 : poLayer->CreateGeomField(&oFieldDefn, FALSE);
616 : }
617 21 : if (pszLayerMapping)
618 6 : poLayer->SetManualMapping();
619 :
620 21 : poLayer->SetIgnoreSourceID(
621 21 : CPLFetchBool(papszOptions, "IGNORE_SOURCE_ID", false));
622 21 : poLayer->SetDotAsNestedField(
623 21 : CPLFetchBool(papszOptions, "DOT_AS_NESTED_FIELD", true));
624 21 : poLayer->SetFID(CSLFetchNameValueDef(papszOptions, "FID", "ogc_fid"));
625 21 : poLayer->SetNextFID(0);
626 :
627 21 : m_oSetLayers.insert(poLayer->GetName());
628 21 : m_apoLayers.push_back(std::unique_ptr<OGRElasticLayer>(poLayer));
629 :
630 21 : return poLayer;
631 : }
632 :
633 : /************************************************************************/
634 : /* HTTPFetch() */
635 : /************************************************************************/
636 :
637 317 : CPLHTTPResult *OGRElasticDataSource::HTTPFetch(const char *pszURL,
638 : CSLConstList papszOptions)
639 : {
640 634 : CPLStringList aosOptions(papszOptions);
641 317 : if (!m_osUserPwd.empty())
642 1 : aosOptions.SetNameValue("USERPWD", m_osUserPwd.c_str());
643 317 : if (!m_oMapHeadersFromEnv.empty())
644 : {
645 5 : const char *pszExistingHeaders = aosOptions.FetchNameValue("HEADERS");
646 10 : std::string osHeaders;
647 5 : if (pszExistingHeaders)
648 : {
649 2 : osHeaders += pszExistingHeaders;
650 2 : osHeaders += '\n';
651 : }
652 20 : for (const auto &kv : m_oMapHeadersFromEnv)
653 : {
654 : const char *pszValueFromEnv =
655 15 : CPLGetConfigOption(kv.second.c_str(), nullptr);
656 15 : if (pszValueFromEnv)
657 : {
658 10 : osHeaders += kv.first;
659 10 : osHeaders += ": ";
660 10 : osHeaders += pszValueFromEnv;
661 10 : osHeaders += '\n';
662 : }
663 : }
664 5 : aosOptions.SetNameValue("HEADERS", osHeaders.c_str());
665 : }
666 :
667 634 : return CPLHTTPFetch(pszURL, aosOptions);
668 : }
669 :
670 : /************************************************************************/
671 : /* RunRequest() */
672 : /************************************************************************/
673 :
674 : json_object *
675 208 : OGRElasticDataSource::RunRequest(const char *pszURL, const char *pszPostContent,
676 : const std::vector<int> &anSilentedHTTPErrors)
677 : {
678 208 : char **papszOptions = nullptr;
679 :
680 208 : if (pszPostContent && pszPostContent[0])
681 : {
682 : papszOptions =
683 63 : CSLSetNameValue(papszOptions, "POSTFIELDS", pszPostContent);
684 : papszOptions =
685 63 : CSLAddNameValue(papszOptions, "HEADERS",
686 : "Content-Type: application/json; charset=UTF-8");
687 : }
688 :
689 208 : CPLPushErrorHandler(CPLQuietErrorHandler);
690 208 : CPLHTTPResult *psResult = HTTPFetch(pszURL, papszOptions);
691 208 : CPLPopErrorHandler();
692 208 : CSLDestroy(papszOptions);
693 :
694 208 : if (psResult->pszErrBuf != nullptr)
695 : {
696 35 : CPLString osErrorMsg(psResult->pabyData
697 : ? (const char *)psResult->pabyData
698 35 : : psResult->pszErrBuf);
699 35 : bool bSilence = false;
700 39 : for (auto nCode : anSilentedHTTPErrors)
701 : {
702 4 : if (strstr(psResult->pszErrBuf, CPLSPrintf("%d", nCode)))
703 : {
704 0 : bSilence = true;
705 0 : break;
706 : }
707 : }
708 35 : if (bSilence)
709 : {
710 0 : CPLDebug("ES", "%s", osErrorMsg.c_str());
711 : }
712 : else
713 : {
714 35 : CPLError(CE_Failure, CPLE_AppDefined, "%s", osErrorMsg.c_str());
715 : }
716 35 : CPLHTTPDestroyResult(psResult);
717 35 : return nullptr;
718 : }
719 :
720 173 : if (psResult->pabyData == nullptr)
721 : {
722 0 : CPLError(CE_Failure, CPLE_AppDefined,
723 : "Empty content returned by server");
724 0 : CPLHTTPDestroyResult(psResult);
725 0 : return nullptr;
726 : }
727 :
728 173 : if (STARTS_WITH((const char *)psResult->pabyData, "{\"error\":"))
729 : {
730 0 : CPLError(CE_Failure, CPLE_AppDefined, "%s",
731 0 : (const char *)psResult->pabyData);
732 0 : CPLHTTPDestroyResult(psResult);
733 0 : return nullptr;
734 : }
735 :
736 173 : json_object *poObj = nullptr;
737 173 : const char *pszText = reinterpret_cast<const char *>(psResult->pabyData);
738 173 : if (!OGRJSonParse(pszText, &poObj, true))
739 : {
740 0 : CPLHTTPDestroyResult(psResult);
741 0 : return nullptr;
742 : }
743 :
744 173 : CPLHTTPDestroyResult(psResult);
745 :
746 173 : if (json_object_get_type(poObj) != json_type_object)
747 : {
748 0 : CPLError(CE_Failure, CPLE_AppDefined,
749 : "Return is not a JSON dictionary");
750 0 : json_object_put(poObj);
751 0 : poObj = nullptr;
752 : }
753 :
754 173 : return poObj;
755 : }
756 :
757 : /************************************************************************/
758 : /* CheckVersion() */
759 : /************************************************************************/
760 :
761 40 : bool OGRElasticDataSource::CheckVersion()
762 : {
763 40 : json_object *poMainInfo = RunRequest(m_osURL);
764 40 : if (poMainInfo == nullptr)
765 3 : return false;
766 37 : bool bVersionFound = false;
767 37 : json_object *poVersion = CPL_json_object_object_get(poMainInfo, "version");
768 37 : if (poVersion != nullptr)
769 : {
770 33 : json_object *poNumber = CPL_json_object_object_get(poVersion, "number");
771 66 : if (poNumber != nullptr &&
772 33 : json_object_get_type(poNumber) == json_type_string)
773 : {
774 33 : bVersionFound = true;
775 33 : const char *pszVersion = json_object_get_string(poNumber);
776 33 : CPLDebug("ES", "Server version: %s", pszVersion);
777 33 : m_nMajorVersion = atoi(pszVersion);
778 33 : const char *pszDot = strchr(pszVersion, '.');
779 33 : if (pszDot)
780 33 : m_nMinorVersion = atoi(pszDot + 1);
781 : }
782 : }
783 37 : json_object_put(poMainInfo);
784 37 : if (!bVersionFound)
785 : {
786 4 : CPLError(CE_Failure, CPLE_AppDefined, "Server version not found");
787 4 : return false;
788 : }
789 33 : if (m_nMajorVersion < 1 || m_nMajorVersion > 7)
790 : {
791 0 : CPLDebug("ES", "Server version untested with current driver");
792 : }
793 33 : return true;
794 : }
795 :
796 : /************************************************************************/
797 : /* OpenAggregation() */
798 : /************************************************************************/
799 :
800 3 : bool OGRElasticDataSource::OpenAggregation(const char *pszAggregation)
801 : {
802 3 : m_bAllLayersListed = true;
803 : m_poAggregationLayer =
804 3 : OGRElasticAggregationLayer::Build(this, pszAggregation);
805 3 : return m_poAggregationLayer != nullptr;
806 : }
807 :
808 : /************************************************************************/
809 : /* Open() */
810 : /************************************************************************/
811 :
812 30 : bool OGRElasticDataSource::Open(GDALOpenInfo *poOpenInfo)
813 : {
814 30 : eAccess = poOpenInfo->eAccess;
815 30 : m_pszName = CPLStrdup(poOpenInfo->pszFilename);
816 30 : m_osURL = (STARTS_WITH_CI(m_pszName, "ES:")) ? m_pszName + 3 : m_pszName;
817 30 : if (m_osURL.empty())
818 : {
819 0 : const char *pszHost = CSLFetchNameValueDef(poOpenInfo->papszOpenOptions,
820 : "HOST", "localhost");
821 0 : m_osURL = pszHost;
822 0 : m_osURL += ":";
823 : const char *pszPort =
824 0 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions, "PORT", "9200");
825 0 : m_osURL += pszPort;
826 : }
827 : m_osUserPwd =
828 30 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions, "USERPWD", "");
829 30 : m_nBatchSize = atoi(CSLFetchNameValueDef(poOpenInfo->papszOpenOptions,
830 : "BATCH_SIZE", "100"));
831 60 : m_nFeatureCountToEstablishFeatureDefn = atoi(
832 30 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions,
833 : "FEATURE_COUNT_TO_ESTABLISH_FEATURE_DEFN", "100"));
834 30 : m_bJSonField =
835 30 : CPLFetchBool(poOpenInfo->papszOpenOptions, "JSON_FIELD", false);
836 60 : m_bFlattenNestedAttributes = CPLFetchBool(
837 30 : poOpenInfo->papszOpenOptions, "FLATTEN_NESTED_ATTRIBUTES", true);
838 : m_osFID =
839 30 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions, "FID", "ogc_fid");
840 :
841 : // Only used for wildcard layers
842 30 : m_bAddSourceIndexName = CPLFetchBool(poOpenInfo->papszOpenOptions,
843 : "ADD_SOURCE_INDEX_NAME", false);
844 :
845 : const char *pszHeadersFromEnv =
846 30 : CPLGetConfigOption("ES_FORWARD_HTTP_HEADERS_FROM_ENV",
847 30 : CSLFetchNameValue(poOpenInfo->papszOpenOptions,
848 : "FORWARD_HTTP_HEADERS_FROM_ENV"));
849 30 : if (pszHeadersFromEnv)
850 : {
851 2 : CPLStringList aosTokens(CSLTokenizeString2(pszHeadersFromEnv, ",", 0));
852 4 : for (int i = 0; i < aosTokens.size(); ++i)
853 : {
854 3 : char *pszKey = nullptr;
855 3 : const char *pszValue = CPLParseNameValue(aosTokens[i], &pszKey);
856 3 : if (pszKey && pszValue)
857 : {
858 3 : m_oMapHeadersFromEnv[pszKey] = pszValue;
859 : }
860 3 : CPLFree(pszKey);
861 : }
862 : }
863 :
864 30 : if (!CheckVersion())
865 5 : return false;
866 :
867 : const char *pszLayerName =
868 25 : CSLFetchNameValue(poOpenInfo->papszOpenOptions, "LAYER");
869 : const char *pszAggregation =
870 25 : CSLFetchNameValue(poOpenInfo->papszOpenOptions, "AGGREGATION");
871 25 : if (pszLayerName && pszAggregation)
872 : {
873 0 : CPLError(CE_Failure, CPLE_NotSupported,
874 : "LAYER and AGGREGATION open options are mutually exclusive");
875 0 : return false;
876 : }
877 :
878 25 : if (pszLayerName)
879 : {
880 2 : bool bFound = GetLayerByName(pszLayerName) != nullptr;
881 2 : m_bAllLayersListed = true;
882 2 : return bFound;
883 : }
884 :
885 23 : if (pszAggregation)
886 3 : return OpenAggregation(pszAggregation);
887 :
888 20 : return true;
889 : }
890 :
891 : /************************************************************************/
892 : /* Delete() */
893 : /************************************************************************/
894 :
895 3 : void OGRElasticDataSource::Delete(const CPLString &url)
896 : {
897 3 : char **papszOptions = nullptr;
898 3 : papszOptions = CSLAddNameValue(papszOptions, "CUSTOMREQUEST", "DELETE");
899 3 : CPLHTTPResult *psResult = HTTPFetch(url, papszOptions);
900 3 : CSLDestroy(papszOptions);
901 3 : if (psResult)
902 : {
903 3 : CPLHTTPDestroyResult(psResult);
904 : }
905 3 : }
906 :
907 : /************************************************************************/
908 : /* UploadFile() */
909 : /************************************************************************/
910 :
911 48 : bool OGRElasticDataSource::UploadFile(const CPLString &url,
912 : const CPLString &data,
913 : const CPLString &osVerb)
914 : {
915 48 : bool bRet = true;
916 48 : char **papszOptions = nullptr;
917 48 : if (!osVerb.empty())
918 : {
919 : papszOptions =
920 23 : CSLAddNameValue(papszOptions, "CUSTOMREQUEST", osVerb.c_str());
921 : }
922 48 : if (data.empty())
923 : {
924 19 : if (osVerb.empty())
925 : {
926 : papszOptions =
927 0 : CSLAddNameValue(papszOptions, "CUSTOMREQUEST", "PUT");
928 : }
929 : }
930 : else
931 : {
932 : papszOptions =
933 29 : CSLAddNameValue(papszOptions, "POSTFIELDS", data.c_str());
934 : papszOptions =
935 29 : CSLAddNameValue(papszOptions, "HEADERS",
936 : "Content-Type: application/json; charset=UTF-8");
937 : }
938 :
939 48 : CPLHTTPResult *psResult = HTTPFetch(url, papszOptions);
940 48 : CSLDestroy(papszOptions);
941 48 : if (psResult)
942 : {
943 48 : if (psResult->pszErrBuf != nullptr ||
944 44 : (psResult->pabyData &&
945 42 : STARTS_WITH((const char *)psResult->pabyData, "{\"error\":")) ||
946 44 : (psResult->pabyData && strstr((const char *)psResult->pabyData,
947 : "\"errors\":true,") != nullptr))
948 : {
949 4 : bRet = false;
950 4 : CPLError(CE_Failure, CPLE_AppDefined, "%s",
951 4 : psResult->pabyData ? (const char *)psResult->pabyData
952 : : psResult->pszErrBuf);
953 : }
954 48 : CPLHTTPDestroyResult(psResult);
955 : }
956 48 : return bRet;
957 : }
958 :
959 : /************************************************************************/
960 : /* Create() */
961 : /************************************************************************/
962 :
963 10 : int OGRElasticDataSource::Create(const char *pszFilename,
964 : CPL_UNUSED char **papszOptions)
965 : {
966 10 : eAccess = GA_Update;
967 10 : m_pszName = CPLStrdup(pszFilename);
968 : m_osURL =
969 10 : (STARTS_WITH_CI(pszFilename, "ES:")) ? pszFilename + 3 : pszFilename;
970 10 : if (m_osURL.empty())
971 0 : m_osURL = "localhost:9200";
972 :
973 10 : const char *pszMetaFile = CPLGetConfigOption("ES_META", nullptr);
974 10 : m_bOverwrite = CPLTestBool(CPLGetConfigOption("ES_OVERWRITE", "0"));
975 : // coverity[tainted_data]
976 10 : m_nBulkUpload = (int)CPLAtof(CPLGetConfigOption("ES_BULK", "0"));
977 :
978 : // Read in the meta file from disk
979 10 : if (pszMetaFile != nullptr)
980 : {
981 0 : VSILFILE *fp = VSIFOpenL(pszMetaFile, "rb");
982 0 : if (fp)
983 : {
984 0 : GByte *pabyRet = nullptr;
985 0 : CPL_IGNORE_RET_VAL(
986 0 : VSIIngestFile(fp, pszMetaFile, &pabyRet, nullptr, -1));
987 0 : if (pabyRet)
988 : {
989 0 : m_pszMapping = (char *)pabyRet;
990 : }
991 0 : VSIFCloseL(fp);
992 : }
993 : }
994 :
995 10 : return CheckVersion();
996 : }
997 :
998 : /************************************************************************/
999 : /* GetLayerIndex() */
1000 : /************************************************************************/
1001 :
1002 4 : int OGRElasticDataSource::GetLayerIndex(const char *pszName)
1003 : {
1004 4 : GetLayerCount();
1005 4 : for (int i = 0; i < static_cast<int>(m_apoLayers.size()); ++i)
1006 : {
1007 4 : if (strcmp(m_apoLayers[i]->GetName(), pszName) == 0)
1008 4 : return i;
1009 : }
1010 0 : for (int i = 0; i < static_cast<int>(m_apoLayers.size()); ++i)
1011 : {
1012 0 : if (EQUAL(m_apoLayers[i]->GetName(), pszName))
1013 0 : return i;
1014 : }
1015 0 : return -1;
1016 : }
1017 :
1018 : /************************************************************************/
1019 : /* ExecuteSQL() */
1020 : /************************************************************************/
1021 :
1022 7 : OGRLayer *OGRElasticDataSource::ExecuteSQL(const char *pszSQLCommand,
1023 : OGRGeometry *poSpatialFilter,
1024 : const char *pszDialect)
1025 : {
1026 7 : GetLayerCount();
1027 13 : for (auto &poLayer : m_apoLayers)
1028 : {
1029 6 : poLayer->SyncToDisk();
1030 : }
1031 :
1032 : /* -------------------------------------------------------------------- */
1033 : /* Special case DELLAYER: command. */
1034 : /* -------------------------------------------------------------------- */
1035 7 : if (STARTS_WITH_CI(pszSQLCommand, "DELLAYER:"))
1036 : {
1037 0 : const char *pszLayerName = pszSQLCommand + 9;
1038 :
1039 0 : while (*pszLayerName == ' ')
1040 0 : pszLayerName++;
1041 :
1042 0 : for (int iLayer = 0; iLayer < static_cast<int>(m_apoLayers.size());
1043 : iLayer++)
1044 : {
1045 0 : if (EQUAL(m_apoLayers[iLayer]->GetName(), pszLayerName))
1046 : {
1047 0 : DeleteLayer(iLayer);
1048 0 : break;
1049 : }
1050 : }
1051 0 : return nullptr;
1052 : }
1053 :
1054 7 : if (pszDialect != nullptr && EQUAL(pszDialect, "ES"))
1055 : {
1056 : return new OGRElasticLayer("RESULT", nullptr, nullptr, this,
1057 3 : papszOpenOptions, pszSQLCommand);
1058 : }
1059 :
1060 : /* -------------------------------------------------------------------- */
1061 : /* Deal with "SELECT xxxx ORDER BY" statement */
1062 : /* -------------------------------------------------------------------- */
1063 4 : if (STARTS_WITH_CI(pszSQLCommand, "SELECT"))
1064 : {
1065 4 : swq_select *psSelectInfo = new swq_select();
1066 4 : if (psSelectInfo->preparse(pszSQLCommand, TRUE) != CE_None)
1067 : {
1068 0 : delete psSelectInfo;
1069 0 : return nullptr;
1070 : }
1071 :
1072 4 : int iLayer = 0;
1073 12 : if (psSelectInfo->table_count == 1 &&
1074 4 : psSelectInfo->table_defs[0].data_source == nullptr &&
1075 4 : (iLayer = GetLayerIndex(psSelectInfo->table_defs[0].table_name)) >=
1076 4 : 0 &&
1077 11 : psSelectInfo->join_count == 0 && psSelectInfo->order_specs > 0 &&
1078 3 : psSelectInfo->poOtherSelect == nullptr)
1079 : {
1080 3 : OGRElasticLayer *poSrcLayer = m_apoLayers[iLayer].get();
1081 3 : std::vector<OGRESSortDesc> aoSortColumns;
1082 3 : int i = 0; // Used after for.
1083 8 : for (; i < psSelectInfo->order_specs; i++)
1084 : {
1085 5 : int nFieldIndex = poSrcLayer->GetLayerDefn()->GetFieldIndex(
1086 5 : psSelectInfo->order_defs[i].field_name);
1087 5 : if (nFieldIndex < 0)
1088 0 : break;
1089 :
1090 : /* Make sure to have the right case */
1091 5 : const char *pszFieldName = poSrcLayer->GetLayerDefn()
1092 5 : ->GetFieldDefn(nFieldIndex)
1093 5 : ->GetNameRef();
1094 :
1095 : OGRESSortDesc oSortDesc(
1096 : pszFieldName,
1097 10 : CPL_TO_BOOL(psSelectInfo->order_defs[i].ascending_flag));
1098 5 : aoSortColumns.push_back(oSortDesc);
1099 : }
1100 :
1101 3 : if (i == psSelectInfo->order_specs)
1102 : {
1103 3 : OGRElasticLayer *poDupLayer = poSrcLayer->Clone();
1104 :
1105 3 : poDupLayer->SetOrderBy(aoSortColumns);
1106 3 : int nBackup = psSelectInfo->order_specs;
1107 3 : psSelectInfo->order_specs = 0;
1108 3 : char *pszSQLWithoutOrderBy = psSelectInfo->Unparse();
1109 3 : CPLDebug("ES", "SQL without ORDER BY: %s",
1110 : pszSQLWithoutOrderBy);
1111 3 : psSelectInfo->order_specs = nBackup;
1112 3 : delete psSelectInfo;
1113 3 : psSelectInfo = nullptr;
1114 :
1115 : /* Just set poDupLayer in the papoLayers for the time of the */
1116 : /* base ExecuteSQL(), so that the OGRGenSQLResultsLayer */
1117 : /* references that temporary layer */
1118 3 : m_apoLayers[iLayer].release();
1119 3 : m_apoLayers[iLayer].reset(poDupLayer);
1120 :
1121 3 : OGRLayer *poResLayer = GDALDataset::ExecuteSQL(
1122 3 : pszSQLWithoutOrderBy, poSpatialFilter, pszDialect);
1123 3 : m_apoLayers[iLayer].release();
1124 3 : m_apoLayers[iLayer].reset(poSrcLayer);
1125 :
1126 3 : CPLFree(pszSQLWithoutOrderBy);
1127 :
1128 3 : if (poResLayer != nullptr)
1129 3 : m_oMapResultSet[poResLayer] = poDupLayer;
1130 : else
1131 0 : delete poDupLayer;
1132 3 : return poResLayer;
1133 : }
1134 : }
1135 1 : delete psSelectInfo;
1136 : }
1137 :
1138 1 : return GDALDataset::ExecuteSQL(pszSQLCommand, poSpatialFilter, pszDialect);
1139 : }
1140 :
1141 : /************************************************************************/
1142 : /* ReleaseResultSet() */
1143 : /************************************************************************/
1144 :
1145 7 : void OGRElasticDataSource::ReleaseResultSet(OGRLayer *poResultsSet)
1146 : {
1147 7 : if (poResultsSet == nullptr)
1148 0 : return;
1149 :
1150 : std::map<OGRLayer *, OGRLayer *>::iterator oIter =
1151 7 : m_oMapResultSet.find(poResultsSet);
1152 7 : if (oIter != m_oMapResultSet.end())
1153 : {
1154 : /* Destroy first the result layer, because it still references */
1155 : /* the poDupLayer (oIter->second) */
1156 3 : delete poResultsSet;
1157 :
1158 3 : delete oIter->second;
1159 3 : m_oMapResultSet.erase(oIter);
1160 : }
1161 : else
1162 : {
1163 4 : delete poResultsSet;
1164 : }
1165 : }
|