Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: changesets #4253

Merged
merged 16 commits into from
Feb 1, 2025
8 changes: 8 additions & 0 deletions backend/admin/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ var testSchema = schema.MustValidate(&schema.Schema{
type mockSchemaRetriever struct {
}

func (d *mockSchemaRetriever) GetCanonicalSchema(ctx context.Context) (*schema.Schema, error) {
return d.GetActiveSchema(ctx)
}

func (d *mockSchemaRetriever) GetLatestSchema(ctx context.Context) (*schema.Schema, error) {
return d.GetActiveSchema(ctx)
}

func (d *mockSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
return testSchema, nil
}
Expand Down
7 changes: 6 additions & 1 deletion backend/admin/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ func NewLocalClient(cm *manager.Manager[cf.Configuration], sm *manager.Manager[c
return &localClient{NewAdminService(cm, sm, &diskSchemaRetriever{})}
}

func (s *diskSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
func (s *diskSchemaRetriever) GetCanonicalSchema(ctx context.Context) (*schema.Schema, error) {
// disk schema can not tell canonical schema from latest schema
return s.GetLatestSchema(ctx)
}

func (s *diskSchemaRetriever) GetLatestSchema(ctx context.Context) (*schema.Schema, error) {
path, ok := projectconfig.DefaultConfigPath().Get()
if !ok {
return nil, fmt.Errorf("no project config path available")
Expand Down
4 changes: 2 additions & 2 deletions backend/admin/local_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
func getDiskSchema(t testing.TB, ctx context.Context) (*schema.Schema, error) {
t.Helper()
dsr := &diskSchemaRetriever{}
return dsr.GetActiveSchema(ctx)
return dsr.GetLatestSchema(ctx)
}

func TestDiskSchemaRetrieverWithBuildArtefact(t *testing.T) {
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestAdminNoValidationWithNoSchema(t *testing.T) {
assert.NoError(t, err)

dsr := &diskSchemaRetriever{deployRoot: optional.Some(string(t.TempDir()))}
_, err = dsr.GetActiveSchema(ctx)
_, err = dsr.GetLatestSchema(ctx)
assert.Error(t, err)

admin := NewAdminService(cm, sm, dsr)
Expand Down
19 changes: 13 additions & 6 deletions backend/admin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ type AdminService struct {
var _ ftlv1connect.AdminServiceHandler = (*AdminService)(nil)

type SchemaRetriever interface {
// BindAllocator is required if the schema is retrieved from disk using language plugins
GetActiveSchema(ctx context.Context) (*schema.Schema, error)
// TODO: docs
GetCanonicalSchema(ctx context.Context) (*schema.Schema, error)
GetLatestSchema(ctx context.Context) (*schema.Schema, error)
}

func NewSchemaRetreiver(source schemaeventsource.EventSource) SchemaRetriever {
Expand All @@ -52,8 +53,13 @@ type streamSchemaRetriever struct {
source schemaeventsource.EventSource
}

func (c streamSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
view := c.source.View()
func (c streamSchemaRetriever) GetCanonicalSchema(ctx context.Context) (*schema.Schema, error) {
view := c.source.CanonicalView()
return &schema.Schema{Modules: view.Modules}, nil
}

func (c streamSchemaRetriever) GetLatestSchema(ctx context.Context) (*schema.Schema, error) {
view := c.source.LatestView()
return &schema.Schema{Modules: view.Modules}, nil
}

Expand Down Expand Up @@ -286,7 +292,7 @@ func (s *AdminService) validateAgainstSchema(ctx context.Context, isSecret bool,
}

// If we can't retrieve an active schema, skip validation.
sch, err := s.schr.GetActiveSchema(ctx)
sch, err := s.schr.GetLatestSchema(ctx)
if err != nil {
logger.Debugf("skipping validation; could not get the active schema: %v", err)
return nil
Expand Down Expand Up @@ -330,7 +336,8 @@ func (s *AdminService) validateAgainstSchema(ctx context.Context, isSecret bool,

func (s *AdminService) ResetSubscription(ctx context.Context, req *connect.Request[ftlv1.ResetSubscriptionRequest]) (*connect.Response[ftlv1.ResetSubscriptionResponse], error) {
// Find nodes in schema
sch, err := s.schr.GetActiveSchema(ctx)
// TODO: we really want all deployments for a module... not just latest... Use canonical and check ActiveChangeset?
sch, err := s.schr.GetCanonicalSchema(ctx)
if err != nil {
return nil, fmt.Errorf("could not get the active schema: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions backend/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ func verbSchemaString(sch *schema.Schema, verb *schema.Verb) (string, error) {
}

func (s *service) GetModules(ctx context.Context, req *connect.Request[consolepb.GetModulesRequest]) (*connect.Response[consolepb.GetModulesResponse], error) {
sch := s.schemaEventSource.View()
sch := s.schemaEventSource.LatestView()

allowed := map[string]bool{}
var modules []*consolepb.Module
for _, mod := range sch.Modules {
if mod.GetRuntime().GetDeployment().GetDeploymentKey().IsZero() {
if mod.GetRuntime().GetDeployment().GetDeploymentKey().IsZero() || mod.GetRuntime().GetDeployment().Endpoint == "" {
continue
}
allowed[mod.Name] = true
Expand Down Expand Up @@ -411,7 +411,7 @@ func (s *service) filterDeployments(unfilteredDeployments *schema.Schema) []*sch
}

func (s *service) sendStreamModulesResp(ctx context.Context, stream *connect.ServerStream[consolepb.StreamModulesResponse]) error {
unfilteredDeployments := s.schemaEventSource.View()
unfilteredDeployments := s.schemaEventSource.LatestView()

deployments := s.filterDeployments(unfilteredDeployments)
sch := &schema.Schema{
Expand Down
40 changes: 16 additions & 24 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Config struct {
RunnerTimeout time.Duration `help:"Runner heartbeat timeout." default:"10s"`
ControllerTimeout time.Duration `help:"Controller heartbeat timeout." default:"10s"`
DeploymentReservationTimeout time.Duration `help:"Deployment reservation timeout." default:"120s"`
ModuleUpdateFrequency time.Duration `help:"Frequency to send module updates." default:"30s"`
ModuleUpdateFrequency time.Duration `help:"Frequency to send module updates." default:"1s"` //TODO: FIX this, this should be based on streaming events, 1s is a temp workaround for the lack of dependencies within changesets
ArtefactChunkSize int `help:"Size of each chunk streamed to the client." default:"1048576"`
CommonConfig
}
Expand Down Expand Up @@ -159,6 +159,9 @@ func New(
config Config,
devel bool,
) (*Service, error) {
logger := log.FromContext(ctx)
logger = logger.Scope("controller")
ctx = log.ContextWithLogger(ctx, logger)
controllerKey := config.Key
if config.Key.IsZero() {
controllerKey = key.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
Expand Down Expand Up @@ -233,6 +236,7 @@ func New(
return svc, nil
}

// ProcessList lists "processes" running on the cluster.
func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
currentState, err := s.runnerState.View(ctx)
if err != nil {
Expand Down Expand Up @@ -323,11 +327,15 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
}
var deployments []*ftlv1.StatusResponse_Deployment
for key, deployment := range activeDeployments {
var minReplicas int32
if deployment.Runtime != nil && deployment.Runtime.Scaling != nil {
minReplicas = deployment.Runtime.Scaling.MinReplicas
}
deployments = append(deployments, &ftlv1.StatusResponse_Deployment{
Key: key,
Language: deployment.Runtime.Base.Language,
Name: deployment.Name,
MinReplicas: deployment.Runtime.Scaling.MinReplicas,
MinReplicas: minReplicas,
Replicas: replicas[key],
Schema: deployment.ToProto(),
})
Expand All @@ -345,26 +353,9 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
return connect.NewResponse(resp), nil
}

func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.UpdateDeployRequest]) (response *connect.Response[ftlv1.UpdateDeployResponse], err error) {
deploymentKey, err := key.ParseDeploymentKey(req.Msg.DeploymentKey)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
}

logger := s.getDeploymentLogger(ctx, deploymentKey)
logger.Debugf("Update deployment for: %s", deploymentKey)
if req.Msg.MinReplicas != nil {
err = s.setDeploymentReplicas(ctx, deploymentKey, int(*req.Msg.MinReplicas))
if err != nil {
logger.Errorf(err, "Could not set deployment replicas: %s", deploymentKey)
return nil, fmt.Errorf("could not set deployment replicas: %w", err)
}
}
return connect.NewResponse(&ftlv1.UpdateDeployResponse{}), nil
}

func (s *Service) setDeploymentReplicas(ctx context.Context, key key.Deployment, minReplicas int) (err error) {
deployments, err := s.schemaClient.GetDeployments(ctx, connect.NewRequest(&ftlv1.GetDeploymentsRequest{}))

if err != nil {
return fmt.Errorf("failed to get schema deployments: %w", err)
}
Expand Down Expand Up @@ -781,17 +772,17 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request
configs := configsResp.Msg.Values
routeTable := map[string]string{}
for _, module := range callableModuleNames {
if module == deployment.Name {
continue
}
deployment, ok := routeView.GetDeployment(module).Get()
if !ok {
continue
}
if route, ok := routeView.Get(deployment).Get(); ok {
if route, ok := routeView.Get(deployment).Get(); ok && route.String() != "" {
routeTable[deployment.String()] = route.String()
}
}
if !deployment.GetRuntime().GetDeployment().GetDeploymentKey().IsZero() {
routeTable[key.String()] = deployment.Runtime.Deployment.Endpoint
}

secretsResp, err := s.adminClient.MapSecretsForModule(ctx, &connect.Request[ftlv1.MapSecretsForModuleRequest]{Msg: &ftlv1.MapSecretsForModuleRequest{Module: module}})
if err != nil {
Expand Down Expand Up @@ -1024,6 +1015,7 @@ func (s *Service) GetArtefactDiffs(ctx context.Context, req *connect.Request[ftl

func (s *Service) UploadArtefact(ctx context.Context, req *connect.Request[ftlv1.UploadArtefactRequest]) (*connect.Response[ftlv1.UploadArtefactResponse], error) {
logger := log.FromContext(ctx)
logger.Debugf("Uploading artefact")
digest, err := s.storage.Upload(ctx, artefacts.Artefact{Content: req.Msg.Content})
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions backend/controller/observability/calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (m *CallMetrics) BeginSpan(ctx context.Context, verb *schemapb.Ref) (contex
}
return observability.AddSpanToLogger(m.callTracer.Start(ctx, callMeterName, trace.WithAttributes(attrs...)))
}

func (m *CallMetrics) Request(ctx context.Context, verb *schemapb.Ref, startTime time.Time, maybeFailureMode optional.Option[string]) {
attrs := []attribute.KeyValue{
attribute.String(observability.ModuleNameAttribute, verb.Module),
Expand Down
1 change: 1 addition & 0 deletions backend/controller/sql/testdata/go/database/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, change s
logger := log.FromContext(ctx).Scope("cron")
switch change := change.(type) {
case schemaeventsource.EventRemove:
logger.Debugf("Removing cron jobs for module %s", change.Module.Name)
delete(cronJobs, change.Module.Name)
// TODO: revisit this
// logger.Debugf("Removing cron jobs for module %s", change.Module.Name)
// delete(cronJobs, change.Module.Name)

case schemaeventsource.EventUpsert:
logger.Debugf("Updated cron jobs for module %s", change.Module.Name)
Expand All @@ -165,6 +166,7 @@ func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, change s
}
logger.Debugf("Adding %d cron jobs for module %s", len(moduleJobs), change.Module.Name)
cronJobs[change.Module.Name] = moduleJobs
default:
}
return nil
}
Expand Down
4 changes: 1 addition & 3 deletions backend/cron/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"golang.org/x/sync/errgroup"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"

ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
Expand Down Expand Up @@ -64,8 +63,7 @@ func TestCron(t *testing.T) {
},
}
eventSource.Publish(schemaeventsource.EventUpsert{
Deployment: optional.Some(key.NewDeploymentKey("echo")),
Module: module,
Module: module,
})

ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, log.Config{Level: log.Trace}))
Expand Down
4 changes: 2 additions & 2 deletions backend/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ func TestIngress(t *testing.T) {
Runtime: &schema.ModuleRuntime{
Deployment: &schema.ModuleRuntimeDeployment{
DeploymentKey: key.NewDeploymentKey("test"),
Endpoint: "http://localhost:8080",
},
},
}
// Publish the test module to the event source
eventSource.Publish(schemaeventsource.EventUpsert{
Module: testModule,
Deployment: optional.Some(key.NewDeploymentKey("test")),
Module: testModule,
})

svc := &service{
Expand Down
2 changes: 1 addition & 1 deletion backend/ingress/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func syncView(ctx context.Context, schemaEventSource schemaeventsource.EventSour
logger.Debugf("Starting routing sync from schema")
go func() {
for event := range channels.IterContext(ctx, schemaEventSource.Events()) {
state := extractIngressRoutingEntries(event.Schema())
state := extractIngressRoutingEntries(event.GetCanonical())
out.Store(state)
}
}()
Expand Down
Loading
Loading