Skip to content

Commit

Permalink
chore: wrap events to a marshallable wrapper (#4291)
Browse files Browse the repository at this point in the history
preparation for enabling Raft
  • Loading branch information
jvmakine authored Feb 4, 2025
1 parent c1c7550 commit 14e56ec
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 68 deletions.
44 changes: 22 additions & 22 deletions backend/schemaservice/changesetstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,40 @@ func TestChangesetState(t *testing.T) {
assert.Equal(t, 0, len(view.GetChangesets()))

t.Run("changeset must have id", func(t *testing.T) {
err = state.Publish(ctx, &schema.ChangesetCreatedEvent{
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCreatedEvent{
Changeset: &schema.Changeset{
CreatedAt: time.Now(),
Modules: []*schema.Module{module},
Error: "",
},
})
}})
assert.Error(t, err)
})

t.Run("deployment must must have deployment key", func(t *testing.T) {
nm := reflect.DeepCopy(module)
nm.Runtime = nil
err = state.Publish(ctx, &schema.ChangesetCreatedEvent{
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCreatedEvent{
Changeset: &schema.Changeset{
Key: key.NewChangesetKey(),
CreatedAt: time.Now(),
Modules: []*schema.Module{nm},
Error: "",
},
})
}})
assert.Error(t, err)
})

changesetKey := key.NewChangesetKey()
t.Run("test create changeset", func(t *testing.T) {
err = state.Publish(ctx, &schema.ChangesetCreatedEvent{
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCreatedEvent{
Changeset: &schema.Changeset{
Key: changesetKey,
CreatedAt: time.Now(),
Modules: []*schema.Module{module},
Error: "",
},
})
}})

assert.NoError(t, err)
view, err = state.View(ctx)
Expand All @@ -83,11 +83,11 @@ func TestChangesetState(t *testing.T) {
t.Run("test update module schema", func(t *testing.T) {
newState := reflect.DeepCopy(module)
newState.Runtime.Deployment.Endpoint = "http://localhost:8080"
err = state.Publish(ctx, &schema.DeploymentSchemaUpdatedEvent{
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.DeploymentSchemaUpdatedEvent{
Key: module.Runtime.Deployment.DeploymentKey,
Schema: newState,
Changeset: changesetKey,
})
}})
assert.NoError(t, err)
view, err = state.View(ctx)
assert.NoError(t, err)
Expand All @@ -101,36 +101,36 @@ func TestChangesetState(t *testing.T) {

t.Run("test commit changeset in bad state", func(t *testing.T) {
// The deployment is not provisioned yet, this should fail
err = state.Publish(ctx, &schema.ChangesetCommittedEvent{
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCommittedEvent{
Key: changesetKey,
})
}})
assert.Error(t, err)
})

t.Run("test prepare changeset in bad state", func(t *testing.T) {
// The deployment is not provisioned yet, this should fail
err = state.Publish(ctx, &schema.ChangesetPreparedEvent{
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetPreparedEvent{
Key: changesetKey,
})
}})
assert.Error(t, err)
})

t.Run("test prepare changeset", func(t *testing.T) {
newState := reflect.DeepCopy(module)
newState.Runtime.Deployment.State = schema.DeploymentStateReady
newState.Runtime.Deployment.Endpoint = "http://localhost:8080"
err = state.Publish(ctx, &schema.DeploymentSchemaUpdatedEvent{
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.DeploymentSchemaUpdatedEvent{
Key: module.Runtime.Deployment.DeploymentKey,
Schema: newState,
Changeset: changesetKey,
})
}})
assert.NoError(t, err)
view, err = state.View(ctx)
assert.NoError(t, err)

err = state.Publish(ctx, &schema.ChangesetPreparedEvent{
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetPreparedEvent{
Key: changesetKey,
})
}})
assert.NoError(t, err)
view, err = state.View(ctx)
assert.NoError(t, err)
Expand All @@ -140,9 +140,9 @@ func TestChangesetState(t *testing.T) {
})

t.Run("test commit changeset", func(t *testing.T) {
err = state.Publish(ctx, &schema.ChangesetCommittedEvent{
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetCommittedEvent{
Key: changesetKey,
})
}})
assert.NoError(t, err)
view, err = state.View(ctx)
assert.NoError(t, err)
Expand All @@ -152,19 +152,19 @@ func TestChangesetState(t *testing.T) {
})

t.Run("test archive first changeset", func(t *testing.T) {
err = state.Publish(ctx, &schema.ChangesetDrainedEvent{
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetDrainedEvent{
Key: changesetKey,
})
}})
assert.NoError(t, err)
view, err = state.View(ctx)
assert.NoError(t, err)
csd := changeset(t, view)
assert.Equal(t, schema.ChangesetStateDrained, csd.State)
assert.Equal(t, 1, len(view.GetChangesets()))

