Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: GDAL
4 : * Purpose: Arrow Database Connectivity driver
5 : * Author: Even Rouault, <even dot rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2024, Even Rouault <even dot rouault at spatialys.com>
9 : * Copyright (c) 2024, Dewey Dunnington <dewey@voltrondata.com>
10 : *
11 : * SPDX-License-Identifier: MIT
12 : ****************************************************************************/
13 :
14 : #include "ogr_adbc.h"
15 : #include "ogradbcdrivercore.h"
16 : #include "ogr_mem.h"
17 : #include "ogr_p.h"
18 : #include "cpl_json.h"
19 : #include "gdal_adbc.h"
20 :
21 : #if defined(OGR_ADBC_HAS_DRIVER_MANAGER)
22 : #include <arrow-adbc/adbc_driver_manager.h>
23 : #endif
24 :
25 : #define OGR_ADBC_VERSION ADBC_VERSION_1_1_0
26 : static_assert(sizeof(AdbcDriver) == ADBC_DRIVER_1_1_0_SIZE);
27 :
28 : namespace
29 : {
30 :
31 0 : AdbcStatusCode OGRADBCLoadDriver(const char *driver_name,
32 : const char *entrypoint, void *driver,
33 : struct AdbcError *error)
34 : {
35 : GDALAdbcLoadDriverFunc load_driver_override =
36 0 : GDALGetAdbcLoadDriverOverride();
37 0 : if (load_driver_override)
38 : {
39 0 : return load_driver_override(driver_name, entrypoint, OGR_ADBC_VERSION,
40 0 : driver, error);
41 : }
42 : else
43 : {
44 : #if defined(OGR_ADBC_HAS_DRIVER_MANAGER)
45 : return AdbcLoadDriver(driver_name, entrypoint, OGR_ADBC_VERSION, driver,
46 : error);
47 : #else
48 0 : return ADBC_STATUS_NOT_IMPLEMENTED;
49 : #endif
50 : }
51 : }
52 :
53 : } // namespace
54 :
55 : // Helper to wrap driver callbacks
56 : #define ADBC_CALL(func, ...) m_driver.func(__VA_ARGS__)
57 :
58 : /************************************************************************/
59 : /* ~OGRADBCDataset() */
60 : /************************************************************************/
61 :
62 0 : OGRADBCDataset::~OGRADBCDataset()
63 : {
64 : // Layers must be closed before the connection
65 0 : m_apoLayers.clear();
66 0 : OGRADBCError error;
67 0 : if (m_connection)
68 0 : ADBC_CALL(ConnectionRelease, m_connection.get(), error);
69 0 : error.clear();
70 0 : if (m_driver.release)
71 : {
72 0 : ADBC_CALL(DatabaseRelease, &m_database, error);
73 0 : m_driver.release(&m_driver, error);
74 : }
75 0 : }
76 :
77 : /************************************************************************/
78 : /* CreateLayer() */
79 : /************************************************************************/
80 :
81 : std::unique_ptr<OGRADBCLayer>
82 0 : OGRADBCDataset::CreateLayer(const char *pszStatement, const char *pszLayerName)
83 : {
84 :
85 0 : OGRADBCError error;
86 :
87 0 : CPLString osStatement(pszStatement);
88 0 : if (!m_osParquetFilename.empty())
89 : {
90 0 : const char *pszSrcLayerName = m_apoLayers.size() == 1
91 0 : ? m_apoLayers[0]->GetDescription()
92 0 : : pszLayerName;
93 : // Substitute the OGR layer name with the DuckDB expected filename,
94 : // single-quoted
95 : const std::string osFrom =
96 0 : std::string(" FROM ").append(pszSrcLayerName);
97 0 : const auto nPos = osStatement.ifind(osFrom);
98 0 : if (nPos != std::string::npos)
99 : {
100 : osStatement =
101 0 : osStatement.substr(0, nPos)
102 0 : .append(" FROM '")
103 0 : .append(OGRDuplicateCharacter(m_osParquetFilename, '\''))
104 0 : .append("'")
105 0 : .append(osStatement.substr(nPos + osFrom.size()));
106 : }
107 : else
108 : {
109 : const std::string osFrom2 =
110 0 : std::string(" FROM \"")
111 0 : .append(OGRDuplicateCharacter(pszSrcLayerName, '"'))
112 0 : .append("\"");
113 0 : const auto nPos2 = osStatement.ifind(osFrom2);
114 0 : if (nPos2 != std::string::npos)
115 : {
116 : osStatement =
117 0 : osStatement.substr(0, nPos2)
118 0 : .append(" FROM '")
119 : .append(
120 0 : OGRDuplicateCharacter(m_osParquetFilename, '\''))
121 0 : .append("'")
122 0 : .append(osStatement.substr(nPos2 + osFrom2.size()));
123 : }
124 : }
125 : }
126 :
127 0 : auto statement = std::make_unique<AdbcStatement>();
128 0 : if (ADBC_CALL(StatementNew, m_connection.get(), statement.get(), error) !=
129 : ADBC_STATUS_OK)
130 : {
131 0 : CPLError(CE_Failure, CPLE_AppDefined, "AdbcStatementNew() failed: %s",
132 : error.message());
133 0 : return nullptr;
134 : }
135 :
136 0 : if (ADBC_CALL(StatementSetSqlQuery, statement.get(), osStatement.c_str(),
137 0 : error) != ADBC_STATUS_OK)
138 : {
139 0 : CPLError(CE_Failure, CPLE_AppDefined,
140 : "AdbcStatementSetSqlQuery() failed: %s", error.message());
141 0 : error.clear();
142 0 : ADBC_CALL(StatementRelease, statement.get(), error);
143 0 : return nullptr;
144 : }
145 :
146 0 : auto stream = std::make_unique<OGRArrowArrayStream>();
147 0 : int64_t rows_affected = -1;
148 0 : if (ADBC_CALL(StatementExecuteQuery, statement.get(), stream->get(),
149 0 : &rows_affected, error) != ADBC_STATUS_OK)
150 : {
151 0 : CPLError(CE_Failure, CPLE_AppDefined,
152 : "AdbcStatementExecuteQuery() failed: %s", error.message());
153 0 : error.clear();
154 0 : ADBC_CALL(StatementRelease, statement.get(), error);
155 0 : return nullptr;
156 : }
157 :
158 0 : ArrowSchema schema = {};
159 0 : if (stream->get_schema(&schema) != 0)
160 : {
161 0 : CPLError(CE_Failure, CPLE_AppDefined, "get_schema() failed");
162 0 : ADBC_CALL(StatementRelease, statement.get(), error);
163 0 : return nullptr;
164 : }
165 :
166 : return std::make_unique<OGRADBCLayer>(
167 0 : this, pszLayerName, std::move(statement), std::move(stream), &schema);
168 : }
169 :
170 : /************************************************************************/
171 : /* ExecuteSQL() */
172 : /************************************************************************/
173 :
174 0 : OGRLayer *OGRADBCDataset::ExecuteSQL(const char *pszStatement,
175 : OGRGeometry *poSpatialFilter,
176 : const char *pszDialect)
177 : {
178 0 : if (pszDialect && pszDialect[0] != 0 && !EQUAL(pszDialect, "NATIVE"))
179 : {
180 0 : return GDALDataset::ExecuteSQL(pszStatement, poSpatialFilter,
181 0 : pszDialect);
182 : }
183 :
184 0 : auto poLayer = CreateLayer(pszStatement, "RESULTSET");
185 0 : if (poLayer && poSpatialFilter)
186 : {
187 0 : if (poLayer->GetGeomType() == wkbNone)
188 0 : return nullptr;
189 0 : poLayer->SetSpatialFilter(poSpatialFilter);
190 : }
191 0 : return poLayer.release();
192 : }
193 :
194 : /************************************************************************/
195 : /* Open() */
196 : /************************************************************************/
197 :
198 0 : bool OGRADBCDataset::Open(const GDALOpenInfo *poOpenInfo)
199 : {
200 0 : OGRADBCError error;
201 :
202 0 : const char *pszFilename = poOpenInfo->pszFilename;
203 0 : std::unique_ptr<GDALOpenInfo> poTmpOpenInfo;
204 0 : if (STARTS_WITH(pszFilename, "ADBC:"))
205 : {
206 0 : pszFilename += strlen("ADBC:");
207 : poTmpOpenInfo =
208 0 : std::make_unique<GDALOpenInfo>(pszFilename, GA_ReadOnly);
209 0 : poTmpOpenInfo->papszOpenOptions = poOpenInfo->papszOpenOptions;
210 0 : poOpenInfo = poTmpOpenInfo.get();
211 : }
212 : const char *pszADBCDriverName =
213 0 : CSLFetchNameValue(poOpenInfo->papszOpenOptions, "ADBC_DRIVER");
214 0 : const bool bIsDuckDB = OGRADBCDriverIsDuckDB(poOpenInfo);
215 : const bool bIsSQLite3 =
216 0 : (pszADBCDriverName && EQUAL(pszADBCDriverName, "adbc_driver_sqlite")) ||
217 0 : OGRADBCDriverIsSQLite3(poOpenInfo);
218 0 : const bool bIsParquet = OGRADBCDriverIsParquet(poOpenInfo) ||
219 0 : EQUAL(CPLGetExtension(pszFilename), "parquet");
220 0 : const bool bIsPostgreSQL = STARTS_WITH(pszFilename, "postgresql://");
221 :
222 0 : if (!pszADBCDriverName)
223 : {
224 0 : if (bIsDuckDB || bIsParquet)
225 : {
226 0 : pszADBCDriverName =
227 : #ifdef _WIN32
228 : "duckdb.dll"
229 : #elif defined(__MACH__) && defined(__APPLE__)
230 : "libduckdb.dylib"
231 : #else
232 : "libduckdb.so"
233 : #endif
234 : ;
235 : }
236 0 : else if (bIsPostgreSQL)
237 0 : pszADBCDriverName = "adbc_driver_postgresql";
238 0 : else if (bIsSQLite3)
239 : {
240 0 : pszADBCDriverName = "adbc_driver_sqlite";
241 : }
242 : else
243 : {
244 0 : CPLError(CE_Failure, CPLE_AppDefined,
245 : "Open option ADBC_DRIVER must be specified");
246 0 : return false;
247 : }
248 : }
249 :
250 : // Load the driver
251 0 : if (pszADBCDriverName &&
252 0 : (bIsDuckDB || bIsParquet || strstr(pszADBCDriverName, "duckdb")))
253 : {
254 0 : if (OGRADBCLoadDriver(pszADBCDriverName, "duckdb_adbc_init", &m_driver,
255 0 : error) != ADBC_STATUS_OK)
256 : {
257 0 : CPLError(CE_Failure, CPLE_AppDefined, "AdbcLoadDriver() failed: %s",
258 : error.message());
259 0 : return false;
260 : }
261 : }
262 : else
263 : {
264 0 : if (OGRADBCLoadDriver(pszADBCDriverName, nullptr, &m_driver, error) !=
265 : ADBC_STATUS_OK)
266 : {
267 0 : CPLError(CE_Failure, CPLE_AppDefined, "AdbcLoadDriver() failed: %s",
268 : error.message());
269 0 : return false;
270 : }
271 : }
272 :
273 : // Allocate the database
274 0 : if (ADBC_CALL(DatabaseNew, &m_database, error) != ADBC_STATUS_OK)
275 : {
276 0 : CPLError(CE_Failure, CPLE_AppDefined, "AdbcDatabaseNew() failed: %s",
277 : error.message());
278 0 : return false;
279 : }
280 :
281 : // Set options
282 0 : if (pszADBCDriverName &&
283 0 : (bIsDuckDB || bIsParquet || strstr(pszADBCDriverName, "duckdb")))
284 : {
285 0 : if (ADBC_CALL(DatabaseSetOption, &m_database, "path",
286 : bIsParquet ? ":memory:" : pszFilename,
287 0 : error) != ADBC_STATUS_OK)
288 : {
289 0 : CPLError(CE_Failure, CPLE_AppDefined,
290 : "AdbcDatabaseSetOption() failed: %s", error.message());
291 0 : return false;
292 : }
293 : }
294 0 : else if (pszFilename[0])
295 : {
296 0 : if (ADBC_CALL(DatabaseSetOption, &m_database, "uri", pszFilename,
297 0 : error) != ADBC_STATUS_OK)
298 : {
299 0 : CPLError(CE_Failure, CPLE_AppDefined,
300 : "AdbcDatabaseSetOption() failed: %s", error.message());
301 0 : return false;
302 : }
303 : }
304 :
305 0 : for (const auto &[pszKey, pszValue] : cpl::IterateNameValue(
306 0 : static_cast<CSLConstList>(poOpenInfo->papszOpenOptions)))
307 : {
308 0 : if (STARTS_WITH_CI(pszKey, "ADBC_OPTION_"))
309 : {
310 0 : if (ADBC_CALL(DatabaseSetOption, &m_database,
311 : pszKey + strlen("ADBC_OPTION_"), pszValue,
312 0 : error) != ADBC_STATUS_OK)
313 : {
314 0 : CPLError(CE_Failure, CPLE_AppDefined,
315 : "AdbcDatabaseSetOption() failed: %s", error.message());
316 0 : return false;
317 : }
318 : }
319 : }
320 :
321 0 : if (ADBC_CALL(DatabaseInit, &m_database, error) != ADBC_STATUS_OK)
322 : {
323 0 : CPLError(CE_Failure, CPLE_AppDefined, "AdbcDatabaseInit() failed: %s",
324 : error.message());
325 0 : return false;
326 : }
327 :
328 0 : m_connection = std::make_unique<AdbcConnection>();
329 0 : if (ADBC_CALL(ConnectionNew, m_connection.get(), error) != ADBC_STATUS_OK)
330 : {
331 0 : CPLError(CE_Failure, CPLE_AppDefined, "AdbcConnectionNew() failed: %s",
332 : error.message());
333 0 : return false;
334 : }
335 :
336 0 : if (ADBC_CALL(ConnectionInit, m_connection.get(), &m_database, error) !=
337 : ADBC_STATUS_OK)
338 : {
339 0 : CPLError(CE_Failure, CPLE_AppDefined, "AdbcConnectionInit() failed: %s",
340 : error.message());
341 0 : return false;
342 : }
343 :
344 0 : char **papszPreludeStatements = CSLFetchNameValueMultiple(
345 0 : poOpenInfo->papszOpenOptions, "PRELUDE_STATEMENTS");
346 0 : for (const char *pszStatement :
347 0 : cpl::Iterate(CSLConstList(papszPreludeStatements)))
348 : {
349 0 : CreateLayer(pszStatement, "temp");
350 : }
351 0 : CSLDestroy(papszPreludeStatements);
352 :
353 0 : std::string osLayerName = "RESULTSET";
354 0 : std::string osSQL;
355 0 : const char *pszSQL = CSLFetchNameValue(poOpenInfo->papszOpenOptions, "SQL");
356 0 : bool bIsParquetLayer = false;
357 0 : if (bIsParquet)
358 : {
359 0 : m_osParquetFilename = pszFilename;
360 0 : osLayerName = CPLGetBasename(pszFilename);
361 0 : if (osLayerName == "*")
362 0 : osLayerName = CPLGetBasename(CPLGetDirname(pszFilename));
363 0 : if (!pszSQL)
364 : {
365 : osSQL =
366 : CPLSPrintf("SELECT * FROM '%s'",
367 0 : OGRDuplicateCharacter(pszFilename, '\'').c_str());
368 0 : pszSQL = osSQL.c_str();
369 0 : bIsParquetLayer = true;
370 : }
371 : }
372 :
373 0 : if (pszSQL)
374 : {
375 0 : if (pszSQL[0])
376 : {
377 0 : auto poLayer = CreateLayer(pszSQL, osLayerName.c_str());
378 0 : if (!poLayer)
379 0 : return false;
380 0 : poLayer->m_bIsParquetLayer = bIsParquetLayer;
381 0 : m_apoLayers.emplace_back(std::move(poLayer));
382 : }
383 : }
384 0 : else if (bIsDuckDB || bIsSQLite3)
385 : {
386 : auto poLayerList = CreateLayer(
387 : "SELECT name FROM sqlite_master WHERE type IN ('table', 'view')",
388 0 : "LAYERLIST");
389 0 : if (!poLayerList || poLayerList->GetLayerDefn()->GetFieldCount() != 1)
390 : {
391 0 : return false;
392 : }
393 :
394 0 : for (const auto &poFeature : poLayerList.get())
395 : {
396 0 : const char *pszLayerName = poFeature->GetFieldAsString(0);
397 0 : if (bIsSQLite3 && EQUAL(pszLayerName, "SpatialIndex"))
398 0 : continue;
399 : const std::string osStatement =
400 : CPLSPrintf("SELECT * FROM \"%s\"",
401 0 : OGRDuplicateCharacter(pszLayerName, '"').c_str());
402 0 : CPLTurnFailureIntoWarning(true);
403 0 : auto poLayer = CreateLayer(osStatement.c_str(), pszLayerName);
404 0 : CPLTurnFailureIntoWarning(false);
405 0 : if (poLayer)
406 : {
407 0 : m_apoLayers.emplace_back(std::move(poLayer));
408 : }
409 0 : }
410 : }
411 0 : else if (bIsPostgreSQL)
412 : {
413 : auto poLayerList = CreateLayer(
414 : "SELECT n.nspname, c.relname FROM pg_class c "
415 : "JOIN pg_namespace n ON c.relnamespace = n.oid "
416 : "AND c.relkind in ('r','v','m','f') "
417 : "AND n.nspname NOT IN ('pg_catalog', 'information_schema') "
418 : "ORDER BY c.oid",
419 0 : "LAYERLIST");
420 0 : if (!poLayerList || poLayerList->GetLayerDefn()->GetFieldCount() != 2)
421 : {
422 0 : return false;
423 : }
424 :
425 0 : for (const auto &poFeature : poLayerList.get())
426 : {
427 0 : const char *pszNamespace = poFeature->GetFieldAsString(0);
428 0 : const char *pszTableName = poFeature->GetFieldAsString(1);
429 : const std::string osStatement =
430 : CPLSPrintf("SELECT * FROM \"%s\".\"%s\"",
431 0 : OGRDuplicateCharacter(pszNamespace, '"').c_str(),
432 0 : OGRDuplicateCharacter(pszTableName, '"').c_str());
433 :
434 0 : CPLTurnFailureIntoWarning(true);
435 : auto poLayer =
436 : CreateLayer(osStatement.c_str(),
437 0 : CPLSPrintf("%s.%s", pszNamespace, pszTableName));
438 0 : CPLTurnFailureIntoWarning(false);
439 0 : if (poLayer)
440 : {
441 0 : m_apoLayers.emplace_back(std::move(poLayer));
442 : }
443 : }
444 : }
445 :
446 0 : return true;
447 : }
448 :
449 : /************************************************************************/
450 : /* GetLayerByName() */
451 : /************************************************************************/
452 :
453 0 : OGRLayer *OGRADBCDataset::GetLayerByName(const char *pszName)
454 : {
455 0 : OGRLayer *poLayer = GDALDataset::GetLayerByName(pszName);
456 0 : if (poLayer || !EQUAL(pszName, "table_list"))
457 0 : return poLayer;
458 :
459 0 : OGRADBCError error;
460 0 : auto objectsStream = std::make_unique<OGRArrowArrayStream>();
461 0 : ADBC_CALL(ConnectionGetObjects, m_connection.get(),
462 : ADBC_OBJECT_DEPTH_TABLES,
463 : /* catalog = */ nullptr,
464 : /* db_schema = */ nullptr,
465 : /* table_name = */ nullptr,
466 : /* table_type = */ nullptr,
467 : /* column_name = */ nullptr, objectsStream->get(), error);
468 :
469 0 : ArrowSchema schema = {};
470 0 : if (objectsStream->get_schema(&schema) != 0)
471 : {
472 0 : CPLError(CE_Failure, CPLE_AppDefined, "get_schema() failed");
473 0 : return nullptr;
474 : }
475 :
476 0 : auto statement = std::make_unique<AdbcStatement>();
477 0 : OGRADBCLayer tmpLayer(this, "", std::move(statement),
478 0 : std::move(objectsStream), &schema);
479 0 : const auto tmpLayerDefn = tmpLayer.GetLayerDefn();
480 0 : if (tmpLayerDefn->GetFieldIndex("catalog_name") < 0 ||
481 0 : tmpLayerDefn->GetFieldIndex("catalog_db_schemas") < 0)
482 : {
483 0 : return nullptr;
484 : }
485 :
486 : auto poTableListLayer =
487 0 : std::make_unique<OGRMemLayer>("table_list", nullptr, wkbNone);
488 : {
489 0 : OGRFieldDefn oField("catalog_name", OFTString);
490 0 : poTableListLayer->CreateField(&oField);
491 : }
492 : {
493 0 : OGRFieldDefn oField("schema_name", OFTString);
494 0 : poTableListLayer->CreateField(&oField);
495 : }
496 : {
497 0 : OGRFieldDefn oField("table_name", OFTString);
498 0 : poTableListLayer->CreateField(&oField);
499 : }
500 : {
501 0 : OGRFieldDefn oField("table_type", OFTString);
502 0 : poTableListLayer->CreateField(&oField);
503 : }
504 :
505 0 : for (const auto &poFeature : tmpLayer)
506 : {
507 : const char *pszCatalogName =
508 0 : poFeature->GetFieldAsString("catalog_name");
509 : const char *pszCatalogDBSchemas =
510 0 : poFeature->GetFieldAsString("catalog_db_schemas");
511 0 : if (pszCatalogDBSchemas)
512 : {
513 0 : CPLJSONDocument oDoc;
514 0 : if (oDoc.LoadMemory(pszCatalogDBSchemas))
515 : {
516 0 : auto oRoot = oDoc.GetRoot();
517 0 : if (oRoot.GetType() == CPLJSONObject::Type::Array)
518 : {
519 0 : for (const auto &oSchema : oRoot.ToArray())
520 : {
521 0 : if (oSchema.GetType() == CPLJSONObject::Type::Object)
522 : {
523 : const std::string osSchemaName =
524 0 : oSchema.GetString("schema_name");
525 : const auto oTables =
526 0 : oSchema.GetArray("db_schema_tables");
527 0 : if (oTables.IsValid())
528 : {
529 0 : for (const auto &oTable : oTables)
530 : {
531 0 : if (oTable.GetType() ==
532 : CPLJSONObject::Type::Object)
533 : {
534 : const std::string osTableName =
535 0 : oTable.GetString("table_name");
536 : const std::string osTableType =
537 0 : oTable.GetString("table_type");
538 0 : if (!osTableName.empty() &&
539 0 : osTableType != "index" &&
540 0 : osTableType != "trigger")
541 : {
542 : OGRFeature oFeat(
543 : poTableListLayer
544 0 : ->GetLayerDefn());
545 0 : if (pszCatalogName)
546 0 : oFeat.SetField("catalog_name",
547 : pszCatalogName);
548 0 : if (oSchema.GetObj("schema_name")
549 0 : .IsValid())
550 0 : oFeat.SetField(
551 : "schema_name",
552 : osSchemaName.c_str());
553 0 : oFeat.SetField("table_name",
554 : osTableName.c_str());
555 0 : if (oTable.GetObj("table_type")
556 0 : .IsValid())
557 0 : oFeat.SetField(
558 : "table_type",
559 : osTableType.c_str());
560 0 : CPL_IGNORE_RET_VAL(
561 0 : poTableListLayer->CreateFeature(
562 : &oFeat));
563 : }
564 : }
565 : }
566 : }
567 : }
568 : }
569 : }
570 : }
571 : }
572 : }
573 :
574 0 : m_apoLayers.emplace_back(std::move(poTableListLayer));
575 0 : return m_apoLayers.back().get();
576 : }
577 :
578 : #undef ADBC_CALL
|