Skip to content

Commit

Permalink
docs: 📝 add or update docs and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
onionj committed Feb 23, 2024
1 parent 484f2cb commit 2366fcc
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 33 deletions.
39 changes: 24 additions & 15 deletions example/pingpong.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var (
server bool
)

// simple ping pong application
// main is the entry point of the application.
func main() {
flag.BoolVar(&client, "client", false, "start a client")
flag.BoolVar(&server, "server", false, "start a server")
Expand All @@ -28,69 +28,78 @@ func main() {
}
}

// serverRunner runs the server application.
func serverRunner() {
// Create a new WebSocket multiplexer server listening on port 8080.
server := muxr.NewServer(":8080")

// Handle incoming WebSocket connections on the root path.
server.Handle("/", func(stream *muxr.Stream) {
// Read the incoming message from the client.
data, err := stream.Read()
if err != nil {
fmt.Println(err.Error())
fmt.Println("Error reading from client:", err)
return
}
fmt.Println("server get :", string(data), ",streamId:", stream.Id())
fmt.Println("Server received:", string(data), ", Stream ID:", stream.Id())

// Send a response back to the client.
msg := []byte("Pong")
err = stream.Write(msg)
if err != nil {
fmt.Println(err.Error())
fmt.Println("Error writing to client:", err)
return
}
fmt.Println("server send:", string(msg), ",streamId:", stream.Id())
fmt.Println("Server sent:", string(msg), ", Stream ID:", stream.Id())
})

// Start the server and listen for incoming WebSocket connections.
if err := server.ListenAndServe(); err != nil {
fmt.Println(err)
fmt.Println("Error starting server:", err)
return
}
}

