From e189754e84556cbd9d0c9b3b695967d07c665e01 Mon Sep 17 00:00:00 2001 From: fmoral2 Date: Mon, 18 Mar 2024 14:10:46 -0300 Subject: [PATCH] Node replacement test (#70) add Node replacement test --------- Signed-off-by: Francisco --- .github/workflows/run-distros.yaml | 2 +- .golangci.yaml | 2 +- Makefile | 4 +- docs/tests/aws.md | 41 -- .../upgradecluster/upgrademanual_test.go | 6 +- .../upgradenodereplacement_test.go | 91 +++ entrypoint/upgradecluster/upgradesuc_test.go | 6 +- pkg/aws/aws.go | 34 +- .../{upgradecluster.go => upgrademanually.go} | 48 +- pkg/testcase/upgradenodereplacement.go | 558 ++++++++++++++++++ pkg/testcase/upgradesuc.go | 56 ++ scripts/test_runner.sh | 2 + shared/aux.go | 57 +- shared/cluster.go | 114 ++-- shared/kubeconfig.go | 74 +++ 15 files changed, 944 insertions(+), 151 deletions(-) delete mode 100644 docs/tests/aws.md create mode 100644 entrypoint/upgradecluster/upgradenodereplacement_test.go rename pkg/testcase/{upgradecluster.go => upgrademanually.go} (68%) create mode 100644 pkg/testcase/upgradenodereplacement.go create mode 100644 pkg/testcase/upgradesuc.go create mode 100644 shared/kubeconfig.go diff --git a/.github/workflows/run-distros.yaml b/.github/workflows/run-distros.yaml index 075c640c..476cacc8 100644 --- a/.github/workflows/run-distros.yaml +++ b/.github/workflows/run-distros.yaml @@ -55,4 +55,4 @@ jobs: with: version: v1.55.2 args: --timeout 10m - continue-on-error: false + continue-on-error: false \ No newline at end of file diff --git a/.golangci.yaml b/.golangci.yaml index 1e66ad72..248afab4 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -24,7 +24,7 @@ linters-settings: ignore-generated-header: true rules: - name: line-length-limit - arguments: [110] + arguments: [125] - name: bare-return - name: blank-imports - name: confusing-results diff --git a/Makefile b/Makefile index 09ae0101..8f16e879 100644 --- a/Makefile +++ b/Makefile @@ -72,11 +72,13 @@ test-validate: test-upgrade-suc: @go test -timeout=45m -v -tags=upgradesuc -count=1 ./entrypoint/upgradecluster/... -sucUpgradeVersion ${SUC_UPGRADE_VERSION} - .PHONY: test-upgrade-manual test-upgrade-manual: @go test -timeout=45m -v -tags=upgrademanual -count=1 ./entrypoint/upgradecluster/... -installVersionOrCommit ${INSTALL_VERSION_OR_COMMIT} -channel ${CHANNEL} +.PHONY: test-upgrade-node-replacement +test-upgrade-node-replacement: + @go test -timeout=60m -v -tags=upgradereplacement -count=1 ./entrypoint/upgradecluster/... 
-installVersionOrCommit ${INSTALL_VERSION_OR_COMMIT} .PHONY: test-create-mixedos test-create-mixedos: diff --git a/docs/tests/aws.md b/docs/tests/aws.md deleted file mode 100644 index d62579f7..00000000 --- a/docs/tests/aws.md +++ /dev/null @@ -1,41 +0,0 @@ -#### How to test aws sdk wrapper: - -#### Example: - -```go -package main - -import ( -"fmt" - -"github.com/rancher/distros-test-framework/pkg/aws" -) - -type Whatever interface { -CreateInstances(names ...string) (ids, ips []string, err error) -DeleteInstance(ip string) error -WaitForInstanceRunning(instanceId string) error -} - -func testCase(w Whatever) error { -ids, ips, err := w.CreateInstances("fmoral-test-instance-12") -if err != nil { -return err -} -fmt.Println(ids, ips) - -return nil -} - -func main() { -dependencies, err := aws.AddAwsNode() -if err != nil { -fmt.Println(err) -} - -a := testCase(dependencies) -// err = e.DeleteInstance("1.111.11.1") - -fmt.Println(a) -} - ``` \ No newline at end of file diff --git a/entrypoint/upgradecluster/upgrademanual_test.go b/entrypoint/upgradecluster/upgrademanual_test.go index f60f62f1..9922dd36 100644 --- a/entrypoint/upgradecluster/upgrademanual_test.go +++ b/entrypoint/upgradecluster/upgrademanual_test.go @@ -8,7 +8,6 @@ import ( "github.com/rancher/distros-test-framework/pkg/assert" "github.com/rancher/distros-test-framework/pkg/customflag" "github.com/rancher/distros-test-framework/pkg/testcase" - "github.com/rancher/distros-test-framework/shared" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -77,9 +76,8 @@ var _ = Describe("Test:", func() { } It("Upgrade Manual", func() { - fmt.Println("Current cluster state before upgrade:") - shared.PrintClusterState() - _ = testcase.TestUpgradeClusterManually(customflag.ServiceFlag.InstallMode.String()) + err := testcase.TestUpgradeClusterManually(customflag.ServiceFlag.InstallMode.String()) + Expect(err).NotTo(HaveOccurred()) }) It("Checks Node Status after upgrade and validate version", func() { diff --git a/entrypoint/upgradecluster/upgradenodereplacement_test.go b/entrypoint/upgradecluster/upgradenodereplacement_test.go new file mode 100644 index 00000000..d07036ad --- /dev/null +++ b/entrypoint/upgradecluster/upgradenodereplacement_test.go @@ -0,0 +1,91 @@ +//go:build upgradereplacement + +package upgradecluster + +import ( + "fmt" + + "github.com/rancher/distros-test-framework/pkg/assert" + "github.com/rancher/distros-test-framework/pkg/customflag" + "github.com/rancher/distros-test-framework/pkg/testcase" + + . 
"github.com/onsi/ginkgo/v2" +) + +var _ = Describe("Test:", func() { + + It("Start Up with no issues", func() { + testcase.TestBuildCluster(GinkgoT()) + }) + + It("Validate Node", func() { + testcase.TestNodeStatus( + assert.NodeAssertReadyStatus(), + nil) + }) + + It("Validate Pod", func() { + testcase.TestPodStatus( + assert.PodAssertRestart(), + assert.PodAssertReady(), + assert.PodAssertStatus()) + }) + + It("Verifies ClusterIP Service pre-upgrade", func() { + testcase.TestServiceClusterIp(true, false) + }) + + if cfg.Product == "k3s" { + It("Verifies LoadBalancer Service pre-upgrade", func() { + testcase.TestServiceLoadBalancer(true, false) + }) + } + + It("Verifies Ingress pre-upgrade", func() { + testcase.TestIngress(true, false) + }) + + It("Upgrade by Node replacement", func() { + testcase.TestUpgradeReplaceNode(customflag.ServiceFlag.InstallMode.String()) + }) + + It("Checks Node Status after upgrade and validate version", func() { + testcase.TestNodeStatus( + assert.NodeAssertReadyStatus(), + assert.NodeAssertVersionTypeUpgrade(customflag.ServiceFlag)) + }) + + It("Checks Pod Status after upgrade", func() { + testcase.TestPodStatus( + assert.PodAssertRestart(), + assert.PodAssertReady(), + assert.PodAssertStatus(), + ) + }) + + It("Verifies ClusterIP Service after upgrade", func() { + testcase.TestServiceClusterIp(false, true) + }) + + It("Verifies NodePort Service after upgrade applying and deleting workload", func() { + testcase.TestServiceNodePort(true, true) + }) + + It("Verifies Ingress after upgrade", func() { + testcase.TestIngress(false, true) + }) + + if cfg.Product == "k3s" { + It("Verifies LoadBalancer Service after upgrade", func() { + testcase.TestServiceLoadBalancer(false, true) + }) + } +}) + +var _ = AfterEach(func() { + if CurrentSpecReport().Failed() { + fmt.Printf("\nFAILED! %s\n", CurrentSpecReport().FullText()) + } else { + fmt.Printf("\nPASSED! %s\n", CurrentSpecReport().FullText()) + } +}) diff --git a/entrypoint/upgradecluster/upgradesuc_test.go b/entrypoint/upgradecluster/upgradesuc_test.go index ac040704..f60cf831 100644 --- a/entrypoint/upgradecluster/upgradesuc_test.go +++ b/entrypoint/upgradecluster/upgradesuc_test.go @@ -8,7 +8,6 @@ import ( "github.com/rancher/distros-test-framework/pkg/assert" "github.com/rancher/distros-test-framework/pkg/customflag" "github.com/rancher/distros-test-framework/pkg/testcase" - "github.com/rancher/distros-test-framework/shared" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" @@ -77,9 +76,8 @@ var _ = Describe("SUC Upgrade Tests:", func() { } It("\nUpgrade via SUC", func() { - fmt.Println("Current cluster state before upgrade:") - shared.PrintClusterState() - _ = testcase.TestUpgradeClusterSUC(customflag.ServiceFlag.SUCUpgradeVersion.String()) + err := testcase.TestUpgradeClusterSUC(customflag.ServiceFlag.SUCUpgradeVersion.String()) + Expect(err).NotTo(HaveOccurred()) }) It("Checks Node status post-upgrade", func() { diff --git a/pkg/aws/aws.go b/pkg/aws/aws.go index 6c484216..11284921 100644 --- a/pkg/aws/aws.go +++ b/pkg/aws/aws.go @@ -24,9 +24,10 @@ type Client struct { type response struct { nodeId string externalIp string + privateIp string } -func AddAwsNode() (*Client, error) { +func AddNode() (*Client, error) { c := factory.ClusterConfig(GinkgoT()) sess, err := session.NewSession(&aws.Config{ @@ -41,9 +42,9 @@ func AddAwsNode() (*Client, error) { }, nil } -func (c Client) CreateInstances(names ...string) (ids, ips []string, err error) { +func (c Client) CreateInstances(names ...string) (externalIPs, privateIPs, ids []string, err error) { if len(names) == 0 { - return nil, nil, shared.ReturnLogError("must sent a name: %s\n", names) + return nil, nil, nil, shared.ReturnLogError("must sent name for the instance") } errChan := make(chan error, len(names)) @@ -67,13 +68,13 @@ func (c Client) CreateInstances(names ...string) (ids, ips []string, err error) return } - externalIp, err := c.fetchIP(nodeID) + externalIp, privateIp, err := c.fetchIP(nodeID) if err != nil { errChan <- shared.ReturnLogError("error fetching ip: %w\n", err) return } - resChan <- response{nodeId: nodeID, externalIp: externalIp} + resChan <- response{nodeId: nodeID, externalIp: externalIp, privateIp: privateIp} }(n) } go func() { @@ -84,17 +85,18 @@ func (c Client) CreateInstances(names ...string) (ids, ips []string, err error) for e := range errChan { if e != nil { - return nil, nil, shared.ReturnLogError("error from errChan: %w\n", e) + return nil, nil, nil, shared.ReturnLogError("error from errChan: %w\n", e) } } - var nodeIps, nodeIds []string + var externalIps, privateIps, nodeIds []string for i := range resChan { nodeIds = append(nodeIds, i.nodeId) - nodeIps = append(nodeIps, i.externalIp) + externalIps = append(externalIps, i.externalIp) + privateIps = append(privateIps, i.privateIp) } - return nodeIps, nodeIds, nil + return externalIps, privateIps, nodeIds, nil } func (c Client) DeleteInstance(ip string) error { @@ -133,7 +135,7 @@ func (c Client) DeleteInstance(ip string) error { if len(node.Tags) > 0 { instanceName = *node.Tags[0].Value } - shared.LogLevel("info", fmt.Sprintf("\nTerminated instance: %s (ID: %s)", + shared.LogLevel("info", fmt.Sprintf("Terminated instance: %s (ID: %s)", instanceName, *node.InstanceId)) } } @@ -229,10 +231,10 @@ func (c Client) create(name string) (*ec2.Reservation, error) { return c.ec2.RunInstances(input) } -func (c Client) fetchIP(nodeID string) (string, error) { +func (c Client) fetchIP(nodeID string) (publicIP string, privateIP string, err error) { waitErr := c.WaitForInstanceRunning(nodeID) if waitErr != nil { - return "", shared.ReturnLogError("error waiting for instance to be in running state: %w\n", waitErr) + return "", "", shared.ReturnLogError("error waiting for instance to be running: %w\n", waitErr) } id := &ec2.DescribeInstancesInput{ @@ -240,18 +242,18 @@ func (c Client) fetchIP(nodeID string) (string, error) { } result, err := c.ec2.DescribeInstances(id) if err != nil { - return "", 
shared.ReturnLogError("error describing instances: %w\n", err) + return "", "", shared.ReturnLogError("error describing instances: %w\n", err) } for _, r := range result.Reservations { for _, i := range r.Instances { - if i.PublicIpAddress != nil { - return *i.PublicIpAddress, nil + if i.PublicIpAddress != nil && i.PrivateIpAddress != nil { + return *i.PublicIpAddress, *i.PrivateIpAddress, nil } } } - return "", shared.ReturnLogError("no public ip found for instance: %s\n", nodeID) + return "", "", shared.ReturnLogError("no ip found for instance: %s\n", nodeID) } func extractID(reservation *ec2.Reservation) (string, error) { diff --git a/pkg/testcase/upgradecluster.go b/pkg/testcase/upgrademanually.go similarity index 68% rename from pkg/testcase/upgradecluster.go rename to pkg/testcase/upgrademanually.go index 7fe4f740..81a4e2ac 100644 --- a/pkg/testcase/upgradecluster.go +++ b/pkg/testcase/upgrademanually.go @@ -2,66 +2,24 @@ package testcase import ( "fmt" - "os" "strings" "sync" "github.com/rancher/distros-test-framework/factory" - "github.com/rancher/distros-test-framework/pkg/assert" "github.com/rancher/distros-test-framework/pkg/customflag" "github.com/rancher/distros-test-framework/shared" . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" ) -// TestUpgradeClusterSUC upgrades cluster using the system-upgrade-controller. -func TestUpgradeClusterSUC(version string) error { - fmt.Printf("\nUpgrading cluster to: %s\n", version) - - workloadErr := shared.ManageWorkload("apply", "suc.yaml") - Expect(workloadErr).NotTo(HaveOccurred(), - "system-upgrade-controller manifest did not deploy successfully") - - getPodsSystemUpgrade := "kubectl get pods -n system-upgrade --kubeconfig=" - err := assert.CheckComponentCmdHost( - getPodsSystemUpgrade+shared.KubeConfigFile, - "system-upgrade-controller", - statusRunning, - ) - Expect(err).NotTo(HaveOccurred(), err) - - product, err := shared.Product() - Expect(err).NotTo(HaveOccurred()) - - originalFilePath := shared.BasePath() + - fmt.Sprintf("/workloads/amd64/%s-upgrade-plan.yaml", product) - newFilePath := shared.BasePath() + "/workloads/amd64/plan.yaml" - - content, err := os.ReadFile(originalFilePath) - if err != nil { - return fmt.Errorf("failed to read file: %s", err) - } - - newContent := strings.ReplaceAll(string(content), "$UPGRADEVERSION", version) - err = os.WriteFile(newFilePath, []byte(newContent), 0644) - if err != nil { - return fmt.Errorf("failed to write file: %s", err) - } - - workloadErr = shared.ManageWorkload("apply", "plan.yaml") - Expect(workloadErr).NotTo(HaveOccurred(), "failed to upgrade cluster.") - - return nil -} - // TestUpgradeClusterManually upgrades the cluster "manually" func TestUpgradeClusterManually(version string) error { - fmt.Printf("\nUpgrading cluster to: %s", version) - if version == "" { return shared.ReturnLogError("please provide a non-empty version or commit to upgrade to") } + shared.PrintClusterState() + fmt.Printf("\nUpgrading cluster to: %s\n", version) + cluster := factory.ClusterConfig(GinkgoT()) if cluster.NumServers == 0 && cluster.NumAgents == 0 { diff --git a/pkg/testcase/upgradenodereplacement.go b/pkg/testcase/upgradenodereplacement.go new file mode 100644 index 00000000..377c84cb --- /dev/null +++ b/pkg/testcase/upgradenodereplacement.go @@ -0,0 +1,558 @@ +package testcase + +import ( + "fmt" + "os" + "strings" + "sync" + "time" + + "github.com/rancher/distros-test-framework/config" + "github.com/rancher/distros-test-framework/factory" + 
"github.com/rancher/distros-test-framework/pkg/aws" + "github.com/rancher/distros-test-framework/shared" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +const ( + agent = "agent" + master = "master" +) + +func TestUpgradeReplaceNode(version string) { + if version == "" { + Expect(version).NotTo(BeEmpty(), "version not sent") + } + + cluster := factory.ClusterConfig(GinkgoT()) + + envErr := config.SetEnv(shared.BasePath() + fmt.Sprintf("/config/%s.tfvars", + cluster.Config.Product)) + Expect(envErr).NotTo(HaveOccurred(), "error setting env: %s", envErr) + + resourceName := os.Getenv("resource_name") + + awsDependencies, err := aws.AddNode() + Expect(err).NotTo(HaveOccurred(), "error adding aws nodes: %s", err) + + var ( + serverNames, + instanceServerIds, + newExternalServerIps, + newPrivateServerIps []string + createErr error + ) + + // create server names + for i := 0; i < len(cluster.ServerIPs); i++ { + serverNames = append(serverNames, fmt.Sprintf("%s-server-%d", resourceName, i+1)) + } + + newExternalServerIps, newPrivateServerIps, instanceServerIds, createErr = + awsDependencies.CreateInstances(serverNames...) + Expect(createErr).NotTo(HaveOccurred(), createErr) + + shared.LogLevel("info", "\ncreated server public ips: %s ids:%s\n", + newExternalServerIps, instanceServerIds) + + scpErr := scpToNewNodes(cluster.Config.Product, master, newExternalServerIps) + Expect(scpErr).NotTo(HaveOccurred(), scpErr) + + serverLeaderIp := cluster.ServerIPs[0] + token, err := shared.FetchToken(serverLeaderIp) + Expect(err).NotTo(HaveOccurred(), err) + + serverErr := replaceServers( + cluster, + awsDependencies, + resourceName, + serverLeaderIp, + token, + version, + newExternalServerIps, + newPrivateServerIps, + ) + Expect(serverErr).NotTo(HaveOccurred(), serverErr) + + shared.LogLevel("info", "Server control plane nodes replaced with ips: %s\n", newExternalServerIps) + + // replace agents only if exists + if len(cluster.AgentIPs) > 0 { + // create agent names + var agentNames []string + for i := 0; i < len(cluster.AgentIPs); i++ { + agentNames = append(agentNames, fmt.Sprintf("%s-agent-%d", resourceName, i+1)) + } + + newExternalAgentIps, newPrivateAgentIps, instanceAgentIds, createAgentErr := + awsDependencies.CreateInstances(agentNames...) 
+ Expect(createAgentErr).NotTo(HaveOccurred(), createAgentErr) + + shared.LogLevel("info", "created worker ips: %s worker ids:%s\n", + newExternalAgentIps, instanceAgentIds) + + scpErr := scpToNewNodes(cluster.Config.Product, agent, newExternalAgentIps) + Expect(scpErr).NotTo(HaveOccurred(), scpErr) + + shared.LogLevel("info", "scp files to new worker nodes done\n") + + agentErr := replaceAgents(cluster, awsDependencies, serverLeaderIp, token, version, newExternalAgentIps, + newPrivateAgentIps, + ) + Expect(agentErr).NotTo(HaveOccurred(), "error replacing agents: %s", agentErr) + + shared.LogLevel("info", "Agent nodes replaced with ips: %s\n", newExternalAgentIps) + } + + // delete the last remaining server = leader + delErr := deleteServer(serverLeaderIp, awsDependencies) + Expect(delErr).NotTo(HaveOccurred(), delErr) + + shared.LogLevel("info", "Last Server deleted ip: %s\n", serverLeaderIp) +} + +func scpToNewNodes(product, nodeType string, newNodeIps []string) error { + if newNodeIps == nil { + return shared.ReturnLogError("newServerIps should send at least one ip\n") + } + + if product != "k3s" && product != "rke2" { + return shared.ReturnLogError("unsupported product: %s\n", product) + } + + chanErr := make(chan error, len(newNodeIps)) + var wg sync.WaitGroup + + for _, ip := range newNodeIps { + wg.Add(1) + go func(ip string) { + defer wg.Done() + var err error + if product == "k3s" { + err = scpK3sFiles(product, nodeType, ip) + } else { + err = scpRke2Files(product, nodeType, ip) + } + if err != nil { + chanErr <- shared.ReturnLogError("error scp files to new nodes: %w\n", err) + close(chanErr) + } + }(ip) + } + + wg.Wait() + close(chanErr) + + return nil +} + +func scpRke2Files(product, nodeType, ip string) error { + joinLocalPath := shared.BasePath() + fmt.Sprintf("/modules/install/join_rke2_%s.sh", nodeType) + joinRemotePath := fmt.Sprintf("/tmp/join_rke2_%s.sh", nodeType) + + if err := shared.RunScp(ip, product, []string{joinLocalPath}, []string{joinRemotePath}); err != nil { + return shared.ReturnLogError("error running scp: %w with ip: %s", err, ip) + } + + return nil +} + +func scpK3sFiles(product, nodeType, ip string) error { + if nodeType == agent { + cisWorkerLocalPath := shared.BasePath() + "/modules/k3s/worker/cis_worker_config.yaml" + cisWorkerRemotePath := "/tmp/cis_worker_config.yaml" + + joinLocalPath := shared.BasePath() + fmt.Sprintf("/modules/install/join_k3s_%s.sh", agent) + joinRemotePath := fmt.Sprintf("/tmp/join_k3s_%s.sh", agent) + + if err := shared.RunScp( + ip, + product, + []string{cisWorkerLocalPath, joinLocalPath}, + []string{cisWorkerRemotePath, joinRemotePath}, + ); err != nil { + return err + } + } else { + cisMasterLocalPath := shared.BasePath() + "/modules/k3s/master/cis_master_config.yaml" + cisMasterRemotePath := "/tmp/cis_master_config.yaml" + + clusterLevelpssLocalPath := shared.BasePath() + "/modules/k3s/master/cluster-level-pss.yaml" + clusterLevelpssRemotePath := "/tmp/cluster-level-pss.yaml" + + auditLocalPath := shared.BasePath() + "/modules/k3s/master/audit.yaml" + auditRemotePath := "/tmp/audit.yaml" + + policyLocalPath := shared.BasePath() + "/modules/k3s/master/policy.yaml" + policyRemotePath := "/tmp/policy.yaml" + + ingressPolicyLocalPath := shared.BasePath() + "/modules/k3s/master/ingresspolicy.yaml" + ingressPolicyRemotePath := "/tmp/ingresspolicy.yaml" + + joinLocalPath := shared.BasePath() + fmt.Sprintf("/modules/install/join_k3s_%s.sh", master) + joinRemotePath := fmt.Sprintf("/tmp/join_k3s_%s.sh", master) + + if err := 
shared.RunScp( + ip, + product, + []string{ + cisMasterLocalPath, + clusterLevelpssLocalPath, + auditLocalPath, + policyLocalPath, + ingressPolicyLocalPath, + joinLocalPath, + }, + []string{ + cisMasterRemotePath, + clusterLevelpssRemotePath, + auditRemotePath, + policyRemotePath, + ingressPolicyRemotePath, + joinRemotePath, + }); err != nil { + return err + } + } + + return nil +} + +func replaceServers( + c *factory.Cluster, + a *aws.Client, + resourceName, serverLeaderIp, token, version string, + newExternalServerIps, newPrivateServerIps []string, +) error { + if token == "" { + return shared.ReturnLogError("token not sent\n") + } + + if len(newExternalServerIps) == 0 || len(newPrivateServerIps) == 0 { + return shared.ReturnLogError("externalIps or privateIps empty\n") + } + + // join the first new server + newFirstServerIP := newExternalServerIps[0] + err := serverJoin(c.Config.Product, serverLeaderIp, token, version, newFirstServerIP, newPrivateServerIps[0]) + if err != nil { + shared.LogLevel("error", "error joining first server: %w\n", err) + + return err + } + shared.LogLevel("info", "Proceeding to update config file after first server join %s\n", newFirstServerIP) + + // delete first the server that is not the leader neither the server ip in the kubeconfig + oldServerIPs := c.ServerIPs + if delErr := deleteServer(oldServerIPs[len(oldServerIPs)-2], a); delErr != nil { + shared.LogLevel("error", "error deleting server: %w\n", delErr) + + return delErr + } + + // update the kubeconfig file to point to the new added server + if kbCfgErr := shared.UpdateKubeConfig(newFirstServerIP, resourceName, c.Config.Product); kbCfgErr != nil { + return shared.ReturnLogError("error updating kubeconfig: %w with ip: %s", kbCfgErr, newFirstServerIP) + } + + nodeErr := validateNodeJoin(newFirstServerIP) + if nodeErr != nil { + shared.LogLevel("error", "error validating node join: %w with ip: %s", nodeErr, newFirstServerIP) + + return nodeErr + } + + // join the rest of the servers and delete all except the leader + for i := 1; i <= len(newExternalServerIps[1:]); i++ { + privateIp := newPrivateServerIps[i] + externalIp := newExternalServerIps[i] + + if i < len(oldServerIPs[1:]) { + if delErr := deleteServer(oldServerIPs[len(oldServerIPs)-1], a); delErr != nil { + shared.LogLevel("error", "error deleting server: %w\n for ip: %s", delErr, oldServerIPs[i]) + + return delErr + } + } + + if joinErr := serverJoin(c.Config.Product, serverLeaderIp, token, version, externalIp, privateIp); joinErr != nil { + shared.LogLevel("error", "error joining server: %w with ip: %s\n", joinErr, externalIp) + + return joinErr + } + + nodeErr = validateNodeJoin(externalIp) + if nodeErr != nil { + shared.LogLevel("error", "error validating node join: %w with ip: %s", nodeErr, externalIp) + + return nodeErr + } + } + + return nil +} + +func validateNodeJoin(ip string) error { + node, err := shared.GetNodeNameByIP(ip) + if err != nil { + return shared.ReturnLogError("error getting node name by ip:%s %w\n", ip, err) + } + if node == "" { + return shared.ReturnLogError("node not found\n") + } + node = strings.TrimSpace(node) + + shared.LogLevel("info", "Node joined: %s with ip: %s", node, ip) + + return nil +} + +func serverJoin(product, serverLeaderIP, token, version, newExternalIP, newPrivateIP string) error { + joinCmd, parseErr := buildJoinCmd(product, master, serverLeaderIP, token, version, newExternalIP, newPrivateIP) + if parseErr != nil { + return shared.ReturnLogError("error parsing join commands: %w\n", parseErr) + } + + 
if joinErr := joinNode(joinCmd, newExternalIP); joinErr != nil { + return shared.ReturnLogError("error joining node: %w\n", joinErr) + } + + return nil +} + +func deleteServer(ip string, a *aws.Client) error { + if ip == "" { + return shared.ReturnLogError("ip not sent\n") + } + + if delNodeErr := shared.DeleteNode(ip); delNodeErr != nil { + shared.LogLevel("error", "error deleting server: %w\n", delNodeErr) + + return delNodeErr + } + shared.LogLevel("info", "Node IP deleted from the cluster: %s\n", ip) + + err := a.DeleteInstance(ip) + if err != nil { + return err + } + + return nil +} + +func replaceAgents( + cluster *factory.Cluster, + a *aws.Client, + serverLeaderIp, token, version string, + newExternalAgentIps, newPrivateAgentIps []string, +) error { + if token == "" { + return shared.ReturnLogError("token not sent\n") + } + + if len(newExternalAgentIps) == 0 || len(newPrivateAgentIps) == 0 { + return shared.ReturnLogError("externalIps or privateIps empty\n") + } + + if err := deleteAgents(a, cluster); err != nil { + shared.LogLevel("error", "error deleting agent: %w\n", err) + + return err + } + + for i, externalIp := range newExternalAgentIps { + privateIp := newPrivateAgentIps[i] + + joinErr := joinAgent(cluster.Config.Product, serverLeaderIp, token, version, externalIp, privateIp) + if joinErr != nil { + shared.LogLevel("error", "error joining agent: %w\n", joinErr) + + return joinErr + } + } + + return nil +} + +func deleteAgents(a *aws.Client, c *factory.Cluster) error { + for _, i := range c.AgentIPs { + if deleteNodeErr := shared.DeleteNode(i); deleteNodeErr != nil { + shared.LogLevel("error", "error deleting agent: %w\n", deleteNodeErr) + + return deleteNodeErr + } + shared.LogLevel("info", "Node IP deleted from the cluster: %s\n", i) + + err := a.DeleteInstance(i) + if err != nil { + return err + } + shared.LogLevel("info", "Instance IP deleted from cloud provider: %s\n", i) + } + + return nil +} + +func joinAgent(product, serverIp, token, version, selfExternalIp, selfPrivateIp string) error { + cmd, parseErr := buildJoinCmd(product, agent, serverIp, token, version, selfExternalIp, selfPrivateIp) + if parseErr != nil { + return shared.ReturnLogError("error parsing join commands: %w\n", parseErr) + } + + if joinErr := joinNode(cmd, selfExternalIp); joinErr != nil { + return shared.ReturnLogError("error joining node: %w\n", joinErr) + } + + return nil +} + +func joinNode(cmd, ip string) error { + if cmd == "" { + return shared.ReturnLogError("cmd not sent\n") + } + if ip == "" { + return shared.ReturnLogError("server IP not sent\n") + } + + res, err := shared.RunCommandOnNode(cmd, ip) + if err != nil { + return shared.ReturnLogError("error joining node: %w\n", err) + } + res = strings.TrimSpace(res) + if strings.Contains(res, "service failed") { + shared.LogLevel("error", "join node response: %s\n", res) + + return shared.ReturnLogError("error joining node: %s\n", res) + } + + delay := time.After(40 * time.Second) + // delay not meant to wait if node is joined, but rather to give time for all join process to complete under the hood + <-delay + + return nil +} + +func buildJoinCmd(product, nodetype, serverIp, token, version, selfExternalIp, selfPrivateIp string) (string, error) { + if nodetype != master && nodetype != agent { + return "", shared.ReturnLogError("unsupported nodetype: %s\n", nodetype) + } + + var flags string + var installMode string + if nodetype == master { + flags = fmt.Sprintf("'%s'", os.Getenv("server_flags")) + } else { + flags = fmt.Sprintf("'%s'", 
os.Getenv("worker_flags")) + } + + if strings.HasPrefix(version, "v") { + installMode = fmt.Sprintf("INSTALL_%s_VERSION", strings.ToUpper(product)) + } else { + installMode = fmt.Sprintf("INSTALL_%s_COMMIT", strings.ToUpper(product)) + } + + switch product { + case "k3s": + return buildK3sCmd(nodetype, serverIp, token, version, selfExternalIp, selfPrivateIp, installMode, flags) + case "rke2": + return buildRke2Cmd(nodetype, serverIp, token, version, selfExternalIp, selfPrivateIp, installMode, flags) + default: + return "", shared.ReturnLogError("unsupported product: %s\n", product) + } +} + +func buildK3sCmd(nodetype, serverIp, token, version, selfExternalIp, selfPrivateIp, instalMode, flags string, +) (string, error) { + var cmd string + ipv6 := "" + if nodetype == agent { + cmd = fmt.Sprintf( + "sudo /tmp/join_k3s_%s.sh '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' %s '%s' '%s'", + nodetype, + os.Getenv("node_os"), + serverIp, + token, + selfExternalIp, + selfPrivateIp, + ipv6, + instalMode, + version, + os.Getenv("k3s_channel"), + flags, + os.Getenv("username"), + os.Getenv("password"), + ) + } else { + datastoreEndpoint := "" + cmd = fmt.Sprintf( + "sudo /tmp/join_k3s_%s.sh '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' %s '%s' '%s'", + nodetype, + os.Getenv("node_os"), + serverIp, + serverIp, + token, + selfExternalIp, + selfPrivateIp, + ipv6, + instalMode, + version, + os.Getenv("k3s_channel"), + os.Getenv("datastore_type"), + datastoreEndpoint, + flags, + os.Getenv("username"), + os.Getenv("password"), + ) + } + + return cmd, nil +} + +func buildRke2Cmd(nodetype, serverIp, token, version, selfExternalIp, selfPrivateIp, instalMode, flags string, +) (string, error) { + installMethod := os.Getenv("install_method") + var cmd string + ipv6 := "" + if nodetype == agent { + cmd = fmt.Sprintf( + "sudo /tmp/join_rke2_%s.sh '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' %s '%s' '%s'", + nodetype, + os.Getenv("node_os"), + serverIp, + token, + selfExternalIp, + selfPrivateIp, + ipv6, + instalMode, + version, + os.Getenv("rke2_channel"), + installMethod, + flags, + os.Getenv("username"), + os.Getenv("password"), + ) + } else { + cmd = fmt.Sprintf( + "sudo /tmp/join_rke2_%s.sh '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' '%s' %s '%s' '%s'", + nodetype, + os.Getenv("node_os"), + serverIp, + serverIp, + token, + selfExternalIp, + selfPrivateIp, + ipv6, + instalMode, + version, + os.Getenv("rke2_channel"), + installMethod, + flags, + os.Getenv("username"), + os.Getenv("password"), + ) + } + + return cmd, nil +} diff --git a/pkg/testcase/upgradesuc.go b/pkg/testcase/upgradesuc.go new file mode 100644 index 00000000..a660a234 --- /dev/null +++ b/pkg/testcase/upgradesuc.go @@ -0,0 +1,56 @@ +package testcase + +import ( + "fmt" + "os" + "strings" + + "github.com/rancher/distros-test-framework/pkg/assert" + "github.com/rancher/distros-test-framework/shared" + + . "github.com/onsi/gomega" +) + +// TestUpgradeClusterSUC upgrades cluster using the system-upgrade-controller. 
+func TestUpgradeClusterSUC(version string) error { + if version == "" { + return shared.ReturnLogError("please provide a non-empty version or commit to upgrade to") + } + shared.PrintClusterState() + fmt.Printf("\nUpgrading cluster to: %s\n", version) + + workloadErr := shared.ManageWorkload("apply", "suc.yaml") + Expect(workloadErr).NotTo(HaveOccurred(), + "system-upgrade-controller manifest did not deploy successfully") + + getPodsSystemUpgrade := "kubectl get pods -n system-upgrade --kubeconfig=" + err := assert.CheckComponentCmdHost( + getPodsSystemUpgrade+shared.KubeConfigFile, + "system-upgrade-controller", + statusRunning, + ) + Expect(err).NotTo(HaveOccurred(), err) + + product, err := shared.Product() + Expect(err).NotTo(HaveOccurred()) + + originalFilePath := shared.BasePath() + + fmt.Sprintf("/workloads/amd64/%s-upgrade-plan.yaml", product) + newFilePath := shared.BasePath() + "/workloads/amd64/plan.yaml" + + content, err := os.ReadFile(originalFilePath) + if err != nil { + return fmt.Errorf("failed to read file: %s", err) + } + + newContent := strings.ReplaceAll(string(content), "$UPGRADEVERSION", version) + err = os.WriteFile(newFilePath, []byte(newContent), 0644) + if err != nil { + return fmt.Errorf("failed to write file: %s", err) + } + + workloadErr = shared.ManageWorkload("apply", "plan.yaml") + Expect(workloadErr).NotTo(HaveOccurred(), "failed to upgrade cluster.") + + return nil +} diff --git a/scripts/test_runner.sh b/scripts/test_runner.sh index 6d70cb61..604ce1a2 100755 --- a/scripts/test_runner.sh +++ b/scripts/test_runner.sh @@ -47,6 +47,8 @@ if [ -n "${TEST_DIR}" ]; then go test -timeout=65m -v -tags=upgrademanual -count=1 ./entrypoint/upgradecluster/... -installVersionOrCommit "${INSTALL_VERSION_OR_COMMIT}" -channel "${CHANNEL}" elif [ "${TEST_TAG}" = "upgradesuc" ]; then go test -timeout=65m -v -tags=upgradesuc -count=1 ./entrypoint/upgradecluster/... -sucUpgradeVersion "${SUC_UPGRADE_VERSION}" -channel "${CHANNEL}" + elif [ "${TEST_TAG}" = "upgradereplacement" ]; then + go test -timeout=120m -v -tags=upgradereplacement -count=1 ./entrypoint/upgradecluster/... 
-installVersionOrCommit "${INSTALL_VERSION_OR_COMMIT}" fi elif [ "${TEST_DIR}" = "versionbump" ]; then declare -a OPTS diff --git a/shared/aux.go b/shared/aux.go index e63a24ca..ee7efa44 100644 --- a/shared/aux.go +++ b/shared/aux.go @@ -46,13 +46,14 @@ func RunCommandOnNode(cmd, ip string) (string, error) { if cmd == "" { return "", ReturnLogError("cmd should not be empty") } - LogLevel("debug", fmt.Sprintf("Execute: %s on %s", cmd, ip)) + LogLevel("debug", "Execute: %s on %s", cmd, ip) host := ip + ":22" conn, err := configureSSH(host) if err != nil { return "", ReturnLogError("failed to configure SSH: %v\n", err) } + stdout, stderr, err := runsshCommand(cmd, conn) if err != nil && !strings.Contains(stderr, "restart") { return "", fmt.Errorf( @@ -69,14 +70,12 @@ func RunCommandOnNode(cmd, ip string) (string, error) { cleanedStderr := strings.ReplaceAll(stderr, "\n", "") cleanedStderr = strings.ReplaceAll(cleanedStderr, "\t", "") - if cleanedStderr != "" && (!strings.Contains(stderr, "exited") || - !strings.Contains(cleanedStderr, "1") || + if cleanedStderr != "" && (!strings.Contains(stderr, "exited") || !strings.Contains(cleanedStderr, "1") || !strings.Contains(cleanedStderr, "2")) { return cleanedStderr, nil } else if cleanedStderr != "" { return "", fmt.Errorf("command: %s failed with error: %v\n", cmd, stderr) } - LogLevel("debug", fmt.Sprintf("StdOut: %s", stdout)) return stdout, err } @@ -135,6 +134,56 @@ func CountOfStringInSlice(str string, pods []Pod) int { return count } +// RunScp copies files from local to remote host based on a list of local and remote paths. +func RunScp(ip, product string, localPaths, remotePaths []string) error { + if ip == "" { + return ReturnLogError("ip is needed.\n") + } + + if product != "rke2" && product != "k3s" { + return ReturnLogError("unsupported product: %s\n", product) + } + + if len(localPaths) != len(remotePaths) { + return ReturnLogError("the number of local paths and remote paths must be the same\n") + } + + if err := config.SetEnv(BasePath() + fmt.Sprintf("/config/%s.tfvars", product)); err != nil { + return err + } + + for i, localPath := range localPaths { + remotePath := remotePaths[i] + scp := fmt.Sprintf( + "scp -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -i %s %s %s@%s:%s", + AccessKey, + localPath, + AwsUser, + ip, + remotePath, + ) + + res, cmdErr := RunCommandHost(scp) + if res != "" { + LogLevel("warn", "scp output: %s\n", res) + } + if cmdErr != nil { + LogLevel("error", "failed to run scp: %v\n", cmdErr) + return cmdErr + } + + chmod := fmt.Sprintf("sudo chmod +wx %s", remotePath) + _, cmdErr = RunCommandOnNode(chmod, ip) + if cmdErr != nil { + LogLevel("error", "failed to run chmod: %v\n", cmdErr) + return cmdErr + } + } + LogLevel("info", "Files copied and chmod successfully\n") + + return nil +} + // AddHelmRepo adds a helm repo to the cluster. 
func AddHelmRepo(name, url string) (string, error) { addRepo := fmt.Sprintf("helm repo add %s %s", name, url) diff --git a/shared/cluster.go b/shared/cluster.go index 166ee37a..8b6759d9 100644 --- a/shared/cluster.go +++ b/shared/cluster.go @@ -13,10 +13,9 @@ import ( ) var ( - KubeConfigFile string - AwsUser string - AccessKey string - Arch string + AwsUser string + AccessKey string + Arch string ) type Node struct { @@ -169,12 +168,7 @@ func KubectlCommand(destination, action, source string, args ...string) (string, return "", ReturnLogError("error setting env: %w\n", envErr) } - resourceName := os.Getenv("resource_name") - serverIP, _, err := kubeCfgServerIP(resourceName) - if err != nil { - return "", ReturnLogError("failed to extract server IP: %w", err) - } var cmd string switch destination { @@ -182,9 +176,14 @@ func KubectlCommand(destination, action, source string, args ...string) (string, cmd = cmdPrefix + " " + source + " " + strings.Join(args, " ") + kubeconfigFlag return kubectlCmdOnHost(cmd) case "node": + serverIP, _, err := ExtractServerIP(resourceName) + if err != nil { + return "", ReturnLogError("failed to extract server IP: %w", err) + } kubeconfigFlagRemotePath := fmt.Sprintf("/etc/rancher/%s/%s.yaml", product, product) kubeconfigFlagRemote := " --kubeconfig=" + kubeconfigFlagRemotePath cmd = cmdPrefix + " " + source + " " + strings.Join(args, " ") + kubeconfigFlagRemote + return kubectlCmdOnNode(cmd, serverIP) default: return "", ReturnLogError("invalid destination: %s", destination) @@ -509,31 +508,6 @@ func WriteDataPod(namespace string) (string, error) { return RunCommandHost(cmd) } -// kubeCfgServerIP extracts the server IP from the kubeconfig file. -func kubeCfgServerIP(resourceName string) (kubeConfigIP, kubeCfg string, err error) { - if resourceName == "" { - return "", "", ReturnLogError("resource name not sent\n") - } - - localPath := fmt.Sprintf("/tmp/%s_kubeconfig", resourceName) - kubeconfigContent, err := os.ReadFile(localPath) - if err != nil { - return "", "", ReturnLogError("failed to read kubeconfig file: %w\n", err) - } - // get server ip value from `server:` key - serverIP := strings.Split(string(kubeconfigContent), "server: ")[1] - // removing newline - serverIP = strings.Split(serverIP, "\n")[0] - // removing the https:// - serverIP = strings.Join(strings.Split(serverIP, "https://")[1:], "") - // removing the port - serverIP = strings.Split(serverIP, ":")[0] - - LogLevel("info", "Extracted from local kube config file server ip: %s", serverIP) - - return serverIP, string(kubeconfigContent), nil -} - // GetNodeArgsMap returns list of nodeArgs map func GetNodeArgsMap(nodeType string) (map[string]string, error) { product, err := Product() @@ -587,3 +561,75 @@ func processNodeArgs(nodeArgs string) (nodeArgsMapSlice []map[string]string) { return nodeArgsMapSlice } + +// DeleteNode deletes a node from the cluster filtering the name out by the IP. 
+func DeleteNode(ip string) error {
+	if ip == "" {
+		return ReturnLogError("must send an ip: %s\n", ip)
+	}
+
+	name, err := GetNodeNameByIP(ip)
+	if err != nil {
+		return ReturnLogError("failed to get node name by ip: %w\n", err)
+	}
+
+	res, delErr := RunCommandHost("kubectl delete node " + name + " --wait=false --kubeconfig=" + KubeConfigFile)
+	if delErr != nil {
+		return ReturnLogError("failed to delete node: %w\n", delErr)
+	}
+	LogLevel("info", "Deleting node: %s", res)
+
+	// the delay is not meant to wait for the deletion itself,
+	// but to give the cluster time to remove the node from its state.
+	delay := time.After(20 * time.Second)
+	<-delay
+
+	return nil
+}
+
+// GetNodeNameByIP returns the node name for the given IP.
+func GetNodeNameByIP(ip string) (string, error) {
+	ticker := time.NewTicker(3 * time.Second)
+	timeout := time.After(45 * time.Second)
+	defer ticker.Stop()
+
+	cmd := "kubectl get nodes -o custom-columns=NAME:.metadata.name,INTERNAL-IP:.status.addresses[*].address --kubeconfig=" +
+		KubeConfigFile + " | grep " + ip + " | awk '{print $1}'"
+
+	i := 0
+	for {
+		select {
+		case <-timeout:
+			return "", ReturnLogError("kubectl get nodes timed out for cmd: %s\n", cmd)
+		case <-ticker.C:
+			nodeName, err := RunCommandHost(cmd)
+			if err != nil {
+				i++
+				LogLevel("warn", "error from RunCommandHost: %v\nwith res: %s Retrying...", err, nodeName)
+				if i > 5 {
+					return "", ReturnLogError("kubectl get nodes returned error: %w\n", err)
+				}
+				continue
+			}
+			if nodeName == "" {
+				continue
+			}
+
+			name := strings.TrimSpace(nodeName)
+			LogLevel("info", "Node name: %s\n", name)
+
+			return name, nil
+		}
+	}
+}
+
+func FetchToken(ip string) (string, error) {
+	token, err := RunCommandOnNode("sudo cat /tmp/nodetoken", ip)
+	if err != nil {
+		return "", ReturnLogError("failed to fetch token: %w\n", err)
+	}
+
+	LogLevel("info", "token successfully retrieved")
+
+	return token, nil
+}
diff --git a/shared/kubeconfig.go b/shared/kubeconfig.go
new file mode 100644
index 00000000..21b416a3
--- /dev/null
+++ b/shared/kubeconfig.go
@@ -0,0 +1,74 @@
+package shared
+
+import (
+	"fmt"
+	"os"
+	"strings"
+)
+
+var KubeConfigFile string
+
+func UpdateKubeConfig(newLeaderIP, resourceName, product string) error {
+	if resourceName == "" {
+		return ReturnLogError("resourceName not sent\n")
+	}
+
+	err := updateKubeConfigLocal(newLeaderIP, resourceName, product)
+	if err != nil {
+		return ReturnLogError("error updating kubeconfig file: %w\n", err)
+	}
+
+	LogLevel("info", "kubeconfig file updated\n")
+
+	return nil
+}
+
+// ExtractServerIP extracts the server IP from the kubeconfig file.
+func ExtractServerIP(resourceName string) (kubeConfigIP, kubeCfg string, err error) { + if resourceName == "" { + return "", "", ReturnLogError("resource name not sent\n") + } + + localPath := fmt.Sprintf("/tmp/%s_kubeconfig", resourceName) + kubeconfigContent, err := os.ReadFile(localPath) + if err != nil { + return "", "", ReturnLogError("failed to read kubeconfig file: %w\n", err) + } + // get server ip value from `server:` key + serverIP := strings.Split(string(kubeconfigContent), "server: ")[1] + // removing newline + serverIP = strings.Split(serverIP, "\n")[0] + // removing the https:// + serverIP = strings.Join(strings.Split(serverIP, "https://")[1:], "") + // removing the port + serverIP = strings.Split(serverIP, ":")[0] + + LogLevel("info", "Extracted from local kube config file server ip: %s", serverIP) + + return serverIP, string(kubeconfigContent), nil +} + +// updateKubeConfigLocal changes the server ip in the local kubeconfig file. +func updateKubeConfigLocal(newServerIP, resourceName, product string) error { + if newServerIP == "" { + return ReturnLogError("ip not sent.\n") + } + if product == "" { + return ReturnLogError("product not sent.\n") + } + oldServerIP, kubeconfigContent, err := ExtractServerIP(resourceName) + if err != nil { + return ReturnLogError("error extracting server ip: %w\n", err) + } + + path := fmt.Sprintf("/tmp/%s_kubeconfig", resourceName) + updatedKubeConfig := strings.ReplaceAll(kubeconfigContent, oldServerIP, newServerIP) + writeErr := os.WriteFile(path, []byte(updatedKubeConfig), 0644) + if writeErr != nil { + return ReturnLogError("failed to write updated kubeconfig file: %w\n", writeErr) + } + + LogLevel("info", "Updated local kubeconfig with ip: %s", newServerIP) + + return nil +}
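
Since this patch deletes docs/tests/aws.md while also renaming AddAwsNode to AddNode and extending CreateInstances to return external IPs, private IPs, and instance IDs, a minimal usage sketch of the updated wrapper follows. It is based only on the signatures shown in this diff and mirrors the deleted doc's standalone example; the instance name is illustrative, and like the old example it assumes the cluster config the factory expects is already in place.

```go
package main

import (
	"fmt"

	"github.com/rancher/distros-test-framework/pkg/aws"
)

func main() {
	// AddNode (formerly AddAwsNode) builds the AWS client from the cluster config.
	client, err := aws.AddNode()
	if err != nil {
		fmt.Println(err)
		return
	}

	// CreateInstances now returns external IPs, private IPs, and instance IDs.
	externalIPs, privateIPs, ids, err := client.CreateInstances("example-test-instance-1")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(externalIPs, privateIPs, ids)

	// Instances are still deleted by IP, as replaceServers and deleteAgents do.
	if delErr := client.DeleteInstance(externalIPs[0]); delErr != nil {
		fmt.Println(delErr)
	}
}
```

The node-replacement suite itself runs under the upgradereplacement build tag: `make test-upgrade-node-replacement` passes -installVersionOrCommit ${INSTALL_VERSION_OR_COMMIT} to ./entrypoint/upgradecluster, and scripts/test_runner.sh triggers the same suite with TEST_TAG=upgradereplacement and a 120m timeout.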