Skip to content

Commit

Permalink
Reworked package to use the request's context for cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesStewy committed Aug 20, 2016
1 parent dd29d9f commit b9e0917
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 40 deletions.
82 changes: 44 additions & 38 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,27 @@
package sse

import (
"context"
"errors"
"fmt"
"net/http"
"time"
)

// Client represents one Server-Sent Events connection.
type Client struct {
eventC chan Event
done chan struct{}
close chan struct{}
timeout time.Duration
w http.ResponseWriter
eventC chan Event
done chan struct{}
w http.ResponseWriter
}

// ClientInit initialises an HTTP connection for Server-Sent Events.
//
// Timeout is how long the connection will stay open for.
// Timeout starts when client is run.
// Set timeout to 0 to keep the connection open indefinitely.
//
// ClientInit first checks that data streaming is supported on the connection.
// ClientInit then sets the appropriate HTTP headers for Server-Sent Events.
// Finaly ClientInit returns a Client that can later be run and have events sent too.
//
// Headers: Content-Type=text/event-stream, Cache-Control=no-cache, Connection=keep-alive.
func ClientInit(w http.ResponseWriter, timeout time.Duration) (*Client, error) {
func ClientInit(w http.ResponseWriter) (*Client, error) {
// Make sure that the writer supports flushing.
if _, ok := w.(http.Flusher); !ok {
return nil, errors.New("streaming not supported")
Expand All @@ -45,21 +39,16 @@ func ClientInit(w http.ResponseWriter, timeout time.Duration) (*Client, error) {

// Create client
return &Client{
eventC: make(chan Event),
done: make(chan struct{}),
close: make(chan struct{}),
timeout: timeout,
w: w,
eventC: make(chan Event),
done: make(chan struct{}),
w: w,
}, nil
}

// Run waits for, and then writes events.
// Run will only exit once the connection is closed either by the browser, by running client.Close() or the timeout is reached.
func (client *Client) Run() {
timeoutC := time.After(client.timeout)
if client.timeout <= time.Duration(0) {
timeoutC = nil
}
// Run will only exit once the connection is closed either by the browser, or the provided context closes.
func (client *Client) Run(ctx context.Context) {
defer close(client.done)

// Send open event
openMsg := Msg{
Expand All @@ -73,11 +62,9 @@ loop:
select {
case event := <-client.eventC:
streamData(client.w, event.SSEEvent()) // Stream event
case <-client.close:
break loop
case <-client.w.(http.CloseNotifier).CloseNotify():
break loop
case <-timeoutC:
case <-ctx.Done():
break loop
}
}
Expand All @@ -87,9 +74,6 @@ loop:
Event: "close",
}
streamData(client.w, closeMsg.SSEEvent())

// Close done channel to indicate that the client is closed
close(client.done)
}

func streamData(w http.ResponseWriter, data ...interface{}) {
Expand All @@ -99,6 +83,35 @@ func streamData(w http.ResponseWriter, data ...interface{}) {
w.(http.Flusher).Flush()
}

type key uint8

const (
// ClientKey is used in Handler to set the client variable inside of the request context.
// Do not set a value in a context with this key, only read.
//
// Example: client := r.Context().Value(sse.ClientKey).(*sse.Client)
ClientKey key = iota
)

// Handler provides middleware for serving SSE requests.
// Handler sets the created sse client inside the request context using the key ClientKey.
// Handler will start serving events once the provided handler h exits.
// Handler exits when the client closes.
func Handler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
client, err := ClientInit(w)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

r = r.WithContext(context.WithValue(r.Context(), ClientKey, client))
h.ServeHTTP(w, r)

client.Run(r.Context())
})
}

// Send an event to the client.
// Send will only exit once the event has been sent.
func (client *Client) Send(event Event) error {
Expand All @@ -110,14 +123,7 @@ func (client *Client) Send(event Event) error {
return nil
}

// Close the connection.
// Close will only exit once the connection is fully closed.
func (client *Client) Close() {
for {
select {
case client.close <- struct{}{}:
case <-client.done:
return
}
}
// Done returns a channel which is closed when the client exits.
func (client *Client) Done() <-chan struct{} {
return client.done
}
8 changes: 6 additions & 2 deletions sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ This example binds the root of localhost:8080 to a Server-Sent Event stream that
func eventHandler(w http.ResponseWriter, req *http.Request) {
// Initialise (REQUIRED)
client, err := sse.ClientInit(w, 0)
client, err := sse.ClientInit(w)
// Return error if unable to initialise Server-Sent Events
if err != nil {
Expand All @@ -58,8 +58,12 @@ This example binds the root of localhost:8080 to a Server-Sent Event stream that
defer delete(clients, client)
// Run Client (REQUIRED)
client.Run()
client.Run(r.Context())
}
Caution
This package uses an http.CloseNotifier, see: https://github.com/golang/go/issues/13165.
*/
package sse

Expand Down

0 comments on commit b9e0917

Please sign in to comment.