Skip to content

Commit

Permalink
Introduce GlobalState struct that contains common
Browse files Browse the repository at this point in the history
parameters
  • Loading branch information
djugei committed Feb 11, 2025
1 parent 85ba929 commit 34435f3
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 39 deletions.
58 changes: 29 additions & 29 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ async fn do_upgrade(
multi: MultiProgress,
fuz: bool,
) -> anyhow::Result<(u64, u64, Option<Duration>)> {
//TODO: download full packages too, ratelimited by maxpar_dl.
//TODO: download full packages too, rate-limited by maxpar_dl.
let (upgrade_candidates, _downloads) = util::find_deltaupgrade_candidates(&blacklist, fuz)?;
info!("downloading {} updates", upgrade_candidates.len());

Expand All @@ -237,19 +237,23 @@ async fn do_upgrade(
total_pg.tick();
total_pg.enable_steady_tick(Duration::from_millis(100));

let global = GlobalState {
multi,
total_pg,
maxpar_req,
maxpar_dl,
maxpar_cpu,
client: client.clone(),
};

let mut set = JoinSet::new();
for (url, newpkg, oldpkg, oldfile, dec_size) in upgrade_candidates {
let get_delta_f = get_delta(
global.clone(),
newpkg.clone(),
oldpkg.clone(),
oldfile,
dec_size.clone(),
multi.clone(),
total_pg.clone(),
maxpar_req.clone(),
maxpar_dl.clone(),
maxpar_cpu.clone(),
client.clone(),
delta_cache.clone(),
server.clone(),
);
Expand Down Expand Up @@ -293,22 +297,27 @@ async fn do_upgrade(
lasterror.ok_or((deltasize, newsize, comptime)).swap()
}

//TODO: maybe have less parameters somehow?
async fn get_delta(
newpkg: Package,
oldpkg: Package,
oldfile: Mmap,
mut dec_size: u64,
#[derive(Clone)]
pub(crate) struct GlobalState {
multi: MultiProgress,
total_pg: ProgressBar,
maxpar_req: Arc<Semaphore>,
maxpar_dl: Arc<Semaphore>,
maxpar_cpu: Arc<Semaphore>,
client: Client,
}

//TODO: maybe have less parameters somehow?
async fn get_delta(
global: GlobalState,
newpkg: Package,
oldpkg: Package,
oldfile: Mmap,
mut dec_size: u64,
delta_cache: PathBuf,
server: Url,
) -> Result<(u64, u64, Option<Duration>), anyhow::Error> {
total_pg.inc_length(dec_size);
global.total_pg.inc_length(dec_size);
let mut file_name = delta_cache.clone();
file_name.push(newpkg.to_string());
let delta = parsing::Delta::try_from((oldpkg.clone(), newpkg.clone()))?;
Expand All @@ -321,32 +330,23 @@ async fn get_delta(
.await
.unwrap();
let url = server.join(&format!("/arch/{oldpkg}/{newpkg}"))?;
let pg = util::do_download(
multi.clone(),
&newpkg,
client.clone(),
maxpar_req,
maxpar_dl,
url.clone(),
&mut deltafile,
)
.await?;
let pg = util::do_download(global.clone(), &newpkg, url.clone(), &mut deltafile).await?;
info!(
"downloaded {} in {} seconds",
deltafile_name.display(),
pg.elapsed().as_secs_f64()
);
total_pg.inc(dec_size / 2);
global.total_pg.inc(dec_size / 2);
// To not lose 1 in case of integer division round-down
dec_size -= dec_size / 2;
let comptime;
{
let file_name = file_name.clone();
let p_pg = ProgressBar::new(0);
let p_pg = multi.insert_after(&pg, p_pg);
let p_pg = global.multi.insert_after(&pg, p_pg);
pg.finish_and_clear();
multi.remove(&pg);
let guard_cpu = maxpar_cpu.acquire_owned().await;
global.multi.remove(&pg);
let guard_cpu = global.maxpar_cpu.acquire_owned().await;
comptime = tokio::task::spawn_blocking(move || -> Result<_, _> {
let oldfile = Cursor::new(oldfile);
let oldfile = zstd::decode_all(oldfile).unwrap();
Expand All @@ -358,7 +358,7 @@ async fn get_delta(
use std::os::unix::fs::MetadataExt;
let deltasize = deltafile.metadata().await?.size();
let newsize = tokio::fs::File::open(&file_name).await?.metadata().await?.size();
total_pg.inc(dec_size);
global.total_pg.inc(dec_size);
Ok((deltasize, newsize, comptime))
}

Expand Down
16 changes: 6 additions & 10 deletions client/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use ruma_headers::ContentDisposition;
use tokio::{
io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt},
pin,
sync::Semaphore,
};

type Str = Box<str>;
Expand All @@ -32,12 +31,9 @@ pub(crate) fn progress_style() -> ProgressStyle {
.progress_chars("█▇▆▅▄▃▂▁ ")
}

pub(crate) async fn do_download<W: AsyncRead + AsyncWrite + AsyncSeek, G: AsRef<Semaphore>>(
multi: MultiProgress,
pub(crate) async fn do_download<W: AsyncRead + AsyncWrite + AsyncSeek>(
global: crate::GlobalState,
pkg: &Package,
client: Client,
request_guard: G,
dl_guard: G,
url: Url,
target: W,
) -> Result<ProgressBar, anyhow::Error> {
Expand All @@ -46,19 +42,19 @@ pub(crate) async fn do_download<W: AsyncRead + AsyncWrite + AsyncSeek, G: AsRef<
.with_message(format!("{}-{}", pkg.get_name(), pkg.get_version()))
.with_style(progress_style());

let pg = multi.add(pg);
let pg = global.multi.add(pg);
pin!(target);
let mut target = pg.wrap_async_write(target);
let mut tries = 8_u8;
//TODO: abstract retry logic
'retry: loop {
let guard = request_guard.as_ref().acquire().await?;
let guard = global.maxpar_req.as_ref().acquire().await?;
pg.tick();
let write_offset = target.seek(std::io::SeekFrom::End(0)).await.unwrap();
// get the server to generate the delta, this is expected to time out, possibly multiple times
let mut delta = {
loop {
let mut req = client.get(url.clone());
let mut req = global.client.get(url.clone());
if write_offset != 0 {
let range = format!("bytes={write_offset}-");
debug!("{pkg:?} sending range request {range}");
Expand Down Expand Up @@ -96,7 +92,7 @@ pub(crate) async fn do_download<W: AsyncRead + AsyncWrite + AsyncSeek, G: AsRef<
// acquire guard after sending request but before using the body
// so the deltas can get generated on the server as parallel as possible
// but the download does not get fragmented/overwhelmed
let guard = dl_guard.as_ref().acquire().await?;
let guard = global.maxpar_dl.as_ref().acquire().await?;

match delta.status() {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => (),
Expand Down

0 comments on commit 34435f3

Please sign in to comment.