Skip to content

Commit

Permalink
WIP: Reworking the event stream so its also an actor (#79)
Browse files Browse the repository at this point in the history
* added new event stream that is actually an actor

* fixed bug on sending event with remotes

* renamed event to event stream

* fixed engine address bug when using remote

* fixed address error and wrapped up big portion of the PR
  • Loading branch information
anthdm authored Dec 3, 2023
1 parent 8dac16e commit 2098fc6
Show file tree
Hide file tree
Showing 15 changed files with 215 additions and 178 deletions.
9 changes: 4 additions & 5 deletions _bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@ import (
)

func makeRemoteEngine(addr string) *actor.Engine {
e := actor.NewEngine()
r := remote.New(e, remote.Config{ListenAddr: addr})
e.WithRemote(r)
r := remote.New(remote.Config{ListenAddr: addr})
e := actor.NewEngine(actor.EngineOptRemote(r))
return e
}

func benchmarkRemote() {
var (
a = makeRemoteEngine("127.0.0.1:3000")
b = makeRemoteEngine("127.0.0.1:3001")
pidB = b.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8))
pidB = b.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8), actor.WithMaxRestarts(0))
)
its := []int{
1_000_000,
Expand All @@ -39,7 +38,7 @@ func benchmarkRemote() {

func benchmarkLocal() {
e := actor.NewEngine()
pid := e.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8))
pid := e.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8), actor.WithMaxRestarts(0))
its := []int{
1_000_000,
10_000_000,
Expand Down
5 changes: 4 additions & 1 deletion actor/deadletter.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package actor

import (
"github.com/anthdm/hollywood/log"
fmt "fmt"
"reflect"

"github.com/anthdm/hollywood/log"
)

//
Expand Down Expand Up @@ -34,6 +36,7 @@ func (d *deadLetter) Receive(ctx *Context) {
case Initialized:
d.logger.Debugw("default deadletter actor initialized")
case *DeadLetterEvent:
fmt.Println("received deadletter", msg)
d.logger.Warnw("deadletter arrived", "msg-type", reflect.TypeOf(msg),
"sender", msg.Sender, "target", msg.Target, "msg", msg.Message)
default:
Expand Down
7 changes: 5 additions & 2 deletions actor/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package actor
import (
"bytes"
"fmt"
"github.com/anthdm/hollywood/log"
"github.com/stretchr/testify/assert"
"log/slog"
"os"
"sync"
"testing"
"time"

"github.com/anthdm/hollywood/log"
"github.com/stretchr/testify/assert"
)

// TestDeadLetterDefault tests the default deadletter handling.
Expand Down Expand Up @@ -49,6 +50,8 @@ func TestDeadLetterCustom(t *testing.T) {
e.Poison(a1).Wait() // poison the a1 actor
// should be in deadletter
fmt.Println("==== sending message via a1 to deadletter ====")
fmt.Println(e.Registry)
fmt.Println("ID=> ", dl.PID())
e.Send(a1, testMessage{"bar"})
time.Sleep(time.Millisecond) // a flush would be nice here :-)
resp, err := e.Request(dl.PID(), &customDeadLetterFetch{flush: true}, time.Millisecond*10).Result()
Expand Down
74 changes: 52 additions & 22 deletions actor/engine.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package actor

import (
"log/slog"
reflect "reflect"
"sync"
"time"

Expand All @@ -10,7 +12,7 @@ import (
type Remoter interface {
Address() string
Send(*PID, any, *PID)
Start()
Start(*Engine)
}

// Producer is any function that can return a Receiver
Expand All @@ -23,26 +25,29 @@ type Receiver interface {

// Engine represents the actor engine.
type Engine struct {
EventStream *EventStream
Registry *Registry
Registry *Registry

address string
remote Remoter
deadLetter *PID
logger log.Logger
address string
remote Remoter
deadLetter *PID
eventStream *PID
logger log.Logger
}

// NewEngine returns a new actor Engine.
// You can pass an optional logger through
func NewEngine(opts ...func(*Engine)) *Engine {
e := &Engine{}
e.Registry = newRegistry(e) // need to init the registry in case we want a custom deadletter
e.address = LocalLookupAddr
e.Registry = newRegistry(e) // need to init the registry in case we want a custom deadletter
e.EventStream = NewEventStream() //
for _, o := range opts {
o(e)
}
if e.remote != nil {
e.address = e.remote.Address()
}

e.eventStream = e.Spawn(NewEventStream(), "eventstream")
// if no deadletter is registered, we will register the default deadletter from deadletter.go
if e.deadLetter == nil {
e.logger.Debugw("no deadletter receiver set, registering default")
Expand All @@ -51,36 +56,38 @@ func NewEngine(opts ...func(*Engine)) *Engine {
return e
}

// TODO: Doc
func EngineOptLogger(logger log.Logger) func(*Engine) {
return func(e *Engine) {
e.logger = logger
// This is a bit hacky, but we need to set the logger for the eventstream
// which cannot be set in the constructor since the logger is not set yet.
e.EventStream.logger = logger.SubLogger("[eventStream]")
}
}

// TODO: Doc
func EngineOptRemote(r Remoter) func(*Engine) {
return func(e *Engine) {
e.remote = r
e.address = r.Address()
// TODO: potential error not handled here
r.Start(e)
}
}

// TODO: Doc
func EngineOptPidSeparator(sep string) func(*Engine) {
// This looks weird because the separator is a global variable.
return func(e *Engine) {
pidSeparator = sep
}
}

// TODO: Doc
func EngineOptDeadletter(d Producer) func(*Engine) {
return func(e *Engine) {
e.deadLetter = e.Spawn(d, "deadletter")
}
}

// WithRemote returns a new actor Engine with the given Remoter,
// and will call its Start function
func (e *Engine) WithRemote(r Remoter) {
e.remote = r
e.address = r.Address()
r.Start()
}

// Spawn spawns a process that will producer by the given Producer and
// can be configured with the given opts.
func (e *Engine) Spawn(p Producer, name string, opts ...OptFunc) *PID {
Expand Down Expand Up @@ -138,19 +145,32 @@ func (e *Engine) Send(pid *PID, msg any) {
e.send(pid, msg, nil)
}

// BroadcastEvent will broadcast the given message over the eventstream, notifying all
// actors that are subscribed.
func (e *Engine) BroadcastEvent(msg any) {
if e.eventStream != nil {
e.send(e.eventStream, msg, nil)
}
}

func (e *Engine) send(pid *PID, msg any, sender *PID) {
if e.isLocalMessage(pid) {
e.SendLocal(pid, msg, sender)
return
}
if e.remote == nil {
e.logger.Errorw("failed sending messsage",
"err", "engine has no remote configured")
slog.Error("failed sending messsage",
"err", "engine has no remote configured",
"to", pid,
"type", reflect.TypeOf(msg),
"msg", msg,
)
return
}
e.remote.Send(pid, msg, sender)
}

// TODO: documentation
type SendRepeater struct {
engine *Engine
self *PID
Expand Down Expand Up @@ -254,6 +274,16 @@ func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) {
proc.Send(pid, msg, sender)
}

// Subscribe will subscribe the given PID to the event stream.
func (e *Engine) Subscribe(pid *PID) {
e.Send(e.eventStream, EventSub{pid: pid})
}

// Unsubscribe will un subscribe the given PID from the event stream.
func (e *Engine) Unsubscribe(pid *PID) {
e.Send(e.eventStream, EventUnsub{pid: pid})
}

func (e *Engine) isLocalMessage(pid *PID) bool {
return e.address == pid.Address
}
Expand Down
78 changes: 22 additions & 56 deletions actor/event_stream.go
Original file line number Diff line number Diff line change
@@ -1,70 +1,36 @@
package actor

import (
"math"
"math/rand"
"sync"

"github.com/anthdm/hollywood/log"
)

// EventSub is the message that will be send to subscribe to the event stream.
type EventSub struct {
id uint32
}

type EventStreamFunc func(event any)

type EventStream struct {
mu sync.RWMutex
subs map[*EventSub]EventStreamFunc
logger log.Logger
pid *PID
}

func NewEventStream() *EventStream {
return &EventStream{
subs: make(map[*EventSub]EventStreamFunc),
}
// EventUnSub is the message that will be send to unsubscribe from the event stream.
type EventUnsub struct {
pid *PID
}

func (e *EventStream) Unsubscribe(sub *EventSub) {
e.mu.Lock()
defer e.mu.Unlock()

delete(e.subs, sub)

e.logger.Debugw("unsubscribe",
"subs", len(e.subs),
"id", sub.id,
)
type EventStream struct {
subs map[*PID]bool
}

func (e *EventStream) Subscribe(f EventStreamFunc) *EventSub {
e.mu.Lock()
defer e.mu.Unlock()

sub := &EventSub{
id: uint32(rand.Intn(math.MaxUint32)),
func NewEventStream() Producer {
return func() Receiver {
return &EventStream{
subs: make(map[*PID]bool),
}
}
e.subs[sub] = f

e.logger.Debugw("subscribe",
"subs", len(e.subs),
"id", sub.id,
)

return sub
}

func (e *EventStream) Publish(msg any) {
e.mu.RLock()
defer e.mu.RUnlock()
for _, f := range e.subs {
go f(msg)
func (e *EventStream) Receive(c *Context) {
switch msg := c.Message().(type) {
case EventSub:
e.subs[msg.pid] = true
case EventUnsub:
delete(e.subs, msg.pid)
default:
for sub := range e.subs {
c.Forward(sub)
}
}
}

func (e *EventStream) Len() int {
e.mu.RLock()
defer e.mu.RUnlock()
return len(e.subs)
}
Loading

0 comments on commit 2098fc6

Please sign in to comment.