/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */

/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#undef FUNCNAME
#define FUNCNAME MPIDU_Sock_create_set
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDU_Sock_create_set(struct MPIDU_Sock_set ** sock_setp)
{
    struct MPIDU_Sock_set * sock_set = NULL;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_CREATE_SET);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_CREATE_SET);
    
    MPIDU_SOCKI_VERIFY_INIT(mpi_errno, fn_exit);

    /*
     * Allocate and initialized a new sock set structure
     */
    sock_set = MPIU_Malloc(sizeof(struct MPIDU_Sock_set));
    /* --BEGIN ERROR HANDLING-- */
    if (sock_set == NULL)
    { 
	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM,
					 "**sock|setalloc", 0);
	goto fn_fail;
    }
    /* --END ERROR HANDLING-- */
    
    sock_set->id = MPIDU_Socki_set_next_id++;
    sock_set->poll_array_sz = 0;
    sock_set->poll_array_elems = 0;
    sock_set->starting_elem = 0;
    sock_set->pollfds = NULL;
    sock_set->pollinfos = NULL;
    sock_set->eventq_head = NULL;
    sock_set->eventq_tail = NULL;
    /* FIXME: Move the thread-specific operations into thread-specific
       routines (to allow for alternative thread sync models and
       for runtime control of thread level) */
#   ifdef MPICH_IS_THREADED
    {
	sock_set->pollfds_active = NULL;
	sock_set->pollfds_updated = FALSE;
	sock_set->wakeup_posted = FALSE;
	sock_set->intr_fds[0] = -1;
	sock_set->intr_fds[1] = -1;
	sock_set->intr_sock = NULL;
    }
#   endif

#   ifdef MPICH_IS_THREADED
    MPIU_THREAD_CHECK_BEGIN;
    {
	struct MPIDU_Sock * sock = NULL;
	struct pollfd * pollfd;
	struct pollinfo * pollinfo;
	long flags;
	int rc;
	
	/*
	 * Acquire a pipe (the interrupter) to wake up a blocking poll should 
	 * it become necessary.
	 *
	 * Make the read descriptor nonblocking.  The write descriptor is left
	 * as a blocking descriptor.  The write has to
	 * succeed or the system will lock up.  Should the blocking descriptor
	 * prove to be a problem, then (1) copy the above
	 * code, applying it to the write descriptor, and (2) update 
	 * MPIDU_Socki_wakeup() so that it loops while write returns 0,
	 * performing a thread yield between iterations.
	 */
	rc = pipe(sock_set->intr_fds);
	/* --BEGIN ERROR HANDLING-- */
	if (rc != 0)
	{
	    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL,
					     "**sock|poll|pipe", "**sock|poll|pipe %d %s", errno, MPIU_Strerror(errno));
	    goto fn_fail;
	}
	/* --END ERROR HANDLING-- */

	flags = fcntl(sock_set->intr_fds[0], F_GETFL, 0);
	/* --BEGIN ERROR HANDLING-- */
	if (flags == -1)
	{
	    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL,
					     "**sock|poll|pipenonblock", "**sock|poll|pipenonblock %d %s",
					     errno, MPIU_Strerror(errno));
	    goto fn_fail;
	}
	/* --END ERROR HANDLING-- */
    
	rc = fcntl(sock_set->intr_fds[0], F_SETFL, flags | O_NONBLOCK);
	/* --BEGIN ERROR HANDLING-- */
	if (rc == -1)
	{
	    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_FAIL,
					     "**sock|poll|pipenonblock", "**sock|poll|pipenonblock %d %s",
					     errno, MPIU_Strerror(errno));
	    goto fn_fail;
	}
	/* --END ERROR HANDLING-- */

	/*
	 * Allocate and initialize a sock structure for the interrupter pipe
	 */
	mpi_errno = MPIDU_Socki_sock_alloc(sock_set, &sock);
	/* --BEGIN ERROR HANDLING-- */
	if (mpi_errno != MPI_SUCCESS)
	{
	    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPIDU_SOCK_ERR_NOMEM,
					     "**sock|sockalloc", NULL);
	    goto fn_fail;
	}
	/* --END ERROR HANDLING-- */
    
	sock_set->intr_sock = sock;

	pollfd = MPIDU_Socki_sock_get_pollfd(sock);
	pollinfo = MPIDU_Socki_sock_get_pollinfo(sock);
    
	pollfd->fd = sock_set->intr_fds[0];
	pollinfo->fd = sock_set->intr_fds[0];
	pollinfo->user_ptr = NULL;
	pollinfo->type = MPIDU_SOCKI_TYPE_INTERRUPTER;
	pollinfo->state = MPIDU_SOCKI_STATE_CONNECTED_RO;
	pollinfo->os_errno = 0;

	MPIDU_SOCKI_POLLFD_OP_SET(pollfd, pollinfo, POLLIN);
    }
    MPIU_THREAD_CHECK_END;
