Line data Source code
1 : /**********************************************************************
2 : *
3 : * Project: CPL - Common Portability Library
4 : * Purpose: Implement VSI large file api for HDFS
5 : * Author: James McClain, <jmcclain@azavea.com>
6 : *
7 : **********************************************************************
8 : * Copyright (c) 2010-2015, Even Rouault <even dot rouault at spatialys.com>
9 : * Copyright (c) 2018, Azavea
10 : *
11 : * SPDX-License-Identifier: MIT
12 : ****************************************************************************/
13 :
14 : //! @cond Doxygen_Suppress
15 :
16 : #include <string>
17 :
18 : #include <fcntl.h>
19 : #include <sys/types.h>
20 : #include <sys/stat.h>
21 :
22 : #if !defined(_MSC_VER)
23 : #include <unistd.h>
24 : #endif
25 :
26 : #include <cstring>
27 : #include <climits>
28 :
29 : #include "cpl_port.h"
30 : #include "cpl_vsi.h"
31 :
32 : #include "cpl_conv.h"
33 : #include "cpl_error.h"
34 : #include "cpl_vsi_virtual.h"
35 :
36 : #ifdef HDFS_ENABLED
37 :
38 : #include "hdfs.h"
39 :
40 : /************************************************************************/
41 : /* ==================================================================== */
42 : /* VSIHdfsHandle */
43 : /* ==================================================================== */
44 : /************************************************************************/
45 :
46 : #define SILENCE(expr) \
47 : { \
48 : int hOldStderr = dup(2); \
49 : int hNewStderr = open("/dev/null", O_WRONLY); \
50 : \
51 : if ((hOldStderr != -1) && (hNewStderr != -1) && \
52 : (dup2(hNewStderr, 2) != -1)) \
53 : { \
54 : close(hNewStderr); \
55 : expr; \
56 : dup2(hOldStderr, 2); \
57 : close(hOldStderr); \
58 : } \
59 : else \
60 : { \
61 : if (hOldStderr != -1) \
62 : close(hOldStderr); \
63 : if (hNewStderr != -1) \
64 : close(hNewStderr); \
65 : expr; \
66 : } \
67 : }
68 :
69 : class VSIHdfsHandle final : public VSIVirtualHandle
70 : {
71 : private:
72 : CPL_DISALLOW_COPY_ASSIGN(VSIHdfsHandle)
73 :
74 : hdfsFile poFile = nullptr;
75 : hdfsFS poFilesystem = nullptr;
76 : std::string oFilename;
77 : bool bEOF = false;
78 :
79 : public:
80 : static constexpr const char *VSIHDFS = "/vsihdfs/";
81 :
82 : VSIHdfsHandle(hdfsFile poFile, hdfsFS poFilesystem, const char *pszFilename,
83 : bool bReadOnly);
84 : ~VSIHdfsHandle() override;
85 :
86 : int Seek(vsi_l_offset nOffset, int nWhence) override;
87 : vsi_l_offset Tell() override;
88 : size_t Read(void *pBuffer, size_t nSize, size_t nMemb) override;
89 : size_t Write(const void *pBuffer, size_t nSize, size_t nMemb) override;
90 : vsi_l_offset Length();
91 : void ClearErr() override;
92 : int Eof() override;
93 : int Error() override;
94 : int Flush() override;
95 : int Close() override;
96 : };
97 :
98 : VSIHdfsHandle::VSIHdfsHandle(hdfsFile _poFile, hdfsFS _poFilesystem,
99 : const char *pszFilename, bool /*_bReadOnly*/)
100 : : poFile(_poFile), poFilesystem(_poFilesystem), oFilename(pszFilename)
101 : {
102 : }
103 :
104 : VSIHdfsHandle::~VSIHdfsHandle()
105 : {
106 : Close();
107 : }
108 :
109 : int VSIHdfsHandle::Seek(vsi_l_offset nOffset, int nWhence)
110 : {
111 : bEOF = false;
112 : switch (nWhence)
113 : {
114 : case SEEK_SET:
115 : return hdfsSeek(poFilesystem, poFile, nOffset);
116 : case SEEK_CUR:
117 : return hdfsSeek(poFilesystem, poFile, nOffset + Tell());
118 : case SEEK_END:
119 : return hdfsSeek(poFilesystem, poFile,
120 : static_cast<tOffset>(Length()) - nOffset);
121 : default:
122 : return -1;
123 : }
124 : }
125 :
126 : vsi_l_offset VSIHdfsHandle::Tell()
127 : {
128 : return hdfsTell(poFilesystem, poFile);
129 : }
130 :
131 : size_t VSIHdfsHandle::Read(void *pBuffer, size_t nSize, size_t nMemb)
132 : {
133 : if (nSize == 0 || nMemb == 0)
134 : return 0;
135 :
136 : size_t bytes_wanted = nSize * nMemb;
137 : size_t bytes_read = 0;
138 :
139 : while (bytes_read < bytes_wanted)
140 : {
141 : tSize bytes = 0;
142 : size_t bytes_to_request = bytes_wanted - bytes_read;
143 :
144 : // The `Read` function can take 64-bit arguments for its
145 : // read-request size, whereas `hdfsRead` may only take a 32-bit
146 : // argument. If the former requests an amount larger than can
147 : // be encoded in a signed 32-bit number, break the request into
148 : // 2GB batches.
149 : bytes = hdfsRead(
150 : poFilesystem, poFile, static_cast<char *>(pBuffer) + bytes_read,
151 : bytes_to_request > INT_MAX ? INT_MAX : bytes_to_request);
152 :
153 : if (bytes > 0)
154 : {
155 : if (static_cast<size_t>(bytes) < bytes_to_request)
156 : bEOF = true;
157 : bytes_read += bytes;
158 : }
159 : if (bytes == 0)
160 : {
161 : bEOF = true;
162 : return bytes_read / nSize;
163 : }
164 : else if (bytes < 0)
165 : {
166 : bEOF = false;
167 : return 0;
168 : }
169 : }
170 :
171 : return bytes_read / nSize;
172 : }
173 :
174 : size_t VSIHdfsHandle::Write(const void *, size_t, size_t)
175 : {
176 : CPLError(CE_Failure, CPLE_AppDefined, "HDFS driver is read-only");
177 : return -1;
178 : }
179 :
180 : vsi_l_offset VSIHdfsHandle::Length()
181 : {
182 : hdfsFileInfo *poInfo = hdfsGetPathInfo(poFilesystem, oFilename.c_str());
183 : if (poInfo != nullptr)
184 : {
185 : tOffset nSize = poInfo->mSize;
186 : hdfsFreeFileInfo(poInfo, 1);
187 : return static_cast<vsi_l_offset>(nSize);
188 : }
189 : return -1;
190 : }
191 :
192 : int VSIHdfsHandle::Eof()
193 : {
194 : return bEOF;
195 : }
196 :
197 : int VSIHdfsHandle::Error()
198 : {
199 : return 0;
200 : }
201 :
202 : void VSIHdfsHandle::ClearErr()
203 : {
204 : }
205 :
206 : int VSIHdfsHandle::Flush()
207 : {
208 : return hdfsFlush(poFilesystem, poFile);
209 : }
210 :
211 : int VSIHdfsHandle::Close()
212 : {
213 : int retval = 0;
214 :
215 : if (poFilesystem != nullptr && poFile != nullptr)
216 : retval = hdfsCloseFile(poFilesystem, poFile);
217 : poFile = nullptr;
218 : poFilesystem = nullptr;
219 :
220 : return retval;
221 : }
222 :
223 : class VSIHdfsFilesystemHandler final : public VSIFilesystemHandler
224 : {
225 : private:
226 : CPL_DISALLOW_COPY_ASSIGN(VSIHdfsFilesystemHandler)
227 :
228 : hdfsFS poFilesystem = nullptr;
229 : CPLMutex *hMutex = nullptr;
230 :
231 : public:
232 : VSIHdfsFilesystemHandler();
233 : ~VSIHdfsFilesystemHandler() override;
234 :
235 : void EnsureFilesystem();
236 : VSIVirtualHandle *Open(const char *pszFilename, const char *pszAccess,
237 : bool bSetError,
238 : CSLConstList /* papszOptions */) override;
239 : int Stat(const char *pszFilename, VSIStatBufL *pStatBuf,
240 : int nFlags) override;
241 : int Unlink(const char *pszFilename) override;
242 : int Mkdir(const char *pszDirname, long nMode) override;
243 : int Rmdir(const char *pszDirname) override;
244 : char **ReadDirEx(const char *pszDirname, int nMaxFiles) override;
245 : int Rename(const char *oldpath, const char *newpath) override;
246 : };
247 :
248 : VSIHdfsFilesystemHandler::VSIHdfsFilesystemHandler()
249 : {
250 : }
251 :
252 : VSIHdfsFilesystemHandler::~VSIHdfsFilesystemHandler()
253 : {
254 : if (hMutex != nullptr)
255 : {
256 : CPLDestroyMutex(hMutex);
257 : hMutex = nullptr;
258 : }
259 :
260 : if (poFilesystem != nullptr)
261 : hdfsDisconnect(poFilesystem);
262 : poFilesystem = nullptr;
263 : }
264 :
265 : void VSIHdfsFilesystemHandler::EnsureFilesystem()
266 : {
267 : CPLMutexHolder oHolder(&hMutex);
268 : if (poFilesystem == nullptr)
269 : poFilesystem = hdfsConnect("default", 0);
270 : }
271 :
272 : VSIVirtualHandle *
273 : VSIHdfsFilesystemHandler::Open(const char *pszFilename, const char *pszAccess,
274 : bool, CSLConstList /* papszOptions */)
275 : {
276 : EnsureFilesystem();
277 :
278 : if (strchr(pszAccess, 'w') != nullptr || strchr(pszAccess, 'a') != nullptr)
279 : {
280 : CPLError(CE_Failure, CPLE_AppDefined, "HDFS driver is read-only");
281 : return nullptr;
282 : }
283 :
284 : if (strncmp(pszFilename, VSIHdfsHandle::VSIHDFS,
285 : strlen(VSIHdfsHandle::VSIHDFS)) != 0)
286 : {
287 : return nullptr;
288 : }
289 : else
290 : {
291 : const char *pszPath = pszFilename + strlen(VSIHdfsHandle::VSIHDFS);
292 :
293 : // Open HDFS file, sending Java stack traces to /dev/null.
294 : hdfsFile poFile = nullptr;
295 : SILENCE(poFile =
296 : hdfsOpenFile(poFilesystem, pszPath, O_RDONLY, 0, 0, 0));
297 :
298 : if (poFile != nullptr)
299 : {
300 : VSIHdfsHandle *poHandle =
301 : new VSIHdfsHandle(poFile, poFilesystem, pszPath, true);
302 : return poHandle;
303 : }
304 : }
305 : return nullptr;
306 : }
307 :
308 : int VSIHdfsFilesystemHandler::Stat(const char *pszFilename,
309 : VSIStatBufL *pStatBuf, int)
310 : {
311 : memset(pStatBuf, 0, sizeof(VSIStatBufL));
312 :
313 : if (strncmp(pszFilename, VSIHdfsHandle::VSIHDFS,
314 : strlen(VSIHdfsHandle::VSIHDFS)) != 0)
315 : {
316 : return -1;
317 : }
318 :
319 : EnsureFilesystem();
320 :
321 : // CPLDebug("VSIHDFS", "Stat(%s)", pszFilename);
322 :
323 : hdfsFileInfo *poInfo = hdfsGetPathInfo(
324 : poFilesystem, pszFilename + strlen(VSIHdfsHandle::VSIHDFS));
325 :
326 : if (poInfo != nullptr)
327 : {
328 : pStatBuf->st_dev =
329 : static_cast<dev_t>(0); /* ID of device containing file */
330 : pStatBuf->st_ino = static_cast<ino_t>(0); /* inode number */
331 : switch (poInfo->mKind)
332 : { /* protection */
333 : case tObjectKind::kObjectKindFile:
334 : pStatBuf->st_mode = S_IFREG;
335 : break;
336 : case tObjectKind::kObjectKindDirectory:
337 : pStatBuf->st_mode = S_IFDIR;
338 : break;
339 : default:
340 : CPLError(CE_Failure, CPLE_AppDefined,
341 : "Unrecognized object kind");
342 : }
343 : pStatBuf->st_nlink = static_cast<nlink_t>(0); /* number of hard links */
344 : pStatBuf->st_uid = getuid(); /* user ID of owner */
345 : pStatBuf->st_gid = getgid(); /* group ID of owner */
346 : pStatBuf->st_rdev =
347 : static_cast<dev_t>(0); /* device ID (if special file) */
348 : pStatBuf->st_size =
349 : static_cast<off_t>(poInfo->mSize); /* total size, in bytes */
350 : pStatBuf->st_blksize = static_cast<blksize_t>(
351 : poInfo->mBlockSize); /* blocksize for filesystem I/O */
352 : pStatBuf->st_blocks =
353 : static_cast<blkcnt_t>((poInfo->mBlockSize >> 9) +
354 : 1); /* number of 512B blocks allocated */
355 : pStatBuf->st_atime =
356 : static_cast<time_t>(poInfo->mLastAccess); /* time of last access */
357 : pStatBuf->st_mtime = static_cast<time_t>(
358 : poInfo->mLastMod); /* time of last modification */
359 : pStatBuf->st_ctime = static_cast<time_t>(
360 : poInfo->mLastMod); /* time of last status change */
361 : hdfsFreeFileInfo(poInfo, 1);
362 : return 0;
363 : }
364 :
365 : return -1;
366 : }
367 :
368 : int VSIHdfsFilesystemHandler::Unlink(const char *)
369 : {
370 : CPLError(CE_Failure, CPLE_AppDefined, "HDFS driver is read-only");
371 : return -1;
372 : }
373 :
374 : int VSIHdfsFilesystemHandler::Mkdir(const char *, long)
375 : {
376 : CPLError(CE_Failure, CPLE_AppDefined, "HDFS driver is read-only");
377 : return -1;
378 : }
379 :
380 : int VSIHdfsFilesystemHandler::Rmdir(const char *)
381 : {
382 : CPLError(CE_Failure, CPLE_AppDefined, "HDFS driver is read-only");
383 : return -1;
384 : }
385 :
386 : char **VSIHdfsFilesystemHandler::ReadDirEx(const char *pszDirname,
387 : int /* nMaxFiles */)
388 : {
389 : if (strncmp(pszDirname, VSIHdfsHandle::VSIHDFS,
390 : strlen(VSIHdfsHandle::VSIHDFS)) != 0)
391 : {
392 : return nullptr;
393 : }
394 :
395 : EnsureFilesystem();
396 :
397 : std::string osDirName(pszDirname);
398 : if (osDirName.back() != '/')
399 : osDirName += '/';
400 :
401 : VSIStatBufL sStat;
402 : if (Stat(osDirName.c_str(), &sStat, 0) != 0 || sStat.st_mode != S_IFDIR)
403 : return nullptr;
404 :
405 : int nEntries = 0;
406 : std::string osDirNameWithoutPrefix(
407 : osDirName.substr(strlen(VSIHdfsHandle::VSIHDFS)));
408 :
409 : // file:///home/user/... is accepted, but if this is used, files returned
410 : // by hdfsListDirectory() use file:/home/user/...
411 : if (osDirNameWithoutPrefix.compare(0, strlen("file:///"), "file:///") == 0)
412 : {
413 : osDirNameWithoutPrefix =
414 : "file:/" + osDirNameWithoutPrefix.substr(strlen("file:///"));
415 : }
416 :
417 : hdfsFileInfo *paoInfo = hdfsListDirectory(
418 : poFilesystem, osDirNameWithoutPrefix.c_str(), &nEntries);
419 :
420 : if (paoInfo != nullptr)
421 : {
422 : CPLStringList aosNames;
423 : for (int i = 0; i < nEntries; ++i)
424 : {
425 : // CPLDebug("VSIHDFS", "[%d]: %s", i, paoInfo[i].mName);
426 : if (STARTS_WITH(paoInfo[i].mName, osDirNameWithoutPrefix.c_str()))
427 : {
428 : aosNames.AddString(paoInfo[i].mName +
429 : osDirNameWithoutPrefix.size());
430 : }
431 : else
432 : {
433 : CPLDebug("VSIHDFS",
434 : "hdfsListDirectory() returned %s, but this is not "
435 : "starting with %s",
436 : paoInfo[i].mName, osDirNameWithoutPrefix.c_str());
437 : }
438 : }
439 : hdfsFreeFileInfo(paoInfo, nEntries);
440 : return aosNames.StealList();
441 : }
442 : return nullptr;
443 : }
444 :
445 : int VSIHdfsFilesystemHandler::Rename(const char *, const char *)
446 : {
447 : CPLError(CE_Failure, CPLE_AppDefined, "HDFS driver is read-only");
448 : return -1;
449 : }
450 :
451 : #endif
452 :
453 : //! @endcond
454 :
455 : #ifdef HDFS_ENABLED
456 :
457 : /************************************************************************/
458 : /* VSIInstallHdfsHandler() */
459 : /************************************************************************/
460 :
461 : /**
462 : * \brief Install /vsihdfs/ file system handler (requires JVM and HDFS support)
463 : *
464 : * @since GDAL 2.4.0
465 : */
466 : void VSIInstallHdfsHandler()
467 : {
468 : VSIFileManager::InstallHandler(VSIHdfsHandle::VSIHDFS,
469 : new VSIHdfsFilesystemHandler);
470 : }
471 :
472 : #else
473 :
474 : /************************************************************************/
475 : /* VSIInstallHdfsHandler() */
476 : /************************************************************************/
477 :
478 : /**
479 : * \brief Install /vsihdfs/ file system handler (non-functional stub)
480 : *
481 : * @since GDAL 2.4.0
482 : */
483 1304 : void VSIInstallHdfsHandler(void)
484 : {
485 : // Not supported.
486 1304 : }
487 :
488 : #endif
|