Line data Source code
1 : /******************************************************************************
2 : *
3 : * Project: Arrow generic code
4 : * Purpose: Arrow generic code
5 : * Author: Even Rouault, <even.rouault at spatialys.com>
6 : *
7 : ******************************************************************************
8 : * Copyright (c) 2022, Planet Labs
9 : *
10 : * SPDX-License-Identifier: MIT
11 : ****************************************************************************/
12 :
13 : #ifndef OGR_ARROW_RANDOM_ACCESS_FILE_H
14 : #define OGR_ARROW_RANDOM_ACCESS_FILE_H
15 :
16 : #include "cpl_vsi.h"
17 : #include "cpl_vsi_virtual.h"
18 :
19 : #include "arrow/buffer.h"
20 : #include "arrow/io/file.h"
21 : #include "arrow/io/interfaces.h"
22 :
23 : #include <atomic>
24 : #include <cinttypes>
25 :
26 : #if defined(__clang__)
27 : #pragma clang diagnostic push
28 : #pragma clang diagnostic ignored "-Wweak-vtables"
29 : #endif
30 :
31 : /************************************************************************/
32 : /* OGRArrowRandomAccessFile */
33 : /************************************************************************/
34 :
35 : class OGRArrowRandomAccessFile final : public arrow::io::RandomAccessFile
36 : {
37 : int64_t m_nSize = -1;
38 : const std::string m_osFilename;
39 : VSILFILE *m_fp;
40 : const bool m_bOwnFP;
41 : std::atomic<bool> m_bAskedToClosed = false;
42 :
43 : #ifdef OGR_ARROW_USE_PREAD
44 : const bool m_bDebugReadAt;
45 : const bool m_bUsePRead;
46 : #endif
47 :
48 : OGRArrowRandomAccessFile(const OGRArrowRandomAccessFile &) = delete;
49 : OGRArrowRandomAccessFile &
50 : operator=(const OGRArrowRandomAccessFile &) = delete;
51 :
52 : public:
53 5 : OGRArrowRandomAccessFile(const std::string &osFilename, VSILFILE *fp,
54 : bool bOwnFP)
55 5 : : m_osFilename(osFilename), m_fp(fp), m_bOwnFP(bOwnFP)
56 : #ifdef OGR_ARROW_USE_PREAD
57 : ,
58 : m_bDebugReadAt(!VSIIsLocal(m_osFilename.c_str())),
59 : // Due to the lack of caching for current /vsicurl PRead(), do not
60 : // use the PRead() implementation on those files
61 : m_bUsePRead(m_fp->HasPRead() &&
62 : CPLTestBool(CPLGetConfigOption(
63 : "OGR_ARROW_USE_PREAD",
64 : VSIIsLocal(m_osFilename.c_str()) ? "YES" : "NO")))
65 : #endif
66 : {
67 5 : }
68 :
69 1434 : OGRArrowRandomAccessFile(const std::string &osFilename,
70 : VSIVirtualHandleUniquePtr &&fp)
71 1434 : : m_osFilename(osFilename), m_fp(fp.release()), m_bOwnFP(true)
72 : #ifdef OGR_ARROW_USE_PREAD
73 : ,
74 : m_bDebugReadAt(!VSIIsLocal(m_osFilename.c_str())),
75 : // Due to the lack of caching for current /vsicurl PRead(), do not
76 : // use the PRead() implementation on those files
77 : m_bUsePRead(m_fp->HasPRead() &&
78 : CPLTestBool(CPLGetConfigOption(
79 : "OGR_ARROW_USE_PREAD",
80 : VSIIsLocal(m_osFilename.c_str()) ? "YES" : "NO")))
81 : #endif
82 : {
83 1434 : }
84 :
85 1 : void AskToClose()
86 : {
87 1 : m_bAskedToClosed = true;
88 1 : if (m_fp)
89 1 : m_fp->Interrupt();
90 1 : }
91 :
92 1439 : ~OGRArrowRandomAccessFile() override
93 1439 : {
94 1439 : if (m_fp && m_bOwnFP)
95 1434 : VSIFCloseL(m_fp);
96 1439 : }
97 :
98 0 : arrow::Status Close() override
99 : {
100 0 : if (!m_bOwnFP)
101 : return arrow::Status::IOError(
102 0 : "Cannot close a file that we don't own");
103 0 : int ret = VSIFCloseL(m_fp);
104 0 : m_fp = nullptr;
105 : return ret == 0 ? arrow::Status::OK()
106 0 : : arrow::Status::IOError("Error while closing");
107 : }
108 :
109 0 : arrow::Result<int64_t> Tell() const override
110 : {
111 0 : return static_cast<int64_t>(VSIFTellL(m_fp));
112 : }
113 :
114 0 : bool closed() const override
115 : {
116 0 : return m_bAskedToClosed || m_fp == nullptr;
117 : }
118 :
119 2760 : arrow::Status Seek(int64_t position) override
120 : {
121 2760 : if (m_bAskedToClosed)
122 0 : return arrow::Status::IOError("File requested to close");
123 :
124 2760 : if (VSIFSeekL(m_fp, static_cast<vsi_l_offset>(position), SEEK_SET) == 0)
125 2760 : return arrow::Status::OK();
126 0 : return arrow::Status::IOError("Error while seeking");
127 : }
128 :
129 2815 : arrow::Result<int64_t> Read(int64_t nbytes, void *out) override
130 : {
131 2815 : if (m_bAskedToClosed)
132 0 : return arrow::Status::IOError("File requested to close");
133 :
134 : CPLAssert(static_cast<int64_t>(static_cast<size_t>(nbytes)) == nbytes);
135 2815 : return static_cast<int64_t>(
136 2815 : VSIFReadL(out, 1, static_cast<size_t>(nbytes), m_fp));
137 : }
138 :
139 2781 : arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override
140 : {
141 2781 : if (m_bAskedToClosed)
142 0 : return arrow::Status::IOError("File requested to close");
143 :
144 : // CPLDebug("ARROW", "Reading %d bytes", int(nbytes));
145 5562 : auto buffer = arrow::AllocateResizableBuffer(nbytes);
146 2781 : if (!buffer.ok())
147 : {
148 0 : return buffer;
149 : }
150 2781 : uint8_t *buffer_data = (*buffer)->mutable_data();
151 5562 : auto nread = Read(nbytes, buffer_data);
152 2781 : CPL_IGNORE_RET_VAL(
153 5562 : (*buffer)->Resize(*nread)); // shrink --> cannot fail
154 2781 : return buffer;
155 : }
156 :
157 : #ifdef OGR_ARROW_USE_PREAD
158 : using arrow::io::RandomAccessFile::ReadAt;
159 :
160 : arrow::Result<std::shared_ptr<arrow::Buffer>>
161 : ReadAt(int64_t position, int64_t nbytes) override
162 : {
163 : if (m_bAskedToClosed)
164 : return arrow::Status::IOError("File requested to close");
165 :
166 : if (m_bUsePRead)
167 : {
168 : auto buffer = arrow::AllocateResizableBuffer(nbytes);
169 : if (!buffer.ok())
170 : {
171 : return buffer;
172 : }
173 : if (m_bDebugReadAt)
174 : {
175 : CPLDebug(
176 : "ARROW",
177 : "Start ReadAt() called on %s (this=%p) from "
178 : "thread=" CPL_FRMT_GIB ": pos=%" PRId64 ", nbytes=%" PRId64,
179 : m_osFilename.c_str(), this, CPLGetPID(), position, nbytes);
180 : }
181 : uint8_t *buffer_data = (*buffer)->mutable_data();
182 : auto nread = m_fp->PRead(buffer_data, static_cast<size_t>(nbytes),
183 : static_cast<vsi_l_offset>(position));
184 : CPL_IGNORE_RET_VAL(
185 : (*buffer)->Resize(nread)); // shrink --> cannot fail
186 : if (m_bDebugReadAt)
187 : {
188 : CPLDebug(
189 : "ARROW",
190 : "End ReadAt() called on %s (this=%p) from "
191 : "thread=" CPL_FRMT_GIB ": pos=%" PRId64 ", nbytes=%" PRId64,
192 : m_osFilename.c_str(), this, CPLGetPID(), position, nbytes);
193 : }
194 : return buffer;
195 : }
196 : return arrow::io::RandomAccessFile::ReadAt(position, nbytes);
197 : }
198 : #endif
199 :
200 1430 : arrow::Result<int64_t> GetSize() override
201 : {
202 1430 : if (m_bAskedToClosed)
203 0 : return arrow::Status::IOError("File requested to close");
204 :
205 1430 : if (m_nSize < 0)
206 : {
207 1430 : const auto nPos = VSIFTellL(m_fp);
208 1430 : VSIFSeekL(m_fp, 0, SEEK_END);
209 1430 : m_nSize = static_cast<int64_t>(VSIFTellL(m_fp));
210 1430 : VSIFSeekL(m_fp, nPos, SEEK_SET);
211 : }
212 1430 : return m_nSize;
213 : }
214 : };
215 :
216 : #if defined(__clang__)
217 : #pragma clang diagnostic pop
218 : #endif
219 :
220 : #endif // OGR_ARROW_RANDOM_ACCESS_FILE_H
|