From 6ad6af76a1bc667d1b1c247739e5515a56e2a6b4 Mon Sep 17 00:00:00 2001 From: djugei Date: Mon, 27 Jan 2025 00:17:08 +0100 Subject: [PATCH] db delta (3/?) --- client/Cargo.toml | 2 + client/src/main.rs | 65 +++++++++++++++++++------- client/src/util.rs | 107 +++++++++++++++++++++++++++++++++++++++++-- server/src/caches.rs | 2 +- server/src/main.rs | 1 + 5 files changed, 155 insertions(+), 22 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index 00964c6..a0f2b18 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -24,6 +24,8 @@ bytesize = "1.3.0" strsim = "0.11" dialoguer = "*" command-rusage = "1.0.1" +ruma-headers = {path = "../ruma-headers"} +flate2 = { version = "*" } [[bin]] name = "deltaclient" diff --git a/client/src/main.rs b/client/src/main.rs index 988a408..edc4171 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -5,7 +5,7 @@ use bytesize::ByteSize; use clap::Parser; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use log::{debug, error, info, trace}; -use reqwest::Url; +use reqwest::{Client, Url}; use std::{ fs::OpenOptions, io::{Cursor, ErrorKind}, @@ -27,17 +27,23 @@ enum Commands { #[arg(default_values_t = [Str::from("linux"), Str::from("blas"), Str::from("lapack")])] blacklist: Vec>, }, + /// Upgrade the databases using deltas, ~= pacman -Sy + //TODO: target directory/rootless mode + Sync { server: Url }, /// Download the newest packages to the provided delta_cache path /// /// If delta_cache is somewhere you can write, no sudo is needed. 
/// /// ```bash - /// $ pacman -Sy + /// // todo make this rootless + /// $ sudo deltaclient sync http://bogen.moeh.re/ + /// ## or + /// $ sudo pacman -Sy /// - /// $ deltaclient download http://bogen.moeh.re/arch path + /// $ deltaclient download http://bogen.moeh.re/ path /// $ sudo cp path/*.pkg path/*.sig /var/cache/pacman/pkg/ /// ## or - /// $ sudo deltaclient download http://bogen.moeh.re/arch /var/cache/pacman/pkg/ + /// $ sudo deltaclient download http://bogen.moeh.re/ /var/cache/pacman/pkg/ /// /// $ sudo pacman -Su /// ``` @@ -90,16 +96,8 @@ fn main() { apply_patch(&orig, &patch, &new, pb).unwrap(); } Commands::Upgrade { server, blacklist } => { - info!("running pacman -Sy"); - let exit = Command::new("pacman") - .arg("-Sy") - .spawn() - .expect("could not run pacman -Sy") - .wait() - .expect("error waiting for pacman -Sy"); - if !exit.success() { - panic!("pacman -Sy failed, aborting"); - } + info!("syncing databases"); + sync(server.clone(), multi.clone()).unwrap(); let cachepath = PathBuf::from_str("/var/cache/pacman/pkg").unwrap(); let (deltasize, newsize, comptime) = match upgrade(server, blacklist, cachepath, multi) { @@ -143,16 +141,38 @@ fn main() { std::fs::create_dir_all(&delta_cache).unwrap(); upgrade(server, vec![], delta_cache, multi).unwrap(); } + Commands::Sync { server } => { + sync(server, multi).unwrap(); + } Commands::Stats { number: count } => util::calc_stats(count.unwrap_or(5)).unwrap(), } } +fn sync(server: Url, multi: MultiProgress) -> anyhow::Result<()> { + let server = server.join("archdb/")?; + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("could not build async runtime") + .block_on(async { + let client = Client::new(); + //TODO sync databases configured in /etc/pacman.conf + let (_core, _extra, _multilib) = tokio::try_join!( + util::sync_db(server.clone(), "core".into(), client.clone(), multi.clone()), + util::sync_db(server.clone(), "extra".into(), client.clone(), multi.clone()), + 
util::sync_db(server.clone(), "multilib".into(), client, multi), + )?; + Ok(()) + }) +} + fn upgrade( server: Url, blacklist: Vec, delta_cache: PathBuf, multi: MultiProgress, ) -> anyhow::Result<(u64, u64, Option)> { + let server = server.join("arch/")?; let upgrade_candidates = util::find_deltaupgrade_candidates(&blacklist)?; info!("downloading {} updates", upgrade_candidates.len()); @@ -196,7 +216,7 @@ fn upgrade( // delta download let mut file_name = delta_cache.clone(); - file_name.push(format!("{newpkg}")); + file_name.push(newpkg.to_string()); let delta = parsing::Delta::try_from((oldpkg.clone(), newpkg.clone()))?; let deltafile_name = file_name.with_file_name(format!("{delta}.delta")); @@ -209,7 +229,7 @@ fn upgrade( .await .unwrap(); - let url = format!("{server}/{oldpkg}/{newpkg}"); + let url = server.join(&format!("{oldpkg}/{newpkg}"))?; let pg = util::do_download( multi.clone(), @@ -394,3 +414,16 @@ fn apply_patch(orig: &[u8], patch: &Path, new: &Path, pb: ProgressBar) -> anyhow Ok(comptime) } + +#[test] +fn testurl() { + let url = Url::parse("http://bogen.moeh.re/").unwrap(); + let url = url.join("arch/").unwrap(); + let url = url.join("a/").unwrap(); + assert_eq!(url.path(), "/arch/a/"); + + let url = Url::parse("http://bogen.moeh.re/").unwrap(); + let url = url.join("arch").unwrap(); + let url = url.join("a/").unwrap(); + assert_eq!(url.path(), "/a/"); +} diff --git a/client/src/util.rs b/client/src/util.rs index 85f37f6..139edc2 100644 --- a/client/src/util.rs +++ b/client/src/util.rs @@ -1,6 +1,11 @@ -use std::{collections::HashMap, io::BufRead, path::PathBuf, process::Command}; +use std::{ + collections::HashMap, + io::{BufRead, Read}, + path::PathBuf, + process::Command, +}; -use anyhow::bail; +use anyhow::{bail, Context}; use bytesize::ByteSize; use http::{header::CONTENT_RANGE, StatusCode}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; @@ -8,7 +13,8 @@ use itertools::Itertools; use log::{debug, info}; use memmap2::Mmap; use 
parsing::Package; -use reqwest::Client; +use reqwest::{Client, Url}; +use ruma_headers::ContentDisposition; use tokio::{ io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt}, pin, @@ -32,7 +38,7 @@ pub(crate) async fn do_download Result { let pg = ProgressBar::hidden() @@ -51,7 +57,7 @@ pub(crate) async fn do_download anyhow::Result<()> { + info!("syncing {}", name); + let mut max = None; + for line in std::fs::read_dir("/var/lib/pacman/sync/")? { + let line = line?; + if !line.file_type()?.is_file() { + continue; + } + let filename = line.file_name(); + let filename = filename.to_string_lossy(); + if !filename.starts_with(&*name) { + continue; + } + // 3 kinds of files in this folder: + // core.db | (symlink to) the database + // core-timestamp | version of the database + // core-timestamp-timestamp | patch + // we are only looking for core-timestamp + if filename.matches('-').count() != 1 { + continue; + } + let (_, ts) = filename.split_once('-').context("malformed filename")?; + let ts: u64 = ts.parse()?; + let ts = Some(ts); + if ts > max { + info!("selecting {ts:?} instead of {max:?}"); + max = ts; + } + } + if let Some(old_ts) = max { + info!("upgrading {name} from {old_ts}"); + //TODO arch vs archdb in url + let url = server.join(&format!("{name}/{old_ts}"))?; + debug!("{url}"); + let response = client.get(url).send().await?; + debug!("{}", response.status()); + assert!(response.status().is_success()); + let header = response + .headers() + .get("content-disposition") + .context("missing content disposition header")?; + let patchname = ContentDisposition::try_from(header.as_bytes())?
+ .filename + .context("response has no filename")?; + let (_, new_ts) = patchname.rsplit_once('-').context("malformed http filename")?; + let new_ts: u64 = new_ts.parse()?; + + info!("new ts for {name}: {new_ts}"); + //TODO server side should be sending 304 for this + if new_ts != old_ts { + let patch = response.bytes().await?; + let patch = std::io::Cursor::new(patch); + tokio::task::spawn_blocking(move || -> anyhow::Result<()> { + info!("patching {name} from {old_ts} to {new_ts}"); + let mut patch = zstd::Decoder::new(patch)?; + + let old_p = format!("/var/lib/pacman/sync/{name}-{old_ts}"); + let old = std::fs::File::open(&old_p)?; + let mut old = flate2::read::GzDecoder::new(old); + let mut oldv = Vec::new(); + old.read_to_end(&mut oldv)?; + let mut old = std::io::Cursor::new(oldv); + + let new_p = format!("/var/lib/pacman/sync/{name}-{new_ts}"); + let new = std::fs::File::create(&new_p)?; + let mut new = flate2::write::GzEncoder::new(new, flate2::Compression::default()); + + ddelta::apply_chunked(&mut old, &mut new, &mut patch)?; + + info!("finished patching {name} to {new_ts}"); + + info!("linking {new_p} to db"); + let db_p = format!("/var/lib/pacman/sync/{name}.db"); + std::fs::remove_file(&db_p)?; + std::os::unix::fs::symlink(new_p, db_p)?; + + info!("deleting obsolete db {old_p}"); + std::fs::remove_file(old_p)?; + + Ok(()) + }) + .await??; + } else { + info!("{name}: old ({old_ts}) == new ({new_ts}), nothing to do"); + } + } else { + todo!("no previous db found, do full timestamped download") + } + Ok(()) +} + /// {package -> [(version, arch, trailer, path)]} fn build_package_versions() -> std::io::Result { let mut package_versions: PackageVersions = HashMap::new(); diff --git a/server/src/caches.rs b/server/src/caches.rs index 26850bb..f7aa133 100644 --- a/server/src/caches.rs +++ b/server/src/caches.rs @@ -218,7 +218,7 @@ impl DBCache { let mut response = sync .client .get(&uri) - // This is technically not HTTP-compliant, but idgaf + //FIXME: send 
http compliant header .header( reqwest::header::IF_MODIFIED_SINCE, sync.last_sync.to_timestamp().to_string(), diff --git a/server/src/main.rs b/server/src/main.rs index f99da97..e40b6c1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -183,6 +183,7 @@ async fn dbdelta( info!(name = name, old = old, "dbdelta requested for",); let (stamp, patch) = db.get_delta_to(old).await.expect("dbdelta generating failed"); //TODO handle 304 (unchanged) case + //TODO the old version the client has may have already been expunged, that's a somewhat common case so better handle it gracefully. h.insert( axum::http::header::CONTENT_DISPOSITION,