Skip to content
This repository has been archived by the owner on Feb 22, 2024. It is now read-only.

Commit

Permalink
fix sse/eventstream since net.http refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatoleAM committed Dec 4, 2023
1 parent da3b128 commit fe4f01c
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 67 deletions.
3 changes: 2 additions & 1 deletion internal/app/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"math/rand"
"net/http"
"sync"
"time"

Expand Down Expand Up @@ -50,7 +51,7 @@ type Connection interface {
// Close sends a close frame with the specified code and ends the connection
SendClose(code events.CloseCode, after time.Duration)
// SetWriter defines the connection's writable stream (SSE only)
SetWriter(w *bufio.Writer)
SetWriter(w *bufio.Writer, f http.Flusher)
}

func IsClientSentOp(op events.Opcode) bool {
Expand Down
7 changes: 6 additions & 1 deletion internal/app/connection/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

type EventStream struct {
r *http.Request
f http.Flusher
ctx context.Context
gctx global.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -197,17 +198,21 @@ func (es *EventStream) Write(msg events.Message[json.RawMessage]) error {
if _, err = es.writer.Write(utils.S2B(sb.String())); err != nil {
zap.S().Errorw("failed to write to event stream connection", "error", err)
}

if err = es.writer.Flush(); err != nil {
return err
}

es.f.Flush()

es.seq++
return nil
}

// SetWriter implements Connection
func (es *EventStream) SetWriter(w *bufio.Writer) {
func (es *EventStream) SetWriter(w *bufio.Writer, f http.Flusher) {
es.writer = w
es.f = f
}

// Ready implements client.Connection
Expand Down
41 changes: 13 additions & 28 deletions internal/app/connection/eventstream/eventstream_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package eventstream

import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"syscall"
Expand All @@ -11,11 +13,9 @@ import (
"go.uber.org/zap"

"github.com/seventv/eventapi/internal/global"
"github.com/seventv/eventapi/internal/util"
)

func (es *EventStream) Read(gctx global.Context) {
conn := util.GetConn(es.r).(*net.TCPConn)
heartbeat := time.NewTicker(time.Duration(es.heartbeatInterval) * time.Millisecond)

liveness := time.NewTicker(time.Second * 1)
Expand All @@ -33,17 +33,16 @@ func (es *EventStream) Read(gctx global.Context) {
es.SetReady()

for {
if err := checkConn(conn); err != nil {
return
}

select {
case <-es.r.Context().Done():
return
case <-es.OnClose():
return
case <-gctx.Done():
es.SendClose(events.CloseCodeRestart, time.Second*5)
return
case <-heartbeat.C:
fmt.Println("hi heartbeat")
if err := es.SendHeartbeat(); err != nil {
return
}
Expand All @@ -67,28 +66,14 @@ func (es *EventStream) Read(gctx global.Context) {
}
}

func checkConn(conn net.Conn) error {
var sysErr error = nil
rc, err := conn.(syscall.Conn).SyscallConn()
if err != nil {
return err
}
err = rc.Read(func(fd uintptr) bool {
var buf []byte = []byte{0}
n, _, err := syscall.Recvfrom(int(fd), buf, syscall.MSG_PEEK|syscall.MSG_DONTWAIT)
switch {
case n == 0 && err == nil:
sysErr = io.EOF
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
sysErr = nil
default:
sysErr = err
}
func isNetConnClosedErr(err error) bool {

Check failure on line 69 in internal/app/connection/eventstream/eventstream_read.go

View workflow job for this annotation

GitHub Actions / EventAPI Lint & Build

func `isNetConnClosedErr` is unused (unused)
switch {
case
errors.Is(err, net.ErrClosed),
errors.Is(err, io.EOF),
errors.Is(err, syscall.EPIPE):
return true
})
if err != nil {
return err
default:
return false
}

return sysErr
}
3 changes: 2 additions & 1 deletion internal/app/connection/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/hex"
"encoding/json"
"net/http"
"sync"
"time"

Expand Down Expand Up @@ -199,7 +200,7 @@ func (w *WebSocket) OnClose() <-chan struct{} {
return w.ctx.Done()
}

func (*WebSocket) SetWriter(w *bufio.Writer) {
func (*WebSocket) SetWriter(w *bufio.Writer, f http.Flusher) {
zap.S().Fatalw("called SetWriter() on a WebSocket connection")
}

Expand Down
34 changes: 21 additions & 13 deletions internal/app/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"go.uber.org/zap"

client "github.com/seventv/eventapi/internal/app/connection"
client_eventstream "github.com/seventv/eventapi/internal/app/connection/eventstream"
client_websocket "github.com/seventv/eventapi/internal/app/connection/websocket"
v3 "github.com/seventv/eventapi/internal/app/v3"
)

Expand Down Expand Up @@ -36,36 +38,42 @@ func (s *Server) handleV3(w http.ResponseWriter, r *http.Request) {
con client.Connection
)

connected := false

if strings.ToLower(r.Header.Get("upgrade")) == "websocket" || strings.ToLower(r.Header.Get("connection")) == "upgrade" {
c, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
writeError(http.StatusBadRequest, err, w)
return
}
con, err = v3.WebSocket(s.gctx, c)

con, err = client_websocket.NewWebSocket(s.gctx, c)
if err != nil {
writeError(http.StatusBadRequest, err, w)
return
}
connected = true
} else { // New EventStream connection
var err error
con, err = v3.SSE(s.gctx, w, r)

err = v3.WebSocket(s.gctx, con)
if err != nil {
writeError(http.StatusBadRequest, err, w)
return
}

connected = true
}
go s.TrackConnection(s.gctx, r, con)
} else { // New EventStream connection
var err error

go func() {
if !connected {
con, err := client_eventstream.NewEventStream(s.gctx, r)
if err != nil {
return
}

s.TrackConnection(s.gctx, r, con)
}()
client_eventstream.SetEventStreamHeaders(w)

go s.TrackConnection(s.gctx, r, con)

err = v3.SSE(s.gctx, con, w, r)
if err != nil {
writeError(http.StatusBadRequest, err, w)
return
}
}
}
35 changes: 14 additions & 21 deletions internal/app/v3/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,23 @@ package v3

import (
"bufio"
"fmt"
"net/http"
"net/url"
"regexp"
"strings"

"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
"github.com/seventv/api/data/events"

client "github.com/seventv/eventapi/internal/app/connection"
client_eventstream "github.com/seventv/eventapi/internal/app/connection/eventstream"
client_websocket "github.com/seventv/eventapi/internal/app/connection/websocket"
"github.com/seventv/eventapi/internal/global"
)

func WebSocket(gctx global.Context, conn *websocket.Conn) (client.Connection, error) {
w, err := client_websocket.NewWebSocket(gctx, conn)
if err != nil {
return nil, err
}
func WebSocket(gctx global.Context, con client.Connection) error {
go con.Read(gctx)

go w.Read(gctx)
return w, nil
return nil
}

var (
Expand All @@ -33,20 +27,19 @@ var (
SSE_SUBSCRIPTION_ITEM_I_CND = SSE_SUBSCRIPTION_ITEM.SubexpIndex("CND")
)

func SSE(gctx global.Context, w http.ResponseWriter, r *http.Request) (client.Connection, error) {
es, err := client_eventstream.NewEventStream(gctx, r)
if err != nil {
return nil, err
func SSE(gctx global.Context, conn client.Connection, w http.ResponseWriter, r *http.Request) error {
f, ok := w.(http.Flusher)
if !ok {
return fmt.Errorf("EventStream Not Supported")
}

client_eventstream.SetEventStreamHeaders(w)
conn.SetWriter(bufio.NewWriter(w), f)

es.SetWriter(bufio.NewWriter(w))
es.Read(gctx)
conn.Read(gctx)

go func() {
<-es.OnReady() // wait for the connection to be ready
if es.Context().Err() != nil {
<-conn.OnReady() // wait for the connection to be ready
if conn.Context().Err() != nil {
return
}

Expand Down Expand Up @@ -81,7 +74,7 @@ func SSE(gctx global.Context, w http.ResponseWriter, r *http.Request) (client.Co
cm[kv[0]] = kv[1]
}

if err, ok := es.Handler().Subscribe(gctx, events.NewMessage(events.OpcodeSubscribe, events.SubscribePayload{
if err, ok := conn.Handler().Subscribe(gctx, events.NewMessage(events.OpcodeSubscribe, events.SubscribePayload{
Type: events.EventType(evt),
Condition: cm,
}).ToRaw()); err != nil || !ok {
Expand All @@ -90,5 +83,5 @@ func SSE(gctx global.Context, w http.ResponseWriter, r *http.Request) (client.Co
}
}()

return es, nil
return nil
}
4 changes: 2 additions & 2 deletions terraform/deployment.tf
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ resource "kubernetes_deployment" "app" {
}
limits = {
cpu = local.infra.production ? "0.5" : "150m"
memory = local.infra.production ? "1Gi" : "500Mi"
memory = local.infra.production ? "1.5Gi" : "500Mi"
}
}

Expand Down Expand Up @@ -289,7 +289,7 @@ resource "kubernetes_horizontal_pod_autoscaler_v2" "app" {
}

min_replicas = local.infra.production ? 4 : 1
max_replicas = 100
max_replicas = 200

metric {
type = "Pods"
Expand Down

0 comments on commit fe4f01c

Please sign in to comment.