Skip to content

Commit

Permalink
Replace the etcdctl proc call with etcd client. (#1970)
Browse files Browse the repository at this point in the history
Fixes #1945

Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji authored Aug 15, 2024
1 parent e1e1b26 commit 55c9058
Show file tree
Hide file tree
Showing 18 changed files with 320 additions and 410 deletions.
2 changes: 0 additions & 2 deletions docker/Dockerfile.vineyardd
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ RUN export arch="$PLATFORM" && \
curl -LO https://github.com/etcd-io/etcd/releases/download/v3.5.9/etcd-v3.5.9-linux-$arch.tar.gz && \
tar zxf etcd-v3.5.9-linux-$arch.tar.gz && \
mv /tmp/etcd-v3.5.9-linux-$arch/etcd /usr/bin/etcd && \
mv /tmp/etcd-v3.5.9-linux-$arch/etcdctl /usr/bin/etcdctl && \
curl -LO https://dl.k8s.io/release/v1.24.0/bin/linux/$arch/kubectl && \
chmod +x kubectl && \
mv /tmp/kubectl /usr/bin/kubectl
Expand Down Expand Up @@ -86,7 +85,6 @@ SHELL ["/bin/bash", "-c"]
COPY --from=builder /usr/bin/bash-linux /bin/bash
COPY --from=builder /usr/bin/dumb-init /usr/bin/dumb-init
COPY --from=builder /usr/bin/etcd /usr/bin/etcd
COPY --from=builder /usr/bin/etcdctl /usr/bin/etcdctl
COPY --from=builder /usr/bin/kubectl /usr/bin/kubectl
COPY --from=builder /work/v6d/build/bin/vineyardd /usr/local/bin/vineyardd
RUN ln -s /busybox/env /usr/bin/env
Expand Down
8 changes: 6 additions & 2 deletions k8s/test/e2e/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ load-vineyardd-image:
@docker push localhost:5001/vineyardd:latest
.PHONY: load-vineyardd-image

load-vineyard-python-dev-image:
@docker tag ghcr.io/v6d-io/v6d/vineyard-python-dev:latest localhost:5001/vineyard-python-dev:latest
@docker push localhost:5001/vineyard-python-dev:latest

load-vineyard-operator-image:
@docker tag vineyardcloudnative/vineyard-operator:latest localhost:5001/vineyard-operator:latest
@docker push localhost:5001/vineyard-operator:latest
Expand Down Expand Up @@ -248,13 +252,13 @@ e2e-tests-failover: prepare-e2e-test install-vineyard-cluster

############# etcd failover testing #############################################

e2e-tests-three-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image
e2e-tests-three-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image load-vineyard-python-dev-image
@echo "Running three etcd nodes failover e2e test..."
@cd ${ROOT_DIR} && ${GOBIN}/e2e run --config=${E2E_DIR}/etcd-failover/three-etcd-nodes-failover-e2e.yaml
@echo "three etcd nodes failover e2e test passed."
@make delete-local-cluster

e2e-tests-five-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image
e2e-tests-five-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image load-vineyard-python-dev-image
@echo "Running five etcd nodes failover e2e test..."
@cd ${ROOT_DIR} && ${GOBIN}/e2e run --config=${E2E_DIR}/etcd-failover/five-etcd-nodes-failover-e2e.yaml
@echo "five etcd nodes failover e2e test passed."
Expand Down
4 changes: 2 additions & 2 deletions k8s/test/e2e/etcd-failover/consumer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ spec:
restartPolicy: Never
containers:
- name: consumer
image: python:3.10
image: localhost:5001/vineyard-python-dev:latest
imagePullPolicy: IfNotPresent
command:
- bash
- -c
- |
pip install vineyard numpy pandas --index-url https://pypi.tuna.tsinghua.edu.cn/simple;
cat << EOF >> consumer.py
import vineyard
client = vineyard.connect(host="vineyardd-svc.default.svc.cluster.local",port=9600)
Expand Down
3 changes: 2 additions & 1 deletion k8s/test/e2e/etcd-failover/five-etcd-nodes-failover-e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ setup:
done
kubectl delete pod "vineyardd-$num1" -n default --force
kubectl delete pod "vineyardd-$num2" -n default --force
kubectl rollout status statefulset/vineyardd
# wait for the instance quit messages to be propagated
sleep 240
sleep 360
kubectl rollout status statefulset/vineyardd
done
- name: install consumer
Expand Down
6 changes: 3 additions & 3 deletions k8s/test/e2e/etcd-failover/producer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ spec:
restartPolicy: Never
containers:
- name: producer
image: python:3.10
image: localhost:5001/vineyard-python-dev:latest
imagePullPolicy: IfNotPresent
command:
- bash
- -c
- |
pip install vineyard numpy pandas --index-url https://pypi.tuna.tsinghua.edu.cn/simple;
cat << EOF >> producer.py
import vineyard
import numpy as np
Expand All @@ -42,4 +42,4 @@ spec:
client.put(data, persist=True, name="test_data");
client.close()
EOF
python producer.py;
python producer.py;
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ setup:
kubectl delete pod vineyardd-$(shuf -i 0-2 -n 1) -n default --force
kubectl rollout status statefulset/vineyardd
# wait for the instance quit messages to be propagated
sleep 60
sleep 120
kubectl rollout status statefulset/vineyardd
done
- name: install consumer
Expand Down
6 changes: 3 additions & 3 deletions src/server/services/etcd_meta_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ Status EtcdMetaService::preStart(const bool create_new_instance) {
return etcd_launcher_->LaunchEtcdServer(etcd_, meta_sync_lock_);
}

Status EtcdMetaService::RemoveMember(const std::string member_id) {
auto status = etcd_launcher_->RemoveMember(member_id);
Status EtcdMetaService::RemoveMember(const uint64_t& member_id) {
auto status = etcd_launcher_->RemoveMember(etcd_, member_id);
if (!status.ok()) {
LOG(ERROR) << "Failed to remove member " << member_id
<< " from etcd: " << status.ToString();
Expand All @@ -449,7 +449,7 @@ Status EtcdMetaService::UpdateEndpoint() {
if (etcd_launcher_ == nullptr) {
return Status::Invalid("etcd launcher is not initialized");
}
return etcd_launcher_->UpdateEndpoint();
return etcd_launcher_->UpdateEndpoint(etcd_);
}

} // namespace vineyard
Expand Down
4 changes: 2 additions & 2 deletions src/server/services/etcd_meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ class EtcdMetaService : public IMetaService {

void TryReleaseLock(std::string key, callback_t<bool>) override;

Status RemoveMember(std::string member_id);
Status RemoveMember(const uint64_t& member_id);

std::string GetMemberID() { return etcd_launcher_->GetMemberID(); }
const uint64_t GetMemberID() { return etcd_launcher_->GetMemberID(); }

Status UpdateEndpoint();

Expand Down
16 changes: 9 additions & 7 deletions src/server/services/meta_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -555,11 +555,12 @@ void IMetaService::registerToEtcd() {
self->meta_["my_nodename"] = nodename;

self->instances_list_.emplace(rank);
auto etcd_member_id = self->GetEtcdMemberID();
uint64_t etcd_member_id = self->GetEtcdMemberID();
std::string key = "/instances/" + self->server_ptr_->instance_name();
ops.emplace_back(op_t::Put(key + "/hostid", self_host_id));
if (etcd_member_id != "") {
ops.emplace_back(op_t::Put(key + "/member_id", etcd_member_id));
if (etcd_member_id != 0) {
ops.emplace_back(
op_t::Put(key + "/member_id", std::to_string(etcd_member_id)));
}
ops.emplace_back(op_t::Put(key + "/hostname", hostname));
ops.emplace_back(op_t::Put(key + "/nodename", nodename));
Expand Down Expand Up @@ -1218,7 +1219,8 @@ void IMetaService::instanceUpdate(const op_t& op, const bool from_remote) {
}
// reset the etcd client
VINEYARD_CHECK_OK(this->probe());
instance_to_member_id_[instance_id] = member_id;
uint64_t member_id_ = std::stoull(member_id);
instance_to_member_id_[instance_id] = member_id_;
} else if (op.op != op_t::op_type_t::kDel) {
if (from_remote) {
LOG(ERROR) << "Unknown op type: " << op.ToString();
Expand Down Expand Up @@ -1265,20 +1267,20 @@ Status IMetaService::daemonWatchHandler(
return callback_after_update(Status::OK(), rev);
}

Status IMetaService::RemoveEtcdMember(const std::string& member_id) {
Status IMetaService::RemoveEtcdMember(const uint64_t& member_id) {
return callIfEtcdMetaService(
[&member_id](std::shared_ptr<EtcdMetaService> etcd_meta_service) {
return etcd_meta_service->RemoveMember(member_id);
},
Status::OK());
}

std::string IMetaService::GetEtcdMemberID() {
const uint64_t IMetaService::GetEtcdMemberID() {
return callIfEtcdMetaService(
[](std::shared_ptr<EtcdMetaService> etcd_meta_service) {
return etcd_meta_service->GetMemberID();
},
std::string());
(uint64_t) 0);
}

Status IMetaService::UpdateEtcdEndpoint() {
Expand Down
6 changes: 3 additions & 3 deletions src/server/services/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ class IMetaService : public std::enable_shared_from_this<IMetaService> {

virtual void TryReleaseLock(std::string key, callback_t<bool> callback) = 0;

Status RemoveEtcdMember(const std::string& member_id);
Status RemoveEtcdMember(const uint64_t& member_id);

std::string GetEtcdMemberID();
const uint64_t GetEtcdMemberID();

Status UpdateEtcdEndpoint();

Expand Down Expand Up @@ -262,7 +262,7 @@ class IMetaService : public std::enable_shared_from_this<IMetaService> {

std::unique_ptr<asio::steady_timer> heartbeat_timer_;
std::set<InstanceID> instances_list_;
std::map<InstanceID, std::string> instance_to_member_id_;
std::map<InstanceID, uint64_t> instance_to_member_id_;
int64_t target_latest_time_ = 0;
size_t timeout_count_ = 0;

Expand Down
71 changes: 23 additions & 48 deletions src/server/util/etcd_launcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,25 +114,6 @@ Status checkEtcdCmd(const std::string& etcd_cmd) {
return Status::OK();
}

Status checkEtcdctlCommand(const std::string& etcdctl_cmd) {
if (etcdctl_cmd.empty()) {
std::string error_message =
"Failed to find etcdctl binary, please specify its path using the "
"`--etcdctl_cmd` argument and try again.";
LOG(WARNING) << error_message;
return Status::EtcdError("Failed to find etcdctl binary");
}
if (!ghc::filesystem::exists(ghc::filesystem::path(etcdctl_cmd))) {
std::string error_message =
"The etcd binary '" + etcdctl_cmd +
"' does not exist, please specify the correct path using "
"the `--etcdctl_cmd` argument and try again.";
LOG(WARNING) << error_message;
return Status::EtcdError("The etcdctl binary does not exist");
}
return Status::OK();
}

EtcdLauncher::EtcdLauncher(const json& etcd_spec,
const uint32_t& rpc_socket_port,
const bool create_new_instance)
Expand Down Expand Up @@ -166,15 +147,6 @@ Status EtcdLauncher::LaunchEtcdServer(
return Status::OK();
}

// resolve etcdctl binary
std::string etcdctl_cmd = etcd_spec_.value("etcdctl_cmd", "");
if (etcdctl_cmd.empty()) {
etcdctl_cmd = lookupCommand(etcd_spec_, "etcdctl");
}
RETURN_ON_ERROR(checkEtcdctlCommand(etcdctl_cmd));
etcdctl_ = std::make_shared<Etcdctl>(etcdctl_cmd);
LOG(INFO) << "Found etcdctl at: " << etcdctl_cmd;

bool skip_launch_etcd = etcd_spec_.value("skip_launch_etcd", true);
bool etcd_cluster_existing = false;
// create_new_instance_ is a flag to indicate whether we should launch an etcd
Expand Down Expand Up @@ -267,25 +239,25 @@ Status EtcdLauncher::LaunchEtcdServer(
if (etcd_cluster_existing) {
std::string cluster_name;

std::vector<json> all_members = etcdctl_->listMembers(etcd_endpoint);
std::vector<json> members = etcdctl_->listHealthyMembers(all_members);
std::vector<json> all_members = listMembers(etcd_client);
std::vector<json> members = listHealthyMembers(all_members);
if (members.size() == 0) {
return Status::EtcdError("No healthy members found via etcdctl");
}

existing_members = etcdctl_->listMembersName(members);
existing_members = listMembersName(members);
new_member_name = generateMemberName(existing_members);
peer_urls = etcdctl_->listPeerURLs(members);
peer_urls = listPeerURLs(members);
if (peer_urls.size() == 0) {
return Status::EtcdError("No peer urls found via etcdctl");
}
std::vector<std::string> client_urls = etcdctl_->listClientURLs(members);
std::vector<std::string> client_urls = listClientURLs(members);
if (peer_urls.size() == 0) {
return Status::EtcdError("No client urls found via etcdctl");
}

endpoint = boost::algorithm::join(client_urls, ",");
if (!etcdctl_->addMember(new_member_name, peer_endpoint, endpoint).ok()) {
if (!addMember(etcd_client, peer_endpoint).ok()) {
return Status::EtcdError("Failed to add new member to the etcd cluster");
}

Expand Down Expand Up @@ -378,8 +350,7 @@ Status EtcdLauncher::LaunchEtcdServer(
retries < max_probe_retries) {
etcd_client.reset(new etcd::Client(etcd_endpoints_));
if (probeEtcdServer(etcd_client, sync_lock)) {
etcd_member_id_ =
etcdctl_->findMemberID(peer_endpoint, etcd_endpoints_);
etcd_member_id_ = findMemberID(etcd_client, peer_endpoint);
// reset the etcd watcher
break;
}
Expand All @@ -388,25 +359,28 @@ Status EtcdLauncher::LaunchEtcdServer(
}
if (!etcd_proc_) {
return handleEtcdFailure(
peer_endpoint,
etcd_client, peer_endpoint,
"Failed to wait until etcd ready: operation has been interrupted");
} else if (err) {
return handleEtcdFailure(
peer_endpoint, "Failed to wait until etcd ready: " + err.message());
etcd_client, peer_endpoint,
"Failed to wait until etcd ready: " + err.message());
} else if (retries >= max_probe_retries) {
return handleEtcdFailure(
peer_endpoint, "Etcd has been launched but failed to connect to it");
etcd_client, peer_endpoint,
"Etcd has been launched but failed to connect to it");
} else {
return Status::OK();
}
}
}

Status EtcdLauncher::handleEtcdFailure(const std::string& peer_urls,
const std::string& errMessage) {
auto member_id = etcdctl_->findMemberID(peer_urls, etcd_endpoints_);
RETURN_ON_ERROR(etcdctl_->removeMember(etcd_member_id_, etcd_endpoints_));
etcd_member_id_.clear();
Status EtcdLauncher::handleEtcdFailure(
std::unique_ptr<etcd::Client>& etcd_client, const std::string& peer_urls,
const std::string& errMessage) {
auto member_id = findMemberID(etcd_client, peer_urls);
RETURN_ON_ERROR(removeMember(etcd_client, member_id));
etcd_member_id_ = 0;
return Status::IOError(errMessage);
}

Expand Down Expand Up @@ -488,10 +462,11 @@ bool EtcdLauncher::probeEtcdServer(std::unique_ptr<etcd::Client>& etcd_client,
return etcd_client && response.is_ok();
}

Status EtcdLauncher::UpdateEndpoint() {
auto all_members = etcdctl_->listMembers(etcd_endpoints_);
auto members = etcdctl_->listHealthyMembers(all_members);
auto client_urls = etcdctl_->listClientURLs(members);
Status EtcdLauncher::UpdateEndpoint(
std::unique_ptr<etcd::Client>& etcd_client) {
auto all_members = listMembers(etcd_client);
auto members = listHealthyMembers(all_members);
auto client_urls = listClientURLs(members);
etcd_endpoints_ = boost::algorithm::join(client_urls, ",");
return Status::OK();
}
Expand Down
18 changes: 9 additions & 9 deletions src/server/util/etcd_launcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ limitations under the License.
#include "etcd/Client.hpp"

#include "common/util/status.h"
#include "server/util/etcdctl.h"
#include "server/util/etcd_member.h"

namespace vineyard {

Expand All @@ -48,21 +48,23 @@ class EtcdLauncher {
std::string const& key);

private:
Status handleEtcdFailure(const std::string& member_name,
Status handleEtcdFailure(std::unique_ptr<etcd::Client>& etcd_client,
const std::string& member_name,
const std::string& errMessage);

Status parseEndpoint();

std::string generateMemberName(
const std::vector<std::string>& existing_members_name);

std::string GetMemberID() { return etcd_member_id_; }
const uint64_t GetMemberID() { return etcd_member_id_; }

Status RemoveMember(const std::string member_id) {
return etcdctl_->removeMember(member_id, etcd_endpoints_);
Status RemoveMember(std::unique_ptr<etcd::Client>& etcd_client,
const uint64_t& member_id) {
return removeMember(etcd_client, member_id);
}

Status UpdateEndpoint();
Status UpdateEndpoint(std::unique_ptr<etcd::Client>& etcd_client);

Status initHostInfo();

Expand All @@ -75,11 +77,9 @@ class EtcdLauncher {
std::set<std::string> local_hostnames_;
std::set<std::string> local_ip_addresses_;

std::string etcd_member_id_;
uint64_t etcd_member_id_;
std::string etcd_endpoints_;

std::shared_ptr<Etcdctl> etcdctl_;

std::unique_ptr<boost::process::child> etcd_proc_;

friend class EtcdMetaService;
Expand Down
Loading

0 comments on commit 55c9058

Please sign in to comment.