Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GEN-2302]: watch for modified events, do not toast them #2370

Merged
merged 5 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions frontend/kube/watchers/instrumentation_config_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

var instrumentationConfigAddedEventBatcher *EventBatcher
var instrumentationConfigModifiedEventBatcher *EventBatcher
var instrumentationConfigDeletedEventBatcher *EventBatcher

func StartInstrumentationConfigWatcher(ctx context.Context, namespace string) error {
Expand All @@ -33,6 +34,21 @@ func StartInstrumentationConfigWatcher(ctx context.Context, namespace string) er
},
)

instrumentationConfigModifiedEventBatcher = NewEventBatcher(
EventBatcherConfig{
MinBatchSize: 1,
Duration: 5 * time.Second,
Event: sse.MessageEventModified,
CRDType: consts.InstrumentationConfig,
SuccessBatchMessageFunc: func(count int, crdType string) string {
return fmt.Sprintf("Successfully updated %d sources", count)
},
FailureBatchMessageFunc: func(count int, crdType string) string {
return fmt.Sprintf("Failed to update %d sources", count)
},
},
)

instrumentationConfigDeletedEventBatcher = NewEventBatcher(
EventBatcherConfig{
MinBatchSize: 1,
Expand Down Expand Up @@ -60,6 +76,7 @@ func StartInstrumentationConfigWatcher(ctx context.Context, namespace string) er
func handleInstrumentationConfigWatchEvents(ctx context.Context, watcher watch.Interface) {
ch := watcher.ResultChan()
defer instrumentationConfigAddedEventBatcher.Cancel()
defer instrumentationConfigModifiedEventBatcher.Cancel()
defer instrumentationConfigDeletedEventBatcher.Cancel()
for {
select {
Expand All @@ -73,6 +90,8 @@ func handleInstrumentationConfigWatchEvents(ctx context.Context, watcher watch.I
switch event.Type {
case watch.Added:
handleAddedInstrumentationConfig(event.Object.(*v1alpha1.InstrumentationConfig))
case watch.Modified:
handleModifiedInstrumentationConfig(event.Object.(*v1alpha1.InstrumentationConfig))
case watch.Deleted:
handleDeletedInstrumentationConfig(event.Object.(*v1alpha1.InstrumentationConfig))
}
Expand All @@ -93,6 +112,19 @@ func handleAddedInstrumentationConfig(instruConfig *v1alpha1.InstrumentationConf
instrumentationConfigAddedEventBatcher.AddEvent(sse.MessageTypeSuccess, data, target)
}

func handleModifiedInstrumentationConfig(instruConfig *v1alpha1.InstrumentationConfig) {
namespace := instruConfig.Namespace
name, kind, err := commonutils.ExtractWorkloadInfoFromRuntimeObjectName(instruConfig.Name)
if err != nil {
genericErrorMessage(sse.MessageEventModified, consts.InstrumentationConfig, err.Error())
return
}

target := fmt.Sprintf("namespace=%s&name=%s&kind=%s", namespace, name, kind)
data := fmt.Sprintf(`Source "%s" updated`, name)
instrumentationConfigModifiedEventBatcher.AddEvent(sse.MessageTypeSuccess, data, target)
}

func handleDeletedInstrumentationConfig(instruConfig *v1alpha1.InstrumentationConfig) {
namespace := instruConfig.Namespace
name, kind, err := commonutils.ExtractWorkloadInfoFromRuntimeObjectName(instruConfig.Name)
Expand Down
5 changes: 2 additions & 3 deletions frontend/webapp/cypress/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,11 @@ export const TEXTS = {
INSTRUMENTATION_RULE_WARN_MODAL_TITLE: `Delete rule (${CYPRESS_TEST})`,

NOTIF_SOURCES_CREATED: (amount: number) => `Successfully created ${amount} sources`,
NOTIF_SOURCES_UPDATED: (amount: number) => `Successfully updated ${amount} source`,
NOTIF_SOURCES_UPDATED: (name: string) => `Successfully updated "${name}" source`,
NOTIF_SOURCES_DELETED: (amount: number) => `Successfully deleted ${amount} sources`,

NOTIF_DESTINATIONS_CREATED: (amount: number) => `Successfully created ${amount} destinations`,
// TODO: this message isn't right, fix in backend
NOTIF_DESTINATIONS_UPDATED: (amount: number) => `Successfully transformed ${amount + 1} destinations to otelcol configuration`,
NOTIF_DESTINATIONS_UPDATED: (name: string) => `Successfully updated "${name}" destination`,
NOTIF_DESTINATIONS_DELETED: (amount: number) => `Successfully deleted ${amount} destinations`,

NOTIF_ACTION_CREATED: (crdId: string) => `Action "${crdId}" created`,
Expand Down
2 changes: 1 addition & 1 deletion frontend/webapp/cypress/e2e/03-sources.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ describe('Sources CRUD', () => {
getCrdIds({ namespace, crdName, expectedError: '', expectedLength: 5 }, (crdIds) => {
const crdId = CRD_IDS.SOURCE;
expect(crdIds).includes(crdId);
awaitToast({ withSSE: false, message: TEXTS.NOTIF_SOURCES_UPDATED(1) }, () => {
awaitToast({ withSSE: false, message: TEXTS.NOTIF_SOURCES_UPDATED(SELECTED_ENTITIES.SOURCE) }, () => {
getCrdById({ namespace, crdName, crdId, expectedError: '', expectedKey: 'serviceName', expectedValue: TEXTS.UPDATED_NAME });
});
});
Expand Down
3 changes: 1 addition & 2 deletions frontend/webapp/cypress/e2e/04-destinations.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ describe('Destinations CRUD', () => {
cy.wait('@gql').then(() => {
getCrdIds({ namespace, crdName, expectedError: '', expectedLength: 1 }, (crdIds) => {
const crdId = crdIds[0];

awaitToast({ withSSE: true, message: TEXTS.NOTIF_DESTINATIONS_UPDATED(1) }, () => {
awaitToast({ withSSE: false, message: TEXTS.NOTIF_DESTINATIONS_UPDATED(SELECTED_ENTITIES.DESTINATION_TYPE) }, () => {
getCrdById({ namespace, crdName, crdId, expectedError: '', expectedKey: 'destinationName', expectedValue: TEXTS.UPDATED_NAME });
});
});
Expand Down
29 changes: 21 additions & 8 deletions frontend/webapp/hooks/destinations/useDestinationCRUD.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ interface Params {
export const useDestinationCRUD = (params?: Params) => {
const filters = useFilterStore();
const { data: config } = useConfig();
const { addPendingItems } = usePendingStore();
const { addPendingItems, removePendingItems } = usePendingStore();
const { addNotification, removeNotifications } = useNotificationStore();

const notifyUser = (type: NOTIFICATION_TYPE, title: string, message: string, id?: string, hideFromHistory?: boolean) => {
Expand Down Expand Up @@ -83,7 +83,20 @@ export const useDestinationCRUD = (params?: Params) => {

const [updateDestination, uState] = useMutation<{ updateDestination: { id: string } }>(UPDATE_DESTINATION, {
onError: (error) => handleError(ACTION.UPDATE, error.message),
onCompleted: () => handleComplete(ACTION.UPDATE),
onCompleted: (res, req) => {
handleComplete(ACTION.UPDATE);

// This is instead of toasting a k8s modified-event watcher...
// If we do toast with a watcher, we can't guarantee an SSE will be sent for this update alone. It will definitely include SSE for all updates, even those unexpected.
// Not that there's anything about a watcher that would break the UI, it's just that we would receive unexpected events with ridiculous amounts.
setTimeout(() => {
const { id, destination } = req?.variables || {};

refetch();
notifyUser(NOTIFICATION_TYPE.SUCCESS, ACTION.UPDATE, `Successfully updated "${destination.type}" destination`, id);
removePendingItems([{ entityType: ENTITY_TYPES.DESTINATION, entityId: id }]);
}, 2000);
},
});

const [deleteDestination, dState] = useMutation<{ deleteDestination: boolean }>(DELETE_DESTINATION, {
Expand All @@ -101,31 +114,31 @@ export const useDestinationCRUD = (params?: Params) => {
filteredDestinations: filtered,
refetchDestinations: refetch,

createDestination: (destination: DestinationInput) => {
createDestination: async (destination: DestinationInput) => {
if (config?.readonly) {
notifyUser(NOTIFICATION_TYPE.WARNING, DISPLAY_TITLES.READONLY, FORM_ALERTS.READONLY_WARNING, undefined, true);
} else {
notifyUser(NOTIFICATION_TYPE.INFO, 'Pending', 'Creating destination...', undefined, true);
addPendingItems([{ entityType: ENTITY_TYPES.DESTINATION, entityId: undefined }]);
createDestination({ variables: { destination: { ...destination, fields: destination.fields.filter(({ value }) => value !== undefined) } } });
await createDestination({ variables: { destination: { ...destination, fields: destination.fields.filter(({ value }) => value !== undefined) } } });
}
},
updateDestination: (id: string, destination: DestinationInput) => {
updateDestination: async (id: string, destination: DestinationInput) => {
if (config?.readonly) {
notifyUser(NOTIFICATION_TYPE.WARNING, DISPLAY_TITLES.READONLY, FORM_ALERTS.READONLY_WARNING, undefined, true);
} else {
notifyUser(NOTIFICATION_TYPE.INFO, 'Pending', 'Updating destination...', undefined, true);
addPendingItems([{ entityType: ENTITY_TYPES.DESTINATION, entityId: id }]);
updateDestination({ variables: { id, destination: { ...destination, fields: destination.fields.filter(({ value }) => value !== undefined) } } });
await updateDestination({ variables: { id, destination: { ...destination, fields: destination.fields.filter(({ value }) => value !== undefined) } } });
}
},
deleteDestination: (id: string) => {
deleteDestination: async (id: string) => {
if (config?.readonly) {
notifyUser(NOTIFICATION_TYPE.WARNING, DISPLAY_TITLES.READONLY, FORM_ALERTS.READONLY_WARNING, undefined, true);
} else {
notifyUser(NOTIFICATION_TYPE.INFO, 'Pending', 'Deleting destination...', undefined, true);
addPendingItems([{ entityType: ENTITY_TYPES.DESTINATION, entityId: id }]);
deleteDestination({ variables: { id } });
await deleteDestination({ variables: { id } });
}
},
};
Expand Down
19 changes: 10 additions & 9 deletions frontend/webapp/hooks/notification/useSSE.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { useEffect, useRef } from 'react';
import { NOTIFICATION_TYPE } from '@odigos/ui-utils';
import { useDestinationCRUD } from '../destinations';
import { usePaginatedSources } from '../compute-platform';
import { API, DISPLAY_TITLES, NOTIF_CRD_TYPES } from '@/utils';
import { API, DISPLAY_TITLES, SSE_CRD_TYPES, SSE_EVENT_TYPES } from '@/utils';
import { type NotifyPayload, useNotificationStore, usePendingStore, useStatusStore } from '@/store';

export const useSSE = () => {
Expand All @@ -21,28 +21,29 @@ export const useSSE = () => {

es.onmessage = (event) => {
const data = JSON.parse(event.data);
const crdType = data.crdType || '';
const notification: NotifyPayload = {
type: data.type,
title: data.event || '',
message: data.data || '',
crdType,
crdType: data.crdType || '',
target: data.target,
};

// SSE toast notification
if (crdType !== NOTIF_CRD_TYPES.CONNECTED) addNotification(notification);
if (notification.title !== SSE_EVENT_TYPES.MODIFIED && notification.crdType !== SSE_CRD_TYPES.CONNECTED) {
// SSE toast notification (for all events except "modified" and "connected")
addNotification(notification);
}

// Handle specific CRD types
if ([NOTIF_CRD_TYPES.CONNECTED].includes(crdType)) {
if ([SSE_CRD_TYPES.CONNECTED].includes(notification.crdType as string)) {
if (title !== DISPLAY_TITLES.API_TOKEN) {
setStatusStore({ status: NOTIFICATION_TYPE.SUCCESS, title: notification.title as string, message: notification.message as string });
}
} else if ([NOTIF_CRD_TYPES.INSTRUMENTATION_CONFIG, NOTIF_CRD_TYPES.INSTRUMENTATION_INSTANCE].includes(crdType)) {
} else if ([SSE_CRD_TYPES.INSTRUMENTATION_CONFIG, SSE_CRD_TYPES.INSTRUMENTATION_INSTANCE].includes(notification.crdType as string)) {
fetchSources();
} else if ([NOTIF_CRD_TYPES.DESTINATION].includes(crdType)) {
} else if ([SSE_CRD_TYPES.DESTINATION].includes(notification.crdType as string)) {
refetchDestinations();
} else console.warn('Unhandled SSE for CRD type:', crdType);
} else console.warn('Unhandled SSE for CRD type:', notification.crdType);

// This works for now,
// but in the future we might have to change this to "removePendingItems",
Expand Down
10 changes: 4 additions & 6 deletions frontend/webapp/hooks/sources/useSourceCRUD.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,14 @@ export const useSourceCRUD = (params?: Params) => {
onCompleted: (res, req) => {
handleComplete(ACTION.UPDATE);

// This is instead of using a k8s modified-event watcher...
// If we do use a watcher, we can't guarantee an SSE will be sent for this update alone.
// It will definitely include SSE for all updates, that can be instrument/uninstrument, conditions changed etc.
// Not that there's anything about a watcher that would break the UI, it's just that we would receive unexpected events with ridiculous amounts,
// (example: instrument 5 apps, update the name of 2, then uninstrument the other 3, we would get an SSE with minimum 10 updated sources, when we expect it to show only 2 due to name change).
// This is instead of toasting a k8s modified-event watcher...
// If we do toast with a watcher, we can't guarantee an SSE will be sent for this update alone. It will definitely include SSE for all updates, even those unexpected.
// Not that there's anything about a watcher that would break the UI, it's just that we would receive unexpected events with ridiculous amounts.
setTimeout(() => {
const { sourceId, patchSourceRequest } = req?.variables || {};

updateSource(sourceId, patchSourceRequest);
notifyUser(NOTIFICATION_TYPE.SUCCESS, ACTION.UPDATE, 'Successfully updated 1 source', sourceId);
notifyUser(NOTIFICATION_TYPE.SUCCESS, ACTION.UPDATE, `Successfully updated "${sourceId.name}" source`, sourceId);
removePendingItems([{ entityType: ENTITY_TYPES.SOURCE, entityId: sourceId }]);
}, 2000);
},
Expand Down
8 changes: 7 additions & 1 deletion frontend/webapp/utils/constants/string.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,15 @@ export const DISPLAY_TITLES = {
READONLY: 'Readonly',
};

export const NOTIF_CRD_TYPES = {
export const SSE_CRD_TYPES = {
CONNECTED: 'CONNECTED',
INSTRUMENTATION_CONFIG: 'InstrumentationConfig',
INSTRUMENTATION_INSTANCE: 'InstrumentationInstance',
DESTINATION: 'Destination',
};

export const SSE_EVENT_TYPES = {
ADDED: 'Added',
MODIFIED: 'Modified',
DELETED: 'Deleted',
};
Loading