Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Add sync client implementation #243

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions cmd/sync/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright 2024 Adobe. All rights reserved.
This file is licensed to you under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. You may obtain a copy
of the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
OF ANY KIND, either express or implied. See the License for the specific language
governing permissions and limitations under the License.
*/

package main

import (
"github.com/adobe/cluster-registry/pkg/sync/event"
"os"

"github.com/adobe/cluster-registry/pkg/config"
"github.com/adobe/cluster-registry/pkg/sqs"
"github.com/adobe/cluster-registry/pkg/sync/client"
awssqs "github.com/aws/aws-sdk-go/service/sqs"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var (
rootCmd = &cobra.Command{
Use: "cluster-registry-sync-client",
Short: "Cluster Registry Sync Client is a service that keep the Cluster CRD in sync",
Long: "Cluster Registry Sync Client is a service that creates or updates the cluster CRD based on the messages received from the Cluster Registry Sync manager",
PersistentPreRun: loadAppConfig,
Run: run,
}

logLevel, logFormat string
appConfig *config.AppConfig
namespace string
cfgFile string
)

func Execute() {
err := rootCmd.Execute()
if err != nil {
log.Fatalln(err.Error())
}
}

func init() {
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "The path to the yaml configuration file")
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", logrus.DebugLevel.String(), "The verbosity level of the logs, can be [panic|fatal|error|warn|info|debug|trace]")
rootCmd.PersistentFlags().StringVar(&logFormat, "log-format", "text", "The output format of the logs, can be [text|json]")
rootCmd.PersistentFlags().StringVar(&namespace, "namespace", "cluster-registry", "The namespace where cluster-registry-sync-client will run.")
err := rootCmd.MarkPersistentFlagRequired("config")
if err != nil {
log.Fatalln("No config flag configured")
}
}

func loadAppConfig(cmd *cobra.Command, args []string) {
client.InitLogger(logLevel, logFormat)

log.Info("Loading the configuration")

var err error
appConfig, err = config.LoadSyncClientConfig()
if err != nil {
log.Error("Cannot load the cluster-registry-sync-client configuration:", err.Error())
os.Exit(1)
}

log.Info("Config loaded successfully")
}

func run(cmd *cobra.Command, args []string) {
log.Info("Cluster Registry Sync Client is running")

q, err := sqs.NewSQS(sqs.Config{
AWSRegion: appConfig.SqsAwsRegion,
Endpoint: appConfig.SqsEndpoint,
QueueName: appConfig.SqsQueueName,
BatchSize: 10,
VisibilityTimeout: 120,
WaitSeconds: 5,
RunInterval: 20,
RunOnce: false,
MaxHandlers: 10,
BusyTimeout: 30,
})
if err != nil {
log.Panicf("Error while trying to create SQS client: %v", err.Error())
}

handler := event.NewPartialClusterUpdateHandler()
q.RegisterHandler(func(msg *awssqs.Message) {
log.Debugf("Received message: %s", *msg.MessageId)
e, err := sqs.NewEvent(msg)
if err != nil {
log.Errorf("Cannot create event from message: %s", err.Error())
return
}
if e.Type != sqs.PartialClusterUpdateEvent {
log.Infof("Not interested in event of type %s, skipping", e.Type)
return
}
log.Debugf("Handling event for message: %s", *msg.MessageId)
if err = handler.Handle(e); err != nil {
log.Errorf("Failed to handle event: %s", err.Error())
return
}
if err = q.Delete(msg); err != nil {
log.Errorf("Failed to delete message: %s", err.Error())
return
}
})

log.Info("Starting the Cluster Registry Sync Client")

q.Poll()
}

func main() {
Execute()
}
9 changes: 3 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ require (
github.com/onsi/gomega v1.36.1
github.com/prometheus/client_golang v1.20.5
github.com/redis/go-redis/v9 v9.7.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.10.0
github.com/swaggo/echo-swagger v1.4.1
github.com/swaggo/swag v1.16.4
Expand Down Expand Up @@ -85,15 +87,13 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
Expand All @@ -116,7 +116,6 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo/v2 v2.21.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -127,7 +126,6 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/swaggo/files/v2 v2.0.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
Expand All @@ -151,7 +149,6 @@ require (
golang.org/x/time v0.8.0 // indirect
golang.org/x/tools v0.28.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240826202546-f6391c0de4c7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.66.2 // indirect
google.golang.org/protobuf v1.35.1 // indirect
Expand Down
Loading
Loading