Skip to content

Commit

Permalink
Unique actors cluster wide
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Jan 29, 2025
1 parent 4a508e1 commit 1f27e0f
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 7 deletions.
27 changes: 24 additions & 3 deletions cluster/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ type (
getMembers struct{}
getKinds struct{}
deactivate struct{ pid *actor.PID }
getActive struct{ id string }
getActive struct {
id string
kind string
}
)

// Agent is an actor/receiver that is responsible for managing the state
Expand Down Expand Up @@ -79,8 +82,19 @@ func (a *Agent) Receive(c *actor.Context) {
}
c.Respond(kinds)
case getActive:
pid := a.activated[msg.id]
c.Respond(pid)
if len(msg.id) > 0 {
pid := a.activated[msg.id]
c.Respond(pid)
}
if len(msg.kind) > 0 {
pids := make([]*actor.PID, len(a.activated))
i := 0
for _, pid := range a.activated {
pids[i] = pid
i++
}
c.Respond(pids)
}
}
}

Expand All @@ -107,6 +121,7 @@ func (a *Agent) handleActivationRequest(msg *ActivationRequest) *ActivationRespo
slog.Error("received activation request but kind not registered locally on this node", "kind", msg.Kind)
return &ActivationResponse{Success: false}
}

kind := a.localKinds[msg.Kind]
pid := a.cluster.engine.Spawn(kind.producer, msg.Kind, actor.WithID(msg.ID))
resp := &ActivationResponse{
Expand All @@ -117,6 +132,12 @@ func (a *Agent) handleActivationRequest(msg *ActivationRequest) *ActivationRespo
}

func (a *Agent) activate(kind string, config ActivationConfig) *actor.PID {
// Make sure actors are unique across the whole cluster.
id := kind + "/" + config.id // the id part of the PID
if _, ok := a.activated[id]; ok {
slog.Warn("activation failed", "err", "duplicated actor id across the cluster", "id", id)
return nil
}
members := a.members.FilterByKind(kind)
if len(members) == 0 {
slog.Warn("could not find any members with kind", "kind", kind)
Expand Down
23 changes: 21 additions & 2 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,27 @@ func (c *Cluster) HasKind(name string) bool {
return false
}

// TODO: Weird
func (c *Cluster) GetActivated(id string) *actor.PID {
// GetActiveByKind returns all the actor PIDS that are active across the cluster
// by the given kind.
//
// playerPids := c.GetActiveByKind("player")
// // [127.0.0.1:34364/player/1 127.0.0.1:34365/player/2]
func (c *Cluster) GetActiveByKind(kind string) []*actor.PID {
resp, err := c.engine.Request(c.agentPID, getActive{kind: kind}, c.config.requestTimeout).Result()
if err != nil {
return []*actor.PID{}
}
if res, ok := resp.([]*actor.PID); ok {
return res
}
return []*actor.PID{}
}

// GetActivedByID returns the full PID by the given ID.
//
// playerPid := c.GetActiveByID("player/1")
// // 127.0.0.1:34364/player/1
func (c *Cluster) GetActivedByID(id string) *actor.PID {
resp, err := c.engine.Request(c.agentPID, getActive{id: id}, c.config.requestTimeout).Result()
if err != nil {
return nil
Expand Down
60 changes: 58 additions & 2 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestActivate(t *testing.T) {
wg.Wait()
assert.Equal(t, len(c1.Members()), 2)
assert.True(t, c1.HasKind("player"))
assert.True(t, c1.GetActivated("player/1").Equals(expectedPID))
assert.True(t, c1.GetActivedByID("player/1").Equals(expectedPID))

c1.Stop()
c2.Stop()
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestDeactivate(t *testing.T) {

assert.Equal(t, len(c1.Members()), 2)
assert.True(t, c1.HasKind("player"))
assert.Nil(t, c1.GetActivated("player/1"))
assert.Nil(t, c1.GetActivedByID("player/1"))

c1.Stop()
c2.Stop()
Expand Down Expand Up @@ -317,6 +317,62 @@ func TestMembersExcept(t *testing.T) {
assert.Equal(t, am[0].ID, "C")
}

func TestGetActiveByKind(t *testing.T) {
c1Addr := getRandomLocalhostAddr()
c2Addr := getRandomLocalhostAddr()

c1 := makeCluster(t, c1Addr, "A", "eu")
c1.RegisterKind("player", NewPlayer, NewKindConfig())
c1.Start()

c2 := makeCluster(t, c2Addr, "B", "eu")
c2.RegisterKind("player", NewPlayer, NewKindConfig())
c2.Start()

pid1 := c1.Activate("player", NewActivationConfig().WithID("1"))
pid2 := c2.Activate("player", NewActivationConfig().WithID("2"))
time.Sleep(time.Millisecond * 10)

// I know. But we need to have strings, so we dont compare
// protobuffer messages.
pids := c1.GetActiveByKind("player")
assert.Len(t, pids, 2)
pidsStr := make([]string, 2)
pidsStr[0] = pids[0].String()
pidsStr[1] = pids[1].String()
assert.Contains(t, pidsStr, pid1.String())
assert.Contains(t, pidsStr, pid2.String())
}

func TestCannotDuplicateActor(t *testing.T) {
c1Addr := getRandomLocalhostAddr()
c2Addr := getRandomLocalhostAddr()

c1 := makeCluster(t, c1Addr, "A", "eu")
c1.RegisterKind("player", NewPlayer, NewKindConfig())
c1.Start()

c2 := makeCluster(t, c2Addr, "B", "eu")
c2.RegisterKind("player", NewPlayer, NewKindConfig())
c2.Start()

pid := c1.Activate("player", NewActivationConfig().WithID("1"))
time.Sleep(10 * time.Millisecond)
// Lets make sure we spawn the actor on "our" node. Why?
// Because when we randomly selected the other node to spawn the actor
// with the same id on the test will pass.
// Local registry will prevent duplicated actor IDs from the get go.
pid2 := c2.Activate("player", NewActivationConfig().WithID("1").WithSelectMemberFunc(func(_ ActivationDetails) *Member {
return c2.Member()
}))
fmt.Println(pid2)
time.Sleep(time.Millisecond * 10)

pids := c1.GetActiveByKind("player")
assert.Len(t, pids, 1)
assert.Equal(t, pids[0].String(), pid.String())
}

func makeCluster(t *testing.T, addr, id, region string) *Cluster {
config := NewConfig().
WithID(id).
Expand Down

0 comments on commit 1f27e0f

Please sign in to comment.