Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Arrow generic code
4 : * Purpose: Arrow generic code
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #ifndef OGR_ARROW_RANDOM_ACCESS_FILE_H
14 : #define OGR_ARROW_RANDOM_ACCESS_FILE_H
15 :
16 : #include "cpl_vsi.h"
17 : #include "cpl_vsi_virtual.h"
18 :
19 : #include "arrow/buffer.h"
20 : #include "arrow/io/file.h"
21 : #include "arrow/io/interfaces.h"
22 :
23 : #include <atomic>
24 : #include <cinttypes>
25 :
26 : /************************************************************************/
27 : /* OGRArrowRandomAccessFile */
28 : /************************************************************************/
29 :
30 : class OGRArrowRandomAccessFile final : public arrow::io::RandomAccessFile
31 : {
32 : int64_t m_nSize = -1;
33 : const std::string m_osFilename;
34 : VSILFILE *m_fp;
35 : const bool m_bOwnFP;
36 : std::atomic<bool> m_bAskedToClosed = false;
37 :
38 : #ifdef OGR_ARROW_USE_PREAD
39 : const bool m_bDebugReadAt;
40 : const bool m_bUsePRead;
41 : #endif
42 :
43 : OGRArrowRandomAccessFile(const OGRArrowRandomAccessFile &) = delete;
44 : OGRArrowRandomAccessFile &
45 : operator=(const OGRArrowRandomAccessFile &) = delete;
46 :
47 : public:
48 5 : OGRArrowRandomAccessFile(const std::string &osFilename, VSILFILE *fp,
49 : bool bOwnFP)
50 5 : : m_osFilename(osFilename), m_fp(fp), m_bOwnFP(bOwnFP)
51 : #ifdef OGR_ARROW_USE_PREAD
52 : ,
53 : m_bDebugReadAt(!VSIIsLocal(m_osFilename.c_str())),
54 : // Due to the lack of caching for current /vsicurl PRead(), do not
55 : // use the PRead() implementation on those files
56 : m_bUsePRead(m_fp->HasPRead() &&
57 : CPLTestBool(CPLGetConfigOption(
58 : "OGR_ARROW_USE_PREAD",
59 : VSIIsLocal(m_osFilename.c_str()) ? "YES" : "NO")))
60 : #endif
61 : {
62 5 : }
63 :
64 1433 : OGRArrowRandomAccessFile(const std::string &osFilename,
65 : VSIVirtualHandleUniquePtr &&fp)
66 1433 : : m_osFilename(osFilename), m_fp(fp.release()), m_bOwnFP(true)
67 : #ifdef OGR_ARROW_USE_PREAD
68 : ,
69 : m_bDebugReadAt(!VSIIsLocal(m_osFilename.c_str())),
70 : // Due to the lack of caching for current /vsicurl PRead(), do not
71 : // use the PRead() implementation on those files
72 : m_bUsePRead(m_fp->HasPRead() &&
73 : CPLTestBool(CPLGetConfigOption(
74 : "OGR_ARROW_USE_PREAD",
75 : VSIIsLocal(m_osFilename.c_str()) ? "YES" : "NO")))
76 : #endif
77 : {
78 1433 : }
79 :
80 0 : void AskToClose()
81 : {
82 0 : m_bAskedToClosed = true;
83 0 : if (m_fp)
84 0 : m_fp->Interrupt();
85 0 : }
86 :
87 1438 : ~OGRArrowRandomAccessFile() override
88 1438 : {
89 1438 : if (m_fp && m_bOwnFP)
90 1433 : VSIFCloseL(m_fp);
91 1438 : }
92 :
93 0 : arrow::Status Close() override
94 : {
95 0 : if (!m_bOwnFP)
96 : return arrow::Status::IOError(
97 0 : "Cannot close a file that we don't own");
98 0 : int ret = VSIFCloseL(m_fp);
99 0 : m_fp = nullptr;
100 : return ret == 0 ? arrow::Status::OK()
101 0 : : arrow::Status::IOError("Error while closing");
102 : }
103 :
104 0 : arrow::Result<int64_t> Tell() const override
105 : {
106 0 : return static_cast<int64_t>(VSIFTellL(m_fp));
107 : }
108 :
109 0 : bool closed() const override
110 : {
111 0 : return m_bAskedToClosed || m_fp == nullptr;
112 : }
113 :
114 2758 : arrow::Status Seek(int64_t position) override
115 : {
116 2758 : if (m_bAskedToClosed)
117 0 : return arrow::Status::IOError("File requested to close");
118 :
119 2758 : if (VSIFSeekL(m_fp, static_cast<vsi_l_offset>(position), SEEK_SET) == 0)
120 2758 : return arrow::Status::OK();
121 0 : return arrow::Status::IOError("Error while seeking");
122 : }
123 :
124 2813 : arrow::Result<int64_t> Read(int64_t nbytes, void *out) override
125 : {
126 2813 : if (m_bAskedToClosed)
127 0 : return arrow::Status::IOError("File requested to close");
128 :
129 : CPLAssert(static_cast<int64_t>(static_cast<size_t>(nbytes)) == nbytes);
130 2813 : return static_cast<int64_t>(
131 2813 : VSIFReadL(out, 1, static_cast<size_t>(nbytes), m_fp));
132 : }
133 :
134 2779 : arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override
135 : {
136 2779 : if (m_bAskedToClosed)
137 0 : return arrow::Status::IOError("File requested to close");
138 :
139 : // CPLDebug("ARROW", "Reading %d bytes", int(nbytes));
140 5558 : auto buffer = arrow::AllocateResizableBuffer(nbytes);
141 2779 : if (!buffer.ok())
142 : {
143 0 : return buffer;
144 : }
145 2779 : uint8_t *buffer_data = (*buffer)->mutable_data();
146 5558 : auto nread = Read(nbytes, buffer_data);
147 2779 : CPL_IGNORE_RET_VAL(
148 5558 : (*buffer)->Resize(*nread)); // shrink --> cannot fail
149 2779 : return buffer;
150 : }
151 :
152 : #ifdef OGR_ARROW_USE_PREAD
153 : using arrow::io::RandomAccessFile::ReadAt;
154 :
155 : arrow::Result<std::shared_ptr<arrow::Buffer>>
156 : ReadAt(int64_t position, int64_t nbytes) override
157 : {
158 : if (m_bAskedToClosed)
159 : return arrow::Status::IOError("File requested to close");
160 :
161 : if (m_bUsePRead)
162 : {
163 : auto buffer = arrow::AllocateResizableBuffer(nbytes);
164 : if (!buffer.ok())
165 : {
166 : return buffer;
167 : }
168 : if (m_bDebugReadAt)
169 : {
170 : CPLDebug(
171 : "ARROW",
172 : "Start ReadAt() called on %s (this=%p) from "
173 : "thread=" CPL_FRMT_GIB ": pos=%" PRId64 ", nbytes=%" PRId64,
174 : m_osFilename.c_str(), this, CPLGetPID(), position, nbytes);
175 : }
176 : uint8_t *buffer_data = (*buffer)->mutable_data();
177 : auto nread = m_fp->PRead(buffer_data, static_cast<size_t>(nbytes),
178 : static_cast<vsi_l_offset>(position));
179 : CPL_IGNORE_RET_VAL(
180 : (*buffer)->Resize(nread)); // shrink --> cannot fail
181 : if (m_bDebugReadAt)
182 : {
183 : CPLDebug(
184 : "ARROW",
185 : "End ReadAt() called on %s (this=%p) from "
186 : "thread=" CPL_FRMT_GIB ": pos=%" PRId64 ", nbytes=%" PRId64,
187 : m_osFilename.c_str(), this, CPLGetPID(), position, nbytes);
188 : }
189 : return buffer;
190 : }
191 : return arrow::io::RandomAccessFile::ReadAt(position, nbytes);
192 : }
193 : #endif
194 :
195 1429 : arrow::Result<int64_t> GetSize() override
196 : {
197 1429 : if (m_bAskedToClosed)
198 0 : return arrow::Status::IOError("File requested to close");
199 :
200 1429 : if (m_nSize < 0)
201 : {
202 1429 : const auto nPos = VSIFTellL(m_fp);
203 1429 : VSIFSeekL(m_fp, 0, SEEK_END);
204 1429 : m_nSize = static_cast<int64_t>(VSIFTellL(m_fp));
205 1429 : VSIFSeekL(m_fp, nPos, SEEK_SET);
206 : }
207 1429 : return m_nSize;
208 : }
209 : };
210 :
211 : #endif // OGR_ARROW_RANDOM_ACCESS_FILE_H
|