diff --git a/cmd/yurt-lb-agent/app/core.go b/cmd/yurt-lb-agent/app/core.go
new file mode 100644
index 00000000000..989b579fc78
--- /dev/null
+++ b/cmd/yurt-lb-agent/app/core.go
@@ -0,0 +1,173 @@
+/*
+Copyright 2024 The OpenYurt Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package app
+
+import (
+	"context"
+	"fmt"
+	"os"
+
+	"github.com/spf13/cobra"
+	"github.com/spf13/pflag"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes"
+	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/klogr"
+	"k8s.io/kubernetes/pkg/util/sysctl"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/healthz"
+
+	"github.com/openyurtio/openyurt/cmd/yurt-lb-agent/app/options"
+	"github.com/openyurtio/openyurt/pkg/apis"
+	"github.com/openyurtio/openyurt/pkg/yurtlbagent/controllers"
+	"github.com/openyurtio/openyurt/pkg/yurtlbagent/util"
+)
+
+var (
+	scheme   = runtime.NewScheme()
+	setupLog = ctrl.Log.WithName("setup")
+)
+
+func init() {
+	_ = clientgoscheme.AddToScheme(scheme)
+
+	_ = apis.AddToScheme(clientgoscheme.Scheme)
+	_ = apis.AddToScheme(scheme)
+
+	// +kubebuilder:scaffold:scheme
+}
+
+func NewCmdYurtLBAgent(stopCh <-chan struct{}) *cobra.Command {
+	yurtLBAgentOptions := options.NewYurtLBAgentOptions()
+	cmd := &cobra.Command{
+		Use:   "yurt-lb-agent",
+		Short: "Launch yurt-lb-agent",
+		Long:  "Launch yurt-lb-agent",
+		Run: func(cmd *cobra.Command, args []string) {
+			cmd.Flags().VisitAll(func(flag *pflag.Flag) {
+				klog.V(1).Infof("FLAG: --%s=%q", flag.Name, flag.Value)
+			})
+			if err := options.ValidateOptions(yurtLBAgentOptions); err != nil {
+				klog.Fatalf("validate options: %v", err)
+			}
+			Run(yurtLBAgentOptions, stopCh)
+		},
+	}
+
+	yurtLBAgentOptions.AddFlags(cmd.Flags())
+	return cmd
+}
+
+func Run(opts *options.YurtLBAgentOptions, stopCh <-chan struct{}) {
+	ctrl.SetLogger(klogr.New())
+	cfg := ctrl.GetConfigOrDie()
+
+	mgr, err := ctrl.NewManager(cfg, ctrl.Options{
+		Scheme:                 scheme,
+		MetricsBindAddress:     opts.MetricsAddr,
+		HealthProbeBindAddress: opts.ProbeAddr,
+		LeaderElection:         opts.EnableLeaderElection,
+		LeaderElectionID:       "yurt-lb-agent",
+		Namespace:              opts.Namespace,
+	})
+	if err != nil {
+		setupLog.Error(err, "unable to start manager")
+		os.Exit(1)
+	}
+
+	// perform preflight check
+	setupLog.Info("[preflight] Running pre-flight checks")
+	if err := preflightCheck(mgr, opts); err != nil {
+		setupLog.Error(err, "could not run pre-flight checks")
+		os.Exit(1)
+	}
+	setupLog.Info(fmt.Sprintf("yurtlb options: %v", opts))
+
+	// get the nodepool where yurt-lb-agent runs
+	if opts.Nodepool == "" {
+		opts.Nodepool, err = util.GetNodePool(mgr.GetConfig(), opts.Node)
+		if err != nil {
+			setupLog.Error(err, "could not get the nodepool where yurt-lb-agent runs")
+			os.Exit(1)
+		}
+	}
+	setupLog.Info(fmt.Sprintf("yurtlb nodepool: %v", opts.Nodepool))
+
+	// get the network interface for keepalived
+	if opts.Iface == "" {
+		opts.Iface, err = util.GetNodeInterface()
+		if err != nil {
+			setupLog.Error(err, "could not get valid interface")
+			os.Exit(1)
+		}
+	} else {
+		err = util.ValidInterface(opts.Iface)
+		if err != nil {
+			setupLog.Error(err, "interface is not valid")
+			os.Exit(1)
+		}
+	}
+	// change the required network setting in /proc so keepalived can bind VIPs not yet assigned locally
+	sys := sysctl.New()
+	err = sys.SetSysctl("net/ipv4/ip_nonlocal_bind", 1)
+	if err != nil {
+		setupLog.Error(err, "unable to change network setting", "controller", "Yurt-lb-agent")
+		os.Exit(1)
+	}
+
+	// setup the PoolService Reconciler
+	if err = (&controllers.PoolServiceReconciler{
+		Client: mgr.GetClient(),
+		Scheme: mgr.GetScheme(),
+	}).SetupWithManager(mgr, opts); err != nil {
+		setupLog.Error(err, "unable to create controller", "controller", "Yurt-lb-agent")
+		os.Exit(1)
+	}
+	//+kubebuilder:scaffold:builder
+
+	if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
+		setupLog.Error(err, "unable to set up health check")
+		os.Exit(1)
+	}
+	if err := mgr.AddReadyzCheck("check", healthz.Ping); err != nil {
+		setupLog.Error(err, "unable to set up ready check")
+		os.Exit(1)
+	}
+
+	setupLog.Info("[run controllers] Starting manager, acting on " + fmt.Sprintf("[Nodepool: %s, Namespace: %s]", opts.Nodepool, opts.Namespace))
+	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+		setupLog.Error(err, "could not run manager")
+		os.Exit(1)
+	}
+}
+
+func preflightCheck(mgr ctrl.Manager, opts *options.YurtLBAgentOptions) error {
+	client, err := kubernetes.NewForConfig(mgr.GetConfig())
+	if err != nil {
+		return err
+	}
+	if _, err := client.CoreV1().Namespaces().Get(context.TODO(), opts.Namespace, metav1.GetOptions{}); err != nil {
+		return err
+	}
+	if _, err := client.CoreV1().Nodes().Get(context.TODO(), opts.Node, metav1.GetOptions{}); err != nil {
+		return fmt.Errorf("failed to get node %v: %v", opts.Node, err)
+	}
+
+	return nil
+}
diff --git a/cmd/yurt-lb-agent/app/options/options.go b/cmd/yurt-lb-agent/app/options/options.go
new file mode 100644
index 00000000000..52032284734
--- /dev/null
+++ b/cmd/yurt-lb-agent/app/options/options.go
@@ -0,0 +1,65 @@
+/*
+Copyright 2024 The OpenYurt Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package options
+
+import (
+	"fmt"
+
+	"github.com/spf13/pflag"
+)
+
+// YurtLBAgentOptions is the main settings for the yurt-lb-agent
+type YurtLBAgentOptions struct {
+	MetricsAddr          string
+	ProbeAddr            string
+	EnableLeaderElection bool
+	Namespace            string
+	Version              string
+	Nodepool             string
+	Iface                string
+	Node                 string
+}
+
+func NewYurtLBAgentOptions() *YurtLBAgentOptions {
+	return &YurtLBAgentOptions{
+		MetricsAddr:          ":8080",
+		ProbeAddr:            ":8081",
+		EnableLeaderElection: false,
+		Namespace:            "default",
+		Version:              "",
+		Nodepool:             "",
+		Node:                 "",
+	}
+}
+
+func ValidateOptions(options *YurtLBAgentOptions) error {
+	if options.Node == "" {
+		return fmt.Errorf("node name is required")
+	}
+	return nil
+}
+
+func (o *YurtLBAgentOptions) AddFlags(fs *pflag.FlagSet) {
+	fs.StringVar(&o.MetricsAddr, "metrics-bind-address", o.MetricsAddr, "The address the metric endpoint binds to.")
+	fs.StringVar(&o.ProbeAddr, "health-probe-bind-address", o.ProbeAddr, "The address the probe endpoint binds to.")
+	fs.BoolVar(&o.EnableLeaderElection, "leader-elect", o.EnableLeaderElection, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
+	fs.StringVar(&o.Namespace, "namespace", "", "The namespace the YurtLB Agent is deployed in (just for debugging).")
+	fs.StringVar(&o.Version, "version", "", "The version of edge resources deployment.")
+	fs.StringVar(&o.Nodepool, "nodepool", "", "The nodepool the YurtLB Agent is deployed in (just for debugging).")
+	fs.StringVar(&o.Iface, "iface", "", "The network interface keepalived uses.")
+	fs.StringVar(&o.Node, "node", "", "The node the YurtLB Agent runs on.")
+}
diff --git a/cmd/yurt-lb-agent/yurt-lb-agent.go b/cmd/yurt-lb-agent/yurt-lb-agent.go
new file mode 100644
index 00000000000..3270604f718
--- /dev/null
+++ b/cmd/yurt-lb-agent/yurt-lb-agent.go
@@ -0,0 +1,43 @@
+/*
+Copyright 2024 The OpenYurt Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"flag"
+	"math/rand"
+	"time"
+
+	"k8s.io/apimachinery/pkg/util/wait"
+	// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
+	// to ensure that exec-entrypoint and run can make use of them.
+	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
+	"k8s.io/klog/v2"
+
+	"github.com/openyurtio/openyurt/cmd/yurt-lb-agent/app"
+)
+
+func main() {
+	rand.Seed(time.Now().UnixNano())
+	klog.InitFlags(nil)
+	defer klog.Flush()
+
+	cmd := app.NewCmdYurtLBAgent(wait.NeverStop)
+	cmd.Flags().AddGoFlagSet(flag.CommandLine)
+	if err := cmd.Execute(); err != nil {
+		panic(err)
+	}
+}
diff --git a/hack/dockerfiles/build/Dockerfile.yurt-lb-agent b/hack/dockerfiles/build/Dockerfile.yurt-lb-agent
new file mode 100644
index 00000000000..383d8480134
--- /dev/null
+++ b/hack/dockerfiles/build/Dockerfile.yurt-lb-agent
@@ -0,0 +1,8 @@
+# multi-arch image building for yurt-lb-agent
+
+FROM --platform=${TARGETPLATFORM} alpine:3.17
+ARG TARGETOS TARGETARCH MIRROR_REPO
-z "${MIRROR_REPO+x}" ]; then sed -i "s/dl-cdn.alpinelinux.org/${MIRROR_REPO}/g" /etc/apk/repositories; fi && \ + apk add ca-certificates bash libc6-compat iptables ip6tables busybox-binsh keepalived-common libcrypto3 libgcc libnl3 libssl3 musl keepalived && update-ca-certificates && rm /var/cache/apk/* +COPY ./_output/local/bin/${TARGETOS}/${TARGETARCH}/yurt-lb-agent /usr/local/bin/yurt-lb-agent +ENTRYPOINT ["/usr/local/bin/yurt-lb-agent"] \ No newline at end of file diff --git a/hack/make-rules/build.sh b/hack/make-rules/build.sh index 5e28026b658..b7a1a274978 100755 --- a/hack/make-rules/build.sh +++ b/hack/make-rules/build.sh @@ -26,6 +26,7 @@ readonly YURT_ALL_TARGETS=( yurthub yurt-manager yurt-iot-dock + yurt-lb-agent ) # clean old binaries at GOOS and GOARCH diff --git a/pkg/yurtlbagent/controllers/service_controller.go b/pkg/yurtlbagent/controllers/service_controller.go new file mode 100644 index 00000000000..ea8b9e26022 --- /dev/null +++ b/pkg/yurtlbagent/controllers/service_controller.go @@ -0,0 +1,317 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "math/rand" + "net" + "strconv" + "time" + + "github.com/openyurtio/openyurt/cmd/yurt-lb-agent/app/options" + netv1alpha1 "github.com/openyurtio/openyurt/pkg/apis/net/v1alpha1" + "github.com/openyurtio/openyurt/pkg/yurtlbagent/util" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + VRIDLabelKey = "net.openyurt.io/vrid" + BACKUPState = "BACKUP" + LOADBALANCERCLASS = "net.openyurt.io/yurtlb" +) + +var controllerResource = netv1alpha1.SchemeGroupVersion.WithResource("poolservices") + +// TODO: poolendpoints check + +func Format(format string, args ...interface{}) string { + s := fmt.Sprintf(format, args...) 
+ return fmt.Sprintf("%s: %s", "yurt-lb-agent: controller PoolServiceReconciler", s) +} + +type PoolServiceReconciler struct { + client.Client + Scheme *runtime.Scheme + Nodepool string + Node string + Namespace string + Keepalived *util.Keepalived + LoadBalancerClass string + Services map[string][]net.IP +} + +// +kubebuilder:rbac:groups=net.openyurt.io,resources=poolservices,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=net.openyurt.io,resources=poolservices/status,verbs=get;update;patch +// TODO: poolendpoints + +func (r *PoolServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + klog.Infof(Format("reconcile PoolService %s/%s", req.Namespace, req.Name)) + + // Fetch the poolservice + poolservice := &netv1alpha1.PoolService{} + if err := r.Get(ctx, req.NamespacedName, poolservice); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + klog.Errorf(Format("get PoolService %s/%s error %v", req.Namespace, req.Name, err)) + return ctrl.Result{}, err + } + klog.Infof(Format("get PoolService %s/%s", req.Namespace, req.Name)) + + // filter loadbalancerclass + if filterByLoadBalancerClass(poolservice, r.LoadBalancerClass) { + klog.Infof(Format("filter PoolService %s/%s by LoadBalancerClass %s", req.Namespace, req.Name, r.LoadBalancerClass)) + return ctrl.Result{}, nil + } + klog.Infof(Format("not filter PoolService %s/%s by LoadBalancerClass %s", req.Namespace, req.Name, r.LoadBalancerClass)) + + // filter nodepool + if filterByNodepool(poolservice, r.Nodepool) { + klog.Infof(Format("filter PoolService %s/%s by Nodepool %s", req.Namespace, req.Name, r.Nodepool)) + return ctrl.Result{}, nil + } + klog.Infof(Format("not filter PoolService %s/%s by Nodepool %s", req.Namespace, req.Name, r.Nodepool)) + + if poolservice.DeletionTimestamp != nil { + return r.reconcileDelete(ctx, poolservice) + } + + return r.reconcileNormal(ctx, poolservice) +} + +func (r *PoolServiceReconciler) reconcileDelete(_ context.Context, poolservice *netv1alpha1.PoolService) (ctrl.Result, error) { + klog.Infof(Format("reconcileDelete poolservice %s/%s", poolservice.Namespace, poolservice.Name)) + + return r.deleteBalancer(poolservice) +} + +func (r *PoolServiceReconciler) reconcileNormal(ctx context.Context, poolservice *netv1alpha1.PoolService) (ctrl.Result, error) { + klog.Infof(Format("reconcileNormal poolservice %s/%s", poolservice.Namespace, poolservice.Name)) + podExistsOnNode, err := r.filterByPod(ctx, poolservice) + if err != nil || !podExistsOnNode { + return r.deleteBalancer(poolservice) + } + + if poolservice.Status.LoadBalancer == nil { + return r.deleteBalancer(poolservice) + } + + if len(poolservice.Status.LoadBalancer.Ingress) == 0 { + return r.deleteBalancer(poolservice) + } + + vrid, err := getVrid(poolservice) + if err != nil { + klog.Errorf(Format("poolservice %s/%s has invalid vrid: %v", poolservice.Namespace, poolservice.Name, err)) + return r.deleteBalancer(poolservice) + } + + lbIPs := []net.IP{} + for _, ingress := range poolservice.Status.LoadBalancer.Ingress { + lbIP := net.ParseIP(ingress.IP) + if lbIP == nil { + klog.Errorf(Format("Unable to parse lb IP %s for poolservice %s/%s", ingress.IP, poolservice.Namespace, poolservice.Name)) + return r.deleteBalancer(poolservice) + } + lbIPs = append(lbIPs, lbIP) + } + + name := types.NamespacedName{Namespace: poolservice.Namespace, Name: poolservice.Name}.String() + _, ok := r.Services[name] + if ok && compareIPs(lbIPs, r.Services[name]) && vrid == 
+	if ok && compareIPs(lbIPs, r.Services[name]) && vrid == r.Keepalived.LBServices[name].Vrid {
+		klog.Infof(Format("poolservice %s/%s LBIPs and vrid unchanged", poolservice.Namespace, poolservice.Name))
+		return ctrl.Result{}, nil
+	}
+
+	var ips []string
+	for _, ip := range lbIPs {
+		ips = append(ips, ip.String())
+	}
+
+	svc := r.Keepalived.LBServices[name]
+	if !ok {
+		svc = util.LBService{Name: name, Priority: getPriority()}
+	}
+	svc.Vrid = vrid
+	svc.Ips = ips
+	r.Keepalived.LBServices[name] = svc
+
+	if err = r.Keepalived.LoadConfig(); err != nil {
+		return ctrl.Result{}, fmt.Errorf("loading keepalived configuration error: %v", err)
+	}
+
+	if err = r.Keepalived.ReloadKeepalived(); err != nil {
+		return ctrl.Result{}, fmt.Errorf("reloading keepalived configuration error: %v", err)
+	}
+	r.Services[name] = lbIPs
+
+	return ctrl.Result{}, nil
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *PoolServiceReconciler) SetupWithManager(mgr ctrl.Manager, opts *options.YurtLBAgentOptions) error {
+	if _, err := mgr.GetRESTMapper().KindFor(controllerResource); err != nil {
+		klog.Infof("resource %s doesn't exist", controllerResource.String())
+		return err
+	}
+
+	r.Nodepool = opts.Nodepool
+	r.Node = opts.Node
+	r.Namespace = opts.Namespace
+	r.LoadBalancerClass = LOADBALANCERCLASS
+	r.Services = make(map[string][]net.IP)
+
+	keepalivedConf := &util.Keepalived{
+		State:      BACKUPState,
+		Iface:      opts.Iface,
+		LBServices: make(map[string]util.LBService),
+	}
+	var err error
+	err = keepalivedConf.LoadTemplate()
+	if err != nil {
+		return fmt.Errorf("load keepalived template err, %v", err)
+	}
+	err = keepalivedConf.LoadConfig()
+	if err != nil {
+		return fmt.Errorf("write keepalived config err, %v", err)
+	}
+	r.Keepalived = keepalivedConf
+	go func() {
+		// TODO: retry if error
+		if err := keepalivedConf.Start(); err != nil {
+			klog.Errorf(Format("start keepalived err, %v", err))
+		}
+	}()
+
+	// TODO: watch poolendpoints
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&netv1alpha1.PoolService{}).
+		Complete(r)
+}
+
+func (r *PoolServiceReconciler) deleteBalancer(poolservice *netv1alpha1.PoolService) (ctrl.Result, error) {
+	name := types.NamespacedName{Namespace: poolservice.Namespace, Name: poolservice.Name}.String()
+	if _, ok := r.Keepalived.LBServices[name]; !ok {
+		klog.Infof(Format("poolservice %s/%s already deleted", poolservice.Namespace, poolservice.Name))
+		return ctrl.Result{}, nil
+	}
+	delete(r.Keepalived.LBServices, name)
+	err := r.Keepalived.LoadConfig()
+	if err != nil {
+		klog.Errorf(Format("error loading keepalived configuration: %v", err))
+	}
+	err = r.Keepalived.ReloadKeepalived()
+	if err != nil {
+		klog.Errorf(Format("update keepalived configuration error: %v", err))
+	}
+
+	delete(r.Services, name)
+	klog.Infof(Format("poolservice %s/%s successfully deleted", poolservice.Namespace, poolservice.Name))
+
+	return ctrl.Result{}, nil
+}
+
+func filterByLoadBalancerClass(poolservice *netv1alpha1.PoolService, loadBalancerClass string) bool {
+	if poolservice == nil {
+		return false
+	}
+	if poolservice.Spec.LoadBalancerClass != loadBalancerClass {
+		return true
+	}
+	return false
+}
+
+func filterByNodepool(poolservice *netv1alpha1.PoolService, nodepool string) bool {
+	if poolservice == nil {
+		return false
+	}
+	if poolservice.Spec.PoolName != nodepool {
+		return true
+	}
+	return false
+}
+
+func (r *PoolServiceReconciler) filterByPod(ctx context.Context, poolservice *netv1alpha1.PoolService) (bool, error) {
+	// TODO: get service by poolservice
+	service := &v1.Service{}
+	if err := r.Get(ctx, types.NamespacedName{Namespace: poolservice.Namespace, Name: poolservice.Name}, service); err != nil {
+		klog.Errorf(Format("get Service %s/%s error %v", poolservice.Namespace, poolservice.Name, err))
+		return false, err
+	}
+	klog.Infof(Format("get Service %s/%s", poolservice.Namespace, poolservice.Name))
+
+	podList := &v1.PodList{}
+	err := r.List(ctx, podList, client.InNamespace(service.Namespace), client.MatchingLabels(service.Spec.Selector))
+	if err != nil {
+		klog.Errorf(Format("get pod list for service %s/%s error %v", service.Namespace, service.Name, err))
+		return false, err
+	}
+	klog.Infof(Format("get %v pod items for service %s/%s", len(podList.Items), service.Namespace, service.Name))
+
+	podExistsOnNode := false
+	for _, pod := range podList.Items {
+		if pod.Spec.NodeName == r.Node {
+			podExistsOnNode = true
+			break
+		}
+	}
+
+	return podExistsOnNode, nil
+}
+
+func compareIPs(ips1 []net.IP, ips2 []net.IP) bool {
+	if len(ips1) != len(ips2) {
+		return false
+	}
+	for _, ip1 := range ips1 {
+		flag := false
+		for _, ip2 := range ips2 {
+			if ip1.Equal(ip2) {
+				flag = true
+				break
+			}
+		}
+		if !flag {
+			return false
+		}
+	}
+	return true
+}
+
+func getPriority() int {
+	// keepalived VRRP priority must be in 1-254, so avoid generating 0
+	return rand.Intn(200) + 1
+}
+
+func getVrid(poolservice *netv1alpha1.PoolService) (int, error) {
+	vrid, err := strconv.Atoi(poolservice.Labels[VRIDLabelKey])
+	if err != nil {
+		return -1, err
+	}
+	if vrid < 1 || vrid > 255 {
+		return -1, fmt.Errorf("invalid vrid %v", vrid)
+	}
+	return vrid, nil
+}
diff --git a/pkg/yurtlbagent/util/keepalived.go b/pkg/yurtlbagent/util/keepalived.go
new file mode 100644
index 00000000000..35f6d0904aa
--- /dev/null
+++ b/pkg/yurtlbagent/util/keepalived.go
@@ -0,0 +1,181 @@
+/*
+Copyright 2024 The OpenYurt Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"fmt"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"syscall"
+	"text/template"
+	"time"
+)
+
+const (
+	keepalivedPid  = "/var/run/keepalived.pid"
+	vrrpPid        = "/var/run/vrrp.pid"
+	keepalivedPath = "/etc/keepalived/keepalived.conf"
+)
+
+type LBService struct {
+	Name     string
+	Priority int
+	Vrid     int
+	Ips      []string
+}
+
+type Keepalived struct {
+	Iface          string
+	State          string
+	Started        bool
+	Cmd            *exec.Cmd
+	KeepalivedTmpl *template.Template
+	emptyTmpl      *template.Template
+	LBServices     map[string]LBService
+}
+
+func (k *Keepalived) Start() error {
+	args := []string{"--dont-fork", "--log-console", "--log-detail", "--release-vips"}
+	args = append(args, fmt.Sprintf("--pid=%s", keepalivedPid))
+	args = append(args, fmt.Sprintf("--vrrp_pid=%s", vrrpPid))
+
+	k.Cmd = exec.Command("keepalived", args...)
+	k.Cmd.Stdout = os.Stdout
+	k.Cmd.Stderr = os.Stderr
+
+	k.Cmd.SysProcAttr = &syscall.SysProcAttr{
+		Setpgid: true,
+		Pgid:    0,
+	}
+
+	k.Started = true
+
+	if err := k.Cmd.Run(); err != nil {
+		return fmt.Errorf("starting keepalived error: %v", err)
+	}
+	return nil
+}
+
+func (k *Keepalived) ReloadKeepalived() error {
+	// wait until keepalived has started and written its pid file
+	for !k.IsRunning() {
+		time.Sleep(time.Second)
+	}
+
+	err := syscall.Kill(k.Cmd.Process.Pid, syscall.SIGHUP)
+	if err != nil {
+		return fmt.Errorf("reload keepalived error: %v", err)
+	}
+
+	return nil
+}
+
+func (k *Keepalived) IsRunning() bool {
+	if !k.Started {
+		return false
+	}
+
+	if _, err := os.Stat(keepalivedPid); os.IsNotExist(err) {
+		return false
+	}
+
+	return true
+}
+
+func (k *Keepalived) Healthy() error {
+	if !k.IsRunning() {
+		return fmt.Errorf("keepalived is not running")
+	}
+
+	if _, err := os.Stat(vrrpPid); os.IsNotExist(err) {
+		return fmt.Errorf("VRRP child process not running")
+	}
+
+	// TODO: check whether bind correct vips
+
+	return nil
+}
+
+func (k *Keepalived) Stop() error {
+	err := syscall.Kill(k.Cmd.Process.Pid, syscall.SIGTERM)
+	if err != nil {
+		return fmt.Errorf("stopping keepalived error: %v", err)
+	}
+	return nil
+}
+
+func (k *Keepalived) LoadConfig() error {
+	conf := make(map[string]interface{})
+	conf["Interface"] = k.Iface
+	conf["State"] = k.State
+	conf["LBServices"] = k.LBServices
+
+	w, err := os.Create(keepalivedPath)
+	if err != nil {
+		return err
+	}
+	defer w.Close()
+	if len(k.LBServices) == 0 {
+		err = k.emptyTmpl.Execute(w, conf)
+	} else {
+		err = k.KeepalivedTmpl.Execute(w, conf)
+	}
+	if err != nil {
+		return fmt.Errorf("unexpected error creating keepalived.conf: %v", err)
+	}
+
+	return nil
+}
+
+func (k *Keepalived) LoadTemplate() error {
+	tmpl, err := template.New("empty").Parse(`
+global_defs {
+
+}`)
+	if err != nil {
+		return fmt.Errorf("load keepalived empty template error: %v", err)
+	}
+
+	dir := filepath.Dir(keepalivedPath)
+	if err := os.MkdirAll(dir, 0755); err != nil {
+		return err
+	}
+	k.emptyTmpl = tmpl
+
+	tmpl, err = template.New("keepalived").Parse(`
+{{ range $name, $service := .LBServices }}
+vrrp_instance {{ $name }} {
+    state {{ $.State }}
+    interface {{ $.Interface }}
+    virtual_router_id {{ $service.Vrid }}
+    priority {{ $service.Priority }}
+    virtual_ipaddress {
+        {{ range $service.Ips }}{{ . }}
+        {{ end }}
+    }
+}
+{{ end }}
+`)
+	if err != nil {
+		return fmt.Errorf("load keepalived template error: %v", err)
+	}
+
+	k.KeepalivedTmpl = tmpl
+
+	return nil
+}
diff --git a/pkg/yurtlbagent/util/tool.go b/pkg/yurtlbagent/util/tool.go
new file mode 100644
index 00000000000..e5aa8af078c
--- /dev/null
+++ b/pkg/yurtlbagent/util/tool.go
@@ -0,0 +1,118 @@
+/*
+Copyright 2024 The OpenYurt Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"strings"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+
+	"github.com/openyurtio/openyurt/pkg/projectinfo"
+)
+
+const (
+	PODHOSTNAME  = "/etc/hostname"
+	PODNAMESPACE = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+)
+
+func GetNodePool(cfg *rest.Config, nodeName string) (string, error) {
+	var nodePool string
+	client, err := kubernetes.NewForConfig(cfg)
+	if err != nil {
+		return nodePool, err
+	}
+	node, err := client.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
+	if err != nil {
+		return nodePool, fmt.Errorf("not found node %s: %v", nodeName, err)
+	}
+	nodePool, ok := node.Labels[projectinfo.GetNodePoolLabel()]
+	if !ok {
+		return nodePool, fmt.Errorf("node %s doesn't belong to a nodepool", node.GetName())
+	}
+	return nodePool, nil
+}
+
+func GetNodeInterface() (string, error) {
+	// TODO: update interface when interfaces change
+	interfaces, err := net.Interfaces()
+	if err != nil {
+		return "", fmt.Errorf("get network interfaces error: %v", err)
+	}
+
+	excludedPrefixes := []string{"lo", "docker", "flannel", "cbr"}
+
+	for _, iface := range interfaces {
+		if iface.Flags&net.FlagUp == 0 {
+			continue
+		}
+
+		excluded := false
+		for _, prefix := range excludedPrefixes {
+			if strings.HasPrefix(iface.Name, prefix) {
+				excluded = true
+				break
+			}
+		}
+		if !excluded {
+			return iface.Name, nil
+		}
+	}
+
+	return "", fmt.Errorf("no available network interface found")
+}
+
+func ValidInterface(name string) error {
+	iface, err := net.InterfaceByName(name)
+	if err != nil {
+		return fmt.Errorf("interface %s not found", name)
+	}
+
+	if iface.Flags&net.FlagUp == 0 {
+		return fmt.Errorf("interface %s is not up", name)
+	}
+
+	if iface.Flags&net.FlagLoopback != 0 {
+		return fmt.Errorf("interface %s is a loopback interface", name)
+	}
+
+	addrs, err := iface.Addrs()
+	if err != nil {
+		return fmt.Errorf("failed to get addresses for interface %s: %v", name, err)
+	}
+
+	if len(addrs) == 0 {
+		return fmt.Errorf("interface %s has no configured addresses", name)
+	}
+
+	for _, addr := range addrs {
+		ip, _, err := net.ParseCIDR(addr.String())
+		if err != nil {
+			continue
+		}
+
+		if ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() {
+			return fmt.Errorf("interface %s has link-local address %s", name, ip)
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/yurtmanager/controller/controller.go b/pkg/yurtmanager/controller/controller.go
index d1fdefbb945..933b881799d 100644
--- a/pkg/yurtmanager/controller/controller.go
+++ b/pkg/yurtmanager/controller/controller.go
@@ -36,6 +36,7 @@ import (
 	"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/nodelifecycle"
 	"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/nodepool"
 	"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/platformadmin"
+	"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/poolservice"
 	"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/dns"
 	"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/gatewayinternalservice"
 	"github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/gatewaypickup"
@@ -97,6 +98,7 @@ func NewControllerInitializers() map[string]InitFunc {
 	register(names.GatewayPublicServiceController, gatewaypublicservice.Add)
 	register(names.NodeLifeCycleController, nodelifecycle.Add)
 	register(names.NodeBucketController, nodebucket.Add)
+	register(names.PoolServiceController, poolservice.Add)
 	return controllers
 }
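
Reviewer note: the keepalived.conf that `LoadConfig` writes for each PoolService is easiest to see by rendering the template directly. Below is a minimal, self-contained sketch (not part of the change itself); the template body is copied from `LoadTemplate`, while the `default/nginx` service, VRID 51, priority 101, and VIP 192.168.0.100 are made-up sample values:

```go
package main

import (
	"os"
	"text/template"
)

// LBService mirrors pkg/yurtlbagent/util.LBService.
type LBService struct {
	Name     string
	Priority int
	Vrid     int
	Ips      []string
}

// keepalivedTmpl is the vrrp_instance template copied from LoadTemplate.
const keepalivedTmpl = `
{{ range $name, $service := .LBServices }}
vrrp_instance {{ $name }} {
    state {{ $.State }}
    interface {{ $.Interface }}
    virtual_router_id {{ $service.Vrid }}
    priority {{ $service.Priority }}
    virtual_ipaddress {
        {{ range $service.Ips }}{{ . }}
        {{ end }}
    }
}
{{ end }}
`

func main() {
	// Sample data: one PoolService keyed by its namespaced name,
	// the same key the reconciler builds with types.NamespacedName.String().
	conf := map[string]interface{}{
		"Interface": "eth0",
		"State":     "BACKUP",
		"LBServices": map[string]LBService{
			"default/nginx": {Name: "default/nginx", Priority: 101, Vrid: 51, Ips: []string{"192.168.0.100"}},
		},
	}
	tmpl := template.Must(template.New("keepalived").Parse(keepalivedTmpl))
	// Prints one vrrp_instance block per service to stdout.
	if err := tmpl.Execute(os.Stdout, conf); err != nil {
		panic(err)
	}
}
```

This prints a `vrrp_instance default/nginx { ... }` block. Every agent in the pool starts in BACKUP state with a random priority for the same VRID, which presumably lets keepalived's VRRP election pick a single MASTER per pool to hold the VIPs (hence the `ip_nonlocal_bind` sysctl in core.go).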
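The VRID contract can likewise be checked in isolation: the agent reads the VRID from the `net.openyurt.io/vrid` label on the PoolService (set by whatever allocates VRIDs on the control-plane side) and rejects values outside the VRRP range. A small sketch of `getVrid`'s behavior, with hypothetical label values:

```go
package main

import (
	"fmt"
	"strconv"
)

// parseVrid mirrors the agent's getVrid: the VRID comes from the
// "net.openyurt.io/vrid" label and must fit the VRRP range 1-255.
func parseVrid(labels map[string]string) (int, error) {
	vrid, err := strconv.Atoi(labels["net.openyurt.io/vrid"])
	if err != nil {
		return -1, err
	}
	if vrid < 1 || vrid > 255 {
		return -1, fmt.Errorf("invalid vrid %v", vrid)
	}
	return vrid, nil
}

func main() {
	// Hypothetical label sets: valid, out of range, and missing.
	for _, labels := range []map[string]string{
		{"net.openyurt.io/vrid": "51"},
		{"net.openyurt.io/vrid": "300"},
		{},
	} {
		fmt.Println(parseVrid(labels))
	}
}
```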