- what are the requirements for the communication agent?
  - driven by the needs of RMA, Pt2Pt, and Collective
  - the agent is responsible for transportation and "matching". in the
    send/recv case, matching is as simple as matching the standard MPI
    envelope information. in the RMA case, "matching" is much more
    complicated: RMA operation messages/RHCs need to be matched to exposure
    epochs, and before an operation message can be processed, locks, etc.
    may need to be acquired.
- what we're doing here is allowing the origin to create a string of CARs
  that the target will service, serialize these CARs, pass them to the
  target, and place them in the appropriate queue for service
  - but not quite, because there is some translation/processing that has to
    be done here. they cause matching and/or creation of CARs on the other
    side that correspond to completion of operations desired by the origin
  - in a put case, can you just serialize a recv of the data? no, you can't
    do this. there has to be something else going on. so CARs can be created
    as a result of the reception of a CAM. damn.
  - one way to look at this is that CAR processing could create additional
    CARs to be processed...this would cleanly handle the put case -- the
    origin serializes the put, and the target creates the corresponding
    recv after processing the put CAR?
- much of the checking of parameters can be performed on the origin, as we
  have all the window displacement information locally
- a series of RMA operations could be described as a chain of operation
  CARs where each operation has data CARs hung off of it
- we would like to be able to submit a batch of CARs to a single destination
- RMA operations will be handled differently by the different methods, as
  some of them can handle these directly. however, almost none of them will
  have native support for accumulate operations -- so do these need to go
  up to the CA?

Isend/Irecv scenario
- generates a send CAR on the sending side and a recv CAR on the receiving
  side

  send CAR - segment, counter, envelope
  recv CAR - segment, counter, envelope

- CA_enqueue(vc, send_car)
  - add send_car to the end of the send queue associated with the VC
  - if the queue was empty (before the add) then start processing the new
    CAR
- CA_enqueue(vc, recv_car)
  - add recv_car to the end of the recv queue associated with the VC
  - ...

"if we buffer, you suffer!" --rob

- if a receive request is posted after the matching message is in the
  process of being received into a temporary buffer, we would like the
  message reception to be paused, the already-buffered data to be copied
  into the user buffer, and the remainder of the receive to be deposited
  directly into the user buffer

Persistent requests
- we need to allow the method the opportunity to register memory reused in
  a persistent send. we suggest accomplishing this by calling
  CA_register_mem() during MPI_Send_init()/MPI_Recv_init() and
  CA_unregister_mem() during MPI_Request_free().

  CA_register_mem(method, buffer, count, datatype, &reg_mem_data_p)
  CA_unregister_mem(method, buffer, count, datatype, reg_mem_data_p)

  a reg_mem_data_p (void *) is returned from CA_register_mem() which points
  at method-specific data. this reg_mem_data_p should be attached to each
  "registered" send/recv CAR.
- for persistent receives with "any source", we will call register memory
  for each method.

Structures
- there will be a separate structure of some sort for each VC. there will
  be a mapping function that allows one to map comm/rank to a VC. (see the
  sketch below.)
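a minimal sketch of such a per-VC structure and the comm/rank mapping,
anticipating the queue discussion below; all names here are assumptions,
not a settled API:

    /* illustrative sketch only -- all names are assumptions */
    struct MPIDI_CAR;                /* generic communication agent request */

    typedef struct MPIDI_Queue {
        struct MPIDI_CAR *head, *tail;
        mpid_lock_t       lock;      /* per-queue lock (hypothetical type) */
    } MPIDI_Queue;

    typedef struct MPIDI_VC {
        int         method;          /* which method services this VC */
        MPIDI_Queue posted_recv_q;   /* receives posted for this source */
        MPIDI_Queue send_q;          /* sends queued on this connection */
        void       *method_data;     /* method-specific connection state */
    } MPIDI_VC;

    /* the comm/rank -> VC mapping: assume each communicator holds a table
     * of VC pointers indexed by rank within that communicator */
    static MPIDI_VC *get_vc(MPID_Comm *comm, int rank)
    {
        return comm->vc_table[rank];
    }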
- for each VC there is a posted receive queue and a posted send queue
- there is also a global "unexpected receive" queue in which incoming sends
  which do not match are placed
- there is also a wildcard queue, possibly one per method. we will devise
  some mechanism for avoiding locking this queue each time we receive an
  incoming send. some alternatives are: one wc queue per method, or an
  is-empty flag for the wildcard queue (which must be capable of being
  atomically read or written without a lock). WE WILL HAVE ONE WILDCARD
  QUEUE ONLY.
  - this will let us do per-communicator "no wildcard" attributes if we
    like as well :)
- a receive call from above matches against the unexpected queue and then
  inserts itself into the wildcard or vc-specific queue
- an incoming message from a remote system matches against the wildcard
  queue or the vc-specific receive queue. to maintain MPI ordering
  semantics, we must keep a counter. this counter's value is stored in each
  CAR. it is incremented only on enqueue into the wildcard queue (new value
  used by the wildcard CAR), and ties go to the wildcard receive. (see the
  sketch below.)
  - we will have to handle rollover in some reasonable way...how do we want
    to do that? i propose some very high overhead mad search through things
    in order to teach these evil wildcard users a lesson!

end result: we have rewarded the case where receives are posted first with
only per-vc locks. in the case of wildcard receives we have a counter
increment and a second queue to check. in the case of unmatched incoming
messages we have a single queue which might be a source of some contention.
this is a more memory-intensive approach than the original prototype, but
it should allow for higher performance and parallelism across VCs that
share a common method, at least in the well-behaved case of pre-posted,
known-source receives (which is the approach we should reward :)).

- we decided to use a single unexpected queue since having multiple
  unexpected queues requires a timestamp to order the messages. the
  timestamp counter is a point of global contention, just like the global
  unexpected queue.
- this same argument can be applied to the posted queue case, but allowing
  for multiple queues here creates a much nicer search situation for the
  case where people prepost their receives. is this worth the cost of the
  counter manipulation? for the non-threaded, non-shmem case we think this
  will be a win. for the case of threads or shmem, we are unsure if the
  lock overhead of the queues and counter increment will result in a loss.
  SINCE WE CAN'T COME TO A REAL CONSENSUS, WE WILL HIDE THIS BEHIND AN API.
  then we can play with the implementation ad nauseam, just as we have with
  this conversation :).

ASIDE: rusty and rajeev broke queues into a queue per method, with each
method implementing its own queue code. they then had a separate wildcard
queue, and wildcards were ALSO posted in each method queue.
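a minimal sketch of the counter arbitration just described, assuming the
single wildcard queue and per-VC posted queues above; queue_find(),
MPIDI_Envelope, and the seqno field are illustrative:

    /* wc_seqno is bumped only when a wildcard receive is posted; every
     * posted recv CAR records the counter value current at enqueue time */
    static unsigned long wc_seqno = 0;

    struct MPIDI_CAR *match_incoming(MPIDI_VC *vc, MPIDI_Envelope *env)
    {
        struct MPIDI_CAR *wc_car = NULL, *vc_car;
        int wc_locked = 0;

        if (!queue_is_empty(&wildcard_q)) { /* flag readable without lock */
            lock(&wildcard_q.lock);
            wc_locked = 1;
            wc_car = queue_find(&wildcard_q, env);
        }
        lock(&vc->posted_recv_q.lock);
        vc_car = queue_find(&vc->posted_recv_q, env);

        if (wc_car != NULL && vc_car != NULL) {
            /* both match: the earlier posting wins.  equal values mean the
             * wildcard was posted first (its enqueue bumped the counter),
             * so ties go to the wildcard. */
            if (vc_car->seqno < wc_car->seqno)
                wc_car = NULL;
            else
                vc_car = NULL;
        }
        unlock(&vc->posted_recv_q.lock);
        if (wc_locked)
            unlock(&wildcard_q.lock);

        /* dequeue of the winner omitted for brevity */
        return wc_car != NULL ? wc_car : vc_car;
    }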
interface:
- enqueue(queue, car)
- dequeue(queue, car)
- findorallocate(vc?, envelope, &car)
  - used to match a posted receive to an incoming message
    - check for an unexpected incoming message
    - if match, return match to that message
    - otherwise post the receive in the vc queue (if not wildcard) or in
      the wildcard queue (for the wildcard case)
  - also used to match an incoming message to a posted receive
    - check for matches in the wildcard queue and the vc queue (need to
      lock both queues if the wildcard queue is non-empty)
    - if both, use the counter to arbitrate which to match, then match
    - if neither, allocate a spot in the unexpected incoming message queue
  - could split this into incomingfoa and expectingfoa? something like that.
- we maybe want to use requests for matching envelopes, and separate from
  this the CARs for describing the pieces that make up the payloads
  - maybe special CARs that are the envelopes to match? does that make any
    sense in the gather case? this makes a certain amount of sense in that
    we want to be queueing things as units, with the envelope info and
    payload separate. this gets ugly when the dependency information is
    added :)

mpi_send(tag, context, dest, buf, ...)
{
    calls mpid_send()
    {
        allocates a counter
        allocates a car (c), fills in envelope, payload, pointer to counter
        set counter to 1
        bump refcount on the communicator
        get the vc from the comm/rank
        get the right queue (q) from the vc
        enqueue(q, c) <- brian and i think this might be register_op()?
        {
            we want this to MAYBE start doing work if the queue was empty
            this calls a method-specific function
        }

        ...we need to make progress and/or wait on completion...
        testing for completion isn't necessarily the same operation as
        making progress happen. in the adi3 we have testflags and waitflags
        at the moment which do both of these. do we want to kick all the
        pipes or just this particular pipe (vc) in this case? or just the
        pipes for our method? dunno. our ability to efficiently kick all
        the "active" pipes will depend on how easily we can get the status
        of the various methods from this level.

        when the method completes the car, it decrements the counter? or is
        that a CA thing? either way the car is dequeued too. we think this
        is a method thing?

        finally if the refcount on the car is 0, it is freed, otherwise it
        sits around until the guys dependent on it are done??? we think
        that the car can maybe go away. but we'll wait on this issue until
        we get to the collective case. we may or may not even need a
        refcount in the car...dunno yet

        we think that the car maybe needs a pointer to the queue that it is
        in so that we can know most easily where to dequeue from?

        we decided to do this as:
            while (counter > 0)
                kick_all_pipes();
        then we decrement the refcount on the communicator. done.
    }
}

----------

Rusty was right!
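a minimal sketch of the enqueue() behavior from the walkthrough above
(start work only on the empty-to-nonempty transition); the method_start
pointer and the queue/vc backpointers in the CAR are assumptions:

    void enqueue(MPIDI_Queue *q, struct MPIDI_CAR *car)
    {
        int was_empty;

        lock(&q->lock);
        was_empty = (q->head == NULL);
        car->next  = NULL;
        car->queue = q;        /* so we know where to dequeue from later */
        if (was_empty)
            q->head = car;
        else
            q->tail->next = car;
        q->tail = car;
        unlock(&q->lock);

        /* MAYBE start doing work: only the first CAR on an idle queue
         * needs to poke the method; later CARs get picked up by the
         * progress engine */
        if (was_empty)
            car->vc->method_start(car->vc, car);
    }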
MPID_Send()
{
    int counter = 1;

    /* error checking */

    vc = get_vc(comm, dest);
    vc->alloc_car(vc, &car, SEND_CAR);

    /* construct send car */
    car->type = SEND;
    car->context = context;
    car->rank = dest;
    car->tag = tag;
    car->buf = buf;
    car->count = count;
    car->datatype = datatype;
    car->first = first;
    car->last = last;
    car->pcount = &counter;

    vc->advance_state(car, GO_FORWARD_AND_BE_HAPPY);

    while (counter > 0) {
        make_progress();
    }
}

/*** NOT DONE YET ***/
MPID_Recv()
{
    int counter = 1;

    /* error checking */

    if (any source recv) {
        alloc(&recv_any_p);
        lock(recv_any_p->mutex);
        found = false;
        for (method = 0; !found && method < NUM_METHODS; method++) {
            method_table[method]->recv_find_or_alloc_car(&car[method],
                context, ANY_SRC, tag, recv_any_p, RECV_ANY_SOURCE_CAR,
                &found);
            car[method]->pcount = &counter;
            car[method]->buf = buf;
            car[method]->count = count;
            car[method]->datatype = datatype;
            car[method]->first = first;
            car[method]->last = last;
            method_table[method]->advance_state(car[method], ...);
            if (found == true) {
                method_table[method]->set_segment(car[method], buf, count,
                                                  datatype, first, last);
                for (method2 = 0; method2 < method; method2++) {
                    method_table[method2]->cancel(car[method2]);
                }
            }
        }
        unlock(recv_any_p->mutex);
        if (found == true) {
            free(recv_any_p);
        }
    }
    else {
        vc = get_vc(comm, src);
        vc->recv_find_or_alloc_car(&car, context, src, tag, NULL, &counter,
                                   RECV_CAR, &found);
        if (found) {
            vc->set_segment(car, buf, count, datatype, first, last);
        }
    }

    while (counter > 0) {
        make_progress();
    }
}

so we see that this can work, but it's a little ugly. we think that we
might eventually want to have a set of functions for creating specific
cars for each method -- this would be a fast path for the send/recv cases.

---------

collective case:

MPID_Xfer_scatter_init()?

buffer dependencies could be on either side -- we need some sort of
mechanism for arbitrating which method should allocate buffers, based on
which is more advantageous. in the case where the sender supplies the
buffer, we are creating a type of flow control which allows us to most
efficiently use the local buffers.

start is going to put the cars with envelopes into the "active" state?
something like that... (see Rob's picture)

buffer management is handled by the method, since each method has its own
peculiarities with respect to limitations. we need to keep up with who
allocates temporary buffers, and we need to further ensure that they are
only freed when they are no longer needed by any cars (could be more than
one)

--------

the xfer request structure will contain both a counter for determining when
operations have completed as well as tracking data structures for building
the CARs associated with the xfer block. THESE ARE FOR HOMOGENEOUS USE
ONLY. in the heterogeneous case, the message headers (envelopes) would need
to include data format information for each fragment of the message
(basically, each car).
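a rough sketch of what such an xfer request might hold; the perl-style
send_car_list{dest} associative array used below is stood in for by a
fixed array, and every name here is a guess:

    /* illustrative only -- one completion counter for the whole xfer
     * block, plus the per-destination send CAR lists and the recv CAR
     * list built up by the *_op() calls below.  homogeneous case only. */
    typedef struct MPIDI_Xfer_req {
        int            tag;
        MPID_Comm     *comm;
        int            car_count;    /* counts toward block completion */
        MPIDI_VC      *recv_vc;      /* scatter: the single source VC */
        MPIDI_CAR_list recv_car_list;            /* headed by an envelope */
        MPIDI_CAR_list send_car_list[MAX_RANKS]; /* indexed by dest rank  */
    } MPIDI_Xfer_req;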
MPID_xfer_scatter_init(src, tag, comm, &req)
{
    alloc(req); /* allocates a xfer request */
    req->src = src;
    req->tag = tag;
    req->comm = comm;
    req->car_count = 0;
    req->recv_vc = get_vc(comm, src);
    req->recv_car_list = recv_envelope;
}

MPID_xferi_scatter_recv_mop_op(req, buf, count, dtype, first, last,
                               mop_func)
{
    req->recv_vc->alloc(car);
    car->type = RECV_DATA | UNPACK;
    car->segment = segment(buf, count, dtype, first, last);
    if (mop_func != MOP_NOOP) {
        car->type |= MOP;
        car->op_func = mop_func;
    }
    list_add(req->recv_car_list, car);
    req->recv_car_list.size += get_segment_size(car->segment);
}

/*
 * the type field has three independent sets of flags:
 *
 * SEND_DATA - sending
 * RECV_DATA - receiving
 *
 * SUPPLY_PACKED_BUF - the method associated with this CAR will supply a
 *     buffer for use by both this and some other CAR
 * PACKED_BUF_DEPENDENCY - this CAR will be supplied a buffer from some
 *     other CAR (thus it has a buffer dependency)
 *
 * PACK - this CAR packs the data from the segment into the packed buffer
 * UNPACK - this CAR unpacks the data from the packed buffer into the
 *     segment buffer
 */
MPID_xferi_scatter_recv_mop_forward_op(req, buf, count, dtype, first, last,
                                       mop_func, dest)
{
    send_vc = get_vc(req->comm, dest);
    send_vc->alloc(send_car);
    req->recv_vc->alloc(recv_car);

    /*
     * Buffer handling is determined using some yet-to-be-determined
     * function.  Nothing above this interface should be looking at the
     * methods, so this decision can't be made until we get in here?
     *
     * This function can take the count/dtype as a parameter to use to help
     * decide if it wants to tailor buffer allocation to eager vs.
     * rendezvous.
     */
    buffer_handling = func(..., mop_orientation);

    switch (buffer_handling) {
        case SENDER_SUPPLIES_BUF:
            send_car->type = SEND_DATA | SUPPLY_PACKED_BUF |
                PROGRESS_DEPENDENCY;
            send_car->buf_dep = recv_car;
            send_car->pack_buf = NULL;
            send_car->pack_size = get_segment_size(buf, count, dtype,
                                                   first, last);
            recv_car->type = RECV_DATA | UNPACK | PACKED_BUF_DEPENDENCY |
                SUPPLY_PROGRESS;
            recv_car->buf_dep = send_car;
            recv_car->pack_buf = NULL;
            recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
        case RECEIVER_SUPPLIES_BUF:
            send_car->type = SEND_DATA | PACKED_BUF_DEPENDENCY |
                PROGRESS_DEPENDENCY;
            send_car->pack_buf = NULL;
            send_car->pack_size = get_segment_size(buf, count, dtype,
                                                   first, last);
            recv_car->type = RECV_DATA | UNPACK | SUPPLY_PACKED_BUF |
                SUPPLY_PROGRESS;
            recv_car->buf_dep = send_car;
            recv_car->pack_buf = NULL;
            recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
        case USE_SEGMENT_BUF:
            send_car->type = SEND_DATA | PACK | PROGRESS_DEPENDENCY;
            send_car->pack_buf = NULL;
            send_car->pack_size = get_segment_size(buf, count, dtype,
                                                   first, last);
            recv_car->type = RECV_DATA | UNPACK | SUPPLY_PROGRESS;
            recv_car->pack_buf = NULL;
            recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
        case USE_TMP_BUF:
            /* case where neither method is capable of providing a buffer */
            alloc(tmp_buf);
            send_car->type = SEND_DATA | PROGRESS_DEPENDENCY;
            send_car->pack_buf = tmp_buf;
            send_car->pack_size = get_segment_size(buf, count, dtype,
                                                   first, last);
            recv_car->type = RECV_DATA | UNPACK | SUPPLY_PROGRESS;
            recv_car->pack_buf = tmp_buf;
            recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
    }

    if (mop_func != MOP_NOOP) {
        if (mop_orientation == SEND) {
            send_car->type |= MOP;
            send_car->op_func = mop_func;
        }
        else {
            recv_car->type |= MOP;
            recv_car->op_func = mop_func;
        }
    }

    if (req->send_car_list{dest} == NULL) {
        req->send_car_list{dest} = envelope_car{dest};
        req->send_car_list{dest}.size = 0;
    }
    /* basically we're adding to the size of the envelope, which we know is
     * at the head of these lists */
    list_add(req->send_car_list{dest}, send_car);
    req->send_car_list{dest}.size += send_car->pack_size;
    list_add(req->recv_car_list, recv_car);
    req->recv_car_list.size += recv_car->pack_size;
}

MPID_xfer_scatter_forward_op(req, size, dest)
{
    /* this one doesn't unpack. david points out that there is no
     * USE_SEGMENT_BUF case here. that might be worth optimizing out... */
    send_vc = get_vc(req->comm, dest);
    send_vc->alloc(send_car);
    req->recv_vc->alloc(recv_car);

    /* buffer handling is determined by the same yet-to-be-determined
     * function described above */
    buffer_handling = func(...);

    switch (buffer_handling) {
        case SENDER_SUPPLIES_BUF:
            send_car->type = SEND_DATA | SUPPLY_PACKED_BUF |
                PROGRESS_DEPENDENCY;
            send_car->buf_dep = recv_car;
            send_car->pack_buf = NULL;
            send_car->pack_size = size;
            recv_car->type = RECV_DATA | PACKED_BUF_DEPENDENCY |
                SUPPLY_PROGRESS;
            recv_car->buf_dep = send_car;
            recv_car->pack_buf = NULL;
            recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
        case RECEIVER_SUPPLIES_BUF:
            send_car->type = SEND_DATA | PACKED_BUF_DEPENDENCY |
                PROGRESS_DEPENDENCY;
            send_car->pack_buf = NULL;
            send_car->pack_size = size;
            recv_car->type = RECV_DATA | SUPPLY_PACKED_BUF |
                SUPPLY_PROGRESS;
            recv_car->buf_dep = send_car;
            recv_car->pack_buf = NULL;
            recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
        case USE_TMP_BUF:
            /* case where neither method is capable of providing a buffer */
            alloc(tmp_buf);
            send_car->type = SEND_DATA | PROGRESS_DEPENDENCY;
            send_car->pack_buf = tmp_buf;
            send_car->pack_size = size;
            recv_car->type = RECV_DATA | SUPPLY_PROGRESS;
            recv_car->pack_buf = tmp_buf;
            recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
    }

    if (req->send_car_list{dest} == NULL) {
        req->send_car_list{dest} = envelope_car{dest};
        req->send_car_list{dest}.size = 0;
    }
    /* basically we're adding to the size of the envelope, which we know is
     * at the head of these lists */
    list_add(req->send_car_list{dest}, send_car);
    req->send_car_list{dest}.size += send_car->pack_size;
    list_add(req->recv_car_list, recv_car);
    req->recv_car_list.size += recv_car->pack_size;
}

MPID_xfer_scatter_send_op(req, buf, count, dtype, first, last, dest)
{
    send_vc = get_vc(req->comm, dest);
    send_vc->alloc(car);
    car->type = SEND_DATA | PACK;
    car->segment = segment(buf, count, dtype, first, last);
    if (req->send_car_list{dest} == NULL)
        req->send_car_list{dest} = envelope_car{dest};
        /* allocate an envelope and put it at the head of the send_car_list
         * for this destination */
    list_add(req->send_car_list{dest}, car);
    req->send_car_list{dest}.size += get_segment_size(car->segment);
}

MPID_xfer_scatter_start(req)
{
    /* add the heads of the lists of cars into the ready lists/queues */
    foreach rank in {send} {
        vc = get_vc(req->comm, rank);
        vc->send(vc, req->send_car_list{rank});
    }
    req->recv_vc->recv(req->recv_vc, req->recv_car_list);
}

MPID_xfer_gather_init(dest, tag, comm, &req)
{
    alloc(req); /* allocates a xfer request */
    req->dest = dest;
    req->tag = tag;
    req->comm = comm;
    req->car_count = 0;
    req->send_vc = get_vc(comm, dest);
    req->send_car_list = send_envelope;
}
MPID_xferi_gather_recv_mop_op(req, buf, count, dtype, first, last,
                              mop_func, src)
{
    recv_vc = get_vc(req->comm, src);
    recv_vc->alloc(car);
    car->type = RECV_DATA | UNPACK;
    car->segment = segment(buf, count, dtype, first, last);
    if (req->recv_car_list{src} == NULL) {
        req->recv_car_list{src} = envelope_car{src};
    }
    if (mop_func != MOP_NOOP) {
        car->type |= MOP;
        car->op_func = mop_func;
    }
    list_add(req->recv_car_list{src}, car);
    req->recv_car_list{src}.size += get_segment_size(car->segment);
}

/* (the type field uses the same three independent sets of flags described
 * above for the scatter case) */
MPID_xferi_gather_recv_mop_forward_op(req, buf, count, dtype, first, last,
                                      mop_func, src)
{
    recv_vc = get_vc(req->comm, src);
    recv_vc->alloc(recv_car);
    req->send_vc->alloc(send_car);

    /* buffer handling is determined by the same yet-to-be-determined
     * function described above */
    buffer_handling = func(..., mop_orientation);

    switch (buffer_handling) {
        case SENDER_SUPPLIES_BUF:
            send_car->type = SEND_DATA | SUPPLY_PACKED_BUF |
                PROGRESS_DEPENDENCY;
            send_car->buf_dep = recv_car;
            send_car->pack_buf = NULL;
            send_car->pack_size = get_segment_size(buf, count, dtype,
                                                   first, last);
            recv_car->type = RECV_DATA | UNPACK | PACKED_BUF_DEPENDENCY |
                SUPPLY_PROGRESS;
            recv_car->buf_dep = send_car;
            recv_car->pack_buf = NULL;
            recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
        case RECEIVER_SUPPLIES_BUF:
            send_car->type = SEND_DATA | PACKED_BUF_DEPENDENCY |
                PROGRESS_DEPENDENCY;
            send_car->pack_buf = NULL;
            send_car->pack_size = get_segment_size(buf, count, dtype,
                                                   first, last);
            recv_car->type = RECV_DATA | UNPACK | SUPPLY_PACKED_BUF |
                SUPPLY_PROGRESS;
            recv_car->buf_dep = send_car;
            recv_car->pack_buf = NULL;
            recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
        case USE_SEGMENT_BUF:
            send_car->type = SEND_DATA | PACK | PROGRESS_DEPENDENCY;
            send_car->pack_buf = NULL;
            send_car->pack_size = get_segment_size(buf, count, dtype,
                                                   first, last);
            recv_car->type = RECV_DATA | UNPACK | SUPPLY_PROGRESS;
            recv_car->pack_buf = NULL;
            recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
        case USE_TMP_BUF:
            /* case where neither method is capable of providing a buffer */
            alloc(tmp_buf);
            send_car->type = SEND_DATA | PROGRESS_DEPENDENCY;
            send_car->pack_buf = tmp_buf;
            send_car->pack_size = get_segment_size(buf, count, dtype,
                                                   first, last);
            recv_car->type = RECV_DATA | UNPACK | SUPPLY_PROGRESS;
            recv_car->pack_buf = tmp_buf;
            recv_car->segment = segment(buf, count, dtype, first, last);
            recv_car->progress_dep = send_car;
            break;
    }

    if (mop_func != MOP_NOOP) {
        if (mop_orientation == SEND) {
            send_car->type |= MOP;
            send_car->op_func = mop_func;
        }
        else {
            recv_car->type |= MOP;
            recv_car->op_func = mop_func;
        }
    }

    if (req->recv_car_list{src} == NULL) {
        req->recv_car_list{src} = envelope_car{src};
        req->recv_car_list{src}.size = 0;
    }
    /* basically we're adding to the size of the envelope, which we know is
     * at the head of these lists */
    list_add(req->recv_car_list{src}, recv_car);
    req->recv_car_list{src}.size += recv_car->pack_size;
    list_add(req->send_car_list, send_car);
    req->send_car_list.size += send_car->pack_size;
}

MPID_xfer_gather_forward_op(req, size, src)
{
    /* this one doesn't unpack. david points out that there is no
     * USE_SEGMENT_BUF case here. that might be worth optimizing out... */
    recv_vc = get_vc(req->comm, src);
    recv_vc->alloc(recv_car);
    req->send_vc->alloc(send_car);

    /* buffer handling is determined by the same yet-to-be-determined
     * function described above */
    buffer_handling = func(...);

    switch (buffer_handling) {
        case SENDER_SUPPLIES_BUF:
            send_car->type = SEND_DATA | SUPPLY_PACKED_BUF |
                PROGRESS_DEPENDENCY;
            send_car->buf_dep = recv_car;
            send_car->pack_buf = NULL;
            send_car->pack_size = size;
            recv_car->type = RECV_DATA | PACKED_BUF_DEPENDENCY |
                SUPPLY_PROGRESS;
            recv_car->buf_dep = send_car;
            recv_car->pack_buf = NULL;
            recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
        case RECEIVER_SUPPLIES_BUF:
            send_car->type = SEND_DATA | PACKED_BUF_DEPENDENCY |
                PROGRESS_DEPENDENCY;
            send_car->pack_buf = NULL;
            send_car->pack_size = size;
            recv_car->type = RECV_DATA | SUPPLY_PACKED_BUF |
                SUPPLY_PROGRESS;
            recv_car->buf_dep = send_car;
            recv_car->pack_buf = NULL;
            recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
        case USE_TMP_BUF:
            /* case where neither method is capable of providing a buffer */
            alloc(tmp_buf);
            send_car->type = SEND_DATA | PROGRESS_DEPENDENCY;
            send_car->pack_buf = tmp_buf;
            send_car->pack_size = size;
            recv_car->type = RECV_DATA | SUPPLY_PROGRESS;
            recv_car->pack_buf = tmp_buf;
            recv_car->pack_size = size;
            recv_car->progress_dep = send_car;
            break;
    }

    if (req->recv_car_list{src} == NULL) {
        req->recv_car_list{src} = envelope_car{src};
        req->recv_car_list{src}.size = 0;
    }
    /* basically we're adding to the size of the envelope, which we know is
     * at the head of these lists */
    list_add(req->recv_car_list{src}, recv_car);
    req->recv_car_list{src}.size += recv_car->pack_size;
    list_add(req->send_car_list, send_car);
    req->send_car_list.size += send_car->pack_size;
}

MPID_xfer_gather_send_op(req, buf, count, dtype, first, last)
{
    req->send_vc->alloc(send_car);
    send_car->type = SEND_DATA | PACK;
    send_car->segment = segment(buf, count, dtype, first, last);
    list_add(req->send_car_list, send_car);
    req->send_car_list.size += get_segment_size(send_car->segment);
}

MPID_xfer_gather_start(req)
{
    /* add the heads of the lists of cars into the ready lists/queues */
    foreach rank in {recv} {
        /* need to get the vc for rank; extract it from the head car, which
         * is the envelope - this could be a special field in the envelope,
         * or we could have the vc in all cars (that seems silly)
         *
         * alternatively we could look up the vc using the info in the
         * request plus the rank from the envelope... */
        vc = get_vc(req->comm, rank);
        vc->recv(vc, req->recv_car_list{rank});
    }
    req->send_vc->send(req->send_vc, req->send_car_list);
}

/***************************************************************/
now we need to make progress on things, taking the dependencies into
account.
/***************************************************************/

the open question at the moment is where to put the car lists. one option
is to associate things with the vc. this necessitates having a list of
"active" vcs, I think.
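one illustrative way the "active VC" bookkeeping might look; all names here
are assumptions:

    /* per-method list of VCs that currently have work, so the progress
     * engine can skip idle connections */
    typedef struct MPIDI_Method_state {
        MPIDI_VC *active_head;       /* VCs with queued CARs */
        int       outstanding_ops;   /* hint for how eagerly to poll */
    } MPIDI_Method_state;

    /* called from the xfer start functions when a CAR list is handed to a
     * VC (assumes illustrative active/active_next fields in the VC) */
    void method_activate_vc(MPIDI_Method_state *m, MPIDI_VC *vc)
    {
        if (!vc->active) {
            vc->active      = 1;
            vc->active_next = m->active_head;
            m->active_head  = vc;
        }
        m->outstanding_ops++;
    }

    /* the generic progress loop then only kicks methods reporting work;
     * receives can arrive at any time, so some methods poll regardless */
    void make_progress(void)
    {
        int i;
        for (i = 0; i < NUM_METHODS; i++) {
            if (method_state[i].outstanding_ops > 0 ||
                method_always_polls(i))
                method_table[i]->make_progress();
        }
    }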
we would update any necessary global method state during the xfer start
functions.

we note that there is a disparity between the send side and the receive
side in that there is always the possibility that messages will arrive, so
we need to be polling all the time for receives, at some interval. this
interval might be very infrequent when we are not expecting any messages,
but more frequent when we know that there are outstanding operations on the
given method.

in some cases it costs the same to poll all vcs for a given method as it
does to poll a set of vcs or a single vc for the method. in other cases it
is more expensive to poll all vcs on the method.

another issue is knowing the relative speeds of the methods WRT how often
to poll. the polling functions could give feedback as well, indicating that
progress was made, or # of things processed, or whatever. this might be
useful as well... this could be a priority thing of some sort.

we call a method-specific function to add the car lists to the specific
VC(s). this allows the method to update any internal structures as well,
which it can use to keep up with active connections/sockets/VIs/whatever.

/* this will be a pointer in the vc to a method-specific function */

we're going to allow methods to modify the VC method functions whenever
they feel like it in order to express the current state via the function
pointers. boy, that's wacky isn't it? :) this could also be used to put a
VC in an error state.

tcp_send(vc, car_list)
{
    if connect isn't initiated
        initiate connect
        queue car list
    else if connect in progress
        queue car list
    else if send queue is not empty
        queue car list
        make progress on this send queue
    else
        make progress on specified car list
        enqueue remainder of car list
    fi
}

tcp_recv(vc, car_list)
{
}

tcp_make_progress()
{
}

In the case where a recv is posted to the method but the matching message
has already started to arrive (a corresponding unexpected CAR already
exists), we need a way to switch the CAR the progress engine is using to
complete the receive. To accomplish this, we will lock the unexpected CAR,
point the unexpected CAR at the new recv CAR, copy any data already
received into the buffer associated with the new CAR, and then release the
lock. When the progress engine receives more data, it will notice that the
unexpected CAR contains a pointer to a real recv CAR, update its cached CAR
pointer, and release the unexpected CAR (continuing to process the incoming
data using the state in the recv CAR). (a sketch of this switchover appears
after the dependency notes below.)

Dependencies:
- completion dependencies are simply handled by the use of separate cars in
  these ordered lists. these are for multiple cars which make up the same
  envelope.
- we've lost our priority dependencies. oops! at present we use an
  associative array (thank you perl), but that data structure does not
  maintain the order of operations. to simplify this situation, we
  constrain prioritization to the message level rather than prioritizing
  each and every operation/CAR. this allows us to continue using an
  associative array type data structure as long as that array maintains the
  order in which we inserted new keys into the array.
- this approach loses the priority information between methods. is that ok?
  no. so we need to have something that keeps this information across
  methods as well.
- david seems to think that this priority stuff isn't going to work.
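a minimal sketch of the unexpected-CAR switchover described above;
memcpy_to_segment(), tmp_buf, bytes_received, and the forward pointer are
all illustrative assumptions:

    /* called when a posted recv matches an unexpected CAR whose message is
     * still arriving */
    void switch_to_posted_car(struct MPIDI_CAR *unex_car,
                              struct MPIDI_CAR *recv_car)
    {
        lock(&unex_car->mutex);

        /* copy whatever has already landed in the temporary buffer */
        memcpy_to_segment(recv_car->segment, unex_car->tmp_buf,
                          unex_car->bytes_received);
        recv_car->bytes_received = unex_car->bytes_received;

        /* redirect: the progress engine notices this pointer the next time
         * data arrives, switches its cached CAR pointer to recv_car, and
         * releases the unexpected CAR */
        unex_car->forward = recv_car;

        unlock(&unex_car->mutex);
    }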
- we might get back more than one buffer to use for receiving one car's
  worth of data, so we can't just have a single pointer to a buffer in the
  car, if we use a pointer in the car at all. alternatively we could keep
  the pointers to buffers in some structure associated with the vc (as
  method-specific data). so we use iovec-like things to keep up with the
  multiple buffers.

describing buffers: the sender can then hand over groups of buffers. this
includes a list of pointers and sizes and a total length. (see the sketch
below.)

grabbing or providing buffers (which?): options:
- single function call used by the receiver to ask for a buffer; if the
  sender isn't ready, then we get a temp buffer.
- single function call used by the sender to provide buffers immediately
  when available.
- handshake: the recv notifies the sender that he is ready, and the sender
  provides buffer space when it is available. he does not reserve buffer
  space before the receiver notifies him.

  the code matching handshakes on the sending side may supply buffers for
  more than one send CAR which has been marked by the receiver as ready,
  but it MUST do so in the order given by the completion dependencies.

  the sender provides buffers when it is at the head of the queue OR if all
  CARs ahead of the sending CAR have satisfied all their buffer
  dependencies (and the recv is ready). the sender has the option of
  providing only part of the required buffer space. in this case, there
  must be some minimum amount agreed upon between the two methods (to
  ensure that the receiver can make progress with the amount of buffer
  space provided).

  CARs are considered "ahead" if they are part of the same envelope/message
  (CAR list) and are ahead of the send CAR in the ordered list (vertical,
  completion dependence) OR if they are on the same VC as the sender but in
  a different envelope and were enqueued (hi david!) previously. we can
  optionally consider whether or not these different envelopes are in the
  same context when making dependency assessments (gets back to the
  reordering possibility).
  - this "all previous CARs are satisfied" stuff refers to the space for
    the envelopes, not the data (unless it was sent eagerly). we can do
    this due to the semantics of MPI contexts.

- a receive only notifies when the incoming envelope has been matched.
  likewise, the sender (if using rendezvous) must match envelopes with the
  remote receiver before he will hand buffers back to the receiver.
- there are opportunities for reordering the send queue based on the MPI
  semantics between isends/sends/collectives. if the contexts aren't the
  same, you can reorder (maintaining intra-context ordering). recall that
  collectives are in a different context from p2p.
- senders in the rendezvous protocol with progress dependencies may eagerly
  send their RTS. likewise the CTS may also be sent eagerly when the
  matching receive is seen.
- we need to keep a small number of buffers in reserve on some methods
  (e.g. VIA) in order to guarantee that we can make progress. the example
  is a system that would otherwise use up all of its buffers for RTS/CTS
  packets. we don't think that this is quite sufficient. so in the via case
  we need to have at least two buffers lying around (at a very low level)
  for our window protocol, one for receiving window messages and one for
  sending them, but we don't use those up at the level we're looking at.
  the send buffer can be small, but the receive buffer may have to be
  bigger (?). actually we can just have two buffers around at the messaging
  level, which are enough to make slow progress. then we piggyback our
  window info inside the 32 bits in the header of each send. additionally,
  the envelope/header can't be bigger than the minimum via packet size
  (16K).
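a tiny sketch of the iovec-like descriptor described above (a list of
pointers and sizes plus a total length); all names are assumptions:

    typedef struct MPIDI_Buf_vec {
        int    n;               /* number of buffers handed over */
        size_t total_len;       /* sum of all buffer lengths */
        struct {
            void  *ptr;
            size_t len;
        } bufs[MPIDI_BUFVEC_MAX];   /* MPIDI_BUFVEC_MAX is illustrative */
    } MPIDI_Buf_vec;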
we can implement a window scheme per VI/VC. we can keep a pool of buffers
around that VI/VCs can use (and free if they aren't needed, except for the
two mentioned above).

* we could also use DMA writes to do the data transfer and get rid of this
  problem altogether. this gives you a packet per message used out of the
  receive queue. IMPLEMENT THIS ONE.
- DMA writes for both the data transfer and the control messages is david's
  favorite screaming fast idea. by utilizing this two-dma approach to
  ensure that the entire header is there, we can get around implementation
  details.
- we came up with a wacky scheme for writing window values directly into
  the remote memory. to do this you want to assign a sequence number to
  each packet. then the format of the data you write is:

      counter | window value(s) | counter

  the counter values are identical for a given write and are used as
  sentinel values (is this the right nomenclature?)/cookies to allow the
  local side to know that the data in the region is valid. if the local
  process reads the region and doesn't get the same counter value on each
  end, it tries again. this is cool because it's a memory operation, it's
  likely to work the second time if it fails, and it doesn't consume any
  buffers off the VI queues. we might even be able to get all this into a
  32-bit value and make this simpler.
- we want to utilize our pool of buffers primarily to service forwarding
  operations -- we want to wait for the send operation to reach the head of
  its VC queue before we really start giving lots of buffers to that
  operation.

-----

MPID_Irecv(buf, count, datatype, ...)
{
    /* foa() - looks up the VC (among other things). if rank is ANY_SOURCE,
     * we get back NULL */
    recv_posted_foa(comm, rank, tag, &car, &vc, &found);
    car->buf = buf;
    car->count = count;
    car->datatype = datatype;
    unlock(car->mutex);
    if (found) {
        /* some indication of the sent message has already arrived */
        /* process_recv() can make progress implicitly if it likes */
        vc->process_recv(vc, car);
        /* after this call, make_progress() must be the only call necessary
         * to finish this message. make_progress may need to be called more
         * than once. this function may _optionally_ also make progress. */
    }
}

/* this isn't necessarily a function... */
method_specific_send_envelope_arrived_on_the_wire()
{
    /* y'all, the envelope has been yanked */
    comm = get_comm_from_context(context_id);

    /* we won't know if we really want the vc in here until we start
     * implementing this...but this isn't method specific, so we need to
     * make up our minds sooner or later.
     *
     * this is pretty much the least number of parameters from which we can
     * reasonably extract all the information we might need (vc, context) */
    recv_incoming_foa(comm, rank, tag, &car, &found);
    /* recv_incoming_foa() returns an existing CAR if a matching recv was
     * found in the "posted queue"; otherwise it allocates a new CAR and
     * returns a pointer to it. found is true if an existing CAR was
     * matched in the posted queue. */

    if (data with header) {
        car->data = p_data;
    }
    unlock(car->mutex);
    if (found) {
        /* we could do a lot of work in this function if we like */
        method_specific_process_recv(vc, car);
    }
}

- we will allocate cars by determining the maximum needed size at runtime
  and allocating all cars to that size. this preserves locality while
  simultaneously separating the generic "car" from any method-specific
  info, allowing 3rd party methods to be added without modifying the
  generic car. (see the sketch below.)
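one way the run-time max-size CAR allocation might look, purely as a sketch
(private_car_size and method_private are invented names):

    #include <stdlib.h>

    static size_t car_alloc_size;

    /* at startup, ask each method how much private state it needs and size
     * every CAR to the largest answer */
    void car_pool_init(void)
    {
        size_t max_private = 0;
        int i;
        for (i = 0; i < NUM_METHODS; i++) {
            size_t sz = method_table[i]->private_car_size;
            if (sz > max_private)
                max_private = sz;
        }
        car_alloc_size = sizeof(struct MPIDI_CAR) + max_private;
    }

    /* the method-specific area lives at the end, so the generic part stays
     * untouched when third-party methods are added */
    struct MPIDI_CAR *car_alloc(void)
    {
        struct MPIDI_CAR *car = malloc(car_alloc_size);
        car->method_private = (char *) car + sizeof(struct MPIDI_CAR);
        return car;
    }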
MPI_Wait()
{
    int backoff_cnt = 0;

    while (req->counter > 0) {
        if (backoff_cnt++ > SPIN_THRESHOLD) {
            if (backoff_cnt < SLEEP_THRESHOLD) {
                /* may want to do this only if you have limited processor
                 * resources */
                mpid_yield();
            }
            else {
                mpid_lock(req->mutex);
                req->state = BLOCKING_WAIT;
                while (req->counter > 0)
                    mpid_cond_wait(req->mutex, req->cond);
                mpid_unlock(req->mutex);
            }
        }
    }
}

tcp_make_progress()
{
    /* assuming a set of global fd_sets which contain the current "list" of
     * active FDs, we simply make a copy of those */
    fd_set local_read_set = global_read_set;
    fd_set local_write_set = global_write_set;

    nset = select(nfds, local_read_set, local_write_set, NULL, timeout);

    foreach fd (set in local_read_set) {
        get_header();
        if (new incoming message) {
            comm = get_comm_from_context(context_id);
            recv_incoming_foa(comm, rank, tag, &car, &found);
            if (found) {
                if ((car->type & PACKED_BUF_DEPENDENCY) != 0) {
                }
                if ((car->type & UNPACK) == UNPACK) {
                    MPID_Segment_init(car->segment, car->buf, car->count,
                                      car->datatype);
                }
            }
        }
    }
}

Inter-method interface progress reporting: a recv CAR has new data
available and we want to allow the method associated with the send CAR to
make progress on the send CAR. this implies that we need a function
provided by the send CAR's method which the recv CAR's method can call
directly.

    car_r.car_s.vc.recv_ready(car_s)
    provide_bufs(car_s, , length)
    report_progress(car, nbytes)
    provide_bufs_and_progress(car, bufs, len, nbytes)
    {
    }

- we want to have all the "code" in the arcs/edges
- it might make sense to separate the vm, method buffer, recv, and send
  into separate machines
- do we want it to be ok for the buffering system to return 0?
- concern: if we pound out a purely event-based system, can we then come
  back and build an efficient implementation? or are we screwing ourselves?
- get a state machine, so we can talk about the api...
- we want this first cut to be event-driven
- we also need some fast-path capabilities for some key transitions/actions.
- we see then that we have a very simple state machine, really. but we
  think that we should have some flow diagrams that describe the processing
  that happens on the edges of these simple state machines.
- ugh. then we start to see that we would like to differentiate between
  receiving control and data? i dunno.
- looks like we have a couple of events/api functions for the vcs now:
  buffer events and wire events.
- not purely event driven, but bubbles are suspendable state and events are
  what wake things up.

buffer allocation:
- receiver vs. sender buffer allocation: this relies on the sender to grab
  buffers in all cases? this is what we're saying in our state machine at
  the moment, but it's NOT what we were saying previously. we need to make
  a decision on this one.
- maybe we're not going to ever eagerly receive(?)

method.alloc()
- parameters: maybe src_vc, dest_vc? maybe provider_vc, consumer_vc? (for
  notification, accounting) min_sz, max_sz? user_arg? callback_fn?
- this function should return immediately, possibly showing failure.
- there is a separate issue of matching BETWEEN methods. we haven't quite
  figured out how to do this...we were thinking that the source and dest
  could be used to pick more optimal buffers in the case where the method
  knows about pairings
- this gets us to a sticky point -- how much do we try to optimize as much
  as possible vs. how much do we try to make it easy to implement new
  methods
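a possible shape for method.alloc() given the parameter list mused about
above -- every parameter and name here is a guess, not a decision
(MPIDI_Buf_vec is the descriptor sketched earlier):

    typedef void (*alloc_cb_fn)(MPIDI_Buf_vec *bufs, void *user_arg);

    /* returns immediately; a nonzero return would mean the method cannot
     * supply buffers (failure).  on success the callback fires once at
     * least min_sz bytes are available; the method may supply less than
     * max_sz but never less than min_sz. */
    int method_alloc(MPIDI_VC *provider_vc,  /* notification/accounting */
                     MPIDI_VC *consumer_vc,  /* may help pick paired bufs */
                     size_t    min_sz,
                     size_t    max_sz,
                     alloc_cb_fn callback,
                     void     *user_arg);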
-----

Open Issues:
- When should the ready signal be triggered from the recv CAR to the send
  CAR?
- When should buffers be allocated when forwarding eager messages so as to
  avoid buffer copies?
- Need to define the communication agent