Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add auto capabilities: #946

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update receiver name to match across methods:
Signed-off-by: Jacob Weinstock <[email protected]>
jacobweinstock committed Aug 13, 2024
commit 2e77633b8cfc4417a75a68da6ebde270268e0805
4 changes: 2 additions & 2 deletions internal/server/kubernetes_api.go
Original file line number Diff line number Diff line change
@@ -118,6 +118,6 @@ func NewKubeBackedServerFromREST(logger logr.Logger, config *rest.Config, namesp
}

// Register registers the service on the gRPC server.
func (s *KubernetesBackedServer) Register(server *grpc.Server) {
proto.RegisterWorkflowServiceServer(server, s)
func (k *KubernetesBackedServer) Register(server *grpc.Server) {
proto.RegisterWorkflowServiceServer(server, k)
}
50 changes: 25 additions & 25 deletions internal/server/kubernetes_api_workflow.go
Original file line number Diff line number Diff line change
@@ -35,9 +35,9 @@ func getWorkflowContext(wf v1alpha1.Workflow) *proto.WorkflowContext {
}
}

func (s *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker(ctx context.Context, workerID string) ([]v1alpha1.Workflow, error) {
func (k *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker(ctx context.Context, workerID string) ([]v1alpha1.Workflow, error) {
stored := &v1alpha1.WorkflowList{}
err := s.ClientFunc().List(ctx, stored, &client.MatchingFields{
err := k.ClientFunc().List(ctx, stored, &client.MatchingFields{
workflowByNonTerminalState: workerID,
})
if err != nil {
@@ -53,44 +53,44 @@ func (s *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker
return wfs, nil
}

func (s *KubernetesBackedServer) getWorkflowByName(ctx context.Context, workflowID string) (*v1alpha1.Workflow, error) {
func (k *KubernetesBackedServer) getWorkflowByName(ctx context.Context, workflowID string) (*v1alpha1.Workflow, error) {
workflowNamespace, workflowName, _ := strings.Cut(workflowID, "/")
wflw := &v1alpha1.Workflow{}
err := s.ClientFunc().Get(ctx, types.NamespacedName{Name: workflowName, Namespace: workflowNamespace}, wflw)
err := k.ClientFunc().Get(ctx, types.NamespacedName{Name: workflowName, Namespace: workflowNamespace}, wflw)
if err != nil {
s.logger.Error(err, "get client", "workflow", workflowID)
k.logger.Error(err, "get client", "workflow", workflowID)
return nil, err
}
return wflw, nil
}

// The following APIs are used by the worker.

func (s *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextRequest, stream proto.WorkflowService_GetWorkflowContextsServer) error {
func (k *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextRequest, stream proto.WorkflowService_GetWorkflowContextsServer) error {
if req.GetWorkerId() == "" {
return status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
}
wflows, err := s.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
wflows, err := k.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
if err != nil {
return err
}

ctx := context.TODO()
id := req.WorkerId
if s.AutoCapMode != AutoCapModeDisabled && len(wflows) == 0 && (s.AutoCapMode == AutoCapModeDiscovery || s.AutoCapMode == AutoCapModeEnrollment) && !s.hardwareObjectExists(ctx, id) {
if k.AutoCapMode != AutoCapModeDisabled && len(wflows) == 0 && (k.AutoCapMode == AutoCapModeDiscovery || k.AutoCapMode == AutoCapModeEnrollment) && !k.hardwareObjectExists(ctx, id) {
// In the future, the worker could be signaled to send hardware device information to be used in creation of the Hardware object.
// or the proto.WorkflowContextRequest could be extended to include Hardware information.
if err := s.createHardwareObject(ctx, id); err != nil {
s.logger.Error(err, "failed to create hardware object")
if err := k.createHardwareObject(ctx, id); err != nil {
k.logger.Error(err, "failed to create hardware object")
return err
}

if s.AutoCapMode == AutoCapModeEnrollment {
if err := s.createWorkflowObject(ctx, id); err != nil {
s.logger.Error(err, "failed to create workflow object")
if k.AutoCapMode == AutoCapModeEnrollment {
if err := k.createWorkflowObject(ctx, id); err != nil {
k.logger.Error(err, "failed to create workflow object")
return err
}
wflows, err = s.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
wflows, err = k.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
if err != nil {
return err
}
@@ -105,20 +105,20 @@ func (s *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextR
return nil
}

func (s *KubernetesBackedServer) GetWorkflowActions(ctx context.Context, req *proto.WorkflowActionsRequest) (*proto.WorkflowActionList, error) {
func (k *KubernetesBackedServer) GetWorkflowActions(ctx context.Context, req *proto.WorkflowActionsRequest) (*proto.WorkflowActionList, error) {
wfID := req.GetWorkflowId()
if wfID == "" {
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
}
wf, err := s.getWorkflowByName(ctx, wfID)
wf, err := k.getWorkflowByName(ctx, wfID)
if err != nil {
return nil, err
}
return workflow.ActionListCRDToProto(wf), nil
}

// Modifies a workflow for a given workflowContext.
func (s *KubernetesBackedServer) modifyWorkflowState(wf *v1alpha1.Workflow, wfContext *proto.WorkflowContext) error {
func (k *KubernetesBackedServer) modifyWorkflowState(wf *v1alpha1.Workflow, wfContext *proto.WorkflowContext) error {
if wf == nil {
return errors.New("no workflow provided")
}
@@ -159,19 +159,19 @@ cont:
// Workflow is running, so set the start time to now
wf.Status.State = v1alpha1.WorkflowState(proto.State_name[int32(wfContext.CurrentActionState)])
wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt = func() *metav1.Time {
t := metav1.NewTime(s.nowFunc())
t := metav1.NewTime(k.nowFunc())
return &t
}()
case proto.State_STATE_FAILED, proto.State_STATE_TIMEOUT:
// Handle terminal statuses by updating the workflow state and time
wf.Status.State = v1alpha1.WorkflowState(proto.State_name[int32(wfContext.CurrentActionState)])
if wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt != nil {
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(s.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(k.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
}
case proto.State_STATE_SUCCESS:
// Handle a success by marking the task as complete
if wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt != nil {
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(s.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(k.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
}
// Mark success on last action success
if wfContext.CurrentActionIndex+1 == wfContext.TotalNumberOfActions {
@@ -206,15 +206,15 @@ func getWorkflowContextForRequest(req *proto.WorkflowActionStatus, wf *v1alpha1.
return wfContext
}

func (s *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *proto.WorkflowActionStatus) (*proto.Empty, error) {
func (k *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *proto.WorkflowActionStatus) (*proto.Empty, error) {
err := validateActionStatusRequest(req)
if err != nil {
return nil, err
}
wfID := req.GetWorkflowId()
l := s.logger.WithValues("actionName", req.GetActionName(), "status", req.GetActionStatus(), "workflowID", req.GetWorkflowId(), "taskName", req.GetTaskName(), "worker", req.WorkerId)
l := k.logger.WithValues("actionName", req.GetActionName(), "status", req.GetActionStatus(), "workflowID", req.GetWorkflowId(), "taskName", req.GetTaskName(), "worker", req.WorkerId)

wf, err := s.getWorkflowByName(ctx, wfID)
wf, err := k.getWorkflowByName(ctx, wfID)
if err != nil {
l.Error(err, "get workflow")
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
@@ -227,13 +227,13 @@ func (s *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *pr
}

wfContext := getWorkflowContextForRequest(req, wf)
err = s.modifyWorkflowState(wf, wfContext)
err = k.modifyWorkflowState(wf, wfContext)
if err != nil {
l.Error(err, "modify workflow state")
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
}
l.Info("updating workflow in Kubernetes")
err = s.ClientFunc().Status().Update(ctx, wf)
err = k.ClientFunc().Status().Update(ctx, wf)
if err != nil {
l.Error(err, "applying update to workflow")
return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)