-
Notifications
You must be signed in to change notification settings - Fork 5
/
na_ucx.patch
110 lines (97 loc) · 4.71 KB
/
na_ucx.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
diff --git a/src/na/na_ucx.c b/src/na/na_ucx.c
index 84eb8b0..e4b6676 100644
--- a/src/na/na_ucx.c
+++ b/src/na/na_ucx.c
@@ -614,7 +614,7 @@ na_ucx_addr_map_update(struct na_ucx_class *na_ucx_class,
*/
static na_return_t
na_ucx_addr_map_remove(
- struct na_ucx_map *na_ucx_map, ucs_sock_addr_t *addr_key);
+ struct na_ucx_map *na_ucx_map, struct na_ucx_addr *remove_addr);
/**
* Hash connection ID.
@@ -1688,8 +1688,12 @@ na_ucp_listener_conn_cb(ucp_conn_request_h conn_request, void *arg)
.addr = (const struct sockaddr *) &conn_request_attrs.client_address,
.addrlen = sizeof(conn_request_attrs.client_address)};
na_ucx_addr = na_ucx_addr_map_lookup(&na_ucx_class->addr_map, &addr_key);
- NA_CHECK_SUBSYS_ERROR_NORET(addr, na_ucx_addr != NULL, error,
- "An entry is already present for this address");
+
+ if (na_ucx_addr != NULL) {
+ NA_LOG_SUBSYS_WARNING(addr,
+ "An entry is already present for this address");
+ na_ucx_addr_map_remove(&na_ucx_class->addr_map, na_ucx_addr);
+ }
/* Insert new entry and create new address */
na_ret = na_ucx_addr_map_insert(na_ucx_class, &na_ucx_class->addr_map,
@@ -1937,10 +1941,14 @@ na_ucp_ep_error_cb(
static void
na_ucp_ep_close(ucp_ep_h ep)
{
- ucs_status_ptr_t status_ptr = ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FORCE);
+ const ucp_request_param_t close_params = {
+ .op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS,
+ .flags = UCP_EP_CLOSE_FLAG_FORCE};
+ ucs_status_ptr_t status_ptr = ucp_ep_close_nbx(ep, &close_params);
+
NA_CHECK_SUBSYS_ERROR_DONE(addr,
status_ptr != NULL && UCS_PTR_IS_ERR(status_ptr),
- "ucp_ep_close_nb() failed (%s)",
+ "ucp_ep_close_nbx() failed (%s)",
ucs_status_string(UCS_PTR_STATUS(status_ptr)));
}
@@ -2722,7 +2730,7 @@ unlock:
/*---------------------------------------------------------------------------*/
static na_return_t
-na_ucx_addr_map_remove(struct na_ucx_map *na_ucx_map, ucs_sock_addr_t *addr_key)
+na_ucx_addr_map_remove(struct na_ucx_map *na_ucx_map, struct na_ucx_addr *remove_addr)
{
struct na_ucx_addr *na_ucx_addr = NULL;
na_return_t ret = NA_SUCCESS;
@@ -2731,13 +2739,14 @@ na_ucx_addr_map_remove(struct na_ucx_map *na_ucx_map, ucs_sock_addr_t *addr_key)
hg_thread_rwlock_wrlock(&na_ucx_map->lock);
na_ucx_addr = hg_hash_table_lookup(
- na_ucx_map->key_map, (hg_hash_table_key_t) addr_key);
- if (na_ucx_addr == HG_HASH_TABLE_NULL)
+ na_ucx_map->key_map, (hg_hash_table_key_t) &remove_addr->addr_key);
+
+ if (na_ucx_addr == HG_HASH_TABLE_NULL || na_ucx_addr->ucp_ep != remove_addr->ucp_ep)
goto unlock;
/* Remove addr key from primary map */
rc = hg_hash_table_remove(
- na_ucx_map->key_map, (hg_hash_table_key_t) addr_key);
+ na_ucx_map->key_map, (hg_hash_table_key_t) &na_ucx_addr->addr_key);
NA_CHECK_SUBSYS_ERROR(addr, rc != 1, unlock, ret, NA_NOENTRY,
"hg_hash_table_remove() failed");
@@ -2841,7 +2850,7 @@ na_ucx_addr_release(struct na_ucx_addr *na_ucx_addr)
NA_UCX_PRINT_ADDR_KEY_INFO("Removing address", &na_ucx_addr->addr_key);
na_ucx_addr_map_remove(
- &na_ucx_addr->na_ucx_class->addr_map, &na_ucx_addr->addr_key);
+ &na_ucx_addr->na_ucx_class->addr_map, na_ucx_addr);
}
if (na_ucx_addr->ucp_ep != NULL) {
@@ -3023,6 +3032,18 @@ na_ucx_rma(struct na_ucx_class NA_UNUSED *na_ucx_class, na_context_t *context,
/* There is no need to have a fully resolved address to start an RMA.
* This is only necessary for two-sided communication. */
+ /* The above assumption is now in question, so the following will resolve
+ * the address if required. */
+
+ /* Check addr to ensure the EP for that addr is still valid */
+ if (!(hg_atomic_get32(&na_ucx_addr->status) & NA_UCX_ADDR_RESOLVED)) {
+ ret = na_ucx_addr_map_update(
+ na_ucx_class, &na_ucx_class->addr_map, na_ucx_addr);
+ NA_CHECK_SUBSYS_NA_ERROR(
+ addr, error, ret, "Could not update NA UCX address");
+ }
+ NA_CHECK_SUBSYS_ERROR(msg, na_ucx_addr->ucp_ep == NULL, error, ret,
+ NA_ADDRNOTAVAIL, "UCP endpoint is NULL for that address");
/* TODO UCX requires the remote key to be bound to the origin, do we need a
* new API? */
@@ -3061,6 +3082,9 @@ na_ucx_rma_key_resolve(ucp_ep_h ep, struct na_ucx_mem_handle *na_ucx_mem_handle,
hg_thread_mutex_lock(&na_ucx_mem_handle->rkey_unpack_lock);
+ NA_CHECK_SUBSYS_ERROR(
+ mem, ep == NULL, error, ret, NA_INVALID_ARG, "Invalid endpoint (%p)", ep);
+
switch (hg_atomic_get32(&na_ucx_mem_handle->type)) {
case NA_UCX_MEM_HANDLE_REMOTE_PACKED: {
ucs_status_t status = ucp_ep_rkey_unpack(ep,