err = state.Publish(ctx, &schema.ChangesetFinalizedEvent{
err = state.Publish(ctx, schemaservice.EventWrapper{Event: &schema.ChangesetFinalizedEvent{
Key: changesetKey,
})
}})
assert.NoError(t, err)
view, err = state.View(ctx)
assert.NoError(t, err)
Expand Down
53 changes: 32 additions & 21 deletions backend/schemaservice/schemaservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@ import (
"github.com/block/ftl/internal/statemachine"
)

type CommonSchemaServiceConfig struct {
}

type Config struct {
CommonSchemaServiceConfig
Bind *url.URL `help:"Socket to bind to." default:"http://127.0.0.1:8897" env:"FTL_BIND"`
}

type Service struct {
State *statemachine.SingleQueryHandle[struct{}, SchemaState, schema.Event]
State *statemachine.SingleQueryHandle[struct{}, SchemaState, EventWrapper]
Config Config
}

func (s *Service) GetDeployment(ctx context.Context, c *connect.Request[ftlv1.GetDeploymentRequest]) (*connect.Response[ftlv1.GetDeploymentResponse], error) {
Expand All @@ -48,8 +53,10 @@ func (s *Service) GetDeployment(ctx context.Context, c *connect.Request[ftlv1.Ge

var _ ftlv1connect.SchemaServiceHandler = (*Service)(nil)

func New(ctx context.Context) *Service {
return &Service{State: NewInMemorySchemaState(ctx)}
func New(ctx context.Context, handle statemachine.Handle[struct{}, SchemaState, EventWrapper], config Config) *Service {
return &Service{
State: statemachine.NewSingleQueryHandle(handle, struct{}{}),
}
}

// Start the SchemaService. Blocks until the context is cancelled.
Expand All @@ -60,13 +67,17 @@ func Start(
logger := log.FromContext(ctx)
logger.Debugf("Starting FTL schema service")

svc := New(ctx)
logger.Debugf("Listening on %s", config.Bind)
g, gctx := errgroup.WithContext(ctx)

// TODO: Enable raft
// in local dev mode, use an inmemory state machine
shard := statemachine.NewLocalHandle(newStateMachine(ctx))

g, ctx := errgroup.WithContext(ctx)
svc := New(ctx, shard, config)
logger.Debugf("Listening on %s", config.Bind)

g.Go(func() error {
return rpc.Serve(ctx, config.Bind,
return rpc.Serve(gctx, config.Bind,
rpc.GRPC(ftlv1connect.NewSchemaServiceHandler, svc),
rpc.PProf(),
)
Expand Down Expand Up @@ -102,7 +113,7 @@ func (s *Service) UpdateDeploymentRuntime(ctx context.Context, req *connect.Requ
if err != nil {
return nil, fmt.Errorf("could not parse event: %w", err)
}
err = s.State.Publish(ctx, event)
err = s.State.Publish(ctx, EventWrapper{Event: event})
if err != nil {
return nil, fmt.Errorf("could not apply event: %w", err)
}
Expand All @@ -114,7 +125,7 @@ func (s *Service) UpdateSchema(ctx context.Context, req *connect.Request[ftlv1.U
if err != nil {
return nil, fmt.Errorf("could not parse event: %w", err)
}
if err = s.State.Publish(ctx, event); err != nil {
if err = s.State.Publish(ctx, EventWrapper{Event: event}); err != nil {
return nil, fmt.Errorf("could not apply event: %w", err)
}
return connect.NewResponse(&ftlv1.UpdateSchemaResponse{}), nil
Expand Down Expand Up @@ -163,9 +174,9 @@ func (s *Service) CreateChangeset(ctx context.Context, req *connect.Request[ftlv
}

// TODO: validate changeset schema with canonical schema
err = s.State.Publish(ctx, &schema.ChangesetCreatedEvent{
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetCreatedEvent{
Changeset: changeset,
})
}})
if err != nil {
return nil, fmt.Errorf("could not create changeset %w", err)
}
Expand All @@ -179,9 +190,9 @@ func (s *Service) PrepareChangeset(ctx context.Context, req *connect.Request[ftl
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
}
err = s.State.Publish(ctx, &schema.ChangesetPreparedEvent{
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetPreparedEvent{
Key: changesetKey,
})
}})
if err != nil {
return nil, fmt.Errorf("could not prepare changeset %w", err)
}
Expand All @@ -194,9 +205,9 @@ func (s *Service) CommitChangeset(ctx context.Context, req *connect.Request[ftlv
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
}
err = s.State.Publish(ctx, &schema.ChangesetCommittedEvent{
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetCommittedEvent{
Key: changesetKey,
})
}})
if err != nil {
return nil, fmt.Errorf("could not commit changeset %w", err)
}
Expand All @@ -214,9 +225,9 @@ func (s *Service) DrainChangeset(ctx context.Context, req *connect.Request[ftlv1
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
}
err = s.State.Publish(ctx, &schema.ChangesetDrainedEvent{
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetDrainedEvent{
Key: changesetKey,
})
}})
if err != nil {
return nil, fmt.Errorf("could not drain changeset %w", err)
}
Expand All @@ -228,9 +239,9 @@ func (s *Service) FinalizeChangeset(ctx context.Context, req *connect.Request[ft
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid changeset key: %w", err))
}
err = s.State.Publish(ctx, &schema.ChangesetFinalizedEvent{
err = s.State.Publish(ctx, EventWrapper{Event: &schema.ChangesetFinalizedEvent{
Key: changesetKey,
})
}})
if err != nil {
return nil, fmt.Errorf("could not de-provision changeset %w", err)
}
Expand All @@ -245,8 +256,8 @@ func (s *Service) FailChangeset(context.Context, *connect.Request[ftlv1.FailChan
func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(response *ftlv1.PullSchemaResponse) error) error {
logger := log.FromContext(ctx)

uctx, cancel := context.WithCancelCause(ctx)
defer cancel(fmt.Errorf("schemaservice: stopped watching for module changes: %w", context.Canceled))
uctx, cancel2 := context.WithCancelCause(ctx)
defer cancel2(fmt.Errorf("schemaservice: stopped watching for module changes: %w", context.Canceled))
stateIter, err := s.State.StateIter(uctx)
if err != nil {
return fmt.Errorf("failed to get schema state iterator: %w", err)
Expand Down
53 changes: 44 additions & 9 deletions backend/schemaservice/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@ func NewSchemaState(validationEnabled bool) SchemaState {
}
}

func NewInMemorySchemaState(ctx context.Context) *statemachine.SingleQueryHandle[struct{}, SchemaState, schema.Event] {
func NewInMemorySchemaState(ctx context.Context) *statemachine.SingleQueryHandle[struct{}, SchemaState, EventWrapper] {
handle := statemachine.NewLocalHandle(newStateMachine(ctx))
return statemachine.NewSingleQueryHandle(handle, struct{}{})
}

func newStateMachine(ctx context.Context) *schemaStateMachine {
notifier := channels.NewNotifier(ctx)
handle := statemachine.NewLocalHandle[struct{}, SchemaState, schema.Event](&schemaStateMachine{
return &schemaStateMachine{
notifier: notifier,
runningCtx: ctx,
state: NewSchemaState(true),
})

return statemachine.NewSingleQueryHandle(handle, struct{}{})
}
}

func (r *SchemaState) Marshal() ([]byte, error) {
Expand Down Expand Up @@ -183,6 +186,38 @@ func (r *SchemaState) GetProvisioning(module string, cs key.Changeset) (*schema.
return nil, fmt.Errorf("provisioning for module %s not found", module)
}

type EventWrapper struct {
Event schema.Event
}

func (e EventWrapper) String() string {
return fmt.Sprintf("EventWrapper{Event: %T}", e.Event)
}

var _ statemachine.Marshallable = EventWrapper{}

func (e EventWrapper) MarshalBinary() ([]byte, error) {
pb := schema.EventToProto(e.Event)
bytes, err := proto.Marshal(pb)
if err != nil {
return nil, fmt.Errorf("failed to marshal event: %w", err)
}
return bytes, nil
}

func (e *EventWrapper) UnmarshalBinary(bts []byte) error {
pb := schemapb.Event{}
if err := proto.Unmarshal(bts, &pb); err != nil {
return fmt.Errorf("error unmarshalling event proto: %w", err)
}
event, err := schema.EventFromProto(&pb)
if err != nil {
return fmt.Errorf("error decoding event proto: %w", err)
}
e.Event = event
return nil
}

type schemaStateMachine struct {
state SchemaState

Expand All @@ -192,19 +227,19 @@ type schemaStateMachine struct {
lock sync.Mutex
}

var _ statemachine.Snapshotting[struct{}, SchemaState, schema.Event] = &schemaStateMachine{}
var _ statemachine.Listenable[struct{}, SchemaState, schema.Event] = &schemaStateMachine{}
var _ statemachine.Snapshotting[struct{}, SchemaState, EventWrapper] = &schemaStateMachine{}
var _ statemachine.Listenable[struct{}, SchemaState, EventWrapper] = &schemaStateMachine{}

func (c *schemaStateMachine) Lookup(key struct{}) (SchemaState, error) {
c.lock.Lock()
defer c.lock.Unlock()
return reflect.DeepCopy(c.state), nil
}

func (c *schemaStateMachine) Publish(msg schema.Event) error {
func (c *schemaStateMachine) Publish(msg EventWrapper) error {
c.lock.Lock()
defer c.lock.Unlock()
err := c.state.ApplyEvent(c.runningCtx, msg)
err := c.state.ApplyEvent(c.runningCtx, msg.Event)
if err != nil {
return fmt.Errorf("update: %w", err)
}
Expand Down
Loading

0 comments on commit 14e56ec

Please sign in to comment.