Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Commit

Permalink
dev: support different streams in gobench
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Oct 26, 2023
1 parent f680581 commit d7eec4a
Showing 1 changed file with 45 additions and 11 deletions.
56 changes: 45 additions & 11 deletions gobench/gobench.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package gobench

import (
"encoding/json"
"fmt"

"github.com/anycable/anycable-go/common"
"github.com/anycable/anycable-go/metrics"
Expand All @@ -14,11 +15,6 @@ import (

const (
metricsCalls = "gochannels_call_total"

identifier = "{\"channel\":\"BenchmarkChannel\"}"

welcomeMessage = "{\"type\":\"welcome\"}"
confirmationMessage = "{\"type\":\"confirm_subscription\",\"identifier\":\"{\\\"channel\\\":\\\"BenchmarkChannel\\\"}\"}"
)

// Identifiers represents a connection identifiers
Expand Down Expand Up @@ -72,7 +68,7 @@ func (c *Controller) Authenticate(sid string, env *common.SessionEnv) (*common.C
return nil, err
}

return &common.ConnectResult{Identifier: string(idstr), Transmissions: []string{welcomeMessage}}, nil
return &common.ConnectResult{Identifier: string(idstr), Transmissions: []string{welcomeMessage(sid)}}, nil
}

// Subscribe performs Command RPC call with "subscribe" command
Expand All @@ -81,8 +77,8 @@ func (c *Controller) Subscribe(sid string, env *common.SessionEnv, id string, ch
res := &common.CommandResult{
Disconnect: false,
StopAllStreams: false,
Streams: []string{"all"},
Transmissions: []string{confirmationMessage},
Streams: []string{streamFromIdentifier(channel)},
Transmissions: []string{confirmationMessage(channel)},
}
return res, nil
}
Expand Down Expand Up @@ -114,7 +110,7 @@ func (c *Controller) Perform(sid string, env *common.SessionEnv, id string, chan
response, err := json.Marshal(
map[string]interface{}{
"message": payload,
"identifier": identifier,
"identifier": channel,
},
)

Expand All @@ -136,7 +132,7 @@ func (c *Controller) Perform(sid string, env *common.SessionEnv, id string, chan
}

broadcast := common.StreamMessage{
Stream: "all",
Stream: streamFromIdentifier(channel),
Data: string(broadcastMsg),
}

Expand All @@ -145,7 +141,7 @@ func (c *Controller) Perform(sid string, env *common.SessionEnv, id string, chan
response, err := json.Marshal(
map[string]interface{}{
"message": payload,
"identifier": identifier,
"identifier": channel,
},
)

Expand Down Expand Up @@ -177,3 +173,41 @@ func (c *Controller) Disconnect(sid string, env *common.SessionEnv, id string, s
c.metrics.Counter(metricsCalls).Inc()
return nil
}

func streamFromIdentifier(identifier string) string {
// identifier is a json of a form {"channel":"ChannelName","id":"1"}
// stream has a form of "all" if no "id" defined and "all#{id}" otherwise
var data struct {
Channel string `json:"channel"`
ID int `json:"id"`
}

err := json.Unmarshal([]byte(identifier), &data)

if err != nil {
fmt.Printf("failed to parse identifier %v: %v", identifier, err)
return "all"
}

if data.ID == 0 {
return "all"
}

return fmt.Sprintf("all%d", data.ID)
}

func confirmationMessage(identifier string) string {
data, _ := json.Marshal(struct {
Identifier string `json:"identifier"`
Type string `json:"type"`
}{
Identifier: identifier,
Type: "confirm_subscription",
})

return string(data)
}

func welcomeMessage(sid string) string {
return "{\"type\":\"welcome\",\"sid\":\"" + sid + "\"}"
}

0 comments on commit d7eec4a

Please sign in to comment.