LCOV - code coverage report
Current view: top level - ogr/ogrsf_frmts/arrow_common - ograrrowrandomaccessfile.h (source / functions) Hit Total Coverage
Test: gdal_filtered.info Lines: 36 57 63.2 %
Date: 2024-11-21 22:18:42 Functions: 7 12 58.3 %

          Line data    Source code
       1             : /******************************************************************************
       2             :  *
       3             :  * Project:  Arrow generic code
       4             :  * Purpose:  Arrow generic code
       5             :  * Author:   Even Rouault, <even.rouault at spatialys.com>
       6             :  *
       7             :  ******************************************************************************
       8             :  * Copyright (c) 2022, Planet Labs
       9             :  *
      10             :  * SPDX-License-Identifier: MIT
      11             :  ****************************************************************************/
      12             : 
      13             : #ifndef OGR_ARROW_RANDOM_ACCESS_FILE_H
      14             : #define OGR_ARROW_RANDOM_ACCESS_FILE_H
      15             : 
      16             : #include "cpl_vsi.h"
      17             : #include "cpl_vsi_virtual.h"
      18             : 
      19             : #include "arrow/buffer.h"
      20             : #include "arrow/io/file.h"
      21             : #include "arrow/io/interfaces.h"
      22             : 
      23             : #include <atomic>
      24             : #include <cinttypes>
      25             : 
      26             : /************************************************************************/
      27             : /*                        OGRArrowRandomAccessFile                      */
      28             : /************************************************************************/
      29             : 
      30             : class OGRArrowRandomAccessFile final : public arrow::io::RandomAccessFile
      31             : {
      32             :     int64_t m_nSize = -1;
      33             :     const std::string m_osFilename;
      34             :     VSILFILE *m_fp;
      35             :     const bool m_bOwnFP;
      36             :     std::atomic<bool> m_bAskedToClosed = false;
      37             : 
      38             : #ifdef OGR_ARROW_USE_PREAD
      39             :     const bool m_bDebugReadAt;
      40             :     const bool m_bUsePRead;
      41             : #endif
      42             : 
      43             :     OGRArrowRandomAccessFile(const OGRArrowRandomAccessFile &) = delete;
      44             :     OGRArrowRandomAccessFile &
      45             :     operator=(const OGRArrowRandomAccessFile &) = delete;
      46             : 
      47             :   public:
      48           5 :     OGRArrowRandomAccessFile(const std::string &osFilename, VSILFILE *fp,
      49             :                              bool bOwnFP)
      50           5 :         : m_osFilename(osFilename), m_fp(fp), m_bOwnFP(bOwnFP)
      51             : #ifdef OGR_ARROW_USE_PREAD
      52             :           ,
      53             :           m_bDebugReadAt(!VSIIsLocal(m_osFilename.c_str())),
      54             :           // Due to the lack of caching for current /vsicurl PRead(), do not
      55             :           // use the PRead() implementation on those files
      56             :           m_bUsePRead(m_fp->HasPRead() &&
      57             :                       CPLTestBool(CPLGetConfigOption(
      58             :                           "OGR_ARROW_USE_PREAD",
      59             :                           VSIIsLocal(m_osFilename.c_str()) ? "YES" : "NO")))
      60             : #endif
      61             :     {
      62           5 :     }
      63             : 
      64        1429 :     OGRArrowRandomAccessFile(const std::string &osFilename,
      65             :                              VSIVirtualHandleUniquePtr &&fp)
      66        1429 :         : m_osFilename(osFilename), m_fp(fp.release()), m_bOwnFP(true)
      67             : #ifdef OGR_ARROW_USE_PREAD
      68             :           ,
      69             :           m_bDebugReadAt(!VSIIsLocal(m_osFilename.c_str())),
      70             :           // Due to the lack of caching for current /vsicurl PRead(), do not
      71             :           // use the PRead() implementation on those files
      72             :           m_bUsePRead(m_fp->HasPRead() &&
      73             :                       CPLTestBool(CPLGetConfigOption(
      74             :                           "OGR_ARROW_USE_PREAD",
      75             :                           VSIIsLocal(m_osFilename.c_str()) ? "YES" : "NO")))
      76             : #endif
      77             :     {
      78        1429 :     }
      79             : 
      80           0 :     void AskToClose()
      81             :     {
      82           0 :         m_bAskedToClosed = true;
      83           0 :         if (m_fp)
      84           0 :             m_fp->Interrupt();
      85           0 :     }
      86             : 
      87        1434 :     ~OGRArrowRandomAccessFile() override
      88        1434 :     {
      89        1434 :         if (m_fp && m_bOwnFP)
      90        1429 :             VSIFCloseL(m_fp);
      91        1434 :     }
      92             : 
      93           0 :     arrow::Status Close() override
      94             :     {
      95           0 :         if (!m_bOwnFP)
      96             :             return arrow::Status::IOError(
      97           0 :                 "Cannot close a file that we don't own");
      98           0 :         int ret = VSIFCloseL(m_fp);
      99           0 :         m_fp = nullptr;
     100             :         return ret == 0 ? arrow::Status::OK()
     101           0 :                         : arrow::Status::IOError("Error while closing");
     102             :     }
     103             : 
     104           0 :     arrow::Result<int64_t> Tell() const override
     105             :     {
     106           0 :         return static_cast<int64_t>(VSIFTellL(m_fp));
     107             :     }
     108             : 
     109           0 :     bool closed() const override
     110             :     {
     111           0 :         return m_bAskedToClosed || m_fp == nullptr;
     112             :     }
     113             : 
     114        2750 :     arrow::Status Seek(int64_t position) override
     115             :     {
     116        2750 :         if (m_bAskedToClosed)
     117           0 :             return arrow::Status::IOError("File requested to close");
     118             : 
     119        2750 :         if (VSIFSeekL(m_fp, static_cast<vsi_l_offset>(position), SEEK_SET) == 0)
     120        2750 :             return arrow::Status::OK();
     121           0 :         return arrow::Status::IOError("Error while seeking");
     122             :     }
     123             : 
     124        2805 :     arrow::Result<int64_t> Read(int64_t nbytes, void *out) override
     125             :     {
     126        2805 :         if (m_bAskedToClosed)
     127           0 :             return arrow::Status::IOError("File requested to close");
     128             : 
     129             :         CPLAssert(static_cast<int64_t>(static_cast<size_t>(nbytes)) == nbytes);
     130        2805 :         return static_cast<int64_t>(
     131        2805 :             VSIFReadL(out, 1, static_cast<size_t>(nbytes), m_fp));
     132             :     }
     133             : 
     134        2771 :     arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override
     135             :     {
     136        2771 :         if (m_bAskedToClosed)
     137           0 :             return arrow::Status::IOError("File requested to close");
     138             : 
     139             :         // CPLDebug("ARROW", "Reading %d bytes", int(nbytes));
     140        5542 :         auto buffer = arrow::AllocateResizableBuffer(nbytes);
     141        2771 :         if (!buffer.ok())
     142             :         {
     143           0 :             return buffer;
     144             :         }
     145        2771 :         uint8_t *buffer_data = (*buffer)->mutable_data();
     146        5542 :         auto nread = Read(nbytes, buffer_data);
     147        2771 :         CPL_IGNORE_RET_VAL(
     148        5542 :             (*buffer)->Resize(*nread));  // shrink --> cannot fail
     149        2771 :         return buffer;
     150             :     }
     151             : 
     152             : #ifdef OGR_ARROW_USE_PREAD
     153             :     using arrow::io::RandomAccessFile::ReadAt;
     154             : 
     155             :     arrow::Result<std::shared_ptr<arrow::Buffer>>
     156             :     ReadAt(int64_t position, int64_t nbytes) override
     157             :     {
     158             :         if (m_bAskedToClosed)
     159             :             return arrow::Status::IOError("File requested to close");
     160             : 
     161             :         if (m_bUsePRead)
     162             :         {
     163             :             auto buffer = arrow::AllocateResizableBuffer(nbytes);
     164             :             if (!buffer.ok())
     165             :             {
     166             :                 return buffer;
     167             :             }
     168             :             if (m_bDebugReadAt)
     169             :             {
     170             :                 CPLDebug(
     171             :                     "ARROW",
     172             :                     "Start ReadAt() called on %s (this=%p) from "
     173             :                     "thread=" CPL_FRMT_GIB ": pos=%" PRId64 ", nbytes=%" PRId64,
     174             :                     m_osFilename.c_str(), this, CPLGetPID(), position, nbytes);
     175             :             }
     176             :             uint8_t *buffer_data = (*buffer)->mutable_data();
     177             :             auto nread = m_fp->PRead(buffer_data, static_cast<size_t>(nbytes),
     178             :                                      static_cast<vsi_l_offset>(position));
     179             :             CPL_IGNORE_RET_VAL(
     180             :                 (*buffer)->Resize(nread));  // shrink --> cannot fail
     181             :             if (m_bDebugReadAt)
     182             :             {
     183             :                 CPLDebug(
     184             :                     "ARROW",
     185             :                     "End ReadAt() called on %s (this=%p) from "
     186             :                     "thread=" CPL_FRMT_GIB ": pos=%" PRId64 ", nbytes=%" PRId64,
     187             :                     m_osFilename.c_str(), this, CPLGetPID(), position, nbytes);
     188             :             }
     189             :             return buffer;
     190             :         }
     191             :         return arrow::io::RandomAccessFile::ReadAt(position, nbytes);
     192             :     }
     193             : #endif
     194             : 
     195        1425 :     arrow::Result<int64_t> GetSize() override
     196             :     {
     197        1425 :         if (m_bAskedToClosed)
     198           0 :             return arrow::Status::IOError("File requested to close");
     199             : 
     200        1425 :         if (m_nSize < 0)
     201             :         {
     202        1425 :             const auto nPos = VSIFTellL(m_fp);
     203        1425 :             VSIFSeekL(m_fp, 0, SEEK_END);
     204        1425 :             m_nSize = static_cast<int64_t>(VSIFTellL(m_fp));
     205        1425 :             VSIFSeekL(m_fp, nPos, SEEK_SET);
     206             :         }
     207        1425 :         return m_nSize;
     208             :     }
     209             : };
     210             : 
     211             : #endif  // OGR_ARROW_RANDOM_ACCESS_FILE_H

Generated by: LCOV version 1.14