-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix to disable worker identity reuse for registration unless in k8s #18454
Changes from all commits
6099ebd
ce53e63
b776240
6c3e75f
a76c7d8
c8544a0
eabee1b
33fd329
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -13,21 +13,34 @@ | |||||||||
|
||||||||||
import alluxio.conf.AlluxioConfiguration; | ||||||||||
import alluxio.conf.PropertyKey; | ||||||||||
import alluxio.exception.status.AlreadyExistsException; | ||||||||||
import alluxio.util.CommonUtils; | ||||||||||
import alluxio.wire.WorkerIdentity; | ||||||||||
import alluxio.wire.WorkerInfo; | ||||||||||
|
||||||||||
import com.google.common.annotations.VisibleForTesting; | ||||||||||
import com.google.gson.JsonParseException; | ||||||||||
import io.etcd.jetcd.ByteSequence; | ||||||||||
import io.etcd.jetcd.KeyValue; | ||||||||||
import io.etcd.jetcd.Txn; | ||||||||||
import io.etcd.jetcd.kv.TxnResponse; | ||||||||||
import io.etcd.jetcd.op.Cmp; | ||||||||||
import io.etcd.jetcd.op.CmpTarget; | ||||||||||
import io.etcd.jetcd.op.Op; | ||||||||||
import io.etcd.jetcd.options.GetOption; | ||||||||||
import io.etcd.jetcd.options.PutOption; | ||||||||||
import org.slf4j.Logger; | ||||||||||
import org.slf4j.LoggerFactory; | ||||||||||
|
||||||||||
import java.io.IOException; | ||||||||||
import java.nio.charset.StandardCharsets; | ||||||||||
import java.util.ArrayList; | ||||||||||
import java.util.Arrays; | ||||||||||
import java.util.List; | ||||||||||
import java.util.Optional; | ||||||||||
import java.util.Set; | ||||||||||
import java.util.concurrent.CompletableFuture; | ||||||||||
import java.util.concurrent.ExecutionException; | ||||||||||
import java.util.function.Supplier; | ||||||||||
import java.util.stream.Collectors; | ||||||||||
import java.util.stream.Stream; | ||||||||||
|
@@ -83,31 +96,57 @@ public void join(WorkerInfo workerInfo) throws IOException { | |||||||||
LOG.info("Try joining on etcd for worker:{} ", workerInfo); | ||||||||||
WorkerServiceEntity entity = | ||||||||||
new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress()); | ||||||||||
// 1) register to the ring, check if there's existing entry | ||||||||||
String pathOnRing = new StringBuffer() | ||||||||||
.append(getRingPathPrefix()) | ||||||||||
.append(entity.getServiceEntityName()).toString(); | ||||||||||
byte[] existingEntityBytes = mAlluxioEtcdClient.getForPath(pathOnRing); | ||||||||||
byte[] serializedEntity = entity.serialize(); | ||||||||||
// If there's existing entry, check if it's me. | ||||||||||
if (existingEntityBytes != null) { | ||||||||||
// It's not me, or not the same me. | ||||||||||
if (!Arrays.equals(existingEntityBytes, serializedEntity)) { | ||||||||||
// In k8s this might be bcos worker pod restarting with the same worker identity | ||||||||||
// but certain fields such as hostname has been changed. Register to ring path anyway. | ||||||||||
WorkerServiceEntity existingEntity = new WorkerServiceEntity(); | ||||||||||
existingEntity.deserialize(existingEntityBytes); | ||||||||||
LOG.warn("Same worker entity found bearing same workerid:{}," | ||||||||||
+ "existing WorkerServiceEntity to be overwritten:{}," | ||||||||||
+ "maybe benign if pod restart in k8s env or same worker" | ||||||||||
+ " scheduled to restart on another machine in baremetal env.", | ||||||||||
workerInfo.getIdentity().toString(), existingEntity); | ||||||||||
mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity)); | ||||||||||
// 1) register to the ring. | ||||||||||
// CompareAndSet if no existing registered entry, if exist such key, two cases: | ||||||||||
// a) it's k8s env, still register, overwriting the existing entry | ||||||||||
// b) it's not k8s env, compare the registered entity content, if it's me | ||||||||||
// then no op, if not, we don't allow overwriting the existing entity. | ||||||||||
try { | ||||||||||
boolean isK8s = mConf.isSet(PropertyKey.K8S_ENV_DEPLOYMENT) | ||||||||||
&& mConf.getBoolean(PropertyKey.K8S_ENV_DEPLOYMENT); | ||||||||||
Txn txn = mAlluxioEtcdClient.getEtcdClient().getKVClient().txn(); | ||||||||||
ByteSequence keyToPut = ByteSequence.from(pathOnRing, StandardCharsets.UTF_8); | ||||||||||
ByteSequence valToPut = ByteSequence.from(serializedEntity); | ||||||||||
CompletableFuture<TxnResponse> txnResponseFut = txn | ||||||||||
// version of the key indicates number of modification, 0 means | ||||||||||
// this key does not exist | ||||||||||
.If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.version(0L))) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would appreciate a comment for etcd outsiders:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what's the difference between There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 'revision' refers to a global number put onto each op for all keys(create/delete), createRevision = revision of the last creation on the key, version = the version of this particular key, if deleted, the version is reset to 0 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||||||
.Then(Op.put(keyToPut, valToPut, PutOption.newBuilder().build())) | ||||||||||
.Else(isK8s ? Op.put(keyToPut, valToPut, PutOption.newBuilder().build()) : | ||||||||||
Op.get(keyToPut, GetOption.DEFAULT)) | ||||||||||
.commit(); | ||||||||||
TxnResponse txnResponse = txnResponseFut.get(); | ||||||||||
if (!isK8s && !txnResponse.isSucceeded()) { | ||||||||||
// service kv already exists, for non-k8s env, check if it's me. | ||||||||||
// bail if it's someone else. | ||||||||||
List<KeyValue> kvs = new ArrayList<>(); | ||||||||||
txnResponse.getGetResponses().stream().map( | ||||||||||
r -> kvs.addAll(r.getKvs())).collect(Collectors.toList()); | ||||||||||
Optional<KeyValue> latestKV = kvs.stream() | ||||||||||
.max((kv1, kv2) -> (int) (kv1.getModRevision() - kv2.getModRevision())); | ||||||||||
if (latestKV.isPresent() | ||||||||||
&& !Arrays.equals(latestKV.get().getValue().getBytes(), serializedEntity)) { | ||||||||||
Optional<WorkerServiceEntity> existingEntity = parseWorkerServiceEntity(latestKV.get()); | ||||||||||
if (!existingEntity.isPresent()) { | ||||||||||
throw new IOException(String.format( | ||||||||||
"Existing WorkerServiceEntity for path:%s corrupted", | ||||||||||
pathOnRing)); | ||||||||||
} | ||||||||||
throw new AlreadyExistsException( | ||||||||||
String.format("Some other member with same id registered on the ring, bail." | ||||||||||
+ "Conflicting worker addr:%s, worker identity:%s." | ||||||||||
+ "Different workers can't assume same worker identity in non-k8s env," | ||||||||||
+ "clean local worker identity settings to continue.", | ||||||||||
existingEntity.get().getWorkerNetAddress().toString(), | ||||||||||
existingEntity.get().getIdentity())); | ||||||||||
} | ||||||||||
} | ||||||||||
// It's me, go ahead to start heartbeating. | ||||||||||
} else { | ||||||||||
// If haven't created myself onto the ring before, create now. | ||||||||||
mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity)); | ||||||||||
} catch (InterruptedException | ExecutionException e) { | ||||||||||
throw new IOException(e); | ||||||||||
} | ||||||||||
// 2) start heartbeat | ||||||||||
mAlluxioEtcdClient.mServiceDiscovery.registerAndStartSync(entity); | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i have the feeling that whether we allow worker to reg with the same identity is relevant to whether it's k8s, but k8s is not 100% the decider. In other words, I feel we can have
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually identity should never be allowed to overwrite, from code side shouldnt even open this can of worm, it should be a deployment effort. k8s property can also be extended for other usage as suggested by @ssz1997