Skip to content

Commit

Permalink
sse fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sebv committed Dec 21, 2023
1 parent 4c16eda commit c62bd9f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 11 deletions.
55 changes: 46 additions & 9 deletions internal/http/imagerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
Expand Down Expand Up @@ -206,29 +207,43 @@ func (c *ImageRunner) GetLogs(ctx context.Context, id string) (string, error) {
return c.doGetStr(ctx, urlResponse.URL)
}

func (c *ImageRunner) getWebsocketURL() string {
func (c *ImageRunner) getWebsocketURL() (string, error) {

wsURL := c.URL
wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
wsURL = strings.Replace(wsURL, "http://", "ws://", 1)
wsURL = strings.Replace(wsURL, "9091", "9095", 1)
return wsURL
wsURL, err := url.Parse(c.URL)
if err != nil {
return "", err
}
if wsURL.Scheme == "https" {
wsURL.Scheme = "wss"
}
if wsURL.Scheme == "http" {
wsURL.Scheme = "ws"
}
if os.Getenv("SO_ASYNCEVENT_PORT") != "" {
wsURL.Host = fmt.Sprintf("%s:%s", wsURL.Hostname(), os.Getenv("SO_ASYNCEVENT_PORT"))
}
return wsURL.String(), nil
}

func (c *ImageRunner) OpenAsyncEventsWebsocket(ctx context.Context, id string, lastseq string) (*websocket.Conn, error) {
// dummy request so that we build basic auth header consistently
dummyURL := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events", c.URL, id)
req, err := http.NewRequest("GET", dummyURL, nil)
if err != nil {
panic(err)
return nil, err
}
req.SetBasicAuth(c.Creds.Username, c.Creds.AccessKey)

websocketURL, err := c.getWebsocketURL()
if err != nil {
return nil, err
}

query := ""
if lastseq != "" {
query = fmt.Sprintf("?lastseq=%s", lastseq)
}
url := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events%s", c.getWebsocketURL(), id, query)
url := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events%s", websocketURL, id, query)
headers := http.Header{}
headers.Add("Authorization", req.Header.Get("Authorization"))
ws, _, err := websocket.DefaultDialer.Dial(
Expand All @@ -239,12 +254,28 @@ func (c *ImageRunner) OpenAsyncEventsWebsocket(ctx context.Context, id string, l
return ws, nil
}

func (c *ImageRunner) getSseURL() (string, error) {

sseURL, err := url.Parse(c.URL)
if err != nil {
return "", err
}
if os.Getenv("SO_ASYNCEVENT_PORT") != "" {
sseURL.Host = fmt.Sprintf("%s:%s", sseURL.Hostname(), os.Getenv("SO_ASYNCEVENT_PORT"))
}
return sseURL.String(), nil
}

func (c *ImageRunner) OpenAsyncEventsSSE(ctx context.Context, id string, lastseq string) (*http.Response, error) {
sseURL, err := c.getSseURL()
if err != nil {
return nil, err
}
query := ""
if lastseq != "" {
query = fmt.Sprintf("?lastseq=%s", lastseq)
}
url := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events%s", c.URL, id, query)
url := fmt.Sprintf("%s/v1alpha1/hosted/async/image/runners/%s/events%s", sseURL, id, query)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
Expand All @@ -269,10 +300,16 @@ func (c *ImageRunner) OpenAsyncEventsSSE(ctx context.Context, id string, lastseq
func (c *ImageRunner) OpenAsyncEventsTransport(ctx context.Context, id string, lastseq string) (imagerunner.AsyncEventTransportI, error) {
if os.Getenv("LIVELOGS") == "sse" {
resp, err := c.OpenAsyncEventsSSE(ctx, id, lastseq)
if err != nil {
return nil, err
}
return imagerunner.NewSseAsyncEventTransport(resp), err
}

ws, err := c.OpenAsyncEventsWebsocket(ctx, id, lastseq)
if err != nil {
return nil, err
}
return imagerunner.NewWebsocketAsyncEventTransport(ws), err
}

Expand Down
6 changes: 5 additions & 1 deletion internal/imagerunner/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ func (aet *SseAsyncEventTransport) ReadMessage() (string, error) {
msg := aet.scanner.Bytes()
return string(msg), nil
}
return "", aet.scanner.Err()
err := aet.scanner.Err()
if err == nil {
err = fmt.Errorf("no more messages")
}
return "", err
}

func (aet *SseAsyncEventTransport) Close() error {
Expand Down
3 changes: 2 additions & 1 deletion internal/saucecloud/imagerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,9 @@ func (r *ImgRunner) handleAsyncEventsOneshot(ctx context.Context, id string, las
return lastseq, err
}
if msg == "" {
continue
return lastseq, errors.New("empty message")
}

event, err := r.asyncEventManager.ParseEvent(msg)
if err != nil {
return lastseq, err
Expand Down

0 comments on commit c62bd9f

Please sign in to comment.