/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ /* * (C) 2004 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */ /* * The routines in this file provide an event-driven I/O handler * * Each active fd has a handler associated with it. */ /* FIXME: Occasionally, data from stdout has been lost when the job is exiting. I don't know whether data is being lost because the writer is discarding it or the reader (mpiexec) is failing to finish reading from all of the sockets before exiting. */ #include "mpichconf.h" #include #include #ifdef HAVE_UNISTD_H #include #endif #ifdef HAVE_SYS_TYPES_H #include #endif #ifdef HAVE_SYS_SELECT_H #include #endif #ifdef HAVE_TIME_H #include #endif #ifdef HAVE_SYS_TIME_H #include #endif #ifdef HAVE_ERRNO_H #include #endif #include "pmutil.h" #include "ioloop.h" /* * To simplify mapping fds back to their handlers, we store the handles * in an array such that the ith element of the array corresponds to the * fd with value i (e.g., for fd == 10, the [10] element of the array * has the information on the handler). This isn't terrifically scalable, * but it makes this code fairly simple and this code isn't * performance sensitive. "maxFD" is the maximum fd value seen; this * allows us to allocate a large array but usually only look at a small * part of it. */ #define MAXFD 4096 static IOHandle handlesByFD[MAXFD+1]; static int maxFD = -1; /*@ MPIE_IORegister - Register a handler for an FD Input Parameters: Notes: Keeps track of the largest fd seen (in 'maxFD'). @*/ int MPIE_IORegister( int fd, int rdwr, int (*handler)(int,int,void*), void *extra_data ) { int i; if (fd > MAXFD) { /* Error; fd is too large */ return 1; } /* Remember the largest set FD, and clear any FDs between this fd and the last maximum */ if (fd > maxFD) { for (i=maxFD+1; i MAXFD) { /* Error; fd is too large */ return 1; } if (fd > maxFD) { /* Error; fd is unknown */ return 1; } /* Recompute the new maxfd */ newMaxFd = -1; for (i=0; i<=maxFD; i++) { if (handlesByFD[i].fd >= 0 && i > newMaxFd) { newMaxFd = i; } } maxFD = newMaxFd; handlesByFD[fd].fd = -1; handlesByFD[fd].rdwr = 0; handlesByFD[fd].handler = 0; handlesByFD[fd].extra_data = 0; return 0; } /*@ MPIE_IOLoop - Handle all registered I/O Input Parameters: . timeoutSeconds - Seconds until this routine should return with a timeout error. If negative, no timeout. If 0, return immediatedly after a nonblocking check for I/O. Return Value: Returns zero on success. Returns 'IOLOOP_TIMEOUT' if the timeout is reached and 'IOLOOP_ERROR' on other errors. @*/ int MPIE_IOLoop( int timeoutSeconds ) { int i, maxfd, fd, nfds, rc=0, rc2; fd_set readfds, writefds; int (*handler)(int,int,void*); struct timeval tv; /* Loop on the fds, with the timeout */ TimeoutInit( timeoutSeconds ); while (1) { tv.tv_sec = TimeoutGetRemaining(); tv.tv_usec = 0; /* Determine the active FDs */ FD_ZERO( &readfds ); FD_ZERO( &writefds ); /* maxfd is the maximum active fd */ maxfd = -1; for (i=0; i<=maxFD; i++) { if (handlesByFD[i].handler) { fd = handlesByFD[i].fd; if (handlesByFD[i].rdwr & IO_READ) { FD_SET( fd, &readfds ); maxfd = i; } if (handlesByFD[i].rdwr & IO_WRITE) { FD_SET( fd, &writefds ); maxfd = i; } } } if (maxfd < 0) break; /* DBG_PRINTF(("Calling select with readfds = %x writefds = %x\n", */ /* *(int *)&readfds, *(int*)&writefds)); */ MPIE_SYSCALL(nfds,select,( maxfd + 1, &readfds, &writefds, 0, &tv )); if (nfds < 0 && (errno == EINTR || errno == 0)) { /* Continuing through EINTR */ /* We allow errno == 0 as a synonym for EINTR. We've seen this on Solaris; in addition, we set errno to 0 after a failed waitpid in the process routines, and if the OS isn't careful, the value of errno may get ECHILD instead of EINTR when the signal handler returns (we suspect Linux of this problem), which is why we have the signal handler in process.c reset errno to 0 (we may need to allow ECHILD here (!)) */ /* FIXME: an EINTR may also mean that a process has exited (SIGCHILD). If all processes have exited, we may want to exit */ DBG_PRINTF(("errno = EINTR in select\n")); continue; } if (nfds < 0) { /* Serious error */ MPL_internal_sys_error_printf( "select", errno, 0 ); break; } if (nfds == 0) { /* Timeout from select */ DBG_PRINTF(("Timeout in select\n")); return IOLOOP_TIMEOUT; } /* nfds > 0 */ DBG_PRINTF(("Found some fds to process (n = %d)\n",nfds)); for (fd=0; fd<=maxfd; fd++) { if (FD_ISSET( fd, &writefds )) { handler = handlesByFD[fd].handler; if (handler) { rc = (*handler)( fd, IO_WRITE, handlesByFD[fd].extra_data ); } if (rc == 1) { /* EOF? */ MPIE_SYSCALL(rc2,close,(fd)); handlesByFD[fd].rdwr = 0; FD_CLR(fd,&writefds); } } if (FD_ISSET( fd, &readfds )) { handler = handlesByFD[fd].handler; if (handler) { rc = (*handler)( fd, IO_READ, handlesByFD[fd].extra_data ); } if (rc == 1) { /* EOF? */ MPIE_SYSCALL(rc2,close,(fd)); handlesByFD[fd].rdwr = 0; FD_CLR(fd,&readfds); } } } } DBG_PRINTF(("Returning from IOLOOP handler\n")); return 0; } static int end_time = -1; /* Time of timeout in seconds */ void TimeoutInit( int seconds ) { if (seconds > 0) { #ifdef HAVE_TIME time_t t; t = time( NULL ); end_time = seconds + (int)t; #elif defined(HAVE_GETTIMEOFDAY) struct timeval tp; gettimeofday( &tp, NULL ); end_time = seconds + tp.tv_sec; #else # error 'No timer available' #endif } else { end_time = -1; } } /* Return remaining time in seconds */ int TimeoutGetRemaining( void ) { int time_left; if (end_time < 0) { /* Return a large, positive number */ return 1000000; } else { #ifdef HAVE_TIME time_t t; t = time( NULL ); time_left = end_time - (int)t; #elif defined(HAVE_GETTIMEOFDAY) struct timeval tp; gettimeofday( &tp, NULL ); time_left = end_time - tp.tv_sec; #else # error 'No timer available' #endif } if (time_left < 0) time_left = 0; return time_left; }