diff --git a/backend/schemaservice/changesetstate_test.go b/backend/schemaservice/changesetstate_test.go
index 53a620f40a..24c4a7f2c4 100644
--- a/backend/schemaservice/changesetstate_test.go
+++ b/backend/schemaservice/changesetstate_test.go
@@ -33,40 +33,40 @@ func TestChangesetState(t *testing.T) {
 	assert.Equal(t, 0, len(view.GetChangesets()))
 
 	t.Run("changeset must have id", func(t *testing.T) {
-		err = state.Publish(ctx, &schema.ChangesetCreatedEvent{
+		err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCreatedEvent{
 			Changeset: &schema.Changeset{
 				CreatedAt: time.Now(),
 				Modules:   []*schema.Module{module},
 				Error:     "",
 			},
-		})
+		}})
 		assert.Error(t, err)
 	})
 
 	t.Run("deployment must must have deployment key", func(t *testing.T) {
 		nm := reflect.DeepCopy(module)
 		nm.Runtime = nil
-		err = state.Publish(ctx, &schema.ChangesetCreatedEvent{
+		err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCreatedEvent{
 			Changeset: &schema.Changeset{
 				Key:       key.NewChangesetKey(),
 				CreatedAt: time.Now(),
 				Modules:   []*schema.Module{nm},
 				Error:     "",
 			},
-		})
+		}})
 		assert.Error(t, err)
 	})
 
 	changesetKey := key.NewChangesetKey()
 	t.Run("test create changeset", func(t *testing.T) {
-		err = state.Publish(ctx, &schema.ChangesetCreatedEvent{
+		err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCreatedEvent{
 			Changeset: &schema.Changeset{
 				Key:       changesetKey,
 				CreatedAt: time.Now(),
 				Modules:   []*schema.Module{module},
 				Error:     "",
 			},
-		})
+		}})
 		assert.NoError(t, err)
 
 		view, err = state.View(ctx)
@@ -83,11 +83,11 @@ func TestChangesetState(t *testing.T) {
 	t.Run("test update module schema", func(t *testing.T) {
 		newState := reflect.DeepCopy(module)
 		newState.Runtime.Deployment.Endpoint = "http://localhost:8080"
-		err = state.Publish(ctx, &schema.DeploymentSchemaUpdatedEvent{
+		err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.DeploymentSchemaUpdatedEvent{
 			Key:       module.Runtime.Deployment.DeploymentKey,
 			Schema:    newState,
 			Changeset: changesetKey,
-		})
+		}})
 		assert.NoError(t, err)
 		view, err = state.View(ctx)
 		assert.NoError(t, err)
@@ -101,17 +101,17 @@ func TestChangesetState(t *testing.T) {
 
 	t.Run("test commit changeset in bad state", func(t *testing.T) {
 		// The deployment is not provisioned yet, this should fail
-		err = state.Publish(ctx, &schema.ChangesetCommittedEvent{
+		err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCommittedEvent{
 			Key: changesetKey,
-		})
+		}})
 		assert.Error(t, err)
 	})
 
 	t.Run("test prepare changeset in bad state", func(t *testing.T) {
 		// The deployment is not provisioned yet, this should fail
-		err = state.Publish(ctx, &schema.ChangesetPreparedEvent{
+		err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetPreparedEvent{
 			Key: changesetKey,
-		})
+		}})
 		assert.Error(t, err)
 	})
 
@@ -119,18 +119,18 @@ func TestChangesetState(t *testing.T) {
 		newState := reflect.DeepCopy(module)
 		newState.Runtime.Deployment.State = schema.DeploymentStateReady
 		newState.Runtime.Deployment.Endpoint = "http://localhost:8080"
-		err = state.Publish(ctx, &schema.DeploymentSchemaUpdatedEvent{
+		err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.DeploymentSchemaUpdatedEvent{
 			Key:       module.Runtime.Deployment.DeploymentKey,
 			Schema:    newState,
 			Changeset: changesetKey,
-		})
+		}})
 		assert.NoError(t, err)
 		view, err = state.View(ctx)
 		assert.NoError(t, err)
 
-		err = state.Publish(ctx, &schema.ChangesetPreparedEvent{
+		err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetPreparedEvent{
 			Key: changesetKey,
-		})
+		}})
 		assert.NoError(t, err)
 		view, err = state.View(ctx)
 		assert.NoError(t, err)
@@ -140,9 +140,9 @@ func TestChangesetState(t *testing.T) {
 	})
 
 	t.Run("test commit changeset", func(t *testing.T) {
-		err = state.Publish(ctx, &schema.ChangesetCommittedEvent{
+		err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCommittedEvent{
 			Key: changesetKey,
-		})
+		}})
 		assert.NoError(t, err)
 		view, err = state.View(ctx)
 		assert.NoError(t, err)
@@ -152,9 +152,9 @@ func TestChangesetState(t *testing.T) {
 	})
 
 	t.Run("test archive first changeset", func(t *testing.T) {
-		err = state.Publish(ctx, &schema.ChangesetDrainedEvent{
+		err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetDrainedEvent{
 			Key: changesetKey,
-		})
+		}})
 		assert.NoError(t, err)
 		view, err = state.View(ctx)
 		assert.NoError(t, err)
 		assert.Equal(t, schema.ChangesetStateDrained, csd.State)
 		assert.Equal(t, 1, len(view.GetChangesets()))
 
-		err = state.Publish(ctx, &schema.ChangesetFinalizedEvent{
+		err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetFinalizedEvent{
 			Key: changesetKey,
-		})
+		}})
 		assert.NoError(t, err)
 		view, err = state.View(ctx)
 		assert.NoError(t, err)
diff --git a/backend/schemaservice/schemaservice.go b/backend/schemaservice/schemaservice.go
index 6ee9ae52ee..c67b54d749 100644
--- a/backend/schemaservice/schemaservice.go
+++ b/backend/schemaservice/schemaservice.go
@@ -22,12 +22,17 @@ import (
 	"github.com/block/ftl/internal/statemachine"
 )
 
+type CommonSchemaServiceConfig struct {
+}
+
 type Config struct {
+	CommonSchemaServiceConfig
 	Bind *url.URL `help:"Socket to bind to." default:"http://127.0.0.1:8897" env:"FTL_BIND"`
 }
 
 type Service struct {
-	State *statemachine.SingleQueryHandle[struct{}, SchemaState, schema.Event]
+	State  *statemachine.SingleQueryHandle[struct{}, SchemaState, EventWrapper]
+	Config Config
 }
 
 func (s *Service) GetDeployment(ctx context.Context, c *connect.Request[ftlv1.GetDeploymentRequest]) (*connect.Response[ftlv1.GetDeploymentResponse], error) {
@@ -48,8 +53,10 @@ func (s *Service) GetDeployment(ctx context.Context, c *connect.Request[ftlv1.Ge
 
 var _ ftlv1connect.SchemaServiceHandler = (*Service)(nil)
 
-func New(ctx context.Context) *Service {
-	return &Service{State: NewInMemorySchemaState(ctx)}
+func New(ctx context.Context, handle statemachine.Handle[struct{}, SchemaState, EventWrapper], config Config) *Service {
+	return &Service{
+		State: statemachine.NewSingleQueryHandle(handle, struct{}{}),
+	}
 }
 
 // Start the SchemaService. Blocks until the context is cancelled.
@@ -60,13 +67,17 @@ func Start(
 	logger := log.FromContext(ctx)
 	logger.Debugf("Starting FTL schema service")
 
-	svc := New(ctx)
-	logger.Debugf("Listening on %s", config.Bind)
+	g, gctx := errgroup.WithContext(ctx)
+
+	// TODO: Enable raft
+	// in local dev mode, use an inmemory state machine
+	shard := statemachine.NewLocalHandle(newStateMachine(ctx))
 
-	g, ctx := errgroup.WithContext(ctx)
+	svc := New(ctx, shard, config)
+	logger.Debugf("Listening on %s", config.Bind)
 
 	g.Go(func() error {
-		return rpc.Serve(ctx, config.Bind,
+		return rpc.Serve(gctx, config.Bind,
 			rpc.GRPC(ftlv1connect.NewSchemaServiceHandler, svc),
 			rpc.PProf(),
 		)
@@ -102,7 +113,7 @@ func (s *Service) UpdateDeploymentRuntime(ctx context.Context, req *connect.Requ
 	if err != nil {
 		return nil, fmt.Errorf("could not parse event: %w", err)
 	}
-	err = s.State.Publish(ctx, event)
+	err = s.State.Publish(ctx, EventWrapper{Event: event})
 	if err != nil {
 		return nil, fmt.Errorf("could not apply event: %w", err)
 	}
@@ -114,7 +125,7 @@ func (s *Service) UpdateSchema(ctx context.Context, req *connect.Request[ftlv1.U
 	if err != nil {
 		return nil, fmt.Errorf("could not parse event: %w", err)
 	}
-	if err = s.State.Publish(ctx, event); err != nil {
+	if err = s.State.Publish(ctx, EventWrapper{Event: event}); err != nil {
 		return nil, fmt.Errorf("could not apply event: %w", err)
 	}
 	return connect.NewResponse(&ftlv1.UpdateSchemaResponse{}), nil
@@ -163,9 +174,9 @@ func (s *Service) CreateChangeset(ctx context.Context, req *connect.Request[ftlv
 	}
 
 	// TODO: validate changeset schema with canonical schema
-	err = s.State.Publish(ctx, &schema.ChangesetCreatedEvent{
+	err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetCreatedEvent{
 		Changeset: changeset,
-	})
+	}})
 	if err != nil {
 		return nil, fmt.Errorf("could not create changeset %w", err)
 	}
@@ -179,9 +190,9 @@ func (s *Service) PrepareChangeset(ctx context.Context, req *connect.Request[ftl
 	if err != nil {
 		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
 	}
-	err = s.State.Publish(ctx, &schema.ChangesetPreparedEvent{
+	err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetPreparedEvent{
 		Key: changesetKey,
-	})
+	}})
 	if err != nil {
 		return nil, fmt.Errorf("could not prepare changeset %w", err)
 	}
@@ -194,9 +205,9 @@ func (s *Service) CommitChangeset(ctx context.Context, req *connect.Request[ftlv
 	if err != nil {
 		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
 	}
-	err = s.State.Publish(ctx, &schema.ChangesetCommittedEvent{
+	err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetCommittedEvent{
 		Key: changesetKey,
-	})
+	}})
 	if err != nil {
 		return nil, fmt.Errorf("could not commit changeset %w", err)
 	}
@@ -214,9 +225,9 @@ func (s *Service) DrainChangeset(ctx context.Context, req *connect.Request[ftlv1
 	if err != nil {
 		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
 	}
-	err = s.State.Publish(ctx, &schema.ChangesetDrainedEvent{
+	err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetDrainedEvent{
 		Key: changesetKey,
-	})
+	}})
 	if err != nil {
 		return nil, fmt.Errorf("could not drain changeset %w", err)
 	}
@@ -228,9 +239,9 @@ func (s *Service) FinalizeChangeset(ctx context.Context, req *connect.Request[ft
 	if err != nil {
 		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
 	}
-	err = s.State.Publish(ctx, &schema.ChangesetFinalizedEvent{
+	err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetFinalizedEvent{
 		Key: changesetKey,
-	})
+	}})
 	if err != nil {
 		return nil, fmt.Errorf("could not de-provision changeset %w", err)
 	}
@@ -245,8 +256,8 @@ func (s *Service) FailChangeset(context.Context, *connect.Request[ftlv1.FailChan
 
 func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(response *ftlv1.PullSchemaResponse) error) error {
 	logger := log.FromContext(ctx)
-	uctx, cancel := context.WithCancelCause(ctx)
-	defer cancel(fmt.Errorf("schemaservice: stopped watching for module changes: %w", context.Canceled))
+	uctx, cancel2 := context.WithCancelCause(ctx)
+	defer cancel2(fmt.Errorf("schemaservice: stopped watching for module changes: %w", context.Canceled))
 	stateIter, err := s.State.StateIter(uctx)
 	if err != nil {
 		return fmt.Errorf("failed to get schema state iterator: %w", err)
diff --git a/backend/schemaservice/state.go b/backend/schemaservice/state.go
index 37a428c022..d068cb2fe9 100644
--- a/backend/schemaservice/state.go
+++ b/backend/schemaservice/state.go
@@ -36,15 +36,18 @@ func NewSchemaState(validationEnabled bool) SchemaState {
 	}
 }
 
-func NewInMemorySchemaState(ctx context.Context) *statemachine.SingleQueryHandle[struct{}, SchemaState, schema.Event] {
+func NewInMemorySchemaState(ctx context.Context) *statemachine.SingleQueryHandle[struct{}, SchemaState, EventWrapper] {
+	handle := statemachine.NewLocalHandle(newStateMachine(ctx))
+	return statemachine.NewSingleQueryHandle(handle, struct{}{})
+}
+
+func newStateMachine(ctx context.Context) *schemaStateMachine {
 	notifier := channels.NewNotifier(ctx)
-	handle := statemachine.NewLocalHandle[struct{}, SchemaState, schema.Event](&schemaStateMachine{
+	return &schemaStateMachine{
 		notifier:   notifier,
 		runningCtx: ctx,
 		state:      NewSchemaState(true),
-	})
-
-	return statemachine.NewSingleQueryHandle(handle, struct{}{})
+	}
 }
 
 func (r *SchemaState) Marshal() ([]byte, error) {
@@ -183,6 +186,38 @@ func (r *SchemaState) GetProvisioning(module string, cs key.Changeset) (*schema.
 	return nil, fmt.Errorf("provisioning for module %s not found", module)
 }
 
+type EventWrapper struct {
+	Event schema.Event
+}
+
+func (e EventWrapper) String() string {
+	return fmt.Sprintf("EventWrapper{Event: %T}", e.Event)
+}
+
+var _ statemachine.Marshallable = EventWrapper{}
+
+func (e EventWrapper) MarshalBinary() ([]byte, error) {
+	pb := schema.EventToProto(e.Event)
+	bytes, err := proto.Marshal(pb)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal event: %w", err)
+	}
+	return bytes, nil
+}
+
+func (e *EventWrapper) UnmarshalBinary(bts []byte) error {
+	pb := schemapb.Event{}
+	if err := proto.Unmarshal(bts, &pb); err != nil {
+		return fmt.Errorf("error unmarshalling event proto: %w", err)
+	}
+	event, err := schema.EventFromProto(&pb)
+	if err != nil {
+		return fmt.Errorf("error decoding event proto: %w", err)
+	}
+	e.Event = event
+	return nil
+}
+
 type schemaStateMachine struct {
 	state SchemaState
 
@@ -192,8 +227,8 @@ type schemaStateMachine struct {
 	lock sync.Mutex
 }
 
-var _ statemachine.Snapshotting[struct{}, SchemaState, schema.Event] = &schemaStateMachine{}
-var _ statemachine.Listenable[struct{}, SchemaState, schema.Event] = &schemaStateMachine{}
+var _ statemachine.Snapshotting[struct{}, SchemaState, EventWrapper] = &schemaStateMachine{}
+var _ statemachine.Listenable[struct{}, SchemaState, EventWrapper] = &schemaStateMachine{}
 
 func (c *schemaStateMachine) Lookup(key struct{}) (SchemaState, error) {
 	c.lock.Lock()
@@ -201,10 +236,10 @@ func (c *schemaStateMachine) Lookup(key struct{}) (SchemaState, error) {
 	return reflect.DeepCopy(c.state), nil
 }
 
-func (c *schemaStateMachine) Publish(msg schema.Event) error {
+func (c *schemaStateMachine) Publish(msg EventWrapper) error {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	err := c.state.ApplyEvent(c.runningCtx, msg)
+	err := c.state.ApplyEvent(c.runningCtx, msg.Event)
 	if err != nil {
 		return fmt.Errorf("update: %w", err)
 	}
diff --git a/cmd/ftl/cmd_serve.go b/cmd/ftl/cmd_serve.go
index 957b88d6ab..c60c57c967 100644
--- a/cmd/ftl/cmd_serve.go
+++ b/cmd/ftl/cmd_serve.go
@@ -70,13 +70,13 @@ type serveCommonConfig struct {
 	NoConsole     bool                 `help:"Disable the console."`
 	Ingress       ingress.Config       `embed:"" prefix:"ingress-"`
 	Timeline      timeline.Config      `embed:"" prefix:"timeline-"`
-	SchemaService schemaservice.Config `embed:"" prefix:"schemaservice-"`
 	Console       console.Config       `embed:"" prefix:"console-"`
 	Lease         lease.Config         `embed:"" prefix:"lease-"`
 	Admin         admin.Config         `embed:"" prefix:"admin-"`
 	Recreate      bool                 `help:"Recreate any stateful resources if they already exist." default:"false"`
 	controller.CommonConfig
 	provisioner.CommonProvisionerConfig
+	schemaservice.CommonSchemaServiceConfig
 }
 
 const ftlRunningErrorMsg = "FTL is already running. Use 'ftl serve --stop' to stop it"
@@ -223,10 +223,15 @@ func (s *serveCommonConfig) run(
 		provisionerAddresses = append(provisionerAddresses, bind)
 	}
 
+	schemaBind, err := url.Parse("http://localhost:8897")
+	if err != nil {
+		return fmt.Errorf("failed to parse bind URL: %w", err)
+	}
+
 	runnerScaling, err := localscaling.NewLocalScaling(
 		ctx,
 		controllerAddresses,
-		s.SchemaService.Bind,
+		schemaBind,
 		s.Lease.Bind,
 		projConfig.Path,
 		devMode && !projConfig.DisableIDEIntegration,
@@ -244,7 +249,12 @@ func (s *serveCommonConfig) run(
 
 	schemaCtx := log.ContextWithLogger(ctx, logger.Scope("schemaservice"))
 	wg.Go(func() error {
-		if err := schemaservice.Start(schemaCtx, s.SchemaService); err != nil {
+		// TODO: Allocate properly, and support multiple instances
+		config := schemaservice.Config{
+			CommonSchemaServiceConfig: s.CommonSchemaServiceConfig,
+			Bind:                      schemaBind,
+		}
+		if err := schemaservice.Start(schemaCtx, config); err != nil {
 			logger.Errorf(err, "schemaservice failed: %v", err)
 			return fmt.Errorf("schemaservice failed: %w", err)
 		}
diff --git a/internal/raft/cluster.go b/internal/raft/cluster.go
index cd36a29a31..8ed58c5041 100644
--- a/internal/raft/cluster.go
+++ b/internal/raft/cluster.go
@@ -30,22 +30,23 @@ import (
 )
 
 type RaftConfig struct {
-	InitialMembers []string `help:"Initial members"`
-	ReplicaID      uint64   `help:"Node ID" required:""`
-	DataDir        string   `help:"Data directory" required:""`
-	Address        string   `help:"Address to advertise to other nodes" required:""`
-	ListenAddress  string   `help:"Address to listen for incoming traffic. If empty, Address will be used."`
+	InitialMembers []string `help:"Initial members" env:"RAFT_INITIAL_MEMBERS"`
+	ReplicaID      uint64   `help:"Node ID" required:"" env:"RAFT_REPLICA_ID"`
+	DataDir        string   `help:"Data directory" required:"" env:"RAFT_DATA_DIR"`
+	Address        string   `help:"Address to advertise to other nodes" required:"" env:"RAFT_ADDRESS"`
+	ListenAddress  string   `help:"Address to listen for incoming traffic. If empty, Address will be used." env:"RAFT_LISTEN_ADDRESS"`
 	ControlBind       *url.URL          `help:"Address to listen for control traffic. If empty, no control listener will be started."`
 	ShardReadyTimeout time.Duration     `help:"Timeout for shard to be ready" default:"5s"`
 	Retry             retry.RetryConfig `help:"Connection retry configuration" prefix:"retry-" embed:""`
 	ChangesInterval   time.Duration     `help:"Interval for changes to be checked" default:"10ms"`
 	ChangesTimeout    time.Duration     `help:"Timeout for changes to be checked" default:"1s"`
+	QueryTimeout      time.Duration     `help:"Timeout for queries" default:"5s"`
 
 	// Raft configuration
 	RTT                time.Duration `help:"Estimated average round trip time between nodes" default:"200ms"`
 	ElectionRTT        uint64        `help:"Election RTT as a multiple of RTT" default:"10"`
 	HeartbeatRTT       uint64        `help:"Heartbeat RTT as a multiple of RTT" default:"1"`
-	SnapshotEntries    uint64        `help:"Snapshot entries" default:"10"`
+	SnapshotEntries    uint64        `help:"Snapshot entries" default:"100"`
 	CompactionOverhead uint64        `help:"Compaction overhead" default:"100"`
 }
 
@@ -184,6 +185,9 @@ func (s *ShardHandle[Q, R, E]) Publish(ctx context.Context, msg E) error {
 
 // Query the state of the shard.
 func (s *ShardHandle[Q, R, E]) Query(ctx context.Context, query Q) (R, error) {
+	ctx, cancel := context.WithTimeout(ctx, s.cluster.config.QueryTimeout)
+	defer cancel()
+
 	s.verifyReady()
 
 	var zero R
@@ -209,9 +213,7 @@ func (s *ShardHandle[Q, R, E]) Query(ctx context.Context, query Q) (R, error) {
 // Note, that this is not guaranteed to receive an event for every change, but
 // will always receive the latest state of the shard.
 func (s *ShardHandle[Q, R, E]) StateIter(ctx context.Context, query Q) (iter.Seq[R], error) {
-	if s.cluster.nh == nil {
-		panic("cluster not started")
-	}
+	s.verifyReady()
 
 	result := make(chan R, 64)
 	logger := log.FromContext(ctx).Scope("raft")
@@ -251,9 +253,7 @@ func (s *ShardHandle[Q, R, E]) StateIter(ctx context.Context, query Q) (iter.Seq
 
 			s.lastKnownIndex.Store(last)
 
-			ctx, cancel := context.WithTimeout(ctx, s.cluster.config.ChangesTimeout)
 			res, err := s.Query(ctx, query)
-			cancel()
 			if err != nil {
 				logger.Errorf(err, "failed to query shard")
diff --git a/internal/raft/cluster_test.go b/internal/raft/cluster_test.go
index ac33196e1e..21a9f31818 100644
--- a/internal/raft/cluster_test.go
+++ b/internal/raft/cluster_test.go
@@ -196,6 +196,7 @@ func testBuilder(t *testing.T, addresses []*net.TCPAddr, id uint64, address stri
 		ShardReadyTimeout: 5 * time.Second,
 		ChangesInterval:   5 * time.Millisecond,
 		ChangesTimeout:    1 * time.Second,
+		QueryTimeout:      5 * time.Second,
 		Retry: retry.RetryConfig{
 			Min: 50 * time.Millisecond,
 			Max: 1 * time.Second,
@@ -251,7 +252,5 @@ func assertShardValue(ctx context.Context, t *testing.T, expected int64, shards
 
 func testContext(t *testing.T) context.Context {
 	ctx := log.ContextWithNewDefaultLogger(context.Background())
-	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(60*time.Second))
-	t.Cleanup(cancel)
 	return ctx
 }
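The central new type above is EventWrapper: it gives schema events a binary encoding (via their protobuf form) so they can be submitted to the state-machine handle as opaque commands, whether that handle is the local in-memory one or, later, a raft shard. The following is a rough sketch, not part of the diff, of how the wrapper round-trips an event; it assumes only the types and functions shown in backend/schemaservice/state.go and the test above (schemaservice.EventWrapper, schema.ChangesetCreatedEvent, key.NewChangesetKey), and the field values are purely illustrative.

	// Sketch only: round-trip an EventWrapper the way a replicated state machine would.
	w := schemaservice.EventWrapper{Event: &schema.ChangesetCreatedEvent{
		Changeset: &schema.Changeset{
			Key:       key.NewChangesetKey(),
			CreatedAt: time.Now(),
		},
	}}
	data, err := w.MarshalBinary() // schema.EventToProto + proto.Marshal under the hood
	if err != nil {
		panic(err)
	}
	var decoded schemaservice.EventWrapper
	if err := decoded.UnmarshalBinary(data); err != nil { // schema.EventFromProto on the way back
		panic(err)
	}
	// decoded.Event now holds an equivalent schema.Event, ready to be applied
	// to SchemaState via ApplyEvent, as schemaStateMachine.Publish does.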