Skip to content

Commit

Permalink
cheating
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Feb 7, 2025
1 parent 8f5d909 commit 1521b26
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 578 deletions.
389 changes: 109 additions & 280 deletions backend/protos/xyz/block/ftl/provisioner/v1beta1/plugin.pb.go

Large diffs are not rendered by default.

14 changes: 0 additions & 14 deletions backend/protos/xyz/block/ftl/provisioner/v1beta1/plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,6 @@ message ProvisionResponse {
ProvisionResponseStatus status = 2;
}

message DeProvisionRequest {
string ftl_cluster_id = 1;
xyz.block.ftl.schema.v1.Module module = 2;
optional xyz.block.ftl.schema.v1.Module replacement_module = 3;
string changeset = 4;
repeated string kinds = 5;
}

message DeProvisionResponse {
string provisioning_token = 1;
ProvisionResponseStatus status = 2;
}

message StatusRequest {
string provisioning_token = 1;
// The outputs of this module are updated if the the status is a success
Expand Down Expand Up @@ -67,6 +54,5 @@ service ProvisionerPluginService {
rpc Ping(xyz.block.ftl.v1.PingRequest) returns (xyz.block.ftl.v1.PingResponse);

rpc Provision(ProvisionRequest) returns (ProvisionResponse);
rpc DeProvision(DeProvisionRequest) returns (DeProvisionResponse);
rpc Status(StatusRequest) returns (StatusResponse);
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

100 changes: 13 additions & 87 deletions backend/provisioner/inmem_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,89 +57,6 @@ type InMemProvisioner struct {
removeHandlers map[schema.ResourceType]InMemResourceProvisionerFn
}

func (d *InMemProvisioner) DeProvision(ctx context.Context, req *connect.Request[provisioner.DeProvisionRequest]) (*connect.Response[provisioner.DeProvisionResponse], error) {
logger := log.FromContext(ctx)
parsed, err := key.ParseChangesetKey(req.Msg.Changeset)
if err != nil {
err = fmt.Errorf("invalid changeset: %w", err)
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}

var replacementModule *schema.Module
if req.Msg.ReplacementModule != nil {
pm, err := schema.ValidatedModuleFromProto(req.Msg.ReplacementModule)
if err != nil {
err = fmt.Errorf("invalid replacment module: %w", err)
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
replacementModule = pm
}
removingModule, err := schema.ValidatedModuleFromProto(req.Msg.Module)
if err != nil {
err = fmt.Errorf("invalid removing module: %w", err)
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
kinds := slices.Map(req.Msg.Kinds, func(k string) schema.ResourceType { return schema.ResourceType(k) })
currentNodes := schema.GetProvisioned(replacementModule)
removingNodes := schema.GetProvisioned(removingModule)

task := &inMemProvisioningTask{}
// use chans to safely collect all events before completing each task
completions := make(chan stepCompletedEvent, 16)

for id, toRemove := range removingNodes {
inUse, ok := currentNodes[id]
for _, resource := range toRemove.GetProvisioned() {
if !ok || !resource.IsEqual(inUse.GetProvisioned().Get(resource.Kind)) {
if slices.Contains(kinds, resource.Kind) {
handler, ok := d.removeHandlers[resource.Kind]
if !ok {
err := fmt.Errorf("unsupported resource type: %s", resource.Kind)
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
step := &inMemProvisioningStep{Done: atomic.New(false)}
task.steps = append(task.steps, step)
go func() {
event, err := handler(ctx, parsed, removingModule.Runtime.Deployment.DeploymentKey, toRemove)
if err != nil {
step.Err = err
logger.Errorf(err, "failed to de-provision resource %s:%s", resource.Kind, toRemove.ResourceID())
completions <- stepCompletedEvent{step: step}
return
}
completions <- stepCompletedEvent{
step: step,
event: optional.Ptr(event),
}
}()
}
}
}
}

go func() {
for c := range channels.IterContext(ctx, completions) {
if e, ok := c.event.Get(); ok {
task.events = append(task.events, &e)
}
c.step.Done.Store(true)
done, err := task.Done()
if done || err != nil {
return
}
}
}()

token := uuid.New().String()
logger.Debugf("started a task with token %s", token)
d.running.Store(token, task)

return connect.NewResponse(&provisioner.DeProvisionResponse{
ProvisioningToken: token,
Status: provisioner.ProvisionResponseStatus_PROVISION_RESPONSE_STATUS_SUBMITTED,
}), nil
}

func NewEmbeddedProvisioner(handlers map[schema.ResourceType]InMemResourceProvisionerFn, deProvisionHandlers map[schema.ResourceType]InMemResourceProvisionerFn) *InMemProvisioner {
return &InMemProvisioner{
running: xsync.NewMapOf[string, *inMemProvisioningTask](),
Expand Down Expand Up @@ -195,10 +112,19 @@ func (d *InMemProvisioner) Provision(ctx context.Context, req *connect.Request[p
for _, resource := range desired.GetProvisioned() {
if !ok || !resource.IsEqual(previous.GetProvisioned().Get(resource.Kind)) {
if slices.Contains(kinds, resource.Kind) {
handler, ok := d.handlers[resource.Kind]
if !ok {
// TODO: should a missing de-provisioner handler be an error?
continue
var handler InMemResourceProvisionerFn
if desiredModule.Runtime.Deployment.State == schema.DeploymentStateDeProvisioning {
handler, ok = d.removeHandlers[resource.Kind]
if !ok {
// TODO: should a missing de-provisioner handler be an error?
continue
}
} else {
handler, ok = d.handlers[resource.Kind]
if !ok {
err := fmt.Errorf("unsupported resource type: %s", resource.Kind)
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
}
step := &inMemProvisioningStep{Done: atomic.New(false)}
task.steps = append(task.steps, step)
Expand Down
7 changes: 0 additions & 7 deletions backend/provisioner/noop_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@ import (
// NoopProvisioner is a provisioner that does nothing
type NoopProvisioner struct{}

func (d *NoopProvisioner) DeProvision(ctx context.Context, c *connect.Request[provisioner.DeProvisionRequest]) (*connect.Response[provisioner.DeProvisionResponse], error) {
return connect.NewResponse(&provisioner.DeProvisionResponse{
Status: provisioner.ProvisionResponseStatus_PROVISION_RESPONSE_STATUS_SUBMITTED,
ProvisioningToken: "token",
}), nil
}

var _ provisionerconnect.ProvisionerPluginServiceClient = (*NoopProvisioner)(nil)

func (d *NoopProvisioner) Ping(context.Context, *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
Expand Down
16 changes: 11 additions & 5 deletions backend/provisioner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,22 @@ func (s *Service) HandleChangesetCommitted(ctx context.Context, req *schema.Chan
}()
return nil
}
func (s *Service) HandleChangesetDrained(ctx context.Context, req key.Changeset) error {
func (s *Service) HandleChangesetDrained(ctx context.Context, cs key.Changeset) error {
logger := log.FromContext(ctx)
group := errgroup.Group{}
// TODO: Block deployments to make sure only one module is modified at a time
for _, module := range s.eventSource.ActiveChangeset()[req].RemovingModules {
changeset := s.eventSource.ActiveChangeset()[cs]
for _, module := range changeset.RemovingModules {
moduleName := module.Name

group.Go(func() error {
deployment := s.registry.CreateDeployment(ctx, req.Key, module, existingModule, func(element *schema.RuntimeElement) error {
cs := req.Key.String()
var current *schema.Module
existing := s.eventSource.CanonicalView().Module(moduleName)
if f, ok := existing.Get(); ok {
current = f
}
deployment := s.registry.CreateDeployment(ctx, cs, module, current, func(element *schema.RuntimeElement) error {
cs := cs.String()
_, err := s.schemaClient.UpdateDeploymentRuntime(ctx, connect.NewRequest(&ftlv1.UpdateDeploymentRuntimeRequest{
Changeset: &cs,
Update: element.ToProto(),
Expand Down Expand Up @@ -220,7 +226,7 @@ func (s *Service) HandleChangesetDrained(ctx context.Context, req key.Changeset)
if err != nil {
return fmt.Errorf("error running deployments: %w", err)
}
_, err := s.schemaClient.FinalizeChangeset(ctx, connect.NewRequest(&ftlv1.FinalizeChangesetRequest{Changeset: req.String()}))
_, err = s.schemaClient.FinalizeChangeset(ctx, connect.NewRequest(&ftlv1.FinalizeChangesetRequest{Changeset: cs.String()}))
if err != nil {
return fmt.Errorf("error finalizing changeset: %w", err)
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1521b26

Please sign in to comment.