Skip to content

Commit

Permalink
fix: remove uneeded events (#4322)
Browse files Browse the repository at this point in the history
We now just have a single runtime update event
  • Loading branch information
stuartwdouglas authored Feb 6, 2025
1 parent 990f106 commit b6d05d9
Show file tree
Hide file tree
Showing 14 changed files with 1,582 additions and 3,873 deletions.
15 changes: 6 additions & 9 deletions backend/schemaservice/changesetstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ func TestChangesetState(t *testing.T) {
t.Run("test update module schema", func(t *testing.T) {
newState := reflect.DeepCopy(module)
newState.ModRuntime().ModRunner().Endpoint = "http://localhost:8080"
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.DeploymentSchemaUpdatedEvent{
Key: module.Runtime.Deployment.DeploymentKey,
Schema: newState,
Changeset: changesetKey,
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.DeploymentRuntimeEvent{
Payload: &schema.RuntimeElement{Deployment: module.Runtime.Deployment.DeploymentKey, Element: &schema.ModuleRuntimeRunner{Endpoint: "http://localhost:8080"}},
Changeset: &changesetKey,
}})
assert.NoError(t, err)
view, err = state.View(ctx)
Expand Down Expand Up @@ -126,11 +125,9 @@ func TestChangesetState(t *testing.T) {
t.Run("test prepare changeset", func(t *testing.T) {
newState := reflect.DeepCopy(module)
newState.Runtime.Deployment.State = schema.DeploymentStateReady
newState.ModRuntime().ModRunner().Endpoint = "http://localhost:8080"
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.DeploymentSchemaUpdatedEvent{
Key: module.Runtime.Deployment.DeploymentKey,
Schema: newState,
Changeset: changesetKey,
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.DeploymentRuntimeEvent{
Payload: &schema.RuntimeElement{Deployment: module.Runtime.Deployment.DeploymentKey, Element: &schema.ModuleRuntimeDeployment{State: schema.DeploymentStateReady}},
Changeset: &changesetKey,
}})
assert.NoError(t, err)
view, err = state.View(ctx)
Expand Down
140 changes: 24 additions & 116 deletions backend/schemaservice/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/block/ftl/common/errors"
"github.com/block/ftl/common/reflect"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/key"
"github.com/block/ftl/internal/log"
)
Expand All @@ -26,18 +25,8 @@ func (r SchemaState) ApplyEvent(ctx context.Context, event schema.Event) error {
return fmt.Errorf("invalid event: %w", err)
}
switch e := event.(type) {
case *schema.DeploymentSchemaUpdatedEvent:
return handleDeploymentSchemaUpdatedEvent(r, e)
case *schema.DeploymentReplicasUpdatedEvent:
return handleDeploymentReplicasUpdatedEvent(r, e)
case *schema.VerbRuntimeEvent:
return handleVerbRuntimeEvent(r, e)
case *schema.TopicRuntimeEvent:
return handleTopicRuntimeEvent(r, e)
case *schema.DatabaseRuntimeEvent:
return handleDatabaseRuntimeEvent(r, e)
case *schema.ModuleRuntimeEvent:
return handleModuleRuntimeEvent(ctx, r, e)
case *schema.DeploymentRuntimeEvent:
return handleDeploymentRuntimeEvent(r, e)
case *schema.ChangesetCreatedEvent:
return handleChangesetCreatedEvent(r, e)
case *schema.ChangesetPreparedEvent:
Expand All @@ -55,116 +44,35 @@ func (r SchemaState) ApplyEvent(ctx context.Context, event schema.Event) error {
}
}

func handleDeploymentSchemaUpdatedEvent(t SchemaState, e *schema.DeploymentSchemaUpdatedEvent) error {
if e.Changeset.IsZero() {
// The only reason this is optional is for event extract reasons that should change when it is refactored
return fmt.Errorf("changeset is required")
}
cs, ok := t.changesets[e.Changeset]
if !ok {
return fmt.Errorf("changeset %s not found", e.Key)
}
for i, m := range cs.Modules {
if m.Name == e.Schema.Name {
cs.Modules[i] = e.Schema
return nil
}
}

return fmt.Errorf("module %s not found in changeset %s", e.Schema.Name, e.Changeset)
}

func handleDeploymentReplicasUpdatedEvent(t SchemaState, e *schema.DeploymentReplicasUpdatedEvent) error {
existing, ok := t.deployments[e.Key.Payload.Module]
if !ok {
return fmt.Errorf("deployment %s not found", e.Key)
}
existing.ModRuntime().ModScaling().MinReplicas = int32(e.Replicas)
return nil
}

func handleVerbRuntimeEvent(t SchemaState, e *schema.VerbRuntimeEvent) error {
m, storeEvent, err := t.handleRuntimeEvent(e)
if err != nil {
return err
}
for verb := range slices.FilterVariants[*schema.Verb](m.Decls) {
if verb.Name == e.ID {
if verb.Runtime == nil {
verb.Runtime = &schema.VerbRuntime{}
}

if subscription, ok := e.Subscription.Get(); ok {
verb.Runtime.Subscription = &subscription
func handleDeploymentRuntimeEvent(t SchemaState, e *schema.DeploymentRuntimeEvent) error {
if cs, ok := e.ChangesetKey().Get(); ok {
c, ok := t.changesets[cs]
if !ok {
return fmt.Errorf("changeset %s not found", cs.String())
}
module := e.DeploymentKey().Payload.Module
for _, m := range c.Modules {
if m.Name == module {
err := e.Payload.ApplyToModule(m)
if err != nil {
return fmt.Errorf("error applying runtime event to module %s: %w", module, err)
}
t.changesetEvents[cs] = append(t.changesetEvents[cs], e)
return nil
}
}
}
storeEvent()
return nil
}

func handleTopicRuntimeEvent(t SchemaState, e *schema.TopicRuntimeEvent) error {
m, storeEvent, err := t.handleRuntimeEvent(e)
if err != nil {
return err
}
for topic := range slices.FilterVariants[*schema.Topic](m.Decls) {
if topic.Name == e.ID {
topic.Runtime = e.Payload
storeEvent()
return nil
}
}
return fmt.Errorf("topic %s not found", e.ID)
}

func handleDatabaseRuntimeEvent(t SchemaState, e *schema.DatabaseRuntimeEvent) error {
m, storeEvent, err := t.handleRuntimeEvent(e)
if err != nil {
return err
}
for _, decl := range m.Decls {
if db, ok := decl.(*schema.Database); ok && db.Name == e.ID {
if db.Runtime == nil {
db.Runtime = &schema.DatabaseRuntime{}
for k, m := range t.deployments {
if m.Runtime.Deployment.DeploymentKey == e.DeploymentKey() {
err := e.Payload.ApplyToModule(m)
if err != nil {
return fmt.Errorf("error applying runtime event to module %s: %w", m, err)
}
db.Runtime.Connections = e.Connections
storeEvent()
t.deploymentEvents[k] = append(t.deploymentEvents[k], e)
return nil
}
}
return fmt.Errorf("database %s not found", e.ID)
}

func handleModuleRuntimeEvent(ctx context.Context, t SchemaState, e *schema.ModuleRuntimeEvent) error {
module, storeEvent, err := t.handleRuntimeEvent(e)
if err != nil {
return err
}

if base, ok := e.Base.Get(); ok {
module.ModRuntime().Base = base
}
if scaling, ok := e.Scaling.Get(); ok {
module.ModRuntime().Scaling = &scaling
}
if runner, ok := e.Runner.Get(); ok {
module.ModRuntime().Runner = &runner
}
if deployment, ok := e.Deployment.Get(); ok {
if deployment.State == schema.DeploymentStateUnspecified {
deployment.State = module.Runtime.Deployment.State
} else if deployment.State != module.Runtime.Deployment.State {
// We only allow a few different externally driven state changes
if !(module.Runtime.Deployment.State == schema.DeploymentStateProvisioning && deployment.State == schema.DeploymentStateReady) {
return fmt.Errorf("invalid state transition from %d to %d", module.Runtime.Deployment.State, deployment.State)
}
}
log.FromContext(ctx).Debugf("deployment %s state change %v -> %v", module.Name, module.Runtime.Deployment, deployment)
module.ModRuntime().Deployment = &deployment
}
storeEvent()
return nil
return fmt.Errorf("deployment %s not found", e.DeploymentKey().String())
}

func handleChangesetCreatedEvent(t SchemaState, e *schema.ChangesetCreatedEvent) error {
Expand Down
30 changes: 1 addition & 29 deletions backend/schemaservice/schemaservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"net/url"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"
"golang.org/x/sync/errgroup"

ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
Expand Down Expand Up @@ -135,34 +134,7 @@ func (s *Service) UpdateDeploymentRuntime(ctx context.Context, req *connect.Requ
}
changeset = &cs
}
var re schema.Event
switch e := event.Element.(type) {
case *schema.ModuleRuntimeScaling:
re = &schema.ModuleRuntimeEvent{Changeset: changeset, Key: event.Deployment, Scaling: optional.Ptr(e)}
case *schema.ModuleRuntimeDeployment:
re = &schema.ModuleRuntimeEvent{Changeset: changeset, Key: event.Deployment, Deployment: optional.Ptr(e)}
case *schema.ModuleRuntimeRunner:
re = &schema.ModuleRuntimeEvent{Changeset: changeset, Key: event.Deployment, Runner: optional.Ptr(e)}
case *schema.VerbRuntime:
id, ok := event.Name.Get()
if !ok {
return nil, fmt.Errorf("missing name in event")
}
re = &schema.VerbRuntimeEvent{ID: id, Changeset: changeset, Deployment: event.Deployment, Subscription: optional.Ptr(e.Subscription)}
case *schema.TopicRuntime:
id, ok := event.Name.Get()
if !ok {
return nil, fmt.Errorf("missing name in event")
}
re = &schema.TopicRuntimeEvent{ID: id, Changeset: changeset, Deployment: event.Deployment, Payload: e}
case *schema.DatabaseRuntime:
id, ok := event.Name.Get()
if !ok {
return nil, fmt.Errorf("missing name in event")
}
re = &schema.DatabaseRuntimeEvent{ID: id, Changeset: changeset, Deployment: event.Deployment, Connections: e.Connections}
}
err = s.State.Publish(ctx, EventWrapper{Event: re})
err = s.State.Publish(ctx, EventWrapper{Event: &schema.DeploymentRuntimeEvent{Changeset: changeset, Payload: event}})
if err != nil {
return nil, fmt.Errorf("could not apply event: %w", err)
}
Expand Down
39 changes: 5 additions & 34 deletions backend/schemaservice/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
type SchemaState struct {
deployments map[string]*schema.Module
changesets map[key.Changeset]*schema.Changeset
changesetEvents map[key.Changeset][]schema.RuntimeEvent
deploymentEvents map[string][]schema.RuntimeEvent
changesetEvents map[key.Changeset][]*schema.DeploymentRuntimeEvent
deploymentEvents map[string][]*schema.DeploymentRuntimeEvent
validationEnabled bool // Huge hack to allow provisioner to use this for a single module
archivedChangesets []*schema.Changeset
}
Expand All @@ -34,8 +34,8 @@ func NewSchemaState(validationEnabled bool) SchemaState {
return SchemaState{
deployments: map[string]*schema.Module{},
changesets: map[key.Changeset]*schema.Changeset{},
deploymentEvents: map[string][]schema.RuntimeEvent{},
changesetEvents: map[key.Changeset][]schema.RuntimeEvent{},
deploymentEvents: map[string][]*schema.DeploymentRuntimeEvent{},
changesetEvents: map[key.Changeset][]*schema.DeploymentRuntimeEvent{},
archivedChangesets: []*schema.Changeset{},
validationEnabled: validationEnabled,
}
Expand All @@ -58,7 +58,7 @@ func newStateMachine(ctx context.Context) *schemaStateMachine {
func (r *SchemaState) Marshal() ([]byte, error) {
changesets := slices.Collect(maps.Values(r.changesets))
changesets = append(changesets, r.archivedChangesets...)
events := []schema.RuntimeEvent{}
events := []*schema.DeploymentRuntimeEvent{}
for _, e := range r.changesetEvents {
events = append(events, e...)
}
Expand Down Expand Up @@ -208,35 +208,6 @@ func (r *SchemaState) GetProvisioning(module string, cs key.Changeset) (*schema.
return nil, fmt.Errorf("provisioning for module %s not found", module)
}

func (r *SchemaState) handleRuntimeEvent(e schema.RuntimeEvent) (*schema.Module, func(), error) {
if cs, ok := e.ChangesetKey().Get(); ok {
c, ok := r.changesets[cs]
if !ok {
return nil, func() {

}, fmt.Errorf("changeset %s not found", cs.String())
}
module := e.DeploymentKey().Payload.Module
for _, m := range c.Modules {
if m.Name == module {
return m, func() {
r.changesetEvents[cs] = append(r.changesetEvents[cs], e)
}, nil
}
}
}
for k, m := range r.deployments {
if m.Runtime.Deployment.DeploymentKey == e.DeploymentKey() {
return m, func() {
r.deploymentEvents[k] = append(r.deploymentEvents[k], e)
}, nil
}
}
return nil, func() {

}, fmt.Errorf("deployment %s not found", e.DeploymentKey().String())
}

type EventWrapper struct {
Event schema.Event
}
Expand Down
13 changes: 7 additions & 6 deletions backend/schemaservice/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"

"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/key"
Expand Down Expand Up @@ -50,11 +49,13 @@ func TestStateMarshallingAfterCommonEvents(t *testing.T) {
},
},
}))
assert.NoError(t, state.ApplyEvent(ctx, &schema.ModuleRuntimeEvent{
Key: deploymentKey,
Changeset: &changesetKey,
Deployment: optional.Some(schema.ModuleRuntimeDeployment{DeploymentKey: deploymentKey, State: schema.DeploymentStateReady}),
Runner: optional.Some(schema.ModuleRuntimeRunner{Endpoint: "http://localhost:6734"}),
assert.NoError(t, state.ApplyEvent(ctx, &schema.DeploymentRuntimeEvent{
Payload: &schema.RuntimeElement{Deployment: deploymentKey, Element: &schema.ModuleRuntimeRunner{Endpoint: "http://localhost:8080"}},
Changeset: &changesetKey,
}))
assert.NoError(t, state.ApplyEvent(ctx, &schema.DeploymentRuntimeEvent{
Payload: &schema.RuntimeElement{Deployment: deploymentKey, Element: &schema.ModuleRuntimeDeployment{State: schema.DeploymentStateReady}},
Changeset: &changesetKey,
}))
assert.NoError(t, state.ApplyEvent(ctx, &schema.ChangesetPreparedEvent{
Key: changesetKey,
Expand Down
Loading

0 comments on commit b6d05d9

Please sign in to comment.