Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebTransport example POC #58

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package centrifuge

import (
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/centrifugal/protocol"
"github.com/lucas-clemente/quic-go"
)

type disconnect struct {
Expand Down Expand Up @@ -97,8 +100,12 @@ func NewProtobufClient(u string, config Config) *Client {
}

func newClient(u string, isProtobuf bool, config Config) *Client {
if !strings.HasPrefix(u, "ws") {
panic(fmt.Sprintf("unsupported connection endpoint: %s", u))
parsedURL, err := url.Parse(u)
if err != nil {
panic(err)
}
if !strings.HasPrefix(parsedURL.Scheme, "ws") && !strings.HasPrefix(parsedURL.Scheme, "http") {
panic("unsupported URL scheme: " + parsedURL.Scheme)
}

protocolType := protocol.TypeJSON
Expand Down Expand Up @@ -750,11 +757,37 @@ func (c *Client) connectFromScratch(isReconnect bool, reconnectWaitCB func()) er
Header: c.config.Header,
}

t, err := newWebsocketTransport(c.url, c.protocolType, wsConfig)
if err != nil {
go c.handleDisconnect(&disconnect{Reason: "connect error", Reconnect: true})
reconnectWaitCB()
return err
var t transport
var err error

if strings.HasPrefix(c.url, "ws") {
t, err = newWebsocketTransport(c.url, c.protocolType, wsConfig)
if err != nil {
go c.handleDisconnect(&disconnect{Reason: "connect error", Reconnect: true})
reconnectWaitCB()
return err
}
} else {
u, _ := url.Parse(c.url)
if c.protocolType == protocol.TypeProtobuf {
q := u.Query()
q.Set("format", "protobuf")
u.RawQuery = q.Encode()
}
wtCfg := webTransportConfig{
TLSConfig: &tls.Config{
InsecureSkipVerify: true,
},
QUICConfig: &quic.Config{},
DisableCompression: true,
Header: c.config.Header,
}
t, err = newWebTransport(u.String(), c.protocolType, wtCfg)
if err != nil {
go c.handleDisconnect(&disconnect{Reason: "connect error", Reconnect: true})
reconnectWaitCB()
return err
}
}

c.mu.Lock()
Expand Down
5 changes: 4 additions & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ module github.com/centrifugal/centrifuge-go/examples

go 1.14

replace github.com/centrifugal/centrifuge-go => ../
replace (
github.com/centrifugal/centrifuge-go => ../
github.com/lucas-clemente/quic-go v0.24.0 => github.com/alta/quic-go v0.0.0-20210923171602-7151b11990d2
)

require (
github.com/centrifugal/centrifuge-go v0.3.0
Expand Down
276 changes: 272 additions & 4 deletions examples/go.sum

Large diffs are not rendered by default.

185 changes: 185 additions & 0 deletions examples/webtransport/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package main

import (
"bufio"
"encoding/json"
"flag"
"log"
"os"
"os/signal"
"syscall"
"time"

_ "net/http/pprof"

"github.com/centrifugal/centrifuge-go"
)

// ChatMessage is chat example specific message struct.
type ChatMessage struct {
Input string `json:"input"`
}

type eventHandler struct{}

func (h *eventHandler) OnConnect(_ *centrifuge.Client, e centrifuge.ConnectEvent) {
log.Printf("Connected to chat with ID %s", e.ClientID)
}

func (h *eventHandler) OnError(_ *centrifuge.Client, e centrifuge.ErrorEvent) {
log.Printf("Error: %s", e.Message)
}

func (h *eventHandler) OnMessage(_ *centrifuge.Client, e centrifuge.MessageEvent) {
log.Printf("Message from server: %s", string(e.Data))
}

func (h *eventHandler) OnDisconnect(_ *centrifuge.Client, e centrifuge.DisconnectEvent) {
log.Printf("Disconnected from chat: %s", e.Reason)
}

func (h *eventHandler) OnServerSubscribe(_ *centrifuge.Client, e centrifuge.ServerSubscribeEvent) {
log.Printf("Subscribe to server-side channel %s: (resubscribe: %t, recovered: %t)", e.Channel, e.Resubscribed, e.Recovered)
}

func (h *eventHandler) OnServerUnsubscribe(_ *centrifuge.Client, e centrifuge.ServerUnsubscribeEvent) {
log.Printf("Unsubscribe from server-side channel %s", e.Channel)
}

func (h *eventHandler) OnServerJoin(_ *centrifuge.Client, e centrifuge.ServerJoinEvent) {
log.Printf("Server-side join to channel %s: %s (%s)", e.Channel, e.User, e.Client)
}

func (h *eventHandler) OnServerLeave(_ *centrifuge.Client, e centrifuge.ServerLeaveEvent) {
log.Printf("Server-side leave from channel %s: %s (%s)", e.Channel, e.User, e.Client)
}

func (h *eventHandler) OnServerPublish(_ *centrifuge.Client, e centrifuge.ServerPublishEvent) {
log.Printf("Publication from server-side channel %s: %s", e.Channel, e.Data)
}

func (h *eventHandler) OnPublish(sub *centrifuge.Subscription, e centrifuge.PublishEvent) {
var chatMessage *ChatMessage
err := json.Unmarshal(e.Data, &chatMessage)
if err != nil {
return
}
log.Printf("Someone says via channel %s: %s", sub.Channel(), chatMessage.Input)
}

func (h *eventHandler) OnJoin(sub *centrifuge.Subscription, e centrifuge.JoinEvent) {
log.Printf("Someone joined %s: user id %s, client id %s", sub.Channel(), e.User, e.Client)
}

func (h *eventHandler) OnLeave(sub *centrifuge.Subscription, e centrifuge.LeaveEvent) {
log.Printf("Someone left %s: user id %s, client id %s", sub.Channel(), e.User, e.Client)
}

func (h *eventHandler) OnSubscribeSuccess(sub *centrifuge.Subscription, e centrifuge.SubscribeSuccessEvent) {
log.Printf("Subscribed on channel %s, resubscribed: %v, recovered: %v", sub.Channel(), e.Resubscribed, e.Recovered)
}

func (h *eventHandler) OnSubscribeError(sub *centrifuge.Subscription, e centrifuge.SubscribeErrorEvent) {
log.Printf("Subscribed on channel %s failed, error: %s", sub.Channel(), e.Error)
}

func (h *eventHandler) OnUnsubscribe(sub *centrifuge.Subscription, _ centrifuge.UnsubscribeEvent) {
log.Printf("Unsubscribed from channel %s", sub.Channel())
}

func newClient(handler *eventHandler) *centrifuge.Client {
url := "https://localhost:4242"
isProtobuf := flag.Bool("protobuf", false, "use Protobuf format")
flag.Parse()

var c *centrifuge.Client
if *isProtobuf {
c = centrifuge.NewProtobufClient(url, centrifuge.DefaultConfig())
} else {
c = centrifuge.NewJsonClient(url, centrifuge.DefaultConfig())
}

c.OnConnect(handler)
c.OnDisconnect(handler)
c.OnMessage(handler)
c.OnError(handler)

c.OnServerPublish(handler)
c.OnServerSubscribe(handler)
c.OnServerUnsubscribe(handler)
c.OnServerJoin(handler)
c.OnServerLeave(handler)

return c
}

func main() {
handler := &eventHandler{}

c := newClient(handler)
defer func() { _ = c.Close() }()

sub, err := c.NewSubscription("chat:index")
if err != nil {
log.Fatalln(err)
}

sub.OnPublish(handler)
sub.OnJoin(handler)
sub.OnLeave(handler)
sub.OnSubscribeSuccess(handler)
sub.OnSubscribeError(handler)
sub.OnUnsubscribe(handler)

err = sub.Subscribe()
if err != nil {
log.Fatalln(err)
}

err = c.Connect()
if err != nil {
log.Fatalln(err)
}

sigsCh := make(chan os.Signal, 1)
signal.Notify(sigsCh, os.Interrupt)
signal.Notify(sigsCh, syscall.SIGTERM)

go func() {
sig := <-sigsCh
log.Printf("handle signal %s\n", sig)
errClose := c.Close()
if errClose != nil {
log.Fatalln(errClose)
}

time.Sleep(1 * time.Second)
os.Exit(0)
}()

pubText := func(text string) error {
msg := &ChatMessage{
Input: text,
}
data, _ := json.Marshal(msg)
_, err := sub.Publish(data)
return err
}

log.Printf("Print something and press ENTER to send\n")

// Read input from stdin.
go func(sub *centrifuge.Subscription) {
reader := bufio.NewReader(os.Stdin)
for {
text, _ := reader.ReadString('\n')
err = pubText(text)
if err != nil {
log.Printf("Error publish: %s", err)
}
}
}(sub)

// Run until CTRL+C.
select {}
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ module github.com/centrifugal/centrifuge-go

go 1.13

replace github.com/lucas-clemente/quic-go v0.24.0 => github.com/alta/quic-go v0.0.0-20210923171602-7151b11990d2

require (
github.com/centrifugal/protocol v0.7.3
github.com/centrifugal/protocol v0.7.4-0.20211126085642-1eca1794242d
github.com/gorilla/websocket v1.4.2
github.com/jpillora/backoff v1.0.0
github.com/lucas-clemente/quic-go v0.24.0
)
Loading