Line data Source code
1 : /****************************************************************************** 2 : * 3 : * Project: GDAL 4 : * Purpose: Arrow Database Connectivity driver 5 : * Author: Even Rouault, <even dot rouault at spatialys.com> 6 : * 7 : ****************************************************************************** 8 : * Copyright (c) 2024, Even Rouault <even dot rouault at spatialys.com> 9 : * Copyright (c) 2024, Dewey Dunnington <dewey@voltrondata.com> 10 : * 11 : * SPDX-License-Identifier: MIT 12 : ****************************************************************************/ 13 : 14 : #include "ogr_adbc.h" 15 : #include "ogr_p.h" 16 : 17 : #define ADBC_CALL(func, ...) m_poDS->m_driver.func(__VA_ARGS__) 18 : 19 : /************************************************************************/ 20 : /* OGRADBCLayer() */ 21 : /************************************************************************/ 22 : 23 0 : OGRADBCLayer::OGRADBCLayer(OGRADBCDataset *poDS, const char *pszName, 24 : std::unique_ptr<AdbcStatement> poStatement, 25 : std::unique_ptr<OGRArrowArrayStream> poStream, 26 0 : ArrowSchema *schema) 27 0 : : m_poDS(poDS), m_statement(std::move(poStatement)), 28 0 : m_stream(std::move(poStream)) 29 : { 30 0 : SetDescription(pszName); 31 : 32 0 : memcpy(&m_schema, schema, sizeof(m_schema)); 33 0 : schema->release = nullptr; 34 : 35 : m_poAdapterLayer = 36 0 : std::make_unique<OGRArrowArrayToOGRFeatureAdapterLayer>(pszName); 37 0 : for (int i = 0; i < m_schema.n_children; ++i) 38 : { 39 0 : m_poAdapterLayer->CreateFieldFromArrowSchema(m_schema.children[i]); 40 : } 41 0 : } 42 : 43 : /************************************************************************/ 44 : /* ~OGRADBCLayer() */ 45 : /************************************************************************/ 46 : 47 0 : OGRADBCLayer::~OGRADBCLayer() 48 : { 49 0 : OGRADBCError error; 50 0 : if (m_statement) 51 0 : ADBC_CALL(StatementRelease, m_statement.get(), error); 52 0 : if (m_schema.release) 53 0 : m_schema.release(&m_schema); 54 0 : } 55 : 56 : /************************************************************************/ 57 : /* GetNextRawFeature() */ 58 : /************************************************************************/ 59 : 60 0 : OGRFeature *OGRADBCLayer::GetNextRawFeature() 61 : { 62 0 : if (m_bEOF) 63 0 : return nullptr; 64 : 65 0 : if (m_nIdx == m_poAdapterLayer->m_apoFeatures.size()) 66 : { 67 0 : m_nIdx = 0; 68 0 : m_poAdapterLayer->m_apoFeatures.clear(); 69 : 70 0 : if (!m_stream) 71 : { 72 0 : auto stream = std::make_unique<OGRArrowArrayStream>(); 73 0 : if (!GetArrowStreamInternal(stream->get())) 74 : { 75 0 : m_bEOF = true; 76 0 : return nullptr; 77 : } 78 0 : m_stream = std::move(stream); 79 : } 80 : 81 : struct ArrowArray array; 82 0 : memset(&array, 0, sizeof(array)); 83 0 : if (m_stream->get_next(&array) != 0) 84 : { 85 0 : m_bEOF = true; 86 0 : return nullptr; 87 : } 88 : const bool bOK = 89 0 : array.length 90 0 : ? m_poAdapterLayer->WriteArrowBatch(&m_schema, &array, nullptr) 91 0 : : false; 92 0 : if (array.release) 93 0 : array.release(&array); 94 0 : if (!bOK) 95 : { 96 0 : m_bEOF = true; 97 0 : return nullptr; 98 : } 99 : } 100 : 101 0 : auto poFeature = m_poAdapterLayer->m_apoFeatures[m_nIdx++].release(); 102 0 : poFeature->SetFID(m_nFeatureID++); 103 0 : return poFeature; 104 : } 105 : 106 : /************************************************************************/ 107 : /* ResetReading() */ 108 : /************************************************************************/ 109 : 110 0 : void OGRADBCLayer::ResetReading() 111 : { 112 0 : if (m_nIdx > 0 || m_bEOF) 113 : { 114 0 : m_poAdapterLayer->m_apoFeatures.clear(); 115 0 : m_stream.reset(); 116 0 : m_bEOF = false; 117 0 : m_nIdx = 0; 118 0 : m_nFeatureID = 0; 119 : } 120 0 : } 121 : 122 : /************************************************************************/ 123 : /* TestCapability() */ 124 : /************************************************************************/ 125 : 126 0 : int OGRADBCLayer::TestCapability(const char *pszCap) 127 : { 128 0 : if (EQUAL(pszCap, OLCFastGetArrowStream)) 129 : { 130 0 : return !m_poFilterGeom && !m_poAttrQuery; 131 : } 132 0 : else if (EQUAL(pszCap, OLCFastFeatureCount)) 133 : { 134 0 : return !m_poFilterGeom && !m_poAttrQuery && m_bIsParquetLayer; 135 : } 136 : else 137 : { 138 0 : return false; 139 : } 140 : } 141 : 142 : /************************************************************************/ 143 : /* GetDataset() */ 144 : /************************************************************************/ 145 : 146 0 : GDALDataset *OGRADBCLayer::GetDataset() 147 : { 148 0 : return m_poDS; 149 : } 150 : 151 : /************************************************************************/ 152 : /* GetArrowStream() */ 153 : /************************************************************************/ 154 : 155 0 : bool OGRADBCLayer::GetArrowStream(struct ArrowArrayStream *out_stream, 156 : CSLConstList papszOptions) 157 : { 158 0 : if (m_poFilterGeom || m_poAttrQuery) 159 0 : return OGRLayer::GetArrowStream(out_stream, papszOptions); 160 : 161 0 : if (m_stream) 162 : { 163 0 : memcpy(out_stream, m_stream->get(), sizeof(*out_stream)); 164 0 : memset(m_stream->get(), 0, sizeof(*out_stream)); 165 0 : m_stream.reset(); 166 : } 167 : 168 0 : return GetArrowStreamInternal(out_stream); 169 : } 170 : 171 : /************************************************************************/ 172 : /* GetArrowStreamInternal() */ 173 : /************************************************************************/ 174 : 175 0 : bool OGRADBCLayer::GetArrowStreamInternal(struct ArrowArrayStream *out_stream) 176 : { 177 0 : OGRADBCError error; 178 0 : int64_t rows_affected = -1; 179 0 : if (ADBC_CALL(StatementExecuteQuery, m_statement.get(), out_stream, 180 0 : &rows_affected, error) != ADBC_STATUS_OK) 181 : { 182 0 : CPLError(CE_Failure, CPLE_AppDefined, 183 : "AdbcStatementExecuteQuery() failed: %s", error.message()); 184 0 : return false; 185 : } 186 : 187 0 : return true; 188 : } 189 : 190 : /************************************************************************/ 191 : /* GetFeatureCount() */ 192 : /************************************************************************/ 193 : 194 0 : GIntBig OGRADBCLayer::GetFeatureCount(int bForce) 195 : { 196 0 : if (m_poFilterGeom || m_poAttrQuery) 197 : { 198 0 : return OGRLayer::GetFeatureCount(bForce); 199 : } 200 : 201 0 : if (m_bIsParquetLayer) 202 : { 203 0 : return GetFeatureCountParquet(); 204 : } 205 : 206 0 : if (m_nIdx > 0 || m_bEOF) 207 0 : m_stream.reset(); 208 : 209 0 : if (!m_stream) 210 : { 211 0 : auto stream = std::make_unique<OGRArrowArrayStream>(); 212 0 : if (!GetArrowStreamInternal(stream->get())) 213 : { 214 0 : return -1; 215 : } 216 0 : m_stream = std::move(stream); 217 : } 218 : 219 0 : GIntBig nTotal = 0; 220 : while (true) 221 : { 222 : struct ArrowArray array; 223 0 : memset(&array, 0, sizeof(array)); 224 0 : if (m_stream->get_next(&array) != 0) 225 : { 226 0 : m_stream.reset(); 227 0 : return -1; 228 : } 229 0 : const bool bStop = array.length == 0; 230 0 : nTotal += array.length; 231 0 : if (array.release) 232 0 : array.release(&array); 233 0 : if (bStop) 234 0 : break; 235 0 : } 236 0 : m_stream.reset(); 237 0 : return nTotal; 238 : } 239 : 240 : /************************************************************************/ 241 : /* GetFeatureCountParquet() */ 242 : /************************************************************************/ 243 : 244 0 : GIntBig OGRADBCLayer::GetFeatureCountParquet() 245 : { 246 : const std::string osSQL(CPLSPrintf( 247 : "SELECT CAST(SUM(num_rows) AS BIGINT) FROM parquet_file_metadata('%s')", 248 0 : OGRDuplicateCharacter(m_poDS->m_osParquetFilename, '\'').c_str())); 249 0 : auto poCountLayer = m_poDS->CreateLayer(osSQL.c_str(), "numrows"); 250 0 : if (poCountLayer && poCountLayer->GetLayerDefn()->GetFieldCount() == 1) 251 : { 252 : auto poFeature = 253 0 : std::unique_ptr<OGRFeature>(poCountLayer->GetNextFeature()); 254 0 : if (poFeature) 255 0 : return poFeature->GetFieldAsInteger64(0); 256 : } 257 : 258 0 : return -1; 259 : } 260 : 261 : #undef ADBC_CALL