From 58eb6a21d20d8eba76fbbac0603d00938f4c12b0 Mon Sep 17 00:00:00 2001 From: Wes Date: Mon, 3 Feb 2025 12:52:43 -0700 Subject: [PATCH] fix: module status in console not matching actual engine status (#4270) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There were many random cases where the console status would not match the status displayed in the terminal: ![Screenshot 2025-02-03 at 11 44 56 AM](https://github.com/user-attachments/assets/cd9a3bd5-4f64-450c-9ef3-1e8951fd0acd) ![Screenshot 2025-02-03 at 12 34 11 PM](https://github.com/user-attachments/assets/25af1053-4da5-428b-b478-afc71c73a37c) This change fixes some of the logic in the updates and simplifies the frontend code now that we can rely on order and deduplication on the backend. ![Screenshot 2025-02-03 at 12 35 23 PM](https://github.com/user-attachments/assets/99e7a3d7-69a7-4a18-85ca-e8e1bf619c5f) --- .../engine/hooks/use-engine-status.ts | 16 +++++--- .../engine/hooks/use-stream-engine-events.ts | 9 +---- .../infrastructure/BuildEngineEvents.tsx | 4 +- internal/buildengine/updates.go | 38 ++++++++++++------- 4 files changed, 39 insertions(+), 28 deletions(-) diff --git a/frontend/console/src/features/engine/hooks/use-engine-status.ts b/frontend/console/src/features/engine/hooks/use-engine-status.ts index 2cfe7fec1e..de963a3f6a 100644 --- a/frontend/console/src/features/engine/hooks/use-engine-status.ts +++ b/frontend/console/src/features/engine/hooks/use-engine-status.ts @@ -1,6 +1,7 @@ import type { InfiniteData } from '@tanstack/react-query' import { useMemo } from 'react' import type { EngineEvent } from '../../../protos/xyz/block/ftl/buildengine/v1/buildengine_pb' +import { compareTimestamps } from '../../../shared/utils' import { getModuleName } from '../engine.utils' import { useStreamEngineEvents } from './use-stream-engine-events' @@ -9,14 +10,9 @@ interface EngineStatus { modules: Record } -const getEventTimestamp = (event: EngineEvent): bigint => { - if (!event.timestamp) return BigInt(0) - return BigInt(event.timestamp.seconds) * BigInt(1e9) + BigInt(event.timestamp.nanos) -} - const isNewerEvent = (current: EngineEvent, existing: EngineEvent | undefined): boolean => { if (!existing) return true - return getEventTimestamp(current) > getEventTimestamp(existing) + return compareTimestamps(current.timestamp, existing.timestamp) > 0 } export const useEngineStatus = (enabled = true) => { @@ -31,7 +27,15 @@ export const useEngineStatus = (enabled = true) => { const data = events.data as InfiniteData | undefined if (data?.pages) { for (const page of data.pages) { + // Skip if page is not an array + if (!Array.isArray(page)) { + console.warn('Received non-array page in engine status:', page) + continue + } + for (const event of page) { + if (!event) continue + const moduleName = getModuleName(event) if (moduleName) { // Only update if this is a newer event for this module diff --git a/frontend/console/src/features/engine/hooks/use-stream-engine-events.ts b/frontend/console/src/features/engine/hooks/use-stream-engine-events.ts index fc69b5be9e..f9975e7434 100644 --- a/frontend/console/src/features/engine/hooks/use-stream-engine-events.ts +++ b/frontend/console/src/features/engine/hooks/use-stream-engine-events.ts @@ -27,15 +27,10 @@ export const useStreamEngineEvents = (enabled = true) => { const newEvent = response.event as EngineEvent if (!old) return { pages: [[newEvent]], pageParams: [0] } - // Check if this event already exists to avoid duplicates - const eventExists = old.pages[0]?.some( - (event) => event.timestamp?.seconds === newEvent.timestamp?.seconds && event.timestamp?.nanos === newEvent.timestamp?.nanos, - ) - if (eventExists) return old - + // Add the new event to the end of the first page to maintain chronological order return { ...old, - pages: [[newEvent, ...old.pages[0]], ...old.pages.slice(1)], + pages: [[...old.pages[0], newEvent], ...old.pages.slice(1)], } }) } diff --git a/frontend/console/src/features/infrastructure/BuildEngineEvents.tsx b/frontend/console/src/features/infrastructure/BuildEngineEvents.tsx index 3c4f9670bf..0ece036b38 100644 --- a/frontend/console/src/features/infrastructure/BuildEngineEvents.tsx +++ b/frontend/console/src/features/infrastructure/BuildEngineEvents.tsx @@ -107,9 +107,11 @@ const EventContent = ({ event }: { event: EngineEvent }) => { } export const BuildEngineEvents = ({ events }: BuildEngineEventsProps) => { + const reversedEvents = [...events].reverse() + return (
- } className='text-xs w-full' /> + } className='text-xs w-full' />
) } diff --git a/internal/buildengine/updates.go b/internal/buildengine/updates.go index 5d1818b69e..7590f0468f 100644 --- a/internal/buildengine/updates.go +++ b/internal/buildengine/updates.go @@ -31,6 +31,8 @@ type updatesService struct { config Config lock sync.RWMutex events []*buildenginepb.EngineEvent + // Channel for subscribers to receive new events + subscribers sync.Map } var _ enginepbconnect.BuildEngineServiceHandler = &updatesService{} @@ -53,9 +55,27 @@ func (e *Engine) startUpdatesService(ctx context.Context, endpoint *url.URL) err // Start goroutine to collect events go func() { for event := range channels.IterContext(ctx, events) { + // Add timestamp to event if not present + if event.Timestamp == nil { + event.Timestamp = timestamppb.Now() + } + svc.lock.Lock() svc.events = append(svc.events, event) svc.lock.Unlock() + + // Broadcast to all subscribers + svc.subscribers.Range(func(key, value interface{}) bool { + if ch, ok := value.(chan *buildenginepb.EngineEvent); ok { + select { + case ch <- event: + default: + // If channel is full, skip the event for this subscriber + logger.Warnf("Subscriber channel is full, dropping event") + } + } + return true + }) } }() @@ -78,12 +98,12 @@ func (u *updatesService) Ping(context.Context, *connect.Request[ftlv1.PingReques } func (u *updatesService) StreamEngineEvents(ctx context.Context, req *connect.Request[buildenginepb.StreamEngineEventsRequest], stream *connect.ServerStream[buildenginepb.StreamEngineEventsResponse]) error { - // First subscribe to new events to avoid missing any events := make(chan *buildenginepb.EngineEvent, 64) - u.engine.EngineUpdates.Subscribe(events) - defer u.engine.EngineUpdates.Unsubscribe(events) + subscriberKey := fmt.Sprintf("subscriber-%p", events) + u.subscribers.Store(subscriberKey, events) + defer u.subscribers.Delete(subscriberKey) - // Then send cached events if replay_history is true + // Send cached events if replay_history is true if req.Msg.ReplayHistory { u.lock.RLock() for _, event := range u.events { @@ -100,16 +120,6 @@ func (u *updatesService) StreamEngineEvents(ctx context.Context, req *connect.Re // Process new events for event := range channels.IterContext(ctx, events) { - // Add timestamp to event - if event.Timestamp == nil { - event.Timestamp = timestamppb.Now() - } - - // Cache the event - u.lock.Lock() - u.events = append(u.events, event) - u.lock.Unlock() - err := stream.Send(&buildenginepb.StreamEngineEventsResponse{ Event: event, })