-
Notifications
You must be signed in to change notification settings - Fork 189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
opt(torii-core): move off queryqueue for executing tx #2460
Changes from 8 commits
01ce338
e0ec767
6097a60
043f669
9d7d0e7
9314438
f9a136f
b883343
7771fdf
60c9069
cd52f0f
045eed0
045e4ae
388ba1e
b7acef5
b94ad7a
c13ff59
7fc27d5
260845c
a7e4f1f
8cf4452
2bcf226
3242ac4
ef3e4ba
299c0b9
e4404f1
663234a
c998428
994abc5
65612fa
13b1ba7
0c31327
afa2a0a
ef9fafc
4ec379c
d393896
1730bfc
b708081
7758cf9
607cd06
d48dd30
6d4b99f
baf7f35
c4f288a
5dac220
28633b4
fd3c377
4cabea5
ee86042
43246b6
706c7fb
9c9e0a3
6b6f5a6
61f0a4b
63cca75
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ use tokio::task::JoinSet; | |
use tokio::time::{sleep, Instant}; | ||
use tracing::{debug, error, info, trace, warn}; | ||
|
||
use crate::executor::{QueryMessage, QueryType}; | ||
use crate::processors::event_message::EventMessageProcessor; | ||
use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; | ||
use crate::sql::Sql; | ||
|
@@ -151,7 +152,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
// use the start block provided by user if head is 0 | ||
let (head, _, _) = self.db.head().await?; | ||
if head == 0 { | ||
self.db.set_head(self.config.start_block); | ||
self.db.set_head(self.config.start_block)?; | ||
} else if self.config.start_block != 0 { | ||
warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead."); | ||
} | ||
|
@@ -179,7 +180,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
} | ||
|
||
match self.process(fetch_result).await { | ||
Ok(()) => {} | ||
Ok(()) => self.db.execute()?, | ||
Err(e) => { | ||
error!(target: LOG_TARGET, error = %e, "Processing fetched data."); | ||
erroring_out = true; | ||
|
@@ -407,15 +408,14 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
// provider. So we can fail silently and try | ||
// again in the next iteration. | ||
warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt."); | ||
self.db.set_head(data.block_number - 1); | ||
self.db.set_head(data.block_number - 1)?; | ||
if let Some(tx) = last_pending_block_tx { | ||
self.db.set_last_pending_block_tx(Some(tx)); | ||
self.db.set_last_pending_block_tx(Some(tx))?; | ||
} | ||
|
||
if let Some(tx) = last_pending_block_world_tx { | ||
self.db.set_last_pending_block_world_tx(Some(tx)); | ||
self.db.set_last_pending_block_world_tx(Some(tx))?; | ||
} | ||
self.db.execute().await?; | ||
return Ok(()); | ||
} | ||
_ => { | ||
|
@@ -441,18 +441,16 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
|
||
// Set the head to the last processed pending transaction | ||
// Head block number should still be latest block number | ||
self.db.set_head(data.block_number - 1); | ||
self.db.set_head(data.block_number - 1)?; | ||
|
||
if let Some(tx) = last_pending_block_tx { | ||
self.db.set_last_pending_block_tx(Some(tx)); | ||
self.db.set_last_pending_block_tx(Some(tx))?; | ||
} | ||
|
||
if let Some(tx) = last_pending_block_world_tx { | ||
self.db.set_last_pending_block_world_tx(Some(tx)); | ||
self.db.set_last_pending_block_world_tx(Some(tx))?; | ||
} | ||
|
||
self.db.execute().await?; | ||
|
||
Ok(()) | ||
} | ||
|
||
|
@@ -486,20 +484,14 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
self.process_block(block_number, data.blocks[&block_number]).await?; | ||
last_block = block_number; | ||
} | ||
|
||
if self.db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE { | ||
self.db.execute().await?; | ||
} | ||
} | ||
|
||
// Process parallelized events | ||
self.process_tasks().await?; | ||
|
||
self.db.set_head(data.latest_block_number); | ||
self.db.set_last_pending_block_world_tx(None); | ||
self.db.set_last_pending_block_tx(None); | ||
|
||
self.db.execute().await?; | ||
self.db.set_head(data.latest_block_number)?; | ||
self.db.set_last_pending_block_world_tx(None)?; | ||
self.db.set_last_pending_block_tx(None)?; | ||
|
||
Ok(()) | ||
} | ||
|
@@ -536,10 +528,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
} | ||
|
||
// Join all tasks | ||
while let Some(result) = set.join_next().await { | ||
let local_db = result??; | ||
self.db.merge(local_db)?; | ||
} | ||
while let Some(_) = set.join_next().await {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sensei, handle errors from joined tasks in the The loop |
||
|
||
Ok(()) | ||
} | ||
|
@@ -688,7 +677,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> { | |
transaction_hash: Felt, | ||
) -> Result<()> { | ||
if self.config.flags.contains(IndexingFlags::RAW_EVENTS) { | ||
self.db.store_event(event_id, event, transaction_hash, block_timestamp); | ||
self.db.store_event(event_id, event, transaction_hash, block_timestamp)?; | ||
} | ||
|
||
let event_key = event.keys[0]; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,7 +64,7 @@ where | |
uri = %uri_str, | ||
"Resource metadata set." | ||
); | ||
db.set_metadata(resource, &uri_str, block_timestamp); | ||
db.set_metadata(resource, &uri_str, block_timestamp)?; | ||
|
||
let db = db.clone(); | ||
let resource = *resource; | ||
|
@@ -83,9 +83,7 @@ where | |
async fn try_retrieve(mut db: Sql, resource: Felt, uri_str: String) { | ||
match metadata(uri_str.clone()).await { | ||
Ok((metadata, icon_img, cover_img)) => { | ||
db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img) | ||
.await | ||
.unwrap(); | ||
db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo, sensei! We might want to reconsider this change. The modification to
Consider the following improvements: match db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).await {
Ok(_) => info!(
target: LOG_TARGET,
resource = %format!("{:#x}", resource),
"Updated resource metadata from ipfs."
),
Err(e) => error!(
target: LOG_TARGET,
resource = %format!("{:#x}", resource),
error = %e,
"Failed to update resource metadata from ipfs."
),
} This approach maintains the asynchronous nature of the call and provides proper error handling, logging any issues that might occur during the update process. |
||
info!( | ||
target: LOG_TARGET, | ||
resource = %format!("{:#x}", resource), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ohayo, sensei! Avoid unnecessary cloning of the database pool.
Cloning the
pool
introduces overhead. SinceSqlitePool
is already anArc
, you can pass it directly without cloning.Apply this diff to remove the unnecessary clone:
📝 Committable suggestion