MTG-1110 Add request timeout for PG DB query #371

Merged
5 commits merged on Jan 17, 2025
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -6,10 +6,14 @@
- Unique consumer ID for each worker. [MTG-1155]
- Unique worker name to simplify debugging. [MTG-1155]
- Ability to configure workers batch size via env variables. (account_processor_buffer_size tx_processor_buffer_size) [MTG-1155]
- Configurable timeout for the PG database queries [MTG-1110](https://github.com/metaplex-foundation/aura/pull/371)
-






### Changed
- Default number of Redis message reads retries to the (number of workers + 1) [MTG-1155]
-
@@ -30,5 +34,9 @@
* tx_processor_buffer_size
* redis_accounts_parsing_workers
* redis_transactions_parsing_workers
-
- [MTG-1110] Configure the PG max query statement timeout or rely on the default configuration (see the sketch after this list):
* INGESTER_PG_MAX_QUERY_TIMEOUT_SECS (default: 120sec)
* SYNCHRONIZER_PG_MAX_QUERY_TIMEOUT_SECS (default: 24h)
* MIGRATOR_PG_MAX_QUERY_TIMEOUT_SECS (default: 24h)
* API_PG_MAX_QUERY_TIMEOUT_SECS (default: 120sec)
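For context, a minimal sketch of how one of these variables maps to a number of seconds, assuming a hand-rolled reader; the actual binaries parse them via clap (see the `nft_ingester/src/config.rs` diff below), and the function name is illustrative:

```rust
use std::env;

// Illustrative only: read INGESTER_PG_MAX_QUERY_TIMEOUT_SECS, falling back to the
// documented default of 120 seconds when the variable is unset or not a number.
fn pg_query_timeout_secs() -> u32 {
    env::var("INGESTER_PG_MAX_QUERY_TIMEOUT_SECS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(120)
}

fn main() {
    println!("PG statement timeout: {}s", pg_query_timeout_secs());
}
```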

10 changes: 10 additions & 0 deletions README.md
@@ -61,4 +61,14 @@ The project's structure is a reflection of the following clean architecture prin
The API specification is compatible with the standard DAS specification here https://github.com/metaplex-foundation/api-specifications

### Developing and running


#### Run Integration Tests

```bash
cargo t -F integration_tests
```



Full documentation and contribution guidelines coming soon…
2 changes: 2 additions & 0 deletions consistency_check/src/bin/jsons/main.rs
@@ -86,6 +86,7 @@ async fn change_jsons_status(postgre_creds: String, file_path: String) {
PG_MIN_CONNECTIONS,
PG_MIGRATIONS_PATH,
None,
None,
)
.await
.unwrap(),
@@ -152,6 +153,7 @@ async fn check_jsons_consistency(rocks_path: String, postgre_creds: String, batc
PG_MIN_CONNECTIONS,
PG_MIGRATIONS_PATH,
None,
None,
)
.await
.unwrap(),
1 change: 1 addition & 0 deletions integration_tests/src/common.rs
@@ -106,6 +106,7 @@ impl TestSetup {
MIN_PG_CONNECTIONS,
POSTGRE_MIGRATIONS_PATH,
Some(PathBuf::from_str("./dump").unwrap()),
None,
)
.await
.unwrap(),
1 change: 1 addition & 0 deletions integrity_verification/src/main.rs
@@ -87,6 +87,7 @@ async fn main() {
500,
config.base_dump_path.clone(),
metrics.red_metrics,
None,
)
.await
.unwrap(),
2 changes: 2 additions & 0 deletions interface/src/error.rs
@@ -89,6 +89,8 @@ pub enum StorageError {
CannotServiceRequest,
#[error("Unknown error: {0}")]
Unknown(String),
#[error("Request execution time exceeded the limit.")]
QueryTimedOut,
}

impl From<Error> for UsecaseError {
2 changes: 1 addition & 1 deletion nft_ingester/src/api/api_impl.rs
@@ -125,7 +125,7 @@
self.metrics.inc_requests(label);
let latency_timer = Instant::now();

self.pg_client.check_health().await.map_err(|_| DasApiError::InternalDdError)?;
self.pg_client.check_health().await.map_err(|_| DasApiError::InternalDbError)?;

self.metrics.set_latency(label, latency_timer.elapsed().as_millis() as f64);

22 changes: 15 additions & 7 deletions nft_ingester/src/api/dapi/search_assets.rs
@@ -148,7 +148,14 @@ async fn fetch_assets<
let keys = index_client
.get_asset_pubkeys_filtered(filter, &sort_by.into(), limit, page, before, after, &options)
.await
.map_err(|e| StorageError::Common(e.to_string()))?;
.map_err(|e| {
if e.to_string().contains("statement timeout") {
StorageError::QueryTimedOut
} else {
StorageError::Common(e.to_string())
}
})?;

Comment on lines +151 to +158
Collaborator


would be cool if we could statically type the error in IndexDbError, but if it's not possible then I'm ok with what's written here and below

let asset_ids = keys
.iter()
.filter_map(|k| Pubkey::try_from(k.pubkey.clone()).ok())
@@ -199,12 +206,13 @@
};
let mut grand_total = None;
if options.show_grand_total {
grand_total = Some(
index_client
.get_grand_total(filter, &options)
.await
.map_err(|e| StorageError::Common(e.to_string()))?,
)
grand_total = Some(index_client.get_grand_total(filter, &options).await.map_err(|e| {
if e.to_string().contains("statement timeout") {
StorageError::QueryTimedOut
} else {
StorageError::Common(e.to_string())
}
})?)
}

let resp = AssetList {
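The same string-based timeout detection appears twice in this file. A minimal sketch of how it could be factored into a shared helper, picking up on the review comment above; the helper name is illustrative, and the import assumes the `StorageError` in scope is `interface::error::StorageError`:

```rust
use interface::error::StorageError;

// Illustrative helper (not part of the PR): centralizes the string match on
// Postgres "statement timeout" errors that the diff repeats in two closures.
fn map_index_db_error(e: impl std::fmt::Display) -> StorageError {
    let msg = e.to_string();
    if msg.contains("statement timeout") {
        StorageError::QueryTimedOut
    } else {
        StorageError::Common(msg)
    }
}
```

Both call sites could then use `.map_err(map_index_db_error)?`, pending a statically typed timeout variant in `IndexDbError`.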
11 changes: 10 additions & 1 deletion nft_ingester/src/api/error.rs
@@ -5,6 +5,7 @@ use thiserror::Error;
use tracing::error;

const STANDARD_ERROR_CODE: i64 = -32000;
const QUERY_TIME_OUT_CODE: i64 = -32800;
pub const CANNOT_SERVICE_REQUEST_ERROR_CODE: i64 = -32050;

#[derive(Error, Debug)]
@@ -38,11 +39,13 @@ pub enum DasApiError {
#[error("Page number is too big. Up to {0} pages are supported with this kind of pagination. Please use a different pagination(before/after/cursor).")]
PageTooBig(usize),
#[error("Internal DB error")]
InternalDdError,
InternalDbError,
#[error("CannotServiceRequest")]
CannotServiceRequest,
#[error("MissingOwnerAddress")]
MissingOwnerAddress,
#[error("Request execution time exceeded the limit.")]
QueryTimedOut,
}

impl From<DasApiError> for jsonrpc_core::Error {
@@ -95,6 +98,11 @@ impl From<DasApiError> for jsonrpc_core::Error {
.to_string(),
data: None,
},
DasApiError::QueryTimedOut => jsonrpc_core::Error {
code: ErrorCode::ServerError(QUERY_TIME_OUT_CODE),
message: "Request execution time exceeded the limit.".to_string(),
data: None,
},
DasApiError::CannotServiceRequest => cannot_service_request_error(),
_ => jsonrpc_core::Error::new(ErrorCode::InternalError),
}
@@ -115,6 +123,7 @@
fn from(value: StorageError) -> Self {
match value {
StorageError::CannotServiceRequest => Self::CannotServiceRequest,
StorageError::QueryTimedOut => Self::QueryTimedOut,
e => Self::RocksError(e.to_string()),
}
}
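A hedged sketch of the client-visible side of the new `QueryTimedOut` arm: it reconstructs the JSON-RPC error object using the `-32800` code added above. The exact serialization (e.g., whether a `null` `data` field is omitted) depends on `jsonrpc_core` and is not verified here:

```rust
use jsonrpc_core::{Error, ErrorCode};

// Rebuilds the error produced by the DasApiError::QueryTimedOut match arm.
fn query_timed_out_error() -> Error {
    Error {
        code: ErrorCode::ServerError(-32800), // QUERY_TIME_OUT_CODE
        message: "Request execution time exceeded the limit.".to_string(),
        data: None,
    }
}

fn main() {
    // Prints roughly: {"code":-32800,"message":"Request execution time exceeded the limit."}
    println!("{}", serde_json::to_string(&query_timed_out_error()).unwrap());
}
```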
1 change: 1 addition & 0 deletions nft_ingester/src/bin/api/main.rs
@@ -61,6 +61,7 @@ pub async fn main() -> Result<(), IngesterError> {
args.pg_max_db_connections,
None,
red_metrics.clone(),
Some(args.pg_max_query_statement_timeout_secs),
)
.await?;
let pg_client = Arc::new(pg_client);
1 change: 1 addition & 0 deletions nft_ingester/src/bin/ingester/main.rs
@@ -143,6 +143,7 @@ pub async fn main() -> Result<(), IngesterError> {
DEFAULT_MIN_POSTGRES_CONNECTIONS,
PG_MIGRATIONS_PATH,
None,
Some(args.pg_max_query_statement_timeout_secs),
)
.await?,
);
10 changes: 4 additions & 6 deletions nft_ingester/src/bin/migrator/main.rs
@@ -45,9 +45,9 @@ pub async fn main() -> Result<(), IngesterError> {
args.pg_max_db_connections,
None,
metrics_state.red_metrics.clone(),
Some(args.pg_max_query_statement_timeout_secs),
)
.await
.unwrap(),
.await?,
);

start_metrics(metrics_state.registry, args.metrics_port).await;
@@ -60,17 +60,15 @@
mutexed_tasks.clone(),
red_metrics.clone(),
MigrationState::Last,
)
.unwrap();
)?;

let target_storage = Arc::new(storage);
let source_storage = Storage::open(
args.rocks_json_source_db.clone(),
mutexed_tasks.clone(),
red_metrics.clone(),
MigrationState::Last,
)
.unwrap();
)?;
let source_storage = Arc::new(source_storage);

let json_migrator = JsonMigrator::new(
1 change: 1 addition & 0 deletions nft_ingester/src/bin/synchronizer/main.rs
@@ -51,6 +51,7 @@ pub async fn main() -> Result<(), IngesterError> {
DEFAULT_MIN_POSTGRES_CONNECTIONS,
PG_MIGRATIONS_PATH,
Some(PathBuf::from(args.rocks_dump_path.clone())),
Some(args.pg_max_query_statement_timeout_secs),
)
.await?,
);
10 changes: 10 additions & 0 deletions nft_ingester/src/config.rs
@@ -40,6 +40,8 @@ pub struct IngesterClapArgs {

#[clap(long, env, default_value = "100")]
pub pg_max_db_connections: u32,
#[clap(long, env = "INGESTER_PG_MAX_QUERY_TIMEOUT_SECS", default_value = "120")]
pub pg_max_query_statement_timeout_secs: u32,

#[clap(short('r'), long, env, help="example: {redis_connection_str=\"redis://127.0.0.1:6379/0\"}", value_parser = parse_json_to_dict)]
pub redis_connection_config: Dict,
@@ -254,6 +256,9 @@ pub struct SynchronizerClapArgs {
pub pg_database_url: String,
#[clap(long, env, default_value = "100")]
pub pg_max_db_connections: u32,
// 24 hours = 86400 secs
#[clap(long, env = "SYNCHRONIZER_PG_MAX_QUERY_TIMEOUT_SECS", default_value = "86400")]
pub pg_max_query_statement_timeout_secs: u32,

#[clap(
short('m'),
@@ -333,6 +338,9 @@ pub struct MigratorClapArgs {
pub pg_min_db_connections: u32,
#[clap(long, env, default_value = "250")]
pub pg_max_db_connections: u32,
// 24 hours = 86400 secs
#[clap(long, env = "MIGRATOR_PG_MAX_QUERY_TIMEOUT_SECS", default_value = "86400")]
pub pg_max_query_statement_timeout_secs: u32,

#[clap(
long,
@@ -362,6 +370,8 @@ pub struct ApiClapArgs {
pub pg_min_db_connections: u32,
#[clap(long, env, default_value = "250")]
pub pg_max_db_connections: u32,
#[clap(long, env = "API_PG_MAX_QUERY_TIMEOUT_SECS", default_value = "120")]
pub pg_max_query_statement_timeout_secs: u32,

#[clap(
short('m'),
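The four binaries follow the same clap pattern for the new argument: CLI flag first, then the service-specific environment variable, then the default. A standalone sketch of that resolution order, assuming clap 4 with the `derive` and `env` features; the struct is illustrative, not the PR's:

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
    // Resolution order: --pg-max-query-statement-timeout-secs flag,
    // then API_PG_MAX_QUERY_TIMEOUT_SECS, then the default of 120 seconds.
    #[clap(long, env = "API_PG_MAX_QUERY_TIMEOUT_SECS", default_value = "120")]
    pg_max_query_statement_timeout_secs: u32,
}

fn main() {
    let args = Args::parse();
    println!("statement timeout: {}s", args.pg_max_query_statement_timeout_secs);
}
```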
16 changes: 11 additions & 5 deletions nft_ingester/src/init.rs
@@ -24,12 +24,18 @@ pub async fn init_index_storage_with_migration(
min_pg_connections: u32,
pg_migrations_path: &str,
base_dump_path: Option<PathBuf>,
pg_max_query_statement_timeout_secs: Option<u32>,
) -> Result<PgClient, IngesterError> {
let pg_client =
PgClient::new(url, min_pg_connections, max_pg_connections, base_dump_path, red_metrics)
.await
.map_err(|e| e.to_string())
.map_err(IngesterError::SqlxError)?;
let pg_client = PgClient::new(
url,
min_pg_connections,
max_pg_connections,
base_dump_path,
red_metrics,
pg_max_query_statement_timeout_secs,
)
.await
.map_err(|e| IngesterError::SqlxError(e.to_string()))?;

pg_client.run_migration(pg_migrations_path).await.map_err(IngesterError::SqlxError)?;

13 changes: 11 additions & 2 deletions postgre-client/src/lib.rs
@@ -50,14 +50,23 @@ pub struct PgClient {
}

impl PgClient {
/// If `max_query_statement_timeout_secs` is `None`, the PostgreSQL database applies no query statement timeout (unless one was already specified via the DB URL parameters).
pub async fn new(
url: &str,
min_connections: u32,
max_connections: u32,
base_dump_path: Option<PathBuf>,
metrics: Arc<RequestErrorDurationMetrics>,
max_query_statement_timeout_secs: Option<u32>,
) -> Result<Self, Error> {
let mut options: PgConnectOptions = url.parse().unwrap();
let mut options: PgConnectOptions = url.parse::<PgConnectOptions>()?;
if !url.contains("statement_timeout") && max_query_statement_timeout_secs.is_some() {
options = options.options([(
"statement_timeout",
&format!("{}s", max_query_statement_timeout_secs.unwrap()),
)]);
}

options.log_statements(LevelFilter::Off);
options.log_slow_statements(LevelFilter::Off, Duration::from_secs(100));

@@ -214,7 +223,7 @@ impl PgClient {
self.recreate_asset_constraints(&mut transaction).await?;

transaction.commit().await?;
// those await above will not always rollback the tx
// those await above will not always roll back the tx
// take this into account if we use this function somewhere else except the tests

Ok(())
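A hedged usage sketch of the new constructor parameter, assuming the crate paths, the `RequestErrorDurationMetrics::new()` constructor, and the `sqlx::Error` return type (none of which are confirmed by this diff). With `Some(120)`, the client appends `statement_timeout=120s` to the Postgres connection options unless the URL already carries a `statement_timeout` parameter:

```rust
use std::sync::Arc;

use metrics_utils::red::RequestErrorDurationMetrics; // assumed path
use postgre_client::PgClient; // assumed path

async fn connect_with_timeout() -> Result<PgClient, sqlx::Error> {
    let metrics = Arc::new(RequestErrorDurationMetrics::new()); // assumed constructor
    PgClient::new(
        "postgres://user:pass@localhost:5432/aura", // illustrative connection URL
        10,        // min connections
        100,       // max connections
        None,      // no base dump path
        metrics,
        Some(120), // max_query_statement_timeout_secs
    )
    .await
}
```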
2 changes: 2 additions & 0 deletions rocks-db/src/errors.rs
@@ -51,6 +51,7 @@ pub enum StorageError {
NotFound(String),
CannotServiceRequest,
InvalidMigrationVersion(u64),
QueryTimedOut,
}

impl std::fmt::Display for StorageError {
@@ -77,6 +78,7 @@ impl From<StorageError> for interface::error::StorageError {
InterfaceStorageError::Common(format!("InvalidMigrationVersion: {v}"))
},
StorageError::NotFound(s) => InterfaceStorageError::NotFound(s),
StorageError::QueryTimedOut => InterfaceStorageError::QueryTimedOut,
}
}
}