Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to disable worker identity reuse for registration unless in k8s #18454

Merged
merged 8 commits into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,15 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey K8S_ENV_DEPLOYMENT =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the feeling that whether we allow a worker to register with the same identity is related to whether it's running on k8s, but k8s is not 100% the deciding factor. In other words, I feel we can have

PropertyKey ON_K8S

and 

PropertyKey WORKER_SAME_IDENTITY_ACTION = ON_K8S ? OVERWRITE : REJECT

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, an identity should never be allowed to be overwritten; from the code side we shouldn't even open this can of worms — it should be a deployment effort. The k8s property can also be extended for other usages, as suggested by @ssz1997.

booleanBuilder(Name.K8S_ENV_DEPLOYMENT)
.setDefaultValue(false)
.setDescription("If Alluxio is deployed in K8s environment.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.ALL)
.setIsHidden(true)
.build();

/**
* UFS related properties.
*/
Expand Down Expand Up @@ -7325,6 +7334,8 @@ public static final class Name {
public static final String ZOOKEEPER_AUTH_ENABLED = "alluxio.zookeeper.auth.enabled";
public static final String ZOOKEEPER_LEADER_CONNECTION_ERROR_POLICY =
"alluxio.zookeeper.leader.connection.error.policy";
// Whether Alluxio is deployed in a K8s environment.
public static final String K8S_ENV_DEPLOYMENT =
"alluxio.k8s.env.deployment";
//
// UFS related properties
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,34 @@

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerInfo;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.JsonParseException;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Txn;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -83,31 +96,57 @@ public void join(WorkerInfo workerInfo) throws IOException {
LOG.info("Try joining on etcd for worker:{} ", workerInfo);
WorkerServiceEntity entity =
new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress());
// 1) register to the ring, check if there's existing entry
String pathOnRing = new StringBuffer()
.append(getRingPathPrefix())
.append(entity.getServiceEntityName()).toString();
byte[] existingEntityBytes = mAlluxioEtcdClient.getForPath(pathOnRing);
byte[] serializedEntity = entity.serialize();
// If there's existing entry, check if it's me.
if (existingEntityBytes != null) {
// It's not me, or not the same me.
if (!Arrays.equals(existingEntityBytes, serializedEntity)) {
// In k8s this might be bcos worker pod restarting with the same worker identity
// but certain fields such as hostname has been changed. Register to ring path anyway.
WorkerServiceEntity existingEntity = new WorkerServiceEntity();
existingEntity.deserialize(existingEntityBytes);
LOG.warn("Same worker entity found bearing same workerid:{},"
+ "existing WorkerServiceEntity to be overwritten:{},"
+ "maybe benign if pod restart in k8s env or same worker"
+ " scheduled to restart on another machine in baremetal env.",
workerInfo.getIdentity().toString(), existingEntity);
mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
// 1) register to the ring.
// CompareAndSet if no existing registered entry, if exist such key, two cases:
// a) it's k8s env, still register, overwriting the existing entry
// b) it's not k8s env, compare the registered entity content, if it's me
// then no op, if not, we don't allow overwriting the existing entity.
try {
boolean isK8s = mConf.isSet(PropertyKey.K8S_ENV_DEPLOYMENT)
&& mConf.getBoolean(PropertyKey.K8S_ENV_DEPLOYMENT);
Txn txn = mAlluxioEtcdClient.getEtcdClient().getKVClient().txn();
ByteSequence keyToPut = ByteSequence.from(pathOnRing, StandardCharsets.UTF_8);
ByteSequence valToPut = ByteSequence.from(serializedEntity);
CompletableFuture<TxnResponse> txnResponseFut = txn
// version of the key indicates number of modification, 0 means
// this key does not exist
.If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.version(0L)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would appreciate a comment for etcd outsiders:

Suggested change
.If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.version(0L)))
// version of a key starts from 1; so version being 0 means
// the key does not exist (never existed or was deleted).
.If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.version(0L)))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the difference between version == 0 and createRevision == 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'revision' refers to a global number assigned to each operation across all keys (create/modify/delete); createRevision is the revision of the last creation of the key; version is the version of this particular key, which is reset to 0 when the key is deleted. For example:
create /k1 -> /k1 version=1 global revision = 1
create /k2 -> /k2 ... global revision=2
delete /k2 -> /k2 ... global revision=3
modify /k1 -> /k1 version=2 global revision = 4 => /k1 version = 2 createRevision=1
delete /k1 -> /k1 version=0 global revision = 5
create /k1 -> /k1 version=1 global revision = 6 => /k1 version = 1 createRevision=6

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

.Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().build()))
.Else(isK8s ? Op.put(keyToPut, valToPut, PutOption.newBuilder().build()) :
Op.get(keyToPut, GetOption.DEFAULT))
.commit();
TxnResponse txnResponse = txnResponseFut.get();
if (!isK8s && !txnResponse.isSucceeded()) {
// service kv already exists, for non-k8s env, check if it's me.
// bail if it's someone else.
List<KeyValue> kvs = new ArrayList<>();
txnResponse.getGetResponses().stream().map(
r -> kvs.addAll(r.getKvs())).collect(Collectors.toList());
Optional<KeyValue> latestKV = kvs.stream()
.max((kv1, kv2) -> (int) (kv1.getModRevision() - kv2.getModRevision()));
if (latestKV.isPresent()
&& !Arrays.equals(latestKV.get().getValue().getBytes(), serializedEntity)) {
Optional<WorkerServiceEntity> existingEntity = parseWorkerServiceEntity(latestKV.get());
if (!existingEntity.isPresent()) {
throw new IOException(String.format(
"Existing WorkerServiceEntity for path:%s corrupted",
pathOnRing));
}
throw new AlreadyExistsException(
String.format("Some other member with same id registered on the ring, bail."
+ "Conflicting worker addr:%s, worker identity:%s."
+ "Different workers can't assume same worker identity in non-k8s env,"
+ "clean local worker identity settings to continue.",
existingEntity.get().getWorkerNetAddress().toString(),
existingEntity.get().getIdentity()));
}
}
// It's me, go ahead to start heartbeating.
} else {
// If haven't created myself onto the ring before, create now.
mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
// 2) start heartbeat
mAlluxioEtcdClient.mServiceDiscovery.registerAndStartSync(entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ private void register() throws IOException {
* instance might assume same worker id in k8s pod restart situation. There might
* be gaps in updating etcd states in the interim of transition.
*/
LOG.error("Exception in join membership:", e);
if (!retry.attempt()) {
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerIdentityTestUtils;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
Expand Down Expand Up @@ -45,6 +47,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -349,4 +352,52 @@ public void testStaticMembership() throws Exception {
.collect(Collectors.toList());
Assert.assertEquals(wkrHosts, allMemberHosts);
}

/**
 * Verifies how the etcd membership manager handles two workers that claim the same
 * worker identity but carry different addresses:
 * in a non-k8s deployment the second join must be rejected with
 * {@link AlreadyExistsException}; with {@link PropertyKey#K8S_ENV_DEPLOYMENT} enabled the
 * second join must succeed and the registered address must be the one of the second worker.
 */
@Test
public void testSameWorkerIdentityConflict() throws Exception {
  final MembershipManager membershipManager = getHealthyEtcdMemberMgr();
  // In a non-k8s env, no two workers can assume the same identity.
  Configuration.set(PropertyKey.WORKER_MEMBERSHIP_MANAGER_TYPE, MembershipType.ETCD);
  Configuration.set(PropertyKey.ETCD_ENDPOINTS, getClientEndpoints());
  Assert.assertTrue(membershipManager instanceof EtcdMembershipManager);
  WorkerIdentity workerIdentity1 = WorkerIdentityTestUtils.randomUuidBasedId();
  WorkerInfo wkr1 = new WorkerInfo()
      .setIdentity(workerIdentity1)
      .setAddress(new WorkerNetAddress()
          .setHost("worker1").setContainerHost("containerhostname1")
          .setRpcPort(1000).setDataPort(1001).setWebPort(1011)
          .setDomainSocketPath("/var/lib/domain.sock"));
  // Same identity as wkr1, but a different network address.
  WorkerInfo wkr2 = new WorkerInfo()
      .setIdentity(workerIdentity1)
      .setAddress(new WorkerNetAddress()
          .setHost("worker2").setContainerHost("containerhostname2")
          .setRpcPort(2000).setDataPort(2001).setWebPort(2011)
          .setDomainSocketPath("/var/lib/domain.sock"));
  membershipManager.join(wkr1);
  // Bring wkr1 down and join wkr2 with the same worker identity.
  membershipManager.stopHeartBeat(wkr1);
  CommonUtils.waitFor("wkr1 is not alive.", () -> {
    try {
      return membershipManager.getFailedMembers().getWorkerById(workerIdentity1).isPresent();
    } catch (IOException e) {
      // Transient failure while polling; keep waiting until the timeout.
      return false;
    }
  }, WaitForOptions.defaults().setTimeoutMs(5000));
  try {
    membershipManager.join(wkr2);
    // Without this, the test would pass silently if the conflicting join were accepted.
    Assert.fail("Joining with a conflicting worker identity should be rejected in non-k8s env");
  } catch (IOException ex) {
    Assert.assertTrue("Expected AlreadyExistsException but got: " + ex,
        ex instanceof AlreadyExistsException);
  }

  // Only in k8s env should it allow the same worker identity to be assumed.
  Configuration.set(PropertyKey.K8S_ENV_DEPLOYMENT, true);
  try {
    final MembershipManager membershipManager1 = getHealthyEtcdMemberMgr();
    membershipManager1.join(wkr2);
    // Check that the worker joined with the correct info on etcd.
    Optional<WorkerInfo> curWorkerInfo = membershipManager1.getLiveMembers()
        .getWorkerById(workerIdentity1);
    Assert.assertTrue(curWorkerInfo.isPresent());
    Assert.assertEquals(wkr2.getAddress(), curWorkerInfo.get().getAddress());
  } finally {
    // Restore the default so the k8s flag does not leak into other tests.
    Configuration.set(PropertyKey.K8S_ENV_DEPLOYMENT, false);
  }
}
}
Loading