XRootD
Loading...
Searching...
No Matches
XrdSysIOEventsPollKQ.icc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d S y s I O E v e n t s P o l l K Q . i c c */
4/* */
5/* (c) 2014 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdlib>
32#include <sys/types.h>
33#include <sys/stat.h>
34#include <sys/event.h>
35#include <sys/time.h>
36
38#include "XrdSys/XrdSysE2T.hh"
39#ifndef Atomic
40#define Atomic(x) x
41#endif
42
43
44/******************************************************************************/
45/* C l a s s P o l l K Q */
46/******************************************************************************/
47
namespace XrdSys
{
namespace IOEvents
{

// Poller implementation built on kqueue()/kevent(). One instance owns a kqueue
// descriptor and the table that kevent() fills with ready events.
//
class PollKQ : public Poller
{
public:

// Allocate a zero-filled table with room for 'slots' kevent entries. The
// table address is returned via memP; the result is 0 on success, otherwise
// the error number reported by posix_memalign().
//
static int AllocMem(void **memP, int slots);

// Construct a poller around an existing kqueue.
//   ptab  - event table (released with free() by AllocPT() and Shutdown())
//   numfd - number of entries in ptab
//   pfd   - the kqueue file descriptor
//   pFD   - the communication pipe; both ends are handed to the Poller base
//
// The constructor also prepares 'armPipe', the change record that Process()
// submits to re-arm read notification on reqFD.
//
       PollKQ(struct kevent *ptab, int numfd, int pfd, int pFD[2])
             : Poller(pFD[0], pFD[1]),
               pollTab(ptab),
               cbNext(0),
               pollDfd(pfd),
               pollMax(numfd),
               pollNum(1),
               numPoll(0)
             {
              EV_SET(&armPipe, reqFD, EVFILT_READ,
                     EV_ADD | EV_CLEAR | EV_ENABLE, 0, 0, 0);
             }

// NOTE(review): the listing skips one source line at this point (most likely
// the destructor); it is not reproduced here -- confirm against the original.

protected:

// Poll loop entry point; posts syncp once startup status has been recorded.
//
       void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg);

// Remove a channel from the poll set.
//
       void Exclude(Channel *cP, bool &isLocked, bool dover = true);

// Add a channel to the poll set.
//
       bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked);

// Bring the kqueue filters of a channel in line with its requested events.
//
       bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked);

// Release the event table and close the kqueue descriptor.
//
       void Shutdown();

private:

       int   AllocPT(int slots);            // Grow the event table
       void  Dispatch(Channel *cP, int i);  // Deliver event pollTab[i] to cP
       bool  Process(int next);             // Handle a request from the pipe

struct kevent *pollTab;    // Table that kevent() fills with ready events
struct kevent  armPipe;    // Change record used to re-arm the request pipe
       int     cbNext;     // Index after the event in callback, 0 if none
       int     pollDfd;    // The kqueue file descriptor
       int     pollMax;    // Number of entries pollTab can hold
       Atomic(int) pollNum;// Count of descriptors in the set (starts at 1)
       int     numPoll;    // Number of events returned by the last kevent()
static void   *deadChP;    // Its address marks events of excluded channels

