From b5c8e95528ebdc5565ab58f9e4f50d099fbb5c02 Mon Sep 17 00:00:00 2001 From: seb vincent Date: Thu, 25 Jan 2024 08:42:50 +0800 Subject: [PATCH] feat: add livelogs support (#869) * adds livelogs support * ignore context error + do not print logs at the end if livelogs are enabled * ignoring websocket close error * improve runner messages * implements reconnection * sse fixes * Update internal/saucecloud/imagerunner.go * process cloudevents for live logs * removed SSE, better error handling * moved asyncEvent logic to http, added imagerunner logs --livelogs command * enable livelogs websocket compression * Added log activity tracker, printing progress less often * handles not authorized * improve message display * improve error handling * Update internal/cmd/imagerunner/logs.go --------- Co-authored-by: Alex Plischke Co-authored-by: Maciej Plonski --- go.mod | 8 ++ go.sum | 18 +++ internal/cmd/docker/cmd.go | 9 +- internal/cmd/imagerunner/cmd.go | 12 +- internal/cmd/imagerunner/logs.go | 28 ++-- internal/cmd/run/imagerunner.go | 18 +-- internal/cmd/run/run.go | 1 + internal/http/imagerunner.go | 220 ++++++++++++++++++++++++++++- internal/imagerunner/async.go | 136 ++++++++++++++++++ internal/imagerunner/config.go | 1 + internal/imagerunner/errors.go | 19 +++ internal/saucecloud/imagerunner.go | 53 ++++++- 12 files changed, 491 insertions(+), 32 deletions(-) create mode 100644 internal/imagerunner/async.go create mode 100644 internal/imagerunner/errors.go diff --git a/go.mod b/go.mod index 13734aa35..f2fefcfda 100644 --- a/go.mod +++ b/go.mod @@ -34,17 +34,25 @@ require ( require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/cloudevents/sdk-go/v2 v2.14.0 // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/distribution v2.8.3+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.1.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/rivo/uniseg v0.4.4 // indirect + go.uber.org/atomic v1.9.0 // indirect + go.uber.org/multierr v1.8.0 // indirect + go.uber.org/zap v1.21.0 // indirect golang.org/x/tools v0.6.0 // indirect ) diff --git a/go.sum b/go.sum index 18225b3d5..f7f9dedb0 100644 --- a/go.sum +++ b/go.sum @@ -57,6 +57,7 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= github.com/bmatcuk/doublestar/v4 v4.0.2 h1:X0krlUVAVmtr2cRoTqR8aDMrDqnB36ht8wpWTiQ3jsA= @@ -70,6 +71,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s= +github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -188,6 +191,7 @@ github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -238,6 +242,8 @@ github.com/jedib0t/go-pretty/v6 v6.2.1 h1:O/3XdNfyWSyVLLIt1EeDhfP8AhNMjtBSh0MuZ4 github.com/jedib0t/go-pretty/v6 v6.2.1/go.mod h1:+nE9fyyHGil+PuISTCrp7avEdo6bqoMwqZnuiK2r2a0= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -294,8 +300,12 @@ github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -402,8 +412,15 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= +go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= +go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= +go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190219172222-a4c6cb3142f2/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -658,6 +675,7 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/cmd/docker/cmd.go b/internal/cmd/docker/cmd.go index 59b9736df..d44e30c45 100644 --- a/internal/cmd/docker/cmd.go +++ b/internal/cmd/docker/cmd.go @@ -5,6 +5,7 @@ import ( "time" "github.com/saucelabs/saucectl/internal/http" + "github.com/saucelabs/saucectl/internal/imagerunner" "github.com/saucelabs/saucectl/internal/region" "github.com/spf13/cobra" ) @@ -34,7 +35,13 @@ func Command(preRun func(cmd *cobra.Command, args []string)) *cobra.Command { creds := reg.Credentials() url := reg.APIBaseURL() - imageRunnerService = http.NewImageRunner(url, creds, imageRunnerServiceTimeout) + + asyncEventManager, err := imagerunner.NewAsyncEventManager() + if err != nil { + return err + } + + imageRunnerService = http.NewImageRunner(url, creds, imageRunnerServiceTimeout, asyncEventManager) return nil }, diff --git a/internal/cmd/imagerunner/cmd.go b/internal/cmd/imagerunner/cmd.go index eb10705a9..2e4c69ee2 100644 --- a/internal/cmd/imagerunner/cmd.go +++ b/internal/cmd/imagerunner/cmd.go @@ -6,6 +6,7 @@ import ( "github.com/saucelabs/saucectl/internal/credentials" "github.com/saucelabs/saucectl/internal/http" + "github.com/saucelabs/saucectl/internal/imagerunner" "github.com/saucelabs/saucectl/internal/region" "github.com/saucelabs/saucectl/internal/segment" "github.com/spf13/cobra" @@ -13,6 +14,7 @@ import ( var ( imagerunnerClient http.ImageRunner + liveLogs bool ) func Command(preRun func(cmd *cobra.Command, args []string)) *cobra.Command { @@ -37,7 +39,12 @@ func Command(preRun func(cmd *cobra.Command, args []string)) *cobra.Command { creds := credentials.Get() url := region.FromString(regio).APIBaseURL() - imagerunnerClient = http.NewImageRunner(url, creds, 15*time.Minute) + asyncEventManager, err := imagerunner.NewAsyncEventManager() + if err != nil { + return err + } + + imagerunnerClient = http.NewImageRunner(url, creds, 15*time.Minute, asyncEventManager) return nil }, @@ -45,9 +52,10 @@ func Command(preRun func(cmd *cobra.Command, args []string)) *cobra.Command { flags := cmd.PersistentFlags() flags.StringVarP(®io, "region", "r", "us-west-1", "The Sauce Labs region. Options: us-west-1, eu-central-1.") + flags.BoolVarP(&liveLogs, "live-logs", "", false, "Retrieve logs from temporary livelogs storage.") cmd.AddCommand( - LogsCommand(), + LogsCommand(&liveLogs), ArtifactsCommand(), ) return cmd diff --git a/internal/cmd/imagerunner/logs.go b/internal/cmd/imagerunner/logs.go index f2376b69b..ca8193d37 100644 --- a/internal/cmd/imagerunner/logs.go +++ b/internal/cmd/imagerunner/logs.go @@ -15,7 +15,7 @@ import ( "golang.org/x/text/language" ) -func LogsCommand() *cobra.Command { +func LogsCommand(liveLogs *bool) *cobra.Command { cmd := &cobra.Command{ Use: "logs ", Short: "Fetch the logs for an imagerunner run", @@ -44,21 +44,31 @@ func LogsCommand() *cobra.Command { return nil }, RunE: func(cmd *cobra.Command, args []string) error { - return exec(args[0]) + return exec(args[0], *liveLogs) }, } return cmd } -func exec(runID string) error { - log, err := imagerunnerClient.GetLogs(context.Background(), runID) - if err != nil { - if err == imgrunner.ErrResourceNotFound { - return fmt.Errorf("could not find log URL for run with ID (%s): %w", runID, err) +func exec(runID string, liveLogs bool) error { + if liveLogs { + err := imagerunnerClient.FetchLiveLogs(context.Background(), runID) + if err != nil { + if err == imgrunner.ErrResourceNotFound { + return fmt.Errorf("could not find log URL for run with ID (%s): %w", runID, err) + } + return err + } + } else { + log, err := imagerunnerClient.GetLogs(context.Background(), runID) + if err != nil { + if err == imgrunner.ErrResourceNotFound { + return fmt.Errorf("could not find log URL for run with ID (%s): %w", runID, err) + } + return err } - return err + fmt.Println(log) } - fmt.Println(log) return nil } diff --git a/internal/cmd/run/imagerunner.go b/internal/cmd/run/imagerunner.go index cf252f02f..d1d8e02c6 100644 --- a/internal/cmd/run/imagerunner.go +++ b/internal/cmd/run/imagerunner.go @@ -66,16 +66,18 @@ func runImageRunner(cmd *cobra.Command) (int, error) { cleanupArtifacts(p.Artifacts) creds := regio.Credentials() - imageRunnerClient := http.NewImageRunner(regio.APIBaseURL(), creds, imgExecTimeout) - restoClient := http.NewResto(regio.APIBaseURL(), creds.Username, creds.AccessKey, 0) - r := saucecloud.ImgRunner{ - Project: p, - RunnerService: &imageRunnerClient, - TunnelService: &restoClient, - Reporters: reporters, - Async: gFlags.async, + asyncEventManager, err := imagerunner.NewAsyncEventManager() + if err != nil { + return 1, err } + + imageRunnerClient := http.NewImageRunner(regio.APIBaseURL(), creds, imgExecTimeout, asyncEventManager) + restoClient := http.NewResto(regio.APIBaseURL(), creds.Username, creds.AccessKey, 0) + + r := saucecloud.NewImgRunner(p, &imageRunnerClient, &restoClient, asyncEventManager, + reporters, gFlags.async) + return r.RunProject() } diff --git a/internal/cmd/run/run.go b/internal/cmd/run/run.go index e76b882d7..54975961c 100644 --- a/internal/cmd/run/run.go +++ b/internal/cmd/run/run.go @@ -116,6 +116,7 @@ func Command() *cobra.Command { sc.Bool("dry-run", "dryRun", false, "Simulate a test run without actually running any tests.") sc.Int("retries", "sauce::retries", 0, "Retries specifies the number of times to retry a failed suite") sc.String("launch-order", "sauce::launchOrder", "", `Launch jobs based on the failure rate. Jobs with the highest failure rate launch first. Supports values: ["fail rate"]`) + sc.Bool("live-logs", "liveLogs", false, "Display live logs for a running job (supported only by Sauce Orchestrate).") // Metadata sc.StringSlice("tags", "sauce::metadata::tags", []string{}, "Adds tags to tests") diff --git a/internal/http/imagerunner.go b/internal/http/imagerunner.go index ce5d52cd5..3ed5080e7 100644 --- a/internal/http/imagerunner.go +++ b/internal/http/imagerunner.go @@ -4,21 +4,30 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" + "net/url" + "os" "strings" "time" + "github.com/fatih/color" + "github.com/gorilla/websocket" "github.com/hashicorp/go-retryablehttp" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/saucelabs/saucectl/internal/iam" "github.com/saucelabs/saucectl/internal/imagerunner" ) type ImageRunner struct { - Client *retryablehttp.Client - URL string - Creds iam.Credentials + Client *retryablehttp.Client + URL string + Creds iam.Credentials + AsyncEventManager imagerunner.AsyncEventManagerI + eventLogger zerolog.Logger } type AuthToken struct { @@ -27,11 +36,23 @@ type AuthToken struct { Password string `json:"password"` } -func NewImageRunner(url string, creds iam.Credentials, timeout time.Duration) ImageRunner { +func NewImageRunner(url string, creds iam.Credentials, timeout time.Duration, + asyncEventManager imagerunner.AsyncEventManagerI) ImageRunner { + eventLogger := zerolog.New(zerolog.ConsoleWriter{ + Out: os.Stdout, + PartsOrder: []string{ + zerolog.MessageFieldName, + }, + FormatLevel: func(i interface{}) string { + return color.New(color.FgGreen).Sprint("[LOGS]") + }, + }) return ImageRunner{ - Client: NewRetryableClient(timeout), - URL: url, - Creds: creds, + Client: NewRetryableClient(timeout), + URL: url, + Creds: creds, + AsyncEventManager: asyncEventManager, + eventLogger: eventLogger, } } @@ -204,6 +225,191 @@ func (c *ImageRunner) GetLogs(ctx context.Context, id string) (string, error) { return c.doGetStr(ctx, urlResponse.URL) } +func (c *ImageRunner) getWebsocketURL() (string, error) { + + wsURL, err := url.Parse(c.URL) + if err != nil { + return "", err + } + if wsURL.Scheme == "https" { + wsURL.Scheme = "wss" + } + if wsURL.Scheme == "http" { + wsURL.Scheme = "ws" + } + if os.Getenv("SO_ASYNCEVENT_PORT") != "" { + wsURL.Host = fmt.Sprintf("%s:%s", wsURL.Hostname(), os.Getenv("SO_ASYNCEVENT_PORT")) + } + return wsURL.String(), nil +} + +func (c *ImageRunner) OpenAsyncEventsWebsocket(ctx context.Context, id string, lastseq string, nowait bool) (*websocket.Conn, error) { + // dummy request so that we build basic auth header consistently + dummyURL := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events", c.URL, id) + req, err := http.NewRequest("GET", dummyURL, nil) + if err != nil { + return nil, err + } + req.SetBasicAuth(c.Creds.Username, c.Creds.AccessKey) + + websocketURL, err := c.getWebsocketURL() + if err != nil { + return nil, err + } + + // build query string + queryParts := []string{} + if lastseq != "" { + queryParts = append(queryParts, fmt.Sprintf("lastseq=%s", lastseq)) + } + if nowait { + queryParts = append(queryParts, "nowait=true") + } + query := "" + if len(queryParts) > 0 { + query = "?" + strings.Join(queryParts, "&") + } + + url := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events%s", websocketURL, id, query) + headers := http.Header{} + headers.Add("Authorization", req.Header.Get("Authorization")) + dialer := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: 45 * time.Second, + EnableCompression: true, + } + ws, resp, err := dialer.Dial(url, headers) + if resp.StatusCode == http.StatusNotFound || + resp.StatusCode == http.StatusUnauthorized || + resp.StatusCode == http.StatusForbidden { + return nil, imagerunner.AsyncEventFatalError{ + Err: errors.New(resp.Status), + } + } + if err != nil { + return nil, err + } + return ws, nil +} + +func (c *ImageRunner) OpenAsyncEventsTransport(ctx context.Context, id string, lastseq string, nowait bool) (imagerunner.AsyncEventTransportI, error) { + ws, err := c.OpenAsyncEventsWebsocket(ctx, id, lastseq, nowait) + if err != nil { + if _, ok := err.(imagerunner.AsyncEventFatalError); ok { + return nil, err + } + return nil, imagerunner.AsyncEventSetupError{ + Err: err, + } + } + return imagerunner.NewWebsocketAsyncEventTransport(ws), nil +} + +func (c *ImageRunner) HandleAsyncEvents(ctx context.Context, id string, nowait bool) error { + delay := 3 * time.Second + var lastseq = "" + var hasMoreLines bool + var err error + setupErrorCount := 0 + maxSetupErrors := 3 + for { + if setupErrorCount >= maxSetupErrors { + log.Info().Msgf("Could not setup Log streaming after %d attempts, disabling it.", maxSetupErrors) + return imagerunner.AsyncEventSetupError{} + } + hasMoreLines, lastseq, err = c.handleAsyncEventsOneshot(ctx, id, lastseq, nowait) + if errors.Is(err, context.Canceled) { + return err + } + if _, ok := err.(imagerunner.AsyncEventFatalError); ok { + return err + } + if !hasMoreLines { + return nil + } + if wrappedErr, ok := err.(imagerunner.AsyncEventSetupError); ok { + setupErrorCount++ + err = wrappedErr.Err + } else { + setupErrorCount = 0 + } + log.Info().Err(err).Msgf("Log streaming issue. Retrying in %s...", delay) + time.Sleep(delay) + } +} + +func (c *ImageRunner) handleAsyncEventsOneshot(ctx context.Context, id string, lastseq string, nowait bool) (bool, string, error) { + transport, err := c.OpenAsyncEventsTransport(ctx, id, lastseq, nowait) + if err != nil { + return true, lastseq, err + } + if transport == nil { + return true, lastseq, nil + } + + defer transport.Close() + + // the first message is expected to be a ping + readMessage, err := transport.ReadMessage() + if err != nil { + return true, lastseq, err + } + if readMessage == "" { + return true, lastseq, errors.New("empty message") + } + event, err := c.AsyncEventManager.ParseEvent(readMessage) + if err != nil { + return true, lastseq, err + } + if event.Type == "com.saucelabs.so.v1.ping" { + log.Info().Msg("Streaming logs...") + } else { + return true, lastseq, errors.New("first message is not a ping") + } + + for { + select { + case <-ctx.Done(): + return false, lastseq, ctx.Err() + default: + readMessage, err := transport.ReadMessage() + if err != nil { + if nowait && strings.Contains(err.Error(), "close") { + return false, lastseq, nil + } + return true, lastseq, err + } + if readMessage == "" { + return true, lastseq, errors.New("empty message") + } + + event, err := c.AsyncEventManager.ParseEvent(readMessage) + if err != nil { + return true, lastseq, err + } + switch event.Type { + case "com.saucelabs.so.v1.ping": + case "com.saucelabs.so.v1.log": + if event.LineSequence != "" { + lastseq = event.LineSequence + } + c.eventLogger.Info().Msgf("%s %s", + color.New(color.FgCyan).Sprint(event.Data["containerName"]), + event.Data["line"]) + c.AsyncEventManager.TrackLog() + default: + err := errors.New("unknown event type") + log.Err(err).Msgf("unknown even type: %s", event.Type) + } + } + } +} + +func (c *ImageRunner) FetchLiveLogs(ctx context.Context, id string) error { + err := c.HandleAsyncEvents(ctx, id, true) + return err +} + func (c *ImageRunner) doGetStr(ctx context.Context, url string) (string, error) { urlReq, err := NewRetryableRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { diff --git a/internal/imagerunner/async.go b/internal/imagerunner/async.go new file mode 100644 index 000000000..be4a7153a --- /dev/null +++ b/internal/imagerunner/async.go @@ -0,0 +1,136 @@ +package imagerunner + +import ( + "bufio" + "encoding/json" + "fmt" + "net/http" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/gorilla/websocket" +) + +type AsyncEvent struct { + Type string + LineSequence string + Data map[string]string +} + +type AsyncEventTransportI interface { + ReadMessage() (string, error) + Close() error +} + +type WebsocketAsyncEventTransport struct { + ws *websocket.Conn +} + +func NewWebsocketAsyncEventTransport(ws *websocket.Conn) *WebsocketAsyncEventTransport { + return &WebsocketAsyncEventTransport{ + ws: ws, + } +} + +func (aet *WebsocketAsyncEventTransport) ReadMessage() (string, error) { + _, msg, err := aet.ws.ReadMessage() + return string(msg), err +} + +func (aet *WebsocketAsyncEventTransport) Close() error { + return aet.ws.Close() +} + +type SseAsyncEventTransport struct { + httpResponse *http.Response + scanner *bufio.Scanner +} + +func NewSseAsyncEventTransport(httpResponse *http.Response) *SseAsyncEventTransport { + scanner := bufio.NewScanner(httpResponse.Body) + scanner.Split(bufio.ScanLines) + return &SseAsyncEventTransport{ + httpResponse: httpResponse, + scanner: scanner, + } +} + +func (aet *SseAsyncEventTransport) ReadMessage() (string, error) { + if aet.scanner.Scan() { + msg := aet.scanner.Bytes() + return string(msg), nil + } + err := aet.scanner.Err() + if err == nil { + err = fmt.Errorf("no more messages") + } + return "", err +} + +func (aet *SseAsyncEventTransport) Close() error { + return aet.httpResponse.Body.Close() +} + +type AsyncEventManagerI interface { + ParseEvent(event string) (*AsyncEvent, error) + TrackLog() + IsLogIdle() bool +} + +type AsyncEventManager struct { + logTimestamps time.Time +} + +func NewAsyncEventManager() (*AsyncEventManager, error) { + asyncEventManager := AsyncEventManager{ + logTimestamps: time.Now(), + } + + return &asyncEventManager, nil +} + +func parseLineSequence(cloudEvent *cloudevents.Event) (string, error) { + // The extension is not necessarily present, so ignore errors. + _lineseq, _ := cloudEvent.Context.GetExtension("linesequence") + lineseq, ok := _lineseq.(string) + if !ok { + return "", fmt.Errorf("linesequence is not a string") + } + return lineseq, nil +} + +func (a *AsyncEventManager) ParseEvent(event string) (*AsyncEvent, error) { + readEvent := cloudevents.NewEvent() + err := json.Unmarshal([]byte(event), &readEvent) + if err != nil { + return nil, err + } + + data := map[string]string{} + err = readEvent.DataAs(&data) + if err != nil { + return nil, err + } + + asyncEvent := AsyncEvent{ + Type: readEvent.Type(), + Data: data, + } + + if asyncEvent.Type == "com.saucelabs.so.v1.log" { + asyncEvent.LineSequence, err = parseLineSequence(&readEvent) + if err != nil { + return nil, err + } + } + + return &asyncEvent, nil +} + +func (a *AsyncEventManager) TrackLog() { + a.logTimestamps = time.Now() +} + +func (a *AsyncEventManager) IsLogIdle() bool { + return time.Since(a.logTimestamps) > 30*time.Second +} diff --git a/internal/imagerunner/config.go b/internal/imagerunner/config.go index d2852848e..da3a1d676 100644 --- a/internal/imagerunner/config.go +++ b/internal/imagerunner/config.go @@ -32,6 +32,7 @@ type Project struct { Suites []Suite `yaml:"suites,omitempty" json:"suites"` Artifacts config.Artifacts `yaml:"artifacts,omitempty" json:"artifacts"` DryRun bool `yaml:"-" json:"-"` + LiveLogs bool `yaml:"-" json:"-"` Env map[string]string `yaml:"env,omitempty" json:"env"` EnvFlag map[string]string `yaml:"-" json:"-"` Reporters config.Reporters `yaml:"reporters,omitempty" json:"-"` diff --git a/internal/imagerunner/errors.go b/internal/imagerunner/errors.go new file mode 100644 index 000000000..eef48ffef --- /dev/null +++ b/internal/imagerunner/errors.go @@ -0,0 +1,19 @@ +package imagerunner + +import "fmt" + +type AsyncEventSetupError struct { + Err error +} + +func (e AsyncEventSetupError) Error() string { + return fmt.Sprintf("streaming setup failed with: %v", e.Err) +} + +type AsyncEventFatalError struct { + Err error +} + +func (e AsyncEventFatalError) Error() string { + return e.Err.Error() +} diff --git a/internal/saucecloud/imagerunner.go b/internal/saucecloud/imagerunner.go index a1882ce6c..f526dedde 100644 --- a/internal/saucecloud/imagerunner.go +++ b/internal/saucecloud/imagerunner.go @@ -11,6 +11,7 @@ import ( "os/signal" "path/filepath" "reflect" + "strings" "time" "github.com/rs/zerolog/log" @@ -30,6 +31,7 @@ type ImageRunner interface { StopRun(ctx context.Context, id string) error DownloadArtifacts(ctx context.Context, id string) (io.ReadCloser, error) GetLogs(ctx context.Context, id string) (string, error) + HandleAsyncEvents(ctx context.Context, id string, nowait bool) error } type SuiteTimeoutError struct { @@ -49,12 +51,25 @@ type ImgRunner struct { Reporters []report.Reporter - Async bool + Async bool + AsyncEventManager imagerunner.AsyncEventManagerI ctx context.Context cancel context.CancelFunc } +func NewImgRunner(project imagerunner.Project, runnerService ImageRunner, tunnelService tunnel.Service, + asyncEventManager imagerunner.AsyncEventManagerI, reporters []report.Reporter, async bool) *ImgRunner { + return &ImgRunner{ + Project: project, + RunnerService: runnerService, + TunnelService: tunnelService, + Reporters: reporters, + Async: async, + AsyncEventManager: asyncEventManager, + } +} + type execResult struct { name string runID string @@ -200,6 +215,19 @@ func (r *ImgRunner) buildService(serviceIn imagerunner.SuiteService, suiteName s return serviceOut, nil } +func ignoreError(err error) bool { + if err == nil { + return true + } + if !errors.Is(err, context.Canceled) { + return true + } + if strings.Contains(err.Error(), "websocket: close") { + return true + } + return false +} + func (r *ImgRunner) runSuite(suite imagerunner.Suite) (imagerunner.Runner, error) { files, err := mapFiles(suite.Files) if err != nil { @@ -273,6 +301,16 @@ func (r *ImgRunner) runSuite(suite imagerunner.Suite) (imagerunner.Runner, error return runner, nil } + go func() { + if !r.Project.LiveLogs { + return + } + err := r.RunnerService.HandleAsyncEvents(ctx, runner.ID, false) + if !ignoreError(err) { + log.Err(err).Msg("Async event handler failed.") + } + }() + var run imagerunner.Runner run, err = r.PollRun(ctx, runner.ID, runner.Status) if errors.Is(err, context.DeadlineExceeded) && ctx.Err() != nil { @@ -312,7 +350,7 @@ func (r *ImgRunner) collectResults(results chan execResult, expected int) bool { inProgress := expected passed := true - stopProgress := startProgressTicker(r.ctx, &inProgress) + stopProgress := r.startProgressTicker(r.ctx, &inProgress) for i := 0; i < expected; i++ { res := <-results inProgress-- @@ -322,7 +360,10 @@ func (r *ImgRunner) collectResults(results chan execResult, expected int) bool { } r.PrintResult(res) - r.PrintLogs(res.runID, res.name) + if !r.Project.LiveLogs { + // only print logs if live logs are disabled + r.PrintLogs(res.runID, res.name) + } files := r.DownloadArtifacts(res.runID, res.name, res.status, res.err != nil) var artifacts []report.Artifact for _, f := range files { @@ -551,7 +592,7 @@ func readFile(path string) (string, error) { return base64.StdEncoding.Strict().EncodeToString(bytes), nil } -func startProgressTicker(ctx context.Context, progress *int) (cancel context.CancelFunc) { +func (r *ImgRunner) startProgressTicker(ctx context.Context, progress *int) (cancel context.CancelFunc) { ctx, cancel = context.WithCancel(ctx) go func() { @@ -562,7 +603,9 @@ func startProgressTicker(ctx context.Context, progress *int) (cancel context.Can case <-ctx.Done(): return case <-t.C: - log.Info().Msgf("Suites in progress: %d", *progress) + if r.AsyncEventManager.IsLogIdle() { + log.Info().Msgf("Suites in progress: %d", *progress) + } } } }()