Skip to content

Commit

Permalink
push job_id and service_id to xcom if do_xcom_push and content
Browse files Browse the repository at this point in the history
  • Loading branch information
marwan116 committed Jun 25, 2024
1 parent dc648c1 commit 1f82801
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions anyscale_provider/operators/anyscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ def execute(self, context: Context) -> str | None:
# Submit the job to Anyscale
job_config = JobConfig(**job_params)
self.job_id = self.hook.submit_job(job_config)
if self.do_xcom_push and context is not None:
context["ti"].xcom_push(key="job_id", value=self.job_id)

self.log.info(f"Submitted Anyscale job with ID: {self.job_id}")

current_state = str(self.hook.get_job_status(self.job_id).state)
Expand Down Expand Up @@ -317,12 +320,18 @@ def execute(self, context: Context) -> str | None:
self.log.info(f"Service with config object: {svc_config}")

# Call the SDK method with the dynamically created service model
service_id = self.hook.deploy_service(
self.service_id = self.hook.deploy_service(
config=svc_config,
in_place=self.in_place,
canary_percent=self.canary_percent,
max_surge_percent=self.max_surge_percent,
)

if self.do_xcom_push and context is not None:
context["ti"].xcom_push(key="service_id", value=self.service_id)

self.log.info(f"Service rollout id: {self.service_id}")

self.defer(
trigger=AnyscaleServiceTrigger(
conn_id=self.conn_id,
Expand All @@ -335,9 +344,6 @@ def execute(self, context: Context) -> str | None:
timeout=timedelta(seconds=self.service_rollout_timeout_seconds),
)

self.log.info(f"Service rollout id: {service_id}")
return service_id

def execute_complete(self, context: Context, event: Any) -> None:
service_name = event["service_name"]
state = event["state"]
Expand Down

0 comments on commit 1f82801

Please sign in to comment.