Skip to content

Commit

Permalink
Add ability to set vars when scheduling and continuing actions (#142)
Browse files Browse the repository at this point in the history
* Add ability to set vars when creating and continuing action

* Fix odo support

* Fix bug when deleting subject that is already deleted

* Add kopf log configuration

* Do not delete canceled actions
  • Loading branch information
jkupferer authored Aug 1, 2022
1 parent f851f77 commit 5f7fd42
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def run(self, tmp=None, task_vars=None, **_):
anarchy_output_dir = task_vars['anarchy_output_dir']
module_args = self._task.args.copy()
after = module_args.get('after', None)
vars = module_args.get('vars', {})

if not after:
return dict(
Expand All @@ -52,7 +53,10 @@ def run(self, tmp=None, task_vars=None, **_):
after_timestamp = (datetime.utcnow() + interval).strftime('%FT%TZ')

with open(os.path.join(anarchy_output_dir, 'continue.yaml'), 'w') as f:
yaml.safe_dump({'after': after_timestamp}, f)
yaml.safe_dump({
'after': after_timestamp,
'vars': vars,
}, f)

return dict(
after = after_timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def run(self, tmp=None, task_vars=None, **_):
action = module_args.get('action', None)
after = module_args.get('after', None)
cancel = module_args.get('cancel', [])
vars = module_args.get('vars', {})

if isinstance(after, datetime):
after = after.strftime('%FT%TZ')
Expand All @@ -63,7 +64,7 @@ def run(self, tmp=None, task_vars=None, **_):
headers={'Authorization': 'Bearer {}:{}:{}'.format(
anarchy_runner_name, anarchy_run_pod_name, anarchy_runner_token
)},
json=dict(action=action, after=after, cancel=cancel)
json=dict(action=action, after=after, cancel=cancel, vars=vars)
)

result['action'] = response.json()['result']
Expand Down
62 changes: 26 additions & 36 deletions devfile.yaml
Original file line number Diff line number Diff line change
@@ -1,45 +1,35 @@
schemaVersion: 2.0.0
metadata:
name: anarchy
version: 1.0.0
components:
- name: s2i-builder
container:
env:
- name: ODO_S2I_SCRIPTS_URL
value: /usr/libexec/s2i
- name: ODO_S2I_SCRIPTS_PROTOCOL
value: image://
- name: ODO_S2I_SRC_BIN_PATH
value: /tmp
- name: ODO_S2I_DEPLOYMENT_DIR
value: ""
- name: ODO_S2I_WORKING_DIR
value: /opt/app-root/src
- name: ODO_S2I_BUILDER_IMG
value: ubi8/python-38
- name: ODO_SRC_BACKUP_DIR
value: /opt/app-root/src-backup
- name: ODO_S2I_CONVERTED_DEVFILE
value: "true"
# Anarchy vars
- name: CLEANUP_INTERVAL
value: "10"
image: quay.io/redhat-cop/python-kopf-s2i:v1.35
mountSources: true
sourceMapping: /tmp/projects
commands:
- id: s2i-assemble
exec:
commandLine: /opt/odo/bin/s2i-setup && /opt/odo/bin/assemble-and-restart
- exec:
commandLine: >-
rm -rf /tmp/src && cp /tmp/projects -r /tmp/src && /tmp/src/.s2i/bin/assemble
component: s2i-builder
group:
isDefault: true
kind: build
- id: s2i-run
exec:
commandLine: /opt/odo/bin/run
hotReloadCapable: false
workingDir: ${PROJECT_SOURCE}
id: s2i-assemble
- exec:
commandLine: /opt/app-root/src/.s2i/bin/run
component: s2i-builder
group:
isDefault: true
kind: run
hotReloadCapable: false
workingDir: ${PROJECT_SOURCE}
id: s2i-run
components:
- container:
env:
- name: ANARCHY_RUNNING_ALL_IN_ONE
value: "true"
- name: CLEANUP_INTERVAL
value: "10"
image: quay.io/redhat-cop/python-kopf-s2i:v1.35
mountSources: true
sourceMapping: /tmp/projects
name: s2i-builder
metadata:
name: anarchy
version: 1.0.0
schemaVersion: 2.0.0
4 changes: 4 additions & 0 deletions helm/crds/anarchyruns.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ spec:
after:
description: Timestamp for when to continue action
type: string
vars:
description: Variables to update in the AnarchyAction for continuation.
type: object
x-kubernetes-preserve-unknown-fields: true
rc:
type: integer
status:
Expand Down
5 changes: 3 additions & 2 deletions operator/anarchyaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ def refresh_from_api(self, anarchy_runtime):
else:
raise Exception('Unable to find AnarchyAction {} to refresh'.format(self.name))

def schedule_continuation(self, after, anarchy_runtime):
def schedule_continuation(self, after, anarchy_runtime, vars):
try:
anarchy_runtime.custom_objects_api.patch_namespaced_custom_object_status(
anarchy_runtime.operator_domain, anarchy_runtime.api_version,
Expand All @@ -364,7 +364,8 @@ def schedule_continuation(self, after, anarchy_runtime):
}
},
'spec': {
'after': after.strftime('%FT%TZ')
'after': after.strftime('%FT%TZ'),
'vars': vars,
},
}
)
Expand Down
4 changes: 4 additions & 0 deletions operator/anarchyrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ def continue_action_after(self):
def continue_action_after_timestamp(self):
return self.status.get('result', {}).get('continueAction', {}).get('after')

