Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add additional logs for state transitions #989

Merged
merged 4 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 69 additions & 3 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ pub enum ProvisionedState {
Failed,
}

/// Lifecycle states as they appear in indexer state records that have not yet
/// been migrated.
///
/// Only used to deserialize legacy records so the migration can translate each
/// variant to its renamed counterpart in `LifecycleState`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum OldLifecycleState {
/// Legacy name; migrated to `LifecycleState::Suspending`.
Stopping,
/// Legacy name; migrated to `LifecycleState::Suspended`.
Stopped,
}
/// Legacy shape of a persisted indexer state.
///
/// The migration deserializes each raw JSON state into this struct before
/// rebuilding it as the current `IndexerState`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct OldIndexerState {
pub account_id: AccountId,
pub function_name: String,
pub block_stream_synced_at: Option<u64>,
pub enabled: bool,
pub provisioned_state: ProvisionedState,
// Uses the legacy variant names (`Stopping` / `Stopped`).
pub lifecycle_state: OldLifecycleState,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -75,15 +80,23 @@ impl IndexerStateManagerImpl {
tracing::info!("Migrating {}", raw_state);

let old_state: OldIndexerState = serde_json::from_str(&raw_state)?;
let migrated_lifecycle_state =
if old_state.lifecycle_state == OldLifecycleState::Stopping {
LifecycleState::Suspending
} else if old_state.lifecycle_state == OldLifecycleState::Stopped {
LifecycleState::Suspended
} else {
tracing::warn!("Unknown lifecycle state: {:?}", old_state.lifecycle_state);
continue;
};

let state = IndexerState {
account_id: old_state.account_id,
function_name: old_state.function_name,
block_stream_synced_at: old_state.block_stream_synced_at,
enabled: old_state.enabled,
lifecycle_state: LifecycleState::Running,
lifecycle_state: migrated_lifecycle_state,
};

self.redis_client
.set(state.get_state_key(), serde_json::to_string(&state)?)
.await?;
Expand Down Expand Up @@ -182,6 +195,59 @@ mod tests {
use mockall::predicate;
use registry_types::{Rule, StartBlock, Status};

/// Verifies that `migrate` rewrites only the states carrying the legacy
/// `Stopping` / `Stopped` lifecycle names, and that the migration itself
/// completes without error.
///
/// Four raw states are returned by the mocked Redis client: two that already
/// use current lifecycle names and two that use the legacy names. The mock
/// expects exactly one `set` call per legacy state, each with the renamed
/// lifecycle state (`Suspending` / `Suspended`) and all other fields preserved.
#[tokio::test]
async fn migrate_state() {
    let mut mock_redis_client = RedisClient::default();

    // States that already use current lifecycle names.
    let valid_state = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_valid_1", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Initializing" }).to_string();
    let valid_state_two = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_valid_2", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Running" }).to_string();

    // States that still use the legacy lifecycle names.
    let state_to_migrate_stopping = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_migrate_stopping", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Stopping" }).to_string();
    let state_to_migrate_stopped = serde_json::json!({ "account_id": "morgs.near", "function_name": "test_migrate_stopped", "block_stream_synced_at": 200, "enabled": true, "lifecycle_state": "Stopped" }).to_string();

    // Expected results of migrating the two legacy states.
    let migrated_suspending = IndexerState {
        account_id: "morgs.near".parse().unwrap(),
        function_name: "test_migrate_stopping".to_string(),
        block_stream_synced_at: Some(200),
        enabled: true,
        lifecycle_state: LifecycleState::Suspending,
    };
    let migrated_suspended = IndexerState {
        account_id: "morgs.near".parse().unwrap(),
        function_name: "test_migrate_stopped".to_string(),
        block_stream_synced_at: Some(200),
        enabled: true,
        lifecycle_state: LifecycleState::Suspended,
    };

    mock_redis_client
        .expect_list_indexer_states()
        .returning(move || {
            Ok(vec![
                valid_state.clone(),
                valid_state_two.clone(),
                state_to_migrate_stopping.clone(),
                state_to_migrate_stopped.clone(),
            ])
        })
        .once();

    // Only the two legacy states may be written back; there is no `set`
    // expectation for the two states that already use current names.
    mock_redis_client
        .expect_set::<String, String>()
        .with(
            predicate::eq(migrated_suspending.get_state_key()),
            predicate::eq(serde_json::to_string(&migrated_suspending).unwrap()),
        )
        .returning(|_, _| Ok(()))
        .once();
    mock_redis_client
        .expect_set::<String, String>()
        .with(
            predicate::eq(migrated_suspended.get_state_key()),
            predicate::eq(serde_json::to_string(&migrated_suspended).unwrap()),
        )
        .returning(|_, _| Ok(()))
        .once();

    let indexer_manager = IndexerStateManagerImpl::new(mock_redis_client);

    // Assert on the result instead of discarding it, so an error returned by
    // `migrate` fails the test directly with the error attached.
    indexer_manager
        .migrate()
        .await
        .expect("migration should complete without error");
}

