From ea315b5726f74681fa07d84f003b239c544c0239 Mon Sep 17 00:00:00 2001 From: Marcin Owsiany Date: Wed, 22 May 2024 11:41:31 +0200 Subject: [PATCH] Add metrics support. --- .github/dependabot.yaml | 6 + .github/workflows/build.yaml | 4 + README.md | 28 ++- cmd/fetch.go | 4 +- cmd/metrics.go | 34 ++++ deploy/deployment.yaml.gotpl | 51 +++++ deploy/main.go | 10 +- go.mod | 20 +- go.sum | 24 ++- internal/main.go | 61 +++++- internal/metrics/.gitignore | 1 + internal/metrics/Makefile | 59 ++++++ internal/metrics/gen/metrics.pb.go | 258 ++++++++++++++++++++++++ internal/metrics/gen/metrics_grpc.pb.go | 143 +++++++++++++ internal/metrics/metrics.proto | 19 ++ internal/metrics/server/main.go | 108 ++++++++++ internal/metrics/submitter/main.go | 103 ++++++++++ internal/metrics/tools/.gitignore | 2 + internal/metrics/tools/go.mod | 10 + internal/metrics/tools/go.sum | 6 + internal/metrics/tools/tools-import.go | 13 ++ internal/metrics/tools/tools.go | 3 + make/github.mk | 23 +++ 23 files changed, 963 insertions(+), 27 deletions(-) create mode 100644 cmd/metrics.go create mode 100644 internal/metrics/.gitignore create mode 100644 internal/metrics/Makefile create mode 100644 internal/metrics/gen/metrics.pb.go create mode 100644 internal/metrics/gen/metrics_grpc.pb.go create mode 100644 internal/metrics/metrics.proto create mode 100644 internal/metrics/server/main.go create mode 100644 internal/metrics/submitter/main.go create mode 100644 internal/metrics/tools/.gitignore create mode 100644 internal/metrics/tools/go.mod create mode 100644 internal/metrics/tools/go.sum create mode 100644 internal/metrics/tools/tools-import.go create mode 100644 internal/metrics/tools/tools.go create mode 100644 make/github.mk diff --git a/.github/dependabot.yaml b/.github/dependabot.yaml index c379e06..eb96f3a 100644 --- a/.github/dependabot.yaml +++ b/.github/dependabot.yaml @@ -22,3 +22,9 @@ updates: interval: 'weekly' day: 'tuesday' open-pull-requests-limit: 3 + - package-ecosystem: 'gomod' + directory: '/internal/metrics/tools' + schedule: + interval: 'weekly' + day: 'tuesday' + open-pull-requests-limit: 3 diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 6d0b73d..d8e9d6b 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -53,6 +53,10 @@ jobs: ./deploy/deploy --k8s-flavor ocp my-images > manifests/ocp.yaml ./deploy/deploy --k8s-flavor vanilla --secret my-secret my-images > manifests/vanilla-with-secret.yaml ./deploy/deploy --k8s-flavor ocp --secret my-secret my-images > manifests/ocp-with-secret.yaml + ./deploy/deploy --k8s-flavor vanilla --collect-metrics my-images > manifests/vanilla-metrics.yaml + ./deploy/deploy --k8s-flavor ocp --collect-metrics my-images > manifests/ocp-metrics.yaml + ./deploy/deploy --k8s-flavor vanilla --secret my-secret --collect-metrics my-images > manifests/vanilla-with-secret-metrics.yaml + ./deploy/deploy --k8s-flavor ocp --secret my-secret --collect-metrics my-images > manifests/ocp-with-secret-metrics.yaml - name: kubeconform run: | diff --git a/README.md b/README.md index f9e7983..ca8580b 100644 --- a/README.md +++ b/README.md @@ -6,17 +6,22 @@ Talks directly to Container Runtime Interface ([CRI](https://kubernetes.io/docs/ - fetch all images on all nodes in parallel, - retry pulls with increasingly longer timeouts. This prevents getting stuck on stalled connections to image registry. +It also optionally collects each pull attempt's duration and result. + ## Architecture ### `image-prefetcher` - main binary, - shipped as an OCI image, -- provides two subcommands: +- provides three subcommands: - `fetch`: runs the actual image pulls via CRI, meant to run as an init container of DaemonSet pods. Requires access to the CRI UNIX domain socket from the host. - `sleep`: just sleeps forever, meant to run as the main container of DaemonSet pods. + - `aggregate-metrics`: runs a gRPC server which collects data points pushed by the + `fetch` pods, and makes the data available for download over HTTP. + Meant to run as a standalone pod. ### `deploy` @@ -40,6 +45,7 @@ Talks directly to Container Runtime Interface ([CRI](https://kubernetes.io/docs/ - `--secret`: image pull `Secret` name. Required if the images are not pullable anonymously. This image pull secret should be usable for all images fetched by the given instance. If provided, it must be of type `kubernetes.io/dockerconfigjson` and exist in the same namespace. + - `--collect-metrics`: if the image pull metrics should be collected. Example: @@ -71,6 +77,26 @@ Talks directly to Container Runtime Interface ([CRI](https://kubernetes.io/docs/ kubectl logs -n prefetch-images daemonset/my-images -c prefetch ``` +6. If metrics collection was requested, wait for the endpoint to appear, and fetch them: + ``` + attempt=0 + service="service/my-images-metrics" + while [[ -z $(kubectl -n "${ns}" get "${service}" -o jsonpath="{.status.loadBalancer.ingress}" 2>/dev/null) ]]; do + if [ "$attempt" -lt "10" ]; then + echo "Waiting for ${service} to obtain endpoint ..." + ((attempt++)) + sleep 10 + else + echo "Timeout waiting for ${service} to obtain endpoint!" + exit 1 + fi + done + endpoint="$(kubectl -n "${ns}" get "${service}" -o json | jq -r '.status.loadBalancer.ingress[] | .ip')" + curl "http://${endpoint}:8080/metrics" | jq + ``` + + See the [Result](internal/metrics/metrics.proto) message definition for a list of fields. + ### Customization You can tweak certain parameters such as timeouts by editing `args` in the above manifest. diff --git a/cmd/fetch.go b/cmd/fetch.go index e7ac905..4ff806b 100644 --- a/cmd/fetch.go +++ b/cmd/fetch.go @@ -33,7 +33,7 @@ It talks to Container Runtime Interface API to pull images in parallel, with ret return err } imageList = append(imageList, args...) - return internal.Run(logger, criSocket, dockerConfigJSONPath, timing, imageList...) + return internal.Run(logger, criSocket, dockerConfigJSONPath, timing, metricsEndpoint, imageList...) }, } @@ -41,6 +41,7 @@ var ( criSocket string dockerConfigJSONPath string imageListFile string + metricsEndpoint string imageListTimeout = time.Minute initialPullAttemptTimeout = 30 * time.Second maxPullAttemptTimeout = 5 * time.Minute @@ -56,6 +57,7 @@ func init() { fetchCmd.Flags().StringVar(&criSocket, "cri-socket", "/run/containerd/containerd.sock", "Path to CRI UNIX socket.") fetchCmd.Flags().StringVar(&dockerConfigJSONPath, "docker-config", "", "Path to docker config json file.") fetchCmd.Flags().StringVar(&imageListFile, "image-list-file", "", "Path to text file containing images to pull (one per line).") + fetchCmd.Flags().StringVar(&metricsEndpoint, "metrics-endpoint", "", "A host:port to submit image pull metrics to.") fetchCmd.Flags().DurationVar(&imageListTimeout, "image-list-timeout", imageListTimeout, "Timeout for image list calls (for debugging).") fetchCmd.Flags().DurationVar(&initialPullAttemptTimeout, "initial-pull-attempt-timeout", initialPullAttemptTimeout, "Timeout for initial image pull call. Each subsequent attempt doubles it until max.") diff --git a/cmd/metrics.go b/cmd/metrics.go new file mode 100644 index 0000000..2710582 --- /dev/null +++ b/cmd/metrics.go @@ -0,0 +1,34 @@ +package cmd + +import ( + "github.com/stackrox/image-prefetcher/internal/logging" + "github.com/stackrox/image-prefetcher/internal/metrics/server" + + "github.com/spf13/cobra" +) + +// aggregateMetricsCmd represents the aggregate-metrics command +var aggregateMetricsCmd = &cobra.Command{ + Use: "aggregate-metrics", + Short: "Accept metrics submissions and serve them.", + Long: `This subcommand is intended to run in a single pod. + +It serves: +- a gRPC endpoint to which individual metrics can be submitted, +- an HTTP endpoint from which the aggregate metrics can be fetched.`, + RunE: func(cmd *cobra.Command, args []string) error { + return server.Run(logging.GetLogger(), grpcPort, httpPort) + }, +} + +var ( + grpcPort int + httpPort int +) + +func init() { + rootCmd.AddCommand(aggregateMetricsCmd) + logging.AddFlags(aggregateMetricsCmd.Flags()) + aggregateMetricsCmd.Flags().IntVar(&grpcPort, "grpc-port", 8443, "Port for metrics submission gRPC endpoint to listen on.") + aggregateMetricsCmd.Flags().IntVar(&httpPort, "http-port", 8080, "Port for metrics retrieval HTTP endpoint to listen on.") +} diff --git a/deploy/deployment.yaml.gotpl b/deploy/deployment.yaml.gotpl index 98c6ced..4e9e233 100644 --- a/deploy/deployment.yaml.gotpl +++ b/deploy/deployment.yaml.gotpl @@ -26,6 +26,54 @@ roleRef: name: privileged-scc-use --- {{ end }} +{{ if .CollectMetrics }} +apiVersion: v1 +kind: Pod +metadata: + name: {{ .Name }}-metrics + labels: + app: {{ .Name }}-metrics +spec: + containers: + - name: aggregator + image: {{ .Image }}:{{ .Version }} + args: + - "aggregate-metrics" + - "--debug" + ports: + - containerPort: 8443 + name: grpc + - containerPort: 8080 + name: http + resources: + requests: + cpu: "5m" + memory: "16Mi" + limits: + cpu: "100m" + memory: "64Mi" + securityContext: + readOnlyRootFilesystem: true + runAsUser: 1000 + runAsNonRoot: true +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ .Name }}-metrics +spec: + ports: + - name: grpc + port: 8443 + protocol: TCP + - name: http + port: 8080 + protocol: TCP + selector: + app: {{ .Name }}-metrics + type: LoadBalancer +--- +{{ end }} apiVersion: apps/v1 kind: DaemonSet metadata: @@ -64,6 +112,9 @@ spec: {{ else }} - "--cri-socket=/tmp/cri/containerd.sock" {{ end }} + {{ if .CollectMetrics }} + - "--metrics-endpoint={{ .Name }}-metrics:8443" + {{ end }} resources: requests: cpu: "20m" diff --git a/deploy/main.go b/deploy/main.go index f5dd5cb..d313181 100644 --- a/deploy/main.go +++ b/deploy/main.go @@ -17,6 +17,7 @@ type settings struct { Secret string IsCRIO bool NeedsPrivileged bool + CollectMetrics bool } const ( @@ -32,15 +33,17 @@ const imageRepo = "quay.io/stackrox-io/image-prefetcher" var deploymentTemplate string var ( - version string - k8sFlavor k8sFlavorType - secret string + version string + k8sFlavor k8sFlavorType + secret string + collectMetrics bool ) func init() { flag.StringVar(&version, "version", "v0.1.0", "Version of image prefetcher OCI image.") flag.TextVar(&k8sFlavor, "k8s-flavor", flavor(vanillaFlavor), fmt.Sprintf("Kubernetes flavor. Accepted values: %s", strings.Join(allFlavors, ","))) flag.StringVar(&secret, "secret", "", "Kubernetes image pull Secret to use when pulling.") + flag.BoolVar(&collectMetrics, "collect-metrics", false, "Whether to collect and expose image pull metrics.") } func main() { @@ -59,6 +62,7 @@ func main() { Secret: secret, IsCRIO: isOcp, NeedsPrivileged: isOcp, + CollectMetrics: collectMetrics, } tmpl := template.Must(template.New("deployment").Parse(deploymentTemplate)) if err := tmpl.Execute(os.Stdout, s); err != nil { diff --git a/go.mod b/go.mod index 8fa6048..8bcc2fe 100644 --- a/go.mod +++ b/go.mod @@ -1,25 +1,27 @@ module github.com/stackrox/image-prefetcher -go 1.21 +go 1.22.0 -toolchain go1.21.7 +toolchain go1.22.1 require ( + github.com/cenkalti/backoff/v4 v4.2.1 + github.com/google/uuid v1.6.0 github.com/spf13/cobra v1.8.0 + github.com/spf13/pflag v1.0.5 google.golang.org/grpc v1.63.2 - k8s.io/apimachinery v0.29.3 + google.golang.org/protobuf v1.33.0 + k8s.io/apimachinery v0.30.0 k8s.io/cri-api v0.29.3 - k8s.io/klog/v2 v2.110.1 + k8s.io/klog/v2 v2.120.1 ) require ( - github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect - golang.org/x/net v0.21.0 // indirect - golang.org/x/sys v0.17.0 // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect - google.golang.org/protobuf v1.33.0 // indirect ) diff --git a/go.sum b/go.sum index 933c3da..3ad9b6b 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,14 @@ +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -25,16 +29,16 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= @@ -55,9 +59,9 @@ google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGm google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/apimachinery v0.29.3 h1:2tbx+5L7RNvqJjn7RIuIKu9XTsIZ9Z5wX2G22XAa5EU= -k8s.io/apimachinery v0.29.3/go.mod h1:hx/S4V2PNW4OMg3WizRrHutyB5la0iCUbZym+W0EQIU= +k8s.io/apimachinery v0.30.0 h1:qxVPsyDM5XS96NIh9Oj6LavoVFYff/Pon9cZeDIkHHA= +k8s.io/apimachinery v0.30.0/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= k8s.io/cri-api v0.29.3 h1:ppKSui+hhTJW774Mou6x+/ealmzt2jmTM0vsEQVWrjI= k8s.io/cri-api v0.29.3/go.mod h1:3X7EnhsNaQnCweGhQCJwKNHlH7wHEYuKQ19bRvXMoJY= -k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= -k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= +k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= +k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= diff --git a/internal/main.go b/internal/main.go index f3ac70d..08bb533 100644 --- a/internal/main.go +++ b/internal/main.go @@ -10,7 +10,10 @@ import ( "time" "github.com/stackrox/image-prefetcher/internal/credentialprovider" + metricsProto "github.com/stackrox/image-prefetcher/internal/metrics/gen" + "github.com/stackrox/image-prefetcher/internal/metrics/submitter" + "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" criV1 "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -25,7 +28,7 @@ type TimingConfig struct { MaxPullAttemptDelay time.Duration } -func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, timing TimingConfig, imageNames ...string) error { +func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, timing TimingConfig, metricsEndpoint string, imageNames ...string) error { ctx, cancel := context.WithTimeout(context.Background(), timing.OverallTimeout) defer cancel() @@ -39,6 +42,16 @@ func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, return fmt.Errorf("failed to list images for debugging before pulling: %w", err) } + var metricsSink *submitter.Submitter + if metricsEndpoint != "" { + metricsConn, err := grpc.DialContext(ctx, metricsEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return fmt.Errorf("failed to dial metrics endpoint %q: %w", metricsEndpoint, err) + } + metricsSink = submitter.NewSubmitter(logger, metricsProto.NewMetricsClient(metricsConn)) + go metricsSink.Run(ctx) + } + kr := credentialprovider.BasicDockerKeyring{} if err := loadPullSecret(logger, &kr, dockerConfigJSONPath); err != nil { return fmt.Errorf("failed to load image pull secrets: %w", err) @@ -55,11 +68,12 @@ func Run(logger *slog.Logger, criSocketPath string, dockerConfigJSONPath string, }, Auth: auth, } - go pullImageWithRetries(ctx, logger.With("image", imageName, "authNum", i), &wg, criClient, request, timing) + go pullImageWithRetries(ctx, logger.With("image", imageName, "authNum", i), &wg, criClient, metricsSink.Chan(), imageName, request, timing) } } wg.Wait() logger.Info("pulling images finished") + metricsSink.Await() if err := listImagesForDebugging(ctx, logger, criClient, timing.ImageListTimeout, "after"); err != nil { return fmt.Errorf("failed to list images for debugging after pulling: %w", err) } @@ -123,7 +137,7 @@ func getAuthsForImage(ctx context.Context, logger *slog.Logger, kr credentialpro return auths } -func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.WaitGroup, client criV1.ImageServiceClient, request *criV1.PullImageRequest, timing TimingConfig) { +func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.WaitGroup, client criV1.ImageServiceClient, metricsSink chan<- *metricsProto.Result, name string, request *criV1.PullImageRequest, timing TimingConfig) { defer wg.Done() attemptTimeout := timing.InitialPullAttemptTimeout delay := timing.InitialPullAttemptDelay @@ -136,9 +150,12 @@ func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.Wai cancel() if err == nil { logger.InfoContext(ctx, "image pulled successfully", "response", response, "elapsed", elapsed) + sizeBytes := getImageSize(ctx, logger, client, response) + noteSuccess(metricsSink, name, start, elapsed, sizeBytes) return } logger.ErrorContext(ctx, "image failed to pull", "error", err, "timeout", attemptTimeout, "elapsed", elapsed) + noteFailure(metricsSink, name, start, elapsed, err) if ctx.Err() != nil { logger.ErrorContext(ctx, "not retrying any more", "error", ctx.Err()) return @@ -150,3 +167,41 @@ func pullImageWithRetries(ctx context.Context, logger *slog.Logger, wg *sync.Wai delay = delay * 2 } } + +func getImageSize(ctx context.Context, logger *slog.Logger, client criV1.ImageServiceClient, response *criV1.PullImageResponse) uint64 { + imageStatus, err := client.ImageStatus(ctx, &criV1.ImageStatusRequest{ + Image: &criV1.ImageSpec{ + Image: response.ImageRef, + }, + }) + if err != nil { + logger.WarnContext(ctx, "failed to obtain pulled image status", "image", response.ImageRef, "error", err) + return 0 + } + return imageStatus.GetImage().GetSize_() +} + +func noteSuccess(sink chan<- *metricsProto.Result, name string, start time.Time, elapsed time.Duration, sizeBytes uint64) { + if sink == nil { + return + } + sink <- &metricsProto.Result{ + AttemptId: uuid.NewString(), + StartedAt: start.Unix(), + Image: name, + DurationMs: uint64(elapsed.Milliseconds()), + SizeBytes: sizeBytes, + } +} +func noteFailure(sink chan<- *metricsProto.Result, name string, start time.Time, elapsed time.Duration, err error) { + if sink == nil { + return + } + sink <- &metricsProto.Result{ + AttemptId: uuid.NewString(), + StartedAt: start.Unix(), + Image: name, + DurationMs: uint64(elapsed.Milliseconds()), + Error: err.Error(), + } +} diff --git a/internal/metrics/.gitignore b/internal/metrics/.gitignore new file mode 100644 index 0000000..f81b0a3 --- /dev/null +++ b/internal/metrics/.gitignore @@ -0,0 +1 @@ +/.gotools/ diff --git a/internal/metrics/Makefile b/internal/metrics/Makefile new file mode 100644 index 0000000..405dd7f --- /dev/null +++ b/internal/metrics/Makefile @@ -0,0 +1,59 @@ +# Makefile for generating protobuf and gRPC-related files. +# Much of this file is borrowed from https://github.com/stackrox/stackrox/blob/master/make/protogen.mk + +BASE_PATH ?= $(CURDIR) +SILENT ?= @ + +.PHONY: all +all: generate + +include $(BASE_PATH)/../../make/gotools.mk +include $(BASE_PATH)/../../make/github.mk + +$(call go-tool, PROTOC_GEN_GO_BIN, google.golang.org/protobuf/cmd/protoc-gen-go, tools) +$(call go-tool, PROTOC_GEN_GO_GRPC_BIN, google.golang.org/grpc/cmd/protoc-gen-go-grpc, tools) + +PROTOC_VERSION := 26.1 +UNAME_S := $(shell uname -s) +ifeq ($(UNAME_S),Linux) +PROTOC_OS = linux +endif +ifeq ($(UNAME_S),Darwin) +PROTOC_OS = osx +endif +PROTOC_ARCH=$(shell case $$(uname -m) in (arm64) echo aarch_64 ;; (s390x) echo s390_64 ;; (*) uname -m ;; esac) + +PROTO_PRIVATE_DIR := $(BASE_PATH)/tools + +PROTOC_DIR := $(PROTO_PRIVATE_DIR)/protoc-$(PROTOC_OS)-$(PROTOC_ARCH)-$(PROTOC_VERSION) + +PROTOC := $(PROTOC_DIR)/bin/protoc + +PROTOC_DOWNLOADS_DIR := $(PROTO_PRIVATE_DIR)/.downloads + +.PHONY: generate +generate: $(PROTOC) $(PROTOC_GEN_GO_BIN) $(PROTOC_GEN_GO_GRPC_BIN) + mkdir -p gen + export PATH=$(BASE_PATH)/tools:$(BASE_PATH)/.gotools/bin:$$PATH; \ + $(PROTOC) --go_out=gen --go_opt=paths=source_relative --go-grpc_out=gen --go-grpc_opt=paths=source_relative metrics.proto + +$(PROTOC_DOWNLOADS_DIR): + @echo "+ $@" + $(SILENT)mkdir -p "$@" + +PROTOC_ZIP := protoc-$(PROTOC_VERSION)-$(PROTOC_OS)-$(PROTOC_ARCH).zip +PROTOC_FILE := $(PROTOC_DOWNLOADS_DIR)/$(PROTOC_ZIP) + +$(PROTOC_FILE): $(PROTOC_DOWNLOADS_DIR) + @$(GET_GITHUB_RELEASE_FN); \ + get_github_release "$@" "https://github.com/protocolbuffers/protobuf/releases/download/v$(PROTOC_VERSION)/$(PROTOC_ZIP)" + +.PRECIOUS: $(PROTOC_FILE) + +$(PROTOC): + @echo "+ $@" + $(SILENT)$(MAKE) "$(PROTOC_FILE)" + $(SILENT)mkdir -p "$(PROTOC_DIR)" + $(SILENT)unzip -q -o -d "$(PROTOC_DIR)" "$(PROTOC_FILE)" + $(SILENT)test -x "$@" + diff --git a/internal/metrics/gen/metrics.pb.go b/internal/metrics/gen/metrics.pb.go new file mode 100644 index 0000000..f59d18c --- /dev/null +++ b/internal/metrics/gen/metrics.pb.go @@ -0,0 +1,258 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v5.26.1 +// source: metrics.proto + +package gen + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Result struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AttemptId string `protobuf:"bytes,1,opt,name=attempt_id,json=attemptId,proto3" json:"attempt_id,omitempty"` + StartedAt int64 `protobuf:"varint,2,opt,name=started_at,json=startedAt,proto3" json:"started_at,omitempty"` + Image string `protobuf:"bytes,3,opt,name=image,proto3" json:"image,omitempty"` + Error string `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"` + DurationMs uint64 `protobuf:"varint,5,opt,name=duration_ms,json=durationMs,proto3" json:"duration_ms,omitempty"` + Node string `protobuf:"bytes,6,opt,name=node,proto3" json:"node,omitempty"` + SizeBytes uint64 `protobuf:"varint,7,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"` +} + +func (x *Result) Reset() { + *x = Result{} + if protoimpl.UnsafeEnabled { + mi := &file_metrics_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Result) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Result) ProtoMessage() {} + +func (x *Result) ProtoReflect() protoreflect.Message { + mi := &file_metrics_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Result.ProtoReflect.Descriptor instead. +func (*Result) Descriptor() ([]byte, []int) { + return file_metrics_proto_rawDescGZIP(), []int{0} +} + +func (x *Result) GetAttemptId() string { + if x != nil { + return x.AttemptId + } + return "" +} + +func (x *Result) GetStartedAt() int64 { + if x != nil { + return x.StartedAt + } + return 0 +} + +func (x *Result) GetImage() string { + if x != nil { + return x.Image + } + return "" +} + +func (x *Result) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *Result) GetDurationMs() uint64 { + if x != nil { + return x.DurationMs + } + return 0 +} + +func (x *Result) GetNode() string { + if x != nil { + return x.Node + } + return "" +} + +func (x *Result) GetSizeBytes() uint64 { + if x != nil { + return x.SizeBytes + } + return 0 +} + +type Empty struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Empty) Reset() { + *x = Empty{} + if protoimpl.UnsafeEnabled { + mi := &file_metrics_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Empty) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Empty) ProtoMessage() {} + +func (x *Empty) ProtoReflect() protoreflect.Message { + mi := &file_metrics_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Empty.ProtoReflect.Descriptor instead. +func (*Empty) Descriptor() ([]byte, []int) { + return file_metrics_proto_rawDescGZIP(), []int{1} +} + +var File_metrics_proto protoreflect.FileDescriptor + +var file_metrics_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0xc6, 0x01, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x74, + 0x74, 0x65, 0x6d, 0x70, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, + 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x61, + 0x72, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6d, 0x61, 0x67, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x12, 0x14, + 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x5f, 0x6d, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x64, 0x75, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x4d, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x18, 0x06, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x6f, 0x64, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x69, 0x7a, + 0x65, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x73, + 0x69, 0x7a, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x32, 0x28, 0x0a, 0x07, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x1d, 0x0a, 0x06, + 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x07, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, + 0x06, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x28, 0x01, 0x42, 0x3b, 0x5a, 0x39, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x72, + 0x6f, 0x78, 0x2f, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x2d, 0x70, 0x72, 0x65, 0x66, 0x65, 0x74, 0x63, + 0x68, 0x65, 0x72, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x6d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x3b, 0x67, 0x65, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_metrics_proto_rawDescOnce sync.Once + file_metrics_proto_rawDescData = file_metrics_proto_rawDesc +) + +func file_metrics_proto_rawDescGZIP() []byte { + file_metrics_proto_rawDescOnce.Do(func() { + file_metrics_proto_rawDescData = protoimpl.X.CompressGZIP(file_metrics_proto_rawDescData) + }) + return file_metrics_proto_rawDescData +} + +var file_metrics_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_metrics_proto_goTypes = []interface{}{ + (*Result)(nil), // 0: Result + (*Empty)(nil), // 1: Empty +} +var file_metrics_proto_depIdxs = []int32{ + 0, // 0: Metrics.Submit:input_type -> Result + 1, // 1: Metrics.Submit:output_type -> Empty + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_metrics_proto_init() } +func file_metrics_proto_init() { + if File_metrics_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_metrics_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Result); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_metrics_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Empty); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_metrics_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_metrics_proto_goTypes, + DependencyIndexes: file_metrics_proto_depIdxs, + MessageInfos: file_metrics_proto_msgTypes, + }.Build() + File_metrics_proto = out.File + file_metrics_proto_rawDesc = nil + file_metrics_proto_goTypes = nil + file_metrics_proto_depIdxs = nil +} diff --git a/internal/metrics/gen/metrics_grpc.pb.go b/internal/metrics/gen/metrics_grpc.pb.go new file mode 100644 index 0000000..2e14fdb --- /dev/null +++ b/internal/metrics/gen/metrics_grpc.pb.go @@ -0,0 +1,143 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.26.1 +// source: metrics.proto + +package gen + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + Metrics_Submit_FullMethodName = "/Metrics/Submit" +) + +// MetricsClient is the client API for Metrics service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type MetricsClient interface { + Submit(ctx context.Context, opts ...grpc.CallOption) (Metrics_SubmitClient, error) +} + +type metricsClient struct { + cc grpc.ClientConnInterface +} + +func NewMetricsClient(cc grpc.ClientConnInterface) MetricsClient { + return &metricsClient{cc} +} + +func (c *metricsClient) Submit(ctx context.Context, opts ...grpc.CallOption) (Metrics_SubmitClient, error) { + stream, err := c.cc.NewStream(ctx, &Metrics_ServiceDesc.Streams[0], Metrics_Submit_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &metricsSubmitClient{stream} + return x, nil +} + +type Metrics_SubmitClient interface { + Send(*Result) error + CloseAndRecv() (*Empty, error) + grpc.ClientStream +} + +type metricsSubmitClient struct { + grpc.ClientStream +} + +func (x *metricsSubmitClient) Send(m *Result) error { + return x.ClientStream.SendMsg(m) +} + +func (x *metricsSubmitClient) CloseAndRecv() (*Empty, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(Empty) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// MetricsServer is the server API for Metrics service. +// All implementations must embed UnimplementedMetricsServer +// for forward compatibility +type MetricsServer interface { + Submit(Metrics_SubmitServer) error + mustEmbedUnimplementedMetricsServer() +} + +// UnimplementedMetricsServer must be embedded to have forward compatible implementations. +type UnimplementedMetricsServer struct { +} + +func (UnimplementedMetricsServer) Submit(Metrics_SubmitServer) error { + return status.Errorf(codes.Unimplemented, "method Submit not implemented") +} +func (UnimplementedMetricsServer) mustEmbedUnimplementedMetricsServer() {} + +// UnsafeMetricsServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to MetricsServer will +// result in compilation errors. +type UnsafeMetricsServer interface { + mustEmbedUnimplementedMetricsServer() +} + +func RegisterMetricsServer(s grpc.ServiceRegistrar, srv MetricsServer) { + s.RegisterService(&Metrics_ServiceDesc, srv) +} + +func _Metrics_Submit_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MetricsServer).Submit(&metricsSubmitServer{stream}) +} + +type Metrics_SubmitServer interface { + SendAndClose(*Empty) error + Recv() (*Result, error) + grpc.ServerStream +} + +type metricsSubmitServer struct { + grpc.ServerStream +} + +func (x *metricsSubmitServer) SendAndClose(m *Empty) error { + return x.ServerStream.SendMsg(m) +} + +func (x *metricsSubmitServer) Recv() (*Result, error) { + m := new(Result) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Metrics_ServiceDesc is the grpc.ServiceDesc for Metrics service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Metrics_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "Metrics", + HandlerType: (*MetricsServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Submit", + Handler: _Metrics_Submit_Handler, + ClientStreams: true, + }, + }, + Metadata: "metrics.proto", +} diff --git a/internal/metrics/metrics.proto b/internal/metrics/metrics.proto new file mode 100644 index 0000000..4689004 --- /dev/null +++ b/internal/metrics/metrics.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option go_package = "github.com/stackrox/image-prefetcher/internal/metrics;gen"; + +message Result { + string attempt_id = 1; + int64 started_at = 2; + string image = 3; + string error = 4; + uint64 duration_ms = 5; + string node = 6; + uint64 size_bytes = 7; +} + +message Empty {} + +service Metrics { + rpc Submit(stream Result) returns (Empty) {} +} \ No newline at end of file diff --git a/internal/metrics/server/main.go b/internal/metrics/server/main.go new file mode 100644 index 0000000..7bdb655 --- /dev/null +++ b/internal/metrics/server/main.go @@ -0,0 +1,108 @@ +package server + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net" + "net/http" + "sync" + + "github.com/stackrox/image-prefetcher/internal/metrics/gen" + + "google.golang.org/grpc" +) + +type metricsServer struct { + mutex sync.Mutex + metrics map[string]*gen.Result + logger *slog.Logger + gen.UnimplementedMetricsServer +} + +func (s *metricsServer) Submit(stream gen.Metrics_SubmitServer) error { + for { + metric, err := stream.Recv() + if err == io.EOF { + return stream.SendAndClose(&gen.Empty{}) + } + if err != nil { + return err + } + s.metricSubmitted(metric) + } +} + +func (s *metricsServer) metricSubmitted(metric *gen.Result) { + s.logger.Debug("metric submitted", "metric", metric) + s.mutex.Lock() + defer s.mutex.Unlock() + if _, ok := s.metrics[metric.AttemptId]; ok { + s.logger.Info("duplicate metric submitted", "metric", metric) + } + s.metrics[metric.AttemptId] = metric +} + +func (s *metricsServer) ServeHTTP(writer http.ResponseWriter, _ *http.Request) { + resp, err := json.Marshal(s.currentMetrics()) + if err != nil { + s.logger.Error("failed to marshal metrics", "error", err) + writer.WriteHeader(http.StatusInternalServerError) + _, _ = writer.Write([]byte(err.Error())) + return + } + _, _ = writer.Write(resp) +} + +func (s *metricsServer) currentMetrics() []*gen.Result { + s.mutex.Lock() + defer s.mutex.Unlock() + metrics := make([]*gen.Result, 0, len(s.metrics)) + for _, metric := range s.metrics { + metrics = append(metrics, metric) + } + return metrics +} + +func Run(logger *slog.Logger, grpcPort int, httpPort int) error { + server := &metricsServer{ + logger: logger, + metrics: make(map[string]*gen.Result), + } + grpcErrChan := make(chan error) + httpErrChan := make(chan error) + + grpcSpec := fmt.Sprintf(":%d", grpcPort) + grpcListener, err := net.Listen("tcp", grpcSpec) + if err != nil { + return fmt.Errorf("failed to listen on %s", grpcSpec) + } + grpcServer := grpc.NewServer() + gen.RegisterMetricsServer(grpcServer, server) + logger.Info("starting to serve", "grpcSpec", grpcSpec) + go func() { grpcErrChan <- grpcServer.Serve(grpcListener) }() + + httpSpec := fmt.Sprintf(":%d", httpPort) + httpListener, err := net.Listen("tcp", httpSpec) + if err != nil { + return fmt.Errorf("failed to listen on %s", httpSpec) + } + httpServer := &http.Server{} + http.Handle("/metrics", server) + logger.Info("starting to serve", "httpSpec", httpSpec) + go func() { httpErrChan <- httpServer.Serve(httpListener) }() + + // On shutdown of either, stop the other one. + var httpErr, grpcErr error + select { + case httpErr = <-httpErrChan: + grpcServer.Stop() + grpcErr = <-grpcErrChan + case grpcErr = <-grpcErrChan: + _ = httpServer.Close() + httpErr = <-httpErrChan + } + return errors.Join(grpcErr, httpErr) +} diff --git a/internal/metrics/submitter/main.go b/internal/metrics/submitter/main.go new file mode 100644 index 0000000..1f2f296 --- /dev/null +++ b/internal/metrics/submitter/main.go @@ -0,0 +1,103 @@ +package submitter + +import ( + "context" + "fmt" + "log/slog" + "os" + "time" + + "github.com/stackrox/image-prefetcher/internal/metrics/gen" + + "github.com/cenkalti/backoff/v4" +) + +type Submitter struct { + c chan *gen.Result + done chan struct{} + client gen.MetricsClient + logger *slog.Logger +} + +func NewSubmitter(logger *slog.Logger, client gen.MetricsClient) *Submitter { + return &Submitter{ + c: make(chan *gen.Result, 1), + done: make(chan struct{}), + client: client, + logger: logger, + } +} + +func (s *Submitter) Run(ctx context.Context) { + defer func() { s.done <- struct{}{} }() + if s.client == nil { + for range s.c { + } + return + } + hostName, err := os.Hostname() + if err != nil { + s.logger.WarnContext(ctx, "could not obtain hostname", "error", err) + hostName = "unknown" + } + + var metrics []*gen.Result + for metric := range s.c { + metric.Node = hostName + s.logger.DebugContext(ctx, "metric received", "metric", metric) + metrics = append(metrics, metric) + } + + if err = s.submit(ctx, metrics); err == nil { + s.logger.InfoContext(ctx, "metrics submitted") + return + } + s.logger.ErrorContext(ctx, "metric Submit RPC failed, retrying", "error", err) + b := backoff.NewExponentialBackOff() + b.InitialInterval = 10 * time.Second + b.MaxElapsedTime = 0 + ticker := backoff.NewTicker(backoff.WithContext(b, ctx)) + defer ticker.Stop() + for range ticker.C { + if ctx.Err() != nil { + s.logger.ErrorContext(ctx, "giving up retrying metrics submission", "error", ctx.Err()) + } + if err = s.submit(ctx, metrics); err == nil { + s.logger.InfoContext(ctx, "metrics submitted") + return + } + s.logger.ErrorContext(ctx, "metric Submit RPC failed, retrying", "error", err) + } +} + +func (s *Submitter) Await() { + if s == nil { + return + } + close(s.c) + s.logger.Info("waiting for metrics to be submitted") + <-s.done +} + +func (s *Submitter) submit(ctx context.Context, metrics []*gen.Result) error { + submitClient, err := s.client.Submit(ctx) + if err != nil { + return fmt.Errorf("invoking metric Submit RPC failed: %w", err) + } + for _, metric := range metrics { + if err := submitClient.Send(metric); err != nil { + return fmt.Errorf("streaming metric to Submit RPC failed: %w", err) + } + } + if _, err := submitClient.CloseAndRecv(); err != nil { + return fmt.Errorf("finishing metric Submit RPC failed: %w", err) + } + return nil +} + +func (s *Submitter) Chan() chan<- *gen.Result { + if s == nil { + return nil + } + return s.c +} diff --git a/internal/metrics/tools/.gitignore b/internal/metrics/tools/.gitignore new file mode 100644 index 0000000..580cecf --- /dev/null +++ b/internal/metrics/tools/.gitignore @@ -0,0 +1,2 @@ +/protoc-*/ +/.downloads/ diff --git a/internal/metrics/tools/go.mod b/internal/metrics/tools/go.mod new file mode 100644 index 0000000..c2171e6 --- /dev/null +++ b/internal/metrics/tools/go.mod @@ -0,0 +1,10 @@ +module github.com/stackrox/image-prefetcher/internal/metrics/tools + +go 1.20 + +require ( + google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 + google.golang.org/protobuf v1.33.0 +) + +require github.com/google/go-cmp v0.6.0 // indirect diff --git a/internal/metrics/tools/go.sum b/internal/metrics/tools/go.sum new file mode 100644 index 0000000..e8e124c --- /dev/null +++ b/internal/metrics/tools/go.sum @@ -0,0 +1,6 @@ +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= diff --git a/internal/metrics/tools/tools-import.go b/internal/metrics/tools/tools-import.go new file mode 100644 index 0000000..7006aaf --- /dev/null +++ b/internal/metrics/tools/tools-import.go @@ -0,0 +1,13 @@ +//go:build tools + +package tools + +// This file declares dependencies on tool for `go mod` purposes. +// See https://github.com/golang/go/wiki/Modules#how-can-i-track-tool-dependencies-for-a-module +// for an explanation of the approach. + +import ( + // Tool dependencies, not used anywhere in the code. + _ "google.golang.org/grpc/cmd/protoc-gen-go-grpc" + _ "google.golang.org/protobuf/cmd/protoc-gen-go" +) diff --git a/internal/metrics/tools/tools.go b/internal/metrics/tools/tools.go new file mode 100644 index 0000000..b2918ed --- /dev/null +++ b/internal/metrics/tools/tools.go @@ -0,0 +1,3 @@ +package tools + +// This file only exists to prevent package loading errors for this directory. diff --git a/make/github.mk b/make/github.mk new file mode 100644 index 0000000..c8d66bf --- /dev/null +++ b/make/github.mk @@ -0,0 +1,23 @@ +# github.mk +# Helpers for fetching released binaries from github projects. +# Copied from https://github.com/stackrox/stackrox/blob/master/make/github.mk +# DO NOT EDIT. Instead modify the upstream file and copy. + +# For usage instructions, see uses of this macro elsewhere, since there is no +# single standard for architecture naming. +GET_GITHUB_RELEASE_FN = get_github_release() { \ + [[ -x $${1} ]] || { \ + set -euo pipefail ;\ + echo "+ $${1}" ;\ + mkdir -p bin ;\ + attempts=5 ;\ + for i in $$(seq $$attempts); do \ + curl --silent --show-error --fail --location --output "$${1}" "$${2}" && break ;\ + [[ $$i -eq $$attempts ]] && exit 1; \ + echo "Retrying after $$((i*i)) seconds..."; \ + sleep "$$((i*i))"; \ + done ;\ + [[ $$(uname -s) != Darwin ]] || xattr -c "$${1}" ;\ + chmod +x "$${1}" ;\ + } \ +}