Skip to content

Commit

Permalink
fix: module status in console not matching actual engine status (#4270)
Browse files Browse the repository at this point in the history
There were many random cases where the console status would not match
the status displayed in the terminal:

![Screenshot 2025-02-03 at 11 44
56 AM](https://github.com/user-attachments/assets/cd9a3bd5-4f64-450c-9ef3-1e8951fd0acd)
![Screenshot 2025-02-03 at 12 34
11 PM](https://github.com/user-attachments/assets/25af1053-4da5-428b-b478-afc71c73a37c)

This change fixes some of the logic in the updates and simplifies the
frontend code now that we can rely on order and deduplication on the
backend.
![Screenshot 2025-02-03 at 12 35
23 PM](https://github.com/user-attachments/assets/99e7a3d7-69a7-4a18-85ca-e8e1bf619c5f)
  • Loading branch information
wesbillman authored Feb 3, 2025
1 parent 37900e5 commit 58eb6a2
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 28 deletions.
16 changes: 10 additions & 6 deletions frontend/console/src/features/engine/hooks/use-engine-status.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { InfiniteData } from '@tanstack/react-query'
import { useMemo } from 'react'
import type { EngineEvent } from '../../../protos/xyz/block/ftl/buildengine/v1/buildengine_pb'
import { compareTimestamps } from '../../../shared/utils'
import { getModuleName } from '../engine.utils'
import { useStreamEngineEvents } from './use-stream-engine-events'

Expand All @@ -9,14 +10,9 @@ interface EngineStatus {
modules: Record<string, EngineEvent>
}

const getEventTimestamp = (event: EngineEvent): bigint => {
if (!event.timestamp) return BigInt(0)
return BigInt(event.timestamp.seconds) * BigInt(1e9) + BigInt(event.timestamp.nanos)
}

const isNewerEvent = (current: EngineEvent, existing: EngineEvent | undefined): boolean => {
if (!existing) return true
return getEventTimestamp(current) > getEventTimestamp(existing)
return compareTimestamps(current.timestamp, existing.timestamp) > 0
}

export const useEngineStatus = (enabled = true) => {
Expand All @@ -31,7 +27,15 @@ export const useEngineStatus = (enabled = true) => {
const data = events.data as InfiniteData<EngineEvent[]> | undefined
if (data?.pages) {
for (const page of data.pages) {
// Skip if page is not an array
if (!Array.isArray(page)) {
console.warn('Received non-array page in engine status:', page)
continue
}

for (const event of page) {
if (!event) continue

const moduleName = getModuleName(event)
if (moduleName) {
// Only update if this is a newer event for this module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,10 @@ export const useStreamEngineEvents = (enabled = true) => {
const newEvent = response.event as EngineEvent
if (!old) return { pages: [[newEvent]], pageParams: [0] }

// Check if this event already exists to avoid duplicates
const eventExists = old.pages[0]?.some(
(event) => event.timestamp?.seconds === newEvent.timestamp?.seconds && event.timestamp?.nanos === newEvent.timestamp?.nanos,
)
if (eventExists) return old

// Add the new event to the end of the first page to maintain chronological order
return {
...old,
pages: [[newEvent, ...old.pages[0]], ...old.pages.slice(1)],
pages: [[...old.pages[0], newEvent], ...old.pages.slice(1)],
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ const EventContent = ({ event }: { event: EngineEvent }) => {
}

export const BuildEngineEvents = ({ events }: BuildEngineEventsProps) => {
const reversedEvents = [...events].reverse()

return (
<div className='overflow-x-hidden w-full'>
<List items={events} renderItem={(event) => <EventContent event={event} />} className='text-xs w-full' />
<List items={reversedEvents} renderItem={(event) => <EventContent event={event} />} className='text-xs w-full' />
</div>
)
}
38 changes: 24 additions & 14 deletions internal/buildengine/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type updatesService struct {
config Config
lock sync.RWMutex
events []*buildenginepb.EngineEvent
// Channel for subscribers to receive new events
subscribers sync.Map
}

var _ enginepbconnect.BuildEngineServiceHandler = &updatesService{}
Expand All @@ -53,9 +55,27 @@ func (e *Engine) startUpdatesService(ctx context.Context, endpoint *url.URL) err
// Start goroutine to collect events
go func() {
for event := range channels.IterContext(ctx, events) {
// Add timestamp to event if not present
if event.Timestamp == nil {
event.Timestamp = timestamppb.Now()
}

svc.lock.Lock()
svc.events = append(svc.events, event)
svc.lock.Unlock()

// Broadcast to all subscribers
svc.subscribers.Range(func(key, value interface{}) bool {
if ch, ok := value.(chan *buildenginepb.EngineEvent); ok {
select {
case ch <- event:
default:
// If channel is full, skip the event for this subscriber
logger.Warnf("Subscriber channel is full, dropping event")
}
}
return true
})
}
}()

Expand All @@ -78,12 +98,12 @@ func (u *updatesService) Ping(context.Context, *connect.Request[ftlv1.PingReques
}

func (u *updatesService) StreamEngineEvents(ctx context.Context, req *connect.Request[buildenginepb.StreamEngineEventsRequest], stream *connect.ServerStream[buildenginepb.StreamEngineEventsResponse]) error {
// First subscribe to new events to avoid missing any
events := make(chan *buildenginepb.EngineEvent, 64)
u.engine.EngineUpdates.Subscribe(events)
defer u.engine.EngineUpdates.Unsubscribe(events)
subscriberKey := fmt.Sprintf("subscriber-%p", events)
u.subscribers.Store(subscriberKey, events)
defer u.subscribers.Delete(subscriberKey)

// Then send cached events if replay_history is true
// Send cached events if replay_history is true
if req.Msg.ReplayHistory {
u.lock.RLock()
for _, event := range u.events {
Expand All @@ -100,16 +120,6 @@ func (u *updatesService) StreamEngineEvents(ctx context.Context, req *connect.Re

// Process new events
for event := range channels.IterContext(ctx, events) {
// Add timestamp to event
if event.Timestamp == nil {
event.Timestamp = timestamppb.Now()
}

// Cache the event
u.lock.Lock()
u.events = append(u.events, event)
u.lock.Unlock()

err := stream.Send(&buildenginepb.StreamEngineEventsResponse{
Event: event,
})
Expand Down

0 comments on commit 58eb6a2

Please sign in to comment.