Skip to content

Commit

Permalink
Logic to save incremental
Browse files Browse the repository at this point in the history
  • Loading branch information
ebatsell committed Jan 21, 2025
1 parent 24b4350 commit d9571cc
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 1 deletion.
331 changes: 331 additions & 0 deletions tip-router-operator-cli/src/backup_snapshots.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,331 @@
use anyhow::{Context, Result};
use solana_client::rpc_client::RpcClient;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::time;

use crate::process_epoch::get_previous_epoch_last_slot;

/// Represents a parsed incremental snapshot filename
#[derive(Debug)]
struct SnapshotInfo {
path: PathBuf,
start_slot: u64,
end_slot: u64,
}

impl SnapshotInfo {
/// Try to parse a snapshot filename into slot information
fn from_path(path: PathBuf) -> Option<Self> {
let file_name = path.file_name()?.to_str()?;

// Only try to parse if it's an incremental snapshot
if !file_name.starts_with("incremental-snapshot-") {
return None;
}

// Split on hyphens and take the slot numbers
// Format: incremental-snapshot-<start>-<end>-<hash>.tar.zst
let parts: Vec<&str> = file_name.split('-').collect();
if parts.len() < 5 {
return None;
}

// Parse start and end slots
let start_slot = parts[2].parse().ok()?;
let end_slot = parts[3].parse().ok()?;

Some(SnapshotInfo {
path,
start_slot,
end_slot,
})
}
}

struct BackupSnapshotMonitor {
rpc_client: RpcClient,
snapshots_dir: PathBuf,
backup_dir: PathBuf,
override_target_slot: Option<u64>,
}

impl BackupSnapshotMonitor {
pub fn new(
rpc_url: &str,
snapshots_dir: PathBuf,
backup_dir: PathBuf,
override_target_slot: Option<u64>,
) -> Result<Self> {
Ok(Self {
rpc_client: RpcClient::new(rpc_url.to_string()),
snapshots_dir,
backup_dir,
override_target_slot,
})
}

/// Gets target slot for current epoch
fn get_target_slot(&self) -> Result<u64> {
if let Some(target_slot) = self.override_target_slot {
return Ok(target_slot);
}

let (_, last_slot) = get_previous_epoch_last_slot(&self.rpc_client)?;
Ok(last_slot)
}

/// Finds the most recent incremental snapshot that's before our target slot
fn find_closest_incremental(&self, target_slot: u64) -> Option<PathBuf> {
let dir_entries = std::fs::read_dir(&self.snapshots_dir).ok()?;

// Find the snapshot that ends closest to but not after target_slot
dir_entries
.filter_map(Result::ok)
.filter_map(|entry| SnapshotInfo::from_path(entry.path()))
.filter(|snap| snap.end_slot <= target_slot)
.max_by_key(|snap| snap.end_slot)
.map(|snap| snap.path)
}

/// Copies incremental snapshot files to backup directory
async fn backup_incremental_snapshot(&self, snapshot_path: &Path) -> Result<()> {
let file_name = snapshot_path
.file_name()
.context("Failed to get incremental snapshot filename")?;

let dest_path = self.backup_dir.join(file_name);

// Check if file already exists in backup
if dest_path.exists() {
log::info!(
"Incremental snapshot already exists in backup dir: {:?}",
dest_path
);
return Ok(());
}

log::info!(
"Copying incremental snapshot from {:?} to {:?}",
snapshot_path,
dest_path
);

// Copy the file
std::fs::copy(snapshot_path, &dest_path).with_context(|| {
format!(
"Failed to copy incremental snapshot from {:?} to {:?}",
snapshot_path, dest_path
)
})?;

// Verify file size matches
let source_size = std::fs::metadata(snapshot_path)?.len();
let dest_size = std::fs::metadata(&dest_path)?.len();

if source_size != dest_size {
// If sizes don't match, remove the corrupted copy and error
let _ = std::fs::remove_file(&dest_path);
anyhow::bail!(
"Backup size mismatch: source {}, dest {}",
source_size,
dest_size
);
}

log::info!(
"Successfully backed up incremental snapshot ({} bytes)",
source_size
);

Ok(())
}

pub async fn run(&self) -> Result<()> {
let mut interval = time::interval(Duration::from_secs(10));
let mut last_target_slot = None;
let mut last_backup_path = None;

loop {
interval.tick().await;

let target_slot = self.get_target_slot()?;

// Only search for new snapshot if target slot has changed
if last_target_slot != Some(target_slot) {
log::info!("New target slot: {}", target_slot);

if let Some(snapshot) = self.find_closest_incremental(target_slot) {
if last_backup_path.as_ref() != Some(&snapshot) {
log::info!(
"Found new best snapshot for slot {}: {:?}",
target_slot,
snapshot
);

if let Err(e) = self.backup_incremental_snapshot(&snapshot).await {
log::error!("Failed to backup snapshot: {}", e);
continue;
}
last_backup_path = Some(snapshot);
}
}

last_target_slot = Some(target_slot);
}
}
}
}