// clientRunner runs the client application.
func clientRunner() {

client := muxr.NewClient("ws://127.0.0.1:8080/")
client.Start()
defer client.Stop()

// Create a wait group to synchronize goroutines.
wg := sync.WaitGroup{}

// Spawn multiple goroutines to simulate concurrent clients.
for i := 0; i < 15; i++ {
wg.Add(1)

go func(client *muxr.Client) {
defer wg.Done()

// create new stream
// Create a new stream for communication with the server.
stream, err := client.Dial()
if err != nil {
fmt.Println(err.Error())
fmt.Println("Error creating stream:", err)
return
}
defer stream.Close()

// Send a message to the server.
msg := []byte("Ping")

// write to stream
if err = stream.Write(msg); err != nil {
fmt.Println(err.Error(), 1, stream.Id())
fmt.Println("Error writing to server:", err)
return
}
fmt.Println("client send:", string(msg), ",streamId:", stream.Id())
fmt.Println("Client sent:", string(msg), ", Stream ID:", stream.Id())

// read pong from server
// Read the response from the server.
data, err := stream.Read()
if err != nil {
fmt.Println(err.Error())
fmt.Println("Error reading from server:", err)
return
}
fmt.Println("client get :", string(data), ",streamId:", stream.Id())
fmt.Println("Client received:", string(data), ", Stream ID:", stream.Id())
}(client)
}
wg.Wait()
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ module github.com/onionj/websocket-mux

go 1.21.0

require github.com/stretchr/testify v1.8.4

require golang.org/x/net v0.21.0 // indirect
require (
github.com/gorilla/websocket v1.5.1
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gorilla/websocket v1.5.1
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.21.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
7 changes: 7 additions & 0 deletions muxr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gorilla/websocket"
)

// Client represents a client that connects to a server.
type Client struct {
sync.Mutex
serverAddr string
Expand All @@ -19,6 +20,7 @@ type Client struct {
streamsManager StreamManager
}

// NewClient creates and returns a new instance of Client with the provided server address.
func NewClient(serverAddr string) *Client {
return &Client{
serverAddr: serverAddr,
Expand All @@ -30,6 +32,8 @@ func NewClient(serverAddr string) *Client {
}
}

// Start establishes a connection and initiates communication with the server.
// It returns an error if connection setup fails.
func (c *Client) Start() error {
c.Lock()
defer c.Unlock()
Expand Down Expand Up @@ -113,6 +117,7 @@ func (c *Client) StartForever() (closer func(), err error) {
}, nil
}

// Stop closes the client's connection and releases associated resources.
func (c *Client) Stop() {
c.Lock()
defer c.Unlock()
Expand All @@ -133,6 +138,8 @@ func (c *Client) getStreamId() uint32 {
return current
}

// Dial establishes a new stream with the server.
// It returns a pointer to the created stream and any error encountered.
func (c *Client) Dial() (*Stream, error) {
streamId := c.getStreamId()

Expand Down
4 changes: 3 additions & 1 deletion muxr/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ import (
"github.com/gorilla/websocket"
)

// ConnAdaptor adapts a WebSocket connection.
type ConnAdaptor struct {
sync.Mutex
Conn *websocket.Conn
}

// WritePacket writes a packet to the WebSocket connection.
func (c *ConnAdaptor) WritePacket(typ uint8, id uint32, data []byte) error {
c.Lock()
defer c.Unlock()
if id == 0 {
// handle single stream mode (in case the client is not a Muxr client)
// Handle single stream mode (in case the client is not a Muxr client).
return c.Conn.WriteMessage(websocket.BinaryMessage, data)
}
return c.Conn.WriteMessage(websocket.BinaryMessage, Packing(id, typ, data))
Expand Down
2 changes: 1 addition & 1 deletion muxr/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package muxr
import "errors"

const (
VERSION string = "v0.4.0"
VERSION string = "v0.4.1"
NUM_BYTES_HEADER = 7
TYPE_INITIAL uint8 = 1 // 0000 0001
TYPE_DATA uint8 = 2 // 0000 0010
Expand Down
8 changes: 6 additions & 2 deletions muxr/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"encoding/binary"
)

// 7 byte header + payload
// Frame represents a muxr frame.
// 7-byte header + payload
//
// {1byteType}{2byteLenght}{4byteID}{payload}
// {1byteType}{2byteLength}{4byteID}{payload}
type Frame []byte

// Packing packs a frame with the provided ID, type, and payload.
func Packing(id uint32, typ uint8, payload []byte) []byte {

length := len(payload)
Expand All @@ -22,12 +24,14 @@ func Packing(id uint32, typ uint8, payload []byte) []byte {
return frame
}

// ParseHeader parses the header of a frame and returns the type, length, and stream ID.
func ParseHeader(header []byte) (uint8, uint16, uint32) {
return uint8(header[0]), // type
binary.BigEndian.Uint16(header[1:3]), // length
binary.BigEndian.Uint32(header[3:]) // stream id
}

// GetPayload extracts the payload from a frame.
func GetPayload(frame Frame, buf []byte) (n int) {
n = copy(buf, frame[NUM_BYTES_HEADER:])
return
Expand Down
2 changes: 2 additions & 0 deletions muxr/frame_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert"
)

// TestFrameOpen tests the creation and parsing of a data frame.
func TestFrameOpen(t *testing.T) {
payload := []byte{6, 6, 6, 6, 6, 6, 6, 6, 6}
lenght := uint16(len(payload))
Expand All @@ -24,6 +25,7 @@ func TestFrameOpen(t *testing.T) {
assert.Equal(t, payload, _payload)
}

// TestFrameClose tests the creation and parsing of a close frame.
func TestFrameClose(t *testing.T) {

lenght := uint16(0)
Expand Down
2 changes: 2 additions & 0 deletions muxr/muxr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
)

// TestPingPongWithMuxrClientAndServer tests ping-pong communication between a muxr client and server.
func TestPingPongWithMuxrClientAndServer(t *testing.T) {
go func() {
server := NewServer(":19881")
Expand Down Expand Up @@ -55,6 +56,7 @@ func TestPingPongWithMuxrClientAndServer(t *testing.T) {
assert.Equal(t, ErrStreamClosed, err)
}

// TestPingPongWithGorillaClientAndMuxrServer tests ping-pong communication between a Gorilla WebSocket client and a muxr server.
func TestPingPongWithGorillaClientAndMuxrServer(t *testing.T) {
totalLoop := 50

Expand Down
11 changes: 9 additions & 2 deletions muxr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,27 @@ import (
"github.com/gorilla/websocket"
)

// Handler represents a function that handles WebSocket streams.
type Handler func(*Stream)

// wsServer represents a WebSocket server.
type wsServer struct {
bindAddr string
handlers map[string]Handler
}

// NewServer creates and returns a new WebSocket server instance.
func NewServer(bindAddr string) *wsServer {
server := &wsServer{bindAddr: bindAddr, handlers: make(map[string]Handler)}
return server
}

// Handle registers a handler function for the given pattern.
func (s wsServer) Handle(pattern string, handler Handler) {
s.handlers[pattern] = handler
}

// ListenAndServe starts the WebSocket server and listens for incoming connections.
func (s *wsServer) ListenAndServe() error {
fmt.Println("ListenAndServe on", s.bindAddr)

Expand All @@ -39,6 +44,7 @@ func (s *wsServer) ListenAndServe() error {
return nil
}

// ListenAndServeTLS starts the WebSocket server with TLS encryption and listens for incoming connections.
func (s *wsServer) ListenAndServeTLS(certFile string, keyFile string) error {
fmt.Println("ListenAndServeTLS on", s.bindAddr)

Expand All @@ -56,6 +62,7 @@ func (s *wsServer) ListenAndServeTLS(certFile string, keyFile string) error {

var upgrader = websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}

// wsServerHandler handles incoming WebSocket connections.
func (s *wsServer) wsServerHandler(writer http.ResponseWriter, request *http.Request) {
conn, err := upgrader.Upgrade(writer, request, nil)

Expand All @@ -77,7 +84,7 @@ func (s *wsServer) wsServerHandler(writer http.ResponseWriter, request *http.Req
streamsManager := StreamManager{Streams: make(map[uint32]*Stream)}
defer streamsManager.KillAll()

// Handle muxr conn (in case the client is muxr.client)
// Handle muxr connection (in case the client is a muxr client)
if IsMuxClient {
for {
_, data, err := conn.ReadMessage()
Expand Down Expand Up @@ -121,7 +128,7 @@ func (s *wsServer) wsServerHandler(writer http.ResponseWriter, request *http.Req
continue
}
}
} else { // Single stream mode (handle normal websocket clients)
} else { // Single stream mode (handle normal WebSocket clients)
streamId := uint32(0)
stream := newStream(streamId, connAdaptor)
streamsManager.Set(streamId, stream)
Expand Down
11 changes: 11 additions & 0 deletions muxr/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Stream struct {

var ErrStreamClosed = errors.New("stream closed")

// newStream creates a new Stream instance with the provided ID and connection adaptor.
func newStream(
id uint32,
connAdaptor *ConnAdaptor,
Expand All @@ -28,6 +29,7 @@ func newStream(
}
}

// Read reads data from the stream's receiver channel.
func (st *Stream) Read() ([]byte, error) {
data, ok := <-st.reciverChannel
if !ok {
Expand All @@ -36,6 +38,7 @@ func (st *Stream) Read() ([]byte, error) {
return data, nil
}

// Write writes data to the stream.
func (st *Stream) Write(data []byte) error {
st.Lock()
defer st.Unlock()
Expand All @@ -49,11 +52,13 @@ func (st *Stream) Write(data []byte) error {
return nil
}

// Close closes the stream.
func (st *Stream) Close() {
st.ConnAdaptor.WritePacket(TYPE_CLOSE, st.id, []byte{})
st.Kill()
}

// kill marks the stream as closed and closes its receiver channel.
func (st *Stream) Kill() {

st.Lock()
Expand All @@ -67,10 +72,12 @@ func (st *Stream) Kill() {
close(st.reciverChannel)
}

// IsClosed returns true if the stream is closed, otherwise false.
func (st *Stream) IsClose() bool {
return st.isClosed
}

// ID returns the ID of the stream.
func (st *Stream) Id() uint32 {
return st.id
}
Expand All @@ -80,25 +87,29 @@ type StreamManager struct {
Streams map[uint32]*Stream
}

// Get retrieves a stream from the stream manager by its ID.
func (sm *StreamManager) Get(id uint32) (*Stream, bool) {
sm.Lock()
defer sm.Unlock()
stream, ok := sm.Streams[id]
return stream, ok
}

// Set adds a stream to the stream manager.
func (sm *StreamManager) Set(id uint32, stream *Stream) {
sm.Lock()
defer sm.Unlock()
sm.Streams[id] = stream
}

// Delete removes a stream from the stream manager by its ID.
func (sm *StreamManager) Delete(id uint32) {
sm.Lock()
defer sm.Unlock()
delete(sm.Streams, id)
}

// KillAll closes all streams in the stream manager.
func (sm *StreamManager) KillAll() {
sm.Lock()
defer sm.Unlock()
Expand Down
Loading

0 comments on commit 2366fcc

Please sign in to comment.