XRootD
Loading...
Searching...
No Matches
XrdClLocalFileHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
3// Author: Paul-Niklas Kramp <p.n.kramp@gsi.de>
4// Michal Simon <michal.simon@cern.ch>
5//------------------------------------------------------------------------------
6// XRootD is free software: you can redistribute it and/or modify
7// it under the terms of the GNU Lesser General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10//
11// XRootD is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15//
16// You should have received a copy of the GNU Lesser General Public License
17// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
18//------------------------------------------------------------------------------
22#include "XrdCl/XrdClURL.hh"
26
27#include "XrdSys/XrdSysE2T.hh"
28#include "XrdSys/XrdSysXAttr.hh"
29#include "XrdSys/XrdSysFAttr.hh"
30#include "XrdSys/XrdSysFD.hh"
31
32#include <string>
33#include <memory>
34#include <stdexcept>
35
36#include <fcntl.h>
37#include <cstdio>
38#include <cstdlib>
39#include <unistd.h>
40#include <sys/stat.h>
41#include <arpa/inet.h>
42#include <aio.h>
43
44namespace
45{
46
47 class AioCtx
48 {
49 public:
50
51 enum Opcode
52 {
53 None,
54 Read,
55 Write,
56 Sync
57 };
58
59 AioCtx( const XrdCl::HostList &hostList, XrdCl::ResponseHandler *handler ) :
60 opcode( None ), hosts( new XrdCl::HostList( hostList ) ), handler( handler )
61 {
62 aiocb *ptr = new aiocb();
63 memset( ptr, 0, sizeof( aiocb ) );
64
66 int useSignals = XrdCl::DefaultAioSignal;
67 env->GetInt( "AioSignal", useSignals );
68
69 if( useSignals )
70 {
71 static SignalHandlerRegistrator registrator; // registers the signal handler
72
73 ptr->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
74 ptr->aio_sigevent.sigev_signo = SIGUSR1;
75 }
76 else
77 {
78 ptr->aio_sigevent.sigev_notify = SIGEV_THREAD;
79 ptr->aio_sigevent.sigev_notify_function = ThreadHandler;
80 }
81
82 ptr->aio_sigevent.sigev_value.sival_ptr = this;
83
84 cb.reset( ptr );
85 }
86
87
88 void SetWrite( int fd, size_t offset, size_t size, const void *buffer )
89 {
90 cb->aio_fildes = fd;
91 cb->aio_offset = offset;
92 cb->aio_buf = const_cast<void*>( buffer );
93 cb->aio_nbytes = size;
94 opcode = Opcode::Write;
95 }
96
97 void SetRead( int fd, size_t offset, size_t size, void *buffer )
98 {
99 cb->aio_fildes = fd;
100 cb->aio_offset = offset;
101 cb->aio_buf = buffer;
102 cb->aio_nbytes = size;
103 opcode = Opcode::Read;
104 }
105
106 void SetFsync( int fd )
107 {
108 cb->aio_fildes = fd;
109 opcode = Opcode::Sync;
110 }
111
112 static void ThreadHandler( sigval arg )
113 {
114 std::unique_ptr<AioCtx> me( reinterpret_cast<AioCtx*>( arg.sival_ptr ) );
115 Handler( std::move( me ) );
116 }
117
118 static void SignalHandler( int sig, siginfo_t *info, void *ucontext )
119 {
120 std::unique_ptr<AioCtx> me( reinterpret_cast<AioCtx*>( info->si_value.sival_ptr ) );
121 Handler( std::move( me ) );
122 }
123
124 operator aiocb*()
125 {
126 return cb.get();
127 }
128
129 private:
130
131 struct SignalHandlerRegistrator
132 {
133 SignalHandlerRegistrator()
134 {
135 struct sigaction newact, oldact;
136 newact.sa_sigaction = SignalHandler;
137 sigemptyset( &newact.sa_mask );
138 newact.sa_flags = SA_SIGINFO;
139 int rc = sigaction( SIGUSR1, &newact, &oldact );
140 if( rc < 0 )
141 throw std::runtime_error( XrdSysE2T( errno ) );
142 }
143 };
144
145 static void Handler( std::unique_ptr<AioCtx> me )
146 {
147 if( me->opcode == Opcode::None )
148 return;
149
150 using namespace XrdCl;
151
152 int rc = aio_return( me->cb.get() );
153 if( rc < 0 )
154 {
155 int errcode = aio_error( me->cb.get() );
156 Log *log = DefaultEnv::GetLog();
157 log->Error( FileMsg, GetErrMsg( me->opcode ), XrdSysE2T( errcode ) );
158 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errcode ) ;
159 QueueTask( error, 0, me->hosts, me->handler );
160 }
161 else
162 {
163 AnyObject *resp = 0;
164
165 if( me->opcode == Opcode::Read )
166 {
167 ChunkInfo *chunk = new ChunkInfo( me->cb->aio_offset, rc,
168 const_cast<void*>( me->cb->aio_buf ) );
169 resp = new AnyObject();
170 resp->Set( chunk );
171 }
172
173 QueueTask( new XRootDStatus(), resp, me->hosts, me->handler );
174 }
175 }
176
177 static const char* GetErrMsg( Opcode opcode )
178 {
179 static const char readmsg[] = "Read: failed %s";
180 static const char writemsg[] = "Write: failed %s";
181 static const char syncmsg[] = "Sync: failed %s";
182
183 switch( opcode )
184 {
185 case Opcode::Read: return readmsg;
186
187 case Opcode::Write: return writemsg;
188
189 case Opcode::Sync: return syncmsg;
190
191 default: return 0;
192 }
193 }
194
195 static void QueueTask( XrdCl::XRootDStatus *status, XrdCl::AnyObject *resp,
196 XrdCl::HostList *hosts, XrdCl::ResponseHandler *handler )
197 {
198 using namespace XrdCl;
199
200 // if it is simply the sync handler we can release the semaphore
201 // and return there is no need to execute this in the thread-pool
202 SyncResponseHandler *syncHandler =
203 dynamic_cast<SyncResponseHandler*>( handler );
204 if( syncHandler || DefaultEnv::GetPostMaster() == nullptr )
205 {
206 syncHandler->HandleResponse( status, resp );
207 }
208 else
209 {
210 JobManager *jmngr = DefaultEnv::GetPostMaster()->GetJobManager();
211 LocalFileTask *task = new LocalFileTask( status, resp, hosts, handler );
212 jmngr->QueueJob( task );
213 }
214 }
215
216 std::unique_ptr<aiocb> cb;
217 Opcode opcode;
218 XrdCl::HostList *hosts;
219 XrdCl::ResponseHandler *handler;
220 };
221
222};
223
224namespace XrdCl
225{
226
227 //------------------------------------------------------------------------
228 // Constructor
229 //------------------------------------------------------------------------
231 fd( -1 )
232 {
233 }
234
235 //------------------------------------------------------------------------
236 // Destructor
237 //------------------------------------------------------------------------
239 {
240
241 }
242
243 //------------------------------------------------------------------------
244 // Open
245 //------------------------------------------------------------------------
246 XRootDStatus LocalFileHandler::Open( const std::string& url, uint16_t flags,
247 uint16_t mode, ResponseHandler* handler, uint16_t timeout )
248 {
249 AnyObject *resp = 0;
250 XRootDStatus st = OpenImpl( url, flags, mode, resp );
251 if( !st.IsOK() && st.code != errLocalError )
252 return st;
253
254 return QueueTask( new XRootDStatus( st ), resp, handler );
255 }
256
257 XRootDStatus LocalFileHandler::Open( const URL *url, const Message *req, AnyObject *&resp )
258 {
259 const ClientOpenRequest* request =
260 reinterpret_cast<const ClientOpenRequest*>( req->GetBuffer() );
261 uint16_t flags = ntohs( request->options );
262 uint16_t mode = ntohs( request->mode );
263 return OpenImpl( url->GetURL(), flags, mode, resp );
264 }
265
266 //------------------------------------------------------------------------
267 // Close
268 //------------------------------------------------------------------------
270 uint16_t timeout )
271 {
272 if( close( fd ) == -1 )
273 {
274 Log *log = DefaultEnv::GetLog();
275 log->Error( FileMsg, "Close: file fd: %i %s", fd, XrdSysE2T( errno ) );
276 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
277 return QueueTask( error, 0, handler );
278 }
279
280 return QueueTask( new XRootDStatus(), 0, handler );
281 }
282
283 //------------------------------------------------------------------------
284 // Stat
285 //------------------------------------------------------------------------
287 uint16_t timeout )
288 {
289 Log *log = DefaultEnv::GetLog();
290
291 struct stat ssp;
292 if( fstat( fd, &ssp ) == -1 )
293 {
294 log->Error( FileMsg, "Stat: failed fd: %i %s", fd, XrdSysE2T( errno ) );
295 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
296 return QueueTask( error, 0, handler );
297 }
298 std::ostringstream data;
299 data << ssp.st_dev << " " << ssp.st_size << " " << ssp.st_mode << " "
300 << ssp.st_mtime;
301 log->Debug( FileMsg, data.str().c_str() );
302
303 StatInfo *statInfo = new StatInfo();
304 if( !statInfo->ParseServerResponse( data.str().c_str() ) )
305 {
306 log->Error( FileMsg, "Stat: ParseServerResponse failed." );
307 delete statInfo;
309 0, handler );
310 }
311
312 AnyObject *resp = new AnyObject();
313 resp->Set( statInfo );
314 return QueueTask( new XRootDStatus(), resp, handler );
315 }
316
317 //------------------------------------------------------------------------
318 // Read
319 //------------------------------------------------------------------------
320 XRootDStatus LocalFileHandler::Read( uint64_t offset, uint32_t size,
321 void* buffer, ResponseHandler* handler, uint16_t timeout )
322 {
323#if defined(__APPLE__)
324 Log *log = DefaultEnv::GetLog();
325 int read = 0;
326 if( ( read = pread( fd, buffer, size, offset ) ) == -1 )
327 {
328 log->Error( FileMsg, "Read: failed %s", XrdSysE2T( errno ) );
329 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
330 return QueueTask( error, 0, handler );
331 }
332 ChunkInfo *chunk = new ChunkInfo( offset, read, buffer );
333 AnyObject *resp = new AnyObject();
334 resp->Set( chunk );
335 return QueueTask( new XRootDStatus(), resp, handler );
336#else
337 AioCtx *ctx = new AioCtx( pHostList, handler );
338 ctx->SetRead( fd, offset, size, buffer );
339
340 int rc = aio_read( *ctx );
341
342 if( rc < 0 )
343 {
344 Log *log = DefaultEnv::GetLog();
345 log->Error( FileMsg, "Read: failed %s", XrdSysE2T( errno ) );
346 return XRootDStatus( stError, errLocalError, errno );
347 }
348
349 return XRootDStatus();
350#endif
351 }
352
353
354 //------------------------------------------------------------------------
355 // ReadV
356 //------------------------------------------------------------------------
358 struct iovec *iov,
359 int iovcnt,
360 ResponseHandler *handler,
361 uint16_t timeout )
362 {
363 Log *log = DefaultEnv::GetLog();
364#if defined(__APPLE__)
365 ssize_t ret = lseek( fd, offset, SEEK_SET );
366 if( ret >= 0 )
367 ret = readv( fd, iov, iovcnt );
368#else
369 ssize_t ret = preadv( fd, iov, iovcnt, offset );
370#endif
371 if( ret == -1 )
372 {
373 log->Error( FileMsg, "ReadV: failed %s", XrdSysE2T( errno ) );
374 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
375 return QueueTask( error, 0, handler );
376 }
377 VectorReadInfo *info = new VectorReadInfo();
378 info->SetSize( ret );
379 uint64_t choff = offset;
380 uint32_t left = ret;
381 for( int i = 0; i < iovcnt; ++i )
382 {
383 uint32_t chlen = iov[i].iov_len;
384 if( chlen > left ) chlen = left;
385 info->GetChunks().emplace_back( choff, chlen, iov[i].iov_base);
386 left -= chlen;
387 choff += chlen;
388 }
389 AnyObject *resp = new AnyObject();
390 resp->Set( info );
391 return QueueTask( new XRootDStatus(), resp, handler );
392 }
393
394 //------------------------------------------------------------------------
395 // Write
396 //------------------------------------------------------------------------
397 XRootDStatus LocalFileHandler::Write( uint64_t offset, uint32_t size,
398 const void* buffer, ResponseHandler* handler, uint16_t timeout )
399 {
400#if defined(__APPLE__)
401 const char *buff = reinterpret_cast<const char*>( buffer );
402 size_t bytesWritten = 0;
403 while( bytesWritten < size )
404 {
405 ssize_t ret = pwrite( fd, buff, size, offset );
406 if( ret < 0 )
407 {
408 Log *log = DefaultEnv::GetLog();
409 log->Error( FileMsg, "Write: failed %s", XrdSysE2T( errno ) );
410 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
411 return QueueTask( error, 0, handler );
412 }
413 offset += ret;
414 buff += ret;
415 bytesWritten += ret;
416 }
417 return QueueTask( new XRootDStatus(), 0, handler );
418#else
419 AioCtx *ctx = new AioCtx( pHostList, handler );
420 ctx->SetWrite( fd, offset, size, buffer );
421
422 int rc = aio_write( *ctx );
423
424 if( rc < 0 )
425 {
426 Log *log = DefaultEnv::GetLog();
427 log->Error( FileMsg, "Write: failed %s", XrdSysE2T( errno ) );
428 return XRootDStatus( stError, errLocalError, errno );
429 }
430
431 return XRootDStatus();
432#endif
433 }
434
435 //------------------------------------------------------------------------
436 // Sync
437 //------------------------------------------------------------------------
439 uint16_t timeout )
440 {
441#if defined(__APPLE__)
442 if( fsync( fd ) )
443 {
444 Log *log = DefaultEnv::GetLog();
445 log->Error( FileMsg, "Sync: failed %s", XrdSysE2T( errno ) );
447 XProtocol::mapError( errno ),
448 XrdSysE2T( errno ) );
449 return QueueTask( error, 0, handler );
450 }
451 return QueueTask( new XRootDStatus(), 0, handler );
452#else
453 AioCtx *ctx = new AioCtx( pHostList, handler );
454 ctx->SetFsync( fd );
455 int rc = aio_fsync( O_SYNC, *ctx );
456 if( rc < 0 )
457 {
458 Log *log = DefaultEnv::GetLog();
459 log->Error( FileMsg, "Sync: failed %s", XrdSysE2T( errno ) );
460 return XRootDStatus( stError, errLocalError, errno );
461 }
462#endif
463 return XRootDStatus();
464 }
465
466 //------------------------------------------------------------------------
467 // Truncate
468 //------------------------------------------------------------------------
470 ResponseHandler* handler, uint16_t timeout )
471 {
472 if( ftruncate( fd, size ) )
473 {
474 Log *log = DefaultEnv::GetLog();
475 log->Error( FileMsg, "Truncate: failed, file descriptor: %i, %s", fd,
476 XrdSysE2T( errno ) );
477 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
478 return QueueTask( error, 0, handler );
479 }
480
481 return QueueTask( new XRootDStatus( stOK ), 0, handler );
482 }
483
484 //------------------------------------------------------------------------
485 // VectorRead
486 //------------------------------------------------------------------------
488 void* buffer, ResponseHandler* handler, uint16_t timeout )
489 {
490 std::unique_ptr<VectorReadInfo> info( new VectorReadInfo() );
491 size_t totalSize = 0;
492 bool useBuffer( buffer );
493
494 for( auto itr = chunks.begin(); itr != chunks.end(); ++itr )
495 {
496 auto &chunk = *itr;
497 if( !useBuffer )
498 buffer = chunk.buffer;
499 ssize_t bytesRead = pread( fd, buffer, chunk.length,
500 chunk.offset );
501 if( bytesRead < 0 )
502 {
503 Log *log = DefaultEnv::GetLog();
504 log->Error( FileMsg, "VectorRead: failed, file descriptor: %i, %s",
505 fd, XrdSysE2T( errno ) );
506 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
507 return QueueTask( error, 0, handler );
508 }
509 totalSize += bytesRead;
510 info->GetChunks().push_back( ChunkInfo( chunk.offset, bytesRead, buffer ) );
511 if( useBuffer )
512 buffer = reinterpret_cast<char*>( buffer ) + bytesRead;
513 }
514
515 info->SetSize( totalSize );
516 AnyObject *resp = new AnyObject();
517 resp->Set( info.release() );
518 return QueueTask( new XRootDStatus(), resp, handler );
519 }
520
521 //------------------------------------------------------------------------
522 // VectorWrite
523 //------------------------------------------------------------------------
525 ResponseHandler *handler, uint16_t timeout )
526 {
527
528 for( auto itr = chunks.begin(); itr != chunks.end(); ++itr )
529 {
530 auto &chunk = *itr;
531 ssize_t bytesWritten = pwrite( fd, chunk.buffer, chunk.length,
532 chunk.offset );
533 if( bytesWritten < 0 )
534 {
535 Log *log = DefaultEnv::GetLog();
536 log->Error( FileMsg, "VectorWrite: failed, file descriptor: %i, %s",
537 fd, XrdSysE2T( errno ) );
538 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
539 return QueueTask( error, 0, handler );
540 }
541 }
542
543 return QueueTask( new XRootDStatus(), 0, handler );
544 }
545
546 //------------------------------------------------------------------------
547 // WriteV
548 //------------------------------------------------------------------------
550 ChunkList *chunks,
551 ResponseHandler *handler,
552 uint16_t timeout )
553 {
554 size_t iovcnt = chunks->size();
555 iovec iovcp[iovcnt];
556 ssize_t size = 0;
557 for( size_t i = 0; i < iovcnt; ++i )
558 {
559 iovcp[i].iov_base = (*chunks)[i].buffer;
560 iovcp[i].iov_len = (*chunks)[i].length;
561 size += (*chunks)[i].length;
562 }
563 iovec *iovptr = iovcp;
564
565 ssize_t bytesWritten = 0;
566 while( bytesWritten < size )
567 {
568#ifdef __APPLE__
569 ssize_t ret = lseek( fd, offset, SEEK_SET );
570 if( ret >= 0 )
571 ret = writev( fd, iovptr, iovcnt );
572#else
573 ssize_t ret = pwritev( fd, iovptr, iovcnt, offset );
574#endif
575 if( ret < 0 )
576 {
577 Log *log = DefaultEnv::GetLog();
578 log->Error( FileMsg, "WriteV: failed %s", XrdSysE2T( errno ) );
579 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
580 return QueueTask( error, 0, handler );
581 }
582
583 bytesWritten += ret;
584 while( ret )
585 {
586 if( size_t( ret ) > iovptr[0].iov_len )
587 {
588 ret -= iovptr[0].iov_len;
589 --iovcnt;
590 ++iovptr;
591 }
592 else
593 {
594 iovptr[0].iov_len -= ret;
595 iovptr[0].iov_base = reinterpret_cast<char*>( iovptr[0].iov_base ) + ret;
596 ret = 0;
597 }
598 }
599 }
600
601 return QueueTask( new XRootDStatus(), 0, handler );
602 }
603
604 //------------------------------------------------------------------------
605 // Fcntl
606 //------------------------------------------------------------------------
608 ResponseHandler *handler, uint16_t timeout )
609 {
611 }
612
613 //------------------------------------------------------------------------
614 // Visa
615 //------------------------------------------------------------------------
617 uint16_t timeout )
618 {
620 }
621
622 //------------------------------------------------------------------------
623 // Set extended attributes - async
624 //------------------------------------------------------------------------
625 XRootDStatus LocalFileHandler::SetXAttr( const std::vector<xattr_t> &attrs,
626 ResponseHandler *handler,
627 uint16_t timeout )
628 {
630 std::vector<XAttrStatus> response;
631
632 auto itr = attrs.begin();
633 for( ; itr != attrs.end(); ++itr )
634 {
635 std::string name = std::get<xattr_name>( *itr );
636 std::string value = std::get<xattr_value>( *itr );
637 int err = xattr->Set( name.c_str(), value.c_str(), value.size(), 0, fd );
638 XRootDStatus status = err < 0 ? XRootDStatus( stError, errLocalError, -err ) :
639 XRootDStatus();
640
641 response.push_back( XAttrStatus( name, status ) );
642 }
643
644 AnyObject *resp = new AnyObject();
645 resp->Set( new std::vector<XAttrStatus>( std::move( response ) ) );
646
647 return QueueTask( new XRootDStatus(), resp, handler );
648 }
649
650 //------------------------------------------------------------------------
651 // Get extended attributes - async
652 //------------------------------------------------------------------------
653 XRootDStatus LocalFileHandler::GetXAttr( const std::vector<std::string> &attrs,
654 ResponseHandler *handler,
655 uint16_t timeout )
656 {
658 std::vector<XAttr> response;
659
660 auto itr = attrs.begin();
661 for( ; itr != attrs.end(); ++itr )
662 {
663 std::string name = *itr;
664 std::unique_ptr<char[]> buffer;
665
666 int size = xattr->Get( name.c_str(), 0, 0, 0, fd );
667 if( size < 0 )
668 {
669 XRootDStatus status( stError, errLocalError, -size );
670 response.push_back( XAttr( *itr, "", status ) );
671 continue;
672 }
673 buffer.reset( new char[size] );
674 int ret = xattr->Get( name.c_str(), buffer.get(), size, 0, fd );
675
676 XRootDStatus status;
677 std::string value;
678
679 if( ret >= 0 )
680 value.append( buffer.get(), ret );
681 else if( ret < 0 )
682 status = XRootDStatus( stError, errLocalError, -ret );
683
684 response.push_back( XAttr( *itr, value, status ) );
685 }
686
687 AnyObject *resp = new AnyObject();
688 resp->Set( new std::vector<XAttr>( std::move( response ) ) );
689
690 return QueueTask( new XRootDStatus(), resp, handler );
691 }
692
693 //------------------------------------------------------------------------
694 // Delete extended attributes - async
695 //------------------------------------------------------------------------
696 XRootDStatus LocalFileHandler::DelXAttr( const std::vector<std::string> &attrs,
697 ResponseHandler *handler,
698 uint16_t timeout )
699 {
701 std::vector<XAttrStatus> response;
702
703 auto itr = attrs.begin();
704 for( ; itr != attrs.end(); ++itr )
705 {
706 std::string name = *itr;
707 int err = xattr->Del( name.c_str(), 0, fd );
708 XRootDStatus status = err < 0 ? XRootDStatus( stError, errLocalError, -err ) :
709 XRootDStatus();
710
711 response.push_back( XAttrStatus( name, status ) );
712 }
713
714 AnyObject *resp = new AnyObject();
715 resp->Set( new std::vector<XAttrStatus>( std::move( response ) ) );
716
717 return QueueTask( new XRootDStatus(), resp, handler );
718 }
719
720 //------------------------------------------------------------------------
721 // List extended attributes - async
722 //------------------------------------------------------------------------
724 uint16_t timeout )
725 {
727 std::vector<XAttr> response;
728
729 XrdSysXAttr::AList *alist = 0;
730 int err = xattr->List( &alist, 0, fd, 1 );
731
732 if( err < 0 )
733 {
734 XRootDStatus *status = new XRootDStatus( stError, XProtocol::mapError( -err ) );
735 return QueueTask( status, 0, handler );
736 }
737
738 XrdSysXAttr::AList *ptr = alist;
739 while( ptr )
740 {
741 std::string name( ptr->Name, ptr->Nlen );
742 int vlen = ptr->Vlen;
743 ptr = ptr->Next;
744
745 std::unique_ptr<char[]> buffer( new char[vlen] );
746 int ret = xattr->Get( name.c_str(),
747 buffer.get(), vlen, 0, fd );
748
749 std::string value = ret >= 0 ? std::string( buffer.get(), ret ) :
750 std::string();
751 XRootDStatus status = ret >= 0 ? XRootDStatus() :
753 response.push_back( XAttr( name, value, status ) );
754 }
755 xattr->Free( alist );
756
757 AnyObject *resp = new AnyObject();
758 resp->Set( new std::vector<XAttr>( std::move( response ) ) );
759
760 return QueueTask( new XRootDStatus(), resp, handler );
761 }
762
763 //------------------------------------------------------------------------
764 // QueueTask - queues error/success tasks for all operations.
765 // Must always return stOK.
766 // Is always creating the same HostList containing only localhost.
767 //------------------------------------------------------------------------
769 ResponseHandler *handler )
770 {
771 // if it is simply the sync handler we can release the semaphore
772 // and return there is no need to execute this in the thread-pool
773 SyncResponseHandler *syncHandler =
774 dynamic_cast<SyncResponseHandler*>( handler );
775 if( syncHandler || DefaultEnv::GetPostMaster() == nullptr )
776 {
777 syncHandler->HandleResponse( st, resp );
778 return XRootDStatus();
779 }
780
781 HostList *hosts = pHostList.empty() ? 0 : new HostList( pHostList );
782 LocalFileTask *task = new LocalFileTask( st, resp, hosts, handler );
784 return XRootDStatus();
785 }
786
787 //------------------------------------------------------------------------
788 // MkdirPath - creates the folders specified in file_path
789 // called if kXR_mkdir flag is set
790 //------------------------------------------------------------------------
791 XRootDStatus LocalFileHandler::MkdirPath( const std::string &path )
792 {
793 // first find the most up-front component that exists
794 size_t pos = path.rfind( '/' );
795 while( pos != std::string::npos && pos != 0 )
796 {
797 std::string tmp = path.substr( 0, pos );
798 struct stat st;
799 int rc = lstat( tmp.c_str(), &st );
800 if( rc == 0 ) break;
801 if( errno != ENOENT )
802 return XRootDStatus( stError, errLocalError, errno );
803 pos = path.rfind( '/', pos - 1 );
804 }
805
806 pos = path.find( '/', pos + 1 );
807 while( pos != std::string::npos && pos != 0 )
808 {
809 std::string tmp = path.substr( 0, pos );
810 if( mkdir( tmp.c_str(), 0755 ) )
811 {
812 if( errno != EEXIST )
813 return XRootDStatus( stError, errLocalError, errno );
814 }
815 pos = path.find( '/', pos + 1 );
816 }
817 return XRootDStatus();
818 }
819
820 XRootDStatus LocalFileHandler::OpenImpl( const std::string &url, uint16_t flags,
821 uint16_t mode, AnyObject *&resp)
822 {
823 Log *log = DefaultEnv::GetLog();
824
825 // safe the file URL for the HostList for later
826 pUrl = url;
827
828 URL fileUrl( url );
829 if( !fileUrl.IsValid() )
831
832 if( fileUrl.GetHostName() != "localhost" )
834
835 std::string path = fileUrl.GetPath();
836
837 //---------------------------------------------------------------------
838 // Prepare Flags
839 //---------------------------------------------------------------------
840 uint16_t openflags = 0;
841 if( flags & kXR_new )
842 openflags |= O_CREAT | O_EXCL;
843 if( flags & kXR_open_wrto )
844 openflags |= O_WRONLY;
845 else if( flags & kXR_open_updt )
846 openflags |= O_RDWR;
847 else
848 openflags |= O_RDONLY;
849 if( flags & kXR_delete )
850 openflags |= O_CREAT | O_TRUNC;
851
852 if( flags & kXR_mkdir )
853 {
854 XRootDStatus st = MkdirPath( path );
855 if( !st.IsOK() )
856 {
857 log->Error( FileMsg, "Open MkdirPath failed %s: %s", path.c_str(),
858 XrdSysE2T( st.errNo ) );
859 return st;
860 }
861
862 }
863 //---------------------------------------------------------------------
864 // Open File
865 //---------------------------------------------------------------------
866 if( mode == Access::Mode::None)
867 mode = 0644;
868 fd = XrdSysFD_Open( path.c_str(), openflags, mode );
869 if( fd == -1 )
870 {
871 log->Error( FileMsg, "Open: open failed: %s: %s", path.c_str(),
872 XrdSysE2T( errno ) );
873
875 XProtocol::mapError( errno ) );
876 }
877 //---------------------------------------------------------------------
878 // Stat File and cache statInfo in openInfo
879 //---------------------------------------------------------------------
880 struct stat ssp;
881 if( fstat( fd, &ssp ) == -1 )
882 {
883 log->Error( FileMsg, "Open: stat failed." );
885 XProtocol::mapError( errno ) );
886 }
887
888 std::ostringstream data;
889 data << ssp.st_dev << " " << ssp.st_size << " " << ssp.st_mode << " "
890 << ssp.st_mtime;
891
892 StatInfo *statInfo = new StatInfo();
893 if( !statInfo->ParseServerResponse( data.str().c_str() ) )
894 {
895 log->Error( FileMsg, "Open: ParseServerResponse failed." );
896 delete statInfo;
898 }
899
900 // add the URL to hosts list
901 pHostList.push_back( HostInfo( pUrl, false ) );
902
903 //All went well
904 uint32_t ufd = fd;
905 OpenInfo *openInfo = new OpenInfo( (uint8_t*)&ufd, 1, statInfo );
906 resp = new AnyObject();
907 resp->Set( openInfo );
908 return XRootDStatus();
909 }
910
911 //------------------------------------------------------------------------
912 // Parses kXR_fattr request and calls respective XAttr operation
913 //------------------------------------------------------------------------
914 XRootDStatus LocalFileHandler::XAttrImpl( kXR_char code,
915 kXR_char numattr,
916 size_t bodylen,
917 char *body,
918 ResponseHandler *handler )
919 {
920 // shift body by 1 to omit the empty path
921 if( bodylen > 0 )
922 {
923 ++body;
924 --bodylen;
925 }
926
927 switch( code )
928 {
929 case kXR_fattrGet:
930 case kXR_fattrDel:
931 {
932 std::vector<std::string> attrs;
933 // parse namevec
934 for( kXR_char i = 0; i < numattr; ++i )
935 {
936 if( bodylen < sizeof( kXR_unt16 ) ) return XRootDStatus( stError, errDataError );
937 // shift by RC size
938 body += sizeof( kXR_unt16 );
939 bodylen -= sizeof( kXR_unt16 );
940 // get the size of attribute name
941 size_t len = strlen( body );
942 if( len > bodylen ) return XRootDStatus( stError, errDataError );
943 attrs.push_back( std::string( body, len ) );
944 body += len + 1; // +1 for the null terminating the string
945 bodylen -= len + 1; // +1 for the null terminating the string
946 }
947
948 if( code == kXR_fattrGet )
949 return GetXAttr( attrs, handler );
950
951 return DelXAttr( attrs, handler );
952 }
953
954 case kXR_fattrSet:
955 {
956 std::vector<xattr_t> attrs;
957 // parse namevec
958 for( kXR_char i = 0; i < numattr; ++i )
959 {
960 if( bodylen < sizeof( kXR_unt16 ) ) return XRootDStatus( stError, errDataError );
961 // shift by RC size
962 body += sizeof( kXR_unt16 );
963 bodylen -= sizeof( kXR_unt16 );
964 // get the size of attribute name
965 char *name = 0;
966 body = ClientFattrRequest::NVecRead( body, name );
967 attrs.push_back( std::make_tuple( std::string( name ), std::string() ) );
968 bodylen -= strlen( name ) + 1; // +1 for the null terminating the string
969 free( name );
970 }
971 // parse valuevec
972 for( kXR_char i = 0; i < numattr; ++i )
973 {
974 // get value length
975 if( bodylen < sizeof( kXR_int32 ) ) return XRootDStatus( stError, errDataError );
976 kXR_int32 len = 0;
977 body = ClientFattrRequest::VVecRead( body, len );
978 bodylen -= sizeof( kXR_int32 );
979 // get value
980 if( size_t( len ) > bodylen ) return XRootDStatus( stError, errDataError );
981 char *value = 0;
982 body = ClientFattrRequest::VVecRead( body, len, value );
983 bodylen -= len;
984 std::get<xattr_value>( attrs[i] ) = value;
985 free( value );
986 }
987
988 return SetXAttr( attrs, handler );
989 }
990
991 case kXR_fattrList:
992 {
993 return ListXAttr( handler );
994 }
995
996 default:
998 }
999
1000 return XRootDStatus();
1001 }
1002
1004 Message *msg,
1005 ResponseHandler *handler,
1006 MessageSendParams &sendParams )
1007 {
1008 ClientRequest *req = reinterpret_cast<ClientRequest*>( msg->GetBuffer() );
1009
1010 switch( req->header.requestid )
1011 {
1012 case kXR_open:
1013 {
1014 XRootDStatus st = Open( url.GetURL(), req->open.options,
1015 req->open.mode, handler, sendParams.timeout );
1016 delete msg; // in case of other operations msg is owned by the handler
1017 return st;
1018 }
1019
1020 case kXR_close:
1021 {
1022 return Close( handler, sendParams.timeout );
1023 }
1024
1025 case kXR_stat:
1026 {
1027 return Stat( handler, sendParams.timeout );
1028 }
1029
1030 case kXR_read:
1031 {
1032 if( msg->GetVirtReqID() == kXR_virtReadv )
1033 {
1034 auto &chunkList = *sendParams.chunkList;
1035 struct iovec iov[chunkList.size()];
1036 for( size_t i = 0; i < chunkList.size() ; ++i )
1037 {
1038 iov[i].iov_base = chunkList[i].buffer;
1039 iov[i].iov_len = chunkList[i].length;
1040 }
1041 return ReadV( chunkList.front().offset, iov, chunkList.size(),
1042 handler, sendParams.timeout );
1043 }
1044
1045 return Read( req->read.offset, req->read.rlen,
1046 sendParams.chunkList->front().buffer,
1047 handler, sendParams.timeout );
1048 }
1049
1050 case kXR_write:
1051 {
1052 ChunkList *chunks = sendParams.chunkList;
1053 if( chunks->size() == 1 )
1054 {
1055 // it's an ordinary write
1056 return Write( req->write.offset, req->write.dlen,
1057 chunks->front().buffer, handler,
1058 sendParams.timeout );
1059 }
1060 // it's WriteV call
1061 return WriteV( req->write.offset, sendParams.chunkList,
1062 handler, sendParams.timeout );
1063 }
1064
1065 case kXR_sync:
1066 {
1067 return Sync( handler, sendParams.timeout );
1068 }
1069
1070 case kXR_truncate:
1071 {
1072 return Truncate( req->truncate.offset, handler, sendParams.timeout );
1073 }
1074
1075 case kXR_writev:
1076 {
1077 return VectorWrite( *sendParams.chunkList, handler,
1078 sendParams.timeout );
1079 }
1080
1081 case kXR_readv:
1082 {
1083 return VectorRead( *sendParams.chunkList, 0,
1084 handler, sendParams.timeout );
1085 }
1086
1087 case kXR_fattr:
1088 {
1089 return XAttrImpl( req->fattr.subcode, req->fattr.numattr, req->fattr.dlen,
1090 msg->GetBuffer( sizeof(ClientRequestHdr ) ), handler );
1091 }
1092
1093 default:
1094 {
1096 }
1097 }
1098 }
1099}
@ kXR_FSError
Definition XProtocol.hh:995
struct ClientTruncateRequest truncate
Definition XProtocol.hh:875
@ kXR_fattrDel
Definition XProtocol.hh:270
@ kXR_fattrSet
Definition XProtocol.hh:273
@ kXR_fattrList
Definition XProtocol.hh:272
@ kXR_fattrGet
Definition XProtocol.hh:271
struct ClientFattrRequest fattr
Definition XProtocol.hh:854
@ kXR_virtReadv
Definition XProtocol.hh:150
kXR_unt16 options
Definition XProtocol.hh:481
@ kXR_open_wrto
Definition XProtocol.hh:469
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_open_updt
Definition XProtocol.hh:457
@ kXR_new
Definition XProtocol.hh:455
struct ClientOpenRequest open
Definition XProtocol.hh:860
struct ClientRequestHdr header
Definition XProtocol.hh:846
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_mkdir
Definition XProtocol.hh:120
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_fattr
Definition XProtocol.hh:132
@ kXR_write
Definition XProtocol.hh:131
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_close
Definition XProtocol.hh:115
struct ClientReadRequest read
Definition XProtocol.hh:867
struct ClientWriteRequest write
Definition XProtocol.hh:876
int kXR_int32
Definition XPtypes.hh:89
unsigned short kXR_unt16
Definition XPtypes.hh:67
unsigned char kXR_char
Definition XPtypes.hh:65
struct stat Stat
Definition XrdCks.cc:49
int lstat(const char *path, struct stat *buf)
#define close(a)
Definition XrdPosix.hh:43
#define lseek(a, b, c)
Definition XrdPosix.hh:47
#define fsync(a)
Definition XrdPosix.hh:59
#define fstat(a, b)
Definition XrdPosix.hh:57
#define mkdir(a, b)
Definition XrdPosix.hh:69
#define writev(a, b, c)
Definition XrdPosix.hh:112
#define readv(a, b, c)
Definition XrdPosix.hh:79
#define stat(a, b)
Definition XrdPosix.hh:96
#define ftruncate(a, b)
Definition XrdPosix.hh:65
#define read(a, b, c)
Definition XrdPosix.hh:77
#define pwrite(a, b, c, d)
Definition XrdPosix.hh:102
#define pread(a, b, c, d)
Definition XrdPosix.hh:75
struct sigevent aio_sigevent
Definition XrdSfsAio.hh:51
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:99
static int mapError(int rc)
void Set(Type object, bool own=true)
Binary blob representation.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus Truncate(uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorRead(const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Stat(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Sync(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ReadV(uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus QueueTask(XRootDStatus *st, AnyObject *obj, ResponseHandler *handler)
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus MkdirPath(const std::string &path)
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus WriteV(uint64_t offset, ChunkList *chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus DelXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Fcntl(const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Visa(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorWrite(const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus GetXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
The message representation used throughout the system.
uint16_t GetVirtReqID() const
Get virtual request ID for the message.
Open operation (.
Information returned by file open operation.
JobManager * GetJobManager()
Get the job manager object user by the post master.
Handle an async response.
Object stat info.
bool ParseServerResponse(const char *data)
Parse server response and fill up the object.
Synchronize the response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Handle the response.
URL representation.
Definition XrdClURL.hh:31
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
ChunkList & GetChunks()
Get chunks.
void SetSize(uint32_t size)
Set size.
static XrdSysXAttr * Xat
char Name[1]
Start of the name (size of struct is dynamic)
int Vlen
The length of the attribute value;.
virtual int List(AList **aPL, const char *Path, int fd=-1, int getSz=0)=0
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
int Nlen
The length of the attribute name that follows.
virtual void Free(AList *aPL)=0
virtual int Del(const char *Aname, const char *Path, int fd=-1)=0
AList * Next
-> next element.
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
const uint16_t stError
An error occurred that could potentially be retried.
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t stOK
Everything went OK.
const uint64_t FileMsg
const uint16_t errOSError
const uint16_t errInvalidArgs
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errNotSupported
const uint16_t errLocalError
const int DefaultAioSignal
static char * NVecRead(char *buffer, kXR_unt16 &rc)
Definition XProtocol.cc:205
static char * VVecRead(char *buffer, kXR_int32 &len)
Definition XProtocol.cc:224
Describe a data chunk for vector read.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
uint32_t errNo
Errno, if any.
Extended attribute operation status.
Extended attributes with status.