#[cfg(test)]
mod tests {
use std::fs::File;

use super::*;
use std::io::Write;
use tempfile::TempDir;
use tokio;

#[tokio::test]
async fn test_snapshot_monitoring() {
let temp_dir = TempDir::new().unwrap();
let backup_dir = TempDir::new().unwrap();

let monitor = BackupSnapshotMonitor::new(
"http://localhost:8899",
temp_dir.path().to_path_buf(),
backup_dir.path().to_path_buf(),
None,
)
.unwrap();

// The test version will use the fixed slot from cfg(test) get_target_slot
// TODO: Add test cases
// 1. Create test snapshots
// 2. Verify correct snapshot selection
// 3. Test backup functionality
}

#[test]
fn test_snapshot_info_parsing() {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir
.path()
.join("incremental-snapshot-100-150-hash1.tar.zst");

let info = SnapshotInfo::from_path(path.clone()).unwrap();
assert_eq!(info.start_slot, 100);
assert_eq!(info.end_slot, 150);
assert_eq!(info.path, path);

// Test invalid cases
assert!(SnapshotInfo::from_path(temp_dir.path().join("not-a-snapshot.txt")).is_none());
assert!(
SnapshotInfo::from_path(temp_dir.path().join("snapshot-100-150-hash.tar.zst"))
.is_none()
);
}

#[test]
fn test_find_closest_incremental() {
let temp_dir = TempDir::new().unwrap();
let monitor = BackupSnapshotMonitor::new(
"http://localhost:8899",
temp_dir.path().to_path_buf(),
temp_dir.path().to_path_buf(),
None,
)
.unwrap();

// Create test snapshot files
let snapshots = [
"incremental-snapshot-100-150-hash1.tar.zst",
"incremental-snapshot-200-250-hash2.tar.zst",
"incremental-snapshot-300-350-hash3.tar.zst",
];

for name in snapshots.iter() {
let path = temp_dir.path().join(name);
File::create(path).unwrap();
}

// Test finding closest snapshot
let result = monitor
.find_closest_incremental(200)
.map(|p| p.file_name().unwrap().to_str().unwrap().to_string());

assert_eq!(
result,
Some("incremental-snapshot-100-150-hash1.tar.zst".to_string()),
"Should find snapshot ending at 150 for target 200"
);

// Test no valid snapshot
assert_eq!(
monitor.find_closest_incremental(100),
None,
"Should find no snapshot for target 100"
);
}

#[tokio::test]
async fn test_backup_snapshot() {
let source_dir = TempDir::new().unwrap();
let backup_dir = TempDir::new().unwrap();

let monitor = BackupSnapshotMonitor::new(
"http://localhost:8899",
source_dir.path().to_path_buf(),
backup_dir.path().to_path_buf(),
None,
)
.unwrap();

// Create test snapshot with some content
let snapshot_name = "incremental-snapshot-100-150-hash1.tar.zst";
let source_path = source_dir.path().join(snapshot_name);
let mut file = File::create(&source_path).unwrap();
file.write_all(b"test snapshot content").unwrap();

// Test backup
monitor
.backup_incremental_snapshot(&source_path)
.await
.unwrap();

// Verify backup exists and has correct content
let backup_path = backup_dir.path().join(snapshot_name);
assert!(backup_path.exists());

let backup_content = std::fs::read_to_string(backup_path).unwrap();
assert_eq!(backup_content, "test snapshot content");

// Test idempotency - should succeed without error
monitor
.backup_incremental_snapshot(&source_path)
.await
.unwrap();
}

#[tokio::test]
async fn test_backup_snapshot_missing_source() {
let source_dir = TempDir::new().unwrap();
let backup_dir = TempDir::new().unwrap();

let monitor = BackupSnapshotMonitor::new(
"http://localhost:8899",
source_dir.path().to_path_buf(),
backup_dir.path().to_path_buf(),
None,
)
.unwrap();

let missing_path = source_dir.path().join("nonexistent.tar.zst");

// Should error when source doesn't exist
assert!(monitor
.backup_incremental_snapshot(&missing_path)
.await
.is_err());
}
}
6 changes: 6 additions & 0 deletions tip-router-operator-cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub enum Commands {
#[arg(long, env)]
tip_router_program_id: Pubkey,

#[arg(long, env)]
backup_snapshots_dir: PathBuf,

#[arg(long, env, default_value = "false")]
enable_snapshots: bool,

Expand All @@ -57,6 +60,9 @@ pub enum Commands {

#[arg(long, env, default_value = "false")]
start_next_epoch: bool,

#[arg(long, env)]
override_target_slot: Option<u64>,
},
SnapshotSlot {
#[arg(short, long, env)]
Expand Down
1 change: 1 addition & 0 deletions tip-router-operator-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod tip_router;
pub use crate::cli::{Cli, Commands};
pub mod cli;
pub use crate::process_epoch::process_epoch;
pub mod backup_snapshots;
pub mod process_epoch;
pub mod submit;

Expand Down
13 changes: 13 additions & 0 deletions tip-router-operator-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ async fn main() -> Result<()> {
tip_distribution_program_id,
tip_payment_program_id,
tip_router_program_id,
backup_snapshots_dir,
enable_snapshots,
num_monitored_epochs,
start_next_epoch,
override_target_slot,
} => {
info!("Running Tip Router...");

Expand Down Expand Up @@ -90,6 +92,17 @@ async fn main() -> Result<()> {
}
});

tokio::spawn(async move {
BackupSnapshotMonitor::new(
&cli.rpc_url,
cli.full_snapshots_path,
backup_snapshots_dir,
override_target_slot,
)
.run()
.await?;
});

if start_next_epoch {
wait_for_next_epoch(&rpc_client).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion tip-router-operator-cli/src/process_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub async fn wait_for_next_epoch(rpc_client: &RpcClient) -> Result<()> {
}
}

pub async fn get_previous_epoch_last_slot(rpc_client: &RpcClient) -> Result<(u64, u64)> {
pub fn get_previous_epoch_last_slot(rpc_client: &RpcClient) -> Result<(u64, u64)> {
let epoch_info = rpc_client.get_epoch_info()?;
let current_slot = epoch_info.absolute_slot;
let slot_index = epoch_info.slot_index;
Expand Down

0 comments on commit d9571cc

Please sign in to comment.