From d7eec4a05c21eb13a5b2f5b22e211d663cefa1f0 Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Wed, 25 Oct 2023 23:26:23 -0700 Subject: [PATCH] dev: support different streams in gobench --- gobench/gobench.go | 56 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/gobench/gobench.go b/gobench/gobench.go index bec5724e..2d2e2643 100644 --- a/gobench/gobench.go +++ b/gobench/gobench.go @@ -4,6 +4,7 @@ package gobench import ( "encoding/json" + "fmt" "github.com/anycable/anycable-go/common" "github.com/anycable/anycable-go/metrics" @@ -14,11 +15,6 @@ import ( const ( metricsCalls = "gochannels_call_total" - - identifier = "{\"channel\":\"BenchmarkChannel\"}" - - welcomeMessage = "{\"type\":\"welcome\"}" - confirmationMessage = "{\"type\":\"confirm_subscription\",\"identifier\":\"{\\\"channel\\\":\\\"BenchmarkChannel\\\"}\"}" ) // Identifiers represents a connection identifiers @@ -72,7 +68,7 @@ func (c *Controller) Authenticate(sid string, env *common.SessionEnv) (*common.C return nil, err } - return &common.ConnectResult{Identifier: string(idstr), Transmissions: []string{welcomeMessage}}, nil + return &common.ConnectResult{Identifier: string(idstr), Transmissions: []string{welcomeMessage(sid)}}, nil } // Subscribe performs Command RPC call with "subscribe" command @@ -81,8 +77,8 @@ func (c *Controller) Subscribe(sid string, env *common.SessionEnv, id string, ch res := &common.CommandResult{ Disconnect: false, StopAllStreams: false, - Streams: []string{"all"}, - Transmissions: []string{confirmationMessage}, + Streams: []string{streamFromIdentifier(channel)}, + Transmissions: []string{confirmationMessage(channel)}, } return res, nil } @@ -114,7 +110,7 @@ func (c *Controller) Perform(sid string, env *common.SessionEnv, id string, chan response, err := json.Marshal( map[string]interface{}{ "message": payload, - "identifier": identifier, + "identifier": channel, }, ) @@ -136,7 +132,7 @@ func (c *Controller) Perform(sid string, env *common.SessionEnv, id string, chan } broadcast := common.StreamMessage{ - Stream: "all", + Stream: streamFromIdentifier(channel), Data: string(broadcastMsg), } @@ -145,7 +141,7 @@ func (c *Controller) Perform(sid string, env *common.SessionEnv, id string, chan response, err := json.Marshal( map[string]interface{}{ "message": payload, - "identifier": identifier, + "identifier": channel, }, ) @@ -177,3 +173,41 @@ func (c *Controller) Disconnect(sid string, env *common.SessionEnv, id string, s c.metrics.Counter(metricsCalls).Inc() return nil } + +func streamFromIdentifier(identifier string) string { + // identifier is a json of a form {"channel":"ChannelName","id":"1"} + // stream has a form of "all" if no "id" defined and "all#{id}" otherwise + var data struct { + Channel string `json:"channel"` + ID int `json:"id"` + } + + err := json.Unmarshal([]byte(identifier), &data) + + if err != nil { + fmt.Printf("failed to parse identifier %v: %v", identifier, err) + return "all" + } + + if data.ID == 0 { + return "all" + } + + return fmt.Sprintf("all%d", data.ID) +} + +func confirmationMessage(identifier string) string { + data, _ := json.Marshal(struct { + Identifier string `json:"identifier"` + Type string `json:"type"` + }{ + Identifier: identifier, + Type: "confirm_subscription", + }) + + return string(data) +} + +func welcomeMessage(sid string) string { + return "{\"type\":\"welcome\",\"sid\":\"" + sid + "\"}" +}