Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add support for live logs for Sauce Orchestrate #850

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions internal/cmd/run/imagerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ func runImageRunner(cmd *cobra.Command) (int, error) {
imageRunnerClient := http.NewImageRunner(regio.APIBaseURL(), creds, imgExecTimeout)
restoClient := http.NewResto(regio.APIBaseURL(), creds.Username, creds.AccessKey, 0)

r := saucecloud.ImgRunner{
Project: p,
RunnerService: &imageRunnerClient,
TunnelService: &restoClient,
Reporters: reporters,
Async: gFlags.async,
asyncEventManager, err := imagerunner.NewAsyncEventManager()
if err != nil {
return 1, err
}

r := saucecloud.NewImgRunner(p, &imageRunnerClient, &restoClient, reporters,
asyncEventManager, gFlags.async)

return r.RunProject()
}

Expand Down
1 change: 1 addition & 0 deletions internal/cmd/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func Command() *cobra.Command {
sc.Bool("dry-run", "dryRun", false, "Simulate a test run without actually running any tests.")
sc.Int("retries", "sauce::retries", 0, "Retries specifies the number of times to retry a failed suite")
sc.String("launch-order", "sauce::launchOrder", "", `Launch jobs based on the failure rate. Jobs with the highest failure rate launch first. Supports values: ["fail rate"]`)
sc.Bool("live-logs", "liveLogs", false, "Display live logs for a running job (supported only by Sauce Orchestrate).")

// Metadata
sc.StringSlice("tags", "sauce::metadata::tags", []string{}, "Adds tags to tests")
Expand Down
69 changes: 69 additions & 0 deletions internal/http/imagerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

"github.com/gorilla/websocket"
"github.com/hashicorp/go-retryablehttp"
"github.com/rs/zerolog/log"
"github.com/saucelabs/saucectl/internal/iam"
"github.com/saucelabs/saucectl/internal/imagerunner"
)
Expand Down Expand Up @@ -204,6 +207,72 @@ func (c *ImageRunner) GetLogs(ctx context.Context, id string) (string, error) {
return c.doGetStr(ctx, urlResponse.URL)
}

func (c *ImageRunner) getWebsocketURL() string {

wsURL := c.URL
wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
wsURL = strings.Replace(wsURL, "http://", "ws://", 1)
return wsURL
}

func (c *ImageRunner) OpenAsyncEventsWebsocket(ctx context.Context, id string) (*websocket.Conn, error) {
// dummy request so that we build basic auth header consistently
dummyURL := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events", c.URL, id)
req, err := http.NewRequest("GET", dummyURL, nil)
if err != nil {
panic(err)
}
req.SetBasicAuth(c.Creds.Username, c.Creds.AccessKey)

url := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events", c.getWebsocketURL(), id)
headers := http.Header{}
headers.Add("Authorization", req.Header.Get("Authorization"))
ws, resp, err := websocket.DefaultDialer.Dial(
url, headers)
if err != nil {
if resp != nil {
log.Error().Err(err).Int("http status", resp.StatusCode).Msg("Could not open async events websocket")
} else {
log.Error().Err(err).Msg("Could not open async events websocket")
}
return nil, err
}
return ws, nil
}

func (c *ImageRunner) OpenAsyncEventsSSE(ctx context.Context, id string) (*http.Response, error) {
url := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events", c.URL, id)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
req.Header.Set("Cache-Control", "no-cache")
req.Header.Set("Accept", "text/event-stream")
req.Header.Set("Connection", "keep-alive")
req.SetBasicAuth(c.Creds.Username, c.Creds.AccessKey)

client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected server response (%d)", resp.StatusCode)
}
return resp, nil
}

func (c *ImageRunner) OpenAsyncEventsTransport(ctx context.Context, id string) (imagerunner.AsyncEventTransportI, error) {
if os.Getenv("LIVELOGS") == "sse" {
resp, err := c.OpenAsyncEventsSSE(ctx, id)
return imagerunner.NewSseAsyncEventTransport(resp), err
}

ws, err := c.OpenAsyncEventsWebsocket(ctx, id)
return imagerunner.NewWebsocketAsyncEventTransport(ws), err
}

