From e413982c271579d2d76e2961a5ee1b701fba9e54 Mon Sep 17 00:00:00 2001
From: Shi Jin
Date: Mon, 24 Jun 2024 21:44:44 +0000
Subject: [PATCH] prov/efa: Queue txes when handshake is enforced but not made.

Introduce the efa_rdm_ep_enforce_handshake_for_txe() function to handle
the handshake triggering and the txe queueing.

Signed-off-by: Shi Jin
---
 prov/efa/src/rdm/efa_rdm_atomic.c   |   9 +-
 prov/efa/src/rdm/efa_rdm_ep.h       |   2 +
 prov/efa/src/rdm/efa_rdm_ep_utils.c |  32 ++++++
 prov/efa/src/rdm/efa_rdm_msg.c      |   6 +-
 prov/efa/src/rdm/efa_rdm_rma.c      |  28 ++---
 prov/efa/test/efa_unit_test_cq.c    |   8 +-
 prov/efa/test/efa_unit_test_ep.c    | 168 +++++++++++++++++++++++++++-
 prov/efa/test/efa_unit_tests.c      |   5 +-
 prov/efa/test/efa_unit_tests.h      |  18 ++-
 9 files changed, 230 insertions(+), 46 deletions(-)

diff --git a/prov/efa/src/rdm/efa_rdm_atomic.c b/prov/efa/src/rdm/efa_rdm_atomic.c
index 6386ad372e4..eb997a77906 100644
--- a/prov/efa/src/rdm/efa_rdm_atomic.c
+++ b/prov/efa/src/rdm/efa_rdm_atomic.c
@@ -98,7 +98,6 @@ efa_rdm_atomic_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
 ssize_t efa_rdm_atomic_post_atomic(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *txe)
 {
 	bool delivery_complete_requested;
-	int ret;
 	static int req_pkt_type_list[] = {
 		[ofi_op_atomic] = EFA_RDM_WRITE_RTA_PKT,
 		[ofi_op_atomic_fetch] = EFA_RDM_FETCH_RTA_PKT,
@@ -119,12 +118,10 @@ ssize_t efa_rdm_atomic_post_atomic(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm
 		 * the information whether the peer
 		 * support it or not.
 		 */
-		if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
-			ret = efa_rdm_ep_trigger_handshake(efa_rdm_ep, txe->peer);
-			return ret ? ret : -FI_EAGAIN;
-		}
+		if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
+			return efa_rdm_ep_enforce_handshake_for_txe(efa_rdm_ep, txe);
 
-		if (!efa_rdm_peer_support_delivery_complete(txe->peer))
+		if (!(txe->peer->is_self) && !efa_rdm_peer_support_delivery_complete(txe->peer))
 			return -FI_EOPNOTSUPP;
 	}
 
diff --git a/prov/efa/src/rdm/efa_rdm_ep.h b/prov/efa/src/rdm/efa_rdm_ep.h
index 8e2caf59436..67604745f9f 100644
--- a/prov/efa/src/rdm/efa_rdm_ep.h
+++ b/prov/efa/src/rdm/efa_rdm_ep.h
@@ -391,6 +391,8 @@ ssize_t efa_rdm_ep_trigger_handshake(struct efa_rdm_ep *ep, struct efa_rdm_peer
 
 ssize_t efa_rdm_ep_post_handshake(struct efa_rdm_ep *ep, struct efa_rdm_peer *peer);
 
+int efa_rdm_ep_enforce_handshake_for_txe(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe);
+
 void efa_rdm_ep_post_handshake_or_queue(struct efa_rdm_ep *ep, struct efa_rdm_peer *peer);
 
 
diff --git a/prov/efa/src/rdm/efa_rdm_ep_utils.c b/prov/efa/src/rdm/efa_rdm_ep_utils.c
index 3499e2efa9a..d32e0d0b6e5 100644
--- a/prov/efa/src/rdm/efa_rdm_ep_utils.c
+++ b/prov/efa/src/rdm/efa_rdm_ep_utils.c
@@ -967,3 +967,35 @@ size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface
 
 	return memory_alignment;
 }
+
+/**
+ * @brief Enforce a handshake to be made for the given txe.
+ * It triggers a handshake with the peer and then either
+ * returns -FI_EAGAIN or queues the txe.
+ * @param ep efa_rdm_ep
+ * @param txe tx entry
+ * @return int 0 on success, negative integer on failure.
+ */
+int efa_rdm_ep_enforce_handshake_for_txe(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
+{
+	int ret;
+
+	assert(txe->type == EFA_RDM_TXE);
+	assert(!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED));
+
+	ret = efa_rdm_ep_trigger_handshake(ep, txe->peer);
+	if (ret)
+		return ret;
+	/**
+	 * We cannot queue requests (and return 0) for inject,
+	 * which expects that the buffer can be reused when the
+	 * call returns success.
+	 */
+	if (txe->fi_flags & FI_INJECT)
+		return -FI_EAGAIN;
+
+	if (!(txe->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE)) {
+		txe->internal_flags |= EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE;
+		dlist_insert_tail(&txe->queued_entry, &efa_rdm_ep_domain(ep)->ope_queued_list);
+	}
+	return FI_SUCCESS;
+}
diff --git a/prov/efa/src/rdm/efa_rdm_msg.c b/prov/efa/src/rdm/efa_rdm_msg.c
index 05c6a65ea8c..fe07851c29c 100644
--- a/prov/efa/src/rdm/efa_rdm_msg.c
+++ b/prov/efa/src/rdm/efa_rdm_msg.c
@@ -138,10 +138,8 @@ ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
 	 *
 	 * Check handshake packet from peer to verify support status.
 	 */
-	if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
-		err = efa_rdm_ep_trigger_handshake(ep, txe->peer);
-		return err ? err : -FI_EAGAIN;
-	}
+	if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
+		return efa_rdm_ep_enforce_handshake_for_txe(ep, txe);
 
 	if (!efa_rdm_pkt_type_is_supported_by_peer(rtm_type, txe->peer))
 		return -FI_EOPNOTSUPP;
diff --git a/prov/efa/src/rdm/efa_rdm_rma.c b/prov/efa/src/rdm/efa_rdm_rma.c
index 67fdb0af58b..5ebecc3f97c 100644
--- a/prov/efa/src/rdm/efa_rdm_rma.c
+++ b/prov/efa/src/rdm/efa_rdm_rma.c
@@ -124,10 +124,8 @@ ssize_t efa_rdm_rma_post_read(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
 	 * For local read (read from self ep), such handshake is not needed because we only
 	 * need to check the local ep's capabilities.
 	 */
-	if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
-		ret = efa_rdm_ep_trigger_handshake(ep, txe->peer);
-		return ret ? ret : -FI_EAGAIN;
-	}
+	if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
+		return efa_rdm_ep_enforce_handshake_for_txe(ep, txe);
 
 	if (efa_rdm_interop_rdma_read(ep, txe->peer)) {
 		/* RDMA read interoperability check also checks domain.use_device_rdma,
@@ -361,10 +359,8 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
 	 * For local write (writing it self), this handshake is not required because we only need to
 	 * check one-side capability
 	 */
-	if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
-		err = efa_rdm_ep_trigger_handshake(ep, txe->peer);
-		return err ? err : -FI_EAGAIN;
-	}
+	if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
+		return efa_rdm_ep_enforce_handshake_for_txe(ep, txe);
 
 	if (efa_rdm_rma_should_write_using_rdma(ep, txe, txe->peer)) {
 		efa_rdm_ope_prepare_to_post_write(txe);
@@ -380,21 +376,11 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
 	 * The sender cannot send with FI_DELIVERY_COMPLETE
 	 * if the peer is not able to handle it.
 	 *
-	 * If the sender does not know whether the peer
-	 * can handle it, it needs to trigger
-	 * a handshake packet from the peer.
-	 *
-	 * The handshake packet contains
-	 * the information whether the peer
-	 * support it or not.
+	 * The handshake has already been made at this point,
+	 * since we enforced it for the write earlier.
 	 */
-	err = efa_rdm_ep_trigger_handshake(ep, txe->peer);
-	if (OFI_UNLIKELY(err))
-		return err;
-	if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
-		return -FI_EAGAIN;
-	else if (!efa_rdm_peer_support_delivery_complete(txe->peer))
+	if (!(txe->peer->is_self) && !efa_rdm_peer_support_delivery_complete(txe->peer))
 		return -FI_EOPNOTSUPP;
 
 	max_eager_rtw_data_size = efa_rdm_txe_max_req_data_capacity(ep, txe, EFA_RDM_DC_EAGER_RTW_PKT);
diff --git a/prov/efa/test/efa_unit_test_cq.c b/prov/efa/test/efa_unit_test_cq.c
index 5af734e6d48..83e501e1c27 100644
--- a/prov/efa/test/efa_unit_test_cq.c
+++ b/prov/efa/test/efa_unit_test_cq.c
@@ -415,16 +415,10 @@ void test_rdm_cq_create_error_handling(struct efa_resource **state)
 
 static int test_efa_rdm_cq_get_ibv_cq_poll_list_length(struct fid_cq *cq_fid)
 {
-	int i = 0;
-	struct dlist_entry *item;
 	struct efa_rdm_cq *cq;
 
 	cq = container_of(cq_fid, struct efa_rdm_cq, util_cq.cq_fid.fid);
-	dlist_foreach(&cq->ibv_cq_poll_list, item) {
-		i++;
-	}
-
-	return i;
+	return efa_unit_test_get_dlist_length(&cq->ibv_cq_poll_list);
 }
 
 /**
diff --git a/prov/efa/test/efa_unit_test_ep.c b/prov/efa/test/efa_unit_test_ep.c
index 3a5050f9879..e696ae16393 100644
--- a/prov/efa/test/efa_unit_test_ep.c
+++ b/prov/efa/test/efa_unit_test_ep.c
@@ -303,14 +303,13 @@ void test_efa_rdm_ep_pkt_pool_page_alignment(struct efa_resource **state)
 }
 
-
 /**
  * @brief when delivery complete atomic was used and handshake packet has not been received
- * verify there is no txe leak
+ * verify the txe is queued
  *
  * @param[in]	state	struct efa_resource that is managed by the framework
  */
-void test_efa_rdm_ep_dc_atomic_error_handling(struct efa_resource **state)
+void test_efa_rdm_ep_dc_atomic_queue_before_handshake(struct efa_resource **state)
 {
 	struct efa_rdm_ep *efa_rdm_ep;
 	struct efa_rdm_peer *peer;
@@ -322,6 +321,7 @@ void test_efa_rdm_ep_dc_atomic_error_handling(struct efa_resource **state)
 	size_t raw_addr_len = sizeof(struct efa_ep_addr);
 	fi_addr_t peer_addr;
 	int buf[1] = {0}, err, numaddr;
+	struct efa_rdm_ope *txe;
 
 	efa_unit_test_resource_construct(resource, FI_EP_RDM);
 
@@ -363,11 +363,167 @@
 	assert_true(dlist_empty(&efa_rdm_ep->txe_list));
 	err = fi_atomicmsg(resource->ep, &msg, FI_DELIVERY_COMPLETE);
 	/* DC has been reuquested, but ep do not know whether peer supports it, therefore
-	 * -FI_EAGAIN should be returned
+	 * the ope has been queued to domain->ope_queued_list
 	 */
-	assert_int_equal(err, -FI_EAGAIN);
-	/* make sure there is no leaking of txe */
+	assert_int_equal(err, 0);
+	assert_int_equal(efa_unit_test_get_dlist_length(&efa_rdm_ep->txe_list), 1);
+	assert_int_equal(efa_unit_test_get_dlist_length(&(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list)), 1);
+	txe = container_of(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list.next, struct efa_rdm_ope, queued_entry);
+	assert_true((txe->op == ofi_op_atomic));
+	assert_true(txe->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE);
+}
+
+/**
+ * @brief when delivery complete send was used and handshake packet has not been received
+ * verify the txe is queued
+ *
+ * @param[in]	state	struct efa_resource that is managed by the framework
+ */
+void test_efa_rdm_ep_dc_send_queue_before_handshake(struct efa_resource **state)
+{
+	struct efa_rdm_ep *efa_rdm_ep;
+	struct efa_rdm_peer *peer;
+	struct fi_msg msg = {0};
+	struct iovec iov;
+	struct efa_resource *resource = *state;
+	struct efa_ep_addr raw_addr = {0};
+	size_t raw_addr_len = sizeof(struct efa_ep_addr);
+	fi_addr_t peer_addr;
+	int err, numaddr;
+	struct efa_rdm_ope *txe;
+
+	efa_unit_test_resource_construct(resource, FI_EP_RDM);
+
+	/* create a fake peer */
+	err = fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len);
+	assert_int_equal(err, 0);
+	raw_addr.qpn = 1;
+	raw_addr.qkey = 0x1234;
+	numaddr = fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL);
+	assert_int_equal(numaddr, 1);
+
+	msg.addr = peer_addr;
+	msg.iov_count = 1;
+	iov.iov_base = NULL;
+	iov.iov_len = 0;
+	msg.msg_iov = &iov;
+	msg.desc = NULL;
+
+	efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid);
+	/* close shm_ep to force efa_rdm_ep to use efa device to send */
+	if (efa_rdm_ep->shm_ep) {
+		err = fi_close(&efa_rdm_ep->shm_ep->fid);
+		assert_int_equal(err, 0);
+		efa_rdm_ep->shm_ep = NULL;
+	}
+	/* Setting peer->flags to EFA_RDM_PEER_REQ_SENT makes the send path think
+	 * a REQ packet has been sent to the peer (so there is no need to send again).
+	 * A handshake has not been received, so we do not know whether the peer supports DC.
+	 */
+	peer = efa_rdm_ep_get_peer(efa_rdm_ep, peer_addr);
+	peer->flags = EFA_RDM_PEER_REQ_SENT;
+	peer->is_local = false;
+	assert_true(dlist_empty(&efa_rdm_ep->txe_list));
+	err = fi_sendmsg(resource->ep, &msg, FI_DELIVERY_COMPLETE);
+	/* DC has been requested, but the ep does not know whether the peer supports it,
+	 * therefore the ope has been queued to domain->ope_queued_list
+	 */
+	assert_int_equal(err, 0);
+	assert_int_equal(efa_unit_test_get_dlist_length(&efa_rdm_ep->txe_list), 1);
+	assert_int_equal(efa_unit_test_get_dlist_length(&(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list)), 1);
+	txe = container_of(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list.next, struct efa_rdm_ope, queued_entry);
+	assert_true((txe->op == ofi_op_msg));
+	assert_true(txe->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE);
+}
+
+
+/**
+ * @brief verify the tx entry is queued for an rma (read or write) request before the handshake is made.
+ *
+ * @param[in]	state	struct efa_resource that is managed by the framework
+ * @param[in]	op	op code
+ */
+void test_efa_rdm_ep_rma_queue_before_handshake(struct efa_resource **state, int op)
+{
+	struct efa_resource *resource = *state;
+	struct efa_rdm_ep *efa_rdm_ep;
+	struct efa_ep_addr raw_addr = {0};
+	size_t raw_addr_len = sizeof(struct efa_ep_addr);
+	fi_addr_t peer_addr;
+	int num_addr;
+	const int buf_len = 8;
+	char buf[8] = {0};
+	int err;
+	uint64_t rma_addr, rma_key;
+	struct efa_rdm_ope *txe;
+	struct efa_rdm_peer *peer;
+
+	resource->hints = efa_unit_test_alloc_hints(FI_EP_RDM);
+	resource->hints->caps |= FI_MSG | FI_TAGGED | FI_RMA;
+	resource->hints->domain_attr->mr_mode = FI_MR_BASIC;
+	efa_unit_test_resource_construct_with_hints(resource, FI_EP_RDM, resource->hints, true, true);
+
+	/* the hints above request RMA capability */
+	efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid);
+
+	/* create a fake peer */
+	err = fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len);
+	assert_int_equal(err, 0);
+	raw_addr.qpn = 1;
+	raw_addr.qkey = 0x1234;
+	num_addr = fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL);
+	assert_int_equal(num_addr, 1);
+
+	/* create a fake rma_key and address. fi_read/fi_write should return before
+	 * they are needed.
+	 */
+	rma_key = 0x1234;
+	rma_addr = (uint64_t) &buf;
+
+	/* Setting peer->flags to EFA_RDM_PEER_REQ_SENT makes the rma path think
+	 * a REQ packet has been sent to the peer (so there is no need to send again).
+	 * A handshake has not been received, so we do not know the peer's capabilities.
+	 */
+	peer = efa_rdm_ep_get_peer(efa_rdm_ep, peer_addr);
+	peer->flags = EFA_RDM_PEER_REQ_SENT;
+	peer->is_local = false;
+
+	assert_true(dlist_empty(&efa_rdm_ep->txe_list));
+
+	if (op == ofi_op_read_req) {
+		err = fi_read(resource->ep, buf, buf_len,
+			      NULL, /* desc, not required */
+			      peer_addr,
+			      rma_addr,
+			      rma_key,
+			      NULL); /* context */
+	} else if (op == ofi_op_write) {
+		err = fi_write(resource->ep, buf, buf_len,
+			       NULL, /* desc, not required */
+			       peer_addr,
+			       rma_addr,
+			       rma_key,
+			       NULL); /* context */
+	} else {
+		fprintf(stderr, "Unknown op code %d\n", op);
+		fail();
+	}
+	assert_int_equal(err, 0);
+	assert_int_equal(efa_unit_test_get_dlist_length(&efa_rdm_ep->txe_list), 1);
+	assert_int_equal(efa_unit_test_get_dlist_length(&(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list)), 1);
+	txe = container_of(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list.next, struct efa_rdm_ope, queued_entry);
+	assert_true((txe->op == op));
+	assert_true(txe->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE);
+}
+
+void test_efa_rdm_ep_write_queue_before_handshake(struct efa_resource **state)
+{
+	test_efa_rdm_ep_rma_queue_before_handshake(state, ofi_op_write);
+}
+
+void test_efa_rdm_ep_read_queue_before_handshake(struct efa_resource **state)
+{
+	test_efa_rdm_ep_rma_queue_before_handshake(state, ofi_op_read_req);
 }
 
 /**
diff --git a/prov/efa/test/efa_unit_tests.c b/prov/efa/test/efa_unit_tests.c
index b7533f7d95a..7ac72bd20a9 100644
--- a/prov/efa/test/efa_unit_tests.c
+++ b/prov/efa/test/efa_unit_tests.c
@@ -91,7 +91,10 @@
 		cmocka_unit_test_setup_teardown(test_efa_rdm_ep_enable_qp_in_order_aligned_128_bytes_bad, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
 		cmocka_unit_test_setup_teardown(test_efa_rdm_ep_pkt_pool_flags, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
 		cmocka_unit_test_setup_teardown(test_efa_rdm_ep_pkt_pool_page_alignment, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
-		cmocka_unit_test_setup_teardown(test_efa_rdm_ep_dc_atomic_error_handling, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+		cmocka_unit_test_setup_teardown(test_efa_rdm_ep_dc_atomic_queue_before_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+		cmocka_unit_test_setup_teardown(test_efa_rdm_ep_dc_send_queue_before_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+		cmocka_unit_test_setup_teardown(test_efa_rdm_ep_read_queue_before_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+		cmocka_unit_test_setup_teardown(test_efa_rdm_ep_write_queue_before_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
 		cmocka_unit_test_setup_teardown(test_efa_rdm_ep_send_with_shm_no_copy, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
 		cmocka_unit_test_setup_teardown(test_efa_rdm_ep_rma_without_caps, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
 		cmocka_unit_test_setup_teardown(test_efa_rdm_ep_atomic_without_caps, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
diff --git a/prov/efa/test/efa_unit_tests.h b/prov/efa/test/efa_unit_tests.h
index b9117aa72d6..20cf32bd6ea 100644
--- a/prov/efa/test/efa_unit_tests.h
+++ b/prov/efa/test/efa_unit_tests.h
@@ -102,7 +102,10 @@ void test_efa_rdm_ep_getopt_undersized_optlen();
 void test_efa_rdm_ep_getopt_oversized_optlen();
 void test_efa_rdm_ep_pkt_pool_flags();
 void test_efa_rdm_ep_pkt_pool_page_alignment();
-void test_efa_rdm_ep_dc_atomic_error_handling();
+void test_efa_rdm_ep_dc_atomic_queue_before_handshake();
+void test_efa_rdm_ep_dc_send_queue_before_handshake();
+void test_efa_rdm_ep_write_queue_before_handshake();
+void test_efa_rdm_ep_read_queue_before_handshake();
 void test_efa_rdm_ep_send_with_shm_no_copy();
 void test_efa_rdm_ep_rma_without_caps();
 void test_efa_rdm_ep_atomic_without_caps();
@@ -175,4 +178,17 @@ void test_efa_rdm_cq_ibv_cq_poll_list_same_tx_rx_cq_single_ep();
 void test_efa_rdm_cq_ibv_cq_poll_list_separate_tx_rx_cq_single_ep();
 void test_efa_rdm_cntr_ibv_cq_poll_list_same_tx_rx_cq_single_ep();
 void test_efa_rdm_cntr_ibv_cq_poll_list_separate_tx_rx_cq_single_ep();
+
+static inline
+int efa_unit_test_get_dlist_length(struct dlist_entry *head)
+{
+	int i = 0;
+	struct dlist_entry *item;
+
+	dlist_foreach(head, item) {
+		i++;
+	}
+
+	return i;
+}
 #endif
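
The pattern this patch establishes at each post path (atomic, msg, RMA read/write) can be condensed as follows. This is an illustrative sketch rather than code from the patch: post_op() is a hypothetical stand-in for the real entry points (efa_rdm_atomic_post_atomic(), efa_rdm_msg_post_rtm(), efa_rdm_rma_post_read()/_write()); the flags, the helper, and the capability check are taken from the diff above.

```c
/* Illustrative sketch only -- post_op() is a hypothetical stand-in for the
 * entry points patched above. Note the is_self short-circuit on the
 * handshake check is only present on the RMA paths in the actual diff.
 */
ssize_t post_op(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
{
	/* No handshake received yet: trigger one, then either queue the txe
	 * on domain->ope_queued_list and report success, or return
	 * -FI_EAGAIN for FI_INJECT requests, whose buffers must be reusable
	 * as soon as the call returns success.
	 */
	if (!(txe->peer->is_self) &&
	    !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
		return efa_rdm_ep_enforce_handshake_for_txe(ep, txe);

	/* Past this point a handshake has been received (or the peer is the
	 * local ep), so peer capabilities can be checked directly instead of
	 * triggering a handshake and returning -FI_EAGAIN as before.
	 */
	if (!(txe->peer->is_self) &&
	    !efa_rdm_peer_support_delivery_complete(txe->peer))
		return -FI_EOPNOTSUPP;

	/* ... proceed to post the request ... */
	return FI_SUCCESS;
}
```

A txe queued with EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE is presumably picked up from domain->ope_queued_list and retried by the progress engine once the handshake arrives; that retry path is outside this diff.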