Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Old] feat: Changesets #4197

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion backend/admin/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ func NewLocalClient(cm *manager.Manager[cf.Configuration], sm *manager.Manager[c
return &localClient{NewAdminService(cm, sm, &diskSchemaRetriever{})}
}

func (s *diskSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
func (s *diskSchemaRetriever) GetCanonicalSchema(ctx context.Context) (*schema.Schema, error) {
// disk schema can not tell canonical schema from latest schema
return s.GetLatestSchema(ctx)
}

func (s *diskSchemaRetriever) GetLatestSchema(ctx context.Context) (*schema.Schema, error) {
path, ok := projectconfig.DefaultConfigPath().Get()
if !ok {
return nil, fmt.Errorf("no project config path available")
Expand Down
19 changes: 13 additions & 6 deletions backend/admin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ type AdminService struct {
var _ ftlv1connect.AdminServiceHandler = (*AdminService)(nil)

type SchemaRetriever interface {
// BindAllocator is required if the schema is retrieved from disk using language plugins
GetActiveSchema(ctx context.Context) (*schema.Schema, error)
// TODO: docs
GetCanonicalSchema(ctx context.Context) (*schema.Schema, error)
GetLatestSchema(ctx context.Context) (*schema.Schema, error)
}

func NewSchemaRetreiver(source schemaeventsource.EventSource) SchemaRetriever {
Expand All @@ -52,8 +53,13 @@ type streamSchemaRetriever struct {
source schemaeventsource.EventSource
}

func (c streamSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
view := c.source.View()
func (c streamSchemaRetriever) GetCanonicalSchema(ctx context.Context) (*schema.Schema, error) {
view := c.source.CanonicalView()
return &schema.Schema{Modules: view.Modules}, nil
}

func (c streamSchemaRetriever) GetLatestSchema(ctx context.Context) (*schema.Schema, error) {
view := c.source.LatestView()
return &schema.Schema{Modules: view.Modules}, nil
}

Expand Down Expand Up @@ -286,7 +292,7 @@ func (s *AdminService) validateAgainstSchema(ctx context.Context, isSecret bool,
}

// If we can't retrieve an active schema, skip validation.
sch, err := s.schr.GetActiveSchema(ctx)
sch, err := s.schr.GetLatestSchema(ctx)
if err != nil {
logger.Debugf("skipping validation; could not get the active schema: %v", err)
return nil
Expand Down Expand Up @@ -330,7 +336,8 @@ func (s *AdminService) validateAgainstSchema(ctx context.Context, isSecret bool,

func (s *AdminService) ResetSubscription(ctx context.Context, req *connect.Request[ftlv1.ResetSubscriptionRequest]) (*connect.Response[ftlv1.ResetSubscriptionResponse], error) {
// Find nodes in schema
sch, err := s.schr.GetActiveSchema(ctx)
// TODO: we really want all deployments for a module... not just latest... Use canonical and check ActiveChangeset?
sch, err := s.schr.GetCanonicalSchema(ctx)
if err != nil {
return nil, fmt.Errorf("could not get the active schema: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions backend/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func verbSchemaString(sch *schema.Schema, verb *schema.Verb) (string, error) {
}

func (s *service) GetModules(ctx context.Context, req *connect.Request[consolepb.GetModulesRequest]) (*connect.Response[consolepb.GetModulesResponse], error) {
sch := s.schemaEventSource.View()
sch := s.schemaEventSource.LatestView()

allowed := map[string]bool{}
nilMap := map[schema.RefKey]map[schema.RefKey]bool{}
Expand Down Expand Up @@ -394,7 +394,7 @@ func (s *service) filterDeployments(unfilteredDeployments *schema.Schema) []*sch
}

func (s *service) sendStreamModulesResp(ctx context.Context, stream *connect.ServerStream[consolepb.StreamModulesResponse]) error {
unfilteredDeployments := s.schemaEventSource.View()
unfilteredDeployments := s.schemaEventSource.LatestView()

deployments := s.filterDeployments(unfilteredDeployments)
sch := &schema.Schema{
Expand Down
115 changes: 7 additions & 108 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func New(
return svc, nil
}

// ProcessList lists "processes" running on the cluster.
func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
currentState, err := s.runnerState.View(ctx)
if err != nil {
Expand All @@ -249,7 +250,7 @@ func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.Pr
Endpoint: p.Endpoint,
}
minReplicas := int32(0)
deployment, err := schemaState.GetDeployment(p.Deployment)
deployment, _, err := schemaState.FindDeployment(p.Deployment)
if err == nil {
minReplicas = deployment.GetRuntime().GetScaling().GetMinReplicas()
}
Expand All @@ -276,7 +277,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
}

runners := currentRunnerState.Runners()
status := currentSchemaState.GetActiveDeployments()
status := currentSchemaState.GetAllActiveDeployments()
allModules := s.routeTable.Current()
routes := slices.Map(allModules.Schema().Modules, func(module *schema.Module) (out *ftlv1.StatusResponse_Route) {
key := ""
Expand Down Expand Up @@ -348,12 +349,11 @@ func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.U
}

func (s *Service) setDeploymentReplicas(ctx context.Context, key key.Deployment, minReplicas int) (err error) {

view, err := s.schemaState.State.View(ctx)
if err != nil {
return fmt.Errorf("failed to get controller state: %w", err)
}
deployment, err := view.GetDeployment(key)
deployment, _, err := view.FindDeployment(key)
if err != nil {
return fmt.Errorf("could not get deployment: %w", err)
}
Expand Down Expand Up @@ -382,76 +382,6 @@ func (s *Service) setDeploymentReplicas(ctx context.Context, key key.Deployment,
return nil
}

func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.ReplaceDeployRequest]) (*connect.Response[ftlv1.ReplaceDeployResponse], error) {
newDeploymentKey, err := key.ParseDeploymentKey(c.Msg.DeploymentKey)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
logger := s.getDeploymentLogger(ctx, newDeploymentKey)
logger.Debugf("Replace deployment for: %s", newDeploymentKey)

view, err := s.schemaState.State.View(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get controller state: %w", err)
}
newDeployment, err := view.GetDeployment(newDeploymentKey)
if err != nil {
logger.Errorf(err, "Deployment not found: %s", newDeploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
}
minReplicas := int(c.Msg.MinReplicas)
err = s.schemaState.State.Publish(ctx, &schemaservice.DeploymentActivatedEvent{Key: newDeploymentKey, ActivatedAt: time.Now(), MinReplicas: minReplicas})
if err != nil {
return nil, fmt.Errorf("replace deployment failed to activate: %w", err)
}

// If there's an existing deployment, set its desired replicas to 0
var replacedDeploymentKey optional.Option[key.Deployment]
// TODO: remove all this, it needs to be event driven
var oldDeployment *schema.Module
var oldKey key.Deployment
for key, dep := range view.GetActiveDeployments() {
if dep.Name == newDeployment.Name {
oldDeployment = dep
oldKey = key
break
}
}
if oldDeployment != nil {
if oldKey.String() == newDeploymentKey.String() {
return nil, fmt.Errorf("replace deployment failed: deployment already exists from %v to %v", oldKey, newDeploymentKey)
}
err = s.schemaState.State.Publish(ctx, &schemaservice.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas})
if err != nil {
return nil, fmt.Errorf("replace deployment failed to set new deployment replicas from %v to %v: %w", oldKey, newDeploymentKey, err)
}
err = s.schemaState.State.Publish(ctx, &schemaservice.DeploymentDeactivatedEvent{Key: oldKey})
if err != nil {
return nil, fmt.Errorf("replace deployment failed to deactivate old deployment %v: %w", oldKey, err)
}
replacedDeploymentKey = optional.Some(oldKey)
} else {
// Set the desired replicas for the new deployment
err = s.schemaState.State.Publish(ctx, &schemaservice.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas})
if err != nil {
return nil, fmt.Errorf("replace deployment failed to set replicas for %v: %w", newDeploymentKey, err)
}
}

s.timelineClient.Publish(ctx, timelineclient.DeploymentCreated{
DeploymentKey: newDeploymentKey,
ModuleName: newDeployment.Name,
MinReplicas: minReplicas,
ReplacedDeployment: replacedDeploymentKey,
Language: newDeployment.Runtime.Base.Language,
})
if err != nil {
return nil, fmt.Errorf("replace deployment failed to create event: %w", err)
}

return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil
}

func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStream[ftlv1.RegisterRunnerRequest]) (*connect.Response[ftlv1.RegisterRunnerResponse], error) {

deferredDeregistration := false
Expand Down Expand Up @@ -628,7 +558,7 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request
if err != nil {
return fmt.Errorf("failed to get schema state: %w", err)
}
deployment, err := cs.GetDeployment(key)
deployment, _, err := cs.FindDeployment(key)
if err != nil {
return fmt.Errorf("could not get deployment: %w", err)
}
Expand Down Expand Up @@ -917,44 +847,13 @@ func (s *Service) UploadArtefact(ctx context.Context, req *connect.Request[ftlv1
return connect.NewResponse(&ftlv1.UploadArtefactResponse{Digest: digest[:]}), nil
}

func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftlv1.CreateDeploymentRequest]) (*connect.Response[ftlv1.CreateDeploymentResponse], error) {
logger := log.FromContext(ctx)

ms := req.Msg.Schema
if ms.Runtime == nil {
err := errors.New("missing runtime metadata")
logger.Errorf(err, "Missing runtime metadata")
return nil, err
}

module, err := schema.ValidatedModuleFromProto(ms)
if err != nil {
logger.Errorf(err, "Invalid module schema")
return nil, fmt.Errorf("invalid module schema: %w", err)
}

dkey := key.NewDeploymentKey(module.Name)
module.ModRuntime().ModDeployment().CreatedAt = time.Now()
err = s.schemaState.State.Publish(ctx, &schemaservice.DeploymentCreatedEvent{
Key: dkey,
Schema: module,
})
if err != nil {
logger.Errorf(err, "Could not create deployment event")
return nil, fmt.Errorf("could not create deployment event: %w", err)
}

deploymentLogger := s.getDeploymentLogger(ctx, dkey)
deploymentLogger.Debugf("Created deployment %s", dkey)
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dkey.String()}), nil
}

func (s *Service) getDeployment(ctx context.Context, dkey key.Deployment) (*schema.Module, error) {
view, err := s.schemaState.State.View(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get schema state: %w", err)
}
deployment, err := view.GetDeployment(dkey)
// TODO: revisit if this is the right function to use
deployment, _, err := view.FindDeployment(dkey)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not retrieve deployment: %w", err))
}
Expand Down
1 change: 1 addition & 0 deletions backend/controller/observability/calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (m *CallMetrics) BeginSpan(ctx context.Context, verb *schemapb.Ref) (contex
}
return observability.AddSpanToLogger(m.callTracer.Start(ctx, callMeterName, trace.WithAttributes(attrs...)))
}

func (m *CallMetrics) Request(ctx context.Context, verb *schemapb.Ref, startTime time.Time, maybeFailureMode optional.Option[string]) {
attrs := []attribute.KeyValue{
attribute.String(observability.ModuleNameAttribute, verb.Module),
Expand Down
5 changes: 3 additions & 2 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, change s
logger := log.FromContext(ctx).Scope("cron")
switch change := change.(type) {
case schemaeventsource.EventRemove:
logger.Debugf("Removing cron jobs for module %s", change.Module.Name)
delete(cronJobs, change.Module.Name)
// TODO: revisit this
// logger.Debugf("Removing cron jobs for module %s", change.Module.Name)
// delete(cronJobs, change.Module.Name)

case schemaeventsource.EventUpsert:
logger.Debugf("Updated cron jobs for module %s", change.Module.Name)
Expand Down
2 changes: 1 addition & 1 deletion backend/ingress/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func syncView(ctx context.Context, schemaEventSource schemaeventsource.EventSour
logger.Debugf("Starting routing sync from schema")
go func() {
for event := range channels.IterContext(ctx, schemaEventSource.Events()) {
state := extractIngressRoutingEntries(event.Schema())
state := extractIngressRoutingEntries(event.GetCanonical())
out.Store(state)
}
}()
Expand Down
Loading