From 1ec19d3629fbd1643ec28ced89d1142b5183c851 Mon Sep 17 00:00:00 2001 From: Marcin Owsiany Date: Wed, 22 May 2024 11:41:31 +0200 Subject: [PATCH 1/3] Add metrics support. --- .github/dependabot.yaml | 6 + .github/workflows/build.yaml | 4 + README.md | 28 ++- cmd/fetch.go | 4 +- cmd/metrics.go | 34 ++++ deploy/deployment.yaml.gotpl | 51 +++++ deploy/main.go | 10 +- go.mod | 20 +- go.sum | 24 ++- internal/main.go | 61 +++++- internal/metrics/Makefile | 59 ++++++ internal/metrics/gen/metrics.pb.go | 258 ++++++++++++++++++++++++ internal/metrics/gen/metrics_grpc.pb.go | 143 +++++++++++++ internal/metrics/metrics.proto | 19 ++ internal/metrics/server/main.go | 108 ++++++++++ internal/metrics/submitter/main.go | 103 ++++++++++ internal/metrics/tools/.gitignore | 2 + internal/metrics/tools/go.mod | 10 + internal/metrics/tools/go.sum | 6 + internal/metrics/tools/tools-import.go | 13 ++ internal/metrics/tools/tools.go | 3 + make/github.mk | 23 +++ 22 files changed, 962 insertions(+), 27 deletions(-) create mode 100644 cmd/metrics.go create mode 100644 internal/metrics/Makefile create mode 100644 internal/metrics/gen/metrics.pb.go create mode 100644 internal/metrics/gen/metrics_grpc.pb.go create mode 100644 internal/metrics/metrics.proto create mode 100644 internal/metrics/server/main.go create mode 100644 internal/metrics/submitter/main.go create mode 100644 internal/metrics/tools/.gitignore create mode 100644 internal/metrics/tools/go.mod create mode 100644 internal/metrics/tools/go.sum create mode 100644 internal/metrics/tools/tools-import.go create mode 100644 internal/metrics/tools/tools.go create mode 100644 make/github.mk diff --git a/.github/dependabot.yaml b/.github/dependabot.yaml index c379e06..eb96f3a 100644 --- a/.github/dependabot.yaml +++ b/.github/dependabot.yaml @@ -22,3 +22,9 @@ updates: interval: 'weekly' day: 'tuesday' open-pull-requests-limit: 3 + - package-ecosystem: 'gomod' + directory: '/internal/metrics/tools' + schedule: + interval: 'weekly' + day: 
'tuesday' + open-pull-requests-limit: 3 diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 6d0b73d..d8e9d6b 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -53,6 +53,10 @@ jobs: ./deploy/deploy --k8s-flavor ocp my-images > manifests/ocp.yaml ./deploy/deploy --k8s-flavor vanilla --secret my-secret my-images > manifests/vanilla-with-secret.yaml ./deploy/deploy --k8s-flavor ocp --secret my-secret my-images > manifests/ocp-with-secret.yaml + ./deploy/deploy --k8s-flavor vanilla --collect-metrics my-images > manifests/vanilla-metrics.yaml + ./deploy/deploy --k8s-flavor ocp --collect-metrics my-images > manifests/ocp-metrics.yaml + ./deploy/deploy --k8s-flavor vanilla --secret my-secret --collect-metrics my-images > manifests/vanilla-with-secret-metrics.yaml + ./deploy/deploy --k8s-flavor ocp --secret my-secret --collect-metrics my-images > manifests/ocp-with-secret-metrics.yaml - name: kubeconform run: | diff --git a/README.md b/README.md index f9e7983..ca8580b 100644 --- a/README.md +++ b/README.md @@ -6,17 +6,22 @@ Talks directly to Container Runtime Interface ([CRI](https://kubernetes.io/docs/ - fetch all images on all nodes in parallel, - retry pulls with increasingly longer timeouts. This prevents getting stuck on stalled connections to image registry. +It also optionally collects each pull attempt's duration and result. + ## Architecture ### `image-prefetcher` - main binary, - shipped as an OCI image, -- provides two subcommands: +- provides three subcommands: - `fetch`: runs the actual image pulls via CRI, meant to run as an init container of DaemonSet pods. Requires access to the CRI UNIX domain socket from the host. - `sleep`: just sleeps forever, meant to run as the main container of DaemonSet pods. + - `aggregate-metrics`: runs a gRPC server which collects data points pushed by the + `fetch` pods, and makes the data available for download over HTTP. + Meant to run as a standalone pod. 
### `deploy` @@ -40,6 +45,7 @@ Talks directly to Container Runtime Interface ([CRI](https://kubernetes.io/docs/ - `--secret`: image pull `Secret` name. Required if the images are not pullable anonymously. This image pull secret should be usable for all images fetched by the given instance. If provided, it must be of type `kubernetes.io/dockerconfigjson` and exist in the same namespace. + - `--collect-metrics`: whether image pull metrics should be collected. Example: @@ -71,6 +77,26 @@ Talks directly to Container Runtime Interface ([CRI](https://kubernetes.io/docs/ kubectl logs -n prefetch-images daemonset/my-images -c prefetch ``` +6. If metrics collection was requested, wait for the endpoint to appear, and fetch the metrics: + ``` + attempt=0 + service="service/my-images-metrics" + while [[ -z $(kubectl -n prefetch-images get "${service}" -o jsonpath="{.status.loadBalancer.ingress}" 2>/dev/null) ]]; do + if [ "$attempt" -lt "10" ]; then + echo "Waiting for ${service} to obtain endpoint ..." + ((attempt++)) + sleep 10 + else + echo "Timeout waiting for ${service} to obtain endpoint!" + exit 1 + fi + done + endpoint="$(kubectl -n prefetch-images get "${service}" -o json | jq -r '.status.loadBalancer.ingress[] | .ip')" + curl "http://${endpoint}:8080/metrics" | jq + ``` + + See the [Result](internal/metrics/metrics.proto) message definition for a list of fields. + ### Customization You can tweak certain parameters such as timeouts by editing `args` in the above manifest. diff --git a/cmd/fetch.go b/cmd/fetch.go index e7ac905..4ff806b 100644 --- a/cmd/fetch.go +++ b/cmd/fetch.go @@ -33,7 +33,7 @@ It talks to Container Runtime Interface API to pull images in parallel, with ret return err } imageList = append(imageList, args...) - return internal.Run(logger, criSocket, dockerConfigJSONPath, timing, imageList...) + return internal.Run(logger, criSocket, dockerConfigJSONPath, timing, metricsEndpoint, imageList...)
}, } @@ -41,6 +41,7 @@ var ( criSocket string dockerConfigJSONPath string imageListFile string + metricsEndpoint string imageListTimeout = time.Minute initialPullAttemptTimeout = 30 * time.Second maxPullAttemptTimeout = 5 * time.Minute @@ -56,6 +57,7 @@ func init() { fetchCmd.Flags().StringVar(&criSocket, "cri-socket", "/run/containerd/containerd.sock", "Path to CRI UNIX socket.") fetchCmd.Flags().StringVar(&dockerConfigJSONPath, "docker-config", "", "Path to docker config json file.") fetchCmd.Flags().StringVar(&imageListFile, "image-list-file", "", "Path to text file containing images to pull (one per line).") + fetchCmd.Flags().StringVar(&metricsEndpoint, "metrics-endpoint", "", "A host:port to submit image pull metrics to.") fetchCmd.Flags().DurationVar(&imageListTimeout, "image-list-timeout", imageListTimeout, "Timeout for image list calls (for debugging).") fetchCmd.Flags().DurationVar(&initialPullAttemptTimeout, "initial-pull-attempt-timeout", initialPullAttemptTimeout, "Timeout for initial image pull call. Each subsequent attempt doubles it until max.") diff --git a/cmd/metrics.go b/cmd/metrics.go new file mode 100644 index 0000000..2710582 --- /dev/null +++ b/cmd/metrics.go @@ -0,0 +1,34 @@ +package cmd + +import ( + "github.com/stackrox/image-prefetcher/internal/logging" + "github.com/stackrox/image-prefetcher/internal/metrics/server" + + "github.com/spf13/cobra" +) + +// aggregateMetricsCmd represents the aggregate-metrics command +var aggregateMetricsCmd = &cobra.Command{ + Use: "aggregate-metrics", + Short: "Accept metrics submissions and serve them.", + Long: `This subcommand is intended to run in a single pod. 
+ +It serves: +- a gRPC endpoint to which individual metrics can be submitted, +- an HTTP endpoint from which the aggregate metrics can be fetched.`, + RunE: func(cmd *cobra.Command, args []string) error { + return server.Run(logging.GetLogger(), grpcPort, httpPort) + }, +} + +var ( + grpcPort int + httpPort int +) + +func init() { + rootCmd.AddCommand(aggregateMetricsCmd) + logging.AddFlags(aggregateMetricsCmd.Flags()) + aggregateMetricsCmd.Flags().IntVar(&grpcPort, "grpc-port", 8443, "Port for metrics submission gRPC endpoint to listen on.") + aggregateMetricsCmd.Flags().IntVar(&httpPort, "http-port", 8080, "Port for metrics retrieval HTTP endpoint to listen on.") +} diff --git a/deploy/deployment.yaml.gotpl b/deploy/deployment.yaml.gotpl index 98c6ced..4e9e233 100644 --- a/deploy/deployment.yaml.gotpl +++ b/deploy/deployment.yaml.gotpl @@ -26,6 +26,54 @@ roleRef: name: privileged-scc-use --- {{ end }} +{{ if .CollectMetrics }} +apiVersion: v1 +kind: Pod +metadata: + name: {{ .Name }}-metrics + labels: + app: {{ .Name }}-metrics +spec: + containers: + - name: aggregator + image: {{ .Image }}:{{ .Version }} + args: + - "aggregate-metrics" + - "--debug" + ports: + - containerPort: 8443 + name: grpc + - containerPort: 8080 + name: http + resources: + requests: + cpu: "5m" + memory: "16Mi" + limits: + cpu: "100m" + memory: "64Mi" + securityContext: + readOnlyRootFilesystem: true + runAsUser: 1000 + runAsNonRoot: true +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ .Name }}-metrics +spec: + ports: + - name: grpc + port: 8443 + protocol: TCP + - name: http + port: 8080 + protocol: TCP + selector: + app: {{ .Name }}-metrics + type: LoadBalancer +--- +{{ end }} apiVersion: apps/v1 kind: DaemonSet metadata: @@ -64,6 +112,9 @@ spec: {{ else }} - "--cri-socket=/tmp/cri/containerd.sock" {{ end }} + {{ if .CollectMetrics }} + - "--metrics-endpoint={{ .Name }}-metrics:8443" + {{ end }} resources: requests: cpu: "20m" diff --git a/deploy/main.go b/deploy/main.go index 
f5dd5cb..d313181 100644 --- a/deploy/main.go +++ b/deploy/main.go @@ -17,6 +17,7 @@ type settings struct { Secret string IsCRIO bool NeedsPrivileged bool + CollectMetrics bool } const ( @@ -32,15 +33,17 @@ const imageRepo = "quay.io/stackrox-io/image-prefetcher" var deploymentTemplate string var ( - version string - k8sFlavor k8sFlavorType - secret string + version string + k8sFlavor k8sFlavorType + secret string + collectMetrics bool ) func init() { flag.StringVar(&version, "version", "v0.1.0", "Version of image prefetcher OCI image.") flag.TextVar(&k8sFlavor, "k8s-flavor", flavor(vanillaFlavor), fmt.Sprintf("Kubernetes flavor. Accepted values: %s", strings.Join(allFlavors, ","))) flag.StringVar(&secret, "secret", "", "Kubernetes image pull Secret to use when pulling.") + flag.BoolVar(&collectMetrics, "collect-metrics", false, "Whether to collect and expose image pull metrics.") } func main() { @@ -59,6 +62,7 @@ func main() { Secret: secret, IsCRIO: isOcp, NeedsPrivileged: isOcp, + CollectMetrics: collectMetrics, } tmpl := template.Must(template.New("deployment").Parse(deploymentTemplate)) if err := tmpl.Execute(os.Stdout, s); err != nil { diff --git a/go.mod b/go.mod index 8fa6048..8bcc2fe 100644 --- a/go.mod +++ b/go.mod @@ -1,25 +1,27 @@ module github.com/stackrox/image-prefetcher -go 1.21 +go 1.22.0 -toolchain go1.21.7 +toolchain go1.22.1 require ( + github.com/cenkalti/backoff/v4 v4.2.1 + github.com/google/uuid v1.6.0 github.com/spf13/cobra v1.8.0 + github.com/spf13/pflag v1.0.5 google.golang.org/grpc v1.63.2 - k8s.io/apimachinery v0.29.3 + google.golang.org/protobuf v1.33.0 + k8s.io/apimachinery v0.30.0 k8s.io/cri-api v0.29.3 - k8s.io/klog/v2 v2.110.1 + k8s.io/klog/v2 v2.120.1 ) require ( - github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect - golang.org/x/net v0.21.0 // indirect - 
golang.org/x/sys v0.17.0 // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect - google.golang.org/protobuf v1.33.0 // indirect ) diff --git a/go.sum b/go.sum index 933c3da..3ad9b6b 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,14 @@ +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -25,16 +29,16 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= 
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= @@ -55,9 +59,9 @@ google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGm google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/apimachinery v0.29.3 h1:2tbx+5L7RNvqJjn7RIuIKu9XTsIZ9Z5wX2G22XAa5EU= -k8s.io/apimachinery v0.29.3/go.mod h1:hx/S4V2PNW4OMg3WizRrHutyB5la0iCUbZym+W0EQIU= +k8s.io/apimachinery v0.30.0 h1:qxVPsyDM5XS96NIh9Oj6LavoVFYff/Pon9cZeDIkHHA= +k8s.io/apimachinery v0.30.0/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= k8s.io/cri-api v0.29.3 h1:ppKSui+hhTJW774Mou6x+/ealmzt2jmTM0vsEQVWrjI= k8s.io/cri-api v0.29.3/go.mod h1:3X7EnhsNaQnCweGhQCJwKNHlH7wHEYuKQ19bRvXMoJY= -k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= -k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= +k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= +k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= diff --git a/internal/main.go b/internal/main.go index f3ac70d..08bb533 100644 --- a/internal/main.go +++ b/internal/main.go @@ -10,7 +10,10 @@ import ( "time" "github.com/stackrox/image-prefetcher/internal/credentialprovider" + metricsProto "github.com/stackrox/image-prefetcher/internal/metrics/gen" + "github.com/stackrox/image-prefetcher/internal/metrics/submitter" + "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" criV1 "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -25,7 +28,7 @@ type TimingConfig struct { MaxPullAttemptDelay time.Duration } -func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, timing TimingConfig, imageNames ...string) error { +func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, timing TimingConfig, metricsEndpoint string, imageNames ...string) error { ctx, cancel := context.WithTimeout(context.Background(), timing.OverallTimeout) defer cancel() @@ -39,6 +42,16 @@ func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, return 
fmt.Errorf("failed to list images for debugging before pulling: %w", err) } + var metricsSink *submitter.Submitter + if metricsEndpoint != "" { + metricsConn, err := grpc.DialContext(ctx, metricsEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return fmt.Errorf("failed to dial metrics endpoint %q: %w", metricsEndpoint, err) + } + metricsSink = submitter.NewSubmitter(logger, metricsProto.NewMetricsClient(metricsConn)) + go metricsSink.Run(ctx) + } + kr := credentialprovider.BasicDockerKeyring{} if err := loadPullSecret(logger, &kr, dockerConfigJSONPath); err != nil { return fmt.Errorf("failed to load image pull secrets: %w", err) @@ -55,11 +68,12 @@ func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, }, Auth: auth, } - go pullImageWithRetries(ctx, logger.With("image", imageName, "authNum", i), &wg, criClient, request, timing) + go pullImageWithRetries(ctx, logger.With("image", imageName, "authNum", i), &wg, criClient, metricsSink.Chan(), imageName, request, timing) } } wg.Wait() logger.Info("pulling images finished") + metricsSink.Await() if err := listImagesForDebugging(ctx, logger, criClient, timing.ImageListTimeout, "after"); err != nil { return fmt.Errorf("failed to list images for debugging after pulling: %w", err) } @@ -123,7 +137,7 @@ func getAuthsForImage(ctx context.Context, logger *slog.Logger, kr credentialpro return auths } -func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.WaitGroup, client criV1.ImageServiceClient, request *criV1.PullImageRequest, timing TimingConfig) { +func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.WaitGroup, client criV1.ImageServiceClient, metricsSink chan<- *metricsProto.Result, name string, request *criV1.PullImageRequest, timing TimingConfig) { defer wg.Done() attemptTimeout := timing.InitialPullAttemptTimeout delay := timing.InitialPullAttemptDelay @@ -136,9 +150,12 @@ func pullImageWithRetries(ctx 
context.Context, logger *slog.Logger, wg *sync.Wai cancel() if err == nil { logger.InfoContext(ctx, "image pulled successfully", "response", response, "elapsed", elapsed) + sizeBytes := getImageSize(ctx, logger, client, response) + noteSuccess(metricsSink, name, start, elapsed, sizeBytes) return } logger.ErrorContext(ctx, "image failed to pull", "error", err, "timeout", attemptTimeout, "elapsed", elapsed) + noteFailure(metricsSink, name, start, elapsed, err) if ctx.Err() != nil { logger.ErrorContext(ctx, "not retrying any more", "error", ctx.Err()) return @@ -150,3 +167,41 @@ func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.Wai delay = delay * 2 } } + +func getImageSize(ctx context.Context, logger *slog.Logger, client criV1.ImageServiceClient, response *criV1.PullImageResponse) uint64 { + imageStatus, err := client.ImageStatus(ctx, &criV1.ImageStatusRequest{ + Image: &criV1.ImageSpec{ + Image: response.ImageRef, + }, + }) + if err != nil { + logger.WarnContext(ctx, "failed to obtain pulled image status", "image", response.ImageRef, "error", err) + return 0 + } + return imageStatus.GetImage().GetSize_() +} + +func noteSuccess(sink chan<- *metricsProto.Result, name string, start time.Time, elapsed time.Duration, sizeBytes uint64) { + if sink == nil { + return + } + sink <- &metricsProto.Result{ + AttemptId: uuid.NewString(), + StartedAt: start.Unix(), + Image: name, + DurationMs: uint64(elapsed.Milliseconds()), + SizeBytes: sizeBytes, + } +} +func noteFailure(sink chan<- *metricsProto.Result, name string, start time.Time, elapsed time.Duration, err error) { + if sink == nil { + return + } + sink <- &metricsProto.Result{ + AttemptId: uuid.NewString(), + StartedAt: start.Unix(), + Image: name, + DurationMs: uint64(elapsed.Milliseconds()), + Error: err.Error(), + } +} diff --git a/internal/metrics/Makefile b/internal/metrics/Makefile new file mode 100644 index 0000000..405dd7f --- /dev/null +++ b/internal/metrics/Makefile @@ -0,0 +1,59 @@ +# 
Makefile for generating protobuf and gRPC-related files. +# Much of this file is borrowed from https://github.com/stackrox/stackrox/blob/master/make/protogen.mk + +BASE_PATH ?= $(CURDIR) +SILENT ?= @ + +.PHONY: all +all: generate + +include $(BASE_PATH)/../../make/gotools.mk +include $(BASE_PATH)/../../make/github.mk + +$(call go-tool, PROTOC_GEN_GO_BIN, google.golang.org/protobuf/cmd/protoc-gen-go, tools) +$(call go-tool, PROTOC_GEN_GO_GRPC_BIN, google.golang.org/grpc/cmd/protoc-gen-go-grpc, tools) + +PROTOC_VERSION := 26.1 +UNAME_S := $(shell uname -s) +ifeq ($(UNAME_S),Linux) +PROTOC_OS = linux +endif +ifeq ($(UNAME_S),Darwin) +PROTOC_OS = osx +endif +PROTOC_ARCH=$(shell case $$(uname -m) in (arm64) echo aarch_64 ;; (s390x) echo s390_64 ;; (*) uname -m ;; esac) + +PROTO_PRIVATE_DIR := $(BASE_PATH)/tools + +PROTOC_DIR := $(PROTO_PRIVATE_DIR)/protoc-$(PROTOC_OS)-$(PROTOC_ARCH)-$(PROTOC_VERSION) + +PROTOC := $(PROTOC_DIR)/bin/protoc + +PROTOC_DOWNLOADS_DIR := $(PROTO_PRIVATE_DIR)/.downloads + +.PHONY: generate +generate: $(PROTOC) $(PROTOC_GEN_GO_BIN) $(PROTOC_GEN_GO_GRPC_BIN) + mkdir -p gen + export PATH=$(BASE_PATH)/tools:$(BASE_PATH)/.gotools/bin:$$PATH; \ + $(PROTOC) --go_out=gen --go_opt=paths=source_relative --go-grpc_out=gen --go-grpc_opt=paths=source_relative metrics.proto + +$(PROTOC_DOWNLOADS_DIR): + @echo "+ $@" + $(SILENT)mkdir -p "$@" + +PROTOC_ZIP := protoc-$(PROTOC_VERSION)-$(PROTOC_OS)-$(PROTOC_ARCH).zip +PROTOC_FILE := $(PROTOC_DOWNLOADS_DIR)/$(PROTOC_ZIP) + +$(PROTOC_FILE): $(PROTOC_DOWNLOADS_DIR) + @$(GET_GITHUB_RELEASE_FN); \ + get_github_release "$@" "https://github.com/protocolbuffers/protobuf/releases/download/v$(PROTOC_VERSION)/$(PROTOC_ZIP)" + +.PRECIOUS: $(PROTOC_FILE) + +$(PROTOC): + @echo "+ $@" + $(SILENT)$(MAKE) "$(PROTOC_FILE)" + $(SILENT)mkdir -p "$(PROTOC_DIR)" + $(SILENT)unzip -q -o -d "$(PROTOC_DIR)" "$(PROTOC_FILE)" + $(SILENT)test -x "$@" + diff --git a/internal/metrics/gen/metrics.pb.go b/internal/metrics/gen/metrics.pb.go new 
file mode 100644 index 0000000..f59d18c --- /dev/null +++ b/internal/metrics/gen/metrics.pb.go @@ -0,0 +1,258 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v5.26.1 +// source: metrics.proto + +package gen + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Result struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AttemptId string `protobuf:"bytes,1,opt,name=attempt_id,json=attemptId,proto3" json:"attempt_id,omitempty"` + StartedAt int64 `protobuf:"varint,2,opt,name=started_at,json=startedAt,proto3" json:"started_at,omitempty"` + Image string `protobuf:"bytes,3,opt,name=image,proto3" json:"image,omitempty"` + Error string `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"` + DurationMs uint64 `protobuf:"varint,5,opt,name=duration_ms,json=durationMs,proto3" json:"duration_ms,omitempty"` + Node string `protobuf:"bytes,6,opt,name=node,proto3" json:"node,omitempty"` + SizeBytes uint64 `protobuf:"varint,7,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"` +} + +func (x *Result) Reset() { + *x = Result{} + if protoimpl.UnsafeEnabled { + mi := &file_metrics_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Result) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Result) ProtoMessage() {} + +func (x *Result) ProtoReflect() protoreflect.Message { + mi := &file_metrics_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := 
protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Result.ProtoReflect.Descriptor instead. +func (*Result) Descriptor() ([]byte, []int) { + return file_metrics_proto_rawDescGZIP(), []int{0} +} + +func (x *Result) GetAttemptId() string { + if x != nil { + return x.AttemptId + } + return "" +} + +func (x *Result) GetStartedAt() int64 { + if x != nil { + return x.StartedAt + } + return 0 +} + +func (x *Result) GetImage() string { + if x != nil { + return x.Image + } + return "" +} + +func (x *Result) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *Result) GetDurationMs() uint64 { + if x != nil { + return x.DurationMs + } + return 0 +} + +func (x *Result) GetNode() string { + if x != nil { + return x.Node + } + return "" +} + +func (x *Result) GetSizeBytes() uint64 { + if x != nil { + return x.SizeBytes + } + return 0 +} + +type Empty struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Empty) Reset() { + *x = Empty{} + if protoimpl.UnsafeEnabled { + mi := &file_metrics_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Empty) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Empty) ProtoMessage() {} + +func (x *Empty) ProtoReflect() protoreflect.Message { + mi := &file_metrics_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Empty.ProtoReflect.Descriptor instead. 
+func (*Empty) Descriptor() ([]byte, []int) { + return file_metrics_proto_rawDescGZIP(), []int{1} +} + +var File_metrics_proto protoreflect.FileDescriptor + +var file_metrics_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0xc6, 0x01, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x74, + 0x74, 0x65, 0x6d, 0x70, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, + 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x61, + 0x72, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6d, 0x61, 0x67, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x12, 0x14, + 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x5f, 0x6d, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x64, 0x75, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x4d, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x18, 0x06, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x69, 0x7a, + 0x65, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x73, + 0x69, 0x7a, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x32, 0x28, 0x0a, 0x07, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x1d, 0x0a, 0x06, + 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x07, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, + 0x06, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x28, 0x01, 0x42, 0x3b, 0x5a, 0x39, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x72, + 0x6f, 0x78, 0x2f, 0x69, 0x6d, 
0x61, 0x67, 0x65, 0x2d, 0x70, 0x72, 0x65, 0x66, 0x65, 0x74, 0x63, + 0x68, 0x65, 0x72, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x6d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x3b, 0x67, 0x65, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_metrics_proto_rawDescOnce sync.Once + file_metrics_proto_rawDescData = file_metrics_proto_rawDesc +) + +func file_metrics_proto_rawDescGZIP() []byte { + file_metrics_proto_rawDescOnce.Do(func() { + file_metrics_proto_rawDescData = protoimpl.X.CompressGZIP(file_metrics_proto_rawDescData) + }) + return file_metrics_proto_rawDescData +} + +var file_metrics_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_metrics_proto_goTypes = []interface{}{ + (*Result)(nil), // 0: Result + (*Empty)(nil), // 1: Empty +} +var file_metrics_proto_depIdxs = []int32{ + 0, // 0: Metrics.Submit:input_type -> Result + 1, // 1: Metrics.Submit:output_type -> Empty + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_metrics_proto_init() } +func file_metrics_proto_init() { + if File_metrics_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_metrics_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Result); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_metrics_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Empty); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: 
file_metrics_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_metrics_proto_goTypes, + DependencyIndexes: file_metrics_proto_depIdxs, + MessageInfos: file_metrics_proto_msgTypes, + }.Build() + File_metrics_proto = out.File + file_metrics_proto_rawDesc = nil + file_metrics_proto_goTypes = nil + file_metrics_proto_depIdxs = nil +} diff --git a/internal/metrics/gen/metrics_grpc.pb.go b/internal/metrics/gen/metrics_grpc.pb.go new file mode 100644 index 0000000..2e14fdb --- /dev/null +++ b/internal/metrics/gen/metrics_grpc.pb.go @@ -0,0 +1,143 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.26.1 +// source: metrics.proto + +package gen + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + Metrics_Submit_FullMethodName = "/Metrics/Submit" +) + +// MetricsClient is the client API for Metrics service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type MetricsClient interface { + Submit(ctx context.Context, opts ...grpc.CallOption) (Metrics_SubmitClient, error) +} + +type metricsClient struct { + cc grpc.ClientConnInterface +} + +func NewMetricsClient(cc grpc.ClientConnInterface) MetricsClient { + return &metricsClient{cc} +} + +func (c *metricsClient) Submit(ctx context.Context, opts ...grpc.CallOption) (Metrics_SubmitClient, error) { + stream, err := c.cc.NewStream(ctx, &Metrics_ServiceDesc.Streams[0], Metrics_Submit_FullMethodName, opts...) 
+ if err != nil { + return nil, err + } + x := &metricsSubmitClient{stream} + return x, nil +} + +type Metrics_SubmitClient interface { + Send(*Result) error + CloseAndRecv() (*Empty, error) + grpc.ClientStream +} + +type metricsSubmitClient struct { + grpc.ClientStream +} + +func (x *metricsSubmitClient) Send(m *Result) error { + return x.ClientStream.SendMsg(m) +} + +func (x *metricsSubmitClient) CloseAndRecv() (*Empty, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(Empty) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// MetricsServer is the server API for Metrics service. +// All implementations must embed UnimplementedMetricsServer +// for forward compatibility +type MetricsServer interface { + Submit(Metrics_SubmitServer) error + mustEmbedUnimplementedMetricsServer() +} + +// UnimplementedMetricsServer must be embedded to have forward compatible implementations. +type UnimplementedMetricsServer struct { +} + +func (UnimplementedMetricsServer) Submit(Metrics_SubmitServer) error { + return status.Errorf(codes.Unimplemented, "method Submit not implemented") +} +func (UnimplementedMetricsServer) mustEmbedUnimplementedMetricsServer() {} + +// UnsafeMetricsServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to MetricsServer will +// result in compilation errors. 
+type UnsafeMetricsServer interface { + mustEmbedUnimplementedMetricsServer() +} + +func RegisterMetricsServer(s grpc.ServiceRegistrar, srv MetricsServer) { + s.RegisterService(&Metrics_ServiceDesc, srv) +} + +func _Metrics_Submit_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MetricsServer).Submit(&metricsSubmitServer{stream}) +} + +type Metrics_SubmitServer interface { + SendAndClose(*Empty) error + Recv() (*Result, error) + grpc.ServerStream +} + +type metricsSubmitServer struct { + grpc.ServerStream +} + +func (x *metricsSubmitServer) SendAndClose(m *Empty) error { + return x.ServerStream.SendMsg(m) +} + +func (x *metricsSubmitServer) Recv() (*Result, error) { + m := new(Result) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Metrics_ServiceDesc is the grpc.ServiceDesc for Metrics service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Metrics_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "Metrics", + HandlerType: (*MetricsServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Submit", + Handler: _Metrics_Submit_Handler, + ClientStreams: true, + }, + }, + Metadata: "metrics.proto", +} diff --git a/internal/metrics/metrics.proto b/internal/metrics/metrics.proto new file mode 100644 index 0000000..4689004 --- /dev/null +++ b/internal/metrics/metrics.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option go_package = "github.com/stackrox/image-prefetcher/internal/metrics;gen"; + +message Result { + string attempt_id = 1; + int64 started_at = 2; + string image = 3; + string error = 4; + uint64 duration_ms = 5; + string node = 6; + uint64 size_bytes = 7; +} + +message Empty {} + +service Metrics { + rpc Submit(stream Result) returns (Empty) {} +} \ No newline at end of file diff --git a/internal/metrics/server/main.go b/internal/metrics/server/main.go new file mode 
100644 index 0000000..7bdb655 --- /dev/null +++ b/internal/metrics/server/main.go @@ -0,0 +1,108 @@ +package server + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net" + "net/http" + "sync" + + "github.com/stackrox/image-prefetcher/internal/metrics/gen" + + "google.golang.org/grpc" +) + +type metricsServer struct { + mutex sync.Mutex + metrics map[string]*gen.Result + logger *slog.Logger + gen.UnimplementedMetricsServer +} + +func (s *metricsServer) Submit(stream gen.Metrics_SubmitServer) error { + for { + metric, err := stream.Recv() + if err == io.EOF { + return stream.SendAndClose(&gen.Empty{}) + } + if err != nil { + return err + } + s.metricSubmitted(metric) + } +} + +func (s *metricsServer) metricSubmitted(metric *gen.Result) { + s.logger.Debug("metric submitted", "metric", metric) + s.mutex.Lock() + defer s.mutex.Unlock() + if _, ok := s.metrics[metric.AttemptId]; ok { + s.logger.Info("duplicate metric submitted", "metric", metric) + } + s.metrics[metric.AttemptId] = metric +} + +func (s *metricsServer) ServeHTTP(writer http.ResponseWriter, _ *http.Request) { + resp, err := json.Marshal(s.currentMetrics()) + if err != nil { + s.logger.Error("failed to marshal metrics", "error", err) + writer.WriteHeader(http.StatusInternalServerError) + _, _ = writer.Write([]byte(err.Error())) + return + } + _, _ = writer.Write(resp) +} + +func (s *metricsServer) currentMetrics() []*gen.Result { + s.mutex.Lock() + defer s.mutex.Unlock() + metrics := make([]*gen.Result, 0, len(s.metrics)) + for _, metric := range s.metrics { + metrics = append(metrics, metric) + } + return metrics +} + +func Run(logger *slog.Logger, grpcPort int, httpPort int) error { + server := &metricsServer{ + logger: logger, + metrics: make(map[string]*gen.Result), + } + grpcErrChan := make(chan error) + httpErrChan := make(chan error) + + grpcSpec := fmt.Sprintf(":%d", grpcPort) + grpcListener, err := net.Listen("tcp", grpcSpec) + if err != nil { + return fmt.Errorf("failed to 
listen on %s", grpcSpec) + } + grpcServer := grpc.NewServer() + gen.RegisterMetricsServer(grpcServer, server) + logger.Info("starting to serve", "grpcSpec", grpcSpec) + go func() { grpcErrChan <- grpcServer.Serve(grpcListener) }() + + httpSpec := fmt.Sprintf(":%d", httpPort) + httpListener, err := net.Listen("tcp", httpSpec) + if err != nil { + return fmt.Errorf("failed to listen on %s", httpSpec) + } + httpServer := &http.Server{} + http.Handle("/metrics", server) + logger.Info("starting to serve", "httpSpec", httpSpec) + go func() { httpErrChan <- httpServer.Serve(httpListener) }() + + // On shutdown of either, stop the other one. + var httpErr, grpcErr error + select { + case httpErr = <-httpErrChan: + grpcServer.Stop() + grpcErr = <-grpcErrChan + case grpcErr = <-grpcErrChan: + _ = httpServer.Close() + httpErr = <-httpErrChan + } + return errors.Join(grpcErr, httpErr) +} diff --git a/internal/metrics/submitter/main.go b/internal/metrics/submitter/main.go new file mode 100644 index 0000000..1f2f296 --- /dev/null +++ b/internal/metrics/submitter/main.go @@ -0,0 +1,103 @@ +package submitter + +import ( + "context" + "fmt" + "log/slog" + "os" + "time" + + "github.com/stackrox/image-prefetcher/internal/metrics/gen" + + "github.com/cenkalti/backoff/v4" +) + +type Submitter struct { + c chan *gen.Result + done chan struct{} + client gen.MetricsClient + logger *slog.Logger +} + +func NewSubmitter(logger *slog.Logger, client gen.MetricsClient) *Submitter { + return &Submitter{ + c: make(chan *gen.Result, 1), + done: make(chan struct{}), + client: client, + logger: logger, + } +} + +func (s *Submitter) Run(ctx context.Context) { + defer func() { s.done <- struct{}{} }() + if s.client == nil { + for range s.c { + } + return + } + hostName, err := os.Hostname() + if err != nil { + s.logger.WarnContext(ctx, "could not obtain hostname", "error", err) + hostName = "unknown" + } + + var metrics []*gen.Result + for metric := range s.c { + metric.Node = hostName + 
s.logger.DebugContext(ctx, "metric received", "metric", metric) + metrics = append(metrics, metric) + } + + if err = s.submit(ctx, metrics); err == nil { + s.logger.InfoContext(ctx, "metrics submitted") + return + } + s.logger.ErrorContext(ctx, "metric Submit RPC failed, retrying", "error", err) + b := backoff.NewExponentialBackOff() + b.InitialInterval = 10 * time.Second + b.MaxElapsedTime = 0 + ticker := backoff.NewTicker(backoff.WithContext(b, ctx)) + defer ticker.Stop() + for range ticker.C { + if ctx.Err() != nil { + s.logger.ErrorContext(ctx, "giving up retrying metrics submission", "error", ctx.Err()) + } + if err = s.submit(ctx, metrics); err == nil { + s.logger.InfoContext(ctx, "metrics submitted") + return + } + s.logger.ErrorContext(ctx, "metric Submit RPC failed, retrying", "error", err) + } +} + +func (s *Submitter) Await() { + if s == nil { + return + } + close(s.c) + s.logger.Info("waiting for metrics to be submitted") + <-s.done +} + +func (s *Submitter) submit(ctx context.Context, metrics []*gen.Result) error { + submitClient, err := s.client.Submit(ctx) + if err != nil { + return fmt.Errorf("invoking metric Submit RPC failed: %w", err) + } + for _, metric := range metrics { + if err := submitClient.Send(metric); err != nil { + return fmt.Errorf("streaming metric to Submit RPC failed: %w", err) + } + } + if _, err := submitClient.CloseAndRecv(); err != nil { + return fmt.Errorf("finishing metric Submit RPC failed: %w", err) + } + return nil +} + +func (s *Submitter) Chan() chan<- *gen.Result { + if s == nil { + return nil + } + return s.c +} diff --git a/internal/metrics/tools/.gitignore b/internal/metrics/tools/.gitignore new file mode 100644 index 0000000..580cecf --- /dev/null +++ b/internal/metrics/tools/.gitignore @@ -0,0 +1,2 @@ +/protoc-*/ +/.downloads/ diff --git a/internal/metrics/tools/go.mod b/internal/metrics/tools/go.mod new file mode 100644 index 0000000..c2171e6 --- /dev/null +++ b/internal/metrics/tools/go.mod @@ -0,0 +1,10 @@ 
+module github.com/stackrox/image-prefetcher/internal/metrics/tools + +go 1.20 + +require ( + google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 + google.golang.org/protobuf v1.33.0 +) + +require github.com/google/go-cmp v0.6.0 // indirect diff --git a/internal/metrics/tools/go.sum b/internal/metrics/tools/go.sum new file mode 100644 index 0000000..e8e124c --- /dev/null +++ b/internal/metrics/tools/go.sum @@ -0,0 +1,6 @@ +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= diff --git a/internal/metrics/tools/tools-import.go b/internal/metrics/tools/tools-import.go new file mode 100644 index 0000000..7006aaf --- /dev/null +++ b/internal/metrics/tools/tools-import.go @@ -0,0 +1,13 @@ +//go:build tools + +package tools + +// This file declares dependencies on tools for `go mod` purposes. +// See https://github.com/golang/go/wiki/Modules#how-can-i-track-tool-dependencies-for-a-module +// for an explanation of the approach. + +import ( + // Tool dependencies, not used anywhere in the code. + _ "google.golang.org/grpc/cmd/protoc-gen-go-grpc" + _ "google.golang.org/protobuf/cmd/protoc-gen-go" +) diff --git a/internal/metrics/tools/tools.go b/internal/metrics/tools/tools.go new file mode 100644 index 0000000..b2918ed --- /dev/null +++ b/internal/metrics/tools/tools.go @@ -0,0 +1,3 @@ +package tools + +// This file only exists to prevent package loading errors for this directory.
diff --git a/make/github.mk b/make/github.mk new file mode 100644 index 0000000..c8d66bf --- /dev/null +++ b/make/github.mk @@ -0,0 +1,23 @@ +# github.mk +# Helpers for fetching released binaries from github projects. +# Copied from https://github.com/stackrox/stackrox/blob/master/make/github.mk +# DO NOT EDIT. Instead modify the upstream file and copy. + +# For usage instructions, see uses of this macro elsewhere, since there is no +# single standard for architecture naming. +GET_GITHUB_RELEASE_FN = get_github_release() { \ + [[ -x $${1} ]] || { \ + set -euo pipefail ;\ + echo "+ $${1}" ;\ + mkdir -p bin ;\ + attempts=5 ;\ + for i in $$(seq $$attempts); do \ + curl --silent --show-error --fail --location --output "$${1}" "$${2}" && break ;\ + [[ $$i -eq $$attempts ]] && exit 1; \ + echo "Retrying after $$((i*i)) seconds..."; \ + sleep "$$((i*i))"; \ + done ;\ + [[ $$(uname -s) != Darwin ]] || xattr -c "$${1}" ;\ + chmod +x "$${1}" ;\ + } \ +} From ead636e5e33ee4373a57824ae0ffa016d7845473 Mon Sep 17 00:00:00 2001 From: Marcin Owsiany Date: Mon, 27 May 2024 10:12:57 +0200 Subject: [PATCH 2/3] Address feedback. 
--- internal/metrics/server/main.go | 8 ++- .../submitter/{main.go => submitter.go} | 63 ++++++++++--------- 2 files changed, 39 insertions(+), 32 deletions(-) rename internal/metrics/submitter/{main.go => submitter.go} (63%) diff --git a/internal/metrics/server/main.go b/internal/metrics/server/main.go index 7bdb655..7a141a1 100644 --- a/internal/metrics/server/main.go +++ b/internal/metrics/server/main.go @@ -49,11 +49,13 @@ func (s *metricsServer) ServeHTTP(writer http.ResponseWriter, _ *http.Request) { resp, err := json.Marshal(s.currentMetrics()) if err != nil { s.logger.Error("failed to marshal metrics", "error", err) - writer.WriteHeader(http.StatusInternalServerError) - _, _ = writer.Write([]byte(err.Error())) + http.Error(writer, err.Error(), http.StatusInternalServerError) return } - _, _ = writer.Write(resp) + _, err = writer.Write(resp) + if err != nil { + s.logger.Error("failed to write HTTP metrics response", "error", err) + } } func (s *metricsServer) currentMetrics() []*gen.Result { diff --git a/internal/metrics/submitter/main.go b/internal/metrics/submitter/submitter.go similarity index 63% rename from internal/metrics/submitter/main.go rename to internal/metrics/submitter/submitter.go index 1f2f296..dd8d72e 100644 --- a/internal/metrics/submitter/main.go +++ b/internal/metrics/submitter/submitter.go @@ -13,28 +13,33 @@ import ( ) type Submitter struct { - c chan *gen.Result - done chan struct{} - client gen.MetricsClient - logger *slog.Logger + channel chan *gen.Result + done chan struct{} + client gen.MetricsClient + logger *slog.Logger } +// NewSubmitter creates a new submitter object. 
func NewSubmitter(logger *slog.Logger, client gen.MetricsClient) *Submitter { return &Submitter{ - c: make(chan *gen.Result, 1), - done: make(chan struct{}), - client: client, - logger: logger, + channel: make(chan *gen.Result, 1), + done: make(chan struct{}), + client: client, + logger: logger, } } +// Chan returns a channel on which metrics can be provided to the submitter. +func (s *Submitter) Chan() chan<- *gen.Result { + if s == nil { + return nil + } + return s.channel +} + +// Run accepts metrics on the channel and submits them to the client passed to constructor until Await is called. func (s *Submitter) Run(ctx context.Context) { defer func() { s.done <- struct{}{} }() - if s.client == nil { - for range s.c { - } - return - } hostName, err := os.Hostname() if err != nil { s.logger.WarnContext(ctx, "could not obtain hostname", "error", err) @@ -42,7 +47,7 @@ func (s *Submitter) Run(ctx context.Context) { } var metrics []*gen.Result - for metric := range s.c { + for metric := range s.channel { metric.Node = hostName s.logger.DebugContext(ctx, "metric received", "metric", metric) metrics = append(metrics, metric) @@ -58,23 +63,30 @@ func (s *Submitter) Run(ctx context.Context) { b.MaxElapsedTime = 0 ticker := backoff.NewTicker(backoff.WithContext(b, ctx)) defer ticker.Stop() - for range ticker.C { - if ctx.Err() != nil { - s.logger.ErrorContext(ctx, "giving up retrying metrics submission", "error", ctx.Err()) - } - if err = s.submit(ctx, metrics); err == nil { - s.logger.InfoContext(ctx, "metrics submitted") + for { + select { + case <-ctx.Done(): + if ctx.Err() != nil { + s.logger.ErrorContext(ctx, "giving up retrying metrics submission", "error", ctx.Err()) + } return + case <-ticker.C: + if err = s.submit(ctx, metrics); err == nil { + s.logger.InfoContext(ctx, "metrics submitted") + return + } + s.logger.ErrorContext(ctx, "metric Submit RPC failed, retrying", "error", err) } - s.logger.ErrorContext(ctx, "metric Submit RPC failed, retrying", "error", err) } 
} +// Await signals the goroutine running Run that no more metrics will be sent on the channel. +// Then it waits for that goroutine to submit them (with retries). func (s *Submitter) Await() { if s == nil { return } - close(s.c) + close(s.channel) s.logger.Info("waiting for metrics to be submitted") <-s.done } @@ -94,10 +106,3 @@ func (s *Submitter) submit(ctx context.Context, metrics []*gen.Result) error { } return nil } - -func (s *Submitter) Chan() chan<- *gen.Result { - if s == nil { - return nil - } - return s.c -} From 4398018d5feef76b063e82f91c67790ad311663c Mon Sep 17 00:00:00 2001 From: Marcin Owsiany Date: Mon, 27 May 2024 12:26:28 +0200 Subject: [PATCH 3/3] Add unit test. --- go.mod | 6 + go.sum | 18 +++ internal/main.go | 2 +- internal/metrics/submitter/submitter.go | 40 +++--- internal/metrics/submitter/submitter_test.go | 141 +++++++++++++++++++ 5 files changed, 188 insertions(+), 19 deletions(-) create mode 100644 internal/metrics/submitter/submitter_test.go diff --git a/go.mod b/go.mod index 8bcc2fe..e49e286 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,10 @@ toolchain go1.22.1 require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/google/uuid v1.6.0 + github.com/neilotoole/slogt v1.1.0 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 + github.com/stretchr/testify v1.8.4 google.golang.org/grpc v1.63.2 google.golang.org/protobuf v1.33.0 k8s.io/apimachinery v0.30.0 @@ -17,11 +19,15 @@ require ( ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git 
a/go.sum b/go.sum index 3ad9b6b..c1d12c5 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,9 @@ github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -13,11 +16,23 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/neilotoole/slogt v1.1.0 h1:c7qE92sq+V0yvCuaxph+RQ2jOKL61c4hqS1Bv9W7FZE= +github.com/neilotoole/slogt v1.1.0/go.mod h1:RCrGXkPc/hYybNulqQrMHRtvlQ7F6NktNVLuLwk6V+w= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 
h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -58,6 +73,9 @@ google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDom google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/apimachinery v0.30.0 h1:qxVPsyDM5XS96NIh9Oj6LavoVFYff/Pon9cZeDIkHHA= k8s.io/apimachinery v0.30.0/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= diff --git a/internal/main.go b/internal/main.go index 
08bb533..71e9d08 100644 --- a/internal/main.go +++ b/internal/main.go @@ -49,7 +49,7 @@ func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, return fmt.Errorf("failed to dial metrics endpoint %q: %w", metricsEndpoint, err) } metricsSink = submitter.NewSubmitter(logger, metricsProto.NewMetricsClient(metricsConn)) - go metricsSink.Run(ctx) + go func() { _ = metricsSink.Run(ctx) }() // returned error is for testing, sink already handles errors } kr := credentialprovider.BasicDockerKeyring{} diff --git a/internal/metrics/submitter/submitter.go b/internal/metrics/submitter/submitter.go index dd8d72e..5172530 100644 --- a/internal/metrics/submitter/submitter.go +++ b/internal/metrics/submitter/submitter.go @@ -17,6 +17,7 @@ type Submitter struct { done chan struct{} client gen.MetricsClient logger *slog.Logger + timer backoff.Timer // for testing } // NewSubmitter creates a new submitter object. @@ -38,11 +39,11 @@ func (s *Submitter) Chan() chan<- *gen.Result { } // Run accepts metrics on the channel and submits them to the client passed to constructor until Await is called. 
-func (s *Submitter) Run(ctx context.Context) { +func (s *Submitter) Run(ctx context.Context) (err error) { defer func() { s.done <- struct{}{} }() - hostName, err := os.Hostname() - if err != nil { - s.logger.WarnContext(ctx, "could not obtain hostname", "error", err) + hostName, hostErr := os.Hostname() + if hostErr != nil { + s.logger.WarnContext(ctx, "could not obtain hostname", "error", hostErr) hostName = "unknown" } @@ -53,33 +54,36 @@ func (s *Submitter) Run(ctx context.Context) { metrics = append(metrics, metric) } - if err = s.submit(ctx, metrics); err == nil { - s.logger.InfoContext(ctx, "metrics submitted") - return - } - s.logger.ErrorContext(ctx, "metric Submit RPC failed, retrying", "error", err) - b := backoff.NewExponentialBackOff() - b.InitialInterval = 10 * time.Second - b.MaxElapsedTime = 0 - ticker := backoff.NewTicker(backoff.WithContext(b, ctx)) + ticker := newTicker(ctx, s.timer) defer ticker.Stop() + for { select { - case <-ctx.Done(): - if ctx.Err() != nil { - s.logger.ErrorContext(ctx, "giving up retrying metrics submission", "error", ctx.Err()) - } - return case <-ticker.C: if err = s.submit(ctx, metrics); err == nil { s.logger.InfoContext(ctx, "metrics submitted") return } s.logger.ErrorContext(ctx, "metric Submit RPC failed, retrying", "error", err) + case <-ctx.Done(): + if ctx.Err() != nil { + s.logger.ErrorContext(ctx, "giving up retrying metrics submission", "error", ctx.Err()) + err = ctx.Err() + } + return } } } +// newTicker returns a ticker that ticks once immediately, and then backs off exponentially forever. +// Caller is responsible for calling Stop() on it eventually. +func newTicker(ctx context.Context, timer backoff.Timer) *backoff.Ticker { + b := backoff.NewExponentialBackOff() + b.InitialInterval = 10 * time.Second + b.MaxElapsedTime = 0 + return backoff.NewTickerWithTimer(backoff.WithContext(b, ctx), timer) +} + // Await signals the goroutine running Run that no more metrics will be sent on the channel. 
// Then it waits for that goroutine to submit them (with retries). func (s *Submitter) Await() { diff --git a/internal/metrics/submitter/submitter_test.go b/internal/metrics/submitter/submitter_test.go new file mode 100644 index 0000000..a3210f8 --- /dev/null +++ b/internal/metrics/submitter/submitter_test.go @@ -0,0 +1,141 @@ +package submitter + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stackrox/image-prefetcher/internal/metrics/gen" + + "github.com/neilotoole/slogt" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +type fakeClient struct { + failures int + calls int +} + +func (f *fakeClient) Submit(ctx context.Context, _ ...grpc.CallOption) (gen.Metrics_SubmitClient, error) { + f.calls++ + if f.failures >= f.calls { + return nil, fmt.Errorf("failing as requested, %d calls, %d faiulres", f.calls, f.failures) + } + return &fakeSubmitClient{}, nil +} + +type fakeSubmitClient struct { +} + +func (f *fakeSubmitClient) Send(result *gen.Result) error { + return nil +} + +func (f *fakeSubmitClient) CloseAndRecv() (*gen.Empty, error) { + return nil, nil +} + +func (f *fakeSubmitClient) Header() (metadata.MD, error) { + panic("unimplemented") +} + +func (f *fakeSubmitClient) Trailer() metadata.MD { + panic("unimplemented") +} + +func (f *fakeSubmitClient) CloseSend() error { + panic("unimplemented") +} + +func (f *fakeSubmitClient) Context() context.Context { + panic("unimplemented") +} + +func (f *fakeSubmitClient) SendMsg(m any) error { + panic("unimplemented") +} + +func (f *fakeSubmitClient) RecvMsg(m any) error { + panic("unimplemented") +} + +type testTimer struct { + c chan time.Time +} + +func (t *testTimer) Start(duration time.Duration) { + go func() { t.c <- time.Now().Add(duration) }() +} + +func (t *testTimer) Stop() { +} + +func (t *testTimer) C() <-chan time.Time { + return t.c +} + +func TestSubmitter(t *testing.T) { + tests := map[string]struct { + client *fakeClient + 
expectCalls int + timer *testTimer + timeout time.Duration + expectErr error + }{ + "nil": { + client: nil, + }, + "simple": { + client: &fakeClient{}, + expectCalls: 1, + }, + "with retries": { + client: &fakeClient{ + failures: 2, + }, + timer: &testTimer{}, + expectCalls: 3, + }, + "timeout": { + client: &fakeClient{ + failures: 999, + }, + timeout: 50 * time.Millisecond, + expectCalls: 1, + expectErr: context.DeadlineExceeded, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + var sink *Submitter + if test.client != nil { + sink = NewSubmitter(slogt.New(t), test.client) + if test.timer != nil { + sink.timer = test.timer + } + } + timeout := test.timeout + if timeout == 0 { + timeout = 30 * time.Second // to catch hangs early + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + func() { + c := sink.Chan() + if sink != nil { + go func() { assert.ErrorIs(t, sink.Run(ctx), test.expectErr) }() + c <- &gen.Result{Error: "bam"} + } + sink.Await() + var actualCalls int + if test.client != nil { + actualCalls = test.client.calls + } + assert.Equal(t, test.expectCalls, actualCalls) + defer cancel() + }() + }) + } +}