Skip to content

Commit

Permalink
Fix buffer posting and FI_EAGAIN
Browse files Browse the repository at this point in the history
Elaborate.

Signed-off-by: John H. Hartman <[email protected]>
  • Loading branch information
jhh67 committed Feb 7, 2025
1 parent 6fb1537 commit c4fbc8f
Showing 1 changed file with 100 additions and 41 deletions.
141 changes: 100 additions & 41 deletions runtime/src/comm/ofi/comm-ofi.c
Original file line number Diff line number Diff line change
Expand Up @@ -3165,6 +3165,26 @@ void init_ofiForAms(void) {
// set up two of these and swap back and forth between them, to hedge
// against receiving "buffer filled and released" events out of order
// with respect to the messages stored within them.

// There are two receive buffers and we alternate between them. If there
// were only one buffer then there might be a window during which there is
// no available buffer space because we are processing the last message in
// the buffer while new messages are still being received. Instead, we
// double-buffer. When the current buffer has been consumed up to a
// threshold (defined by FI_OPT_MIN_MULTI_RECV above), libfabric will tell
// us so by setting the FI_MULTI_RECV flag in a completion event.
// This causes us to post the other receive buffer, but note that since the
// current buffer is not yet full we can still receive messages as long as
// FI_OPT_MIN_MULTI_RECV is large enough to cover the window.
//
// One issue is knowing when there are no lingering dependencies on a buffer
// so we can repost it, since doing so will cause it to be filled with new
// messages. There are two types of active messages in the buffer; some are
// handled synchronously by the active message handler itself, and some are
// handled asynchronously by calling chpl_task_startMovedTask to create a
// new task to execute the active message. chpl_task_startMovedTask copies
// its arguments, so in either case there are no lingering dependencies on
// the message buffer.
//
CHPL_CALLOC_SZ(amLZs[0], 1, amLZSize);
CHPL_CALLOC_SZ(amLZs[1], 1, amLZSize);
Expand All @@ -3190,16 +3210,20 @@ void init_ofiForAms(void) {
ofi_rxBuffer = ofi_msg_reqs[0].msg_iov->iov_base;
ofi_rxEnd = (void *) ((char *) ofi_rxBuffer +
ofi_msg_reqs[0].msg_iov->iov_len);

for (int i = 0; i < 2; i++) {
memset(ofi_msg_reqs[i].msg_iov->iov_base, '\0',
ofi_msg_reqs[i].msg_iov->iov_len);
OFI_CHK(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[i], FI_MULTI_RECV));
DBG_PRINTF(DBG_AM_BUF,
"pre-post fi_recvmsg(AMLZs %p, len %#zx)",
ofi_msg_reqs[i].msg_iov->iov_base,
ofi_msg_reqs[i].msg_iov->iov_len);
}

//
// Post the first receive buffer. The other will be posted when this
// one has been consumed.
//
OFI_CHK(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[0], FI_MULTI_RECV));
DBG_PRINTF(DBG_AM_BUF,
"post fi_recvmsg(AMLZs %p, len %#zx)",
ofi_msg_reqs[0].msg_iov->iov_base,
ofi_msg_reqs[0].msg_iov->iov_len);
init_amHandling();
}

Expand Down Expand Up @@ -5117,6 +5141,28 @@ void processRxAmReqCntr(void) {
ofi_rxCount += todo;
}

//
// Try to post whichever of the two AM receive buffers is not the current
// one, i.e. the one at index (1 - ofi_msg_i).
//
// Returns true if fi_recvmsg() accepted the buffer, in which case ofi_msg_i
// is updated to refer to it.  Returns false if fi_recvmsg() returned
// -FI_EAGAIN, in which case ofi_msg_i is left unchanged so that a later
// call retries the same buffer.
//
// NOTE(review): OFI_CHK_2 is defined elsewhere; presumably it treats any
// result other than success or -FI_EAGAIN as an error -- confirm against
// the macro definition.
//
static
chpl_bool postOtherBuffer(void) {
  const int other = 1 - ofi_msg_i;
  int status;

  OFI_CHK_2(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[other], FI_MULTI_RECV),
            status, -FI_EAGAIN);

  if (status == -FI_EAGAIN) {
    // Not posted; the caller has to try again later.
    return false;
  }

  DBG_PRINTF(DBG_AM_BUF,
             "(re)post fi_recvmsg(AMLZs %p, len %#zx)",
             ofi_msg_reqs[other].msg_iov->iov_base,
             ofi_msg_reqs[other].msg_iov->iov_len);

  // The buffer just posted is now the current one.
  ofi_msg_i = other;
  return true;
}

