Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Elasticsearch Translator
4 : * Purpose:
5 : * Author:
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2011, Adam Estrada
9 : * Copyright (c) 2012, Even Rouault <even dot rouault at spatialys.com>
10 : *
11 : * Permission is hereby granted, free of charge, to any person obtaining a
12 : * copy of this software and associated documentation files (the "Software"),
13 : * to deal in the Software without restriction, including without limitation
14 : * the rights to use, copy, modify, merge, publish, distribute, sublicense,
15 : * and/or sell copies of the Software, and to permit persons to whom the
16 : * Software is furnished to do so, subject to the following conditions:
17 : *
18 : * The above copyright notice and this permission notice shall be included
19 : * in all copies or substantial portions of the Software.
20 : *
21 : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
22 : * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
24 : * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
26 : * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
27 : * DEALINGS IN THE SOFTWARE.
28 : ****************************************************************************/
29 :
30 : #include "ogr_elastic.h"
31 : #include "cpl_conv.h"
32 : #include "cpl_string.h"
33 : #include "cpl_csv.h"
34 : #include "cpl_http.h"
35 : #include "ogrgeojsonreader.h"
36 : #include "ogr_swq.h"
37 :
38 : /************************************************************************/
39 : /* OGRElasticDataSource() */
40 : /************************************************************************/
41 :
42 40 : OGRElasticDataSource::OGRElasticDataSource()
43 : : m_pszName(nullptr), m_bOverwrite(false), m_nBulkUpload(0),
44 : m_pszWriteMap(nullptr), m_pszMapping(nullptr), m_nBatchSize(100),
45 : m_nFeatureCountToEstablishFeatureDefn(100), m_bJSonField(false),
46 40 : m_bFlattenNestedAttributes(true)
47 : {
48 40 : const char *pszWriteMapIn = CPLGetConfigOption("ES_WRITEMAP", nullptr);
49 40 : if (pszWriteMapIn != nullptr)
50 : {
51 0 : m_pszWriteMap = CPLStrdup(pszWriteMapIn);
52 : }
53 40 : }
54 :
55 : /************************************************************************/
56 : /* ~OGRElasticDataSource() */
57 : /************************************************************************/
58 :
59 80 : OGRElasticDataSource::~OGRElasticDataSource()
60 : {
61 40 : m_apoLayers.clear();
62 40 : CPLFree(m_pszName);
63 40 : CPLFree(m_pszMapping);
64 40 : CPLFree(m_pszWriteMap);
65 80 : }
66 :
67 : /************************************************************************/
68 : /* TestCapability() */
69 : /************************************************************************/
70 :
71 3 : int OGRElasticDataSource::TestCapability(const char *pszCap)
72 : {
73 3 : if (EQUAL(pszCap, ODsCCreateLayer) || EQUAL(pszCap, ODsCDeleteLayer) ||
74 1 : EQUAL(pszCap, ODsCCreateGeomFieldAfterCreateLayer))
75 : {
76 3 : return GetAccess() == GA_Update;
77 : }
78 :
79 0 : return FALSE;
80 : }
81 :
82 : /************************************************************************/
83 : /* GetIndexList() */
84 : /************************************************************************/
85 :
86 : std::vector<std::string>
87 17 : OGRElasticDataSource::GetIndexList(const char *pszQueriedIndexName)
88 : {
89 17 : std::vector<std::string> aosList;
90 34 : std::string osURL(m_osURL);
91 17 : osURL += "/_cat/indices";
92 17 : if (pszQueriedIndexName)
93 : {
94 2 : osURL += '/';
95 2 : osURL += pszQueriedIndexName;
96 : }
97 17 : osURL += "?h=i";
98 17 : CPLHTTPResult *psResult = HTTPFetch(osURL.c_str(), nullptr);
99 17 : if (psResult == nullptr || psResult->pszErrBuf != nullptr ||
100 17 : psResult->pabyData == nullptr)
101 : {
102 2 : CPLHTTPDestroyResult(psResult);
103 2 : return aosList;
104 : }
105 :
106 15 : char *pszCur = (char *)psResult->pabyData;
107 15 : char *pszNextEOL = strchr(pszCur, '\n');
108 30 : while (pszNextEOL && pszNextEOL > pszCur)
109 : {
110 15 : *pszNextEOL = '\0';
111 :
112 15 : char *pszBeforeEOL = pszNextEOL - 1;
113 25 : while (*pszBeforeEOL == ' ')
114 : {
115 10 : *pszBeforeEOL = '\0';
116 10 : pszBeforeEOL--;
117 : }
118 :
119 15 : const char *pszIndexName = pszCur;
120 :
121 15 : pszCur = pszNextEOL + 1;
122 15 : pszNextEOL = strchr(pszCur, '\n');
123 :
124 15 : if (STARTS_WITH(pszIndexName, ".security") ||
125 15 : STARTS_WITH(pszIndexName, ".monitoring") ||
126 15 : STARTS_WITH(pszIndexName, ".geoip_databases"))
127 : {
128 0 : continue;
129 : }
130 :
131 15 : aosList.push_back(pszIndexName);
132 : }
133 15 : CPLHTTPDestroyResult(psResult);
134 :
135 15 : return aosList;
136 : }
137 :
138 : /************************************************************************/
139 : /* GetLayerCount() */
140 : /************************************************************************/
141 :
142 64 : int OGRElasticDataSource::GetLayerCount()
143 : {
144 64 : if (m_bAllLayersListed)
145 : {
146 49 : if (m_poAggregationLayer)
147 3 : return 1;
148 46 : return static_cast<int>(m_apoLayers.size());
149 : }
150 15 : m_bAllLayersListed = true;
151 :
152 15 : const auto aosList = GetIndexList(nullptr);
153 26 : for (const std::string &osIndexName : aosList)
154 : {
155 11 : FetchMapping(osIndexName.c_str());
156 : }
157 :
158 15 : return static_cast<int>(m_apoLayers.size());
159 : }
160 :
161 : /************************************************************************/
162 : /* FetchMapping() */
163 : /************************************************************************/
164 :
165 23 : void OGRElasticDataSource::FetchMapping(
166 : const char *pszIndexName, std::set<CPLString> &oSetLayers,
167 : std::vector<std::unique_ptr<OGRElasticLayer>> &apoLayers)
168 : {
169 23 : if (oSetLayers.find(pszIndexName) != oSetLayers.end())
170 0 : return;
171 :
172 46 : CPLString osURL(m_osURL + CPLString("/") + pszIndexName +
173 69 : CPLString("/_mapping?pretty"));
174 23 : json_object *poRes = RunRequest(osURL, nullptr, std::vector<int>({403}));
175 23 : if (poRes)
176 : {
177 : json_object *poLayerObj =
178 19 : CPL_json_object_object_get(poRes, pszIndexName);
179 19 : json_object *poMappings = nullptr;
180 19 : if (poLayerObj && json_object_get_type(poLayerObj) == json_type_object)
181 19 : poMappings = CPL_json_object_object_get(poLayerObj, "mappings");
182 19 : if (poMappings && json_object_get_type(poMappings) == json_type_object)
183 : {
184 38 : std::vector<CPLString> aosMappings;
185 19 : if (m_nMajorVersion < 7)
186 : {
187 : json_object_iter it;
188 18 : it.key = nullptr;
189 18 : it.val = nullptr;
190 18 : it.entry = nullptr;
191 36 : json_object_object_foreachC(poMappings, it)
192 : {
193 18 : aosMappings.push_back(it.key);
194 : }
195 :
196 46 : if (aosMappings.size() == 1 &&
197 28 : (aosMappings[0] == "FeatureCollection" ||
198 10 : aosMappings[0] == "default"))
199 : {
200 13 : oSetLayers.insert(pszIndexName);
201 : OGRElasticLayer *poLayer = new OGRElasticLayer(
202 13 : pszIndexName, pszIndexName, aosMappings[0], this,
203 13 : papszOpenOptions);
204 13 : poLayer->InitFeatureDefnFromMapping(
205 13 : CPL_json_object_object_get(poMappings, aosMappings[0]),
206 26 : "", std::vector<CPLString>());
207 13 : apoLayers.push_back(
208 26 : std::unique_ptr<OGRElasticLayer>(poLayer));
209 : }
210 : else
211 : {
212 10 : for (size_t i = 0; i < aosMappings.size(); i++)
213 : {
214 10 : CPLString osLayerName(pszIndexName + CPLString("_") +
215 15 : aosMappings[i]);
216 5 : if (oSetLayers.find(osLayerName) == oSetLayers.end())
217 : {
218 5 : oSetLayers.insert(osLayerName);
219 : OGRElasticLayer *poLayer = new OGRElasticLayer(
220 5 : osLayerName, pszIndexName, aosMappings[i], this,
221 10 : papszOpenOptions);
222 5 : poLayer->InitFeatureDefnFromMapping(
223 : CPL_json_object_object_get(poMappings,
224 5 : aosMappings[i]),
225 10 : "", std::vector<CPLString>());
226 :
227 5 : apoLayers.push_back(
228 10 : std::unique_ptr<OGRElasticLayer>(poLayer));
229 : }
230 : }
231 : }
232 : }
233 : else
234 : {
235 1 : oSetLayers.insert(pszIndexName);
236 : OGRElasticLayer *poLayer = new OGRElasticLayer(
237 1 : pszIndexName, pszIndexName, "", this, papszOpenOptions);
238 1 : poLayer->InitFeatureDefnFromMapping(poMappings, "",
239 2 : std::vector<CPLString>());
240 1 : apoLayers.push_back(std::unique_ptr<OGRElasticLayer>(poLayer));
241 : }
242 : }
243 :
244 19 : json_object_put(poRes);
245 : }
246 : }
247 :
248 20 : void OGRElasticDataSource::FetchMapping(const char *pszIndexName)
249 : {
250 20 : FetchMapping(pszIndexName, m_oSetLayers, m_apoLayers);
251 20 : }
252 :
253 : /************************************************************************/
254 : /* GetLayerByName() */
255 : /************************************************************************/
256 :
257 27 : OGRLayer *OGRElasticDataSource::GetLayerByName(const char *pszName)
258 : {
259 27 : const bool bIsMultipleTargetName =
260 27 : strchr(pszName, '*') != nullptr || strchr(pszName, ',') != nullptr;
261 27 : if (!m_bAllLayersListed)
262 : {
263 12 : for (auto &poLayer : m_apoLayers)
264 : {
265 3 : if (EQUAL(poLayer->GetName(), pszName))
266 : {
267 2 : return poLayer.get();
268 : }
269 : }
270 9 : if (!bIsMultipleTargetName)
271 : {
272 7 : size_t nSizeBefore = m_apoLayers.size();
273 7 : FetchMapping(pszName);
274 7 : const char *pszLastUnderscore = strrchr(pszName, '_');
275 7 : if (pszLastUnderscore && m_apoLayers.size() == nSizeBefore)
276 : {
277 4 : CPLString osIndexName(pszName);
278 2 : osIndexName.resize(pszLastUnderscore - pszName);
279 2 : FetchMapping(osIndexName);
280 : }
281 8 : for (auto &poLayer : m_apoLayers)
282 : {
283 6 : if (EQUAL(poLayer->GetIndexName(), pszName))
284 : {
285 5 : return poLayer.get();
286 : }
287 : }
288 : }
289 : }
290 : else
291 : {
292 16 : auto poLayer = GDALDataset::GetLayerByName(pszName);
293 16 : if (poLayer)
294 16 : return poLayer;
295 : }
296 :
297 4 : if (!bIsMultipleTargetName)
298 : {
299 2 : return nullptr;
300 : }
301 :
302 : // Deal with wildcard layer names
303 4 : std::string osSanitizedName(pszName);
304 2 : const auto nPos = osSanitizedName.find(",-");
305 2 : if (nPos != std::string::npos)
306 2 : osSanitizedName.resize(nPos);
307 4 : const auto aosList = GetIndexList(osSanitizedName.c_str());
308 4 : if (aosList.empty() || aosList[0].find('*') != std::string::npos ||
309 2 : aosList[0].find(',') != std::string::npos)
310 : {
311 0 : return nullptr;
312 : }
313 :
314 : // For the sake of simplicity, take the schema of one the layers/indices
315 : // that match the wildcard.
316 : // We could potentially issue a /wildcard*/_mapping request and build a
317 : // schema that merges all mappings, but that would be more involved.
318 : auto poReferenceLayer =
319 2 : dynamic_cast<OGRElasticLayer *>(GetLayerByName(aosList[0].c_str()));
320 2 : if (poReferenceLayer == nullptr)
321 0 : return nullptr;
322 :
323 2 : m_apoLayers.push_back(
324 4 : std::make_unique<OGRElasticLayer>(pszName, poReferenceLayer));
325 2 : return m_apoLayers.back().get();
326 : }
327 :
328 : /************************************************************************/
329 : /* GetLayer() */
330 : /************************************************************************/
331 :
332 30 : OGRLayer *OGRElasticDataSource::GetLayer(int iLayer)
333 : {
334 30 : const int nLayers = GetLayerCount();
335 30 : if (iLayer < 0 || iLayer >= nLayers)
336 0 : return nullptr;
337 : else
338 : {
339 30 : if (m_poAggregationLayer)
340 3 : return m_poAggregationLayer.get();
341 27 : return m_apoLayers[iLayer].get();
342 : }
343 : }
344 :
345 : /************************************************************************/
346 : /* DeleteLayer() */
347 : /************************************************************************/
348 :
349 4 : OGRErr OGRElasticDataSource::DeleteLayer(int iLayer)
350 :
351 : {
352 4 : if (eAccess != GA_Update)
353 : {
354 1 : CPLError(CE_Failure, CPLE_AppDefined,
355 : "Dataset opened in read-only mode");
356 1 : return OGRERR_FAILURE;
357 : }
358 :
359 3 : GetLayerCount();
360 3 : if (iLayer < 0 || iLayer >= static_cast<int>(m_apoLayers.size()))
361 2 : return OGRERR_FAILURE;
362 :
363 : /* -------------------------------------------------------------------- */
364 : /* Blow away our OGR structures related to the layer. This is */
365 : /* pretty dangerous if anything has a reference to this layer! */
366 : /* -------------------------------------------------------------------- */
367 2 : CPLString osLayerName = m_apoLayers[iLayer]->GetName();
368 2 : CPLString osIndex = m_apoLayers[iLayer]->GetIndexName();
369 2 : CPLString osMapping = m_apoLayers[iLayer]->GetMappingName();
370 :
371 1 : bool bSeveralMappings = false;
372 : json_object *poIndexResponse =
373 1 : RunRequest(CPLSPrintf("%s/%s", GetURL(), osIndex.c_str()), nullptr);
374 1 : if (poIndexResponse)
375 : {
376 : json_object *poIndex =
377 1 : CPL_json_object_object_get(poIndexResponse, osMapping);
378 1 : if (poIndex != nullptr)
379 : {
380 : json_object *poMappings =
381 0 : CPL_json_object_object_get(poIndex, "mappings");
382 0 : if (poMappings != nullptr)
383 : {
384 0 : bSeveralMappings = json_object_object_length(poMappings) > 1;
385 : }
386 : }
387 1 : json_object_put(poIndexResponse);
388 : }
389 : // Deletion of one mapping in an index was supported in ES 1.X, but
390 : // considered unsafe and removed in later versions
391 1 : if (bSeveralMappings)
392 : {
393 0 : CPLError(CE_Failure, CPLE_AppDefined,
394 : "%s/%s already exists, but other mappings also exist in "
395 : "this index. "
396 : "You have to delete the whole index.",
397 : osIndex.c_str(), osMapping.c_str());
398 0 : return OGRERR_FAILURE;
399 : }
400 :
401 1 : CPLDebug("ES", "DeleteLayer(%s)", osLayerName.c_str());
402 :
403 1 : m_oSetLayers.erase(osLayerName);
404 1 : m_apoLayers.erase(m_apoLayers.begin() + iLayer);
405 :
406 1 : Delete(CPLSPrintf("%s/%s", GetURL(), osIndex.c_str()));
407 :
408 1 : return OGRERR_NONE;
409 : }
410 :
411 : /************************************************************************/
412 : /* ICreateLayer() */
413 : /************************************************************************/
414 :
415 : OGRLayer *
416 25 : OGRElasticDataSource::ICreateLayer(const char *pszLayerName,
417 : const OGRGeomFieldDefn *poGeomFieldDefn,
418 : CSLConstList papszOptions)
419 : {
420 25 : if (eAccess != GA_Update)
421 : {
422 1 : CPLError(CE_Failure, CPLE_AppDefined,
423 : "Dataset opened in read-only mode");
424 1 : return nullptr;
425 : }
426 :
427 24 : const auto eGType = poGeomFieldDefn ? poGeomFieldDefn->GetType() : wkbNone;
428 : const auto poSRS =
429 24 : poGeomFieldDefn ? poGeomFieldDefn->GetSpatialRef() : nullptr;
430 :
431 48 : CPLString osLaunderedName(pszLayerName);
432 :
433 24 : const char *pszIndexName = CSLFetchNameValue(papszOptions, "INDEX_NAME");
434 24 : if (pszIndexName != nullptr)
435 0 : osLaunderedName = pszIndexName;
436 :
437 173 : for (size_t i = 0; i < osLaunderedName.size(); i++)
438 : {
439 149 : if (osLaunderedName[i] >= 'A' && osLaunderedName[i] <= 'Z')
440 4 : osLaunderedName[i] += 'a' - 'A';
441 145 : else if (osLaunderedName[i] == '/' || osLaunderedName[i] == '?')
442 1 : osLaunderedName[i] = '_';
443 : }
444 24 : if (strcmp(osLaunderedName.c_str(), pszLayerName) != 0)
445 1 : CPLDebug("ES", "Laundered layer name to %s", osLaunderedName.c_str());
446 :
447 : // Backup error state
448 24 : CPLErr eLastErrorType = CPLGetLastErrorType();
449 24 : CPLErrorNum nLastErrorNo = CPLGetLastErrorNo();
450 48 : CPLString osLastErrorMsg = CPLGetLastErrorMsg();
451 :
452 : const char *pszMappingName =
453 24 : m_nMajorVersion < 7 ? CSLFetchNameValueDef(papszOptions, "MAPPING_NAME",
454 : "FeatureCollection")
455 24 : : nullptr;
456 :
457 : // Check if the index and mapping exists
458 24 : bool bIndexExists = false;
459 24 : bool bMappingExists = false;
460 24 : bool bSeveralMappings = false;
461 24 : CPLPushErrorHandler(CPLQuietErrorHandler);
462 24 : json_object *poIndexResponse = RunRequest(
463 : CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()), nullptr);
464 24 : CPLPopErrorHandler();
465 :
466 : // Restore error state
467 24 : CPLErrorSetState(eLastErrorType, nLastErrorNo, osLastErrorMsg);
468 :
469 24 : if (poIndexResponse)
470 : {
471 5 : bIndexExists = true;
472 : json_object *poIndex =
473 5 : CPL_json_object_object_get(poIndexResponse, osLaunderedName);
474 5 : if (m_nMajorVersion < 7)
475 : {
476 5 : if (poIndex != nullptr)
477 : {
478 : json_object *poMappings =
479 3 : CPL_json_object_object_get(poIndex, "mappings");
480 3 : if (poMappings != nullptr)
481 : {
482 3 : bMappingExists = CPL_json_object_object_get(
483 : poMappings, pszMappingName) != nullptr;
484 3 : bSeveralMappings =
485 3 : json_object_object_length(poMappings) > 1;
486 : }
487 : }
488 : }
489 : else
490 : {
491 : // Indexes in Elasticsearch 7+ can not have multiple types,
492 : // so essentially this will always be true.
493 0 : bMappingExists = true;
494 : }
495 5 : json_object_put(poIndexResponse);
496 : }
497 :
498 24 : if (bMappingExists)
499 : {
500 3 : if (CPLFetchBool(papszOptions, "OVERWRITE_INDEX", false))
501 : {
502 0 : Delete(CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()));
503 0 : bIndexExists = false;
504 : }
505 3 : else if (m_bOverwrite || CPLFetchBool(papszOptions, "OVERWRITE", false))
506 : {
507 : // Deletion of one mapping in an index was supported in ES 1.X, but
508 : // considered unsafe and removed in later versions
509 2 : if (m_nMajorVersion >= 7)
510 : {
511 0 : CPLError(CE_Failure, CPLE_AppDefined,
512 : "The index %s already exists. "
513 : "You have to delete the whole index. You can do that "
514 : "with OVERWRITE_INDEX=YES",
515 : osLaunderedName.c_str());
516 0 : return nullptr;
517 : }
518 2 : else if (bSeveralMappings)
519 : {
520 0 : CPLError(CE_Failure, CPLE_AppDefined,
521 : "%s/%s already exists, but other mappings also exist "
522 : "in this index. "
523 : "You have to delete the whole index. You can do that "
524 : "with OVERWRITE_INDEX=YES",
525 : osLaunderedName.c_str(), pszMappingName);
526 0 : return nullptr;
527 : }
528 2 : Delete(CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()));
529 2 : bIndexExists = false;
530 : }
531 : else
532 : {
533 1 : if (m_nMajorVersion < 7)
534 : {
535 1 : CPLError(CE_Failure, CPLE_AppDefined, "%s/%s already exists",
536 : osLaunderedName.c_str(), pszMappingName);
537 : }
538 : else
539 : {
540 0 : CPLError(CE_Failure, CPLE_AppDefined, "%s already exists",
541 : osLaunderedName.c_str());
542 : }
543 1 : return nullptr;
544 : }
545 : }
546 :
547 : // Create the index
548 23 : if (!bIndexExists)
549 : {
550 : CPLString osIndexURL(
551 21 : CPLSPrintf("%s/%s", GetURL(), osLaunderedName.c_str()));
552 :
553 : // If we have a user specified index definition, use it
554 : const char *pszDef =
555 21 : CSLFetchNameValue(papszOptions, "INDEX_DEFINITION");
556 21 : CPLString osDef;
557 21 : if (pszDef != nullptr)
558 : {
559 2 : osDef = pszDef;
560 2 : if (strchr(pszDef, '{') == nullptr)
561 : {
562 1 : VSILFILE *fp = VSIFOpenL(pszDef, "rb");
563 1 : if (fp)
564 : {
565 1 : GByte *pabyRet = nullptr;
566 1 : CPL_IGNORE_RET_VAL(
567 1 : VSIIngestFile(fp, pszDef, &pabyRet, nullptr, -1));
568 1 : if (pabyRet)
569 : {
570 1 : osDef = reinterpret_cast<char *>(pabyRet);
571 1 : VSIFree(pabyRet);
572 : }
573 1 : VSIFCloseL(fp);
574 : }
575 : }
576 : }
577 21 : if (!UploadFile(osIndexURL, osDef.c_str(), "PUT"))
578 1 : return nullptr;
579 : }
580 :
581 : // If we have a user specified mapping, then go ahead and update it now
582 : const char *pszLayerMapping =
583 22 : CSLFetchNameValueDef(papszOptions, "MAPPING", m_pszMapping);
584 22 : if (pszLayerMapping != nullptr)
585 : {
586 7 : CPLString osLayerMapping(pszLayerMapping);
587 7 : if (strchr(pszLayerMapping, '{') == nullptr)
588 : {
589 1 : VSILFILE *fp = VSIFOpenL(pszLayerMapping, "rb");
590 1 : if (fp)
591 : {
592 1 : GByte *pabyRet = nullptr;
593 1 : CPL_IGNORE_RET_VAL(
594 1 : VSIIngestFile(fp, pszLayerMapping, &pabyRet, nullptr, -1));
595 1 : if (pabyRet)
596 : {
597 1 : osLayerMapping = reinterpret_cast<char *>(pabyRet);
598 1 : VSIFree(pabyRet);
599 : }
600 1 : VSIFCloseL(fp);
601 : }
602 : }
603 :
604 : CPLString osMappingURL =
605 7 : CPLSPrintf("%s/%s/_mapping", GetURL(), osLaunderedName.c_str());
606 7 : if (m_nMajorVersion < 7)
607 7 : osMappingURL += CPLSPrintf("/%s", pszMappingName);
608 7 : if (!UploadFile(osMappingURL, osLayerMapping.c_str()))
609 : {
610 1 : return nullptr;
611 : }
612 : }
613 :
614 : OGRElasticLayer *poLayer =
615 21 : new OGRElasticLayer(osLaunderedName.c_str(), osLaunderedName.c_str(),
616 21 : pszMappingName, this, papszOptions);
617 21 : poLayer->FinalizeFeatureDefn(false);
618 :
619 21 : if (eGType != wkbNone)
620 : {
621 : const char *pszGeometryName =
622 18 : CSLFetchNameValueDef(papszOptions, "GEOMETRY_NAME", "geometry");
623 36 : OGRGeomFieldDefn oFieldDefn(pszGeometryName, eGType);
624 18 : if (poSRS)
625 : {
626 17 : OGRSpatialReference *poSRSClone = poSRS->Clone();
627 17 : poSRSClone->SetAxisMappingStrategy(OAMS_TRADITIONAL_GIS_ORDER);
628 17 : oFieldDefn.SetSpatialRef(poSRSClone);
629 17 : poSRSClone->Release();
630 : }
631 18 : poLayer->CreateGeomField(&oFieldDefn, FALSE);
632 : }
633 21 : if (pszLayerMapping)
634 6 : poLayer->SetManualMapping();
635 :
636 21 : poLayer->SetIgnoreSourceID(
637 21 : CPLFetchBool(papszOptions, "IGNORE_SOURCE_ID", false));
638 21 : poLayer->SetDotAsNestedField(
639 21 : CPLFetchBool(papszOptions, "DOT_AS_NESTED_FIELD", true));
640 21 : poLayer->SetFID(CSLFetchNameValueDef(papszOptions, "FID", "ogc_fid"));
641 21 : poLayer->SetNextFID(0);
642 :
643 21 : m_oSetLayers.insert(poLayer->GetName());
644 21 : m_apoLayers.push_back(std::unique_ptr<OGRElasticLayer>(poLayer));
645 :
646 21 : return poLayer;
647 : }
648 :
649 : /************************************************************************/
650 : /* HTTPFetch() */
651 : /************************************************************************/
652 :
653 317 : CPLHTTPResult *OGRElasticDataSource::HTTPFetch(const char *pszURL,
654 : CSLConstList papszOptions)
655 : {
656 634 : CPLStringList aosOptions(papszOptions);
657 317 : if (!m_osUserPwd.empty())
658 1 : aosOptions.SetNameValue("USERPWD", m_osUserPwd.c_str());
659 317 : if (!m_oMapHeadersFromEnv.empty())
660 : {
661 5 : const char *pszExistingHeaders = aosOptions.FetchNameValue("HEADERS");
662 10 : std::string osHeaders;
663 5 : if (pszExistingHeaders)
664 : {
665 2 : osHeaders += pszExistingHeaders;
666 2 : osHeaders += '\n';
667 : }
668 20 : for (const auto &kv : m_oMapHeadersFromEnv)
669 : {
670 : const char *pszValueFromEnv =
671 15 : CPLGetConfigOption(kv.second.c_str(), nullptr);
672 15 : if (pszValueFromEnv)
673 : {
674 10 : osHeaders += kv.first;
675 10 : osHeaders += ": ";
676 10 : osHeaders += pszValueFromEnv;
677 10 : osHeaders += '\n';
678 : }
679 : }
680 5 : aosOptions.SetNameValue("HEADERS", osHeaders.c_str());
681 : }
682 :
683 634 : return CPLHTTPFetch(pszURL, aosOptions);
684 : }
685 :
686 : /************************************************************************/
687 : /* RunRequest() */
688 : /************************************************************************/
689 :
690 : json_object *
691 208 : OGRElasticDataSource::RunRequest(const char *pszURL, const char *pszPostContent,
692 : const std::vector<int> &anSilentedHTTPErrors)
693 : {
694 208 : char **papszOptions = nullptr;
695 :
696 208 : if (pszPostContent && pszPostContent[0])
697 : {
698 : papszOptions =
699 63 : CSLSetNameValue(papszOptions, "POSTFIELDS", pszPostContent);
700 : papszOptions =
701 63 : CSLAddNameValue(papszOptions, "HEADERS",
702 : "Content-Type: application/json; charset=UTF-8");
703 : }
704 :
705 208 : CPLPushErrorHandler(CPLQuietErrorHandler);
706 208 : CPLHTTPResult *psResult = HTTPFetch(pszURL, papszOptions);
707 208 : CPLPopErrorHandler();
708 208 : CSLDestroy(papszOptions);
709 :
710 208 : if (psResult->pszErrBuf != nullptr)
711 : {
712 35 : CPLString osErrorMsg(psResult->pabyData
713 : ? (const char *)psResult->pabyData
714 35 : : psResult->pszErrBuf);
715 35 : bool bSilence = false;
716 39 : for (auto nCode : anSilentedHTTPErrors)
717 : {
718 4 : if (strstr(psResult->pszErrBuf, CPLSPrintf("%d", nCode)))
719 : {
720 0 : bSilence = true;
721 0 : break;
722 : }
723 : }
724 35 : if (bSilence)
725 : {
726 0 : CPLDebug("ES", "%s", osErrorMsg.c_str());
727 : }
728 : else
729 : {
730 35 : CPLError(CE_Failure, CPLE_AppDefined, "%s", osErrorMsg.c_str());
731 : }
732 35 : CPLHTTPDestroyResult(psResult);
733 35 : return nullptr;
734 : }
735 :
736 173 : if (psResult->pabyData == nullptr)
737 : {
738 0 : CPLError(CE_Failure, CPLE_AppDefined,
739 : "Empty content returned by server");
740 0 : CPLHTTPDestroyResult(psResult);
741 0 : return nullptr;
742 : }
743 :
744 173 : if (STARTS_WITH((const char *)psResult->pabyData, "{\"error\":"))
745 : {
746 0 : CPLError(CE_Failure, CPLE_AppDefined, "%s",
747 0 : (const char *)psResult->pabyData);
748 0 : CPLHTTPDestroyResult(psResult);
749 0 : return nullptr;
750 : }
751 :
752 173 : json_object *poObj = nullptr;
753 173 : const char *pszText = reinterpret_cast<const char *>(psResult->pabyData);
754 173 : if (!OGRJSonParse(pszText, &poObj, true))
755 : {
756 0 : CPLHTTPDestroyResult(psResult);
757 0 : return nullptr;
758 : }
759 :
760 173 : CPLHTTPDestroyResult(psResult);
761 :
762 173 : if (json_object_get_type(poObj) != json_type_object)
763 : {
764 0 : CPLError(CE_Failure, CPLE_AppDefined,
765 : "Return is not a JSON dictionary");
766 0 : json_object_put(poObj);
767 0 : poObj = nullptr;
768 : }
769 :
770 173 : return poObj;
771 : }
772 :
773 : /************************************************************************/
774 : /* CheckVersion() */
775 : /************************************************************************/
776 :
777 40 : bool OGRElasticDataSource::CheckVersion()
778 : {
779 40 : json_object *poMainInfo = RunRequest(m_osURL);
780 40 : if (poMainInfo == nullptr)
781 3 : return false;
782 37 : bool bVersionFound = false;
783 37 : json_object *poVersion = CPL_json_object_object_get(poMainInfo, "version");
784 37 : if (poVersion != nullptr)
785 : {
786 33 : json_object *poNumber = CPL_json_object_object_get(poVersion, "number");
787 66 : if (poNumber != nullptr &&
788 33 : json_object_get_type(poNumber) == json_type_string)
789 : {
790 33 : bVersionFound = true;
791 33 : const char *pszVersion = json_object_get_string(poNumber);
792 33 : CPLDebug("ES", "Server version: %s", pszVersion);
793 33 : m_nMajorVersion = atoi(pszVersion);
794 33 : const char *pszDot = strchr(pszVersion, '.');
795 33 : if (pszDot)
796 33 : m_nMinorVersion = atoi(pszDot + 1);
797 : }
798 : }
799 37 : json_object_put(poMainInfo);
800 37 : if (!bVersionFound)
801 : {
802 4 : CPLError(CE_Failure, CPLE_AppDefined, "Server version not found");
803 4 : return false;
804 : }
805 33 : if (m_nMajorVersion < 1 || m_nMajorVersion > 7)
806 : {
807 0 : CPLDebug("ES", "Server version untested with current driver");
808 : }
809 33 : return true;
810 : }
811 :
812 : /************************************************************************/
813 : /* OpenAggregation() */
814 : /************************************************************************/
815 :
816 3 : bool OGRElasticDataSource::OpenAggregation(const char *pszAggregation)
817 : {
818 3 : m_bAllLayersListed = true;
819 : m_poAggregationLayer =
820 3 : OGRElasticAggregationLayer::Build(this, pszAggregation);
821 3 : return m_poAggregationLayer != nullptr;
822 : }
823 :
824 : /************************************************************************/
825 : /* Open() */
826 : /************************************************************************/
827 :
828 30 : bool OGRElasticDataSource::Open(GDALOpenInfo *poOpenInfo)
829 : {
830 30 : eAccess = poOpenInfo->eAccess;
831 30 : m_pszName = CPLStrdup(poOpenInfo->pszFilename);
832 30 : m_osURL = (STARTS_WITH_CI(m_pszName, "ES:")) ? m_pszName + 3 : m_pszName;
833 30 : if (m_osURL.empty())
834 : {
835 0 : const char *pszHost = CSLFetchNameValueDef(poOpenInfo->papszOpenOptions,
836 : "HOST", "localhost");
837 0 : m_osURL = pszHost;
838 0 : m_osURL += ":";
839 : const char *pszPort =
840 0 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions, "PORT", "9200");
841 0 : m_osURL += pszPort;
842 : }
843 : m_osUserPwd =
844 30 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions, "USERPWD", "");
845 30 : m_nBatchSize = atoi(CSLFetchNameValueDef(poOpenInfo->papszOpenOptions,
846 : "BATCH_SIZE", "100"));
847 60 : m_nFeatureCountToEstablishFeatureDefn = atoi(
848 30 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions,
849 : "FEATURE_COUNT_TO_ESTABLISH_FEATURE_DEFN", "100"));
850 30 : m_bJSonField =
851 30 : CPLFetchBool(poOpenInfo->papszOpenOptions, "JSON_FIELD", false);
852 60 : m_bFlattenNestedAttributes = CPLFetchBool(
853 30 : poOpenInfo->papszOpenOptions, "FLATTEN_NESTED_ATTRIBUTES", true);
854 : m_osFID =
855 30 : CSLFetchNameValueDef(poOpenInfo->papszOpenOptions, "FID", "ogc_fid");
856 :
857 : // Only used for wildcard layers
858 30 : m_bAddSourceIndexName = CPLFetchBool(poOpenInfo->papszOpenOptions,
859 : "ADD_SOURCE_INDEX_NAME", false);
860 :
861 : const char *pszHeadersFromEnv =
862 30 : CPLGetConfigOption("ES_FORWARD_HTTP_HEADERS_FROM_ENV",
863 30 : CSLFetchNameValue(poOpenInfo->papszOpenOptions,
864 : "FORWARD_HTTP_HEADERS_FROM_ENV"));
865 30 : if (pszHeadersFromEnv)
866 : {
867 2 : CPLStringList aosTokens(CSLTokenizeString2(pszHeadersFromEnv, ",", 0));
868 4 : for (int i = 0; i < aosTokens.size(); ++i)
869 : {
870 3 : char *pszKey = nullptr;
871 3 : const char *pszValue = CPLParseNameValue(aosTokens[i], &pszKey);
872 3 : if (pszKey && pszValue)
873 : {
874 3 : m_oMapHeadersFromEnv[pszKey] = pszValue;
875 : }
876 3 : CPLFree(pszKey);
877 : }
878 : }
879 :
880 30 : if (!CheckVersion())
881 5 : return false;
882 :
883 : const char *pszLayerName =
884 25 : CSLFetchNameValue(poOpenInfo->papszOpenOptions, "LAYER");
885 : const char *pszAggregation =
886 25 : CSLFetchNameValue(poOpenInfo->papszOpenOptions, "AGGREGATION");
887 25 : if (pszLayerName && pszAggregation)
888 : {
889 0 : CPLError(CE_Failure, CPLE_NotSupported,
890 : "LAYER and AGGREGATION open options are mutually exclusive");
891 0 : return false;
892 : }
893 :
894 25 : if (pszLayerName)
895 : {
896 2 : bool bFound = GetLayerByName(pszLayerName) != nullptr;
897 2 : m_bAllLayersListed = true;
898 2 : return bFound;
899 : }
900 :
901 23 : if (pszAggregation)
902 3 : return OpenAggregation(pszAggregation);
903 :
904 20 : return true;
905 : }
906 :
907 : /************************************************************************/
908 : /* Delete() */
909 : /************************************************************************/
910 :
911 3 : void OGRElasticDataSource::Delete(const CPLString &url)
912 : {
913 3 : char **papszOptions = nullptr;
914 3 : papszOptions = CSLAddNameValue(papszOptions, "CUSTOMREQUEST", "DELETE");
915 3 : CPLHTTPResult *psResult = HTTPFetch(url, papszOptions);
916 3 : CSLDestroy(papszOptions);
917 3 : if (psResult)
918 : {
919 3 : CPLHTTPDestroyResult(psResult);
920 : }
921 3 : }
922 :
923 : /************************************************************************/
924 : /* UploadFile() */
925 : /************************************************************************/
926 :
927 48 : bool OGRElasticDataSource::UploadFile(const CPLString &url,
928 : const CPLString &data,
929 : const CPLString &osVerb)
930 : {
931 48 : bool bRet = true;
932 48 : char **papszOptions = nullptr;
933 48 : if (!osVerb.empty())
934 : {
935 : papszOptions =
936 23 : CSLAddNameValue(papszOptions, "CUSTOMREQUEST", osVerb.c_str());
937 : }
938 48 : if (data.empty())
939 : {
940 19 : if (osVerb.empty())
941 : {
942 : papszOptions =
943 0 : CSLAddNameValue(papszOptions, "CUSTOMREQUEST", "PUT");
944 : }
945 : }
946 : else
947 : {
948 : papszOptions =
949 29 : CSLAddNameValue(papszOptions, "POSTFIELDS", data.c_str());
950 : papszOptions =
951 29 : CSLAddNameValue(papszOptions, "HEADERS",
952 : "Content-Type: application/json; charset=UTF-8");
953 : }
954 :
955 48 : CPLHTTPResult *psResult = HTTPFetch(url, papszOptions);
956 48 : CSLDestroy(papszOptions);
957 48 : if (psResult)
958 : {
959 48 : if (psResult->pszErrBuf != nullptr ||
960 44 : (psResult->pabyData &&
961 42 : STARTS_WITH((const char *)psResult->pabyData, "{\"error\":")) ||
962 44 : (psResult->pabyData && strstr((const char *)psResult->pabyData,
963 : "\"errors\":true,") != nullptr))
964 : {
965 4 : bRet = false;
966 4 : CPLError(CE_Failure, CPLE_AppDefined, "%s",
967 4 : psResult->pabyData ? (const char *)psResult->pabyData
968 : : psResult->pszErrBuf);
969 : }
970 48 : CPLHTTPDestroyResult(psResult);
971 : }
972 48 : return bRet;
973 : }
974 :
975 : /************************************************************************/
976 : /* Create() */
977 : /************************************************************************/
978 :
979 10 : int OGRElasticDataSource::Create(const char *pszFilename,
980 : CPL_UNUSED char **papszOptions)
981 : {
982 10 : eAccess = GA_Update;
983 10 : m_pszName = CPLStrdup(pszFilename);
984 : m_osURL =
985 10 : (STARTS_WITH_CI(pszFilename, "ES:")) ? pszFilename + 3 : pszFilename;
986 10 : if (m_osURL.empty())
987 0 : m_osURL = "localhost:9200";
988 :
989 10 : const char *pszMetaFile = CPLGetConfigOption("ES_META", nullptr);
990 10 : m_bOverwrite = CPLTestBool(CPLGetConfigOption("ES_OVERWRITE", "0"));
991 : // coverity[tainted_data]
992 10 : m_nBulkUpload = (int)CPLAtof(CPLGetConfigOption("ES_BULK", "0"));
993 :
994 : // Read in the meta file from disk
995 10 : if (pszMetaFile != nullptr)
996 : {
997 0 : VSILFILE *fp = VSIFOpenL(pszMetaFile, "rb");
998 0 : if (fp)
999 : {
1000 0 : GByte *pabyRet = nullptr;
1001 0 : CPL_IGNORE_RET_VAL(
1002 0 : VSIIngestFile(fp, pszMetaFile, &pabyRet, nullptr, -1));
1003 0 : if (pabyRet)
1004 : {
1005 0 : m_pszMapping = (char *)pabyRet;
1006 : }
1007 0 : VSIFCloseL(fp);
1008 : }
1009 : }
1010 :
1011 10 : return CheckVersion();
1012 : }
1013 :
1014 : /************************************************************************/
1015 : /* GetLayerIndex() */
1016 : /************************************************************************/
1017 :
1018 4 : int OGRElasticDataSource::GetLayerIndex(const char *pszName)
1019 : {
1020 4 : GetLayerCount();
1021 4 : for (int i = 0; i < static_cast<int>(m_apoLayers.size()); ++i)
1022 : {
1023 4 : if (strcmp(m_apoLayers[i]->GetName(), pszName) == 0)
1024 4 : return i;
1025 : }
1026 0 : for (int i = 0; i < static_cast<int>(m_apoLayers.size()); ++i)
1027 : {
1028 0 : if (EQUAL(m_apoLayers[i]->GetName(), pszName))
1029 0 : return i;
1030 : }
1031 0 : return -1;
1032 : }
1033 :
1034 : /************************************************************************/
1035 : /* ExecuteSQL() */
1036 : /************************************************************************/
1037 :
1038 7 : OGRLayer *OGRElasticDataSource::ExecuteSQL(const char *pszSQLCommand,
1039 : OGRGeometry *poSpatialFilter,
1040 : const char *pszDialect)
1041 : {
1042 7 : GetLayerCount();
1043 13 : for (auto &poLayer : m_apoLayers)
1044 : {
1045 6 : poLayer->SyncToDisk();
1046 : }
1047 :
1048 : /* -------------------------------------------------------------------- */
1049 : /* Special case DELLAYER: command. */
1050 : /* -------------------------------------------------------------------- */
1051 7 : if (STARTS_WITH_CI(pszSQLCommand, "DELLAYER:"))
1052 : {
1053 0 : const char *pszLayerName = pszSQLCommand + 9;
1054 :
1055 0 : while (*pszLayerName == ' ')
1056 0 : pszLayerName++;
1057 :
1058 0 : for (int iLayer = 0; iLayer < static_cast<int>(m_apoLayers.size());
1059 : iLayer++)
1060 : {
1061 0 : if (EQUAL(m_apoLayers[iLayer]->GetName(), pszLayerName))
1062 : {
1063 0 : DeleteLayer(iLayer);
1064 0 : break;
1065 : }
1066 : }
1067 0 : return nullptr;
1068 : }
1069 :
1070 7 : if (pszDialect != nullptr && EQUAL(pszDialect, "ES"))
1071 : {
1072 : return new OGRElasticLayer("RESULT", nullptr, nullptr, this,
1073 3 : papszOpenOptions, pszSQLCommand);
1074 : }
1075 :
1076 : /* -------------------------------------------------------------------- */
1077 : /* Deal with "SELECT xxxx ORDER BY" statement */
1078 : /* -------------------------------------------------------------------- */
1079 4 : if (STARTS_WITH_CI(pszSQLCommand, "SELECT"))
1080 : {
1081 4 : swq_select *psSelectInfo = new swq_select();
1082 4 : if (psSelectInfo->preparse(pszSQLCommand, TRUE) != CE_None)
1083 : {
1084 0 : delete psSelectInfo;
1085 0 : return nullptr;
1086 : }
1087 :
1088 4 : int iLayer = 0;
1089 12 : if (psSelectInfo->table_count == 1 &&
1090 4 : psSelectInfo->table_defs[0].data_source == nullptr &&
1091 4 : (iLayer = GetLayerIndex(psSelectInfo->table_defs[0].table_name)) >=
1092 4 : 0 &&
1093 11 : psSelectInfo->join_count == 0 && psSelectInfo->order_specs > 0 &&
1094 3 : psSelectInfo->poOtherSelect == nullptr)
1095 : {
1096 3 : OGRElasticLayer *poSrcLayer = m_apoLayers[iLayer].get();
1097 3 : std::vector<OGRESSortDesc> aoSortColumns;
1098 3 : int i = 0; // Used after for.
1099 8 : for (; i < psSelectInfo->order_specs; i++)
1100 : {
1101 5 : int nFieldIndex = poSrcLayer->GetLayerDefn()->GetFieldIndex(
1102 5 : psSelectInfo->order_defs[i].field_name);
1103 5 : if (nFieldIndex < 0)
1104 0 : break;
1105 :
1106 : /* Make sure to have the right case */
1107 5 : const char *pszFieldName = poSrcLayer->GetLayerDefn()
1108 5 : ->GetFieldDefn(nFieldIndex)
1109 5 : ->GetNameRef();
1110 :
1111 : OGRESSortDesc oSortDesc(
1112 : pszFieldName,
1113 10 : CPL_TO_BOOL(psSelectInfo->order_defs[i].ascending_flag));
1114 5 : aoSortColumns.push_back(oSortDesc);
1115 : }
1116 :
1117 3 : if (i == psSelectInfo->order_specs)
1118 : {
1119 3 : OGRElasticLayer *poDupLayer = poSrcLayer->Clone();
1120 :
1121 3 : poDupLayer->SetOrderBy(aoSortColumns);
1122 3 : int nBackup = psSelectInfo->order_specs;
1123 3 : psSelectInfo->order_specs = 0;
1124 3 : char *pszSQLWithoutOrderBy = psSelectInfo->Unparse();
1125 3 : CPLDebug("ES", "SQL without ORDER BY: %s",
1126 : pszSQLWithoutOrderBy);
1127 3 : psSelectInfo->order_specs = nBackup;
1128 3 : delete psSelectInfo;
1129 3 : psSelectInfo = nullptr;
1130 :
1131 : /* Just set poDupLayer in the papoLayers for the time of the */
1132 : /* base ExecuteSQL(), so that the OGRGenSQLResultsLayer */
1133 : /* references that temporary layer */
1134 3 : m_apoLayers[iLayer].release();
1135 3 : m_apoLayers[iLayer].reset(poDupLayer);
1136 :
1137 3 : OGRLayer *poResLayer = GDALDataset::ExecuteSQL(
1138 3 : pszSQLWithoutOrderBy, poSpatialFilter, pszDialect);
1139 3 : m_apoLayers[iLayer].release();
1140 3 : m_apoLayers[iLayer].reset(poSrcLayer);
1141 :
1142 3 : CPLFree(pszSQLWithoutOrderBy);
1143 :
1144 3 : if (poResLayer != nullptr)
1145 3 : m_oMapResultSet[poResLayer] = poDupLayer;
1146 : else
1147 0 : delete poDupLayer;
1148 3 : return poResLayer;
1149 : }
1150 : }
1151 1 : delete psSelectInfo;
1152 : }
1153 :
1154 1 : return GDALDataset::ExecuteSQL(pszSQLCommand, poSpatialFilter, pszDialect);
1155 : }
1156 :
1157 : /************************************************************************/
1158 : /* ReleaseResultSet() */
1159 : /************************************************************************/
1160 :
1161 7 : void OGRElasticDataSource::ReleaseResultSet(OGRLayer *poResultsSet)
1162 : {
1163 7 : if (poResultsSet == nullptr)
1164 0 : return;
1165 :
1166 : std::map<OGRLayer *, OGRLayer *>::iterator oIter =
1167 7 : m_oMapResultSet.find(poResultsSet);
1168 7 : if (oIter != m_oMapResultSet.end())
1169 : {
1170 : /* Destroy first the result layer, because it still references */
1171 : /* the poDupLayer (oIter->second) */
1172 3 : delete poResultsSet;
1173 :
1174 3 : delete oIter->second;
1175 3 : m_oMapResultSet.erase(oIter);
1176 : }
1177 : else
1178 : {
1179 4 : delete poResultsSet;
1180 : }
1181 : }
|