Skip to content

Commit

Permalink
feat: sync stars history
Browse files Browse the repository at this point in the history
  • Loading branch information
vladkens committed Aug 16, 2024
1 parent f26563c commit e547626
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 30 deletions.
28 changes: 18 additions & 10 deletions src/db_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,10 @@ impl DbClient {

// MARK: Getters

pub async fn get_repos_ids(&self) -> Res<Vec<u64>> {
pub async fn get_repos_ids(&self) -> Res<Vec<i64>> {
let qs = "SELECT id FROM repos WHERE hidden = FALSE;";
let items: Vec<(i64,)> = sqlx::query_as(qs).fetch_all(&self.db).await?;
Ok(items.into_iter().map(|x| x.0 as u64).collect())
Ok(items.into_iter().map(|x| x.0).collect())
}

pub async fn get_repo_totals(&self, repo: &str) -> Res<Option<RepoTotals>> {
Expand Down Expand Up @@ -434,17 +434,21 @@ impl DbClient {
Ok(())
}

pub async fn insert_stars(&self, repo: &str, stars: &Vec<(String, u32)>) -> Res {
pub async fn insert_stars(&self, repo_id: i64, stars: &Vec<(String, u32, u32)>) -> Res {
let qs = "
INSERT INTO repo_stats AS t (repo_id, date, stars)
VALUES ((SELECT id FROM repos WHERE name = $1), $2, $3)
VALUES ((SELECT id FROM repos WHERE id = $1), $2, $3)
ON CONFLICT(repo_id, date) DO UPDATE SET
stars = MAX(t.stars, excluded.stars);
";

for (date, count) in stars {
let _ =
sqlx::query(qs).bind(repo).bind(&date).bind(count.clone() as i32).execute(&self.db).await?;
for (date, acc_count, _) in stars {
let _ = sqlx::query(qs)
.bind(repo_id)
.bind(&date)
.bind(acc_count.clone() as i32)
.execute(&self.db)
.await?;
}

Ok(())
Expand Down Expand Up @@ -544,7 +548,6 @@ impl DbClient {
// MARK: Updater

pub async fn update_deltas(&self) -> Res {
let stime = std::time::Instant::now();
let items = [("repo_referrers", "referrer"), ("repo_popular_paths", "path")];

for (table, col) in items {
Expand All @@ -567,14 +570,19 @@ impl DbClient {
let _ = sqlx::query(qs.as_str()).execute(&self.db).await?;
}

tracing::info!("update_deltas took {:?}", stime.elapsed());
Ok(())
}

pub async fn mark_hidden_repos(&self, repos_ids: &Vec<u64>) -> Res {
pub async fn mark_repo_hidden(&self, repos_ids: &Vec<i64>) -> Res {
let ids = repos_ids.iter().map(|x| x.to_string()).collect::<Vec<_>>().join(",");
let qs = format!("UPDATE repos SET hidden = TRUE WHERE id IN ({});", ids);
let _ = sqlx::query(&qs).execute(&self.db).await?;
Ok(())
}

/// Flags a repo as having its stars history synced.
///
/// Runs an `UPDATE` that sets `stars_synced = TRUE` on the `repos` row whose
/// `id` equals `repo_id`. Any error from executing the statement is propagated.
pub async fn mark_repo_stars_synced(&self, repo_id: i64) -> Res {
    let update = sqlx::query("UPDATE repos SET stars_synced = TRUE WHERE id = $1;").bind(repo_id);
    // The query result is not needed; only failure matters.
    update.execute(&self.db).await?;
    Ok(())
}
}
53 changes: 34 additions & 19 deletions src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,10 @@ use crate::{
};

async fn check_hidden_repos(db: &DbClient, repos: &Vec<Repo>) -> Res {
let now_ids = repos.iter().map(|r| r.id).collect::<Vec<_>>();
let now_ids = repos.iter().map(|r| r.id as i64).collect::<Vec<_>>();
let was_ids = db.get_repos_ids().await?;
let hidden = was_ids.into_iter().filter(|id| !now_ids.contains(id)).collect::<Vec<_>>();
let _ = db.mark_hidden_repos(&hidden).await?;

let hidden_names = repos
.iter()
.filter(|r| hidden.contains(&r.id))
.map(|r| r.full_name.clone())
.collect::<Vec<_>>();

if !hidden_names.is_empty() {
tracing::warn!("repos marked as hidden: {:?}", hidden_names);
}
let _ = db.mark_repo_hidden(&hidden).await?;

Ok(())
}
Expand Down Expand Up @@ -48,6 +38,7 @@ pub async fn update_metrics(db: &DbClient, gh: &GhClient) -> Res {

tracing::info!("update_metrics took {:?} for {} repos", stime.elapsed(), repos.len());
db.update_deltas().await?;
sync_stars(db, gh).await?;

Ok(())
}
Expand All @@ -68,7 +59,9 @@ async fn update_repo_metrics(db: &DbClient, gh: &GhClient, repo: &Repo, date: &s
Ok(())
}

pub async fn get_stars_history(gh: &GhClient, repo: &str) -> Res<Vec<(String, u32)>> {
/// Get stars history for a repo
/// vec![(date_str, acc_stars, new_stars), ...]
pub async fn get_stars_history(gh: &GhClient, repo: &str) -> Res<Vec<(String, u32, u32)>> {
let stars = gh.get_stars(repo).await?;

let mut dat: HashMap<String, u32> = HashMap::new();
Expand All @@ -81,16 +74,24 @@ pub async fn get_stars_history(gh: &GhClient, repo: &str) -> Res<Vec<(String, u3
let mut dat = dat.into_iter().collect::<Vec<_>>();
dat.sort_by(|a, b| a.0.cmp(&b.0));

for i in 1..dat.len() {
dat[i].1 += dat[i - 1].1;
let mut rs: Vec<(String, u32, u32)> = Vec::with_capacity(dat.len());
for i in 0..dat.len() {
let (date, new_count) = &dat[i];
let acc_count = if i > 0 { rs[i - 1].1 + new_count } else { new_count.clone() };
rs.push((date.clone(), acc_count, new_count.clone()));
}

Ok(dat)
Ok(rs)
}

pub async fn sync_stars(db: &DbClient, gh: &GhClient) -> Res {
let mut pages_collected = 0;

let repos = db.repos_to_sync().await?;
for repo in repos {
let stime = std::time::Instant::now();
// tracing::info!("sync_stars for {}", repo.name);

let stars = match get_stars_history(gh, &repo.name).await {
Ok(stars) => stars,
Err(e) => {
Expand All @@ -99,7 +100,23 @@ pub async fn sync_stars(db: &DbClient, gh: &GhClient) -> Res {
}
};

db.insert_stars(&repo.name, &stars).await?;
db.insert_stars(repo.id, &stars).await?;
db.mark_repo_stars_synced(repo.id).await?;

let stars_count = stars.iter().map(|(_, _, c)| c).sum::<u32>();
tracing::info!(
"sync_stars for {} done in {:?}, {stars_count} starts added",
repo.name,
stime.elapsed(),
);

// gh api rate limit is 5000 req/h, so this code will do up to 1000 req/h
// to not block other possible user pipelines
pages_collected += (stars_count + 99) / 100;
if pages_collected > 1000 {
tracing::info!("sync_stars: {} pages collected, will continue next hour", pages_collected);
break;
}
}

Ok(())
Expand Down Expand Up @@ -137,8 +154,6 @@ fn is_included(repo: &str, rules: &str) -> bool {
return true;
}

println!("rules: {:?}", rules);

let exclude: Vec<&str> = rules.iter().filter_map(|x| x.strip_prefix('!')).collect();
let include: Vec<&str> = rules.iter().filter(|&&x| !x.starts_with('!')).cloned().collect();

Expand Down
11 changes: 10 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,16 @@ async fn main() -> Res {

let state = Arc::new(AppState::new().await?);
let service = router.with_state(state.clone()).into_make_service();
start_cron(state.clone()).await?;

let cron_state = state.clone();
tokio::spawn(async move {
loop {
match start_cron(cron_state.clone()).await {
Err(e) => tracing::error!("failed to start cron: {:?}", e),
Ok(_) => break,
}
}
});

let host = std::env::var("HOST").unwrap_or("127.0.0.1".to_string());
let port = std::env::var("PORT").unwrap_or("8080".to_string());
Expand Down

0 comments on commit e547626

Please sign in to comment.