Merge pull request #371 from metaplex-foundation/feature/MTG-1110_add-request-time-out-for-pg-db-query

MTG-1110 Add request timeout for PG DB query
andrii-kl authored Jan 17, 2025
2 parents 86eb15a + 752fc40 commit d0fcbf6
Showing 17 changed files with 92 additions and 23 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -6,10 +6,14 @@
- Unique consumer ID for each worker. [MTG-1155]
- Unique worker name to simplify debugging. [MTG-1155]
- Ability to configure workers batch size via env variables. (account_processor_buffer_size tx_processor_buffer_size) [MTG-1155]
- Configurable timeout for the PG database queries [MTG-1110](https://github.com/metaplex-foundation/aura/pull/371)
-

### Changed
- Default number of Redis message reads retries to the (number of workers + 1) [MTG-1155]
-
@@ -30,5 +34,9 @@
* tx_processor_buffer_size
* redis_accounts_parsing_workers
* redis_transactions_parsing_workers
-
- [MTG-1110] Configure the PG max query statement timeout or use the default configuration (see the sketch below).
* INGESTER_PG_MAX_QUERY_TIMEOUT_SECS (default: 120s)
* SYNCHRONIZER_PG_MAX_QUERY_TIMEOUT_SECS (default: 24h)
* MIGRATOR_PG_MAX_QUERY_TIMEOUT_SECS (default: 24h)
* API_PG_MAX_QUERY_TIMEOUT_SECS (default: 120s)
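These variables are wired up via clap (the derive attributes appear in `nft_ingester/src/config.rs` below). A minimal sketch of reading one of them, mirroring that pattern; the trimmed-down `ApiArgs` struct here is hypothetical:

```rust
use clap::Parser;

/// Hypothetical, trimmed-down args struct mirroring the clap pattern in
/// nft_ingester/src/config.rs; only the timeout field is shown.
#[derive(Parser, Debug)]
struct ApiArgs {
    /// Falls back to 120 when API_PG_MAX_QUERY_TIMEOUT_SECS is unset.
    #[clap(long, env = "API_PG_MAX_QUERY_TIMEOUT_SECS", default_value = "120")]
    pg_max_query_statement_timeout_secs: u32,
}

fn main() {
    let args = ApiArgs::parse();
    println!("statement timeout: {}s", args.pg_max_query_statement_timeout_secs);
}
```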

10 changes: 10 additions & 0 deletions README.md
@@ -61,4 +61,14 @@ The project's structure is a reflection of the following clean architecture prin
The API specification is compatible with the standard DAS specification here https://github.com/metaplex-foundation/api-specifications

### Developing and running


#### Run Integration Tests

```bash
cargo t -F integration_tests
```



Full documentation and contribution guidelines coming soon…
2 changes: 2 additions & 0 deletions consistency_check/src/bin/jsons/main.rs
@@ -86,6 +86,7 @@ async fn change_jsons_status(postgre_creds: String, file_path: String) {
PG_MIN_CONNECTIONS,
PG_MIGRATIONS_PATH,
None,
None,
)
.await
.unwrap(),
@@ -152,6 +153,7 @@ async fn check_jsons_consistency(rocks_path: String, postgre_creds: String, batc
PG_MIN_CONNECTIONS,
PG_MIGRATIONS_PATH,
None,
None,
)
.await
.unwrap(),
1 change: 1 addition & 0 deletions integration_tests/src/common.rs
@@ -106,6 +106,7 @@ impl TestSetup {
MIN_PG_CONNECTIONS,
POSTGRE_MIGRATIONS_PATH,
Some(PathBuf::from_str("./dump").unwrap()),
None,
)
.await
.unwrap(),
1 change: 1 addition & 0 deletions integrity_verification/src/main.rs
@@ -87,6 +87,7 @@ async fn main() {
500,
config.base_dump_path.clone(),
metrics.red_metrics,
None,
)
.await
.unwrap(),
2 changes: 2 additions & 0 deletions interface/src/error.rs
@@ -89,6 +89,8 @@ pub enum StorageError {
CannotServiceRequest,
#[error("Unknown error: {0}")]
Unknown(String),
#[error("Request execution time exceeded the limit.")]
QueryTimedOut,
}

impl From<Error> for UsecaseError {
2 changes: 1 addition & 1 deletion nft_ingester/src/api/api_impl.rs
@@ -125,7 +125,7 @@ where
self.metrics.inc_requests(label);
let latency_timer = Instant::now();

self.pg_client.check_health().await.map_err(|_| DasApiError::InternalDdError)?;
self.pg_client.check_health().await.map_err(|_| DasApiError::InternalDbError)?;

self.metrics.set_latency(label, latency_timer.elapsed().as_millis() as f64);

22 changes: 15 additions & 7 deletions nft_ingester/src/api/dapi/search_assets.rs
@@ -148,7 +148,14 @@ async fn fetch_assets<
let keys = index_client
.get_asset_pubkeys_filtered(filter, &sort_by.into(), limit, page, before, after, &options)
.await
.map_err(|e| StorageError::Common(e.to_string()))?;
.map_err(|e| {
if e.to_string().contains("statement timeout") {
StorageError::QueryTimedOut
} else {
StorageError::Common(e.to_string())
}
})?;

let asset_ids = keys
.iter()
.filter_map(|k| Pubkey::try_from(k.pubkey.clone()).ok())
@@ -199,12 +206,13 @@
};
let mut grand_total = None;
if options.show_grand_total {
grand_total = Some(
index_client
.get_grand_total(filter, &options)
.await
.map_err(|e| StorageError::Common(e.to_string()))?,
)
grand_total = Some(index_client.get_grand_total(filter, &options).await.map_err(|e| {
if e.to_string().contains("statement timeout") {
StorageError::QueryTimedOut
} else {
StorageError::Common(e.to_string())
}
})?)
}

let resp = AssetList {
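Both call sites in `fetch_assets` detect a Postgres statement-timeout failure by substring-matching the error message. A hypothetical helper consolidating the duplicated closure (not part of this PR), with a local stand-in for the `StorageError` enum from `interface/src/error.rs`:

```rust
// Local stand-in for this sketch; the real enum lives in interface/src/error.rs.
#[derive(Debug)]
enum StorageError {
    Common(String),
    QueryTimedOut,
}

// Hypothetical helper: the two closures in fetch_assets share this mapping.
fn map_index_error(e: impl std::fmt::Display) -> StorageError {
    let msg = e.to_string();
    // Postgres cancels an over-limit query with a "statement timeout" message.
    if msg.contains("statement timeout") {
        StorageError::QueryTimedOut
    } else {
        StorageError::Common(msg)
    }
}

fn main() {
    let err = map_index_error("canceling statement due to statement timeout");
    assert!(matches!(err, StorageError::QueryTimedOut));
}
```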
11 changes: 10 additions & 1 deletion nft_ingester/src/api/error.rs
@@ -5,6 +5,7 @@ use thiserror::Error;
use tracing::error;

const STANDARD_ERROR_CODE: i64 = -32000;
const QUERY_TIME_OUT_CODE: i64 = -32800;
pub const CANNOT_SERVICE_REQUEST_ERROR_CODE: i64 = -32050;

#[derive(Error, Debug)]
@@ -38,11 +39,13 @@ pub enum DasApiError {
#[error("Page number is too big. Up to {0} pages are supported with this kind of pagination. Please use a different pagination(before/after/cursor).")]
PageTooBig(usize),
#[error("Internal DB error")]
InternalDdError,
InternalDbError,
#[error("CannotServiceRequest")]
CannotServiceRequest,
#[error("MissingOwnerAddress")]
MissingOwnerAddress,
#[error("Request execution time exceeded the limit.")]
QueryTimedOut,
}

impl From<DasApiError> for jsonrpc_core::Error {
@@ -95,6 +98,11 @@ impl From<DasApiError> for jsonrpc_core::Error {
.to_string(),
data: None,
},
DasApiError::QueryTimedOut => jsonrpc_core::Error {
code: ErrorCode::ServerError(QUERY_TIME_OUT_CODE),
message: "Request execution time exceeded the limit.".to_string(),
data: None,
},
DasApiError::CannotServiceRequest => cannot_service_request_error(),
_ => jsonrpc_core::Error::new(ErrorCode::InternalError),
}
@@ -115,6 +123,7 @@ impl From<StorageError> for DasApiError {
fn from(value: StorageError) -> Self {
match value {
StorageError::CannotServiceRequest => Self::CannotServiceRequest,
StorageError::QueryTimedOut => Self::QueryTimedOut,
e => Self::RocksError(e.to_string()),
}
}
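With this mapping, a timed-out query surfaces to JSON-RPC clients as server error `-32800`. A small sketch of the resulting payload, assuming `jsonrpc_core`'s serializable `Error` type and a `serde_json` dependency:

```rust
use jsonrpc_core::{Error, ErrorCode};

fn main() {
    // Mirrors the From<DasApiError> arm above: what QueryTimedOut becomes
    // before serialization.
    let err = Error {
        code: ErrorCode::ServerError(-32800),
        message: "Request execution time exceeded the limit.".to_string(),
        data: None,
    };
    // Prints: {"code":-32800,"message":"Request execution time exceeded the limit."}
    println!("{}", serde_json::to_string(&err).unwrap());
}
```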
1 change: 1 addition & 0 deletions nft_ingester/src/bin/api/main.rs
@@ -61,6 +61,7 @@ pub async fn main() -> Result<(), IngesterError> {
args.pg_max_db_connections,
None,
red_metrics.clone(),
Some(args.pg_max_query_statement_timeout_secs),
)
.await?;
let pg_client = Arc::new(pg_client);
1 change: 1 addition & 0 deletions nft_ingester/src/bin/ingester/main.rs
@@ -143,6 +143,7 @@ pub async fn main() -> Result<(), IngesterError> {
DEFAULT_MIN_POSTGRES_CONNECTIONS,
PG_MIGRATIONS_PATH,
None,
Some(args.pg_max_query_statement_timeout_secs),
)
.await?,
);
10 changes: 4 additions & 6 deletions nft_ingester/src/bin/migrator/main.rs
@@ -45,9 +45,9 @@ pub async fn main() -> Result<(), IngesterError> {
args.pg_max_db_connections,
None,
metrics_state.red_metrics.clone(),
Some(args.pg_max_query_statement_timeout_secs),
)
.await
.unwrap(),
.await?,
);

start_metrics(metrics_state.registry, args.metrics_port).await;
@@ -60,17 +60,15 @@
mutexed_tasks.clone(),
red_metrics.clone(),
MigrationState::Last,
)
.unwrap();
)?;

let target_storage = Arc::new(storage);
let source_storage = Storage::open(
args.rocks_json_source_db.clone(),
mutexed_tasks.clone(),
red_metrics.clone(),
MigrationState::Last,
)
.unwrap();
)?;
let source_storage = Arc::new(source_storage);

let json_migrator = JsonMigrator::new(
1 change: 1 addition & 0 deletions nft_ingester/src/bin/synchronizer/main.rs
@@ -51,6 +51,7 @@ pub async fn main() -> Result<(), IngesterError> {
DEFAULT_MIN_POSTGRES_CONNECTIONS,
PG_MIGRATIONS_PATH,
Some(PathBuf::from(args.rocks_dump_path.clone())),
Some(args.pg_max_query_statement_timeout_secs),
)
.await?,
);
10 changes: 10 additions & 0 deletions nft_ingester/src/config.rs
@@ -40,6 +40,8 @@ pub struct IngesterClapArgs {

#[clap(long, env, default_value = "100")]
pub pg_max_db_connections: u32,
#[clap(long, env = "INGESTER_PG_MAX_QUERY_TIMEOUT_SECS", default_value = "120")]
pub pg_max_query_statement_timeout_secs: u32,

#[clap(short('r'), long, env, help="example: {redis_connection_str=\"redis://127.0.0.1:6379/0\"}", value_parser = parse_json_to_dict)]
pub redis_connection_config: Dict,
@@ -254,6 +256,9 @@ pub struct SynchronizerClapArgs {
pub pg_database_url: String,
#[clap(long, env, default_value = "100")]
pub pg_max_db_connections: u32,
// 24 hours = 86400 secs
#[clap(long, env = "SYNCHRONIZER_PG_MAX_QUERY_TIMEOUT_SECS", default_value = "86400")]
pub pg_max_query_statement_timeout_secs: u32,

#[clap(
short('m'),
@@ -333,6 +338,9 @@ pub struct MigratorClapArgs {
pub pg_min_db_connections: u32,
#[clap(long, env, default_value = "250")]
pub pg_max_db_connections: u32,
// 24 hours = 86400 secs
#[clap(long, env = "MIGRATOR_PG_MAX_QUERY_TIMEOUT_SECS", default_value = "86400")]
pub pg_max_query_statement_timeout_secs: u32,

#[clap(
long,
@@ -362,6 +370,8 @@ pub struct ApiClapArgs {
pub pg_min_db_connections: u32,
#[clap(long, env, default_value = "250")]
pub pg_max_db_connections: u32,
#[clap(long, env = "API_PG_MAX_QUERY_TIMEOUT_SECS", default_value = "120")]
pub pg_max_query_statement_timeout_secs: u32,

#[clap(
short('m'),
16 changes: 11 additions & 5 deletions nft_ingester/src/init.rs
@@ -24,12 +24,18 @@ pub async fn init_index_storage_with_migration(
min_pg_connections: u32,
pg_migrations_path: &str,
base_dump_path: Option<PathBuf>,
pg_max_query_statement_timeout_secs: Option<u32>,
) -> Result<PgClient, IngesterError> {
let pg_client =
PgClient::new(url, min_pg_connections, max_pg_connections, base_dump_path, red_metrics)
.await
.map_err(|e| e.to_string())
.map_err(IngesterError::SqlxError)?;
let pg_client = PgClient::new(
url,
min_pg_connections,
max_pg_connections,
base_dump_path,
red_metrics,
pg_max_query_statement_timeout_secs,
)
.await
.map_err(|e| IngesterError::SqlxError(e.to_string()))?;

pg_client.run_migration(pg_migrations_path).await.map_err(IngesterError::SqlxError)?;

13 changes: 11 additions & 2 deletions postgre-client/src/lib.rs
@@ -50,14 +50,23 @@ pub struct PgClient {
}

impl PgClient {
/// If `max_query_statement_timeout_secs` is `None`, the PostgreSQL database applies no query statement timeout (unless one was already specified via a DB URL parameter).
pub async fn new(
url: &str,
min_connections: u32,
max_connections: u32,
base_dump_path: Option<PathBuf>,
metrics: Arc<RequestErrorDurationMetrics>,
max_query_statement_timeout_secs: Option<u32>,
) -> Result<Self, Error> {
let mut options: PgConnectOptions = url.parse().unwrap();
let mut options: PgConnectOptions = url.parse::<PgConnectOptions>()?;
if !url.contains("statement_timeout") && max_query_statement_timeout_secs.is_some() {
options = options.options([(
"statement_timeout",
&format!("{}s", max_query_statement_timeout_secs.unwrap()),
)]);
}

options.log_statements(LevelFilter::Off);
options.log_slow_statements(LevelFilter::Off, Duration::from_secs(100));

@@ -214,7 +223,7 @@ impl PgClient {
self.recreate_asset_constraints(&mut transaction).await?;

transaction.commit().await?;
// those await above will not always rollback the tx
// those await above will not always roll back the tx
// take this into account if we use this function somewhere else except the tests

Ok(())
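Callers opt in by passing `Some(secs)`; because the option is only injected when the URL lacks `statement_timeout`, a URL-level setting takes precedence. A minimal usage sketch, assuming the constructor signature above; the crate paths, metrics constructor, and connection string are all assumptions:

```rust
use std::sync::Arc;

// Assumed paths: the real types live in this repo's postgre-client and
// metrics crates; adjust imports to match.
use metrics_utils::red::RequestErrorDurationMetrics;
use postgre_client::PgClient;

// Connects with a 120s statement timeout; the URL below is hypothetical.
async fn connect_with_timeout() -> Result<PgClient, sqlx::Error> {
    PgClient::new(
        "postgres://user:pass@localhost:5432/aura", // hypothetical URL
        10,   // min connections
        100,  // max connections
        None, // no base dump path
        Arc::new(RequestErrorDurationMetrics::new()), // assumed constructor
        Some(120), // becomes statement_timeout=120s unless the URL already sets one
    )
    .await
}
```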
2 changes: 2 additions & 0 deletions rocks-db/src/errors.rs
@@ -51,6 +51,7 @@ pub enum StorageError {
NotFound(String),
CannotServiceRequest,
InvalidMigrationVersion(u64),
QueryTimedOut,
}

impl std::fmt::Display for StorageError {
@@ -77,6 +78,7 @@ impl From<StorageError> for interface::error::StorageError {
InterfaceStorageError::Common(format!("InvalidMigrationVersion: {v}"))
},
StorageError::NotFound(s) => InterfaceStorageError::NotFound(s),
StorageError::QueryTimedOut => InterfaceStorageError::QueryTimedOut,
}
}
}