// Per-channel status bits kept via GetPollEnt()/SetPollEnt()
//
static const int rEnabled = 1;   // Read  filter is currently enabled
static const int rFilterX = 2;   // Read  filter has been added to the kqueue
static const int wEnabled = 4;   // Write filter is currently enabled
static const int wFilterX = 8;   // Write filter has been added to the kqueue
};

       void *PollKQ::deadChP = 0;

} // namespace IOEvents
} // namespace XrdSys
100
101/******************************************************************************/
102/* C l a s s P o l l e r */
103/******************************************************************************/
104/******************************************************************************/
105/* Static: n e w P o l l e r */
106/******************************************************************************/
107
XrdSys::IOEvents::Poller *
XrdSys::IOEvents::Poller::newPoller(int          pipeFD[2],
                                    int         &eNum,
                                    const char **eTxt)
{
// Create a kqueue based poller.
//
// pipeFD - the communication pipe; pipeFD[0] is registered for read events
//          and both descriptors are passed to the PollKQ constructor.
// eNum   - receives the error number when a step fails.
// eTxt   - when not null, receives a short description of the failing step.
//
// Returns the new poller, or 0 on failure. On every failure path the kqueue
// descriptor opened here is closed again before returning.
//
   static const int allocFD = 1024;
   struct kevent *pp, chlist;
   int pfd;

// Open the kqueue
//
   if ((pfd = kqueue()) < 0)
      {eNum = errno;
       if (eTxt) *eTxt = "creating kqueue";
       return 0;
      }

// Add the request side of the pipe fd to the poll set (always fd[0]). The
// error number is captured before close() so that close() cannot disturb it.
//
   EV_SET(&chlist, pipeFD[0], EVFILT_READ, EV_ADD|EV_ONESHOT|EV_ENABLE, 0, 0, 0);
   if (kevent(pfd, &chlist, 1, 0, 0, 0) < 0)
      {eNum = errno;
       if (eTxt) *eTxt = "adding communication pipe";
       close(pfd);
       return 0;
      }

// Allocate the event table. Any allocation failure is reported as ENOMEM.
//
   if ((eNum = XrdSys::IOEvents::PollKQ::AllocMem((void **)&pp, allocFD)))
      {eNum = ENOMEM;
       if (eTxt) *eTxt = "creating kqueue table";
       close(pfd);
       return 0;
      }

// Create new poll object
//
   return (Poller *)new PollKQ(pp, allocFD, pfd, pipeFD);
}
148
149/******************************************************************************/
150/* C l a s s P o l l K Q */
151/******************************************************************************/
152/******************************************************************************/
153/* A l l o c M e m */
154/******************************************************************************/
155
int XrdSys::IOEvents::PollKQ::AllocMem(void **memP, int slots)
{
// Allocate an aligned, zero-filled table of 'slots' kevent entries.
//
// memP  - receives the address of the table on success.
// slots - number of entries the table must hold.
//
// Returns 0 on success, otherwise the posix_memalign() error number.
//
   const int pageSize = getpagesize();
   const int tabBytes = slots * sizeof(struct kevent);

// Tables of at least one page are page aligned; smaller ones use 1024 bytes
//
   const int boundary = (tabBytes >= pageSize ? pageSize : 1024);

// Obtain the storage; nothing is cleared if the allocation failed
//
   const int rc = posix_memalign(memP, boundary, tabBytes);
   if (rc) return rc;

   memset(*memP, 0, tabBytes);
   return rc;
}
167
168/******************************************************************************/
169/* Private: A l l o c P T */
170/******************************************************************************/
171
int XrdSys::IOEvents::PollKQ::AllocPT(int slots)
{
// Replace the event table with a larger one.
//
// slots - the number of entries the caller needs.
//
// Returns 0 when the table was replaced, otherwise the error number from
// AllocMem(); in that case the current table and pollMax are left untouched.
// The old table's contents are not copied, so this must only be called when
// no returned events are still waiting to be dispatched.
//
   struct kevent *pp = 0;
   int rc;

// Calculate new slots: grow by 256 when the table already covers the request,
// otherwise add the request rounded up to a multiple of 256.
//
   if (pollMax >= slots) slots = pollMax + 256;
      else slots = pollMax + (slots/256*256) + (slots%256 ? 256 : 0);

// Allocate a new table; report the failure instead of silently ignoring it
//
   if ((rc = AllocMem((void **)&pp, slots))) return rc;

// Successful, so replace the old table
//
   free(pollTab);
   pollTab = pp;
   pollMax = slots;

// All done
//
   return 0;
}
193
194/******************************************************************************/
195/* Protected: B e g i n */
196/******************************************************************************/
197
void XrdSys::IOEvents::PollKQ::Begin(XrdSysSemaphore *syncsem,
                                     int             &retcode,
                                     const char     **eTxt)
{
// Run the poll loop.
//
// syncsem - posted once the startup status has been stored.
// retcode - set to 0 (startup succeeded).
// eTxt    - when not null, set to 0 (no error text).
//
// The loop returns only when Process() reports a stop request, or when
// kevent() fails with EBADF in a process other than the one recorded in
// parentPID. Any other kevent() failure aborts the process.
//
   struct timespec *tmP, tmOut;
   Channel *cP;
   long long tmVal;
   int numpolled, pollN;

// Indicate to the starting thread that all went well
//
   retcode = 0;
   if (eTxt) *eTxt = 0;
   syncsem->Post();

// Now start dispatching channels that are ready. We use the wakePend flag to
// keep the chatter down when we actually wakeup.
//
   do {
       // TmoGet() yields the wait in milliseconds (negative means wait
       // forever). Both the seconds and the sub-second remainder are passed
       // on; dropping the remainder would turn any wait below one second
       // into a zero-length poll.
       //
       if ((tmVal = TmoGet()) < 0) tmP = 0;
          else {tmOut.tv_sec  = tmVal / 1000;
                tmOut.tv_nsec = (long)(tmVal % 1000) * 1000000L;
                tmP = &tmOut;
               }

       // Wait for events, restarting the call when it is interrupted
       //
       do {numpolled = kevent(pollDfd, 0, 0, pollTab, pollMax, tmP);}
          while (numpolled < 0 && errno == EINTR);
       wakePend = true; numPoll = numpolled;

            if (numpolled == 0) CbkTMO();
       else if (numpolled <  0)
               {int rc = errno;
                //--------------------------------------------------------------
                // If we are in a child process and the poll file descriptor
                // has been closed, there is an immense chance the fork will be
                // followed by an exec, in which case we don't want to abort
                //--------------------------------------------------------------
                if( rc == EBADF && parentPID != getpid() ) return;
                std::cerr <<"KQ: " <<XrdSysE2T(rc) <<" polling for events" <<std::endl;
                abort();
               }
       else for (int i = 0; i < numpolled; i++)
                {
                 // Events that carry a channel are dispatched to it; an event
                 // without one is the request pipe and is handled by Process()
                 //
                 if ((cP = (Channel *)pollTab[i].udata)) Dispatch(cP, i);
                    else if (!Process(i+1)) return;
                }

       // Enlarge the event table when more descriptors are registered than
       // it can hold
       //
       pollN = AtomicGet(pollNum);
       if (pollMax < pollN) AllocPT(pollN);

      } while(1);
}
244
245/******************************************************************************/
246/* Private: D i s p a t c h */
247/******************************************************************************/
248
void XrdSys::IOEvents::PollKQ::Dispatch(XrdSys::IOEvents::Channel *cP, int i)
{
// Deliver event pollTab[i] to channel cP through CbkXeq(). When CbkXeq()
// returns false the channel is removed from the poll set.
//
   static const uint16_t pollER = EV_EOF | EV_ERROR;
   const char *eTxt = 0;
   int  eNum = 0, events = 0;
   bool isLocked = false;

// Events that were re-pointed at the dead-channel marker belong to a channel
// that has been excluded in the meantime (rare but true); drop them.
//
   if (cP == (XrdSys::IOEvents::Channel *)&deadChP) return;

// Translate the event to something reasonable: either a ready indication or
// an error number (the filter flags when set, otherwise ECONNRESET).
//
   if (pollTab[i].flags & pollER)
      {eTxt = "polling";
       if (pollTab[i].fflags) eNum = pollTab[i].fflags;
          else eNum = ECONNRESET;
      }
      else if (pollTab[i].filter == EVFILT_READ)
              events = CallBack::ReadyToRead;
      else    events = CallBack::ReadyToWrite;

// Execute the callback. While it runs, cbNext holds the index of the next
// unprocessed event so that Exclude() can cancel events that are still pending.
//
   cbNext = i+1;
   const bool keepChannel = CbkXeq(cP, events, eNum, eTxt);
   if (!keepChannel) Exclude(cP, isLocked, 0);
   cbNext = 0;
}
278
279/******************************************************************************/
280/* Protected: E x c l u d e */
281/******************************************************************************/
282
void XrdSys::IOEvents::PollKQ::Exclude(XrdSys::IOEvents::Channel *cP,
                                       bool &isLocked,  bool dover)
{
// Remove a channel from the poll set.
//
// cP       - the channel to remove.
// isLocked - true when the caller holds the channel lock; when 'dover' is set
//            the lock is released here and isLocked is reset to false.
// dover    - when true, a RmFD request is sent to the poller thread via
//            SendCmd(); when false, events still pending for this channel in
//            the current batch are cancelled in place.
//
   struct kevent chlist[2];
   int i = 0, theFD = cP->GetFD(), kqStatus = GetPollEnt(cP);

// Setup the removal elements, one for each filter that was added for this
// channel. The count must advance per element; otherwise nothing is ever
// submitted and the filters stay registered with the kqueue.
//
   if (kqStatus & rFilterX)
      {EV_SET(&chlist[i], theFD, EVFILT_READ,  EV_DELETE, 0, 0, cP); i++;}
   if (kqStatus & wFilterX)
      {EV_SET(&chlist[i], theFD, EVFILT_WRITE, EV_DELETE, 0, 0, cP); i++;}

// Remove this channel from the poll set. We ignore errors as the descriptor
// may have been closed prior to this call (though this shouldn't happen).
//
   if (i) kevent(pollDfd, chlist, i, 0, 0, 0);
   SetPollEnt(cP, 0);
   AtomicDec(pollNum);

// If we need to verify this action, sync with the poller thread (note that the
// poller thread will not ask for this action unless it wants to deadlock). We
// may actually deadlock anyway if the channel lock is held. We are allowed to
// release it if the caller locked it. This will prevent a deadlock. Otherwise,
// if we are in a callback and this channel is not the one that initiated the
// exclude then we must make sure that we cancel any pending callback to the
// excluded channel as it may have been deleted and we won't know that here.
//
   if (dover)
      {PipeData cmdbuff;
       if (isLocked)
          {isLocked = false;
           UnLockChannel(cP);
          }
       cmdbuff.req = PipeData::RmFD;
       cmdbuff.fd  = theFD;
       SendCmd(cmdbuff);
      } else {
       if (cbNext)
          for (int n = cbNext; n < numPoll; n++)
              {if (cP == (Channel *)pollTab[n].udata)
                  pollTab[n].udata = &deadChP;
              }
      }
}
329
330/******************************************************************************/
331/* Protected: I n c l u d e */
332/******************************************************************************/
333
bool XrdSys::IOEvents::PollKQ::Include(XrdSys::IOEvents::Channel *cP,
                                       int                       &eNum,
                                       const char               **eTxt,
                                       bool                      &isLocked)
{
// Add a channel to the poll set.
//
// Returns true on success. On failure eNum holds the error number set by
// Modify() and, when eTxt is not null, *eTxt is set to "adding channel".
//

// Modify() does the real work as it adds events to the kqueue as needed
//
   const bool added = Modify(cP, eNum, eTxt, isLocked);

// On success bump the number in the set; the poller thread will reallocate
// the poll table if need be. On failure replace the text Modify() supplied.
//
   if (added)
      {AtomicInc(pollNum);
      }
      else
      {if (eTxt) *eTxt = "adding channel";
      }

   return added;
}
353
354/******************************************************************************/
355/* Protected: M o d i f y */
356/******************************************************************************/
357
bool XrdSys::IOEvents::PollKQ::Modify(XrdSys::IOEvents::Channel *cP,
                                      int                       &eNum,
                                      const char               **eTxt,
                                      bool                      &isLocked)
{
// Bring the kqueue read/write filters of a channel in line with the events
// the channel currently asks for.
//
// Returns true when nothing had to change or the change was applied. Returns
// false when kevent() fails; eNum then holds errno and, when eTxt is not
// null, *eTxt is set to "modifying poll events". The channel's recorded
// status is updated only after a successful kevent() call.
//
   (void)isLocked;
   struct kevent changes[2];
   int nChanges = 0;
   const int wanted = cP->GetEvents(), fd = cP->GetFD();
   int status = GetPollEnt(cP);

// Work out what is wanted versus what is currently enabled
//
   const bool wantRead  = (wanted & Channel:: readEvents) != 0;
   const bool wantWrite = (wanted & Channel::writeEvents) != 0;
   const bool haveRead  = (status & rEnabled) != 0;
   const bool haveWrite = (status & wEnabled) != 0;

// Establish new read event mask
//
   if (wantRead && !haveRead)
      {EV_SET(&changes[nChanges], fd, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, cP);
       status |= rEnabled | rFilterX;
       nChanges++;
      }
   else if (!wantRead && haveRead)
      {EV_SET(&changes[nChanges], fd, EVFILT_READ, EV_DISABLE, 0, 0, cP);
       status &= ~rEnabled;
       nChanges++;
      }

// Establish new write event mask
//
   if (wantWrite && !haveWrite)
      {EV_SET(&changes[nChanges], fd, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, cP);
       status |= wEnabled | wFilterX;
       nChanges++;
      }
   else if (!wantWrite && haveWrite)
      {EV_SET(&changes[nChanges], fd, EVFILT_WRITE, EV_DISABLE, 0, 0, cP);
       status &= ~wEnabled;
       nChanges++;
      }

// Nothing to do if nothing changed
//
   if (!nChanges) return true;

// Apply the changes and remember the new status
//
   if (kevent(pollDfd, changes, nChanges, 0, 0, 0) < 0)
      {eNum = errno;
       if (eTxt) *eTxt = "modifying poll events";
       return false;
      }
   SetPollEnt(cP, status);

// All done
//
   return true;
}
416
417/******************************************************************************/
418/* Private: P r o c e s s */
419/******************************************************************************/
420
bool XrdSys::IOEvents::PollKQ::Process(int next)
{
// Handle a request that arrived on the communication pipe.
//
// next - index of the first event in pollTab that has not been handled yet.
//
// Returns false when a Stop request was received, true otherwise.
//
   XrdSys::IOEvents::Channel *const deadMark
                            = (XrdSys::IOEvents::Channel *)&deadChP;

// Get the pipe request and check out actions of interest.
//
   if (GetRequest())
      {if (reqBuff.req == PipeData::RmFD)
          {
           // Neutralize the remaining events of this batch that refer to the
           // removed descriptor, then release the requester.
           //
           for (int slot = next; slot < numPoll; slot++)
               {Channel *pending = (Channel *)pollTab[slot].udata;
                if (!pending || pending == deadMark) continue;
                if (reqBuff.fd != (int)pollTab[slot].ident) continue;
                pollTab[slot].udata = &deadChP;
               }
           reqBuff.theSem->Post();
          }
       else if (reqBuff.req == PipeData::Stop)
          {reqBuff.theSem->Post();
           return false;
          }
      }

// Re-enable the pipe as kqueue essentially disables it once we do a read-out
//
   kevent(pollDfd, &armPipe, 1, 0, 0, 0);

// All done
//
   return true;
}
450
451/******************************************************************************/
452/* Protected: S h u t d o w n */
453/******************************************************************************/
454
456{
457 static XrdSysMutex shutMutex;
458
459// To avoid race conditions, we serialize this code
460//
461 shutMutex.Lock();
462
463// Release the poll table
464//
465 if (pollTab) {free(pollTab); pollTab = 0;}
466
467// Close the kqueue file descriptor
468//
469 if (pollDfd >= 0) {close(pollDfd); pollDfd = -1;}
470
471// All done
472//
473 shutMutex.UnLock();
474}
#define close(a)
Definition XrdPosix.hh:43
#define eMsg(x)
#define Atomic(type)
#define AtomicInc(x)
#define AtomicDec(x)
#define AtomicGet(x)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:99
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
PollKQ(struct kevent *ptab, int numfd, int pfd, int pFD[2])
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
static int AllocMem(void **memP, int slots)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)