Skip to content

Commit

Permalink
chore: 使用 tokio::spawn 运行主任务 (#237)
Browse files Browse the repository at this point in the history
  • Loading branch information
amtoaer authored Feb 1, 2025
1 parent cc7f773 commit 51672e8
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 33 deletions.
5 changes: 4 additions & 1 deletion crates/bili_sync/src/adapter/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ pub(super) async fn collection_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(VideoListModelEnum, Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>)> {
) -> Result<(
VideoListModelEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a + Send>>,
)> {
let collection = Collection::new(bili_client, collection_item);
let collection_info = collection.get_info().await?;
collection::Entity::insert(collection::ActiveModel {
Expand Down
5 changes: 4 additions & 1 deletion crates/bili_sync/src/adapter/favorite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ pub(super) async fn favorite_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(VideoListModelEnum, Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>)> {
) -> Result<(
VideoListModelEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a + Send>>,
)> {
let favorite = FavoriteList::new(bili_client, fid.to_owned());
let favorite_info = favorite.get_info().await?;
favorite::Entity::insert(favorite::ActiveModel {
Expand Down
5 changes: 4 additions & 1 deletion crates/bili_sync/src/adapter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ pub async fn video_list_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(VideoListModelEnum, Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>)> {
) -> Result<(
VideoListModelEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a + Send>>,
)> {
match args {
Args::Favorite { fid } => favorite_from(fid, path, bili_client, connection).await,
Args::Collection { collection_item } => collection_from(collection_item, path, bili_client, connection).await,
Expand Down
5 changes: 4 additions & 1 deletion crates/bili_sync/src/adapter/submission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ pub(super) async fn submission_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(VideoListModelEnum, Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>)> {
) -> Result<(
VideoListModelEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a + Send>>,
)> {
let submission = Submission::new(bili_client, upper_id.to_owned());
let upper = submission.get_info().await?;
submission::Entity::insert(submission::ActiveModel {
Expand Down
5 changes: 4 additions & 1 deletion crates/bili_sync/src/adapter/watch_later.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ pub(super) async fn watch_later_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(VideoListModelEnum, Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>)> {
) -> Result<(
VideoListModelEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a + Send>>,
)> {
let watch_later = WatchLater::new(bili_client);
watch_later::Entity::insert(watch_later::ActiveModel {
id: Set(1),
Expand Down
52 changes: 27 additions & 25 deletions crates/bili_sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,37 @@ async fn main() {
let mut anchor = chrono::Local::now().date_naive();
let bili_client = BiliClient::new();
let params = build_params();
loop {
'inner: {
match bili_client.wbi_img().await.map(|wbi_img| wbi_img.into()) {
Ok(Some(mixin_key)) => bilibili::set_global_mixin_key(mixin_key),
Ok(_) => {
error!("解析 mixin key 失败,等待下一轮执行");
break 'inner;
tokio::spawn(async move {
loop {
'inner: {
match bili_client.wbi_img().await.map(|wbi_img| wbi_img.into()) {
Ok(Some(mixin_key)) => bilibili::set_global_mixin_key(mixin_key),
Ok(_) => {
error!("解析 mixin key 失败,等待下一轮执行");
break 'inner;
}
Err(e) => {
error!("获取 mixin key 遇到错误:{e},等待下一轮执行");
break 'inner;
}
};
if anchor != chrono::Local::now().date_naive() {
if let Err(e) = bili_client.check_refresh().await {
error!("检查刷新 Credential 遇到错误:{e},等待下一轮执行");
break 'inner;
}
anchor = chrono::Local::now().date_naive();
}
Err(e) => {
error!("获取 mixin key 遇到错误:{e},等待下一轮执行");
break 'inner;
}
};
if anchor != chrono::Local::now().date_naive() {
if let Err(e) = bili_client.check_refresh().await {
error!("检查刷新 Credential 遇到错误:{e},等待下一轮执行");
break 'inner;
}
anchor = chrono::Local::now().date_naive();
}
for (args, path) in &params {
if let Err(e) = process_video_list(*args, &bili_client, path, &connection).await {
error!("处理过程遇到错误:{e}");
for (args, path) in &params {
if let Err(e) = process_video_list(*args, &bili_client, path, &connection).await {
error!("处理过程遇到错误:{e}");
}
}
info!("本轮任务执行完毕,等待下一轮执行");
}
info!("本轮任务执行完毕,等待下一轮执行");
time::sleep(time::Duration::from_secs(CONFIG.interval)).await;
}
time::sleep(time::Duration::from_secs(CONFIG.interval)).await;
}
});
}

fn build_params() -> Vec<(Args<'static>, &'static PathBuf)> {
Expand Down
6 changes: 3 additions & 3 deletions crates/bili_sync/src/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub async fn process_video_list(
/// 请求接口,获取视频列表中所有新添加的视频信息,将其写入数据库
pub async fn refresh_video_list<'a>(
video_list_model: &VideoListModelEnum,
video_streams: Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>,
video_streams: Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a + Send>>,
connection: &DatabaseConnection,
) -> Result<()> {
video_list_model.log_refresh_video_start();
Expand Down Expand Up @@ -223,7 +223,7 @@ pub async fn download_video_pages(
let is_single_page = video_model.single_page.context("single_page is null")?;
// 对于单页视频,page 的下载已经足够
// 对于多页视频,page 下载仅包含了分集内容,需要额外补上视频的 poster 的 tvshow.nfo
let tasks: Vec<Pin<Box<dyn Future<Output = Result<()>>>>> = vec![
let tasks: Vec<Pin<Box<dyn Future<Output = Result<()>> + Send>>> = vec![
// 下载视频封面
Box::pin(fetch_video_poster(
seprate_status[0] && !is_single_page,
Expand Down Expand Up @@ -413,7 +413,7 @@ pub async fn download_page(
dimension,
..Default::default()
};
let tasks: Vec<Pin<Box<dyn Future<Output = Result<()>>>>> = vec![
let tasks: Vec<Pin<Box<dyn Future<Output = Result<()>> + Send>>> = vec![
Box::pin(fetch_page_poster(
seprate_status[0],
video_model,
Expand Down

0 comments on commit 51672e8

Please sign in to comment.