static
void processRxAmReqCQ(void) {
//
Expand All @@ -5125,44 +5171,57 @@ void processRxAmReqCQ(void) {
struct fi_cq_data_entry cqes[5];
const size_t maxEvents = sizeof(cqes) / sizeof(cqes[0]);
ssize_t ret;
CHK_TRUE((ret = fi_cq_read(ofi_rxCQ, cqes, maxEvents)) > 0
|| ret == -FI_EAGAIN
|| ret == -FI_EAVAIL);
if (ret == -FI_EAVAIL) {
reportCQError(ofi_rxCQ);
}

const size_t numEvents = (ret == -FI_EAGAIN) ? 0 : ret;

for (int i = 0; i < numEvents; i++) {
if ((cqes[i].flags & FI_RECV) != 0) {
//
// This event is for an inbound AM request. Handle it.
//
amRequest_t* req = (amRequest_t*) cqes[i].buf;
DBG_PRINTF(DBG_AM_BUF,
"CQ rx AM req @ buffer offset %zd, sz %zd, seqId %s",
(char*) req - (char*) ofi_iov_reqs[ofi_msg_i].iov_base,
cqes[i].len, am_seqIdStr(req));
DBG_PRINTF(DBG_AM | DBG_AM_RECV,
"rx AM req: %s",
am_reqStr(chpl_nodeID, req, cqes[i].len));
(void) handleAmReq(req);
chpl_bool post = false;
do {
CHK_TRUE((ret = fi_cq_read(ofi_rxCQ, cqes, maxEvents)) > 0
|| ret == -FI_EAGAIN
|| ret == -FI_EAVAIL);
if (ret == -FI_EAVAIL) {
reportCQError(ofi_rxCQ);
}
if ((cqes[i].flags & FI_MULTI_RECV) != 0) {
//
// Multi-receive buffer filled; post the other one.
//
ofi_msg_i = 1 - ofi_msg_i;
OFI_CHK(fi_recvmsg(ofi_rxEp, &ofi_msg_reqs[ofi_msg_i], FI_MULTI_RECV));
DBG_PRINTF(DBG_AM_BUF,
"re-post fi_recvmsg(AMLZs %p, len %#zx)",
ofi_msg_reqs[ofi_msg_i].msg_iov->iov_base,
ofi_msg_reqs[ofi_msg_i].msg_iov->iov_len);

//
// Post the other buffer if there is a post pending.
if (post) {
if (postOtherBuffer() == true) {
post = false;
}
}

CHK_TRUE((cqes[i].flags & ~(FI_MSG | FI_RECV | FI_MULTI_RECV)) == 0);
}
const size_t numEvents = (ret == -FI_EAGAIN) ? 0 : ret;

for (int i = 0; i < numEvents; i++) {
if ((cqes[i].flags & FI_RECV) != 0) {
//
// This event is for an inbound AM request. Handle it.
//
amRequest_t* req = (amRequest_t*) cqes[i].buf;
DBG_PRINTF(DBG_AM_BUF,
"CQ rx AM req @ buffer offset %zd, sz %zd, seqId %s",
(char*) req - (char*) ofi_iov_reqs[ofi_msg_i].iov_base,
cqes[i].len, am_seqIdStr(req));
DBG_PRINTF(DBG_AM | DBG_AM_RECV,
"rx AM req: %s",
am_reqStr(chpl_nodeID, req, cqes[i].len));
(void) handleAmReq(req);
}
if ((cqes[i].flags & FI_MULTI_RECV) != 0) {
//
// Multi-receive buffer filled; post the other one.
//

if (postOtherBuffer() == false) {
//
// Buffer was not posted due to FI_EAGAIN. Go around the outer loop
// again which will call fi_cq_read to progress the endpoint and
// then try reposting the buffer.
//
post = true;
}
}
CHK_TRUE((cqes[i].flags & ~(FI_MSG | FI_RECV | FI_MULTI_RECV)) == 0);
}
} while(post);
}

static
Expand Down

0 comments on commit c4fbc8f

Please sign in to comment.