Skip to content

Commit

Permalink
Implement heartbeat in dispatcher and encoding manager
Browse files Browse the repository at this point in the history
  • Loading branch information
yujiezhu0 committed Feb 8, 2025
1 parent d41c0b0 commit ca422ab
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 19 deletions.
10 changes: 4 additions & 6 deletions disperser/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ var (
controllerHealthProbePath string = "/tmp/controller-health"
controllerMaxStallDuration time.Duration = 240 * time.Second
controllerLivenessChan = make(chan time.Time, 1)
controllerHeartbeatChan = make(chan time.Time, 1)
)

func main() {
Expand Down Expand Up @@ -135,6 +134,7 @@ func RunController(ctx *cli.Context) error {
chainReader,
logger,
metricsRegistry,
func() { signalHeartbeat(controllerLivenessChan, logger) },
)
if err != nil {
return fmt.Errorf("failed to create encoding manager: %v", err)
Expand Down Expand Up @@ -201,6 +201,7 @@ func RunController(ctx *cli.Context) error {
nodeClientManager,
logger,
metricsRegistry,
func() { signalHeartbeat(controllerLivenessChan, logger) },
)
if err != nil {
return fmt.Errorf("failed to create dispatcher: %v", err)
Expand Down Expand Up @@ -229,9 +230,6 @@ func RunController(ctx *cli.Context) error {
logger.Warn("Failed to create readiness file", "error", err, "path", controllerReadinessProbePath)
}

// Signal heartbeat
signalHeartbeat(logger)

return nil
}

Expand Down Expand Up @@ -266,9 +264,9 @@ func heartbeatMonitor(filePath string, controllerMaxStallDuration time.Duration)
}
}

