Skip to content

Commit

Permalink
db delta (3/?)
Browse files Browse the repository at this point in the history
  • Loading branch information
djugei committed Jan 26, 2025
1 parent b8b01a9 commit 6ad6af7
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 22 deletions.
2 changes: 2 additions & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ bytesize = "1.3.0"
strsim = "0.11"
dialoguer = "*"
command-rusage = "1.0.1"
ruma-headers = {path = "../ruma-headers"}
flate2 = { version = "*" }

[[bin]]
name = "deltaclient"
Expand Down
65 changes: 49 additions & 16 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bytesize::ByteSize;
use clap::Parser;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use log::{debug, error, info, trace};
use reqwest::Url;
use reqwest::{Client, Url};
use std::{
fs::OpenOptions,
io::{Cursor, ErrorKind},
Expand All @@ -27,17 +27,23 @@ enum Commands {
#[arg(default_values_t = [Str::from("linux"), Str::from("blas"), Str::from("lapack")])]
blacklist: Vec<Box<str>>,
},
/// Upgrade the databases using deltas, ~= pacman -Sy
//TODO: target directory/rootless mode
Sync { server: Url },
/// Download the newest packages to the provided delta_cache path
///
/// If delta_cache is somewhere you can write, no sudo is needed.
///
/// ```bash
/// $ pacman -Sy
/// // todo make this rootless
/// $ sudo deltaclient sync http://bogen.moeh.re/
/// ## or
/// $ sudo pacman -Sy
///
/// $ deltaclient download http://bogen.moeh.re/arch path
/// $ deltaclient download http://bogen.moeh.re/ path
/// $ sudo cp path/*.pkg path/*.sig /var/cache/pacman/pkg/
/// ## or
/// $ sudo deltaclient download http://bogen.moeh.re/arch /var/cache/pacman/pkg/
/// $ sudo deltaclient download http://bogen.moeh.re/ /var/cache/pacman/pkg/
///
/// $ sudo pacman -Su
/// ```
Expand Down Expand Up @@ -90,16 +96,8 @@ fn main() {
apply_patch(&orig, &patch, &new, pb).unwrap();
}
Commands::Upgrade { server, blacklist } => {
info!("running pacman -Sy");
let exit = Command::new("pacman")
.arg("-Sy")
.spawn()
.expect("could not run pacman -Sy")
.wait()
.expect("error waiting for pacman -Sy");
if !exit.success() {
panic!("pacman -Sy failed, aborting");
}
info!("syncing databases");
sync(server.clone(), multi.clone()).unwrap();

let cachepath = PathBuf::from_str("/var/cache/pacman/pkg").unwrap();
let (deltasize, newsize, comptime) = match upgrade(server, blacklist, cachepath, multi) {
Expand Down Expand Up @@ -143,16 +141,38 @@ fn main() {
std::fs::create_dir_all(&delta_cache).unwrap();
upgrade(server, vec![], delta_cache, multi).unwrap();
}
Commands::Sync { server } => {
sync(server, multi).unwrap();
}
Commands::Stats { number: count } => util::calc_stats(count.unwrap_or(5)).unwrap(),
}
}

fn sync(server: Url, multi: MultiProgress) -> anyhow::Result<()> {
let server = server.join("archdb/")?;
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("could not build async runtime")
.block_on(async {
let client = Client::new();
//TODO sync databases configured in /etc/pacman.conf
let (_core, _extra, _multilib) = tokio::try_join!(
util::sync_db(server.clone(), "core".into(), client.clone(), multi.clone()),
util::sync_db(server.clone(), "extra".into(), client.clone(), multi.clone()),
util::sync_db(server.clone(), "multilib".into(), client, multi),
)?;
Ok(())
})
}

fn upgrade(
server: Url,
blacklist: Vec<Str>,
delta_cache: PathBuf,
multi: MultiProgress,
) -> anyhow::Result<(u64, u64, Option<Duration>)> {
let server = server.join("arch/")?;
let upgrade_candidates = util::find_deltaupgrade_candidates(&blacklist)?;
info!("downloading {} updates", upgrade_candidates.len());

Expand Down Expand Up @@ -196,7 +216,7 @@ fn upgrade(

// delta download
let mut file_name = delta_cache.clone();
file_name.push(format!("{newpkg}"));
file_name.push(newpkg.to_string());

let delta = parsing::Delta::try_from((oldpkg.clone(), newpkg.clone()))?;
let deltafile_name = file_name.with_file_name(format!("{delta}.delta"));
Expand All @@ -209,7 +229,7 @@ fn upgrade(
.await
.unwrap();

let url = format!("{server}/{oldpkg}/{newpkg}");
let url = server.join(&format!("{oldpkg}/{newpkg}"))?;

let pg = util::do_download(
multi.clone(),
Expand Down Expand Up @@ -394,3 +414,16 @@ fn apply_patch(orig: &[u8], patch: &Path, new: &Path, pb: ProgressBar) -> anyhow

Ok(comptime)
}

/// Documents the `Url::join` semantics the client relies on: a trailing
/// slash on the base path is required for segments to nest; without one,
/// the final path segment is *replaced* by the joined segment.
#[test]
fn testurl() {
    let base = Url::parse("http://bogen.moeh.re/").unwrap();

    // "arch/" ends in a slash, so "a/" is appended beneath it.
    let nested = base.join("arch/").and_then(|u| u.join("a/")).unwrap();
    assert_eq!(nested.path(), "/arch/a/");

    // "arch" (no trailing slash) is treated like a file name and is
    // replaced by the next join.
    let replaced = base.join("arch").and_then(|u| u.join("a/")).unwrap();
    assert_eq!(replaced.path(), "/a/");
}
107 changes: 102 additions & 5 deletions client/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use std::{collections::HashMap, io::BufRead, path::PathBuf, process::Command};
use std::{
collections::HashMap,
io::{BufRead, Read},
path::PathBuf,
process::Command,
};

use anyhow::bail;
use anyhow::{bail, Context};
use bytesize::ByteSize;
use http::{header::CONTENT_RANGE, StatusCode};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use itertools::Itertools;
use log::{debug, info};
use memmap2::Mmap;
use parsing::Package;
use reqwest::Client;
use reqwest::{Client, Url};
use ruma_headers::ContentDisposition;
use tokio::{
io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt},
pin,
Expand All @@ -32,7 +38,7 @@ pub(crate) async fn do_download<W: AsyncRead + AsyncWrite + AsyncSeek, G: AsRef<
client: Client,
request_guard: G,
dl_guard: G,
url: String,
url: Url,
target: W,
) -> Result<ProgressBar, anyhow::Error> {
let pg = ProgressBar::hidden()
Expand All @@ -51,7 +57,7 @@ pub(crate) async fn do_download<W: AsyncRead + AsyncWrite + AsyncSeek, G: AsRef<
// get the server to generate the delta, this is expected to time out, possibly multiple times
let mut delta = {
loop {
let mut req = client.get(&url);
let mut req = client.get(url.clone());
if write_offset != 0 {
let range = format!("bytes={write_offset}-");
debug!("{pkg:?} sending range request {range}");
Expand Down Expand Up @@ -191,6 +197,97 @@ pub(crate) fn find_deltaupgrade_candidates(
Ok(lines)
}

pub async fn sync_db(server: Url, name: Str, client: Client, _multi: MultiProgress) -> anyhow::Result<()> {
info!("syncing {}", name);
let mut max = None;
for line in std::fs::read_dir("/var/lib/pacman/sync/")? {
let line = line?;
if !line.file_type()?.is_file() {
continue;
}
let filename = line.file_name();
let filename = filename.to_string_lossy();
if !filename.starts_with(&*name) {
continue;
}
// 3 kids of files in this folder:
// core.db | (symlink to) the database
// core-timestamp | version of the database
// core-timestamp-timestamp | patch
// we are only looking for core-timestamp
if filename.matches('-').count() != 1 {
continue;
}
let (_, ts) = filename.split_once('-').context("malformed filename")?;
let ts: u64 = ts.parse()?;
let ts = Some(ts);
if ts > max {
info!("selecting {ts:?} instead of {max:?}");
max = ts;
}
}
if let Some(old_ts) = max {
info!("upgrading {name} from {old_ts}");
//TODO arch vs archdb in url
let url = server.join(&format!("{name}/{old_ts}"))?;
debug!("{url}");
let response = client.get(url).send().await?;
debug!("{}", response.status());
assert!(response.status().is_success());
let header = response
.headers()
.get("content-disposition")
.context("missing content disposition header")?;
let patchname = ContentDisposition::try_from(header.as_bytes())?
.filename
.context("response has no filename")?;
let (_, new_ts) = patchname.rsplit_once('-').context("malformed http filename")?;
let new_ts: u64 = new_ts.parse()?;

info!("new ts for {name}: {new_ts}");
//TODO server side should be sending 304 for this
if new_ts != old_ts {
let patch = response.bytes().await?;
let patch = std::io::Cursor::new(patch);
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
info!("patching {name} from {old_ts} to {new_ts}");
let mut patch = zstd::Decoder::new(patch)?;

let old_p = format!("/var/lib/pacman/sync/{name}-{old_ts}");
let old = std::fs::File::open(&old_p)?;
let mut old = flate2::read::GzDecoder::new(old);
let mut oldv = Vec::new();
old.read_to_end(&mut oldv)?;
let mut old = std::io::Cursor::new(oldv);

let new_p = format!("/var/lib/pacman/sync/{name}-{new_ts}");
let new = std::fs::File::create(&new_p)?;
let mut new = flate2::write::GzEncoder::new(new, flate2::Compression::default());

ddelta::apply_chunked(&mut old, &mut new, &mut patch)?;

info!("finished patching {name} to {new_ts}");

info!("linking {new_p} to db");
let db_p = format!("/var/lib/pacman/sync/{name}.db");
std::fs::remove_file(&db_p)?;
std::os::unix::fs::symlink(new_p, db_p)?;

info!("deleting obsolete db {old_p}");
std::fs::remove_file(old_p)?;

Ok(())
})
.await??;
} else {
info!("{name}: old ({old_ts}) == new ({new_ts}), nothing to do");
}
} else {
todo!("no previous db found, do full timestamped download")
}
Ok(())
}

/// {package -> [(version, arch, trailer, path)]}
fn build_package_versions() -> std::io::Result<PackageVersions> {
let mut package_versions: PackageVersions = HashMap::new();
Expand Down
2 changes: 1 addition & 1 deletion server/src/caches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl DBCache {
let mut response = sync
.client
.get(&uri)
// This is technically not HTTP-compliant, but idgaf
//FIXME: send http compliant header
.header(
reqwest::header::IF_MODIFIED_SINCE,
sync.last_sync.to_timestamp().to_string(),
Expand Down
1 change: 1 addition & 0 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ async fn dbdelta(
info!(name = name, old = old, "dbdelta requested for",);
let (stamp, patch) = db.get_delta_to(old).await.expect("dbdelta generating failed");
//TODO handle 304 (unchanged) case
//TODO the old version the client has may have already been expunged; that's a somewhat common case, so it should be handled gracefully.

h.insert(
axum::http::header::CONTENT_DISPOSITION,
Expand Down

0 comments on commit 6ad6af7

Please sign in to comment.