XRootD
Loading...
Searching...
No Matches
XrdXrootdMonFile.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d X r o o t d M o n F i l e . c c */
4/* */
5/* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstring>
32
33#include "Xrd/XrdScheduler.hh"
34
35#include "XrdSys/XrdSysError.hh"
37
40
41/******************************************************************************/
42/* G l o b a l s */
43/******************************************************************************/
44
45namespace XrdXrootdMonInfo
46{
49extern long long mySID;
50extern int32_t startTime;
51}
52
53/******************************************************************************/
54/* S t a t i c M e m b e r s */
55/******************************************************************************/
56
57XrdSysMutex XrdXrootdMonFile::bfMutex;
58XrdSysMutex XrdXrootdMonFile::fmMutex;
59XrdXrootdMonFMap XrdXrootdMonFile::fmMap[XrdXrootdMonFMap::mapNum];
60short XrdXrootdMonFile::fmUse[XrdXrootdMonFMap::mapNum] = {0};
61char *XrdXrootdMonFile::repBuff = 0;
62XrdXrootdMonHeader *XrdXrootdMonFile::repHdr = 0;
63XrdXrootdMonFileTOD *XrdXrootdMonFile::repTOD = 0;
64char *XrdXrootdMonFile::repNext = 0;
65char *XrdXrootdMonFile::repFirst = 0;
66char *XrdXrootdMonFile::repLast = 0;
67int XrdXrootdMonFile::totRecs = 0;
68int XrdXrootdMonFile::xfrRecs = 0;
69int XrdXrootdMonFile::repSize = 0;
70int XrdXrootdMonFile::repTime = 0;
71int XrdXrootdMonFile::fmHWM =-1;
72int XrdXrootdMonFile::crecSize = 0;
73int XrdXrootdMonFile::xfrCnt = 0;
74int XrdXrootdMonFile::fBsz = 65472;
75int XrdXrootdMonFile::xfrRem = 0;
76XrdXrootdMonFileXFR XrdXrootdMonFile::xfrRec;
77short XrdXrootdMonFile::crecNLen = 0;
78short XrdXrootdMonFile::trecNLen = 0;
79char XrdXrootdMonFile::fsLFN = 0;
80char XrdXrootdMonFile::fsLVL = 0;
81char XrdXrootdMonFile::fsOPS = 0;
82char XrdXrootdMonFile::fsSSQ = 0;
83char XrdXrootdMonFile::fsXFR = 0;
84char XrdXrootdMonFile::crecFlag = 0;
85
86/******************************************************************************/
87/* C l o s e */
88/******************************************************************************/
89
91{
93 char *cP;
94 int iEnt, iMap, iSlot;
95
96// If this object was registered for I/O reporting, deregister it.
97//
98 if (fsP->MonEnt != -1)
99 {iEnt = fsP->MonEnt & 0xffff;
100 iMap = iEnt >> XrdXrootdMonFMap::fmShft;
101 iSlot = iEnt & XrdXrootdMonFMap::fmMask;
102 fsP->MonEnt = -1;
103 fmMutex.Lock();
104 if (fmMap[iMap].Free(iSlot)) fmUse[iMap]--;
105 if (iMap == fmHWM) while(fmHWM >= 0 && !fmUse[fmHWM]) fmHWM--;
106 fmMutex.UnLock();
107 }
108
109// Insert a close record header (mostly precomputed)
110//
112 cRec.Hdr.recFlag = crecFlag;
113 if (isDisc) cRec.Hdr.recFlag |= XrdXrootdMonFileHdr::forced;
114 cRec.Hdr.recSize = crecNLen;
115 cRec.Hdr.fileID = fsP->FileID;
116
117// Insert the I/O bytes
118//
119 cRec.Xfr.read = htonll(fsP->xfr.read);
120 cRec.Xfr.readv = htonll(fsP->xfr.readv);
121 cRec.Xfr.write = htonll(fsP->xfr.write);
122
123// Insert ops if so wanted
124//
125 if (fsOPS)
126 {cRec.Ops.read = htonl (fsP->ops.read);
127 if (fsP->ops.read)
128 {cRec.Ops.rdMin = htonl (fsP->ops.rdMin);
129 cRec.Ops.rdMax = htonl (fsP->ops.rdMax);
130 } else {
131 cRec.Ops.rdMin = cRec.Ops.rdMax = 0;
132 }
133 cRec.Ops.readv = htonl (fsP->ops.readv);
134 cRec.Ops.rsegs = htonll(fsP->ops.rsegs);
135 if (fsP->ops.readv)
136 {cRec.Ops.rsMin = htons (fsP->ops.rsMin);
137 cRec.Ops.rsMax = htons (fsP->ops.rsMax);
138 cRec.Ops.rvMin = htonl (fsP->ops.rvMin);
139 cRec.Ops.rvMax = htonl (fsP->ops.rvMax);
140 } else {
141 cRec.Ops.rsMin = cRec.Ops.rsMax = 0;
142 cRec.Ops.rvMin = cRec.Ops.rvMax = 0;
143 }
144 cRec.Ops.write = htonl (fsP->ops.write);
145 if (fsP->ops.write)
146 {cRec.Ops.wrMin = htonl (fsP->ops.wrMin);
147 cRec.Ops.wrMax = htonl (fsP->ops.wrMax);
148 } else {
149 cRec.Ops.wrMin = cRec.Ops.wrMax = 0;
150 }
151 }
152
153// Record sum of squares if so needed
154//
155 if (fsSSQ)
156 {XrdXrootdMonDouble xval;
157 xval.dreal = fsP->ssq.read;
158 cRec.Ssq.read.dlong = htonll(xval.dlong);
159 xval.dreal = fsP->ssq.readv;
160 cRec.Ssq.readv.dlong = htonll(xval.dlong);
161 xval.dreal = fsP->ssq.rsegs;
162 cRec.Ssq.rsegs.dlong = htonll(xval.dlong);
163 xval.dreal = fsP->ssq.write;
164 cRec.Ssq.write.dlong = htonll(xval.dlong);
165 }
166
167// Get a pointer to the next slot (the buffer gets locked)
168//
169 cP = GetSlot(crecSize);
170 memcpy(cP, &cRec, crecSize);
171 bfMutex.UnLock();
172}
173
174/******************************************************************************/
175/* D e f a u l t s */
176/******************************************************************************/
177
178void XrdXrootdMonFile::Defaults(int intv, int opts, int xfrcnt, int fbsz)
179{
180
181// Set the reporting interval and I/O counter
182//
183 repTime = intv;
184 xfrCnt = xfrcnt;
185 xfrRem = xfrcnt;
186 fBsz = (fbsz <= 0 ? 65472 : fbsz);
187
188// Expand out the options
189//
190 fsXFR = (opts & XROOTD_MON_FSXFR) != 0;
191 fsLFN = (opts & XROOTD_MON_FSLFN) != 0;
192 fsOPS = (opts & (XROOTD_MON_FSOPS | XROOTD_MON_FSSSQ)) != 0;
193 fsSSQ = (opts & XROOTD_MON_FSSSQ) != 0;
194
195// Set monitoring level
196//
197 if (fsSSQ) fsLVL = XrdXrootdFileStats::monSsq;
198 else if (fsOPS) fsLVL = XrdXrootdFileStats::monOps;
199 else if (intv) fsLVL = XrdXrootdFileStats::monOn;
200 else fsLVL = XrdXrootdFileStats::monOff;
201}
202
203/******************************************************************************/
204/* D i s c */
205/******************************************************************************/
206
207void XrdXrootdMonFile::Disc(unsigned int usrID)
208{
209 static short drecSize = htons(sizeof(XrdXrootdMonFileDSC));
211
212// Get a pointer to the next slot (the buffer gets locked)
213//
214 dP = (XrdXrootdMonFileDSC *)GetSlot(sizeof(XrdXrootdMonFileDSC));
215
216// Fill out the record. It's pretty simple
217//
219 dP->Hdr.recFlag = 0;
220 dP->Hdr.recSize = drecSize;
221 dP->Hdr.userID = usrID;
222 bfMutex.UnLock();
223}
224
225/******************************************************************************/
226/* D o I t */
227/******************************************************************************/
228
230{
231
232// First check if we need to report all the I/O stats
233//
234 xfrRem--;
235 if (!xfrRem) DoXFR();
236
237// Check if we should flush the buffer
238//
239 bfMutex.Lock();
240 if (repNext) Flush();
241 bfMutex.UnLock();
242
243// Reschedule ourselves
244//
245 XrdXrootdMonInfo::Sched->Schedule((XrdJob *)this, time(0)+repTime);
246}
247
248/******************************************************************************/
249/* Private: D o X F R */
250/******************************************************************************/
251
252void XrdXrootdMonFile::DoXFR()
253{
255 int keep, i, n, hwm;
256
257// Reset interval counter
258//
259 xfrRem = xfrCnt;
260
261// Grab the high watermark once
262//
263 fmMutex.Lock();
264 hwm = fmHWM;
265 fmMutex.UnLock();
266
267// Report on all the files we have registered. This is a CPU burner as we
268// periodically drop the lock to allow open/close requests to come through.
269//
270 for (i = 0; i <= hwm; i++)
271 {fmMutex.Lock();
272 if (fmUse[i])
273 {n = 0;
275 while((fsP = fmMap[i].Next(n)))
276 {if (fsP->xfrXeq) DoXFR(fsP);
277 if (!keep--)
278 {fmMutex.UnLock();
280 fmMutex.Lock();
281 }
282 }
283 }
284 fmMutex.UnLock();
285 }
286}
287
288/******************************************************************************/
289
290void XrdXrootdMonFile::DoXFR(XrdXrootdFileStats *fsP)
291{
292 long long xfrRead, xfrReadv, xfrWrite;
293 char *cP;
294
295// Turn off the activity flag
296//
297 fsP->xfrXeq = 0;
298
299// Grab the I/O bytes to get a somewhat consistent image here
300//
301 xfrRead = fsP->xfr.read;
302 xfrReadv = fsP->xfr.readv;
303 xfrWrite = fsP->xfr.write;
304
305// Complete the record
306//
307 xfrRec.Hdr.fileID = fsP->FileID;
308 xfrRec.Xfr.read = htonll(xfrRead);
309 xfrRec.Xfr.readv = htonll(xfrReadv);
310 xfrRec.Xfr.write = htonll(xfrWrite);
311
312// Get a pointer to the next slot (the buffer gets locked)
313//
314 cP = GetSlot(sizeof(xfrRec));
315 memcpy(cP, &xfrRec, sizeof(xfrRec));
316 xfrRecs++;
317 bfMutex.UnLock();
318}
319
320/******************************************************************************/
321/* I n i t */
322/******************************************************************************/
323
325{
326 XrdXrootdMonFile *mfP;
327 int alignment, pagsz = getpagesize();
328
329// Allocate a socket buffer
330//
331 alignment = (fBsz < pagsz ? 1024 : pagsz);
332 if (posix_memalign((void **)&repBuff, alignment, fBsz))
333 {XrdXrootdMonInfo::eDest->Emsg("MonFile", "Unable to allocate monitor buffer.");
334 return false;
335 }
336
337// Set the header (always present)
338//
339 repHdr = (XrdXrootdMonHeader *)repBuff;
340 repHdr->code = XROOTD_MON_MAPFSTA;
341 repHdr->pseq = 0;
343
344// Set the time record (always present)
345//
346 repTOD = (XrdXrootdMonFileTOD *)(repBuff + sizeof(XrdXrootdMonHeader));
349 repTOD->Hdr.recSize = htons(sizeof(XrdXrootdMonFileTOD));
350 repTOD->sID = static_cast<kXR_int64>(XrdXrootdMonInfo::mySID);
351
352// Establish first real record in the buffer (always fixed)
353//
354 repFirst = repBuff+sizeof(XrdXrootdMonHeader)+sizeof(XrdXrootdMonFileTOD);
355
356// Calculate the end nut the next slot always starts with a null pointer
357//
358 repLast = repBuff+fBsz-1;
359 repNext = 0;
360
361// Calculate the close record size and the initial flags
362//
363 crecSize = sizeof(XrdXrootdMonFileHdr) + sizeof(XrdXrootdMonStatXFR);
364 if (fsSSQ || fsOPS)
365 {crecSize += sizeof(XrdXrootdMonStatOPS);
367 } else crecFlag = 0;
368 if (fsSSQ)
369 {crecSize += sizeof(XrdXrootdMonStatSSQ);
370 crecFlag |= XrdXrootdMonFileHdr::hasSSQ;
371 }
372 crecNLen = htons(static_cast<short>(crecSize));
373
374// Preformat the i/o record
375//
377 xfrRec.Hdr.recFlag = 0;
378 xfrRec.Hdr.recSize = htons(static_cast<short>(sizeof(xfrRec)));
379
380// Calculate the tod record size
381//
382 trecNLen = htons(static_cast<short>(sizeof(XrdXrootdMonFileTOD)));
383
384// Allocate an instance of ourselves so we can schedule ourselves
385//
386 mfP = new XrdXrootdMonFile();
387
388// Schedule an the flushes
389//
390 XrdXrootdMonInfo::Sched->Schedule((XrdJob *)mfP, time(0)+repTime);
391 return true;
392}
393
394/******************************************************************************/
395/* Private: F l u s h */
396/******************************************************************************/
397
398void XrdXrootdMonFile::Flush() // The bfMutex must be locked
399{
400 static int seq = 0;
401 int bfSize;
402
403// Update the sequence number
404//
405 repHdr->pseq = static_cast<char>(0x00ff & seq++);
406
407// Insert ending timestamp and record counts
408//
409 repTOD->Hdr.nRecs[0] = htons(static_cast<short>(xfrRecs));
410 repTOD->Hdr.nRecs[1] = htons(static_cast<short>(totRecs));
411 repTOD->tEnd = htonl(static_cast<int>(time(0)));
412
413// Calculate buffer size and stick into the header
414//
415 bfSize = (repNext - repBuff);
416 repHdr->plen = htons(static_cast<short>(bfSize));
417 repNext = 0;
418
419// Write this out
420//
421 XrdXrootdMonitor::Send(XROOTD_MON_FSTA, repBuff, bfSize, false);
422 repTOD->tBeg = repTOD->tEnd;
423 xfrRecs = totRecs = 0;
424}
425
426/******************************************************************************/
427/* Private: G e t S l o t */
428/******************************************************************************/
429
430char *XrdXrootdMonFile::GetSlot(int slotSZ)
431{
432 char *myRec;
433
434// Lock this code to prevent interference (we should use double buffering)
435// Note that the caller must do the unlock when finished with the slot.
436//
437 bfMutex.Lock();
438
439// Check if we need to flush the buffer (sets repNext to zero). Otherwise,
440// if this is the first record insert a timestamp.
441//
442 if (repNext)
443 {if ((repNext + slotSZ) > repLast)
444 {Flush();
445 repNext = repFirst;
446 }
447 } else {
448 repTOD->tBeg = htonl(static_cast<int>(time(0)));
449 repNext = repFirst;
450 }
451
452// Return the slot
453//
454 totRecs++;
455 myRec = repNext;
456 repNext += slotSZ;
457 return myRec;
458}
459
460/******************************************************************************/
461/* O p e n */
462/******************************************************************************/
463
465 unsigned int uDID, bool isRW)
466{
467 static const int minRecSz = sizeof(XrdXrootdMonFileOPN)
468 - sizeof(XrdXrootdMonFileLFN);
470 int i = 0, sNum = -1, rLen, pLen = 0;
471
472// Assign the path a dictionary id if not assigned via file monitoring
473//
474 if (fsP->FileID == 0) fsP->FileID = XrdXrootdMonitor::GetDictID();
475
476// Add this open to the map table if we are doing I/O stats.
477//
478 if (fsXFR)
479 {fmMutex.Lock();
480 for (i = 0; i < XrdXrootdMonFMap::mapNum; i++)
481 if (fmUse[i] < XrdXrootdMonFMap::fmSize)
482 {if ((sNum = fmMap[i].Insert(fsP)) >= 0)
483 {fmUse[i]++;
484 if (i > fmHWM) fmHWM = i;
485 break;
486 }
487 }
488 fmMutex.UnLock();
489 }
490
491// Generate the cookie (real or virtual) to find the entry in the map table.
492// Supply the monitoring options for effeciency.
493//
494 fsP->MonEnt = (sNum | (i << XrdXrootdMonFMap::fmShft)) & 0xffff;
495 fsP->monLvl = fsLVL;
496 fsP->xfrXeq = 0;
497
498// Compute the size of this record
499//
500 rLen = minRecSz;
501 if (fsLFN)
502 {pLen = strlen(Path);
503 rLen += sizeof(kXR_unt32) + pLen;
504 i = (rLen + 8) & ~0x00000003;
505 pLen = pLen + (i - rLen);
506 rLen = i;
507 }
508
509// Get a pointer to the next slot (the buffer gets locked)
510//
511 oP = (XrdXrootdMonFileOPN *)GetSlot(rLen);
512
513// Fill out the record
514//
516 oP->Hdr.recFlag = (isRW ? XrdXrootdMonFileHdr::hasRW : 0);
517 oP->Hdr.recSize = htons(static_cast<short>(rLen));
518 oP->Hdr.fileID = fsP->FileID;
519 oP->fsz = htonll(fsP->fSize);
520
521// Append user and path if so wanted (sizes have been verified)
522//
523 if (fsLFN)
525 oP->ufn.user = uDID;
526 strncpy(oP->ufn.lfn, Path, pLen);
527 }
528 bfMutex.UnLock();
529}
long long kXR_int64
Definition XPtypes.hh:98
unsigned int kXR_unt32
Definition XPtypes.hh:90
XrdOucString Path
struct myOpts opts
XrdXrootdMonDouble write
XrdXrootdMonFileHdr Hdr
XrdXrootdMonFileLFN ufn
XrdXrootdMonDouble read
XrdXrootdMonStatXFR Xfr
XrdXrootdMonDouble readv
XrdXrootdMonStatXFR Xfr
XrdXrootdMonFileHdr Hdr
XrdXrootdMonStatOPS Ops
const kXR_char XROOTD_MON_MAPFSTA
XrdXrootdMonFileHdr Hdr
XrdXrootdMonFileHdr Hdr
XrdXrootdMonDouble rsegs
XrdXrootdMonStatSSQ Ssq
XrdXrootdMonFileHdr Hdr
#define XROOTD_MON_FSSSQ
#define XROOTD_MON_FSLFN
#define XROOTD_MON_FSTA
#define XROOTD_MON_FSOPS
#define XROOTD_MON_FSXFR
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
struct XrdXrootdFileStats::@158 ssq
XrdXrootdMonStatXFR xfr
XrdXrootdMonStatOPS ops
static const int fmShft
static const int fmSize
static const int mapNum
static const int fmHold
static const int fmMask
static void Disc(unsigned int usrID)
static void Defaults(int intv, int opts, int iocnt, int fbsz)
static void Open(XrdXrootdFileStats *fsP, const char *Path, unsigned int uDID, bool isRW)
static void Close(XrdXrootdFileStats *fsP, bool isDisc=false)
static int Send(int mmode, void *buff, int size, bool setseq=true)
static kXR_unt32 GetDictID(bool hbo=false)
XrdScheduler * Sched
XrdSysError * eDest