Skip to content

Commit

Permalink
Update protos (#868)
Browse files Browse the repository at this point in the history
* Update protos subtree

* Compile fixes for new protos / remove some unneeded workflowclient fns

* Add new svc methods
  • Loading branch information
Sushisource authored Jan 24, 2025
1 parent b94b2fc commit fad4db3
Show file tree
Hide file tree
Showing 33 changed files with 3,019 additions and 266 deletions.
64 changes: 3 additions & 61 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,13 @@ use std::{
};
use temporal_sdk_core_api::telemetry::metrics::TemporalMeter;
use temporal_sdk_core_protos::{
coresdk::{workflow_commands::QueryResult, IntoPayloadsExt},
coresdk::IntoPayloadsExt,
grpc::health::v1::health_client::HealthClient,
temporal::api::{
cloud::cloudservice::v1::cloud_service_client::CloudServiceClient,
common,
common::v1::{Header, Payload, Payloads, RetryPolicy, WorkflowExecution, WorkflowType},
enums::v1::{TaskQueueKind, WorkflowIdConflictPolicy, WorkflowIdReusePolicy},
failure::v1::Failure,
operatorservice::v1::operator_service_client::OperatorServiceClient,
query::v1::WorkflowQuery,
replication::v1::ClusterReplicationConfig,
Expand Down Expand Up @@ -915,15 +914,6 @@ pub trait WorkflowClientTrait {
details: Option<Payloads>,
) -> Result<RespondActivityTaskCanceledResponse>;

/// Fail activity task by sending response to the server. `task_token` contains activity
/// identifier that would've been received from polling for an activity task. `failure` provides
/// failure details, such as message, cause and stack trace.
async fn fail_activity_task(
&self,
task_token: TaskToken,
failure: Option<Failure>,
) -> Result<RespondActivityTaskFailedResponse>;

/// Send a signal to a certain workflow instance
async fn signal_workflow_execution(
&self,
Expand Down Expand Up @@ -966,13 +956,6 @@ pub trait WorkflowClientTrait {
page_token: Vec<u8>,
) -> Result<GetWorkflowExecutionHistoryResponse>;

/// Respond to a legacy query-only workflow task
async fn respond_legacy_query(
&self,
task_token: TaskToken,
query_result: QueryResult,
) -> Result<RespondQueryTaskCompletedResponse>;

/// Cancel a currently executing workflow
async fn cancel_workflow_execution(
&self,
Expand Down Expand Up @@ -1173,7 +1156,7 @@ impl WorkflowClientTrait for Client {
result,
identity: self.inner.options.identity.clone(),
namespace: self.namespace.clone(),
worker_version: None,
..Default::default()
},
)
.await?
Expand Down Expand Up @@ -1210,28 +1193,7 @@ impl WorkflowClientTrait for Client {
details,
identity: self.inner.options.identity.clone(),
namespace: self.namespace.clone(),
worker_version: None,
},
)
.await?
.into_inner())
}

async fn fail_activity_task(
&self,
task_token: TaskToken,
failure: Option<Failure>,
) -> Result<RespondActivityTaskFailedResponse> {
Ok(WorkflowService::respond_activity_task_failed(
&mut self.inner.client.clone(),
RespondActivityTaskFailedRequest {
task_token: task_token.0,
failure,
identity: self.inner.options.identity.clone(),
namespace: self.namespace.clone(),
// TODO: Implement - https://github.com/temporalio/sdk-core/issues/293
last_heartbeat_details: None,
worker_version: None,
..Default::default()
},
)
.await?
Expand Down Expand Up @@ -1372,26 +1334,6 @@ impl WorkflowClientTrait for Client {
.into_inner())
}

async fn respond_legacy_query(
&self,
task_token: TaskToken,
query_result: QueryResult,
) -> Result<RespondQueryTaskCompletedResponse> {
let (_, completed_type, query_result, error_message) = query_result.into_components();
Ok(WorkflowService::respond_query_task_completed(
&mut self.inner.client.clone(),
RespondQueryTaskCompletedRequest {
task_token: task_token.into(),
completed_type: completed_type as i32,
query_result,
error_message,
namespace: self.namespace.clone(),
},
)
.await?
.into_inner())
}

async fn cancel_workflow_execution(
&self,
workflow_id: String,
Expand Down
54 changes: 54 additions & 0 deletions client/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,15 @@ proxier! {
r.extensions_mut().insert(labels);
}
);
(
describe_deployment,
DescribeDeploymentRequest,
DescribeDeploymentResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
list_batch_operations,
ListBatchOperationsRequest,
Expand All @@ -1067,6 +1076,15 @@ proxier! {
r.extensions_mut().insert(labels);
}
);
(
list_deployments,
ListDeploymentsRequest,
ListDeploymentsResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
execute_multi_operation,
ExecuteMultiOperationRequest,
Expand All @@ -1076,6 +1094,24 @@ proxier! {
r.extensions_mut().insert(labels);
}
);
(
get_current_deployment,
GetCurrentDeploymentRequest,
GetCurrentDeploymentResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
get_deployment_reachability,
GetDeploymentReachabilityRequest,
GetDeploymentReachabilityResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
get_worker_versioning_rules,
GetWorkerVersioningRulesRequest,
Expand Down Expand Up @@ -1124,6 +1160,15 @@ proxier! {
r.extensions_mut().insert(labels);
}
);
(
set_current_deployment,
SetCurrentDeploymentRequest,
SetCurrentDeploymentResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
shutdown_worker,
ShutdownWorkerRequest,
Expand Down Expand Up @@ -1160,6 +1205,15 @@ proxier! {
r.extensions_mut().insert(labels);
}
);
(
update_workflow_execution_options,
UpdateWorkflowExecutionOptionsRequest,
UpdateWorkflowExecutionOptionsResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
reset_activity_by_id,
ResetActivityByIdRequest,
Expand Down
28 changes: 0 additions & 28 deletions client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ use backoff::{backoff::Backoff, exponential::ExponentialBackoff, Clock, SystemCl
use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy};
use std::{error::Error, fmt::Debug, future::Future, sync::Arc, time::Duration};
use temporal_sdk_core_protos::{
coresdk::workflow_commands::QueryResult,
temporal::api::{
common::v1::{Payload, Payloads},
failure::v1::Failure,
query::v1::WorkflowQuery,
update,
workflowservice::v1::*,
Expand Down Expand Up @@ -365,19 +363,6 @@ where
)
}

async fn fail_activity_task(
&self,
task_token: TaskToken,
failure: Option<Failure>,
) -> Result<RespondActivityTaskFailedResponse> {
retry_call!(
self,
fail_activity_task,
task_token.clone(),
failure.clone()
)
}

async fn signal_workflow_execution(
&self,
workflow_id: String,
Expand Down Expand Up @@ -453,19 +438,6 @@ where
)
}

async fn respond_legacy_query(
&self,
task_token: TaskToken,
query_result: QueryResult,
) -> Result<RespondQueryTaskCompletedResponse> {
retry_call!(
self,
respond_legacy_query,
task_token.clone(),
query_result.clone()
)
}

async fn cancel_workflow_execution(
&self,
workflow_id: String,
Expand Down
74 changes: 51 additions & 23 deletions core/src/worker/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ impl WorkerClientBag {
Some(WorkerVersionCapabilities {
build_id: self.worker_build_id.clone(),
use_versioning: self.use_versioning,
// TODO: https://github.com/temporalio/sdk-core/issues/866
deployment_series_name: "".to_string(),
})
} else {
None
Expand Down Expand Up @@ -238,6 +240,7 @@ impl WorkerClient for WorkerClientBag {
&self,
request: WorkflowTaskCompletion,
) -> Result<RespondWorkflowTaskCompletedResponse> {
#[allow(deprecated)] // want to list all fields explicitly
let request = RespondWorkflowTaskCompletedRequest {
task_token: request.task_token.into(),
commands: request.commands,
Expand All @@ -259,6 +262,8 @@ impl WorkerClient for WorkerClientBag {
result_type: completed_type as i32,
answer: query_result,
error_message,
// TODO: https://github.com/temporalio/sdk-core/issues/867
failure: None,
},
)
})
Expand All @@ -269,6 +274,9 @@ impl WorkerClient for WorkerClientBag {
capabilities: Some(respond_workflow_task_completed_request::Capabilities {
discard_speculative_workflow_task_with_events: true,
}),
// TODO: https://github.com/temporalio/sdk-core/issues/866
deployment: None,
versioning_behavior: 0,
};
Ok(self
.cloned_client()
Expand All @@ -284,13 +292,18 @@ impl WorkerClient for WorkerClientBag {
) -> Result<RespondActivityTaskCompletedResponse> {
Ok(self
.cloned_client()
.respond_activity_task_completed(RespondActivityTaskCompletedRequest {
task_token: task_token.0,
result,
identity: self.identity.clone(),
namespace: self.namespace.clone(),
worker_version: self.worker_version_stamp(),
})
.respond_activity_task_completed(
#[allow(deprecated)] // want to list all fields explicitly
RespondActivityTaskCompletedRequest {
task_token: task_token.0,
result,
identity: self.identity.clone(),
namespace: self.namespace.clone(),
worker_version: self.worker_version_stamp(),
// TODO: https://github.com/temporalio/sdk-core/issues/866
deployment: None,
},
)
.await?
.into_inner())
}
Expand Down Expand Up @@ -336,13 +349,18 @@ impl WorkerClient for WorkerClientBag {
) -> Result<RespondActivityTaskCanceledResponse> {
Ok(self
.cloned_client()
.respond_activity_task_canceled(RespondActivityTaskCanceledRequest {
task_token: task_token.0,
details,
identity: self.identity.clone(),
namespace: self.namespace.clone(),
worker_version: self.worker_version_stamp(),
})
.respond_activity_task_canceled(
#[allow(deprecated)] // want to list all fields explicitly
RespondActivityTaskCanceledRequest {
task_token: task_token.0,
details,
identity: self.identity.clone(),
namespace: self.namespace.clone(),
worker_version: self.worker_version_stamp(),
// TODO: https://github.com/temporalio/sdk-core/issues/866
deployment: None,
},
)
.await?
.into_inner())
}
Expand All @@ -354,15 +372,20 @@ impl WorkerClient for WorkerClientBag {
) -> Result<RespondActivityTaskFailedResponse> {
Ok(self
.cloned_client()
.respond_activity_task_failed(RespondActivityTaskFailedRequest {
task_token: task_token.0,
failure,
identity: self.identity.clone(),
namespace: self.namespace.clone(),
// TODO: Implement - https://github.com/temporalio/sdk-core/issues/293
last_heartbeat_details: None,
worker_version: self.worker_version_stamp(),
})
.respond_activity_task_failed(
#[allow(deprecated)] // want to list all fields explicitly
RespondActivityTaskFailedRequest {
task_token: task_token.0,
failure,
identity: self.identity.clone(),
namespace: self.namespace.clone(),
// TODO: Implement - https://github.com/temporalio/sdk-core/issues/293
last_heartbeat_details: None,
worker_version: self.worker_version_stamp(),
// TODO: https://github.com/temporalio/sdk-core/issues/866
deployment: None,
},
)
.await?
.into_inner())
}
Expand All @@ -373,6 +396,7 @@ impl WorkerClient for WorkerClientBag {
cause: WorkflowTaskFailedCause,
failure: Option<Failure>,
) -> Result<RespondWorkflowTaskFailedResponse> {
#[allow(deprecated)] // want to list all fields explicitly
let request = RespondWorkflowTaskFailedRequest {
task_token: task_token.0,
cause: cause as i32,
Expand All @@ -382,6 +406,8 @@ impl WorkerClient for WorkerClientBag {
namespace: self.namespace.clone(),
messages: vec![],
worker_version: self.worker_version_stamp(),
// TODO: https://github.com/temporalio/sdk-core/issues/866
deployment: None,
};
Ok(self
.cloned_client()
Expand Down Expand Up @@ -442,6 +468,8 @@ impl WorkerClient for WorkerClientBag {
query_result,
error_message,
namespace: self.namespace.clone(),
// TODO: https://github.com/temporalio/sdk-core/issues/867
failure: None,
})
.await?
.into_inner())
Expand Down
Loading

0 comments on commit fad4db3

Please sign in to comment.