Skip to content

Commit

Permalink
nonvoter join/leave
Browse files Browse the repository at this point in the history
  • Loading branch information
octu0 committed Oct 25, 2024
1 parent 11cf00b commit cfbcc8b
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 85 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ func main() {
WithElectionTimeout(1*time.Second),
WithObserveFunc(func(b *bullyelection.Bully, evt bullyelection.NodeEvent, id, addr string) {
log.Printf("[%s] event: %s node=%s(%s)", b.ID(), evt.String(), id, addr)
if evt == bullyelection.ElectionEvent {
for _, n := range b.Members() {
log.Printf("%s is_leader=%v", n.ID(), n.IsLeader())
}
}
}),
WithOnErrorFunc(func(err error) {
log.Printf("error=%+v", err)
Expand Down
29 changes: 19 additions & 10 deletions _example/join_leave/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (

func main() {
var (
id = flag.String("id", "node1", "node id")
addr = flag.String("addr", "127.0.0.1", "ip addr")
port = flag.Int("port", 7234, "port")
join = flag.String("join", "127.0.0.1:7234", "join addr")
id = flag.String("id", "node1", "node id")
addr = flag.String("addr", "127.0.0.1", "ip addr")
port = flag.Int("port", 7234, "port")
join = flag.String("join", "127.0.0.1:7234", "join addr")
isNonVoter = flag.Bool("nonvoter", false, "nonvoter mode")
)
flag.Parse()

Expand All @@ -34,14 +35,22 @@ func main() {
sig, sigStop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer sigStop()

b, err := bullyelection.CreateVoter(ctx, conf, bullyelection.WithObserveFunc(func(b *bullyelection.Bully, evt bullyelection.NodeEvent, id, addr string) {
log.Printf("[%s] event: %s node=%s(%s)", b.ID(), evt.String(), id, addr)
if evt == bullyelection.ElectionEvent {
for _, n := range b.Members() {
log.Printf("%s is_leader=%v", n.ID(), n.IsLeader())
opts := []bullyelection.BullyOptFunc{
bullyelection.WithObserveFunc(func(b *bullyelection.Bully, evt bullyelection.NodeEvent, id, addr string) {
log.Printf("[%s] event: %s node=%s(%s)", b.ID(), evt.String(), id, addr)
if evt == bullyelection.ElectionEvent {
for _, n := range b.Members() {
log.Printf("%s is_leader=%v", n.ID(), n.IsLeader())
}
}
}),
}
b, err := func() (*bullyelection.Bully, error) {
if *isNonVoter {
return bullyelection.CreateNonVoter(ctx, conf, opts...)
}
}))
return bullyelection.CreateVoter(ctx, conf, opts...)
}()
if err != nil {
log.Fatal(err)
}
Expand Down
88 changes: 20 additions & 68 deletions bully.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,6 @@ var (
ErrNodeNotFound = errors.New("node not found")
)

type NodeEvent uint8

const (
JoinEvent NodeEvent = iota + 1
LeaveEvent
TransferLeadershipEvent
ElectionEvent
)

func (evt NodeEvent) String() string {
switch evt {
case JoinEvent:
return "join"
case LeaveEvent:
return "leave"
case TransferLeadershipEvent:
return "transfer_leadership"
case ElectionEvent:
return "election"
}
return "unknown event"
}

type (
ObserveFunc func(*Bully, NodeEvent, string, string)
ULIDGeneratorFunc func() string
Expand Down Expand Up @@ -276,16 +253,23 @@ func (b *Bully) listNodes() []Node {
return m
}

func (b *Bully) Join(addr string) (err error) {
func (b *Bully) followerJoin(addr string) error {
b.opt.logger.Printf("info: join %s", addr)
if _, err := b.list.Join([]string{addr}); err != nil {
return errors.WithStack(err)
}
return nil
}

func (b *Bully) Join(addr string) error {
if addr == b.list.LocalNode().Address() {
return nil // skip self join
}
oldLeader := b.getLeaderID()
defer func() {
if err != nil {
b.setLeaderID(oldLeader)
}
}()

if b.IsVoter() != true {
return b.followerJoin(addr)
}

// clear leader
b.clearLeaderID()
if err := b.updateNode(); err != nil {
Expand Down Expand Up @@ -439,47 +423,15 @@ func (b *Bully) send(targetNodeID string, data []byte) error {
return nil
}

func (b *Bully) readNodeEventLoop(ctx context.Context, ch chan *nodeEventMsg) {
defer b.wg.Done()

for {
select {
case <-ctx.Done():
return

case msg := <-ch:
select {
case b.electionQueue <- msg:
// ok
default:
b.opt.onErrorFunc(errors.Wrapf(ErrBullyBusy, "maybe hangup election, drop: %s", msg))
}
}
}
}

func (b *Bully) electionRunLoop(ctx context.Context) {
defer b.wg.Done()

for {
select {
case <-ctx.Done():
return

case msg := <-b.electionQueue:
switch msg.evt {
case JoinEvent, LeaveEvent, TransferLeadershipEvent:
if err := b.startElection(ctx); err != nil {
b.opt.onErrorFunc(errors.Wrapf(err, "election failure"))
continue
}
b.opt.observeFunc(b, msg.evt, msg.id, msg.addr)
case ElectionEvent:
// emit only
b.opt.observeFunc(b, msg.evt, msg.id, msg.addr)
func (b *Bully) checkVoterNode(targetNodeID string) bool {
for _, n := range b.listNodes() {
if n.ID() == targetNodeID {
if n.IsVoter() {
return true
}
}
}
return false
}

func newBully(opt *bullyOpt, node Node, list *memberlist.Memberlist, cancel context.CancelFunc) *Bully {
Expand Down
28 changes: 21 additions & 7 deletions election.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ var (
ErrTransferLeadership = errors.New("failed to transfer_leadership")
)

func (b *Bully) startElection(ctx context.Context) (err error) {
func (b *Bully) startElection(ctx context.Context, event NodeEvent, nodeID string) (err error) {
if b.node.IsVoter() != true {
b.opt.logger.Printf("debug: is not voter, skip election")
return nil
}
defer func() {
if b.waitElection != nil {
select {
Expand All @@ -24,9 +28,10 @@ func (b *Bully) startElection(ctx context.Context) (err error) {
}
}()

if b.node.IsVoter() != true {
b.opt.logger.Printf("debug: is not voter, skip election")
return nil
if event == JoinEvent {
if b.checkVoterNode(nodeID) != true {
return nil // skip nonvoter join
}
}

b.opt.logger.Printf("debug: start election")
Expand All @@ -38,10 +43,18 @@ func (b *Bully) startElection(ctx context.Context) (err error) {
b.electionCancel()
b.electionCancel = nopCancelFunc()

nodes := getVoters(b)
voterNodes := getVoters(b)
if len(voterNodes) < 2 { // promote self
for _, n := range b.listNodes() {
if err := b.sendCoordinatorMessage(n.ID()); err != nil {
b.opt.onErrorFunc(errors.Wrapf(ErrElection, "send coordinator: %+v", err))
}
}
return nil
}

selfULID := b.getULID()
for _, n := range nodes {
for _, n := range voterNodes {
// electable = smaller than own ULID
if n.getULID() < selfULID {
if err := b.sendElectionMessage(n.ID()); err != nil {
Expand All @@ -56,7 +69,8 @@ func (b *Bully) startElection(ctx context.Context) (err error) {
return

case <-time.After(b.opt.electionTimeout):
for _, n := range nodes {
// send all nodes
for _, n := range b.listNodes() {
if err := b.sendCoordinatorMessage(n.ID()); err != nil {
b.opt.onErrorFunc(errors.Wrapf(ErrElection, "send coordinator: %+v", err))
}
Expand Down
73 changes: 73 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package bullyelection

import (
"context"

"github.com/pkg/errors"
)

type NodeEvent uint8

const (
JoinEvent NodeEvent = iota + 1
LeaveEvent
TransferLeadershipEvent
ElectionEvent
)

func (evt NodeEvent) String() string {
switch evt {
case JoinEvent:
return "join"
case LeaveEvent:
return "leave"
case TransferLeadershipEvent:
return "transfer_leadership"
case ElectionEvent:
return "election"
}
return "unknown event"
}

func (b *Bully) readNodeEventLoop(ctx context.Context, ch chan *nodeEventMsg) {
defer b.wg.Done()

for {
select {
case <-ctx.Done():
return

case msg := <-ch:
select {
case b.electionQueue <- msg:
// ok
default:
b.opt.onErrorFunc(errors.Wrapf(ErrBullyBusy, "maybe hangup election, drop: %s", msg))
}
}
}
}

func (b *Bully) electionRunLoop(ctx context.Context) {
defer b.wg.Done()

for {
select {
case <-ctx.Done():
return

case msg := <-b.electionQueue:
switch msg.evt {
case JoinEvent, LeaveEvent, TransferLeadershipEvent:
if err := b.startElection(ctx, msg.evt, msg.id); err != nil {
b.opt.onErrorFunc(errors.Wrapf(err, "election failure"))
continue
}
b.opt.observeFunc(b, msg.evt, msg.id, msg.addr)
case ElectionEvent:
// emit only
b.opt.observeFunc(b, msg.evt, msg.id, msg.addr)
}
}
}
}

0 comments on commit cfbcc8b

Please sign in to comment.