Skip to content

Commit

Permalink
Finish implementing TieredHistoryService
Browse files Browse the repository at this point in the history
  • Loading branch information
progval committed Nov 2, 2024
1 parent 3859cf8 commit bdcc1e6
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 5 deletions.
3 changes: 2 additions & 1 deletion sable_network/src/history/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ pub trait HistoryService {
user: UserId,
target: TargetId,
request: HistoryRequest,
) -> impl Future<Output = Result<impl IntoIterator<Item = HistoricalEvent>, HistoryError>> + Send;
) -> impl Future<Output = Result<impl IntoIterator<Item = HistoricalEvent> + Send, HistoryError>>
+ Send;
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
Expand Down
74 changes: 70 additions & 4 deletions sable_network/src/history/tiered_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::hash_map::{Entry, HashMap};

use futures::TryFutureExt;
use tracing::instrument;

use crate::prelude::*;
Expand Down Expand Up @@ -88,15 +89,80 @@ impl<FastService: HistoryService + Send + Sync, SlowService: HistoryService + Se
) -> Result<impl IntoIterator<Item = HistoricalEvent>, HistoryError> {
// It's tempting to return Box<dyn IntoIterator> here instead of collecting into a
// temporary Vec, but we can't because IntoIterator::IntoIter potentially differs

macro_rules! get_entries {
($service:expr, $user:expr, $target:expr, $request:expr) => {
$service
.get_entries($user, $target, $request)
.map_ok(|entries| -> Vec<_> { entries.into_iter().collect() })
.await
};
}

match (&self.fast_service, &self.slow_service) {
(_, Some(slow_service)) => {
// TODO: implement fallback
tracing::info!("get_entries slow");
(Some(fast_service), Some(slow_service)) => {
match request {
HistoryRequest::Latest { limit, .. } | HistoryRequest::Before { limit, .. } => {
let mut entries = get_entries!(fast_service, user, target, request.clone())
.unwrap_or_else(|e| {
tracing::error!("Could not get history from fast service: {e}");
vec![]
});
if entries.len() < limit {
// TODO: send a BEFORE request, and merge lists together
entries = get_entries!(slow_service, user, target, request)?;
}
Ok(entries)
}
HistoryRequest::After { start_ts, .. } => {
// Check if the fast-but-shortlived backend still has messages up to that
// timestamp
match fast_service
.get_entries(
user,
target,
HistoryRequest::Before {
from_ts: start_ts,
limit: 1,
},
)
.await
{
Ok(entries) => {
if entries.into_iter().count() > 0 {
// Yes, it does, so we don't need the slow_service to fulfill
// the request
match get_entries!(fast_service, user, target, request.clone())
{
Ok(entries) => Ok(entries),
Err(e) => {
tracing::error!(
"Could not get history from fast service: {e}"
);
get_entries!(slow_service, user, target, request)
}
}
} else {
get_entries!(slow_service, user, target, request)
}
}
Err(e) => {
tracing::error!("Could not get history from fast service: {e}");
get_entries!(slow_service, user, target, request)
}
}
}
HistoryRequest::Around { .. } | HistoryRequest::Between { .. } => {
// TODO: try to use the fast_service when possible
get_entries!(slow_service, user, target, request)
}
}
}
(None, Some(slow_service)) => {
let entries = slow_service.get_entries(user, target, request).await?;
Ok(entries.into_iter().collect())
}
(Some(fast_service), None) => {
tracing::info!("get_entries fast");
let entries = fast_service.get_entries(user, target, request).await?;
Ok(entries.into_iter().collect())
}
Expand Down

0 comments on commit bdcc1e6

Please sign in to comment.