Skip to content

Commit

Permalink
Add broadcaster to notify clients and message changes
Browse files Browse the repository at this point in the history
  • Loading branch information
lakinduakash committed May 22, 2019
1 parent 485fe84 commit 63fe20c
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 0 deletions.
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,9 @@ func StartSever(port string, path string) {
func GetClients() map[string]*websocket.Client {
return pool.GetClients()
}

func ListenClientAddChanges() chan websocket.Client {
c := make(chan websocket.Client)
websocket.CBR.AddWorker(c)
return c
}
58 changes: 58 additions & 0 deletions websocket/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package websocket

import "sync"

var mutex1 sync.Mutex
var mutex2 sync.Mutex

type MessageBCastChannel struct {
clientList []chan Message
}

type ClientBCastChannel struct {
clientList []chan Client
}

func NewMessageBCastChannel(size int) *MessageBCastChannel {
return &MessageBCastChannel{clientList: make([]chan Message, size)}
}

func NewClientBCastChannel(size int) *ClientBCastChannel {
return &ClientBCastChannel{clientList: make([]chan Client, size)}
}

func (bc *MessageBCastChannel) AddWorker(c chan Message) {
mutex1.Lock()
bc.clientList = append(bc.clientList, c)
mutex1.Unlock()
}

func (bc *ClientBCastChannel) AddWorker(c chan Client) {
mutex2.Lock()
bc.clientList = append(bc.clientList, c)
mutex2.Unlock()
}

func (bc *MessageBCastChannel) BroadCast(data Message) {
mutex1.Lock()
for _, v := range bc.clientList {
if v != nil {
go func() {
v <- data
}()
}
}
mutex1.Unlock()
}

func (bc *ClientBCastChannel) BroadCast(data Client) {
mutex2.Lock()
for _, v := range bc.clientList {
if v != nil {
go func() {
v <- data
}()
}
}
mutex2.Unlock()
}
8 changes: 8 additions & 0 deletions websocket/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func NewPool() *Pool {

}

var CBR = NewClientBCastChannel(2)
var CBU = NewClientBCastChannel(2)
var CBM = NewMessageBCastChannel(2)

func (pool *Pool) GetClients() map[string]*Client {
return pool.Clients
}
Expand All @@ -35,6 +39,8 @@ func (pool *Pool) Start() {
pool.Clients[client.ID] = client
fmt.Println("New user", client.ID)
fmt.Println("Size of Connection Pool: ", len(pool.Clients))
CBR.BroadCast(*client)

for _, client2 := range pool.Clients {
if err := client2.Conn.WriteJSON(Message{Type: 2, Body: MessageBody{Message: client.ID}}); err != nil {
log.Fatal("Error on write")
Expand All @@ -46,6 +52,7 @@ func (pool *Pool) Start() {
case client := <-pool.Unregister:
delete(pool.Clients, client.ID)
fmt.Println("Size of Connection Pool: ", len(pool.Clients))
CBU.BroadCast(*client)
for _, client := range pool.Clients {
if err := client.Conn.WriteJSON(Message{Type: 3, Body: MessageBody{Message: client.ID}}); err != nil {
log.Fatal("Error on write")
Expand All @@ -55,6 +62,7 @@ func (pool *Pool) Start() {
break
case message := <-pool.Broadcast:

CBM.BroadCast(*message)
if message.Body.To != "" {
fmt.Println("Sending message to", message.Body.To)

Expand Down

0 comments on commit 63fe20c

Please sign in to comment.