From 6099ebdeda3054b2295ea4494fcb338330f6b9ed Mon Sep 17 00:00:00 2001 From: Lucy Ge Date: Fri, 1 Dec 2023 14:12:30 -0800 Subject: [PATCH 1/8] only allow multiple workers assume same worker identity in k8s env --- .../main/java/alluxio/conf/PropertyKey.java | 10 +++++ .../membership/EtcdMembershipManager.java | 30 ++++++++----- .../membership/MembershipManagerTest.java | 45 +++++++++++++++++++ 3 files changed, 75 insertions(+), 10 deletions(-) diff --git a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java index 60320a1d1273..d3a13426a869 100755 --- a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -4087,6 +4087,14 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.WORKER) .build(); + public static final PropertyKey WORKER_IN_K8S_ENV = + booleanBuilder(Name.WORKER_IN_K8S_ENV) + .setDefaultValue(false) + .setDescription("Indicate if worker deployed in K8s environment.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.WORKER) + .setIsHidden(true) + .build(); public static final PropertyKey WORKER_KEYTAB_FILE = stringBuilder(Name.WORKER_KEYTAB_FILE) .setDescription("Kerberos keytab file for Alluxio worker.") .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) @@ -7989,6 +7997,8 @@ public static final class Name { public static final String WORKER_IDENTITY_UUID = "alluxio.worker.identity.uuid"; public static final String WORKER_IDENTITY_UUID_FILE_PATH = "alluxio.worker.identity.uuid.file.path"; + public static final String WORKER_IN_K8S_ENV = + "alluxio.worker.in.k8s.env"; public static final String WORKER_KEYTAB_FILE = "alluxio.worker.keytab.file"; public static final String WORKER_MASTER_CONNECT_RETRY_TIMEOUT = "alluxio.worker.master.connect.retry.timeout"; diff --git a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java index e5cae8eeaea8..877bd43c5d92 100644 --- a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java +++ b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java @@ -13,6 +13,7 @@ import alluxio.conf.AlluxioConfiguration; import alluxio.conf.PropertyKey; +import alluxio.exception.status.AlreadyExistsException; import alluxio.util.CommonUtils; import alluxio.wire.WorkerIdentity; import alluxio.wire.WorkerInfo; @@ -93,16 +94,25 @@ public void join(WorkerInfo workerInfo) throws IOException { if (existingEntityBytes != null) { // It's not me, or not the same me. if (!Arrays.equals(existingEntityBytes, serializedEntity)) { - // In k8s this might be bcos worker pod restarting with the same worker identity - // but certain fields such as hostname has been changed. Register to ring path anyway. - WorkerServiceEntity existingEntity = new WorkerServiceEntity(); - existingEntity.deserialize(existingEntityBytes); - LOG.warn("Same worker entity found bearing same workerid:{}," - + "existing WorkerServiceEntity to be overwritten:{}," - + "maybe benign if pod restart in k8s env or same worker" - + " scheduled to restart on another machine in baremetal env.", - workerInfo.getIdentity().toString(), existingEntity); - mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity)); + if (mConf.isSet(PropertyKey.WORKER_IN_K8S_ENV) + && mConf.getBoolean(PropertyKey.WORKER_IN_K8S_ENV)) { + // In k8s this might be bcos worker pod restarting with the same worker identity + // but certain fields such as hostname has been changed. Register to ring path anyway. + WorkerServiceEntity existingEntity = new WorkerServiceEntity(); + existingEntity.deserialize(existingEntityBytes); + LOG.warn("Same worker entity found bearing same workerid:{}," + + "existing WorkerServiceEntity to be overwritten:{}," + + "maybe benign if pod restart in k8s env or same worker" + + " scheduled to restart on another machine in baremetal env.", + workerInfo.getIdentity().toString(), existingEntity); + mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity)); + } else { + // + throw new AlreadyExistsException( + "Some other member with same id registered on the ring, bail." + + "Different workers can't assume same worker identity in non-k8s env." + + "Clean local worker identity settings to continue."); + } } // It's me, go ahead to start heartbeating. } else { diff --git a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java index 8dcf347fd97b..dec522ff82ea 100644 --- a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java +++ b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java @@ -13,8 +13,10 @@ import alluxio.conf.Configuration; import alluxio.conf.PropertyKey; +import alluxio.exception.status.AlreadyExistsException; import alluxio.util.CommonUtils; import alluxio.util.WaitForOptions; +import alluxio.wire.WorkerIdentity; import alluxio.wire.WorkerIdentityTestUtils; import alluxio.wire.WorkerInfo; import alluxio.wire.WorkerNetAddress; @@ -349,4 +351,47 @@ public void testStaticMembership() throws Exception { .collect(Collectors.toList()); Assert.assertEquals(wkrHosts, allMemberHosts); } + + @Test + public void testSameWorkerIdentityConflict() throws Exception { + final MembershipManager membershipManager = getHealthyEtcdMemberMgr(); + // in non-k8s env, no two workers can assume same identity, unless + Configuration.set(PropertyKey.WORKER_MEMBERSHIP_MANAGER_TYPE, MembershipType.ETCD); + Configuration.set(PropertyKey.ETCD_ENDPOINTS, getClientEndpoints()); + Assert.assertTrue(membershipManager instanceof EtcdMembershipManager); + WorkerIdentity workerIdentity1 = WorkerIdentityTestUtils.randomUuidBasedId(); + WorkerInfo wkr1 = new WorkerInfo() + .setIdentity(workerIdentity1) + .setAddress(new WorkerNetAddress() + .setHost("worker1").setContainerHost("containerhostname1") + .setRpcPort(1000).setDataPort(1001).setWebPort(1011) + .setDomainSocketPath("/var/lib/domain.sock")); + WorkerInfo wkr2 = new WorkerInfo() + .setIdentity(workerIdentity1) + .setAddress(new WorkerNetAddress() + .setHost("worker2").setContainerHost("containerhostname2") + .setRpcPort(2000).setDataPort(2001).setWebPort(2011) + .setDomainSocketPath("/var/lib/domain.sock")); + membershipManager.join(wkr1); + // bring wrk1 down and join wrk2 with a same worker identity. + membershipManager.stopHeartBeat(wkr1); + CommonUtils.waitFor("wkr1 is not alive.", () -> { + try { + return membershipManager.getFailedMembers().getWorkerById(workerIdentity1).isPresent(); + } catch (IOException e) { + // IGNORE + return false; + } + }, WaitForOptions.defaults().setTimeoutMs(5000)); + try { + membershipManager.join(wkr2); + } catch (IOException ex) { + Assert.assertTrue(ex instanceof AlreadyExistsException); + } + + // only in k8s env, it should allow same worker identity assumption. + Configuration.set(PropertyKey.WORKER_IN_K8S_ENV, true); + final MembershipManager membershipManager1 = getHealthyEtcdMemberMgr(); + membershipManager1.join(wkr2); + } } From ce53e6353ee65ba61b1abfb38dd6e3231613b7bc Mon Sep 17 00:00:00 2001 From: Lucy Ge Date: Tue, 5 Dec 2023 10:59:20 -0800 Subject: [PATCH 2/8] address review comments -1 --- .../main/java/alluxio/conf/PropertyKey.java | 21 ++++++++++--------- .../membership/EtcdMembershipManager.java | 4 ++-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java index d3a13426a869..c94664e2f16a 100755 --- a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -1048,6 +1048,15 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.MASTER) .build(); + public static final PropertyKey K8S_ENV_DEPLOYMENT = + booleanBuilder(Name.K8S_ENV_DEPLOYMENT) + .setDefaultValue(false) + .setDescription("If Alluxio is deployed in K8s environment.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.ALL) + .setIsHidden(true) + .build(); + /** * UFS related properties. */ @@ -4087,14 +4096,6 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.WORKER) .build(); - public static final PropertyKey WORKER_IN_K8S_ENV = - booleanBuilder(Name.WORKER_IN_K8S_ENV) - .setDefaultValue(false) - .setDescription("Indicate if worker deployed in K8s environment.") - .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) - .setScope(Scope.WORKER) - .setIsHidden(true) - .build(); public static final PropertyKey WORKER_KEYTAB_FILE = stringBuilder(Name.WORKER_KEYTAB_FILE) .setDescription("Kerberos keytab file for Alluxio worker.") .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) @@ -7333,6 +7334,8 @@ public static final class Name { public static final String ZOOKEEPER_AUTH_ENABLED = "alluxio.zookeeper.auth.enabled"; public static final String ZOOKEEPER_LEADER_CONNECTION_ERROR_POLICY = "alluxio.zookeeper.leader.connection.error.policy"; + public static final String K8S_ENV_DEPLOYMENT = + "alluxio.k8s.env.deployment"; // // UFS related properties // @@ -7997,8 +8000,6 @@ public static final class Name { public static final String WORKER_IDENTITY_UUID = "alluxio.worker.identity.uuid"; public static final String WORKER_IDENTITY_UUID_FILE_PATH = "alluxio.worker.identity.uuid.file.path"; - public static final String WORKER_IN_K8S_ENV = - "alluxio.worker.in.k8s.env"; public static final String WORKER_KEYTAB_FILE = "alluxio.worker.keytab.file"; public static final String WORKER_MASTER_CONNECT_RETRY_TIMEOUT = "alluxio.worker.master.connect.retry.timeout"; diff --git a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java index 877bd43c5d92..415458706b0c 100644 --- a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java +++ b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java @@ -94,8 +94,8 @@ public void join(WorkerInfo workerInfo) throws IOException { if (existingEntityBytes != null) { // It's not me, or not the same me. if (!Arrays.equals(existingEntityBytes, serializedEntity)) { - if (mConf.isSet(PropertyKey.WORKER_IN_K8S_ENV) - && mConf.getBoolean(PropertyKey.WORKER_IN_K8S_ENV)) { + if (mConf.isSet(PropertyKey.K8S_ENV_DEPLOYMENT) + && mConf.getBoolean(PropertyKey.K8S_ENV_DEPLOYMENT)) { // In k8s this might be bcos worker pod restarting with the same worker identity // but certain fields such as hostname has been changed. Register to ring path anyway. WorkerServiceEntity existingEntity = new WorkerServiceEntity(); From b776240d52884525d63abb0e86ae2ef86d5a9cbc Mon Sep 17 00:00:00 2001 From: Lucy Ge Date: Tue, 5 Dec 2023 11:56:52 -0800 Subject: [PATCH 3/8] compile error --- .../src/test/java/alluxio/membership/MembershipManagerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java index dec522ff82ea..2f8013c7b510 100644 --- a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java +++ b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java @@ -390,7 +390,7 @@ public void testSameWorkerIdentityConflict() throws Exception { } // only in k8s env, it should allow same worker identity assumption. - Configuration.set(PropertyKey.WORKER_IN_K8S_ENV, true); + Configuration.set(PropertyKey.K8S_ENV_DEPLOYMENT, true); final MembershipManager membershipManager1 = getHealthyEtcdMemberMgr(); membershipManager1.join(wkr2); } From 6c3e75f4b22de54066f68ddb7d9ac3225f152b7b Mon Sep 17 00:00:00 2001 From: Lucy Ge Date: Tue, 5 Dec 2023 15:25:15 -0800 Subject: [PATCH 4/8] address review comments - 2 --- .../membership/EtcdMembershipManager.java | 81 ++++++++++++------- .../membership/MembershipManagerTest.java | 6 ++ 2 files changed, 60 insertions(+), 27 deletions(-) diff --git a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java index 415458706b0c..bff032a1a699 100644 --- a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java +++ b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java @@ -20,15 +20,27 @@ import com.google.common.annotations.VisibleForTesting; import com.google.gson.JsonParseException; +import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.KeyValue; +import io.etcd.jetcd.Txn; +import io.etcd.jetcd.kv.TxnResponse; +import io.etcd.jetcd.op.Cmp; +import io.etcd.jetcd.op.CmpTarget; +import io.etcd.jetcd.op.Op; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -84,40 +96,55 @@ public void join(WorkerInfo workerInfo) throws IOException { LOG.info("Try joining on etcd for worker:{} ", workerInfo); WorkerServiceEntity entity = new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress()); - // 1) register to the ring, check if there's existing entry String pathOnRing = new StringBuffer() .append(getRingPathPrefix()) .append(entity.getServiceEntityName()).toString(); - byte[] existingEntityBytes = mAlluxioEtcdClient.getForPath(pathOnRing); byte[] serializedEntity = entity.serialize(); - // If there's existing entry, check if it's me. - if (existingEntityBytes != null) { - // It's not me, or not the same me. - if (!Arrays.equals(existingEntityBytes, serializedEntity)) { - if (mConf.isSet(PropertyKey.K8S_ENV_DEPLOYMENT) - && mConf.getBoolean(PropertyKey.K8S_ENV_DEPLOYMENT)) { - // In k8s this might be bcos worker pod restarting with the same worker identity - // but certain fields such as hostname has been changed. Register to ring path anyway. - WorkerServiceEntity existingEntity = new WorkerServiceEntity(); - existingEntity.deserialize(existingEntityBytes); - LOG.warn("Same worker entity found bearing same workerid:{}," - + "existing WorkerServiceEntity to be overwritten:{}," - + "maybe benign if pod restart in k8s env or same worker" - + " scheduled to restart on another machine in baremetal env.", - workerInfo.getIdentity().toString(), existingEntity); - mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity)); - } else { - // + // 1) register to the ring. + // CompareAndSet if no existing registered entry, if exist such key, two cases: + // a) it's k8s env, still register no matter what + // b) it's not k8s env, compare the registered entity content, if it's me + // then no op, if not, we don't allow overwriting the existing entity. + try { + boolean isK8s = mConf.isSet(PropertyKey.K8S_ENV_DEPLOYMENT) + && mConf.getBoolean(PropertyKey.K8S_ENV_DEPLOYMENT); + Txn txn = mAlluxioEtcdClient.getEtcdClient().getKVClient().txn(); + ByteSequence keyToPut = ByteSequence.from(pathOnRing, StandardCharsets.UTF_8); + ByteSequence valToPut = ByteSequence.from(serializedEntity); + CompletableFuture txnResponseFut = txn + .If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.createRevision(0L))) + .Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().build())) + .Else(isK8s ? Op.put(keyToPut, valToPut, PutOption.newBuilder().build()) : + Op.get(keyToPut, GetOption.DEFAULT)) + .commit(); + TxnResponse txnResponse = txnResponseFut.get(); + if (!isK8s && !txnResponse.isSucceeded()) { + // service kv already exists, for non-k8s env, check if it's me. + // bail if it's someone else. + List kvs = new ArrayList<>(); + txnResponse.getGetResponses().stream().map( + r -> kvs.addAll(r.getKvs())).collect(Collectors.toList()); + Optional latestKV = kvs.stream() + .max((kv1, kv2) -> (int) (kv1.getModRevision() - kv2.getModRevision())); + if (latestKV.isPresent() + && !Arrays.equals(latestKV.get().getValue().getBytes(), serializedEntity)) { + Optional existingEntity = parseWorkerServiceEntity(latestKV.get()); + if (!existingEntity.isPresent()) { + throw new IOException(String.format( + "Existing WorkerServiceEntity for path:%s corrupted", + pathOnRing)); + } throw new AlreadyExistsException( - "Some other member with same id registered on the ring, bail." - + "Different workers can't assume same worker identity in non-k8s env." - + "Clean local worker identity settings to continue."); + String.format("Some other member with same id registered on the ring, bail." + + "Conflicting worker addr:%s, worker identity:%s." + + "Different workers can't assume same worker identity in non-k8s env," + + "clean local worker identity settings to continue.", + existingEntity.get().getWorkerNetAddress().toString(), + existingEntity.get().getIdentity())); } } - // It's me, go ahead to start heartbeating. - } else { - // If haven't created myself onto the ring before, create now. - mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity)); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); } // 2) start heartbeat mAlluxioEtcdClient.mServiceDiscovery.registerAndStartSync(entity); diff --git a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java index 2f8013c7b510..f6455ea90e04 100644 --- a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java +++ b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -393,5 +394,10 @@ public void testSameWorkerIdentityConflict() throws Exception { Configuration.set(PropertyKey.K8S_ENV_DEPLOYMENT, true); final MembershipManager membershipManager1 = getHealthyEtcdMemberMgr(); membershipManager1.join(wkr2); + // check if joined with correct info onto etcd + Optional curWorkerInfo = membershipManager1.getLiveMembers() + .getWorkerById(workerIdentity1); + Assert.assertTrue(curWorkerInfo.isPresent()); + Assert.assertEquals(wkr2.getAddress(), curWorkerInfo.get().getAddress()); } } From a76c7d8420396cce7932fa4d90ce82e8689ad0b2 Mon Sep 17 00:00:00 2001 From: Lucy Ge Date: Tue, 5 Dec 2023 15:32:36 -0800 Subject: [PATCH 5/8] correct CmpTarget --- .../src/main/java/alluxio/membership/EtcdMembershipManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java index bff032a1a699..4b3fe57bc46c 100644 --- a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java +++ b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java @@ -112,7 +112,7 @@ public void join(WorkerInfo workerInfo) throws IOException { ByteSequence keyToPut = ByteSequence.from(pathOnRing, StandardCharsets.UTF_8); ByteSequence valToPut = ByteSequence.from(serializedEntity); CompletableFuture txnResponseFut = txn - .If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.createRevision(0L))) + .If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.version(0L))) .Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().build())) .Else(isK8s ? Op.put(keyToPut, valToPut, PutOption.newBuilder().build()) : Op.get(keyToPut, GetOption.DEFAULT)) From c8544a09f3eee7293d0c2394181d9b43d84b4f9d Mon Sep 17 00:00:00 2001 From: Lucy Ge Date: Tue, 5 Dec 2023 16:23:13 -0800 Subject: [PATCH 6/8] log exception out in register phase --- .../src/main/java/alluxio/worker/dora/PagedDoraWorker.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java index 5777179f9527..9f2ad2c1bfcb 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java @@ -301,6 +301,7 @@ private void register() throws IOException { * instance might assume same worker id in k8s pod restart situation. There might * be gaps in updating etcd states in the interim of transition. */ + LOG.error("Exception in join membership:", e); if (!retry.attempt()) { throw e; } From eabee1b0e880298126c609799675c17385b61ef6 Mon Sep 17 00:00:00 2001 From: lucyge2022 <111789461+lucyge2022@users.noreply.github.com> Date: Thu, 7 Dec 2023 12:04:08 -0800 Subject: [PATCH 7/8] Update dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java Co-authored-by: Bowen Ding <6999708+dbw9580@users.noreply.github.com> --- .../src/main/java/alluxio/membership/EtcdMembershipManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java index 4b3fe57bc46c..2ae0b50f4321 100644 --- a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java +++ b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java @@ -102,7 +102,7 @@ public void join(WorkerInfo workerInfo) throws IOException { byte[] serializedEntity = entity.serialize(); // 1) register to the ring. // CompareAndSet if no existing registered entry, if exist such key, two cases: - // a) it's k8s env, still register no matter what + // a) it's k8s env, still register, overwriting the existing entry // b) it's not k8s env, compare the registered entity content, if it's me // then no op, if not, we don't allow overwriting the existing entity. try { From 33fd32955c464dce8c926162c6c6b7729b951b8d Mon Sep 17 00:00:00 2001 From: Lucy Ge Date: Thu, 7 Dec 2023 13:15:03 -0800 Subject: [PATCH 8/8] review comments --- .../src/main/java/alluxio/membership/EtcdMembershipManager.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java index 2ae0b50f4321..4db32bb163c9 100644 --- a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java +++ b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java @@ -112,6 +112,8 @@ public void join(WorkerInfo workerInfo) throws IOException { ByteSequence keyToPut = ByteSequence.from(pathOnRing, StandardCharsets.UTF_8); ByteSequence valToPut = ByteSequence.from(serializedEntity); CompletableFuture txnResponseFut = txn + // version of the key indicates number of modification, 0 means + // this key does not exist .If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.version(0L))) .Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().build())) .Else(isK8s ? Op.put(keyToPut, valToPut, PutOption.newBuilder().build()) :