func signalHeartbeat(logger logging.Logger) {
func signalHeartbeat(controllerLivenessChan chan time.Time, logger logging.Logger) {
select {
case controllerHeartbeatChan <- time.Now():
case controllerLivenessChan <- time.Now():
logger.Info("Heartbeat signal sent from Controller")
default:
// Avoid blocking if the channel is full or no receiver is actively consuming
Expand Down
4 changes: 3 additions & 1 deletion disperser/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ var (

func TestMain(m *testing.M) {
setup(m)
startHeartbeatMonitoring()
code := m.Run()
teardown()
os.Exit(code)
Expand Down Expand Up @@ -224,6 +223,9 @@ func TestHeartbeatMonitoring(t *testing.T) {
heartbeatChan <- time.Now()
time.Sleep(50 * time.Millisecond)
}

// Stop listening after test
close(doneListening)
}

func startHeartbeatMonitoring() {
Expand Down
10 changes: 8 additions & 2 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type Dispatcher struct {
logger logging.Logger
metrics *dispatcherMetrics

cursor *blobstore.StatusIndexCursor
cursor *blobstore.StatusIndexCursor
signalHeartbeat func()
}

type batchData struct {
Expand All @@ -63,6 +64,7 @@ func NewDispatcher(
nodeClientManager NodeClientManager,
logger logging.Logger,
registry *prometheus.Registry,
signalHeartbeat func(),
) (*Dispatcher, error) {
if config == nil {
return nil, errors.New("config is required")
Expand All @@ -81,7 +83,8 @@ func NewDispatcher(
logger: logger.With("component", "Dispatcher"),
metrics: newDispatcherMetrics(registry),

cursor: nil,
cursor: nil,
signalHeartbeat: signalHeartbeat,
}, nil
}

Expand Down Expand Up @@ -124,6 +127,9 @@ func (d *Dispatcher) Start(ctx context.Context) error {
}

func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, *batchData, error) {
// Signal Liveness to indicate no stall
d.signalHeartbeat()

start := time.Now()
defer func() {
d.metrics.reportHandleBatchLatency(time.Since(start))
Expand Down
109 changes: 100 additions & 9 deletions disperser/controller/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ func TestDispatcherHandleBatch(t *testing.T) {
objs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{0, 1}, 2)
ctx := context.Background()

defer func() {
heartbeats := getHeartbeats()
require.NotEmpty(t, heartbeats, "Expected heartbeats, but none were received")
require.GreaterOrEqual(t, len(heartbeats), 2, "Expected at least 2 heartbeats")

// Additional checks (e.g., time intervals between heartbeats)
for i := 1; i < len(heartbeats); i++ {
require.GreaterOrEqual(t, heartbeats[i].Sub(heartbeats[i-1]), time.Second,
"Heartbeats should have at least 1-second interval")
}
}()

// Get batch header hash to mock signatures
merkleTree, err := corev2.BuildMerkleTree(objs.blobCerts)
require.NoError(t, err)
Expand Down Expand Up @@ -132,6 +144,18 @@ func TestDispatcherInsufficientSignatures(t *testing.T) {
successfulObjs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{1}, 1)
ctx := context.Background()

defer func() {
heartbeats := getHeartbeats()
require.NotEmpty(t, heartbeats, "Expected heartbeats, but none were received")
require.GreaterOrEqual(t, len(heartbeats), 2, "Expected at least 2 heartbeats")

// Additional checks (e.g., time intervals between heartbeats)
for i := 1; i < len(heartbeats); i++ {
require.GreaterOrEqual(t, heartbeats[i].Sub(heartbeats[i-1]), time.Second,
"Heartbeats should have at least 1-second interval")
}
}()

// Get batch header hash to mock signatures
certs := make([]*corev2.BlobCertificate, 0, len(failedObjs.blobCerts)+len(successfulObjs.blobCerts))
certs = append(certs, failedObjs.blobCerts...)
Expand Down Expand Up @@ -211,6 +235,18 @@ func TestDispatcherInsufficientSignatures2(t *testing.T) {
objsInQuorum1 := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{1}, 1)
ctx := context.Background()

defer func() {
heartbeats := getHeartbeats()
require.NotEmpty(t, heartbeats, "Expected heartbeats, but none were received")
require.GreaterOrEqual(t, len(heartbeats), 2, "Expected at least 2 heartbeats")

// Additional checks (e.g., time intervals between heartbeats)
for i := 1; i < len(heartbeats); i++ {
require.GreaterOrEqual(t, heartbeats[i].Sub(heartbeats[i-1]), time.Second,
"Heartbeats should have at least 1-second interval")
}
}()

// Get batch header hash to mock signatures
certs := make([]*corev2.BlobCertificate, 0, len(objsInBothQuorum.blobCerts)+len(objsInQuorum1.blobCerts))
certs = append(certs, objsInBothQuorum.blobCerts...)
Expand Down Expand Up @@ -274,6 +310,19 @@ func TestDispatcherMaxBatchSize(t *testing.T) {
numBlobs := 12
objs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{0, 1}, numBlobs)
ctx := context.Background()

defer func() {
heartbeats := getHeartbeats()
require.NotEmpty(t, heartbeats, "Expected heartbeats, but none were received")
require.GreaterOrEqual(t, len(heartbeats), 2, "Expected at least 2 heartbeats")

// Additional checks (e.g., time intervals between heartbeats)
for i := 1; i < len(heartbeats); i++ {
require.GreaterOrEqual(t, heartbeats[i].Sub(heartbeats[i-1]), time.Second,
"Heartbeats should have at least 1-second interval")
}
}()

expectedNumBatches := (numBlobs + int(maxBatchSize) - 1) / int(maxBatchSize)
for i := 0; i < expectedNumBatches; i++ {
batchData, err := components.Dispatcher.NewBatch(ctx, blockNumber)
Expand All @@ -299,6 +348,18 @@ func TestDispatcherNewBatch(t *testing.T) {
require.Len(t, objs.blobCerts, 2)
ctx := context.Background()

defer func() {
heartbeats := getHeartbeats()
require.NotEmpty(t, heartbeats, "Expected heartbeats, but none were received")
require.GreaterOrEqual(t, len(heartbeats), 2, "Expected at least 2 heartbeats")

// Additional checks (e.g., time intervals between heartbeats)
for i := 1; i < len(heartbeats); i++ {
require.GreaterOrEqual(t, heartbeats[i].Sub(heartbeats[i-1]), time.Second,
"Heartbeats should have at least 1-second interval")
}
}()

batchData, err := components.Dispatcher.NewBatch(ctx, blockNumber)
require.NoError(t, err)
batch := batchData.Batch
Expand Down Expand Up @@ -349,6 +410,18 @@ func TestDispatcherNewBatch(t *testing.T) {
}

func TestDispatcherBuildMerkleTree(t *testing.T) {
defer func() {
heartbeats := getHeartbeats()
require.NotEmpty(t, heartbeats, "Expected heartbeats, but none were received")
require.GreaterOrEqual(t, len(heartbeats), 2, "Expected at least 2 heartbeats")

// Additional checks (e.g., time intervals between heartbeats)
for i := 1; i < len(heartbeats); i++ {
require.GreaterOrEqual(t, heartbeats[i].Sub(heartbeats[i-1]), time.Second,
"Heartbeats should have at least 1-second interval")
}
}()

certs := []*corev2.BlobCertificate{
{
BlobHeader: &corev2.BlobHeader{
Expand Down Expand Up @@ -460,7 +533,7 @@ func deleteBlobs(t *testing.T, blobMetadataStore *blobstore.BlobMetadataStore, k
}
}

func newDispatcherComponents(t *testing.T) *dispatcherComponents {
func newDispatcherComponents(t *testing.T) (*dispatcherComponents, func() []time.Time) {
// logger := testutils.GetLogger()
logger, err := common.NewLogger(common.DefaultLoggerConfig())
require.NoError(t, err)
Expand All @@ -472,6 +545,19 @@ func newDispatcherComponents(t *testing.T) *dispatcherComponents {
require.NoError(t, err)
nodeClientManager := &controller.MockClientManager{}
mockChainState.On("GetCurrentBlockNumber").Return(uint(blockNumber), nil)

// Heartbeat tracking variables
var mu sync.Mutex

Check failure on line 550 in disperser/controller/dispatcher_test.go

View workflow job for this annotation

GitHub Actions / Linter

undefined: sync

Check failure on line 550 in disperser/controller/dispatcher_test.go

View workflow job for this annotation

GitHub Actions / Unit Tests

undefined: sync
var heartbeatsReceived []time.Time
doneListening := make(chan bool)

// Mock signalHeartbeat function
mockSignalHeartbeat := func() {

Check failure on line 555 in disperser/controller/dispatcher_test.go

View workflow job for this annotation

GitHub Actions / Linter

mockSignalHeartbeat declared and not used

Check failure on line 555 in disperser/controller/dispatcher_test.go

View workflow job for this annotation

GitHub Actions / Unit Tests

mockSignalHeartbeat declared and not used
mu.Lock()
heartbeatsReceived = append(heartbeatsReceived, time.Now())
mu.Unlock()
}

d, err := controller.NewDispatcher(&controller.DispatcherConfig{
PullInterval: 1 * time.Second,
FinalizationBlockDelay: finalizationBlockDelay,
Expand All @@ -481,12 +567,17 @@ func newDispatcherComponents(t *testing.T) *dispatcherComponents {
}, blobMetadataStore, pool, mockChainState, agg, nodeClientManager, logger, prometheus.NewRegistry())

Check failure on line 567 in disperser/controller/dispatcher_test.go

View workflow job for this annotation

GitHub Actions / Linter

not enough arguments in call to controller.NewDispatcher

Check failure on line 567 in disperser/controller/dispatcher_test.go

View workflow job for this annotation

GitHub Actions / Unit Tests

not enough arguments in call to controller.NewDispatcher
require.NoError(t, err)
return &dispatcherComponents{
Dispatcher: d,
BlobMetadataStore: blobMetadataStore,
Pool: pool,
ChainReader: chainReader,
ChainState: mockChainState,
SigAggregator: agg,
NodeClientManager: nodeClientManager,
}
Dispatcher: d,
BlobMetadataStore: blobMetadataStore,
Pool: pool,
ChainReader: chainReader,
ChainState: mockChainState,
SigAggregator: agg,
NodeClientManager: nodeClientManager,
}, func() []time.Time {
close(doneListening) // Stop tracking
mu.Lock()
defer mu.Unlock()
return heartbeatsReceived
}
}
8 changes: 7 additions & 1 deletion disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type EncodingManager struct {
cursor *blobstore.StatusIndexCursor
blobVersionParameters atomic.Pointer[corev2.BlobVersionParameterMap]

metrics *encodingManagerMetrics
metrics *encodingManagerMetrics
signalHeartbeat func()
}

func NewEncodingManager(
Expand All @@ -72,6 +73,7 @@ func NewEncodingManager(
chainReader core.Reader,
logger logging.Logger,
registry *prometheus.Registry,
signalHeartbeat func(),
) (*EncodingManager, error) {
if config.NumRelayAssignment < 1 ||
len(config.AvailableRelays) == 0 ||
Expand All @@ -90,6 +92,7 @@ func NewEncodingManager(
logger: logger.With("component", "EncodingManager"),
cursor: nil,
metrics: newEncodingManagerMetrics(registry),
signalHeartbeat: signalHeartbeat,
}, nil
}

Expand Down Expand Up @@ -142,6 +145,9 @@ func (e *EncodingManager) Start(ctx context.Context) error {
}

func (e *EncodingManager) HandleBatch(ctx context.Context) error {
// Signal Liveness to indicate no stall
e.signalHeartbeat()

// Get a batch of blobs to encode
blobMetadatas, cursor, err := e.blobMetadataStore.GetBlobMetadataByStatusPaginated(ctx, v2.Queued, e.cursor, e.MaxNumBlobsPerIteration)
if err != nil {
Expand Down
Loading

0 comments on commit ca422ab

Please sign in to comment.