Skip to content

Commit

Permalink
Write sdk name & version to WFT completes (#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Jan 22, 2025
1 parent f041893 commit b94b2fc
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 37 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ src/protos/*.rs
/bindings/
/core/machine_coverage/
/.cloud_certs/
.aider*
2 changes: 2 additions & 0 deletions core/src/core_tests/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool)
mock.expect_workers()
.returning(|| DEFAULT_WORKERS_REGISTRY.clone());
mock.expect_is_mock().returning(|| true);
mock.expect_sdk_name_and_version()
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
if use_cache {
if api_success {
mock.expect_shutdown_worker()
Expand Down
115 changes: 103 additions & 12 deletions core/src/internal_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,44 @@ pub(crate) enum InternalFlags {
lang: BTreeSet<u32>,
core_since_last_complete: HashSet<CoreInternalFlags>,
lang_since_last_complete: HashSet<u32>,
last_sdk_name: String,
last_sdk_version: String,
sdk_name: String,
sdk_version: String,
},
Disabled,
}

impl InternalFlags {
pub(crate) fn new(server_capabilities: &get_system_info_response::Capabilities) -> Self {
pub(crate) fn new(
server_capabilities: &get_system_info_response::Capabilities,
sdk_name: String,
sdk_version: String,
) -> Self {
match server_capabilities.sdk_metadata {
true => Self::Enabled {
core: Default::default(),
lang: Default::default(),
core_since_last_complete: Default::default(),
lang_since_last_complete: Default::default(),
last_sdk_name: "".to_string(),
last_sdk_version: "".to_string(),
sdk_name,
sdk_version,
},
false => Self::Disabled,
}
}

pub(crate) fn add_from_complete(&mut self, e: &WorkflowTaskCompletedEventAttributes) {
if let Self::Enabled { core, lang, .. } = self {
if let Self::Enabled {
core,
lang,
last_sdk_name,
last_sdk_version,
..
} = self
{
if let Some(metadata) = e.sdk_metadata.as_ref() {
core.extend(
metadata
Expand All @@ -74,6 +93,12 @@ impl InternalFlags {
.map(|u| CoreInternalFlags::from_u32(*u)),
);
lang.extend(metadata.lang_used_flags.iter());
if !metadata.sdk_name.is_empty() {
*last_sdk_name = metadata.sdk_name.clone();
}
if !metadata.sdk_version.is_empty() {
*last_sdk_version = metadata.sdk_version.clone();
}
}
}
}
Expand Down Expand Up @@ -133,6 +158,10 @@ impl InternalFlags {
lang_since_last_complete,
core,
lang,
last_sdk_name,
last_sdk_version,
sdk_name,
sdk_version,
} => {
let core_newly_used: Vec<_> = core_since_last_complete
.iter()
Expand All @@ -146,11 +175,21 @@ impl InternalFlags {
.collect();
core.extend(core_since_last_complete.iter());
lang.extend(lang_since_last_complete.iter());
let sdk_name = if last_sdk_name != sdk_name {
sdk_name.clone()
} else {
"".to_string()
};
let sdk_version = if last_sdk_version != sdk_version {
sdk_version.clone()
} else {
"".to_string()
};
WorkflowTaskCompletedMetadata {
core_used_flags: core_newly_used,
lang_used_flags: lang_newly_used,
sdk_name: "".to_string(),
sdk_version: "".to_string(),
sdk_name,
sdk_version,
}
}
Self::Disabled => WorkflowTaskCompletedMetadata::default(),
Expand Down Expand Up @@ -186,9 +225,19 @@ mod tests {
use super::*;
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::get_system_info_response::Capabilities;

impl Default for InternalFlags {
fn default() -> Self {
Self::Disabled
}
}

#[test]
fn disabled_in_capabilities_disables() {
let mut f = InternalFlags::new(&Capabilities::default());
let mut f = InternalFlags::new(
&Capabilities::default(),
"name".to_string(),
"ver".to_string(),
);
f.add_lang_used([1]);
f.add_from_complete(&WorkflowTaskCompletedEventAttributes {
sdk_metadata: Some(WorkflowTaskCompletedMetadata {
Expand All @@ -214,23 +263,29 @@ mod tests {
}

#[test]
fn only_writes_new_flags() {
let mut f = InternalFlags::new(&Capabilities {
sdk_metadata: true,
..Default::default()
});
fn only_writes_new_flags_and_sdk_info() {
let mut f = InternalFlags::new(
&Capabilities {
sdk_metadata: true,
..Default::default()
},
"name".to_string(),
"ver".to_string(),
);
f.add_lang_used([1]);
f.try_use(CoreInternalFlags::IdAndTypeDeterminismChecks, true);
let gathered = f.gather_for_wft_complete();
assert_matches!(gathered.core_used_flags.as_slice(), &[1]);
assert_matches!(gathered.lang_used_flags.as_slice(), &[1]);
assert_matches!(gathered.sdk_name.as_str(), "name");
assert_matches!(gathered.sdk_version.as_str(), "ver");

f.add_from_complete(&WorkflowTaskCompletedEventAttributes {
sdk_metadata: Some(WorkflowTaskCompletedMetadata {
core_used_flags: vec![2],
lang_used_flags: vec![2],
sdk_name: "".to_string(),
sdk_version: "".to_string(),
sdk_name: "name".to_string(),
sdk_version: "ver".to_string(),
}),
..Default::default()
});
Expand All @@ -239,5 +294,41 @@ mod tests {
let gathered = f.gather_for_wft_complete();
assert_matches!(gathered.core_used_flags.as_slice(), &[]);
assert_matches!(gathered.lang_used_flags.as_slice(), &[]);
assert!(gathered.sdk_name.is_empty());
assert!(gathered.sdk_version.is_empty());

f.add_from_complete(&WorkflowTaskCompletedEventAttributes {
sdk_metadata: Some(WorkflowTaskCompletedMetadata::default()),
..Default::default()
});
let gathered = f.gather_for_wft_complete();
assert_matches!(gathered.core_used_flags.as_slice(), &[]);
assert_matches!(gathered.lang_used_flags.as_slice(), &[]);
assert!(gathered.sdk_name.is_empty());
assert!(gathered.sdk_version.is_empty());

f.add_from_complete(&WorkflowTaskCompletedEventAttributes {
sdk_metadata: Some(WorkflowTaskCompletedMetadata {
sdk_name: "other sdk".to_string(),
sdk_version: "other ver".to_string(),
..Default::default()
}),
..Default::default()
});
let gathered = f.gather_for_wft_complete();
assert_matches!(gathered.sdk_name.as_str(), "name");
assert_matches!(gathered.sdk_version.as_str(), "ver");

f.add_from_complete(&WorkflowTaskCompletedEventAttributes {
sdk_metadata: Some(WorkflowTaskCompletedMetadata {
sdk_name: "name".to_string(),
sdk_version: "ver2".to_string(),
..Default::default()
}),
..Default::default()
});
let gathered = f.gather_for_wft_complete();
assert!(gathered.sdk_name.is_empty());
assert_matches!(gathered.sdk_version.as_str(), "ver");
}
}
8 changes: 8 additions & 0 deletions core/src/worker/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ pub(crate) trait WorkerClient: Sync + Send {
fn capabilities(&self) -> Option<Capabilities>;
fn workers(&self) -> Arc<SlotManager>;
fn is_mock(&self) -> bool;
/// Return (sdk_name, sdk_version) from the underlying client configuration
fn sdk_name_and_version(&self) -> (String, String);
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -486,6 +488,12 @@ impl WorkerClient for WorkerClientBag {
fn is_mock(&self) -> bool {
false
}

fn sdk_name_and_version(&self) -> (String, String) {
let lock = self.replaceable_client.read();
let opts = lock.get_client().inner().options();
(opts.client_name.clone(), opts.client_version.clone())
}
}

/// A version of [RespondWorkflowTaskCompletedRequest] that will finish being filled out by the
Expand Down
5 changes: 5 additions & 0 deletions core/src/worker/client/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub(crate) fn mock_workflow_client() -> MockWorkerClient {
r.expect_is_mock().returning(|| true);
r.expect_shutdown_worker()
.returning(|_| Ok(ShutdownWorkerResponse {}));
r.expect_sdk_name_and_version()
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
r
}

Expand All @@ -42,6 +44,8 @@ pub(crate) fn mock_manual_workflow_client() -> MockManualWorkerClient {
r.expect_workers()
.returning(|| DEFAULT_WORKERS_REGISTRY.clone());
r.expect_is_mock().returning(|| true);
r.expect_sdk_name_and_version()
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
r
}

Expand Down Expand Up @@ -146,5 +150,6 @@ mockall::mock! {
fn capabilities(&self) -> Option<Capabilities>;
fn workers(&self) -> Arc<SlotManager>;
fn is_mock(&self) -> bool;
fn sdk_name_and_version(&self) -> (String, String);
}
}
28 changes: 8 additions & 20 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ use temporal_sdk_core_protos::{
temporal::api::{
enums::v1::TaskQueueKind,
taskqueue::v1::{StickyExecutionAttributes, TaskQueue},
workflowservice::v1::get_system_info_response,
},
TaskToken,
};
Expand Down Expand Up @@ -508,16 +507,19 @@ impl Worker {
external_wft_tx,
);
let worker_key = Mutex::new(client.workers().register(Box::new(provider)));
let sdk_name_and_ver = client.sdk_name_and_version();
Self {
worker_key,
client: client.clone(),
workflows: Workflows::new(
build_wf_basics(
config.clone(),
WorkflowBasics {
worker_config: Arc::new(config.clone()),
shutdown_token: shutdown_token.child_token(),
metrics,
shutdown_token.child_token(),
client.capabilities().unwrap_or_default(),
),
server_capabilities: client.capabilities().unwrap_or_default(),
sdk_name: sdk_name_and_ver.0,
sdk_version: sdk_name_and_ver.1,
},
sticky_queue_name.map(|sq| StickyExecutionAttributes {
worker_task_queue: Some(TaskQueue {
name: sq,
Expand Down Expand Up @@ -864,20 +866,6 @@ pub(crate) struct PostActivateHookData<'a> {
pub(crate) replaying: bool,
}

fn build_wf_basics(
config: WorkerConfig,
metrics: MetricsContext,
shutdown_token: CancellationToken,
server_capabilities: get_system_info_response::Capabilities,
) -> WorkflowBasics {
WorkflowBasics {
worker_config: Arc::new(config),
shutdown_token,
metrics,
server_capabilities,
}
}

pub(crate) enum TaskPollers {
Real,
#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions core/src/worker/workflow/machines/activity_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ mod test {
attrs: Default::default(),
cancellation_type: Default::default(),
cancelled_before_sent: false,
internal_flags: Rc::new(RefCell::new(InternalFlags::new(&Default::default()))),
internal_flags: Rc::new(RefCell::new(InternalFlags::default())),
},
);
let cmds = s.cancel().unwrap();
Expand All @@ -894,7 +894,7 @@ mod test {
cancellation_type: ActivityCancellationType::Abandon.into(),
..Default::default()
},
Rc::new(RefCell::new(InternalFlags::new(&Default::default()))),
Rc::new(RefCell::new(InternalFlags::default())),
true,
);
let mut s = if let Machines::ActivityMachine(am) = s.machine {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ mod test {
workflow_type: "".to_string(),
cancelled_before_sent: false,
cancel_type: Default::default(),
internal_flags: Rc::new(RefCell::new(InternalFlags::new(&Default::default()))),
internal_flags: Rc::new(RefCell::new(InternalFlags::default())),
},
);
let cmds = s.cancel().unwrap();
Expand All @@ -1006,7 +1006,7 @@ mod test {
workflow_type: "".to_string(),
cancelled_before_sent: false,
cancel_type: ChildWorkflowCancellationType::Abandon,
internal_flags: Rc::new(RefCell::new(InternalFlags::new(&Default::default()))),
internal_flags: Rc::new(RefCell::new(InternalFlags::default())),
};
let state = Cancelled {
seen_cancelled_event: true,
Expand Down
6 changes: 5 additions & 1 deletion core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,11 @@ where
impl WorkflowMachines {
pub(crate) fn new(basics: RunBasics, driven_wf: DrivenWorkflow) -> Self {
let replaying = basics.history.previous_wft_started_id > 0;
let mut observed_internal_flags = InternalFlags::new(basics.capabilities);
let mut observed_internal_flags = InternalFlags::new(
basics.capabilities,
basics.sdk_name.to_owned(),
basics.sdk_version.to_owned(),
);
// Peek ahead to determine used flags in the first WFT.
if let Some(attrs) = basics.history.peek_next_wft_completed(0) {
observed_internal_flags.add_from_complete(attrs);
Expand Down
4 changes: 4 additions & 0 deletions core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ pub(crate) struct WorkflowBasics {
pub(crate) shutdown_token: CancellationToken,
pub(crate) metrics: MetricsContext,
pub(crate) server_capabilities: get_system_info_response::Capabilities,
pub(crate) sdk_name: String,
pub(crate) sdk_version: String,
}

pub(crate) struct RunBasics<'a> {
Expand All @@ -141,6 +143,8 @@ pub(crate) struct RunBasics<'a> {
pub(crate) history: HistoryUpdate,
pub(crate) metrics: MetricsContext,
pub(crate) capabilities: &'a get_system_info_response::Capabilities,
pub(crate) sdk_name: &'a str,
pub(crate) sdk_version: &'a str,
}

impl Workflows {
Expand Down
Loading

0 comments on commit b94b2fc

Please sign in to comment.