Skip to content

Commit

Permalink
feat: store runtime event changes (#4297)
Browse files Browse the repository at this point in the history
This changes to storing the state change events that update the module
runtime and using this to compute if the schema has changed.

It also directly generates pull schema events instead of generating
intermediate synthetic raft events.
  • Loading branch information
stuartwdouglas authored Feb 5, 2025
1 parent 05ef097 commit 2f0a59f
Show file tree
Hide file tree
Showing 30 changed files with 1,960 additions and 1,652 deletions.
86 changes: 45 additions & 41 deletions backend/protos/xyz/block/ftl/v1/schemaservice.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/protos/xyz/block/ftl/v1/schemaservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ message PullSchemaResponse {

// ChangesetFailed is sent when a changeset becomes canonical.
message ChangesetCommitted {
string key = 1;
ftl.schema.v1.Changeset changeset = 1;
}

// Deployment created is sent when a deployment is new to the listener but is not part of a changeset.
Expand Down
40 changes: 20 additions & 20 deletions backend/provisioner/dev_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func NewDevProvisioner(postgresPort int, mysqlPort int, recreate bool) *InMemPro
})
}
func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn {
return func(ctx context.Context, changeset key.Changeset, moduleName string, res schema.Provisioned) (schema.Event, error) {
return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, res schema.Provisioned) (schema.Event, error) {
logger := log.FromContext(ctx)

dbName := strcase.ToLowerSnake(moduleName) + "_" + strcase.ToLowerSnake(res.ResourceID())
dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(res.ResourceID())

logger.Infof("Provisioning mysql database: %s", dbName)

Expand All @@ -57,8 +57,8 @@ func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn {
continue
}
return &schema.DatabaseRuntimeEvent{
Module: moduleName,
Changeset: changeset,
Deployment: deployment,
Changeset: &changeset,
ID: res.ResourceID(),
Connections: event,
}, nil
Expand Down Expand Up @@ -104,7 +104,7 @@ func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysql

func ProvisionPostgresForTest(ctx context.Context, moduleName string, id string) (string, error) {
node := &schema.Database{Name: id + "_test"}
event, err := provisionPostgres(15432, true)(ctx, key.NewChangesetKey(), moduleName, node)
event, err := provisionPostgres(15432, true)(ctx, key.NewChangesetKey(), key.NewDeploymentKey(moduleName), node)
if err != nil {
return "", err
}
Expand All @@ -114,7 +114,7 @@ func ProvisionPostgresForTest(ctx context.Context, moduleName string, id string)

func ProvisionMySQLForTest(ctx context.Context, moduleName string, id string) (string, error) {
node := &schema.Database{Name: id + "_test"}
event, err := provisionMysql(13306, true)(ctx, key.NewChangesetKey(), moduleName, node)
event, err := provisionMysql(13306, true)(ctx, key.NewChangesetKey(), key.NewDeploymentKey(moduleName), node)
if err != nil {
return "", err
}
Expand All @@ -123,10 +123,10 @@ func ProvisionMySQLForTest(ctx context.Context, moduleName string, id string) (s
}

func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisionerFn {
return func(ctx context.Context, changeset key.Changeset, moduleName string, resource schema.Provisioned) (schema.Event, error) {
return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, resource schema.Provisioned) (schema.Event, error) {
logger := log.FromContext(ctx)

dbName := strcase.ToLowerSnake(moduleName) + "_" + strcase.ToLowerSnake(resource.ResourceID())
dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(resource.ResourceID())
logger.Infof("Provisioning postgres database: %s", dbName)

// We assume that the DB has already been started when running in dev mode
Expand Down Expand Up @@ -173,9 +173,9 @@ func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisioner

dsn := dsn.PostgresDSN(dbName, dsn.Port(postgresPort))
return &schema.DatabaseRuntimeEvent{
ID: resource.ResourceID(),
Module: moduleName,
Changeset: changeset,
ID: resource.ResourceID(),
Deployment: deployment,
Changeset: &changeset,
Connections: &schema.DatabaseRuntimeConnections{
Write: &schema.DSNDatabaseConnector{DSN: dsn},
Read: &schema.DSNDatabaseConnector{DSN: dsn},
Expand All @@ -186,7 +186,7 @@ func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisioner
}

func provisionTopic() InMemResourceProvisionerFn {
return func(ctx context.Context, changeset key.Changeset, moduleName string, res schema.Provisioned) (schema.Event, error) {
return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, res schema.Provisioned) (schema.Event, error) {
logger := log.FromContext(ctx)
if err := dev.SetUpRedPanda(ctx); err != nil {
return nil, fmt.Errorf("could not set up redpanda: %w", err)
Expand All @@ -196,7 +196,7 @@ func provisionTopic() InMemResourceProvisionerFn {
panic(fmt.Errorf("unexpected resource type: %T", res))
}

topicID := fmt.Sprintf("%s.%s", moduleName, topic.Name)
topicID := fmt.Sprintf("%s.%s", deployment.Payload.Module, topic.Name)
logger.Infof("Provisioning topic: %s", topicID)

config := sarama.NewConfig()
Expand Down Expand Up @@ -244,9 +244,9 @@ func provisionTopic() InMemResourceProvisionerFn {
}

return &schema.TopicRuntimeEvent{
Module: moduleName,
Changeset: changeset,
ID: res.ResourceID(),
Deployment: deployment,
Changeset: &changeset,
ID: res.ResourceID(),
Payload: &schema.TopicRuntime{
KafkaBrokers: redPandaBrokers,
TopicID: topicID,
Expand All @@ -256,7 +256,7 @@ func provisionTopic() InMemResourceProvisionerFn {
}

func provisionSubscription() InMemResourceProvisionerFn {
return func(ctx context.Context, changeset key.Changeset, moduleName string, res schema.Provisioned) (schema.Event, error) {
return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, res schema.Provisioned) (schema.Event, error) {
logger := log.FromContext(ctx)
verb, ok := res.(*schema.Verb)
if !ok {
Expand All @@ -265,9 +265,9 @@ func provisionSubscription() InMemResourceProvisionerFn {
for range slices.FilterVariants[*schema.MetadataSubscriber](verb.Metadata) {
logger.Infof("Provisioning subscription for verb: %s", verb.Name)
return &schema.VerbRuntimeEvent{
Module: moduleName,
Changeset: changeset,
ID: verb.Name,
Deployment: deployment,
Changeset: &changeset,
ID: verb.Name,
Subscription: optional.Some(schema.VerbRuntimeSubscription{
KafkaBrokers: redPandaBrokers,
}),
Expand Down
4 changes: 2 additions & 2 deletions backend/provisioner/inmem_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type RuntimeEvent struct {
Verb *schema.VerbRuntimeEvent
}

type InMemResourceProvisionerFn func(ctx context.Context, changeset key.Changeset, module string, resource schema.Provisioned) (schema.Event, error)
type InMemResourceProvisionerFn func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, resource schema.Provisioned) (schema.Event, error)

// InMemProvisioner for running an in memory provisioner, constructing all resources concurrently
//
Expand Down Expand Up @@ -126,7 +126,7 @@ func (d *InMemProvisioner) Provision(ctx context.Context, req *connect.Request[p
step := &inMemProvisioningStep{Done: atomic.New(false)}
task.steps = append(task.steps, step)
go func() {
event, err := handler(ctx, parsed, desiredModule.Name, desired)
event, err := handler(ctx, parsed, desiredModule.Runtime.Deployment.DeploymentKey, desired)
if err != nil {
step.Err = err
logger.Errorf(err, "failed to provision resource %s:%s", resource.Kind, desired.ResourceID())
Expand Down
Loading

0 comments on commit 2f0a59f

Please sign in to comment.