Skip to content

Commit

Permalink
Fix to disable worker identity reuse for registration unless in k8s
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Restore the restriction on reusing a worker identity in non-k8s deployments.

### Why are the changes needed?

A worker identity can be reused by mistake through incorrect deployment practices, such as copying conf/ over when setting up a new worker. A k8s deployment goes through an operator / automation, but a bare-metal deployment has no way of preventing this, so the restriction is put back for non-k8s deployments.

### Does this PR introduce any user facing changes?

No.

			pr-link: #18454
			change-id: cid-295ea352895b16c2a5f0a23fa790c9f42a5e3881
  • Loading branch information
lucyge2022 authored Dec 9, 2023
1 parent 73d1746 commit 22af1b1
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 20 deletions.
11 changes: 11 additions & 0 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,15 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.MASTER)
.build();
/**
 * Whether Alluxio is deployed in a K8s environment. This is a hidden boolean property
 * that defaults to {@code false}, applies to all scopes and has its consistency
 * check level set to ENFORCE.
 */
public static final PropertyKey K8S_ENV_DEPLOYMENT =
booleanBuilder(Name.K8S_ENV_DEPLOYMENT)
.setDefaultValue(false)
.setDescription("If Alluxio is deployed in K8s environment.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.ALL)
.setIsHidden(true)
.build();

/**
* UFS related properties.
*/
Expand Down Expand Up @@ -7325,6 +7334,8 @@ public static final class Name {
public static final String ZOOKEEPER_AUTH_ENABLED = "alluxio.zookeeper.auth.enabled";
public static final String ZOOKEEPER_LEADER_CONNECTION_ERROR_POLICY =
"alluxio.zookeeper.leader.connection.error.policy";
// Key name of the flag telling whether Alluxio is deployed in a K8s environment.
public static final String K8S_ENV_DEPLOYMENT =
"alluxio.k8s.env.deployment";
//
// UFS related properties
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,34 @@

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerInfo;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.JsonParseException;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Txn;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -83,31 +96,57 @@ public void join(WorkerInfo workerInfo) throws IOException {
LOG.info("Try joining on etcd for worker:{} ", workerInfo);
WorkerServiceEntity entity =
new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress());
// 1) register to the ring, check if there's existing entry
String pathOnRing = new StringBuffer()
.append(getRingPathPrefix())
.append(entity.getServiceEntityName()).toString();
byte[] existingEntityBytes = mAlluxioEtcdClient.getForPath(pathOnRing);
byte[] serializedEntity = entity.serialize();
// If there's existing entry, check if it's me.
if (existingEntityBytes != null) {
// It's not me, or not the same me.
if (!Arrays.equals(existingEntityBytes, serializedEntity)) {
// In k8s this might be bcos worker pod restarting with the same worker identity
// but certain fields such as hostname has been changed. Register to ring path anyway.
WorkerServiceEntity existingEntity = new WorkerServiceEntity();
existingEntity.deserialize(existingEntityBytes);
LOG.warn("Same worker entity found bearing same workerid:{},"
+ "existing WorkerServiceEntity to be overwritten:{},"
+ "maybe benign if pod restart in k8s env or same worker"
+ " scheduled to restart on another machine in baremetal env.",
workerInfo.getIdentity().toString(), existingEntity);
mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
// 1) register to the ring.
// CompareAndSet if no existing registered entry, if exist such key, two cases:
// a) it's k8s env, still register, overwriting the existing entry
// b) it's not k8s env, compare the registered entity content, if it's me
// then no op, if not, we don't allow overwriting the existing entity.
try {
boolean isK8s = mConf.isSet(PropertyKey.K8S_ENV_DEPLOYMENT)
&& mConf.getBoolean(PropertyKey.K8S_ENV_DEPLOYMENT);
Txn txn = mAlluxioEtcdClient.getEtcdClient().getKVClient().txn();
ByteSequence keyToPut = ByteSequence.from(pathOnRing, StandardCharsets.UTF_8);
ByteSequence valToPut = ByteSequence.from(serializedEntity);
CompletableFuture<TxnResponse> txnResponseFut = txn
// version of the key indicates number of modification, 0 means
// this key does not exist
.If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.version(0L)))
.Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().build()))
.Else(isK8s ? Op.put(keyToPut, valToPut, PutOption.newBuilder().build()) :
Op.get(keyToPut, GetOption.DEFAULT))
.commit();
TxnResponse txnResponse = txnResponseFut.get();
if (!isK8s && !txnResponse.isSucceeded()) {
// service kv already exists, for non-k8s env, check if it's me.
// bail if it's someone else.
List<KeyValue> kvs = new ArrayList<>();
txnResponse.getGetResponses().stream().map(
r -> kvs.addAll(r.getKvs())).collect(Collectors.toList());
Optional<KeyValue> latestKV = kvs.stream()
.max((kv1, kv2) -> (int) (kv1.getModRevision() - kv2.getModRevision()));
if (latestKV.isPresent()
&& !Arrays.equals(latestKV.get().getValue().getBytes(), serializedEntity)) {
Optional<WorkerServiceEntity> existingEntity = parseWorkerServiceEntity(latestKV.get());
if (!existingEntity.isPresent()) {
throw new IOException(String.format(
"Existing WorkerServiceEntity for path:%s corrupted",
pathOnRing));
}
throw new AlreadyExistsException(
String.format("Some other member with same id registered on the ring, bail."
+ "Conflicting worker addr:%s, worker identity:%s."
+ "Different workers can't assume same worker identity in non-k8s env,"
+ "clean local worker identity settings to continue.",
existingEntity.get().getWorkerNetAddress().toString(),
existingEntity.get().getIdentity()));
}
}
// It's me, go ahead to start heartbeating.
} else {
// If haven't created myself onto the ring before, create now.
mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
// 2) start heartbeat
mAlluxioEtcdClient.mServiceDiscovery.registerAndStartSync(entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ private void register() throws IOException {
* instance might assume same worker id in k8s pod restart situation. There might
* be gaps in updating etcd states in the interim of transition.
*/
LOG.error("Exception in join membership:", e);
if (!retry.attempt()) {
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerIdentityTestUtils;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
Expand Down Expand Up @@ -45,6 +47,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -349,4 +352,52 @@ public void testStaticMembership() throws Exception {
.collect(Collectors.toList());
Assert.assertEquals(wkrHosts, allMemberHosts);
}

/**
 * Verifies how joining with an already registered worker identity is handled.
 *
 * In a non-k8s env a second worker that carries the same identity but a different
 * address must be rejected with {@link AlreadyExistsException}. Once
 * {@link PropertyKey#K8S_ENV_DEPLOYMENT} is set to true, the same join must succeed
 * and the registered address must be the one of the second worker.
 */
@Test
public void testSameWorkerIdentityConflict() throws Exception {
  final MembershipManager membershipManager = getHealthyEtcdMemberMgr();
  // in non-k8s env, no two workers can assume same identity, unless it's deployed in k8s env.
  Configuration.set(PropertyKey.WORKER_MEMBERSHIP_MANAGER_TYPE, MembershipType.ETCD);
  Configuration.set(PropertyKey.ETCD_ENDPOINTS, getClientEndpoints());
  Assert.assertTrue(membershipManager instanceof EtcdMembershipManager);
  WorkerIdentity workerIdentity1 = WorkerIdentityTestUtils.randomUuidBasedId();
  WorkerInfo wkr1 = new WorkerInfo()
      .setIdentity(workerIdentity1)
      .setAddress(new WorkerNetAddress()
          .setHost("worker1").setContainerHost("containerhostname1")
          .setRpcPort(1000).setDataPort(1001).setWebPort(1011)
          .setDomainSocketPath("/var/lib/domain.sock"));
  WorkerInfo wkr2 = new WorkerInfo()
      .setIdentity(workerIdentity1)
      .setAddress(new WorkerNetAddress()
          .setHost("worker2").setContainerHost("containerhostname2")
          .setRpcPort(2000).setDataPort(2001).setWebPort(2011)
          .setDomainSocketPath("/var/lib/domain.sock"));
  membershipManager.join(wkr1);
  // bring wkr1 down and join wkr2 with a same worker identity.
  membershipManager.stopHeartBeat(wkr1);
  CommonUtils.waitFor("wkr1 is not alive.", () -> {
    try {
      return membershipManager.getFailedMembers().getWorkerById(workerIdentity1).isPresent();
    } catch (IOException e) {
      // IGNORE
      return false;
    }
  }, WaitForOptions.defaults().setTimeoutMs(5000));
  try {
    membershipManager.join(wkr2);
    // Without this the test would pass silently if the conflicting join was accepted.
    // Assert.fail throws an AssertionError, which the IOException handler does not catch.
    Assert.fail("Joining with an identity registered by another worker should fail"
        + " in non-k8s env.");
  } catch (IOException ex) {
    Assert.assertTrue(ex instanceof AlreadyExistsException);
  }

  // only in k8s env, it should allow same worker identity assumption.
  Configuration.set(PropertyKey.K8S_ENV_DEPLOYMENT, true);
  try {
    final MembershipManager membershipManager1 = getHealthyEtcdMemberMgr();
    membershipManager1.join(wkr2);
    // check if joined with correct info onto etcd
    Optional<WorkerInfo> curWorkerInfo = membershipManager1.getLiveMembers()
        .getWorkerById(workerIdentity1);
    Assert.assertTrue(curWorkerInfo.isPresent());
    Assert.assertEquals(wkr2.getAddress(), curWorkerInfo.get().getAddress());
  } finally {
    // Put the global flag back to its default so it does not leak into other tests.
    Configuration.set(PropertyKey.K8S_ENV_DEPLOYMENT, false);
  }
}
}

0 comments on commit 22af1b1

Please sign in to comment.