Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
enhancement: forc index status: report indexer status and errors (#1412)
Browse files Browse the repository at this point in the history
* initial version

* update

* move status to its own table

* finishing touches

* more finishing touches

* clippy, fmt, and humantime formatting of created_at

* clippy

* unpack tuple

* update docs

* foreign key

* ensure status is deleted when indexer is removed
  • Loading branch information
lostman authored Oct 18, 2023
1 parent a9c3831 commit 7172ee2
Show file tree
Hide file tree
Showing 13 changed files with 358 additions and 75 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 17 additions & 2 deletions packages/fuel-indexer-api-server/src/uses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use axum::{
use fuel_crypto::{Message, Signature};
use fuel_indexer_database::{
queries,
types::{IndexerAsset, IndexerAssetType},
types::{IndexerAsset, IndexerAssetType, IndexerStatus, RegisteredIndexer},
IndexerConnectionPool,
};
use fuel_indexer_graphql::dynamic::{build_dynamic_schema, execute_query};
Expand Down Expand Up @@ -133,7 +133,7 @@ pub(crate) async fn indexer_status(

let mut conn = pool.acquire().await?;

let indexers: Vec<_> = {
let indexers: Vec<RegisteredIndexer> = {
let indexers = queries::all_registered_indexers(&mut conn).await?;
if claims.sub().is_empty() {
indexers
Expand All @@ -145,6 +145,21 @@ pub(crate) async fn indexer_status(
}
};

let statuses = queries::all_registered_indexer_statuses(&mut conn).await?;

let indexers: Vec<(RegisteredIndexer, IndexerStatus)> = indexers
.into_iter()
.map(|i| {
if let Some(status) =
statuses.get(&(i.namespace.clone(), i.identifier.clone()))
{
(i, status.clone())
} else {
(i, IndexerStatus::unknown())
}
})
.collect();

let json: serde_json::Value = serde_json::to_value(indexers)?;

Ok(Json(json!(json)))
Expand Down
58 changes: 58 additions & 0 deletions packages/fuel-indexer-database/database-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,64 @@ impl RegisteredIndexer {
}
}

#[derive(
Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString, strum::Display,
)]
pub enum IndexerStatusKind {
#[strum(serialize = "starting")]
Starting,
#[strum(serialize = "running")]
Running,
#[strum(serialize = "stopped")]
Stopped,
#[strum(serialize = "error")]
Error,
#[strum(serialize = "unknown")]
Unknown,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IndexerStatus {
/// The current status of an indexer.
pub status_kind: IndexerStatusKind,
/// Additional status message. Might be empty.
pub status_message: String,
}

impl IndexerStatus {
pub fn starting() -> Self {
IndexerStatus {
status_kind: IndexerStatusKind::Starting,
status_message: "".to_string(),
}
}

pub fn running(status_message: String) -> Self {
IndexerStatus {
status_kind: IndexerStatusKind::Running,
status_message,
}
}
pub fn stopped(status_message: String) -> Self {
IndexerStatus {
status_kind: IndexerStatusKind::Stopped,
status_message,
}
}
pub fn error(status_message: String) -> Self {
IndexerStatus {
status_kind: IndexerStatusKind::Error,
status_message,
}
}
pub fn unknown() -> Self {
IndexerStatus {
status_kind: IndexerStatusKind::Unknown,
status_message: "".to_string(),
}
}
}

/// SQL database types used by indexers.
#[derive(Eq, PartialEq, Debug, Clone, Default)]
pub enum DbType {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS index_status CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS index_status (
indexer_id BIGSERIAL UNIQUE NOT NULL,
status TEXT NOT NULL,
status_message TEXT NOT NULL,
FOREIGN KEY (indexer_id) REFERENCES index_registry (id) ON DELETE CASCADE
);
58 changes: 58 additions & 0 deletions packages/fuel-indexer-database/postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bigdecimal::ToPrimitive;
use fuel_indexer_database_types::*;
use fuel_indexer_lib::utils::sha256_digest;
use sqlx::{pool::PoolConnection, postgres::PgRow, types::JsonValue, Postgres, Row};
use std::collections::HashMap;
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::info;
Expand Down Expand Up @@ -913,6 +914,8 @@ pub async fn put_many_to_many_record(
Ok(())
}

/// Create a database trigger on the indexer's indexmetadataentity table that
/// ensures no blocks can be missing.
pub async fn create_ensure_block_height_consecutive_trigger(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
Expand Down Expand Up @@ -959,6 +962,8 @@ pub async fn create_ensure_block_height_consecutive_trigger(
Ok(())
}

/// When -allow-non-sequential-blocks is set, we need to remove the trigger from
/// indexer's indexmetadataentity table.
pub async fn remove_ensure_block_height_consecutive_trigger(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
Expand All @@ -972,3 +977,56 @@ pub async fn remove_ensure_block_height_consecutive_trigger(

Ok(())
}

/// Set the status of a registered indexer to be displayed by `forc index status`.
pub async fn set_indexer_status(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
identifier: &str,
status: IndexerStatus,
) -> sqlx::Result<()> {
let indexer_id = get_indexer_id(conn, namespace, identifier).await?;
sqlx::query(
"INSERT INTO index_status (indexer_id, status, status_message)
VALUES ($1, $2, $3)
ON CONFLICT (indexer_id) DO UPDATE
SET status = EXCLUDED.status, status_message = EXCLUDED.status_message;",
)
.bind(indexer_id)
.bind(status.status_kind.to_string())
.bind(status.status_message)
.execute(conn)
.await?;

Ok(())
}

/// Fetch the statuses of all registered indexers.
pub async fn all_registered_indexer_statuses(
conn: &mut PoolConnection<Postgres>,
) -> sqlx::Result<HashMap<(String, String), IndexerStatus>> {
let rows = sqlx::query(
"SELECT index_registry.namespace, index_registry.identifier, status, status_message
FROM index_status
INNER JOIN index_registry
ON index_status.indexer_id = index_registry.id;"
)
.fetch_all(conn)
.await?;

let mut result = HashMap::new();
for row in rows {
let namespace: String = row.get(0);
let identifier: String = row.get(1);
let status_kind =
IndexerStatusKind::from_str(row.get(2)).unwrap_or(IndexerStatusKind::Unknown);
let status_message: String = row.get(3);
let status = IndexerStatus {
status_kind,
status_message,
};
result.insert((namespace, identifier), status);
}

Ok(result)
}
31 changes: 31 additions & 0 deletions packages/fuel-indexer-database/src/queries.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use crate::{types::*, IndexerConnection};
use fuel_indexer_postgres as postgres;
use sqlx::types::{
Expand Down Expand Up @@ -410,6 +412,8 @@ pub async fn put_many_to_many_record(
}
}

/// Create a database trigger on the indexer's indexmetadataentity table that
/// ensures no blocks can be missing.
pub async fn create_ensure_block_height_consecutive_trigger(
conn: &mut IndexerConnection,
namespace: &str,
Expand All @@ -425,6 +429,8 @@ pub async fn create_ensure_block_height_consecutive_trigger(
}
}

/// When -allow-non-sequential-blocks is set, we need to remove the trigger from
/// indexer's indexmetadataentity table.
pub async fn remove_ensure_block_height_consecutive_trigger(
conn: &mut IndexerConnection,
namespace: &str,
Expand All @@ -439,3 +445,28 @@ pub async fn remove_ensure_block_height_consecutive_trigger(
}
}
}

/// Set the status of a registered indexer to be displayed by `forc index status`.
pub async fn set_indexer_status(
conn: &mut IndexerConnection,
namespace: &str,
identifier: &str,
status: IndexerStatus,
) -> sqlx::Result<()> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::set_indexer_status(c, namespace, identifier, status).await
}
}
}

/// Return the statuses of all indexers registered with the service.
pub async fn all_registered_indexer_statuses(
conn: &mut IndexerConnection,
) -> sqlx::Result<HashMap<(String, String), IndexerStatus>> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::all_registered_indexer_statuses(c).await
}
}
}
4 changes: 2 additions & 2 deletions packages/fuel-indexer/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl Database {
let table = match self.tables.get(&type_id) {
Some(t) => t,
None => {
return Err(IndexerError::Unknown(format!(
return Err(anyhow::anyhow!(
r#"TypeId({type_id}) not found in tables: {:?}.
Does the schema version in SchemaManager::new_schema match the schema version in Database::load_schema?
Expand All @@ -154,7 +154,7 @@ Do your WASM modules need to be rebuilt?
"#,
self.tables,
)));
).into());
}
};

Expand Down
Loading

0 comments on commit 7172ee2

Please sign in to comment.