#[tokio::test]
async fn list_indexer_states() {
let mut mock_redis_client = RedisClient::default();
Expand Down
44 changes: 24 additions & 20 deletions coordinator/src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@ pub enum LifecycleState {
/// they are running the latest version of the Indexer.
///
/// Transitions:
/// - `Stopping` if suspended
/// - `Suspending` if suspended
/// - `Running` if Block Stream or Executor fails to synchronise, essentially triggering a
/// retry
/// - `Running` on success
Running,
/// Indexer is being stopped, Block Stream and Executors are being stopped.
/// Indexer is being suspended, Block Stream and Executors are being stopped.
///
/// Transitions:
/// - `Stopping` on failure, triggering a retry
/// - `Stopped` on success
Stopping,
/// Indexer is stopped, Block Stream and Executors are not running.
/// - `Suspending` on failure, triggering a retry
/// - `Suspended` on success
Suspending,
/// Indexer is suspended, Block Stream and Executors are not running.
///
/// Transitions:
/// - `Running` if unsuspended
Stopped,
Suspended,
/// Indexer is in a bad state, currently requires manual intervention, but should eventually
/// self heal. This is a dead-end state
///
Expand Down Expand Up @@ -103,6 +103,7 @@ impl<'a> LifecycleManager<'a> {
.await
.is_err()
{
tracing::warn!("Failed to provision data layer");
return LifecycleState::Repairing;
}

Expand All @@ -120,7 +121,7 @@ impl<'a> LifecycleManager<'a> {
}

if !state.enabled {
return LifecycleState::Stopping;
return LifecycleState::Suspending;
}

if let Err(error) = self
Expand All @@ -129,23 +130,21 @@ impl<'a> LifecycleManager<'a> {
.await
{
warn!(?error, "Failed to synchronise block stream, retrying...");

return LifecycleState::Running;
}

state.block_stream_synced_at = Some(config.get_registry_version());

if let Err(error) = self.executors_handler.synchronise(config).await {
warn!(?error, "Failed to synchronise executor, retrying...");

return LifecycleState::Running;
}

LifecycleState::Running
}

#[tracing::instrument(name = "stopping", skip_all)]
async fn handle_stopping(&self, config: &IndexerConfig) -> LifecycleState {
#[tracing::instrument(name = "suspending", skip_all)]
async fn handle_suspending(&self, config: &IndexerConfig) -> LifecycleState {
if config.is_deleted() {
return LifecycleState::Deleting;
}
Expand All @@ -156,7 +155,7 @@ impl<'a> LifecycleManager<'a> {
.await
{
warn!(?error, "Failed to stop block stream, retrying...");
return LifecycleState::Stopping;
return LifecycleState::Suspending;
}

if let Err(error) = self
Expand All @@ -165,25 +164,30 @@ impl<'a> LifecycleManager<'a> {
.await
{
warn!(?error, "Failed to stop executor, retrying...");
return LifecycleState::Stopping;
return LifecycleState::Suspending;
}

LifecycleState::Stopped
LifecycleState::Suspended
}

#[tracing::instrument(name = "stopped", skip_all)]
async fn handle_stopped(&self, config: &IndexerConfig, state: &IndexerState) -> LifecycleState {
#[tracing::instrument(name = "suspended", skip_all)]
async fn handle_suspended(
&self,
config: &IndexerConfig,
state: &IndexerState,
) -> LifecycleState {
if config.is_deleted() {
return LifecycleState::Deleting;
}

// TODO Transition to `Running` on config update

if state.enabled {
tracing::debug!("Suspended indexer was reactivated");
return LifecycleState::Running;
}

LifecycleState::Stopped
LifecycleState::Suspended
}

#[tracing::instrument(name = "repairing", skip_all)]
Expand Down Expand Up @@ -299,8 +303,8 @@ impl<'a> LifecycleManager<'a> {
let desired_lifecycle_state = match state.lifecycle_state {
LifecycleState::Initializing => self.handle_initializing(&config, &state).await,
LifecycleState::Running => self.handle_running(&config, &mut state).await,
LifecycleState::Stopping => self.handle_stopping(&config).await,
LifecycleState::Stopped => self.handle_stopped(&config, &state).await,
LifecycleState::Suspending => self.handle_suspending(&config).await,
LifecycleState::Suspended => self.handle_suspended(&config, &state).await,
LifecycleState::Repairing => self.handle_repairing(&config, &state).await,
LifecycleState::Deleting => self.handle_deleting(&state).await,
LifecycleState::Deleted => LifecycleState::Deleted,
Expand Down
Loading