diff --git a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
index 60320a1d1273..c94664e2f16a 100755
--- a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
+++ b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
@@ -1048,6 +1048,15 @@ public String toString() {
           .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
           .setScope(Scope.MASTER)
           .build();
+  public static final PropertyKey K8S_ENV_DEPLOYMENT =
+      booleanBuilder(Name.K8S_ENV_DEPLOYMENT)
+          .setDefaultValue(false)
+          .setDescription("If Alluxio is deployed in K8s environment.")
+          .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
+          .setScope(Scope.ALL)
+          .setIsHidden(true)
+          .build();
+
   /**
    * UFS related properties.
    */
@@ -7325,6 +7334,8 @@ public static final class Name {
     public static final String ZOOKEEPER_AUTH_ENABLED = "alluxio.zookeeper.auth.enabled";
     public static final String ZOOKEEPER_LEADER_CONNECTION_ERROR_POLICY =
         "alluxio.zookeeper.leader.connection.error.policy";
+    public static final String K8S_ENV_DEPLOYMENT =
+        "alluxio.k8s.env.deployment";
     //
     // UFS related properties
     //
diff --git a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java
index e5cae8eeaea8..4db32bb163c9 100644
--- a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java
+++ b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java
@@ -13,21 +13,34 @@
 
 import alluxio.conf.AlluxioConfiguration;
 import alluxio.conf.PropertyKey;
+import alluxio.exception.status.AlreadyExistsException;
 import alluxio.util.CommonUtils;
 import alluxio.wire.WorkerIdentity;
 import alluxio.wire.WorkerInfo;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.JsonParseException;
+import io.etcd.jetcd.ByteSequence;
 import io.etcd.jetcd.KeyValue;
+import io.etcd.jetcd.Txn;
+import io.etcd.jetcd.kv.TxnResponse;
+import io.etcd.jetcd.op.Cmp;
+import io.etcd.jetcd.op.CmpTarget;
+import io.etcd.jetcd.op.Op;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.PutOption;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -83,31 +96,62 @@ public void join(WorkerInfo workerInfo) throws IOException {
     LOG.info("Try joining on etcd for worker:{} ", workerInfo);
     WorkerServiceEntity entity =
         new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress());
-    // 1) register to the ring, check if there's existing entry
     String pathOnRing = new StringBuffer()
         .append(getRingPathPrefix())
         .append(entity.getServiceEntityName()).toString();
-    byte[] existingEntityBytes = mAlluxioEtcdClient.getForPath(pathOnRing);
     byte[] serializedEntity = entity.serialize();
-    // If there's existing entry, check if it's me.
-    if (existingEntityBytes != null) {
-      // It's not me, or not the same me.
-      if (!Arrays.equals(existingEntityBytes, serializedEntity)) {
-        // In k8s this might be bcos worker pod restarting with the same worker identity
-        // but certain fields such as hostname has been changed. Register to ring path anyway.
-        WorkerServiceEntity existingEntity = new WorkerServiceEntity();
-        existingEntity.deserialize(existingEntityBytes);
-        LOG.warn("Same worker entity found bearing same workerid:{},"
-            + "existing WorkerServiceEntity to be overwritten:{},"
-            + "maybe benign if pod restart in k8s env or same worker"
-            + " scheduled to restart on another machine in baremetal env.",
-            workerInfo.getIdentity().toString(), existingEntity);
-        mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
+    // 1) register to the ring.
+    // Atomically create the entry if the key does not exist yet. If the key already
+    // exists, there are two cases:
+    // a) it's k8s env, still register, overwriting the existing entry
+    // b) it's not k8s env, compare the registered entity content, if it's me
+    // then no op, if not, we don't allow overwriting the existing entity.
+    try {
+      boolean isK8s = mConf.isSet(PropertyKey.K8S_ENV_DEPLOYMENT)
+          && mConf.getBoolean(PropertyKey.K8S_ENV_DEPLOYMENT);
+      Txn txn = mAlluxioEtcdClient.getEtcdClient().getKVClient().txn();
+      ByteSequence keyToPut = ByteSequence.from(pathOnRing, StandardCharsets.UTF_8);
+      ByteSequence valToPut = ByteSequence.from(serializedEntity);
+      CompletableFuture<TxnResponse> txnResponseFut = txn
+          // version of the key indicates number of modification, 0 means
+          // this key does not exist
+          .If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.version(0L)))
+          .Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().build()))
+          .Else(isK8s ? Op.put(keyToPut, valToPut, PutOption.newBuilder().build()) :
+              Op.get(keyToPut, GetOption.DEFAULT))
+          .commit();
+      TxnResponse txnResponse = txnResponseFut.get();
+      if (!isK8s && !txnResponse.isSucceeded()) {
+        // service kv already exists, for non-k8s env, check if it's me.
+        // bail if it's someone else.
+        List<KeyValue> kvs = new ArrayList<>();
+        txnResponse.getGetResponses().forEach(r -> kvs.addAll(r.getKvs()));
+        // Long.compare avoids the overflow of casting a long difference to int
+        Optional<KeyValue> latestKV = kvs.stream()
+            .max((kv1, kv2) -> Long.compare(kv1.getModRevision(), kv2.getModRevision()));
+        if (latestKV.isPresent()
+            && !Arrays.equals(latestKV.get().getValue().getBytes(), serializedEntity)) {
+          Optional<WorkerServiceEntity> existingEntity = parseWorkerServiceEntity(latestKV.get());
+          if (!existingEntity.isPresent()) {
+            throw new IOException(String.format(
+                "Existing WorkerServiceEntity for path:%s corrupted",
+                pathOnRing));
+          }
+          throw new AlreadyExistsException(
+              String.format("Some other member with same id registered on the ring, bail. "
+                  + "Conflicting worker addr:%s, worker identity:%s. "
+                  + "Different workers can't assume same worker identity in non-k8s env, "
+                  + "clean local worker identity settings to continue.",
+                  existingEntity.get().getWorkerNetAddress().toString(),
+                  existingEntity.get().getIdentity()));
+        }
       }
-      // It's me, go ahead to start heartbeating.
-    } else {
-      // If haven't created myself onto the ring before, create now.
-      mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
+    } catch (InterruptedException e) {
+      // restore the interrupt status before translating to IOException
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e);
     }
     // 2) start heartbeat
     mAlluxioEtcdClient.mServiceDiscovery.registerAndStartSync(entity);
diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java
index 5777179f9527..9f2ad2c1bfcb 100644
--- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java
+++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java
@@ -301,6 +301,7 @@ private void register() throws IOException {
          * instance might assume same worker id in k8s pod restart situation. There might
          * be gaps in updating etcd states in the interim of transition.
          */
+        LOG.error("Exception in join membership:", e);
         if (!retry.attempt()) {
           throw e;
         }
diff --git a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java
index 8dcf347fd97b..f6455ea90e04 100644
--- a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java
+++ b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java
@@ -13,8 +13,10 @@
 
 import alluxio.conf.Configuration;
 import alluxio.conf.PropertyKey;
+import alluxio.exception.status.AlreadyExistsException;
 import alluxio.util.CommonUtils;
 import alluxio.util.WaitForOptions;
+import alluxio.wire.WorkerIdentity;
 import alluxio.wire.WorkerIdentityTestUtils;
 import alluxio.wire.WorkerInfo;
 import alluxio.wire.WorkerNetAddress;
@@ -45,6 +47,7 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -349,4 +352,58 @@ public void testStaticMembership() throws Exception {
         .collect(Collectors.toList());
     Assert.assertEquals(wkrHosts, allMemberHosts);
   }
+
+  @Test
+  public void testSameWorkerIdentityConflict() throws Exception {
+    final MembershipManager membershipManager = getHealthyEtcdMemberMgr();
+    // in non-k8s env, no two workers can assume the same identity
+    Configuration.set(PropertyKey.WORKER_MEMBERSHIP_MANAGER_TYPE, MembershipType.ETCD);
+    Configuration.set(PropertyKey.ETCD_ENDPOINTS, getClientEndpoints());
+    Assert.assertTrue(membershipManager instanceof EtcdMembershipManager);
+    WorkerIdentity workerIdentity1 = WorkerIdentityTestUtils.randomUuidBasedId();
+    WorkerInfo wkr1 = new WorkerInfo()
+        .setIdentity(workerIdentity1)
+        .setAddress(new WorkerNetAddress()
+            .setHost("worker1").setContainerHost("containerhostname1")
+            .setRpcPort(1000).setDataPort(1001).setWebPort(1011)
+            .setDomainSocketPath("/var/lib/domain.sock"));
+    WorkerInfo wkr2 = new WorkerInfo()
+        .setIdentity(workerIdentity1)
+        .setAddress(new WorkerNetAddress()
+            .setHost("worker2").setContainerHost("containerhostname2")
+            .setRpcPort(2000).setDataPort(2001).setWebPort(2011)
+            .setDomainSocketPath("/var/lib/domain.sock"));
+    membershipManager.join(wkr1);
+    // bring wkr1 down and join wkr2 with the same worker identity.
+    membershipManager.stopHeartBeat(wkr1);
+    CommonUtils.waitFor("wkr1 is not alive.", () -> {
+      try {
+        return membershipManager.getFailedMembers().getWorkerById(workerIdentity1).isPresent();
+      } catch (IOException e) {
+        // IGNORE
+        return false;
+      }
+    }, WaitForOptions.defaults().setTimeoutMs(5000));
+    try {
+      membershipManager.join(wkr2);
+      Assert.fail("joining with another worker's identity must fail in non-k8s env");
+    } catch (AlreadyExistsException ex) {
+      // expected: the conflicting registration is rejected
+    }
+
+    // only in k8s env, it should allow same worker identity assumption.
+    Configuration.set(PropertyKey.K8S_ENV_DEPLOYMENT, true);
+    try {
+      final MembershipManager membershipManager1 = getHealthyEtcdMemberMgr();
+      membershipManager1.join(wkr2);
+      // check if joined with correct info onto etcd
+      Optional<WorkerInfo> curWorkerInfo = membershipManager1.getLiveMembers()
+          .getWorkerById(workerIdentity1);
+      Assert.assertTrue(curWorkerInfo.isPresent());
+      Assert.assertEquals(wkr2.getAddress(), curWorkerInfo.get().getAddress());
+    } finally {
+      // restore the default so the global configuration does not leak into other tests
+      Configuration.set(PropertyKey.K8S_ENV_DEPLOYMENT, false);
+    }
+  }
 }