Skip to content

Commit

Permalink
moreeeee cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
samliok committed Jan 31, 2025
1 parent 4278e28 commit f60b5c6
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 126 deletions.
4 changes: 2 additions & 2 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ func (e *Epoch) handleBlockMessage(message *BlockMessage, from NodeID) error {
// Schedule the block to be verified once its direct predecessor has been verified,
// or if it can be verified immediately.
e.Logger.Debug("Scheduling block verification", zap.Uint64("round", md.Round))
e.sched.Schedule(md.Digest, md.Digest, task, md.Prev, canBeImmediatelyVerified)
e.sched.Schedule(task, md.Prev, canBeImmediatelyVerified)

return nil
}
Expand Down Expand Up @@ -1013,7 +1013,7 @@ func (e *Epoch) createBlockVerificationTask(block Block, from NodeID, vote Vote)
}
round.votes[string(vote.Signature.Signer)] = &vote

if err := e.doProposed(block, vote, from); err != nil {
if err := e.doProposed(block); err != nil {
e.Logger.Warn("Failed voting on block", zap.Error(err))
}
return md.Digest
Expand Down
180 changes: 75 additions & 105 deletions sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import "sync"
type scheduler struct {
lock sync.Mutex
signal sync.Cond
pending dependencies[Digest, task]
pending dependencies
ready []task
close bool
}

func NewScheduler() *scheduler {
var as scheduler
as.pending = NewDependencies[Digest, task]()
as.pending = newDependencies()
as.signal = sync.Cond{L: &as.lock}

go as.run()
Expand All @@ -26,7 +26,8 @@ func NewScheduler() *scheduler {
func (as *scheduler) Size() int {
as.lock.Lock()
defer as.lock.Unlock()
return as.pending.Size()

return as.pending.Size() + len(as.ready)
}

func (as *scheduler) Close() {
Expand All @@ -35,81 +36,73 @@ func (as *scheduler) Close() {

as.close = true

as.signal.Signal()
as.signal.Broadcast()
}

func (as *scheduler) run() {
as.lock.Lock()
defer as.lock.Unlock()
/*
for !as.close {
as.maybeExecuteTask()
if as.close {
return
}
if len(as.ready) > 0 {
continue
}
as.signal.Wait()
}
}
(a) If a task A finished its execution before some task B that depends on it was scheduled, then when B was scheduled,
it was scheduled as ready.
Proof: The Epoch schedules new tasks under a lock, and computes whether a task is ready or not, under that lock as well.
Since each task in the Epoch obtains that lock as part of its execution, the computation of whether B is ready to be
scheduled or not is mutually exclusive with respect to A's execution. Therefore, if A finished executing it must be
that B is scheduled as ready when it is executed.
func (as *scheduler) maybeExecuteTask() {
for len(as.ready) > 0 {
if as.close {
return
}
var task task
task, as.ready = as.ready[0], as.ready[1:]
as.lock.Unlock()
task.f()
}
}
(b) If a task is scheduled and is ready to run, it will be executed after a finite set of instructions.
Proof: A ready task is entered into the ready queue (10) and then the condition variable is signaled (11) under a lock.
The scheduler goroutine in the meantime can be either waiting for the signal, in which case it will wake up (2) and perform the next
iteration where it will pop the task (4) and execute it (6), or it can be performing an instruction while not waiting for the signal.
In the latter case, the only time when a lock is not held (5), is when the task is executed (6).
If the lock is not held by the scheduler goroutine, then it will eventually reach the end of the loop and perform the next iteration,
in which it will detect the ready queue is not empty (1) and pop the task (4) and execute it (6).
/*
(a) While a task is scheduled, the lock is held and therefore the only instructions that the scheduler thread
can perform is running 'dispatchTaskAndScheduleDependingTasks', as any other line in 'maybeExecuteTask' requires
the lock to be held. Inside 'dispatchTaskAndScheduleDependingTasks', the only line that doesn't require the lock
to be held is executing the task itself. It follows from here, that it is not possible to schedule a new task
while moving tasks from pending to the ready queue, and vice versa.
(b) The Epoch schedules new tasks under a lock, and computes whether a task is ready or not, under that lock as well.
Since each task in the Epoch first obtains a lock before proceeding to do anything, if a task A finished its execution before
the task B that depends on it was scheduled, then it cannot be that B was scheduled with a dependency on A and is not ready,
because the computation of whether B is ready to be scheduled or not is mutually exclusive with respect to A's execution.
Therefore, if A finished executing it must be that B is ready to be executed.
(c) If a task is scheduled and is ready to run, it will be executed after a finite set of instructions.
The reason is that a ready task is entered into the ready queue and then the condition variable is signaled.
The scheduler goroutine can be either waiting for the signal, in which case it will wake up and execute the task,
or it can be performing an instruction before waiting for the signal. In the latter case, the only time when
a lock is not held, is when the task is executed. Afterward, the lock is re-acquired in 'dispatchTaskAndScheduleDependingTasks'.
It follows from (a) that if the lock is not held by the scheduler goroutine, then it will check for ready tasks one more time
just before entering the wait for the signal, and therefore even if the signal is given while the scheduler goroutine is not waiting
for it, a scheduling of a task ready to run will run after a finite set of instructions by the scheduler goroutine.
Assume in contradiction that there exists a task B such that it is scheduled and is not ready,
depends on a task A which finishes, but B is never scheduled once A finishes.
// main claim (liveness): Tasks that depend on other tasks to finish are eventually executed once the tasks they depend on are executed.
We will show that it cannot be that there exists a task B such that it is scheduled and is not ready to be executed,
and B depends on a task A which finishes, but B is never scheduled once A finishes.
We split into two distinct cases:
1) B is scheduled after A
2) A is scheduled after B
I) B is scheduled after A
II) A is scheduled after B
If (1) holds, then when B is scheduled, it is not ready and hence it is inserted into pending.
It follows from (b) that A does not finish before B is inserted into pending.
At some point the task A finishes its execution, after which the scheduler goroutine
enters 'dispatchTaskAndScheduleDependingTasks' where it proceeds to remove the ID of A,
retrieve B from pending, add B to the ready queue, and perform another iteration inside 'maybeExecuteTask'.
It will then pop tasks from the ready queue and execute them until it is empty, and one of these tasks will be B.
If (I) holds, then when B is scheduled, it is not ready (according to the assumption) and hence it is inserted into pending (9).
It follows from (a) that A does not finish before B is inserted into pending (otherwise B was executed as 'ready').
At some point the task A finishes its execution (6), after which the scheduler goroutine removes the ID of A,
retrieves B from pending (7), adds B to the ready queue (8), and performs the next iteration.
It follows from (b) that eventually it will pop B from the ready queue (4) and execute it.
If (2) holds, then when B is scheduled it is pending on A to finish.
The rest follows trivially from (1).
If (II) holds, then when B is scheduled it is pending on A to finish and therefore added to the pending queue (9),
and A is not scheduled yet because scheduling of tasks is done under a lock. The rest follows trivially from (I).
*/

func (as *scheduler) Schedule(id Digest, f func(), prev Digest, ready bool) {
// run is the scheduler's worker loop. It holds as.lock for its whole lifetime
// except while a task's function is executing, and it returns (releasing the
// lock via the deferred Unlock) once as.close is observed to be true at the
// top of the loop.
//
// Each iteration either waits on as.signal when the ready queue is empty, or
// pops the task at the head of the ready queue, runs it without the lock held,
// and then moves every pending task that was registered under the returned
// Digest into the ready queue.
//
// The numbered markers (1)-(8) are labels for individual steps; they appear to
// be the ones cited by the proof comment elsewhere in this file, so keep them
// in sync with that comment when editing.
func (as *scheduler) run() {
as.lock.Lock()
defer as.lock.Unlock()

// as.close is only examined here, so a task that is already executing when
// the flag is set still completes and has its dependents moved to ready
// before the loop exits.
for !as.close {

// Nothing to run: block until signaled, then re-evaluate both the close flag
// and the ready queue from the top of the loop.
if len(as.ready) == 0 { // (1)
as.signal.Wait() // (2)
continue // (3)
}

// Pop the head of the ready queue (FIFO order).
taskToRun := as.ready[0]
as.ready[0] = task{} // Cleanup any object references reachable from the closure of the task
as.ready = as.ready[1:] // (4)

// Run the task with the lock released so that the scheduler's other methods
// can acquire it while the task is executing.
as.lock.Unlock() // (5)
id := taskToRun.f() // (6)
as.lock.Lock()

// The Digest returned by the task is the key under which its dependents were
// stored in pending; they are now eligible to run.
newlyReadyTasks := as.pending.Remove(id) // (7)
as.ready = append(as.ready, newlyReadyTasks...) // (8)
}
}

func (as *scheduler) Schedule(f func() Digest, prev Digest, ready bool) {
as.lock.Lock()
defer as.lock.Unlock()

Expand All @@ -118,68 +111,45 @@ func (as *scheduler) Schedule(id Digest, f func(), prev Digest, ready bool) {
}

task := task{
f: as.dispatchTaskAndScheduleDependingTasks(id, f),
f: f,
parent: prev,
digest: id,
}

if ready {
as.ready = append(as.ready, task)
} else {
as.pending.Insert(task)
if !ready {
as.pending.Insert(task) // (9)
return
}

as.signal.Signal()
}
as.ready = append(as.ready, task) // (10)

func (as *scheduler) dispatchTaskAndScheduleDependingTasks(id Digest, task func()) func() {
return func() {
task()
as.lock.Lock()
newlyReadyTasks := as.pending.Remove(id)
as.ready = append(as.ready, newlyReadyTasks...)
}
as.signal.Broadcast() // (11)
}

type task struct {
f func()
digest Digest
f func() Digest
parent Digest
}

func (t task) dependsOn() Digest {
return t.parent
}

func (t task) id() Digest {
return t.digest
}

type dependent[C comparable] interface {
dependsOn() C
id() C
}

type dependencies[C comparable, D dependent[C]] struct {
dependsOn map[C][]D // values depend on key.
type dependencies struct {
dependsOn map[Digest][]task // values depend on key.
}

func NewDependencies[C comparable, D dependent[C]]() dependencies[C, D] {
return dependencies[C, D]{
dependsOn: make(map[C][]D),
func newDependencies() dependencies {
return dependencies{
dependsOn: make(map[Digest][]task),
}
}

func (t *dependencies[C, D]) Size() int {
return len(t.dependsOn)
func (d *dependencies) Size() int {
return len(d.dependsOn)
}

func (t *dependencies[C, D]) Insert(v D) {
dependency := v.dependsOn()
t.dependsOn[dependency] = append(t.dependsOn[dependency], v)
func (d *dependencies) Insert(t task) {
dependency := t.parent
d.dependsOn[dependency] = append(d.dependsOn[dependency], t)
}

func (t *dependencies[C, D]) Remove(id C) []D {
func (t *dependencies) Remove(id Digest) []task {
dependents := t.dependsOn[id]
delete(t.dependsOn, id)
return dependents
Expand Down
32 changes: 13 additions & 19 deletions sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,21 @@ import (
"github.com/stretchr/testify/require"
)

type testDependsOn []int

func (m testDependsOn) dependsOn() int {
return m[1]
}

func (m testDependsOn) id() int {
return m[0]
}

func TestDependencyTree(t *testing.T) {
dt := NewDependencies[int, testDependsOn]()
dt := newDependencies()

for i := 0; i < 5; i++ {
// [0] (i+1) depends on [1] (i)
dt.Insert([]int{i + 1, i})
dt.Insert(task{f: func() Digest {
return Digest{uint8(i + 1)}
}, parent: Digest{uint8(i)}})
}

require.Equal(t, 5, dt.Size())

for i := 0; i < 5; i++ {
j := dt.Remove(i)
j := dt.Remove(Digest{uint8(i)})
require.Len(t, j, 1)
require.Equal(t, i+1, j[0].id())
require.Equal(t, Digest{uint8(i + 1)}, j[0].f())
}

}
Expand All @@ -54,9 +45,10 @@ func TestAsyncScheduler(t *testing.T) {
dig1 := makeDigest(t)
dig2 := makeDigest(t)

as.Schedule(dig2, func() {
as.Schedule(func() Digest {
defer wg.Done()
<-ticks
return dig2
}, dig1, true)

ticks <- struct{}{}
Expand All @@ -72,8 +64,9 @@ func TestAsyncScheduler(t *testing.T) {
dig1 := makeDigest(t)
dig2 := makeDigest(t)

as.Schedule(dig2, func() {
as.Schedule(func() Digest {
close(ticks)
return dig2
}, dig1, true)

ticks <- struct{}{}
Expand Down Expand Up @@ -122,14 +115,15 @@ func scheduleTask(lock *sync.Mutex, finished map[Digest]struct{}, dependency Dig

_, hasFinished := finished[dep]

task := func() {
task := func() Digest {
lock.Lock()
defer lock.Unlock()
finished[id] = struct{}{}
wg.Done()
return id
}

as.Schedule(id, task, dep, i == 0 || hasFinished)
as.Schedule(task, dep, i == 0 || hasFinished)
}
}

Expand Down

0 comments on commit f60b5c6

Please sign in to comment.