XRootD
Loading...
Searching...
No Matches
XrdSysIOEvents.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d S y s I O E v e n t s . c c */
4/* */
5/* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdio>
32#include <cstdlib>
33
34#include "XrdSys/XrdSysE2T.hh"
35#include "XrdSys/XrdSysFD.hh"
40
41/******************************************************************************/
42/* L o c a l D a t a */
43/******************************************************************************/
44
45namespace
46{
47// Status code to name array corresponding to:
48// enum Status {isClear = 0, isCBMode, isDead};
49//
50 const char *statName[] = {"isClear", "isCBMode", "isDead"};
51}
52
53/******************************************************************************/
54/* L o c a l D e f i n e s */
55/******************************************************************************/
56
57#define STATUS statName[(int)chStat]
58
59#define STATUSOF(x) statName[(int)(x->chStat)]
60
61#define SINGLETON(dlvar, theitem)\
62 theitem ->dlvar .next == theitem
63
64#define INSERT(dlvar, curitem, newitem) \
65 newitem ->dlvar .next = curitem; \
66 newitem ->dlvar .prev = curitem ->dlvar .prev; \
67 curitem ->dlvar .prev-> dlvar .next = newitem; \
68 curitem ->dlvar .prev = newitem
69
70#define REMOVE(dlbase, dlvar, curitem) \
71 if (dlbase == curitem) dlbase = (SINGLETON(dlvar,curitem) \
72 ? 0 : curitem ->dlvar .next);\
73 curitem ->dlvar .prev-> dlvar .next = curitem ->dlvar .next;\
74 curitem ->dlvar .next-> dlvar .prev = curitem ->dlvar .prev;\
75 curitem ->dlvar .next = curitem;\
76 curitem ->dlvar .prev = curitem
77
78#define REVENTS(x) x & Channel:: readEvents
79
80#define WEVENTS(x) x & Channel::writeEvents
81
82#define ISPOLLER XrdSysThread::Same(XrdSysThread::ID(),pollTid)
83
84#define BOOLNAME(x) (x ? "true" : "false")
85
86#define DO_TRACE(x,fd,y) \
87 {PollerInit::traceMTX.Lock(); \
88 std::cerr <<"IOE fd "<<fd<<' '<<#x <<": "<<y<<'\n'<< std::flush; \
89 PollerInit::traceMTX.UnLock();}
90
91#define TRACING PollerInit::doTrace
92
93#define IF_TRACE(x,fd,y) if (TRACING) DO_TRACE(x,fd,y)
94
95#define TRACE_LOK " channel now " <<(isLocked ? "locked" : "unlocked")
96
97#define TRACE_MOD(x,fd,y) \
98 IF_TRACE(x,fd,"Modify(" <<y <<") == " \
99 <<BOOLNAME(retval) <<TRACE_LOK)
100
101#define TRACE_NOD(x,fd,y) \
102 IF_TRACE(x,fd,"Modify(" <<y <<") skipped; no events changed")
103
104/******************************************************************************/
105/* G l o b a l D a t a */
106/******************************************************************************/
107
109 = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff);
110
112
113/******************************************************************************/
114/* L o c a l C l a s s e s */
115/******************************************************************************/
116/******************************************************************************/
117/* T h r e a d S t a r t u p I n t e r f a c e */
118/******************************************************************************/
119
120namespace XrdSys
121{
122namespace IOEvents
123{
126 const char *retMsg;
129
131 {pollSync = new XrdSysSemaphore(0, "poll sync");}
133 };
134
136{
137public:
138
139static void *Start(void *parg);
140};
141
142void *BootStrap::Start(void *parg)
143{
144 struct pollArg *pollArg = (struct pollArg *)parg;
145 Poller *thePoller = pollArg->pollP;
147 thePoller->pollTid = XrdSysThread::ID();
148
149 thePoller->Begin(theSem, pollArg->retCode, &(pollArg->retMsg));
150 delete theSem;
151
152 return (void *)0;
153}
154
155/******************************************************************************/
156/* P o l l e r E r r 1 */
157/******************************************************************************/
158
159// This class is set in the channel when an error occurs but cannot be reflected
160// immediately because either there is no callback function or all events are
161// disabled. We need to do this because error events can be physically presented
162// by the kernel even when logical events are disabled. Note that the error
163// number and text will have been set and remain set as the channel was actually
164// disabled preventing any new operation on the channel.
165//
166class PollerErr1 : public Poller
167{
168public:
169
170 PollerErr1() : Poller(-1, -1) {}
172
173protected:
174void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
175 {(void)syncp; (void)rc; (void)eTxt;}
176
177void Exclude(Channel *cP, bool &isLocked, bool dover=1)
178 {(void)cP; (void)isLocked; (void)dover;}
179
180bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
181 {(void)isLocked;
182 if (!(eNum = GetFault(cP))) eNum = EPROTO;
183 if (eTxt) *eTxt = "initializing channel";
184 return false;
185 }
186
187bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
188 {(void)isLocked;
189 if (!(eNum = GetFault(cP))) eNum = EPROTO;
190 if (eTxt) *eTxt = "modifying channel";
191 return false;
192 }
193
194void Shutdown() {}
195};
196
197/******************************************************************************/
198/* P o l l e r I n i t */
199/******************************************************************************/
200
201// This class is used as the initial poller on a channel. It is responsible
202// for adding the file descriptor to the poll set upon the initial enable. This
203// avoids enabling a channel prior to it receiving a call back function.
204//
205class PollerInit : public Poller
206{
207public:
208
209 PollerInit() : Poller(-1, -1) {}
211
213static bool doTrace;
214
215protected:
216
217void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {}
218
219void Exclude(Channel *cP, bool &isLocked, bool dover=1) {}
220
221bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
222 {eNum = EPROTO;
223 if (eTxt) *eTxt = "initializing channel";
224 return false;
225 }
226
227bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
228 {bool rc = Init(cP, eNum, eTxt, isLocked);
229 IF_TRACE(Modify,cP->GetFD(), "Init() returned " <<BOOLNAME(rc));
230 return rc;
231 }
232
233void Shutdown() {}
234};
235
236bool PollerInit::doTrace = (getenv("XrdSysIOE_TRACE") != 0);
238
239/******************************************************************************/
240/* P o l l e r W a i t */
241/******************************************************************************/
242
243// This class is set in the channel when we need to serialize aces to the
244// channel. Channel methods (as some others) check for this to see if they need
245// to defer the current operation. We need to do his because some poller
246// implementations must release the channel lock to avoid a deadlock.
247//
248class PollerWait : public Poller
249{
250public:
251
252 PollerWait() : Poller(-1, -1) {}
254
255protected:
256void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {}
257
258void Exclude(Channel *cP, bool &isLocked, bool dover=1) {}
259
260bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
261 {eNum = EIDRM;
262 if (eTxt) *eTxt = "initializing channel";
263 return false;
264 }
265
266bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
267 {return Init(cP, eNum, eTxt, isLocked);}
268
269void Shutdown() {}
270};
271
275};
276};
277
278/******************************************************************************/
279/* C l a s s C h a n n e l M e t h o d s */
280/******************************************************************************/
281/******************************************************************************/
282/* C o n s t r u c t o r */
283/******************************************************************************/
284
286 CallBack *cbP, void *cbArg)
287 : chPollXQ(pollP), chCB(cbP), chCBA(cbArg)
288{
289 attList.next = attList.prev = this;
290 tmoList.next = tmoList.prev = this;
291 inTOQ = 0;
292 pollEnt = 0;
293 chStat = isClear;
294 Reset(&pollInit, fd);
295
296 pollP->Attach(this);
297}
298
299/******************************************************************************/
300/* D e l e t e */
301/******************************************************************************/
302
304{
305 Poller *myPoller;
306 bool isLocked = true;
307
308// Do some tracing
309//
310 IF_TRACE(Delete,chFD,"status="<<STATUS);
311
312// Lock ourselves during the delete process. If the channel is disassociated
313// or the real poller is set to the error poller then this channel is clean
314// and can be deleted (i.e. the channel ran through Detach()).
315//
316 chMutex.Lock();
317 if (!chPollXQ || chPollXQ == &pollErr1)
318 {chMutex.UnLock();
319 delete this;
320 return;
321 }
322
323// Disable and remove ourselves from all queues
324//
325 myPoller = chPollXQ;
326 chPollXQ->Detach(this,isLocked,false);
327 if (!isLocked) chMutex.Lock();
328
329// If we are in callback mode then we will need to delay the destruction until
330// after the callback completes unless this is the poller thread. In that case,
331// we need to tell the poller that we have been destroyed in a shelf-stable way.
332//
333 if (chStat)
335 {myPoller->chDead = true;
336 chMutex.UnLock();
337 } else {
338 XrdSysSemaphore cbDone(0);
339 IF_TRACE(Delete,chFD,"waiting for callback");
340 chStat = isDead;
341 chCBA = (void *)&cbDone;
342 chMutex.UnLock();
343 cbDone.Wait();
344 }
345 }
346// It is now safe to release the storage
347//
348 IF_TRACE(Delete,chFD,"chan="<< std::hex<<(void *)this<< std::dec);
349 delete this;
350}
351
352/******************************************************************************/
353/* D i s a b l e */
354/******************************************************************************/
355
356bool XrdSys::IOEvents::Channel::Disable(int events, const char **eText)
357{
358 int eNum = 0, newev, curev;
359 bool retval = true, isLocked = true;
360
361// Lock this channel
362//
363 chMutex.Lock();
364
365// Get correct current events; depending on the state of the channel
366//
367 if (chPoller == &pollWait) curev = static_cast<int>(reMod);
368 else curev = static_cast<int>(chEvents);
369
370// Trace this entry
371//
372 IF_TRACE(Disable,chFD,"->Disable(" <<events <<") chev=" <<curev);
373
374// Calculate new event mask
375//
376 events &= allEvents;
377 newev = curev & ~events;
378
379// If something has changed, then modify the event mask in the poller. The
380// poller may or may not unlock this channel during the process.
381//
382 if (newev != curev)
383 {chEvents = newev;
384 retval = chPoller->Modify(this, eNum, eText, isLocked);
385 TRACE_MOD(Disable,chFD,newev);
386 } else {
387 TRACE_NOD(Disable,chFD,newev);
388 }
389 if (isLocked) chMutex.UnLock();
390
391// All done
392//
393 if (!retval) errno = eNum;
394 return retval;
395}
396
397/******************************************************************************/
398/* E n a b l e */
399/******************************************************************************/
400
401bool XrdSys::IOEvents::Channel::Enable(int events, int timeout,
402 const char **eText)
403{
404 int eNum = 0, newev, curev, tmoSet = 0;
405 bool retval, setTO, isLocked = true;
406
407// Lock ourselves against any changes (this is a recursive mutex)
408//
409 chMutex.Lock();
410
411// Get correct current events; depending on the state of the channel
412//
413 if (chPoller == &pollWait) curev = static_cast<int>(reMod);
414 else curev = static_cast<int>(chEvents);
415
416// Trace this entry
417//
418 IF_TRACE(Enable,chFD,"->Enable("<<events<<','<<timeout<<") chev="<<curev);
419
420// Establish events that should be enabled
421//
422 events &= allEvents;
423 newev = (curev ^ events) & events;
424 chEvents = curev | events;
425
426// Handle timeout changes now
427//
428 if (REVENTS(events))
429 { if (timeout > 0) chRTO = timeout;
430 else if (timeout < 0) chRTO = 0;
431 if (rdDL != Poller::maxTime || chRTO) tmoSet |= CallBack::ReadyToRead;
432 }
433
434 if (WEVENTS(events))
435 { if (timeout > 0) chWTO = timeout;
436 else if (timeout < 0) chWTO = 0;
437 if (wrDL != Poller::maxTime || chWTO) tmoSet |= CallBack::ReadyToWrite;
438 }
439
440// Check if we have to reset the timeout. We need to hold the channel lock here.
441//
442 if (tmoSet && chPoller != &pollErr1)
443 setTO = chPollXQ->TmoAdd(this, tmoSet);
444 else setTO = false;
445
446// Check if any modifcations needed here. If so, invoke the modifier. Note that
447// the modify will unlock the channel if the operation causes a wait. So,
448// we cannot depend on the channel being locked upon return. The reason we do
449// not unlock here is because we must ensure the channel doesn't change while
450// we call modify. We let modify determine what to do.
451//
452 if (newev)
453 {retval = chPoller->Modify(this, eNum, eText, isLocked);
454 TRACE_MOD(Enable,chFD,(curev | events));
455 } else {
456 retval = true;
457 TRACE_NOD(Enable,chFD,(curev | events));
458 }
459
460// We need to notify the poller thread if the added deadline is the first in the
461// queue and the poller is waiting. We also optimize for the case where the
462// poller thread is always woken up to perform an action in which case it
463// doesn't need a separate wakeup. We only do this if the enable succeeed. Note
464// that we cannot hold the channel mutex for this call because it may wait.
465//
466 if (isLocked) chMutex.UnLock();
467 bool isWakePend = CPP_ATOMIC_LOAD(chPollXQ->wakePend, std::memory_order_consume);
468 if (retval && !isWakePend && setTO && isLocked) chPollXQ->WakeUp();
469
470// All done
471//
472 if (!retval) errno = eNum;
473 return retval;
474}
475
476/******************************************************************************/
477/* G e t C a l l B a c k */
478/******************************************************************************/
479
481{
482 chMutex.Lock();
483 *cbP = chCB;
484 *cbArg = chCBA;
485 chMutex.UnLock();
486}
487
488/******************************************************************************/
489/* Private: R e s e t */
490/******************************************************************************/
491
492void XrdSys::IOEvents::Channel::Reset(XrdSys::IOEvents::Poller *thePoller,
493 int fd, int eNum)
494{
495 chPoller = thePoller;
496 chFD = fd;
497 chFault = eNum;
498 chRTO = 0;
499 chWTO = 0;
500 chEvents = 0;
501 dlType = 0;
502 inPSet = 0;
503 reMod = 0;
504 rdDL = Poller::maxTime;
505 wrDL = Poller::maxTime;
506 deadLine = Poller::maxTime;
507}
508
509/******************************************************************************/
510/* S e t C a l l B a c k */
511/******************************************************************************/
512
514{
515
516// We only need to have the channel lock to set the callback. If the object
517// is in the process of being destroyed, we do nothing.
518//
519 chMutex.Lock();
520 if (chStat != isDead)
521 {chCB = cbP;
522 chCBA = cbArg;
523 }
524 chMutex.UnLock();
525}
526
527/******************************************************************************/
528/* S e t F D */
529/******************************************************************************/
530
532{
533 bool isLocked = true;
534
535// Obtain the channel lock. If the object is in callback mode we have some
536// extra work to do. If normal callback then indicate the channel transitioned
537// to prevent it being automatically re-enabled. If it's being destroyed, then
538// do nothing. Otherwise, this is a stupid double setFD call.
539//
540 chMutex.Lock();
541 if (chStat == isDead)
542 {chMutex.UnLock();
543 return;
544 }
545
546// This is a tricky deal here because we need to protect ourselves from other
547// threads as well as the poller trying to do a callback. We first, set the
548// poller target. This means the channel is no longer ready and callbacks will
549// be skipped. We then remove the current file descriptor. This may unlock the
550// channel but at this point that's ok.
551//
552 if (inPSet)
553 {chPoller = &pollWait;
554 chPollXQ->Detach(this, isLocked, true);
555 if (!isLocked) chMutex.Lock();
556 }
557
558// Indicate channel needs to be re-enabled then unlock the channel
559//
560 Reset(&pollInit, fd);
561 chMutex.UnLock();
562}
563
564/******************************************************************************/
565/* C l a s s P o l l e r */
566/******************************************************************************/
567/******************************************************************************/
568/* C o n s t r u c t o r */
569/******************************************************************************/
570
572{
573
574// Now initialize local class members
575//
576 attBase = 0;
577 tmoBase = 0;
578 cmdFD = cFD;
579 reqFD = rFD;
580 wakePend = false;
581 pipeBuff = 0;
582 pipeBlen = 0;
583 pipePoll.fd = rFD;
584 pipePoll.events = POLLIN | POLLRDNORM;
585 tmoMask = 255;
586}
587
588/******************************************************************************/
589/* A t t a c h */
590/******************************************************************************/
591
592void XrdSys::IOEvents::Poller::Attach(XrdSys::IOEvents::Channel *cP)
593{
594 Channel *pcP;
595
596// We allow only one attach at a time to simplify the processing
597//
598 adMutex.Lock();
599
600// Chain this channel into the list of attached channels
601//
602 if ((pcP = attBase)) {INSERT(attList, pcP, cP);}
603 else attBase = cP;
604
605// All done
606//
607 adMutex.UnLock();
608}
609
610/******************************************************************************/
611/* C b k T M O */
612/******************************************************************************/
613
615{
616 Channel *cP;
617
618// Process each element in the timeout queue, calling the callback function
619// if the timeout has passed. As this method can be called with a lock on the
620// channel mutex, we need to drop it prior to calling the callback.
621//
622 toMutex.Lock();
623 while((cP = tmoBase) && cP->deadLine <= time(0))
624 {int dlType = cP->dlType;
625 toMutex.UnLock();
626 CbkXeq(cP, dlType, 0, 0);
627 toMutex.Lock();
628 }
629 toMutex.UnLock();
630}
631
632/******************************************************************************/
633/* C b k X e q */
634/******************************************************************************/
635
637 int eNum, const char *eTxt)
638{
639 XrdSysMutexHelper cbkMHelp(cP->chMutex);
640 char oldEvents;
641 bool cbok, retval, isRead, isWrite, isLocked = true;
642
643// Perform any required tracing
644//
645 if (TRACING)
646 {const char *cbtype = (cP->chPoller == cP->chPollXQ ? "norm" :
647 (cP->chPoller == &pollInit ? "init" :
648 (cP->chPoller == &pollWait ? "wait" : "err")));
649 DO_TRACE(CbkXeq,cP->chFD,"callback events=" <<events
650 <<" chev=" <<static_cast<int>(cP->chEvents)
651 <<" toq=" <<(cP->inTOQ != 0) <<" erc=" <<eNum
652 <<" callback " <<(cP->chCB ? "present" : "missing")
653 <<" poller=" <<cbtype);
654 }
655
656// Remove this from the timeout queue if there and reset the deadlines based
657// on the event we are reflecting. This separates read and write deadlines
658//
659 if (cP->inTOQ)
660 {TmoDel(cP);
661 cP->dlType |= (events & CallBack::ValidEvents) << 4;
662 isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut);
663 if (isRead) cP->rdDL = maxTime;
665 if (isWrite) cP->wrDL = maxTime;
666 } else {
667 cP->dlType &= CallBack::ValidEvents;
668 isRead = isWrite = false;
669 }
670
671// Verify that there is a callback here and the channel is ready. If not,
672// disable this channel for the events being refelcted unless the event is a
673// fatal error. In this case we need to abandon the channel since error events
674// may continue to be generated as we can't always disable them.
675//
676 if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
677 {if (eNum)
678 {cP->chPoller = &pollErr1; cP->chFault = eNum;
679 cP->inPSet = 0;
680 return false;
681 }
682 oldEvents = cP->chEvents;
683 cP->chEvents = 0;
684 retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
685 TRACE_MOD(CbkXeq,cP->chFD,0);
686 if (!isLocked) cP->chMutex.Lock();
687 cP->chEvents = oldEvents;
688 return true;
689 }
690
691// Resolve the problem where we get an error event but the channel wants them
692// presented as a read or write event. If neither is possible then defer the
693// error until the channel is enabled again.
694//
695 if (eNum)
696 {if (cP->chEvents & Channel::errorEvents)
697 {cP->chPoller = &pollErr1; cP->chFault = eNum;
698 cP->chStat = Channel::isCBMode;
699 chDead = false;
700 cbkMHelp.UnLock();
701 cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt);
702 if (chDead) return true;
703 cbkMHelp.Lock(&(cP->chMutex));
704 cP->inPSet = 0;
705 return false;
706 }
707 if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead;
708 else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite;
709 else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0;
710 return false;
711 }
712 }
713
714// Indicate that we are in callback mode then drop the channel lock and effect
715// the callback. This allows the callback to freely manage locks.
716//
717 cP->chStat = Channel::isCBMode;
718 chDead = false;
719 cbkMHelp.UnLock();
720 IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <<events);
721 cbok = cP->chCB->Event(cP,cP->chCBA, events);
722 IF_TRACE(CbkXeq,cP->chFD,"callback returned " <<BOOLNAME(cbok));
723
724// If channel destroyed by the callback, bail really fast. Otherwise, regain
725// the channel lock.
726//
727 if (chDead) return true;
728 cbkMHelp.Lock(&(cP->chMutex));
729
730// If the channel is being destroyed; then another thread must have done so.
731// Tell it the callback has finished and just return.
732//
733 if (cP->chStat != Channel::isCBMode)
734 {if (cP->chStat == Channel::isDead)
735 ((XrdSysSemaphore *)cP->chCBA)->Post();
736 return true;
737 }
738 cP->chStat = Channel::isClear;
739
740// Handle enable or disable here. If we keep the channel enabled then reset
741// the timeout if it hasn't been handled via a call from the callback.
742//
743 if (!cbok) Detach(cP,isLocked,false);
744 else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
745 TmoAdd(cP, 0);
746
747// All done. While the mutex should not have been unlocked, we relock it if
748// it has to keep the mutex helper from croaking.
749//
750 if (!isLocked) cP->chMutex.Lock();
751 return true;
752}
753
754/******************************************************************************/
755/* Static: C r e a t e */
756/******************************************************************************/
757
759 const char **eTxt,
760 int crOpts)
761{
762 int fildes[2];
763 struct pollArg pArg;
764 pthread_t tid;
765
766// Create a pipe used to break the poll wait loop
767//
768 if (XrdSysFD_Pipe(fildes))
769 {eNum = errno;
770 if (eTxt) *eTxt = "creating poll pipe";
771 return 0;
772 }
773
774// Create an actual implementation of a poller
775//
776 if (!(pArg.pollP = newPoller(fildes, eNum, eTxt)))
777 {close(fildes[0]);
778 close(fildes[1]);
779 return 0;
780 }
781
782// Now start a thread to handle this poller object
783//
785 (void *)&pArg, XRDSYSTHREAD_BIND, "Poller")))
786 {if (eTxt) *eTxt = "creating poller thread"; return 0;}
787
788// Now wait for the thread to finish initializing before we allow use
789// Note that the bootstrap takes ownership of the semaphore and will delete it
790// once the thread positing the semaphore actually ends. This is to avoid
791// semaphore bugs present in certain (e.g. Linux) kernels.
792//
793 pArg.pollSync->Wait();
794
795// Check if all went well
796//
797 if (pArg.retCode)
798 {if (eTxt) *eTxt = (pArg.retMsg ? pArg.retMsg : "starting poller");
799 eNum = pArg.retCode;
800 delete pArg.pollP;
801 return 0;
802 }
803
804// Set creation options in the new poller
805//
806 if (crOpts & optTOM)
808
809// All done
810//
811 eNum = 0;
812 if (eTxt) *eTxt = "";
813 return pArg.pollP;
814}
815
816/******************************************************************************/
817/* D e t a c h */
818/******************************************************************************/
819
820void XrdSys::IOEvents::Poller::Detach(XrdSys::IOEvents::Channel *cP,
821 bool &isLocked, bool keep)
822{
823// The caller must hold the channel lock!
824//
825 bool detFD = (cP->inPSet != 0);
826
827// First remove the channel from the timeout queue
828//
829 if (cP->inTOQ)
830 {toMutex.Lock();
831 REMOVE(tmoBase, tmoList, cP);
832 toMutex.UnLock();
833 }
834
835// Allow only one detach at a time
836//
837 adMutex.Lock();
838
839// Preset channel to prohibit callback if we are not keeping this channel
840//
841 if (!keep)
842 {cP->Reset(&pollErr1, cP->chFD);
843 cP->chPollXQ = &pollErr1;
844 cP->chCB = 0;
845 cP->chCBA = 0;
846 if (cP->attList.next != cP) {REMOVE(attBase, attList, cP);}
847 else if (attBase == cP) attBase = 0;
848 }
849
850// Exclude this channel from the associated poll set, don't hold the ad lock
851//
852 adMutex.UnLock();
853 if (detFD)
854 {cP->inPSet = 0;
855 if (cmdFD >= 0) Exclude(cP, isLocked, !ISPOLLER);
856 }
857}
858
859/******************************************************************************/
860/* Protected: G e t R e q u e s t */
861/******************************************************************************/
862
863// Warning: This method runs unlocked. The caller must have exclusive use of
864// the reqBuff otherwise unpredictable results will occur.
865
867{
868 ssize_t rlen;
869 int rc;
870
871// See if we are to resume a read or start a fresh one
872//
873 if (!pipeBlen)
874 {pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}
875
876// Wait for the next request. Some OS's (like Linux) don't support non-blocking
877// pipes. So, we must front the read with a poll.
878//
879 do {rc = poll(&pipePoll, 1, 0);}
880 while(rc < 0 && (errno == EAGAIN || errno == EINTR));
881 if (rc < 1) return 0;
882
883// Now we can put up a read without a delay. Normally a full command will be
884// present. Under some heavy conditions, this may not be the case.
885//
886 do {rlen = read(reqFD, pipeBuff, pipeBlen);}
887 while(rlen < 0 && errno == EINTR);
888 if (rlen <= 0)
889 {std::cerr <<"Poll: "<<XrdSysE2T(errno)<<" reading from request pipe\n"<< std::flush;
890 return 0;
891 }
892
893// Check if all the data has arrived. If not all the data is present, defer
894// this request until more data arrives.
895//
896 if (!(pipeBlen -= rlen)) return 1;
897 pipeBuff += rlen;
898 return 0;
899}
900
901/******************************************************************************/
902/* Protected: I n i t */
903/******************************************************************************/
904
906 const char **eTxt, bool &isLocked)
907{
908// The channel must be locked upon entry!
909//
910 bool retval;
911
912
913// If we are already in progress then simply update the shadow events and
914// resuppress all current events.
915//
916 if (cP->chPoller == &pollWait)
917 {cP->reMod = cP->chEvents;
918 cP->chEvents = 0;
919 IF_TRACE(Init,cP->chFD,"defer events=" <<cP->reMod);
920 return true;
921 }
922
923// Trace this entry
924//
925 IF_TRACE(Init,cP->chFD,"begin events=" <<int(cP->chEvents));
926
927// If no events are enabled at this point, just return
928//
929 if (!(cP->chEvents)) return true;
930
931// Refuse to enable a channel without a callback function
932//
933 if (!(cP->chCB))
934 {eNum = EDESTADDRREQ;
935 if (eTxt) *eTxt = "enabling without a callback";
936 return false;
937 }
938
939// So, now we can include the channel in the poll set. We will include it
940// with no events enabled to prevent callbacks prior to completion here.
941//
942 cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
943 retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
944 IF_TRACE(Init,cP->chFD,"Include() returned " <<BOOLNAME(retval) <<TRACE_LOK);
945 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
946
947// Determine what future poller to use. If we can use the regular poller then
948// set the correct event mask for the channel. Note that we could have lost
949// control but the correct events will be reflected in the "reMod" member.
950//
951 if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
952 else {cP->chPoller = cP->chPollXQ;
953 cP->inPSet = 1;
954 if (cP->reMod)
955 {cP->chEvents = cP->reMod;
956 retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
957 TRACE_MOD(Init,cP->chFD,int(cP->reMod));
958 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
959 } else {
960 TRACE_NOD(Init,cP->chFD,0);
961 }
962 }
963
964// All done
965//
966 cP->reMod = 0;
967 return retval;
968}
969
970/******************************************************************************/
971/* P o l l 2 E n u m */
972/******************************************************************************/
973
975{
976 if (events & POLLERR) return EPIPE;
977
978 if (events & POLLHUP) return ECONNRESET;
979
980 if (events & POLLNVAL) return EBADF;
981
982 return EOPNOTSUPP;
983}
984
985/******************************************************************************/
986/* S e n d C m d */
987/******************************************************************************/
988
990{
991 int wlen;
992
993// Pipe writes are atomic so we don't need locks. Some commands require
994// confirmation. We handle that here based on the command. Note that pipes
995// gaurantee that all of the data will be written or we will block.
996//
997 if (cmd.req >= PipeData::Post)
998 {XrdSysSemaphore mySem(0);
999 cmd.theSem = &mySem;
1000 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1001 while (wlen < 0 && errno == EINTR);
1002 if (wlen > 0) mySem.Wait();
1003 } else {
1004 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1005 while (wlen < 0 && errno == EINTR);
1006 }
1007
1008// All done
1009//
1010 return (wlen >= 0 ? 0 : errno);
1011}
1012
1013/******************************************************************************/
1014/* Protected: S e t P o l l E n t */
1015/******************************************************************************/
1016
1018{
1019 cP->pollEnt = pe;
1020}
1021
1022/******************************************************************************/
1023/* S t o p */
1024/******************************************************************************/
1025
1027{
1028 PipeData cmdbuff;
1029 CallBack *theCB;
1030 Channel *cP;
1031 void *cbArg;
1032 int doCB;
1033
1034// Initialize the pipdata structure
1035//
1036 memset(static_cast<void*>( &cmdbuff ), 0, sizeof(cmdbuff));
1037 cmdbuff.req = PipeData::Stop;
1038
1039// Lock all of this
1040//
1041 adMutex.Lock();
1042
1043// If we are already shutdown then we are done
1044//
1045 if (cmdFD == -1) {adMutex.UnLock(); return;}
1046
1047// First we must stop the poller thread in an orderly fashion.
1048//
1049 adMutex.UnLock();
1050 SendCmd(cmdbuff);
1051 adMutex.Lock();
1052
1053// Close the pipe communication mechanism
1054//
1055 close(cmdFD); cmdFD = -1;
1056 close(reqFD); reqFD = -1;
1057
1058// Run through cleaning up the channels. While there should not be any other
1059// operations happening on this poller, we take the conservative approach.
1060//
1061 while((cP = attBase))
1062 {REMOVE(attBase, attList, cP);
1063 adMutex.UnLock();
1064 cP->chMutex.Lock();
1065 doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent);
1066 if (cP->inTOQ) TmoDel(cP);
1067 cP->Reset(&pollErr1, cP->chFD, EIDRM);
1068 cP->chPollXQ = &pollErr1;
1069 if (doCB)
1070 {cP->chStat = Channel::isClear;
1071 theCB = cP->chCB; cbArg = cP->chCBA;
1072 cP->chMutex.UnLock();
1073 theCB->Stop(cP, cbArg);
1074 } else cP->chMutex.UnLock();
1075 adMutex.Lock();
1076 }
1077
1078// Now invoke the poller specific shutdown
1079//
1080 Shutdown();
1081 adMutex.UnLock();
1082}
1083
1084/******************************************************************************/
1085/* T m o A d d */
1086/******************************************************************************/
1087
1089{
1090 XrdSysMutexHelper mHelper(toMutex);
1091 time_t tNow;
1092 Channel *ncP;
1093 bool setRTO, setWTO;
1094
1095// Do some tracing
1096//
1097 IF_TRACE(TmoAdd,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1098 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1099
1100// Remove element from timeout queue if it is there
1101//
1102 if (cP->inTOQ)
1103 {REMOVE(tmoBase, tmoList, cP);
1104 cP->inTOQ = 0;
1105 }
1106
1107// Determine which timeouts need to be reset
1108//
1109 tmoSet|= cP->dlType >> 4;
1110 setRTO = (tmoSet&tmoMask) & (CallBack::ReadyToRead |CallBack:: ReadTimeOut);
1111 setWTO = (tmoSet&tmoMask) & (CallBack::ReadyToWrite|CallBack::WriteTimeOut);
1112
1113// Reset the required deadlines
1114//
1115 tNow = time(0);
1116 if (setRTO && REVENTS(cP->chEvents) && cP->chRTO)
1117 cP->rdDL = cP->chRTO + tNow;
1118 if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO)
1119 cP->wrDL = cP->chWTO + tNow;
1120
1121// Calculate the closest enabled deadline
1122//
1123 if (cP->rdDL < cP->wrDL)
1124 {cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut;
1125 } else {
1126 cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut;
1127 if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut;
1128 }
1129 IF_TRACE(TmoAdd, cP->chFD, "t=" <<tNow <<" rdDL=" <<setRTO <<' ' <<cP->rdDL
1130 <<" wrDL=" <<setWTO <<' ' <<cP->wrDL);
1131
1132// If no timeout really applies, we are done
1133//
1134 if (cP->deadLine == maxTime) return false;
1135
1136// Add the channel to the timeout queue in correct deadline position.
1137//
1138 if ((ncP = tmoBase))
1139 {do {if (cP->deadLine < ncP->deadLine) break;
1140 ncP = ncP->tmoList.next;
1141 } while(ncP != tmoBase);
1142 INSERT(tmoList, ncP, cP);
1143 if (cP->deadLine < tmoBase->deadLine) tmoBase = cP;
1144 } else tmoBase = cP;
1145 cP->inTOQ = 1;
1146
1147// Indicate to the caller whether or not a wakeup is required
1148//
1149 return (tmoBase == cP);
1150}
1151
1152/******************************************************************************/
1153/* T m o D e l */
1154/******************************************************************************/
1155
1157{
1158
1159// Do some tracing
1160//
1161 IF_TRACE(TmoDel,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1162 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1163
1164// Get the timeout queue lock and remove the channel from the queue
1165//
1166 toMutex.Lock();
1167 REMOVE(tmoBase, tmoList, cP);
1168 cP->inTOQ = 0;
1169 toMutex.UnLock();
1170}
1171
1172/******************************************************************************/
1173/* T m o G e t */
1174/******************************************************************************/
1175
1177{
1178 int wtval;
1179
1180// Lock the timeout queue
1181//
1182 toMutex.Lock();
1183
1184// Calculate wait time. If the deadline passed, invoke the timeout callback.
1185// we will need to drop the timeout lock as we don't have the channel lock.
1186//
1187 do {if (!tmoBase) {wtval = -1; break;}
1188 wtval = (tmoBase->deadLine - time(0)) * 1000;
1189 if (wtval > 0) break;
1190 toMutex.UnLock();
1191 CbkTMO();
1192 toMutex.Lock();
1193 } while(1);
1194
1195// Return the value
1196//
1197 CPP_ATOMIC_STORE(wakePend, false, std::memory_order_release);
1198 toMutex.UnLock();
1199 return wtval;
1200}
1201
1202/******************************************************************************/
1203/* W a k e U p */
1204/******************************************************************************/
1205
1206void XrdSys::IOEvents::Poller::WakeUp()
1207{
1208 static PipeData cmdbuff(PipeData::NoOp);
1209
1210// Send it off to wakeup the poller thread, but only if here is no wakeup in
1211// progress.
1212//
1213// We use a mutex here because we want to produce a synchronization point - all
1214// threads that might be interested timeouts and wakeups are going to incur a
1215// cache bounce for the page where wakePend resides; they will see a consistent
1216// view of the wakePend flag. For those threads, this is equivalent to
1217// an atomic with memory_order std::memory_order_seq_cst (the strongest ordering).
1218// However, the threads that are not interested in timeouts will not get a flush
1219// for their copy of the wakePend page. They will still have the weaker memory
1220// ordering of consume/release (which is guaranteed anyway on all current architectures
1221// except for DEC Alpha).
1222 toMutex.Lock();
1223 bool isWakePend = CPP_ATOMIC_LOAD(wakePend, std::memory_order_consume);
1224 if (isWakePend) {toMutex.UnLock();}
1225 else {CPP_ATOMIC_STORE(wakePend, true, std::memory_order_release);
1226 toMutex.UnLock();
1227 SendCmd(cmdbuff);
1228 }
1229}
1230
1231/******************************************************************************/
1232/* I m p l e m e n t a t i o n S p e c i f i c s */
1233/******************************************************************************/
1234
1235#if defined( __solaris__ )
1237#elif defined( __linux__ )
1239#elif defined(__APPLE__)
1241#else
1243#endif
#define close(a)
Definition XrdPosix.hh:43
#define write(a, b, c)
Definition XrdPosix.hh:110
#define read(a, b, c)
Definition XrdPosix.hh:77
#define REMOVE(dlbase, dlvar, curitem)
#define INSERT(dlvar, curitem, newitem)
#define CPP_ATOMIC_LOAD(x, order)
#define CPP_ATOMIC_STORE(x, val, order)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:99
#define IF_TRACE(x, fd, y)
#define TRACE_LOK
#define STATUS
#define TRACE_NOD(x, fd, y)
#define STATUSOF(x)
#define DO_TRACE(x, fd, y)
#define REVENTS(x)
#define BOOLNAME(x)
#define TRACE_MOD(x, fd, y)
#define ISPOLLER
#define WEVENTS(x)
#define XRDSYSTHREAD_BIND
#define TRACING(x)
Definition XrdTrace.hh:70
void Lock(XrdSysMutex *Mutex)
static int Same(pthread_t t1, pthread_t t2)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static pthread_t ID(void)
static void * Start(void *parg)
virtual void Fatal(Channel *chP, void *cbArg, int eNum, const char *eTxt)
virtual bool Event(Channel *chP, void *cbArg, int evFlags)=0
virtual void Stop(Channel *chP, void *cbArg)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ ValidEvents
Mask to test for valid events.
void SetCallBack(CallBack *cbP, void *cbArg=0)
void GetCallBack(CallBack **cbP, void **cbArg)
@ errorEvents
Error event non-r/w specific.
@ stopEvent
Poller stop event.
bool Enable(int events, int timeout=0, const char **eText=0)
Channel(Poller *pollP, int fd, CallBack *cbP=0, void *cbArg=0)
bool Disable(int events, const char **eText=0)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
virtual bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
virtual bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
static Poller * Create(int &eNum, const char **eTxt=0, int crOpts=0)
virtual void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)=0
bool CbkXeq(Channel *cP, int events, int eNum, const char *eTxt)
int SendCmd(PipeData &cmd)
int Poll2Enum(short events)
bool TmoAdd(Channel *cP, int tmoSet)
void SetPollEnt(Channel *cP, int ptEnt)
bool Init(Channel *cP, int &eNum, const char **eTxt, bool &isLockd)