Skip to content

Commit

Permalink
feat(ftl-schema): pre-commit validation (#4382)
Browse files Browse the repository at this point in the history
- Checks that the event is valid against latest known state
- If still, when applying the event, it is invalid, returns a generic
error to the user.
  • Loading branch information
jvmakine authored Feb 12, 2025
1 parent 5637ec0 commit 121fd50
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 43 deletions.
157 changes: 137 additions & 20 deletions backend/schemaservice/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,60 @@ func (r *SchemaState) ApplyEvent(ctx context.Context, event schema.Event) error
}
}

func handleDeploymentRuntimeEvent(t *SchemaState, e *schema.DeploymentRuntimeEvent) error {
// VerifyEvent verifies an event is valid for the given state, without applying it
func (r *SchemaState) VerifyEvent(ctx context.Context, event schema.Event) error {
if err := event.Validate(); err != nil {
return fmt.Errorf("invalid event: %w", err)
}
switch e := event.(type) {
case *schema.DeploymentRuntimeEvent:
return verifyDeploymentRuntimeEvent(r, e)
case *schema.ChangesetCreatedEvent:
return verifyChangesetCreatedEvent(r, e)
case *schema.ChangesetPreparedEvent:
return verifyChangesetPreparedEvent(r, e)
case *schema.ChangesetCommittedEvent:
return verifyChangesetCommittedEvent(r, e)
case *schema.ChangesetDrainedEvent:
return verifyChangesetDrainedEvent(r, e)
case *schema.ChangesetFinalizedEvent:
return verifyChangesetFinalizedEvent(r, e)
case *schema.ChangesetRollingBackEvent:
return verifyChangesetRollingBackEvent(r, e)
case *schema.ChangesetFailedEvent:
return verifyChangesetFailedEvent(r, e)
default:
return fmt.Errorf("unknown event type: %T", e)
}
}

func verifyDeploymentRuntimeEvent(t *SchemaState, e *schema.DeploymentRuntimeEvent) error {
if cs, ok := e.ChangesetKey().Get(); ok {
c, ok := t.changesets[cs]
_, ok := t.changesets[cs]
if !ok {
return fmt.Errorf("changeset %s not found", cs.String())
}
for _, m := range t.changesets[cs].Modules {
if m.Name == e.DeploymentKey().Payload.Module {
return nil
}
}
}
for _, m := range t.deployments {
if m.Runtime.Deployment.DeploymentKey == e.DeploymentKey() {
return nil
}
}
return fmt.Errorf("deployment %s not found", e.DeploymentKey().String())
}

func handleDeploymentRuntimeEvent(t *SchemaState, e *schema.DeploymentRuntimeEvent) error {
if err := verifyDeploymentRuntimeEvent(t, e); err != nil {
return err
}
if cs, ok := e.ChangesetKey().Get(); ok {
module := e.DeploymentKey().Payload.Module
c := t.changesets[cs]
for _, m := range c.Modules {
if m.Name == module {
err := e.Payload.ApplyToModule(m)
Expand All @@ -77,15 +124,14 @@ func handleDeploymentRuntimeEvent(t *SchemaState, e *schema.DeploymentRuntimeEve
return fmt.Errorf("deployment %s not found", e.DeploymentKey().String())
}

func handleChangesetCreatedEvent(t *SchemaState, e *schema.ChangesetCreatedEvent) error {
func verifyChangesetCreatedEvent(t *SchemaState, e *schema.ChangesetCreatedEvent) error {
if existing := t.changesets[e.Changeset.Key]; existing != nil {
return fmt.Errorf("changeset %s already exists ", e.Changeset.Key)
}
activeCount := 0
existingModules := map[string]key.Changeset{}
for _, cs := range t.changesets {
if cs.ModulesAreCanonical() {
//TODO: at the moment changesets accumulate forever...
for _, mod := range cs.Modules {
existingModules[mod.Name] = cs.Key
}
Expand Down Expand Up @@ -130,11 +176,18 @@ func handleChangesetCreatedEvent(t *SchemaState, e *schema.ChangesetCreatedEvent
return fmt.Errorf("changeset failed validation %w", errors.Join(problems...))
}
}
return nil
}

func handleChangesetCreatedEvent(t *SchemaState, e *schema.ChangesetCreatedEvent) error {
if err := verifyChangesetCreatedEvent(t, e); err != nil {
return err
}
t.changesets[e.Changeset.Key] = e.Changeset
return nil
}

func handleChangesetPreparedEvent(t *SchemaState, e *schema.ChangesetPreparedEvent) error {
func verifyChangesetPreparedEvent(t *SchemaState, e *schema.ChangesetPreparedEvent) error {
changeset, ok := t.changesets[e.Key]
if !ok {
return fmt.Errorf("changeset %s not found", e.Key)
Expand All @@ -147,6 +200,14 @@ func handleChangesetPreparedEvent(t *SchemaState, e *schema.ChangesetPreparedEve
return fmt.Errorf("deployment %s has no endpoint", dep.Name)
}
}
return nil
}

func handleChangesetPreparedEvent(t *SchemaState, e *schema.ChangesetPreparedEvent) error {
if err := verifyChangesetPreparedEvent(t, e); err != nil {
return err
}
changeset := t.changesets[e.Key]
changeset.State = schema.ChangesetStatePrepared
// TODO: what does this actually mean? Worry about it when we start implementing canaries, but it will be clunky
// If everything that cares about canaries needs to scan for prepared changesets
Expand All @@ -156,7 +217,7 @@ func handleChangesetPreparedEvent(t *SchemaState, e *schema.ChangesetPreparedEve
return nil
}

func handleChangesetCommittedEvent(ctx context.Context, t *SchemaState, e *schema.ChangesetCommittedEvent) error {
func verifyChangesetCommittedEvent(t *SchemaState, e *schema.ChangesetCommittedEvent) error {
changeset, ok := t.changesets[e.Key]
if !ok {
return fmt.Errorf("changeset %s not found", e.Key)
Expand All @@ -167,6 +228,15 @@ func handleChangesetCommittedEvent(ctx context.Context, t *SchemaState, e *schem
return fmt.Errorf("deployment %s is not in correct state expected %v got %v", dep.Name, schema.DeploymentStateCanary, dep.Runtime.Deployment.State)
}
}
return nil
}

func handleChangesetCommittedEvent(ctx context.Context, t *SchemaState, e *schema.ChangesetCommittedEvent) error {
if err := verifyChangesetCommittedEvent(t, e); err != nil {
return err
}

changeset := t.changesets[e.Key]
logger := log.FromContext(ctx)
changeset.State = schema.ChangesetStateCommitted
for _, dep := range changeset.Modules {
Expand All @@ -182,43 +252,73 @@ func handleChangesetCommittedEvent(ctx context.Context, t *SchemaState, e *schem
return nil
}

func handleChangesetDrainedEvent(ctx context.Context, t *SchemaState, e *schema.ChangesetDrainedEvent) error {
logger := log.FromContext(ctx)
func verifyChangesetDrainedEvent(t *SchemaState, e *schema.ChangesetDrainedEvent) error {
changeset, ok := t.changesets[e.Key]
if !ok {
return fmt.Errorf("changeset %s not found", e.Key)
}
if changeset.State != schema.ChangesetStateCommitted {
return fmt.Errorf("changeset %v is not in the correct state", changeset.Key)
}

for _, dep := range changeset.RemovingModules {
if dep.ModRuntime().ModDeployment().State != schema.DeploymentStateDraining &&
dep.ModRuntime().ModDeployment().State != schema.DeploymentStateDeProvisioning {
return fmt.Errorf("deployment %s is not in correct state expected %v got %v", dep.Name, schema.DeploymentStateDeProvisioning, dep.Runtime.Deployment.State)
}
}
return nil
}

func handleChangesetDrainedEvent(ctx context.Context, t *SchemaState, e *schema.ChangesetDrainedEvent) error {
if err := verifyChangesetDrainedEvent(t, e); err != nil {
return err
}

logger := log.FromContext(ctx)
changeset := t.changesets[e.Key]
logger.Debugf("Changeset %s drained", e.Key)

for _, dep := range changeset.RemovingModules {
if dep.ModRuntime().ModDeployment().State == schema.DeploymentStateDraining {
dep.Runtime.Deployment.State = schema.DeploymentStateDeProvisioning
} else if dep.ModRuntime().ModDeployment().State != schema.DeploymentStateDeProvisioning {
return fmt.Errorf("deployment %s is not in correct state expected %v got %v", dep.Name, schema.DeploymentStateDeProvisioning, dep.Runtime.Deployment.State)
}
}
changeset.State = schema.ChangesetStateDrained
return nil
}
func handleChangesetFinalizedEvent(ctx context.Context, r *SchemaState, e *schema.ChangesetFinalizedEvent) error {
logger := log.FromContext(ctx)
changeset, ok := r.changesets[e.Key]
func verifyChangesetFinalizedEvent(t *SchemaState, e *schema.ChangesetFinalizedEvent) error {
changeset, ok := t.changesets[e.Key]
if !ok {
return fmt.Errorf("changeset %s not found", e.Key)
}
if changeset.State != schema.ChangesetStateDrained {
return fmt.Errorf("changeset %v is not in the correct state expected %v got %v", changeset.Key, schema.ChangesetStateDrained, changeset.State)
}

for _, dep := range changeset.RemovingModules {
if dep.ModRuntime().ModDeployment().State == schema.DeploymentStateDeProvisioning {
continue
}
if dep.ModRuntime().ModDeployment().State != schema.DeploymentStateDeleted {
return fmt.Errorf("deployment %s is not in correct state expected %v got %v", dep.Name, schema.DeploymentStateDeleted, dep.Runtime.Deployment.State)
}
}
return nil
}

func handleChangesetFinalizedEvent(ctx context.Context, r *SchemaState, e *schema.ChangesetFinalizedEvent) error {
if err := verifyChangesetFinalizedEvent(r, e); err != nil {
return err
}

logger := log.FromContext(ctx)
changeset := r.changesets[e.Key]
logger.Debugf("Changeset %s de-provisioned", e.Key)

for _, dep := range changeset.RemovingModules {
if dep.ModRuntime().ModDeployment().State == schema.DeploymentStateDeProvisioning {
dep.Runtime.Deployment.State = schema.DeploymentStateDeleted
} else if dep.ModRuntime().ModDeployment().State != schema.DeploymentStateDeleted {
return fmt.Errorf("deployment %s is not in correct state expected %v got %v", dep.Name, schema.DeploymentStateDeleted, dep.Runtime.Deployment.State)
}
}
changeset.State = schema.ChangesetStateFinalized
Expand All @@ -232,11 +332,20 @@ func handleChangesetFinalizedEvent(ctx context.Context, r *SchemaState, e *schem
return nil
}

func handleChangesetFailedEvent(t *SchemaState, e *schema.ChangesetFailedEvent) error {
changeset, ok := t.changesets[e.Key]
func verifyChangesetFailedEvent(t *SchemaState, e *schema.ChangesetFailedEvent) error {
_, ok := t.changesets[e.Key]
if !ok {
return fmt.Errorf("changeset %s not found", e.Key)
}
return nil
}

func handleChangesetFailedEvent(t *SchemaState, e *schema.ChangesetFailedEvent) error {
if err := verifyChangesetFailedEvent(t, e); err != nil {
return err
}

changeset := t.changesets[e.Key]
changeset.State = schema.ChangesetStateFailed
//TODO: de-provisioning on failure?
delete(t.changesets, changeset.Key)
Expand All @@ -246,14 +355,22 @@ func handleChangesetFailedEvent(t *SchemaState, e *schema.ChangesetFailedEvent)
t.archivedChangesets = nl
return nil
}
func handleChangesetRollingBackEvent(t *SchemaState, e *schema.ChangesetRollingBackEvent) error {
changeset, ok := t.changesets[e.Key]
func verifyChangesetRollingBackEvent(t *SchemaState, e *schema.ChangesetRollingBackEvent) error {
_, ok := t.changesets[e.Key]
if !ok {
return fmt.Errorf("changeset %s not found", e.Key)
}
return nil
}

func handleChangesetRollingBackEvent(t *SchemaState, e *schema.ChangesetRollingBackEvent) error {
if err := verifyChangesetRollingBackEvent(t, e); err != nil {
return err
}

changeset := t.changesets[e.Key]
changeset.State = schema.ChangesetStateRollingBack
changeset.Error = e.Error
println("ERROR " + e.Error)
for _, module := range changeset.Modules {
module.Runtime.Deployment.State = schema.DeploymentStateDeProvisioning
}
Expand Down
45 changes: 24 additions & 21 deletions backend/schemaservice/schemaservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *Service) UpdateDeploymentRuntime(ctx context.Context, req *connect.Requ
}
changeset = &cs
}
err = s.State.Publish(ctx, EventWrapper{Event: &schema.DeploymentRuntimeEvent{Changeset: changeset, Payload: event}})
err = s.publishEvent(ctx, &schema.DeploymentRuntimeEvent{Changeset: changeset, Payload: event})
if err != nil {
return nil, fmt.Errorf("could not apply event: %w", err)
}
Expand Down Expand Up @@ -200,9 +200,7 @@ func (s *Service) CreateChangeset(ctx context.Context, req *connect.Request[ftlv
}

// TODO: validate changeset schema with canonical schema
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetCreatedEvent{
Changeset: changeset,
}})
err = s.publishEvent(ctx, &schema.ChangesetCreatedEvent{Changeset: changeset})
if err != nil {
return nil, fmt.Errorf("could not create changeset %w", err)
}
Expand All @@ -216,9 +214,7 @@ func (s *Service) PrepareChangeset(ctx context.Context, req *connect.Request[ftl
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
}
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetPreparedEvent{
Key: changesetKey,
}})
err = s.publishEvent(ctx, &schema.ChangesetPreparedEvent{Key: changesetKey})
if err != nil {
return nil, fmt.Errorf("could not prepare changeset %w", err)
}
Expand All @@ -231,9 +227,7 @@ func (s *Service) CommitChangeset(ctx context.Context, req *connect.Request[ftlv
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
}
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetCommittedEvent{
Key: changesetKey,
}})
err = s.publishEvent(ctx, &schema.ChangesetCommittedEvent{Key: changesetKey})
if err != nil {
return nil, fmt.Errorf("could not commit changeset %w", err)
}
Expand All @@ -251,9 +245,7 @@ func (s *Service) DrainChangeset(ctx context.Context, req *connect.Request[ftlv1
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
}
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetDrainedEvent{
Key: changesetKey,
}})
err = s.publishEvent(ctx, &schema.ChangesetDrainedEvent{Key: changesetKey})
if err != nil {
return nil, fmt.Errorf("could not drain changeset %w", err)
}
Expand All @@ -265,9 +257,7 @@ func (s *Service) FinalizeChangeset(ctx context.Context, req *connect.Request[ft
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
}
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetFinalizedEvent{
Key: changesetKey,
}})
err = s.publishEvent(ctx, &schema.ChangesetFinalizedEvent{Key: changesetKey})
if err != nil {
return nil, fmt.Errorf("could not de-provision changeset %w", err)
}
Expand All @@ -279,10 +269,10 @@ func (s *Service) RollbackChangeset(ctx context.Context, req *connect.Request[ft
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
}
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetRollingBackEvent{
err = s.publishEvent(ctx, &schema.ChangesetRollingBackEvent{
Key: changesetKey,
Error: req.Msg.Error,
}})
})
if err != nil {
return nil, fmt.Errorf("could not fail changeset %w", err)
}
Expand All @@ -295,15 +285,28 @@ func (s *Service) FailChangeset(ctx context.Context, req *connect.Request[ftlv1.
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
}
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetFailedEvent{
Key: changesetKey,
}})
err = s.publishEvent(ctx, &schema.ChangesetFailedEvent{Key: changesetKey})
if err != nil {
return nil, fmt.Errorf("could not fail changeset %w", err)
}
return connect.NewResponse(&ftlv1.FailChangesetResponse{}), nil
}

func (s *Service) publishEvent(ctx context.Context, event schema.Event) error {
// Verify the event against the latest known state before publishing
state, err := s.State.View(ctx)
if err != nil {
return fmt.Errorf("failed to get schema state: %w", err)
}
if err := state.VerifyEvent(ctx, event); err != nil {
return fmt.Errorf("invalid event: %w", err)
}
if err := s.State.Publish(ctx, EventWrapper{Event: event}); err != nil {
return fmt.Errorf("failed to publish event: %w", err)
}
return nil
}

func (s *Service) watchModuleChanges(ctx context.Context, subscriptionID string, sendChange func(response *ftlv1.PullSchemaResponse) error) error {
logger := log.FromContext(ctx).Scope(subscriptionID)

Expand Down
3 changes: 2 additions & 1 deletion backend/schemaservice/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/key"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/raft"
"github.com/block/ftl/internal/statemachine"
)

Expand Down Expand Up @@ -274,7 +275,7 @@ func (c *schemaStateMachine) Publish(msg EventWrapper) error {
// TODO: we need to validate the events before they are
// committed to the log
logger.Errorf(err, "failed to apply event")
return nil
return raft.ErrInvalidEvent
}
// Notify all subscribers using broadcaster
c.notifier.Notify(c.runningCtx)
Expand Down
Loading

0 comments on commit 121fd50

Please sign in to comment.