XRootD
Loading...
Searching...
No Matches
XrdPfc.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19#include <fcntl.h>
20#include <sstream>
21#include <algorithm>
22#include <sys/statvfs.h>
23
25#include "XrdCl/XrdClURL.hh"
26
27#include "XrdOuc/XrdOucEnv.hh"
28#include "XrdOuc/XrdOucUtils.hh"
30
32#include "XrdSys/XrdSysTimer.hh"
33#include "XrdSys/XrdSysTrace.hh"
34
36
37#include "XrdOss/XrdOss.hh"
38
39#include "XrdPfc.hh"
40#include "XrdPfcTrace.hh"
41#include "XrdPfcFSctl.hh"
42#include "XrdPfcInfo.hh"
43#include "XrdPfcIOFile.hh"
44#include "XrdPfcIOFileBlock.hh"
45
46using namespace XrdPfc;
47
48Cache * Cache::m_instance = 0;
49
51
52
54{
56 return 0;
57}
58
59void *PurgeThread(void*)
60{
62 return 0;
63}
64
66{
68 return 0;
69}
70
71void *PrefetchThread(void*)
72{
74 return 0;
75}
76
77//==============================================================================
78
79extern "C"
80{
82 const char *config_filename,
83 const char *parameters,
84 XrdOucEnv *env)
85{
86 XrdSysError err(logger, "");
87 err.Say("++++++ Proxy file cache initialization started.");
88
89 if ( ! env ||
90 ! (XrdPfc::Cache::schedP = (XrdScheduler*) env->GetPtr("XrdScheduler*")))
91 {
94 }
95
96 Cache &instance = Cache::CreateInstance(logger, env);
97
98 if (! instance.Config(config_filename, parameters))
99 {
100 err.Say("Config Proxy file cache initialization failed.");
101 return 0;
102 }
103 err.Say("------ Proxy file cache initialization completed.");
104
105 {
106 pthread_t tid;
107
108 for (int wti = 0; wti < instance.RefConfiguration().m_wqueue_threads; ++wti)
109 {
110 XrdSysThread::Run(&tid, ProcessWriteTaskThread, 0, 0, "XrdPfc WriteTasks ");
111 }
112
113 if (instance.RefConfiguration().m_prefetch_max_blocks > 0)
114 {
115 XrdSysThread::Run(&tid, PrefetchThread, 0, 0, "XrdPfc Prefetch ");
116 }
117
118 XrdSysThread::Run(&tid, ResourceMonitorHeartBeatThread, 0, 0, "XrdPfc ResourceMonitorHeartBeat");
119
120 XrdSysThread::Run(&tid, PurgeThread, 0, 0, "XrdPfc Purge");
121 }
122
123 XrdPfcFSctl* pfcFSctl = new XrdPfcFSctl(instance, logger);
124 env->PutPtr("XrdFSCtl_PC*", pfcFSctl);
125
126 return &instance;
127}
128}
129
130//==============================================================================
131
132void Configuration::calculate_fractional_usages(long long du, long long fu,
133 double &frac_du, double &frac_fu)
134{
135 // Calculate fractional disk / file usage and clamp them to [0, 1].
136
137 // Fractional total usage above LWM:
138 // - can be > 1 if usage is above HWM;
139 // - can be < 0 if triggered via age-based-purging.
140 frac_du = (double) (du - m_diskUsageLWM) / (m_diskUsageHWM - m_diskUsageLWM);
141
142 // Fractional file usage above baseline.
143 // - can be > 1 if file usage is above max;
144 // - can be < 0 if file usage is below baseline.
145 frac_fu = (double) (fu - m_fileUsageBaseline) / (m_fileUsageMax - m_fileUsageBaseline);
146
147 frac_du = std::min( std::max( frac_du, 0.0), 1.0 );
148 frac_fu = std::min( std::max( frac_fu, 0.0), 1.0 );
149}
150
151//==============================================================================
152
154{
155 assert (m_instance == 0);
156 m_instance = new Cache(logger, env);
157 return *m_instance;
158}
159
160 Cache& Cache::GetInstance() { return *m_instance; }
161const Cache& Cache::TheOne() { return *m_instance; }
162const Configuration& Cache::Conf() { return m_instance->RefConfiguration(); }
163
165{
166 if (! m_decisionpoints.empty())
167 {
168 XrdCl::URL url(io->Path());
169 std::string filename = url.GetPath();
170 std::vector<Decision*>::const_iterator it;
171 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
172 {
173 XrdPfc::Decision *d = *it;
174 if (! d) continue;
175 if (! d->Decide(filename, *m_oss))
176 {
177 return false;
178 }
179 }
180 }
181
182 return true;
183}
184
186 XrdOucCache("pfc"),
187 m_env(env),
188 m_log(logger, "XrdPfc_"),
189 m_trace(new XrdSysTrace("XrdPfc", logger)),
190 m_traceID("Cache"),
191 m_oss(0),
192 m_gstream(0),
193 m_prefetch_condVar(0),
194 m_prefetch_enabled(false),
195 m_RAM_used(0),
196 m_RAM_write_queue(0),
197 m_RAM_std_size(0),
198 m_isClient(false),
199 m_in_purge(false),
200 m_active_cond(0),
201 m_stats_n_purge_cond(0),
202 m_fs_state(0),
203 m_last_scan_duration(0),
204 m_last_purge_duration(0),
205 m_spt_state(SPTS_Idle)
206{
207 // Default log level is Warning.
208 m_trace->What = 2;
209}
210
212{
213 const char* tpfx = "Attach() ";
214
215 if (Cache::GetInstance().Decide(io))
216 {
217 TRACE(Info, tpfx << obfuscateAuth(io->Path()));
218
219 IO *cio;
220
221 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
222 {
223 cio = new IOFileBlock(io, *this);
224 }
225 else
226 {
227 IOFile *iof = new IOFile(io, *this);
228
229 if ( ! iof->HasFile())
230 {
231 delete iof;
232 // TODO - redirect instead. But this is kind of an awkward place for it.
233 // errno is set during IOFile construction.
234 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
235 return io;
236 }
237
238 cio = iof;
239 }
240
241 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
242 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
243
244 return cio;
245 }
246 else
247 {
248 TRACE(Info, tpfx << "decision decline " << io->Path());
249 }
250 return io;
251}
252
253void Cache::AddWriteTask(Block* b, bool fromRead)
254{
255 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
256
257 {
258 XrdSysMutexHelper lock(&m_RAM_mutex);
259 m_RAM_write_queue += b->get_size();
260 }
261
262 m_writeQ.condVar.Lock();
263 if (fromRead)
264 m_writeQ.queue.push_back(b);
265 else
266 m_writeQ.queue.push_front(b);
267 m_writeQ.size++;
268 m_writeQ.condVar.Signal();
269 m_writeQ.condVar.UnLock();
270}
271
273{
274 std::list<Block*> removed_blocks;
275 long long sum_size = 0;
276
277 m_writeQ.condVar.Lock();
278 std::list<Block*>::iterator i = m_writeQ.queue.begin();
279 while (i != m_writeQ.queue.end())
280 {
281 if ((*i)->m_file == file)
282 {
283 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
284 std::list<Block*>::iterator j = i++;
285 removed_blocks.push_back(*j);
286 sum_size += (*j)->get_size();
287 m_writeQ.queue.erase(j);
288 --m_writeQ.size;
289 }
290 else
291 {
292 ++i;
293 }
294 }
295 m_writeQ.condVar.UnLock();
296
297 {
298 XrdSysMutexHelper lock(&m_RAM_mutex);
299 m_RAM_write_queue -= sum_size;
300 }
301
302 file->BlocksRemovedFromWriteQ(removed_blocks);
303}
304
306{
307 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
308
309 while (true)
310 {
311 m_writeQ.condVar.Lock();
312 while (m_writeQ.size == 0)
313 {
314 m_writeQ.condVar.Wait();
315 }
316
317 // MT -- optimize to pop several blocks if they are available (or swap the list).
318 // This makes sense especially for smallish block sizes.
319
320 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
321 long long sum_size = 0;
322
323 for (int bi = 0; bi < n_pushed; ++bi)
324 {
325 Block* block = m_writeQ.queue.front();
326 m_writeQ.queue.pop_front();
327 m_writeQ.writes_between_purges += block->get_size();
328 sum_size += block->get_size();
329
330 blks_to_write[bi] = block;
331
332 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
333 }
334 m_writeQ.size -= n_pushed;
335
336 m_writeQ.condVar.UnLock();
337
338 {
339 XrdSysMutexHelper lock(&m_RAM_mutex);
340 m_RAM_write_queue -= sum_size;
341 }
342
343 for (int bi = 0; bi < n_pushed; ++bi)
344 {
345 Block* block = blks_to_write[bi];
346
347 block->m_file->WriteBlockToDisk(block);
348 }
349 }
350}
351
352//==============================================================================
353
354char* Cache::RequestRAM(long long size)
355{
356 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
357
358 bool std_size = (size == m_configuration.m_bufferSize);
359
360 m_RAM_mutex.Lock();
361
362 long long total = m_RAM_used + size;
363
364 if (total <= m_configuration.m_RamAbsAvailable)
365 {
366 m_RAM_used = total;
367 if (std_size && m_RAM_std_size > 0)
368 {
369 char *buf = m_RAM_std_blocks.back();
370 m_RAM_std_blocks.pop_back();
371 --m_RAM_std_size;
372
373 m_RAM_mutex.UnLock();
374
375 return buf;
376 }
377 else
378 {
379 m_RAM_mutex.UnLock();
380 char *buf;
381 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
382 {
383 // Report out of mem? Probably should report it at least the first time,
384 // then periodically.
385 return 0;
386 }
387 return buf;
388 }
389 }
390 m_RAM_mutex.UnLock();
391 return 0;
392}
393
394void Cache::ReleaseRAM(char* buf, long long size)
395{
396 bool std_size = (size == m_configuration.m_bufferSize);
397 {
398 XrdSysMutexHelper lock(&m_RAM_mutex);
399
400 m_RAM_used -= size;
401
402 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
403 {
404 m_RAM_std_blocks.push_back(buf);
405 ++m_RAM_std_size;
406 return;
407 }
408 }
409 free(buf);
410}
411
412File* Cache::GetFile(const std::string& path, IO* io, long long off, long long filesize)
413{
414 // Called from virtual IO::Attach
415
416 TRACE(Debug, "GetFile " << path << ", io " << io);
417
418 ActiveMap_i it;
419
420 {
421 XrdSysCondVarHelper lock(&m_active_cond);
422
423 while (true)
424 {
425 it = m_active.find(path);
426
427 // File is not open or being opened. Mark it as being opened and
428 // proceed to opening it outside of while loop.
429 if (it == m_active.end())
430 {
431 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
432 break;
433 }
434
435 if (it->second != 0)
436 {
437 it->second->AddIO(io);
438 inc_ref_cnt(it->second, false, true);
439
440 return it->second;
441 }
442 else
443 {
444 // Wait for some change in m_active, then recheck.
445 m_active_cond.Wait();
446 }
447 }
448 }
449
450 if (filesize == 0)
451 {
452 struct stat st;
453 int res = io->Fstat(st);
454 if (res < 0) {
455 errno = res;
456 TRACE(Error, "GetFile, could not get valid stat");
457 } else if (res > 0) {
458 errno = ENOTSUP;
459 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
460 } else {
461 filesize = st.st_size;
462 }
463 }
464
465 File *file = 0;
466
467 if (filesize >= 0)
468 {
469 file = File::FileOpen(path, off, filesize);
470 }
471
472 {
473 XrdSysCondVarHelper lock(&m_active_cond);
474
475 if (file)
476 {
477 inc_ref_cnt(file, false, true);
478 it->second = file;
479
480 file->AddIO(io);
481 }
482 else
483 {
484 m_active.erase(it);
485 }
486
487 m_active_cond.Broadcast();
488 }
489
490 return file;
491}
492
494{
495 // Called from virtual IO::DetachFinalize.
496
497 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
498
499 {
500 XrdSysCondVarHelper lock(&m_active_cond);
501
502 f->RemoveIO(io);
503 }
504 dec_ref_cnt(f, true);
505}
506
507
508namespace
509{
510
511class DiskSyncer : public XrdJob
512{
513private:
514 File *m_file;
515 bool m_high_debug;
516
517public:
518 DiskSyncer(File *f, bool high_debug, const char *desc = "") :
519 XrdJob(desc),
520 m_file(f),
521 m_high_debug(high_debug)
522 {}
523
524 void DoIt()
525 {
526 m_file->Sync();
527 Cache::GetInstance().FileSyncDone(m_file, m_high_debug);
528 delete this;
529 }
530};
531
532
533class CommandExecutor : public XrdJob
534{
535private:
536 std::string m_command_url;
537
538public:
539 CommandExecutor(const std::string& command, const char *desc = "") :
540 XrdJob(desc),
541 m_command_url(command)
542 {}
543
544 void DoIt()
545 {
546 Cache::GetInstance().ExecuteCommandUrl(m_command_url);
547 delete this;
548 }
549};
550
551}
552
553//==============================================================================
554
555void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set, bool high_debug)
556{
557 DiskSyncer* ds = new DiskSyncer(f, high_debug);
558
559 if ( ! ref_cnt_already_set) inc_ref_cnt(f, true, high_debug);
560
561 schedP->Schedule(ds);
562}
563
564void Cache::FileSyncDone(File* f, bool high_debug)
565{
566 dec_ref_cnt(f, high_debug);
567}
568
569void Cache::inc_ref_cnt(File* f, bool lock, bool high_debug)
570{
571 // called from GetFile() or SheduleFileSync();
572
573 int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
574
575 if (lock) m_active_cond.Lock();
576 int rc = f->inc_ref_cnt();
577 if (lock) m_active_cond.UnLock();
578
579 TRACE_INT(tlvl, "inc_ref_cnt " << f->GetLocalPath() << ", cnt at exit = " << rc);
580}
581
582void Cache::dec_ref_cnt(File* f, bool high_debug)
583{
584 // Called from ReleaseFile() or DiskSync callback.
585
586 int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
587 int cnt;
588
589 {
590 XrdSysCondVarHelper lock(&m_active_cond);
591
592 cnt = f->get_ref_cnt();
593
595 {
596 // In this case file has been already removed from m_active map and
597 // does not need to be synced.
598
599 if (cnt == 1)
600 {
601 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
602 << " -- deleting File object without further ado");
603 delete f;
604 }
605 else
606 {
607 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
608 << " -- waiting");
609 }
610
611 return;
612 }
613 }
614
615 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);
616
617 if (cnt == 1)
618 {
619 if (f->FinalizeSyncBeforeExit())
620 {
621 // Note, here we "reuse" the existing reference count for the
622 // final sync.
623
624 TRACE(Debug, "dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync");
625 schedule_file_sync(f, true, true);
626 return;
627 }
628 }
629
630 {
631 XrdSysCondVarHelper lock(&m_active_cond);
632
633 cnt = f->dec_ref_cnt();
634 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
635 if (cnt == 0)
636 {
637 ActiveMap_i it = m_active.find(f->GetLocalPath());
638 m_active.erase(it);
639
640 m_closed_files_stats.insert(std::make_pair(f->GetLocalPath(), f->DeltaStatsFromLastCall()));
641
642 if (m_gstream)
643 {
644 const Stats &st = f->RefStats();
645 const Info::AStat *as = f->GetLastAccessStats();
646
647 char buf[4096];
648 int len = snprintf(buf, 4096, "{\"event\":\"file_close\","
649 "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d,"
650 "\"access_cnt\":%lu,\"attach_t\":%lld,\"detach_t\":%lld,\"remotes\":%s,"
651 "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld,\"n_cks_errs\":%d}",
652 f->GetLocalPath().c_str(), f->GetFileSize(), f->GetBlockSize(),
654 (unsigned long) f->GetAccessCnt(), (long long) as->AttachTime, (long long) as->DetachTime,
655 f->GetRemoteLocations().c_str(),
657 );
658 bool suc = false;
659 if (len < 4096)
660 {
661 suc = m_gstream->Insert(buf, len + 1);
662 }
663 if ( ! suc)
664 {
665 TRACE(Error, "Failed g-stream insertion of file_close record, len=" << len);
666 }
667 }
668
669 delete f;
670 }
671 }
672}
673
674bool Cache::IsFileActiveOrPurgeProtected(const std::string& path)
675{
676 XrdSysCondVarHelper lock(&m_active_cond);
677
678 return m_active.find(path) != m_active.end() ||
679 m_purge_delay_set.find(path) != m_purge_delay_set.end();
680}
681
682
683//==============================================================================
684//=== PREFETCH
685//==============================================================================
686
688{
689 // Can be called with other locks held.
690
691 if ( ! m_prefetch_enabled)
692 {
693 return;
694 }
695
696 m_prefetch_condVar.Lock();
697 m_prefetchList.push_back(file);
698 m_prefetch_condVar.Signal();
699 m_prefetch_condVar.UnLock();
700}
701
702
704{
705 // Can be called with other locks held.
706
707 if ( ! m_prefetch_enabled)
708 {
709 return;
710 }
711
712 m_prefetch_condVar.Lock();
713 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
714 {
715 if (*it == file)
716 {
717 m_prefetchList.erase(it);
718 break;
719 }
720 }
721 m_prefetch_condVar.UnLock();
722}
723
724
726{
727 m_prefetch_condVar.Lock();
728 while (m_prefetchList.empty())
729 {
730 m_prefetch_condVar.Wait();
731 }
732
733 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
734
735 size_t l = m_prefetchList.size();
736 int idx = rand() % l;
737 File* f = m_prefetchList[idx];
738
739 m_prefetch_condVar.UnLock();
740 return f;
741}
742
743
745{
746 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
747
748 while (true)
749 {
750 m_RAM_mutex.Lock();
751 bool doPrefetch = (m_RAM_used < limit_RAM);
752 m_RAM_mutex.UnLock();
753
754 if (doPrefetch)
755 {
757 f->Prefetch();
758 }
759 else
760 {
762 }
763 }
764}
765
766
767//==============================================================================
768//=== Virtuals from XrdOucCache
769//==============================================================================
770
771//------------------------------------------------------------------------------
785
// Resolve a URL to the local (physical) path of its cached data file.
//
//   curl   - URL of the file; only its path component is used
//   buff   - optional output buffer for the physical path; it is set to an
//            empty string on entry when buff is non-null and blen > 0
//   blen   - size of buff
//   why    - ForPath:   only translate the name with Lfn2Pfn() and return its
//                       result; the cache content is not inspected
//            ForInfo:   fill buff when the cinfo file is readable, even if the
//                       file is not complete
//            ForAccess: fill buff only for a complete file, record an access
//                       in the cinfo file (when the file is not active) and
//                       adjust the data file's permission bits
//   forall - for ForAccess: request world-readable rather than group-readable
//            permissions
//
// Returns (as visible in this function):
//   the Lfn2Pfn() result   for ForPath, or when Lfn2Pfn() fails later on
//   0                      the file is complete in the cache
//   -EREMOTE               cinfo was read but the file is not complete, or
//                          the Chmod() needed for ForAccess failed
//   -EISDIR                the path is a directory
//   -ENOENT                data or cinfo file could not be stat-ed, or the
//                          cinfo file could not be opened / read
//
// Except for ForPath, the path is added to m_purge_delay_set before anything
// else is checked.
786int Cache::LocalFilePath(const char *curl, char *buff, int blen,
787 LFP_Reason why, bool forall)
788{
789 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
790 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
// Indexed by `why`; order must match the LFP_Reason values used below.
791 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
792
793 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
794
795 if (buff && blen > 0) buff[0] = 0;
796
797 XrdCl::URL url(curl);
798 std::string f_name = url.GetPath();
799 std::string i_name = f_name + Info::s_infoExtension;
800
// Pure name translation: no stat, no cinfo access, no purge-delay entry.
801 if (why == ForPath)
802 {
803 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
804 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
805 return ret;
806 }
807
808 {
809 XrdSysCondVarHelper lock(&m_active_cond);
810 m_purge_delay_set.insert(f_name);
811 }
812
// Both the data file and its cinfo companion must exist.
813 struct stat sbuff, sbuff2;
814 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
815 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
816 {
817 if (S_ISDIR(sbuff.st_mode))
818 {
819 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
820 return -EISDIR;
821 }
822 else
823 {
824 bool read_ok = false;
825 bool is_complete = false;
826
827 // Lock and check if the file is active. If NOT, keep the lock
828 // and add dummy access after successful reading of info file.
829 // If it IS active, just release the lock, this ongoing access will
830 // assure the file continues to exist.
831
832 // XXXX How can I just loop over the cinfo file when active?
833 // Can I not get is_complete from the existing file?
834 // Do I still want to inject access record?
835 // Oh, it writes only if not active .... still let's try to use existing File.
836
// NOTE: from here on m_active_cond is held iff is_active is false; the
// matching UnLock() comes after the cinfo file has been closed and deleted.
837 m_active_cond.Lock();
838
839 bool is_active = m_active.find(f_name) != m_active.end();
840
841 if (is_active) m_active_cond.UnLock();
842
843 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
844 XrdOucEnv myEnv;
// Opened read-write because an access record may be written back below.
845 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
846 if (res >= 0)
847 {
848 Info info(m_trace, 0);
849 if (info.Read(infoFile, i_name.c_str()))
850 {
851 read_ok = true;
852
853 is_complete = info.IsComplete();
854
855 // Add full-size access if reason is for access.
856 if ( ! is_active && is_complete && why == ForAccess)
857 {
858 info.WriteIOStatSingle(info.GetFileSize());
859 info.Write(infoFile, i_name.c_str());
860 }
861 }
862 infoFile->Close();
863 }
864 delete infoFile;
865
866 if ( ! is_active) m_active_cond.UnLock();
867
868 if (read_ok)
869 {
// buff is filled for a complete file, and for ForInfo regardless of
// completeness; the return value below still depends on is_complete only.
870 if ((is_complete || why == ForInfo) && buff != 0)
871 {
872 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
873 if (res2 < 0)
874 return res2;
875
876 // Normally, files are owned by us but when direct cache access
877 // is wanted and possible, make sure the file is world readable.
878 if (why == ForAccess)
879 {mode_t mode = (forall ? worldReadable : groupReadable);
// Chmod only when the current permission bits differ from the wanted ones;
// if the Chmod fails the file is reported as not locally accessible.
880 if (((sbuff.st_mode & worldReadable) != mode)
881 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
882 {is_complete = false;
883 *buff = 0;
884 }
885 }
886 }
887
888 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
889 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
890
891 return is_complete ? 0 : -EREMOTE;
892 }
893 }
894 }
895
// Reached when either Stat() failed or the cinfo file could not be read.
896 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
897 return -ENOENT;
898}
899
900//______________________________________________________________________________
901// Check if the file is cached including m_onlyIfCachedMinSize and m_onlyIfCachedMinFrac
902// pfc configuration parameters. The logic of accessing the Info file is the same
903// as in Cache::LocalFilePath.
913//------------------------------------------------------------------------------
914int Cache::ConsiderCached(const char *curl)
915{
916 TRACE(Debug, "ConsiderFileCached '" << curl << "'" );
917
918 XrdCl::URL url(curl);
919 std::string f_name = url.GetPath();
920 std::string i_name = f_name + Info::s_infoExtension;
921
922 {
923 XrdSysCondVarHelper lock(&m_active_cond);
924 m_purge_delay_set.insert(f_name);
925 }
926
927 struct stat sbuff, sbuff2;
928 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
929 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
930 {
931 if (S_ISDIR(sbuff.st_mode))
932 {
933 TRACE(Info, "ConsiderCached '" << curl << ", why=ForInfo" << " -> EISDIR");
934 return -EISDIR;
935 }
936 else
937 {
938 bool read_ok = false;
939 bool is_cached = false;
940
941 // Lock and check if the file is active. If NOT, keep the lock
942 // and add dummy access after successful reading of info file.
943 // If it IS active, just release the lock, this ongoing access will
944 // assure the file continues to exist.
945
946 // XXXX How can I just loop over the cinfo file when active?
947 // Can I not get is_complete from the existing file?
948 // Do I still want to inject access record?
949 // Oh, it writes only if not active .... still let's try to use existing File.
950
951 m_active_cond.Lock();
952
953 bool is_active = m_active.find(f_name) != m_active.end();
954
955 if (is_active)
956 m_active_cond.UnLock();
957
958 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
959 XrdOucEnv myEnv;
960 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
961 if (res >= 0)
962 {
963 Info info(m_trace, 0);
964 if (info.Read(infoFile, i_name.c_str()))
965 {
966 read_ok = true;
967
968 if (info.IsComplete())
969 {
970 is_cached = true;
971 }
972 else if (info.GetFileSize() == 0)
973 {
974 is_cached = true;
975 }
976 else
977 {
978 long long fileSize = info.GetFileSize();
979 long long bytesRead = info.GetNDownloadedBytes();
980
981 if (fileSize < m_configuration.m_onlyIfCachedMinSize)
982 {
983 if ((float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
984 is_cached = true;
985 }
986 else
987 {
988 if (bytesRead > m_configuration.m_onlyIfCachedMinSize &&
989 (float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
990 is_cached = true;
991 }
992 }
993 }
994 infoFile->Close();
995 }
996 delete infoFile;
997
998 if (!is_active) m_active_cond.UnLock();
999
1000 if (read_ok)
1001 {
1002 TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << (is_cached ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
1003 return is_cached ? 0 : -EREMOTE;
1004 }
1005 }
1006 }
1007
1008 TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << " -> ENOENT");
1009 return -ENOENT;
1010}
1011
1012//______________________________________________________________________________
1020//------------------------------------------------------------------------------
1021
1022int Cache::Prepare(const char *curl, int oflags, mode_t mode)
1023{
1024 XrdCl::URL url(curl);
1025 std::string f_name = url.GetPath();
1026 std::string i_name = f_name + Info::s_infoExtension;
1027
1028 // Do not allow write access.
1029 if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1030 {
1031 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1032 return -EROFS;
1033 }
1034
1035 // Intercept xrdpfc_command requests.
1036 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1037 {
1038 // Schedule a job to process command request.
1039 {
1040 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1041
1042 schedP->Schedule(ce);
1043 }
1044
1045 return -EAGAIN;
1046 }
1047
1048 {
1049 XrdSysCondVarHelper lock(&m_active_cond);
1050 m_purge_delay_set.insert(f_name);
1051 }
1052
1053 struct stat sbuff;
1054 int res = m_oss->Stat(i_name.c_str(), &sbuff);
1055 if (res == 0)
1056 {
1057 TRACE(Dump, "Prepare defer open " << f_name);
1058 return 1;
1059 }
1060 else
1061 {
1062 return 0;
1063 }
1064}
1065
1066//______________________________________________________________________________
1067// virtual method of XrdOucCache.
1072//------------------------------------------------------------------------------
1073
1074int Cache::Stat(const char *curl, struct stat &sbuff)
1075{
1076 XrdCl::URL url(curl);
1077 std::string f_name = url.GetPath();
1078
1079 {
1080 XrdSysCondVarHelper lock(&m_active_cond);
1081 m_purge_delay_set.insert(f_name);
1082 }
1083
1084 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK)
1085 {
1086 if (S_ISDIR(sbuff.st_mode))
1087 {
1088 return 0;
1089 }
1090 else
1091 {
1092 bool success = false;
1093 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
1094 XrdOucEnv myEnv;
1095
1096 f_name += Info::s_infoExtension;
1097 int res = infoFile->Open(f_name.c_str(), O_RDONLY, 0600, myEnv);
1098 if (res >= 0)
1099 {
1100 Info info(m_trace, 0);
1101 if (info.Read(infoFile, f_name.c_str()))
1102 {
1103 sbuff.st_size = info.GetFileSize();
1104 success = true;
1105 }
1106 }
1107 infoFile->Close();
1108 delete infoFile;
1109 return success ? 0 : 1;
1110 }
1111 }
1112
1113 return 1;
1114}
1115
1116//______________________________________________________________________________
1117// virtual method of XrdOucCache.
1121//------------------------------------------------------------------------------
1122
1123int Cache::Unlink(const char *curl)
1124{
1125 XrdCl::URL url(curl);
1126 std::string f_name = url.GetPath();
1127
1128 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1129
1130 return UnlinkFile(f_name, false);
1131}
1132
1133int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open)
1134{
1135 ActiveMap_i it;
1136 File *file = 0;
1137 {
1138 XrdSysCondVarHelper lock(&m_active_cond);
1139
1140 it = m_active.find(f_name);
1141
1142 if (it != m_active.end())
1143 {
1144 if (fail_if_open)
1145 {
1146 TRACE(Info, "UnlinkCommon " << f_name << ", file currently open and force not requested - denying request");
1147 return -EBUSY;
1148 }
1149
1150 // Null File* in m_active map means an operation is ongoing, probably
1151 // Attach() with possible File::Open(). Ask for retry.
1152 if (it->second == 0)
1153 {
1154 TRACE(Info, "UnlinkCommon " << f_name << ", an operation on this file is ongoing - denying request");
1155 return -EAGAIN;
1156 }
1157
1158 file = it->second;
1160 it->second = 0;
1161 }
1162 else
1163 {
1164 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1165 }
1166 }
1167
1168 if (file)
1169 {
1171 }
1172
1173 std::string i_name = f_name + Info::s_infoExtension;
1174
1175 // Unlink file & cinfo
1176 int f_ret = m_oss->Unlink(f_name.c_str());
1177 int i_ret = m_oss->Unlink(i_name.c_str());
1178
1179 TRACE(Debug, "UnlinkCommon " << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1180
1181 {
1182 XrdSysCondVarHelper lock(&m_active_cond);
1183
1184 m_active.erase(it);
1185 }
1186
1187 return std::min(f_ret, i_ret);
1188}
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
#define TRACE_Debug
#define XrdOssOK
Definition XrdOss.hh:50
std::string obfuscateAuth(const std::string &input)
#define TRACE_Dump
#define TRACE_PC(act, pre_code, x)
#define TRACE_INT(act, x)
void * ProcessWriteTaskThread(void *)
Definition XrdPfc.cc:65
void * ResourceMonitorHeartBeatThread(void *)
Definition XrdPfc.cc:53
void * PrefetchThread(void *)
Definition XrdPfc.cc:71
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:81
void * PurgeThread(void *)
Definition XrdPfc.cc:59
#define stat(a, b)
Definition XrdPosix.hh:96
bool Debug
#define TRACE(act, x)
Definition XrdTrace.hh:63
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition XrdOss.hh:873
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
virtual int Fstat(struct stat &sbuff)
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
void PutPtr(const char *varname, void *value)
Definition XrdOucEnv.cc:298
int get_size() const
long long m_offset
File * get_file() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267
void FileSyncDone(File *, bool high_debug)
Definition XrdPfc.cc:564
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition XrdPfc.cc:412
static const Configuration & Conf()
Definition XrdPfc.cc:162
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
Definition XrdPfc.cc:786
virtual int Stat(const char *url, struct stat &sbuff)
Definition XrdPfc.cc:1074
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:315
void Purge()
Thread function invoked to scan and purge files from disk when needed.
void ReleaseRAM(char *buf, long long size)
Definition XrdPfc.cc:394
virtual int ConsiderCached(const char *url)
Definition XrdPfc.cc:914
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:160
void ResourceMonitorHeartBeat()
Thread function checking resource usage periodically.
void DeRegisterPrefetchFile(File *)
Definition XrdPfc.cc:703
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
Definition XrdPfc.cc:687
void Prefetch()
Definition XrdPfc.cc:744
void ReleaseFile(File *, IO *)
Definition XrdPfc.cc:493
void AddWriteTask(Block *b, bool from_read)
Add downloaded block to the write queue.
Definition XrdPfc.cc:253
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:185
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:164
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1133
static XrdScheduler * schedP
Definition XrdPfc.hh:404
bool IsFileActiveOrPurgeProtected(const std::string &)
Definition XrdPfc.cc:674
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:725
void ProcessWriteTasks()
Separate task which writes blocks from RAM to disk.
Definition XrdPfc.cc:305
virtual int Unlink(const char *url)
Definition XrdPfc.cc:1123
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:272
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *, int Options=0)
Definition XrdPfc.cc:211
static const Cache & TheOne()
Definition XrdPfc.cc:161
char * RequestRAM(long long size)
Definition XrdPfc.cc:354
virtual int Prepare(const char *url, int oflags, mode_t mode)
Definition XrdPfc.cc:1022
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Definition XrdPfc.cc:153
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks needs sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition XrdPfcFile.cc:99
int GetNBlocks() const
std::string GetRemoteLocations() const
size_t GetAccessCnt() const
void AddIO(IO *io)
int GetBlockSize() const
int GetNDownloadedBlocks() const
const Info::AStat * GetLastAccessStats() const
long long GetFileSize()
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
void initiate_emergency_shutdown()
int inc_ref_cnt()
const Stats & RefStats() const
void Sync()
Sync file cache info and output data with disk.
int dec_ref_cnt()
int get_ref_cnt()
void RemoveIO(IO *io)
Stats DeltaStatsFromLastCall()
bool is_in_emergency_shutdown()
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
bool HasFile() const
Check if File was opened successfully.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
static const char * s_infoExtension
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
long long GetNDownloadedBytes() const
Get number of downloaded bytes.
bool IsComplete() const
Get complete status.
long long GetFileSize() const
Get file size.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Statistics of cache utilisation by a File object.
int m_NCksumErrors
number of checksum errors while getting data from remote
void Schedule(XrdJob *jp)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
bool Insert(const char *data, int dlen)
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:56
long long m_RamAbsAvailable
available from configuration
Definition XrdPfc.hh:102
long long m_fileUsageMax
cache purge - files usage maximum
Definition XrdPfc.hh:90
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition XrdPfc.hh:88
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition XrdPfc.hh:79
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition XrdPfc.hh:87
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:106
void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu)
Definition XrdPfc.cc:132
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition XrdPfc.hh:86
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition XrdPfc.hh:103
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:101
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition XrdPfc.hh:104
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:116
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:115
Access statistics.
Definition XrdPfcInfo.hh:61
long long BytesHit
read from cache
Definition XrdPfcInfo.hh:68
long long BytesBypassed
read from remote and dropped
Definition XrdPfcInfo.hh:70
time_t DetachTime
close time
Definition XrdPfcInfo.hh:63
long long BytesMissed
read from remote and cached
Definition XrdPfcInfo.hh:69
time_t AttachTime
open time
Definition XrdPfcInfo.hh:62