func (c *ImageRunner) doGetStr(ctx context.Context, url string) (string, error) {
urlReq, err := NewRetryableRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
Expand Down
255 changes: 255 additions & 0 deletions internal/imagerunner/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
package imagerunner

import (
"bufio"
"encoding/json"
"fmt"
"log"
"net/http"

"github.com/gorilla/websocket"
"github.com/santhosh-tekuri/jsonschema/v5"
)

var SCHEMA = `
{
"properties": {
"kind": {
"enum": [
"notice",
"log",
"ping"
]
},
"runnerID": {
"type": "string"
}
},
"allOf": [
{
"if": {
"properties": {
"kind": {
"const": "log"
}
}
},
"then": {
"properties": {
"lines": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"containerName": {
"type": "string"
},
"message": {
"type": "string"
}
}
}
}
},
"additionalProperties": true
}
},
{
"if": {
"properties": {
"kind": {
"const": "notice"
}
}
},
"then": {
"properties": {
"severity": {
"enum": [
"info",
"warning",
"error"
]
},
"message": {
"type": "string"
}
},
"additionalProperties": true
}
},
{
"if": {
"properties": {
"kind": {
"const": "ping"
}
}
},
"then": {
"properties": {
"message": {
"type": "string"
}
},
"additionalProperties": true
}
}

],
"additionalProperties": true
}
`

const (
NOTICE = "notice"
LOG = "log"
PING = "ping"
)

type AsyncEventI interface {
GetKind() string
GetRunnerID() string
}

type AsyncEvent struct {
Kind string `json:"kind"`
RunnerID string `json:"runnerID"`
}

func (a *AsyncEvent) GetKind() string {
return a.Kind
}

func (a *AsyncEvent) GetRunnerID() string {
return a.RunnerID
}

type LogLine struct {
ID string `json:"id"`
ContainerName string `json:"containerName"`
Message string `json:"message"`
}

type LogEvent struct {
AsyncEvent
Lines []LogLine `json:"lines"`
}

type PingEvent struct {
AsyncEvent
Message string `json:"message"`
}

type NoticeEvent struct {
AsyncEvent
Severity string `json:"severity"`
Message string `json:"message"`
}

type AsyncEventTransportI interface {
ReadMessage() (string, error)
Close() error
}

type WebsocketAsyncEventTransport struct {
ws *websocket.Conn
}

func NewWebsocketAsyncEventTransport(ws *websocket.Conn) *WebsocketAsyncEventTransport {
return &WebsocketAsyncEventTransport{
ws: ws,
}
}

func (aet *WebsocketAsyncEventTransport) ReadMessage() (string, error) {
_, msg, err := aet.ws.ReadMessage()
return string(msg), err
}

func (aet *WebsocketAsyncEventTransport) Close() error {
return aet.ws.Close()
}

type SseAsyncEventTransport struct {
httpResponse *http.Response
scanner *bufio.Scanner
}

func NewSseAsyncEventTransport(httpResponse *http.Response) *SseAsyncEventTransport {
scanner := bufio.NewScanner(httpResponse.Body)
scanner.Split(bufio.ScanLines)
return &SseAsyncEventTransport{
httpResponse: httpResponse,
scanner: scanner,
}
}

func (aet *SseAsyncEventTransport) ReadMessage() (string, error) {
if aet.scanner.Scan() {
msg := aet.scanner.Bytes()
return string(msg), nil
}
return "", aet.scanner.Err()
}

func (aet *SseAsyncEventTransport) Close() error {
return aet.httpResponse.Body.Close()
}

type AsyncEventManagerI interface {
ParseEvent(event string) (AsyncEventI, error)
}

type AsyncEventManager struct {
schema *jsonschema.Schema
}

func NewAsyncEventManager() (*AsyncEventManager, error) {
schema, err := jsonschema.CompileString("schema.json", SCHEMA)
if err != nil {
return nil, err
}

asyncEventManager := AsyncEventManager{
schema: schema,
}

return &asyncEventManager, nil
}

func (a *AsyncEventManager) ParseEvent(event string) (AsyncEventI, error) {
err := a.schema.Validate(event)
if err != nil {
return nil, err
}
v := AsyncEvent{}
if err := json.Unmarshal([]byte(event), &v); err != nil {
log.Fatal(err)
}

if v.GetKind() == LOG {
logEvent := LogEvent{}
if err := json.Unmarshal([]byte(event), &logEvent); err != nil {
log.Fatal(err)
}
return &logEvent, nil
} else if v.GetKind() == NOTICE {
noticeEvent := NoticeEvent{}
if err := json.Unmarshal([]byte(event), &noticeEvent); err != nil {
log.Fatal(err)
}
return &noticeEvent, nil
} else if v.GetKind() == PING {
pingEvent := PingEvent{}
if err := json.Unmarshal([]byte(event), &pingEvent); err != nil {
log.Fatal(err)
}
return &pingEvent, nil
}

return nil, fmt.Errorf("unknown event type: %s", v.GetKind())
}
Loading
Loading