#   endif

    *sock_setp = sock_set;

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_CREATE_SET);
    return mpi_errno;

    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    if (sock_set != NULL)
    {
#       ifdef MPICH_IS_THREADED
	MPIU_THREAD_CHECK_BEGIN;
	{
	    if (sock_set->intr_fds[0] != -1)
	    {
		close(sock_set->intr_fds[0]);
	    }
	    
	    if (sock_set->intr_fds[1] != -1)
	    {
		close(sock_set->intr_fds[1]);
	    }
	}
	MPIU_THREAD_CHECK_END;
#	endif
	
	MPIU_Free(sock_set);
    }

    goto fn_exit;
    /* --END ERROR HANDLING-- */
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sock_close_open_sockets
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDU_Sock_close_open_sockets(struct MPIDU_Sock_set * sock_set, void** user_ptr ){

    int i;
    int mpi_errno = MPI_SUCCESS;
    struct pollinfo * pollinfos = NULL;
    pollinfos = sock_set->pollinfos;
    MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_CLOSE_OPEN_SOCKETS);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_CLOSE_OPEN_SOCKETS);

    MPIDU_SOCKI_VERIFY_INIT(mpi_errno, fn_exit);
    /* wakeup waiting socket if mullti-threades */
    *user_ptr = NULL;
    for (i = 0; i < sock_set->poll_array_elems; i++) {
        if(pollinfos[i].sock != NULL &&  pollinfos[i].type != MPIDU_SOCKI_TYPE_INTERRUPTER){
             close(pollinfos[i].fd);
             MPIDU_Socki_sock_free(pollinfos[i].sock);
            *user_ptr = pollinfos[i].user_ptr;
             break;
        }
    }
#ifdef USE_SOCK_VERIFY
  fn_exit:
#endif
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_CLOSE_OPEN_SOCKETS);
    return mpi_errno;
}


#undef FUNCNAME
#define FUNCNAME MPIDU_Sock_destroy_set
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDU_Sock_destroy_set(struct MPIDU_Sock_set * sock_set)
{
    int elem;
    struct MPIDU_Sock_event event;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDU_SOCK_DESTROY_SET);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDU_SOCK_DESTROY_SET);

    MPIDU_SOCKI_VERIFY_INIT(mpi_errno, fn_exit);

    /*
     * FIXME: check for open socks and return an error if any are found
     */
    
    /*
     * FIXME: verify no other thread is blocked in poll().  wake it up and 
     * get it to exit.
     */
    
    /*
     * Close pipe for interrupting a blocking poll()
     */
#   ifdef MPICH_IS_THREADED
    MPIU_THREAD_CHECK_BEGIN;
    {
	close(sock_set->intr_fds[1]);
	close(sock_set->intr_fds[0]);
	MPIDU_Socki_sock_free(sock_set->intr_sock);

	sock_set->pollfds_updated = FALSE;
	sock_set->pollfds_active = NULL;
	sock_set->wakeup_posted = FALSE;
	sock_set->intr_fds[0] = -1;
	sock_set->intr_fds[1] = -1;
	sock_set->intr_sock = NULL;
    }
    MPIU_THREAD_CHECK_END;
#   endif

    /*
     * Clear the event queue to eliminate memory leaks
     */
    while (MPIDU_Socki_event_dequeue(sock_set, &elem, &event) == MPI_SUCCESS);

    /*
     * Free structures used by the sock set
     */
    MPIU_Free(sock_set->pollinfos);
    MPIU_Free(sock_set->pollfds);

    /*
     * Reset the sock set fields
     */
    sock_set->id = ~0;
    sock_set->poll_array_sz = 0;
    sock_set->poll_array_elems = 0;
    sock_set->starting_elem = 0;
    sock_set->pollfds = NULL;
    sock_set->pollinfos = NULL;
    sock_set->eventq_head = NULL;
    sock_set->eventq_tail = NULL;

    /*
     * Free the structure
     */
    MPIU_Free(sock_set);
    
#ifdef USE_SOCK_VERIFY
  fn_exit:
#endif
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDU_SOCK_DESTROY_SET);
    return mpi_errno;
}