@property
def continue_action_vars(self):
return self.status.get('result', {}).get('continueAction', {}).get('vars', {})

@property
def creation_timestamp(self):
return self.metadata.get('creationTimestamp')
Expand Down
2 changes: 1 addition & 1 deletion operator/anarchyruntime.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(
self.is_active_condition = threading.Condition()
self.pod_name = os.environ.get('POD_NAME', os.environ.get('HOSTNAME', None))
self.pod = self.core_v1_api.read_namespaced_pod(self.pod_name, self.operator_namespace)
self.running_all_in_one = '' != os.environ.get('ODO_S2I_SCRIPTS_URL', '')
self.running_all_in_one = 'true' == os.environ.get('ANARCHY_RUNNING_ALL_IN_ONE', '')
if self.running_all_in_one:
self.anarchy_service_name = os.environ.get('ANARCHY_SERVICE', os.environ.get('HOSTNAME'))
self.anarchy_service = self.pod
Expand Down
16 changes: 11 additions & 5 deletions operator/anarchysubject.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,14 +473,20 @@ def check_spec_changed(self, anarchy_runtime):
return self.spec_sha256 != spec_sha256_annotation

def delete(self, remove_finalizers, anarchy_runtime):
resource_object = anarchy_runtime.custom_objects_api.delete_namespaced_custom_object(
anarchy_runtime.operator_domain, anarchy_runtime.api_version,
anarchy_runtime.operator_namespace, 'anarchysubjects', self.name
)
try:
resource_object = anarchy_runtime.custom_objects_api.delete_namespaced_custom_object(
anarchy_runtime.operator_domain, anarchy_runtime.api_version,
anarchy_runtime.operator_namespace, 'anarchysubjects', self.name
)
except kubernetes.client.rest.ApiException as e:
if e.status == 404:
return None
else:
raise
self.__init__(resource_object)
if remove_finalizers:
self.remove_finalizers(anarchy_runtime)
return result
return resource_object

def get_governor(self):
return AnarchyGovernor.get(self.governor_name)
Expand Down
11 changes: 11 additions & 0 deletions operator/configure_kopf_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import logging

def suppress_handler_succeeded_messages(record: logging.LogRecord) -> bool:
txt = record.getMessage()
if txt.startswith("Handler ") and txt.endswith(" succeeded."):
return False
return True

def configure_kopf_logging():
objlogger = logging.getLogger('kopf.objects')
objlogger.addFilter(suppress_handler_succeeded_messages)
19 changes: 11 additions & 8 deletions operator/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from anarchysubject import AnarchySubject
from anarchyaction import AnarchyAction
from anarchyrun import AnarchyRun
from configure_kopf_logging import configure_kopf_logging
from datetime import datetime

api = flask.Flask('rest')
Expand Down Expand Up @@ -72,6 +73,8 @@ def startup(settings: kopf.OperatorSettings, **_):
# Disable scanning for crds and namespaces
settings.scanning.disabled = True

configure_kopf_logging()

if anarchy_runtime.running_all_in_one:
AnarchyRunner.start_all_in_one_runner(anarchy_runtime)
else:
Expand Down Expand Up @@ -477,7 +480,7 @@ def run_update(new, old, **kwargs):
if not action:
action = AnarchyAction.get_from_api(
anarchy_runtime = anarchy_runtime,
name = run.action_name,
name = run.action_name,
)
if not action:
raise kopf.TemporaryError(
Expand All @@ -504,6 +507,7 @@ def run_update(new, old, **kwargs):
action.schedule_continuation(
after = continue_action_after,
anarchy_runtime = anarchy_runtime,
vars = run.continue_action_vars,
)
elif action_config.finish_on_successful_run:
run.logger.info(
Expand Down Expand Up @@ -962,6 +966,7 @@ def run_subject_action_post(subject_name):
action_name = flask.request.json.get('action', None)
after_timestamp = flask.request.json.get('after', None)
cancel_actions = flask.request.json.get('cancel', None)
action_vars = flask.request.json.get('vars', {})

if not action_name and not cancel_actions:
logger.warning(
Expand Down Expand Up @@ -997,12 +1002,9 @@ def run_subject_action_post(subject_name):
anarchy_runtime.operator_namespace, 'anarchyactions',
label_selector=f"{anarchy_runtime.subject_label}={subject.name}"
).get('items', []):
if action_resource['spec']['action'] in cancel_actions \
and 'status' not in action_resource:
anarchy_runtime.custom_objects_api.delete_namespaced_custom_object(
anarchy_runtime.operator_domain, anarchy_runtime.api_version,
anarchy_runtime.operator_namespace, 'anarchyactions', action_resource['metadata']['name']
)
action = AnarchyAction(resource_object=action_resource)
if action.action in cancel_actions:
action.set_finished(anarchy_runtime=anarchy_runtime, state='canceled')

if action_name:
result = anarchy_runtime.custom_objects_api.create_namespaced_custom_object(
Expand All @@ -1026,6 +1028,7 @@ def run_subject_action_post(subject_name):
"callbackToken": uuid.uuid4().hex,
"governorRef": governor.reference,
"subjectRef": subject.reference,
"vars": action_vars,
}
}
)
Expand Down Expand Up @@ -1135,7 +1138,7 @@ def run_subject_action_patch(subject_name, action_name):
elif flask.request.json.get('failed', False):
finished_state = 'failed'

if finished_state != None:
if not action.is_finished:
action.set_finished(
anarchy_runtime = anarchy_runtime,
state = finished_state,
Expand Down
1 change: 1 addition & 0 deletions test/roles/anarchy_test_simple/tasks/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
name: "{{ _action.metadata.name }}"
uid: "{{ _action.metadata.uid }}"
_expected_vars:
a_var_from_action: create
a_var_from_governor: foo
a_var_from_subject: 23
a_var_subject_should_override: true
Expand Down
7 changes: 4 additions & 3 deletions test/roles/anarchy_test_simple/templates/governor.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ spec:
- name: Schedule Configure
anarchy_schedule_action:
action: configure
vars:
a_var_from_action: create

update:
tasks:
Expand Down Expand Up @@ -93,12 +95,11 @@ spec:
- name: Record change
anarchy_subject_update:
skip_update_processing: true
spec:
vars:
continued: true
- name: Schedule continution
anarchy_continue_action:
after: 30s
vars:
continued: true
timeEstimate: 1m

destroy:
Expand Down
2 changes: 1 addition & 1 deletion test/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def api_v2_jobs_get():
}), 200

@api.route('/api/v2/jobs/<string:job_id>/job_events/', methods=['GET'])
def api_v2_jobs_events_get(joib_id):
def api_v2_jobs_events_get(job_id):
return flask.jsonify({
"count": 0,
"next": None,
Expand Down

0 comments on commit 5f7fd42

Please sign in to comment.