Skip to content

Commit

Permalink
Only allow multiple workers to assume the same worker identity in a k8s env
Browse files Browse the repository at this point in the history
  • Loading branch information
lucyge2022 committed Dec 1, 2023
1 parent 15b05ba commit 87702fd
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 10 deletions.
10 changes: 10 additions & 0 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -4087,6 +4087,14 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
/**
 * Boolean, worker-scoped property indicating whether the worker is deployed in a
 * Kubernetes environment. Defaults to {@code false}, is hidden, and uses the
 * {@code ENFORCE} consistency check level.
 */
public static final PropertyKey WORKER_IN_K8S_ENV =
booleanBuilder(Name.WORKER_IN_K8S_ENV)
.setDefaultValue(false)
.setDescription("Indicate if worker deployed in K8s environment.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.WORKER)
.setIsHidden(true)
.build();
public static final PropertyKey WORKER_KEYTAB_FILE = stringBuilder(Name.WORKER_KEYTAB_FILE)
.setDescription("Kerberos keytab file for Alluxio worker.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
Expand Down Expand Up @@ -7983,6 +7991,8 @@ public static final class Name {
public static final String WORKER_IDENTITY_UUID = "alluxio.worker.identity.uuid";
public static final String WORKER_IDENTITY_UUID_FILE_PATH =
"alluxio.worker.identity.uuid.file.path";
// Key name of the flag that marks a worker as deployed in a K8s environment.
public static final String WORKER_IN_K8S_ENV =
"alluxio.worker.in.k8s.env";
public static final String WORKER_KEYTAB_FILE = "alluxio.worker.keytab.file";
public static final String WORKER_MASTER_CONNECT_RETRY_TIMEOUT =
"alluxio.worker.master.connect.retry.timeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerInfo;
Expand Down Expand Up @@ -93,16 +94,25 @@ public void join(WorkerInfo workerInfo) throws IOException {
if (existingEntityBytes != null) {
// It's not me, or not the same me.
if (!Arrays.equals(existingEntityBytes, serializedEntity)) {
// In k8s this might be bcos worker pod restarting with the same worker identity
// but certain fields such as hostname has been changed. Register to ring path anyway.
WorkerServiceEntity existingEntity = new WorkerServiceEntity();
existingEntity.deserialize(existingEntityBytes);
LOG.warn("Same worker entity found bearing same workerid:{},"
+ "existing WorkerServiceEntity to be overwritten:{},"
+ "maybe benign if pod restart in k8s env or same worker"
+ " scheduled to restart on another machine in baremetal env.",
workerInfo.getIdentity().toString(), existingEntity);
mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
if (mConf.isSet(PropertyKey.WORKER_IN_K8S_ENV)
&& mConf.getBoolean(PropertyKey.WORKER_IN_K8S_ENV)) {
// In k8s this might be bcos worker pod restarting with the same worker identity
// but certain fields such as hostname has been changed. Register to ring path anyway.
WorkerServiceEntity existingEntity = new WorkerServiceEntity();
existingEntity.deserialize(existingEntityBytes);
LOG.warn("Same worker entity found bearing same workerid:{},"
+ "existing WorkerServiceEntity to be overwritten:{},"
+ "maybe benign if pod restart in k8s env or same worker"
+ " scheduled to restart on another machine in baremetal env.",
workerInfo.getIdentity().toString(), existingEntity);
mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
} else {
//
throw new AlreadyExistsException(
"Some other member with same id registered on the ring, bail."
+ "Different workers can't assume same worker identity in non-k8s env."
+ "Clean local worker identity settings to continue.");
}
}
// It's me, go ahead to start heartbeating.
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerIdentityTestUtils;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
Expand Down Expand Up @@ -349,4 +351,47 @@ public void testStaticMembership() throws Exception {
.collect(Collectors.toList());
Assert.assertEquals(wkrHosts, allMemberHosts);
}

/**
 * Verifies that a second worker with a different address cannot join under a worker identity
 * that is already registered, unless {@link PropertyKey#WORKER_IN_K8S_ENV} is enabled.
 */
@Test
public void testSameWorkerIdentityConflict() throws Exception {
  final MembershipManager membershipManager = getHealthyEtcdMemberMgr();
  // In a non-k8s env, no two workers can assume the same identity; this is only
  // permitted when WORKER_IN_K8S_ENV is set to true (exercised at the end of this test).
  Configuration.set(PropertyKey.WORKER_MEMBERSHIP_MANAGER_TYPE, MembershipType.ETCD);
  Configuration.set(PropertyKey.ETCD_ENDPOINTS, getClientEndpoints());
  Assert.assertTrue(membershipManager instanceof EtcdMembershipManager);
  WorkerIdentity workerIdentity1 = WorkerIdentityTestUtils.randomUuidBasedId();
  // Two workers sharing one identity but differing in host names and ports.
  WorkerInfo wkr1 = new WorkerInfo()
      .setIdentity(workerIdentity1)
      .setAddress(new WorkerNetAddress()
          .setHost("worker1").setContainerHost("containerhostname1")
          .setRpcPort(1000).setDataPort(1001).setWebPort(1011)
          .setDomainSocketPath("/var/lib/domain.sock"));
  WorkerInfo wkr2 = new WorkerInfo()
      .setIdentity(workerIdentity1)
      .setAddress(new WorkerNetAddress()
          .setHost("worker2").setContainerHost("containerhostname2")
          .setRpcPort(2000).setDataPort(2001).setWebPort(2011)
          .setDomainSocketPath("/var/lib/domain.sock"));
  membershipManager.join(wkr1);
  // Bring wkr1 down, then try to join wkr2 with the same worker identity.
  membershipManager.stopHeartBeat(wkr1);
  CommonUtils.waitFor("wkr1 is not alive.", () -> {
    try {
      return membershipManager.getFailedMembers().getWorkerById(workerIdentity1).isPresent();
    } catch (IOException e) {
      // Treat a failed lookup as "not yet failed" and keep polling until the timeout.
      return false;
    }
  }, WaitForOptions.defaults().setTimeoutMs(5000));
  try {
    membershipManager.join(wkr2);
    // Without this, the test would pass silently when join() does not reject the conflict.
    Assert.fail("Joining with an already registered worker identity should be rejected "
        + "in non-k8s env.");
  } catch (IOException ex) {
    Assert.assertTrue(ex instanceof AlreadyExistsException);
  }

  // Only in a k8s env should assuming the same worker identity be allowed.
  Configuration.set(PropertyKey.WORKER_IN_K8S_ENV, true);
  try {
    final MembershipManager membershipManager1 = getHealthyEtcdMemberMgr();
    membershipManager1.join(wkr2);
  } finally {
    // Restore the default so the global configuration does not leak into other tests.
    Configuration.set(PropertyKey.WORKER_IN_K8S_ENV, false);
  }
}
}

0 comments on commit 87702fd

Please sign in to comment.