diff --git a/crates/bili_sync/src/adapter/collection.rs b/crates/bili_sync/src/adapter/collection.rs index 1a75eff..3207864 100644 --- a/crates/bili_sync/src/adapter/collection.rs +++ b/crates/bili_sync/src/adapter/collection.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::path::Path; use std::pin::Pin; @@ -9,7 +8,7 @@ use futures::Stream; use sea_orm::entity::prelude::*; use sea_orm::sea_query::{IntoCondition, OnConflict}; use sea_orm::ActiveValue::Set; -use sea_orm::{DatabaseConnection, TransactionTrait}; +use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; use crate::adapter::{helper, VideoListModel}; use crate::bilibili::{self, BiliClient, Collection, CollectionItem, CollectionType, VideoInfo}; @@ -17,10 +16,6 @@ use crate::utils::status::Status; #[async_trait] impl VideoListModel for collection::Model { - async fn video_count(&self, connection: &DatabaseConnection) -> Result { - helper::count_videos(video::Column::CollectionId.eq(self.id).into_condition(), connection).await - } - async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result> { helper::filter_videos( video::Column::CollectionId @@ -52,20 +47,6 @@ impl VideoListModel for collection::Model { .await } - async fn exist_labels( - &self, - videos_info: &[VideoInfo], - connection: &DatabaseConnection, - ) -> Result> { - helper::video_keys( - video::Column::CollectionId.eq(self.id), - videos_info, - [video::Column::Bvid, video::Column::Pubtime], - connection, - ) - .await - } - fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { let mut video_model = video_info.to_model(base_model); video_model.collection_id = Set(Some(self.id)); @@ -81,7 +62,7 @@ impl VideoListModel for collection::Model { let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; match info { Ok((tags, view_info)) => { - let VideoInfo::View { pages, .. } = &view_info else { + let VideoInfo::Detail { pages, .. 
} = &view_info else { unreachable!("view_info must be VideoInfo::View") }; let txn = connection.begin().await?; @@ -101,6 +82,21 @@ impl VideoListModel for collection::Model { Ok(()) } + fn get_latest_row_at(&self) -> DateTime { + self.latest_row_at + } + + async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> { + collection::ActiveModel { + id: Unchanged(self.id), + latest_row_at: Set(datetime), + ..Default::default() + } + .update(connection) + .await?; + Ok(()) + } + fn log_fetch_video_start(&self) { info!( "开始获取{} {} - {} 的视频与分页信息...", @@ -146,14 +142,13 @@ impl VideoListModel for collection::Model { ); } - fn log_refresh_video_end(&self, got_count: usize, new_count: u64) { + fn log_refresh_video_end(&self, count: usize) { info!( - "扫描{}: {} - {} 的新视频完成,获取了 {} 条新视频,其中有 {} 条新视频", + "扫描{}: {} - {} 的新视频完成,获取了 {} 条新视频", CollectionType::from(self.r#type), self.s_id, self.name, - got_count, - new_count, + count, ); } } diff --git a/crates/bili_sync/src/adapter/favorite.rs b/crates/bili_sync/src/adapter/favorite.rs index 8e672ef..692fd80 100644 --- a/crates/bili_sync/src/adapter/favorite.rs +++ b/crates/bili_sync/src/adapter/favorite.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::path::Path; use std::pin::Pin; @@ -9,7 +8,7 @@ use futures::Stream; use sea_orm::entity::prelude::*; use sea_orm::sea_query::{IntoCondition, OnConflict}; use sea_orm::ActiveValue::Set; -use sea_orm::{DatabaseConnection, TransactionTrait}; +use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; use crate::adapter::{helper, VideoListModel}; use crate::bilibili::{self, BiliClient, FavoriteList, VideoInfo}; @@ -17,10 +16,6 @@ use crate::utils::status::Status; #[async_trait] impl VideoListModel for favorite::Model { - async fn video_count(&self, connection: &DatabaseConnection) -> Result { - helper::count_videos(video::Column::FavoriteId.eq(self.id).into_condition(), connection).await - } - async fn unfilled_videos(&self, 
connection: &DatabaseConnection) -> Result> { helper::filter_videos( video::Column::FavoriteId @@ -52,20 +47,6 @@ impl VideoListModel for favorite::Model { .await } - async fn exist_labels( - &self, - videos_info: &[VideoInfo], - connection: &DatabaseConnection, - ) -> Result> { - helper::video_keys( - video::Column::FavoriteId.eq(self.id), - videos_info, - [video::Column::Bvid, video::Column::Favtime], - connection, - ) - .await - } - fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { let mut video_model = video_info.to_model(base_model); video_model.favorite_id = Set(Some(self.id)); @@ -98,6 +79,21 @@ impl VideoListModel for favorite::Model { Ok(()) } + fn get_latest_row_at(&self) -> DateTime { + self.latest_row_at + } + + async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> { + favorite::ActiveModel { + id: Unchanged(self.id), + latest_row_at: Set(datetime), + ..Default::default() + } + .update(connection) + .await?; + Ok(()) + } + fn log_fetch_video_start(&self) { info!("开始获取收藏夹 {} - {} 的视频与分页信息...", self.f_id, self.name); } @@ -118,10 +114,10 @@ impl VideoListModel for favorite::Model { info!("开始扫描收藏夹: {} - {} 的新视频...", self.f_id, self.name); } - fn log_refresh_video_end(&self, got_count: usize, new_count: u64) { + fn log_refresh_video_end(&self, count: usize) { info!( - "扫描收藏夹: {} - {} 的新视频完成,获取了 {} 条新视频,其中有 {} 条新视频", - self.f_id, self.name, got_count, new_count + "扫描收藏夹: {} - {} 的新视频完成,获取了 {} 条新视频", + self.f_id, self.name, count ); } } diff --git a/crates/bili_sync/src/adapter/helper/mod.rs b/crates/bili_sync/src/adapter/helper/mod.rs index 620ee7c..930db2e 100644 --- a/crates/bili_sync/src/adapter/helper/mod.rs +++ b/crates/bili_sync/src/adapter/helper/mod.rs @@ -1,21 +1,14 @@ -use std::collections::HashSet; use std::path::Path; use anyhow::Result; use bili_sync_entity::*; use sea_orm::entity::prelude::*; -use sea_orm::sea_query::{OnConflict, SimpleExpr}; +use 
sea_orm::sea_query::OnConflict; use sea_orm::ActiveValue::Set; -use sea_orm::{Condition, DatabaseTransaction, QuerySelect}; +use sea_orm::{Condition, DatabaseTransaction}; use crate::bilibili::{BiliError, PageInfo, VideoInfo}; use crate::config::{PathSafeTemplate, TEMPLATE}; -use crate::utils::id_time_key; - -/// 使用 condition 筛选视频,返回视频数量 -pub(super) async fn count_videos(condition: Condition, conn: &DatabaseConnection) -> Result { - Ok(video::Entity::find().filter(condition).count(conn).await?) -} /// 使用 condition 筛选视频,返回视频列表 pub(super) async fn filter_videos(condition: Condition, conn: &DatabaseConnection) -> Result> { @@ -34,29 +27,6 @@ pub(super) async fn filter_videos_with_pages( .await?) } -/// 返回 videos_info 存在于视频表里那部分对应的 key -pub(super) async fn video_keys( - expr: SimpleExpr, - videos_info: &[VideoInfo], - columns: [video::Column; 2], - conn: &DatabaseConnection, -) -> Result> { - Ok(video::Entity::find() - .filter( - video::Column::Bvid - .is_in(videos_info.iter().map(|v| v.bvid().to_string())) - .and(expr), - ) - .select_only() - .columns(columns) - .into_tuple() - .all(conn) - .await? 
- .into_iter() - .map(|(bvid, time)| id_time_key(&bvid, &time)) - .collect()) -} - /// 返回设置了 path 的视频 pub(super) fn video_with_path( mut video_model: video::ActiveModel, diff --git a/crates/bili_sync/src/adapter/mod.rs b/crates/bili_sync/src/adapter/mod.rs index 60db867..3d0a003 100644 --- a/crates/bili_sync/src/adapter/mod.rs +++ b/crates/bili_sync/src/adapter/mod.rs @@ -4,7 +4,6 @@ mod helper; mod submission; mod watch_later; -use std::collections::HashSet; use std::path::Path; use std::pin::Pin; @@ -43,9 +42,6 @@ pub async fn video_list_from<'a>( #[async_trait] pub trait VideoListModel { - /// 与视频列表关联的视频总数 - async fn video_count(&self, connection: &DatabaseConnection) -> Result; - /// 未填充的视频 async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result>; @@ -55,10 +51,6 @@ pub trait VideoListModel { connection: &DatabaseConnection, ) -> Result)>>; - /// 该批次视频的存在标记 - async fn exist_labels(&self, videos_info: &[VideoInfo], connection: &DatabaseConnection) - -> Result>; - /// 视频信息对应的视频 model fn video_model_by_info( &self, @@ -74,6 +66,12 @@ pub trait VideoListModel { connection: &DatabaseConnection, ) -> Result<()>; + /// 获取视频 model 中记录的最新时间 + fn get_latest_row_at(&self) -> DateTime; + + /// 更新视频 model 中记录的最新时间 + async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()>; + /// 开始获取视频 fn log_fetch_video_start(&self); @@ -90,5 +88,5 @@ pub trait VideoListModel { fn log_refresh_video_start(&self); /// 结束刷新视频 - fn log_refresh_video_end(&self, got_count: usize, new_count: u64); + fn log_refresh_video_end(&self, count: usize); } diff --git a/crates/bili_sync/src/adapter/submission.rs b/crates/bili_sync/src/adapter/submission.rs index 805941b..731a8f7 100644 --- a/crates/bili_sync/src/adapter/submission.rs +++ b/crates/bili_sync/src/adapter/submission.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::path::Path; use std::pin::Pin; @@ -9,7 +8,7 @@ use futures::Stream; use sea_orm::entity::prelude::*; use 
sea_orm::sea_query::{IntoCondition, OnConflict}; use sea_orm::ActiveValue::Set; -use sea_orm::{DatabaseConnection, TransactionTrait}; +use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; use crate::adapter::helper::video_with_path; use crate::adapter::{helper, VideoListModel}; @@ -18,10 +17,6 @@ use crate::utils::status::Status; #[async_trait] impl VideoListModel for submission::Model { - async fn video_count(&self, connection: &DatabaseConnection) -> Result { - helper::count_videos(video::Column::SubmissionId.eq(self.id).into_condition(), connection).await - } - async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result> { helper::filter_videos( video::Column::SubmissionId @@ -53,20 +48,6 @@ impl VideoListModel for submission::Model { .await } - async fn exist_labels( - &self, - videos_info: &[VideoInfo], - connection: &DatabaseConnection, - ) -> Result> { - helper::video_keys( - video::Column::SubmissionId.eq(self.id), - videos_info, - [video::Column::Bvid, video::Column::Ctime], - connection, - ) - .await - } - fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { let mut video_model = video_info.to_model(base_model); video_model.submission_id = Set(Some(self.id)); @@ -82,7 +63,7 @@ impl VideoListModel for submission::Model { let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; match info { Ok((tags, view_info)) => { - let VideoInfo::View { pages, .. } = &view_info else { + let VideoInfo::Detail { pages, .. 
} = &view_info else { unreachable!("view_info must be VideoInfo::View") }; let txn = connection.begin().await?; @@ -102,6 +83,21 @@ impl VideoListModel for submission::Model { Ok(()) } + fn get_latest_row_at(&self) -> DateTime { + self.latest_row_at + } + + async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> { + submission::ActiveModel { + id: Unchanged(self.id), + latest_row_at: Set(datetime), + ..Default::default() + } + .update(connection) + .await?; + Ok(()) + } + fn log_fetch_video_start(&self) { info!( "开始获取 UP 主 {} - {} 投稿的视频与分页信息...", @@ -134,10 +130,10 @@ impl VideoListModel for submission::Model { info!("开始扫描 UP 主 {} - {} 投稿的新视频...", self.upper_id, self.upper_name); } - fn log_refresh_video_end(&self, got_count: usize, new_count: u64) { + fn log_refresh_video_end(&self, count: usize) { info!( - "扫描 UP 主 {} - {} 投稿的新视频完成,获取了 {} 条新视频,其中有 {} 条新视频", - self.upper_id, self.upper_name, got_count, new_count, + "扫描 UP 主 {} - {} 投稿的新视频完成,获取了 {} 条新视频", + self.upper_id, self.upper_name, count, ); } } diff --git a/crates/bili_sync/src/adapter/watch_later.rs b/crates/bili_sync/src/adapter/watch_later.rs index d0d2c5d..eedf256 100644 --- a/crates/bili_sync/src/adapter/watch_later.rs +++ b/crates/bili_sync/src/adapter/watch_later.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::path::Path; use std::pin::Pin; @@ -9,7 +8,7 @@ use futures::Stream; use sea_orm::entity::prelude::*; use sea_orm::sea_query::{IntoCondition, OnConflict}; use sea_orm::ActiveValue::Set; -use sea_orm::{DatabaseConnection, TransactionTrait}; +use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; use crate::adapter::helper::video_with_path; use crate::adapter::{helper, VideoListModel}; @@ -18,10 +17,6 @@ use crate::utils::status::Status; #[async_trait] impl VideoListModel for watch_later::Model { - async fn video_count(&self, connection: &DatabaseConnection) -> Result { - 
helper::count_videos(video::Column::WatchLaterId.eq(self.id).into_condition(), connection).await - } - async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result> { helper::filter_videos( video::Column::WatchLaterId @@ -53,20 +48,6 @@ impl VideoListModel for watch_later::Model { .await } - async fn exist_labels( - &self, - videos_info: &[VideoInfo], - connection: &DatabaseConnection, - ) -> Result> { - helper::video_keys( - video::Column::WatchLaterId.eq(self.id), - videos_info, - [video::Column::Bvid, video::Column::Favtime], - connection, - ) - .await - } - fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { let mut video_model = video_info.to_model(base_model); video_model.watch_later_id = Set(Some(self.id)); @@ -99,6 +80,21 @@ impl VideoListModel for watch_later::Model { Ok(()) } + fn get_latest_row_at(&self) -> DateTime { + self.latest_row_at + } + + async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> { + watch_later::ActiveModel { + id: Unchanged(self.id), + latest_row_at: Set(datetime), + ..Default::default() + } + .update(connection) + .await?; + Ok(()) + } + fn log_fetch_video_start(&self) { info!("开始获取稍后再看的视频与分页信息..."); } @@ -119,11 +115,8 @@ impl VideoListModel for watch_later::Model { info!("开始扫描稍后再看的新视频..."); } - fn log_refresh_video_end(&self, got_count: usize, new_count: u64) { - info!( - "扫描稍后再看的新视频完成,获取了 {} 条新视频,其中有 {} 条新视频", - got_count, new_count, - ); + fn log_refresh_video_end(&self, count: usize) { + info!("扫描稍后再看的新视频完成,获取了 {} 条新视频", count); } } diff --git a/crates/bili_sync/src/bilibili/mod.rs b/crates/bili_sync/src/bilibili/mod.rs index 280e67e..1630c30 100644 --- a/crates/bili_sync/src/bilibili/mod.rs +++ b/crates/bili_sync/src/bilibili/mod.rs @@ -61,7 +61,7 @@ impl Validate for serde_json::Value { /// > Serde will try to match the data against each variant in order and the first one that deserializes successfully is the one 
returned. pub enum VideoInfo { /// 从视频详情接口获取的视频信息 - View { + Detail { title: String, bvid: String, #[serde(rename = "desc")] @@ -77,8 +77,8 @@ pub enum VideoInfo { pages: Vec, state: i32, }, - /// 从收藏夹中获取的视频信息 - Detail { + /// 从收藏夹接口获取的视频信息 + Favorite { title: String, #[serde(rename = "type")] vtype: i32, @@ -94,7 +94,7 @@ pub enum VideoInfo { pubtime: DateTime, attr: i32, }, - /// 从稍后再看中获取的视频信息 + /// 从稍后再看接口获取的视频信息 WatchLater { title: String, bvid: String, @@ -112,8 +112,8 @@ pub enum VideoInfo { pubtime: DateTime, state: i32, }, - /// 从视频列表中获取的视频信息 - Simple { + /// 从视频合集/视频列表接口获取的视频信息 + Collection { bvid: String, #[serde(rename = "pic")] cover: String, @@ -122,6 +122,7 @@ pub enum VideoInfo { #[serde(rename = "pubdate", with = "ts_seconds")] pubtime: DateTime, }, + /// 从用户投稿接口获取的视频信息 Submission { title: String, bvid: String, @@ -136,7 +137,7 @@ pub enum VideoInfo { #[cfg(test)] mod tests { - use futures::{pin_mut, StreamExt}; + use futures::StreamExt; use super::*; use crate::utils::init_logger; @@ -151,28 +152,30 @@ mod tests { panic!("获取 mixin key 失败"); }; set_global_mixin_key(mixin_key); - let video = Video::new(&bili_client, "BV1Z54y1C7ZB".to_string()); - assert!(matches!(video.get_view_info().await, Ok(VideoInfo::View { .. }))); + // 测试视频合集 let collection_item = CollectionItem { mid: "521722088".to_string(), - sid: "387214".to_string(), - collection_type: CollectionType::Series, + sid: "4523".to_string(), + collection_type: CollectionType::Season, }; let collection = Collection::new(&bili_client, &collection_item); - let stream = collection.into_simple_video_stream(); - pin_mut!(stream); - assert!(matches!(stream.next().await, Some(VideoInfo::Simple { .. }))); - let favorite = FavoriteList::new(&bili_client, "3084505258".to_string()); - let stream = favorite.into_video_stream(); - pin_mut!(stream); - assert!(matches!(stream.next().await, Some(VideoInfo::Detail { .. 
}))); + let videos = collection.into_simple_video_stream().take(20).collect::>().await; + assert!(videos.iter().all(|v| matches!(v, VideoInfo::Collection { .. }))); + assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); + // 测试收藏夹 + let favorite = FavoriteList::new(&bili_client, "3144336058".to_string()); + let videos = favorite.into_video_stream().take(20).collect::>().await; + assert!(videos.iter().all(|v| matches!(v, VideoInfo::Favorite { .. }))); + assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); + // 测试稍后再看 let watch_later = WatchLater::new(&bili_client); - let stream = watch_later.into_video_stream(); - pin_mut!(stream); - assert!(matches!(stream.next().await, Some(VideoInfo::WatchLater { .. }))); + let videos = watch_later.into_video_stream().take(20).collect::>().await; + assert!(videos.iter().all(|v| matches!(v, VideoInfo::WatchLater { .. }))); + assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); + // 测试投稿 let submission = Submission::new(&bili_client, "956761".to_string()); - let stream = submission.into_video_stream(); - pin_mut!(stream); - assert!(matches!(stream.next().await, Some(VideoInfo::Submission { .. }))); + let videos = submission.into_video_stream().take(20).collect::>().await; + assert!(videos.iter().all(|v| matches!(v, VideoInfo::Submission { .. 
}))); + assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); } } diff --git a/crates/bili_sync/src/utils/convert.rs b/crates/bili_sync/src/utils/convert.rs index 404fe2d..bdfa68e 100644 --- a/crates/bili_sync/src/utils/convert.rs +++ b/crates/bili_sync/src/utils/convert.rs @@ -1,10 +1,10 @@ +use chrono::{DateTime, Utc}; use sea_orm::ActiveValue::{NotSet, Set}; use sea_orm::IntoActiveModel; use serde_json::json; use crate::bilibili::VideoInfo; use crate::config::CONFIG; -use crate::utils::id_time_key; impl VideoInfo { /// 将 VideoInfo 转换为 ActiveModel @@ -20,7 +20,7 @@ impl VideoInfo { } }; match self { - VideoInfo::Simple { + VideoInfo::Collection { bvid, cover, ctime, @@ -34,7 +34,7 @@ impl VideoInfo { valid: Set(true), ..base_model }, - VideoInfo::Detail { + VideoInfo::Favorite { title, vtype, bvid, @@ -63,7 +63,7 @@ impl VideoInfo { upper_face: Set(upper.face.clone()), ..base_model }, - VideoInfo::View { + VideoInfo::Detail { title, bvid, intro, @@ -140,8 +140,8 @@ impl VideoInfo { pub fn to_fmt_args(&self) -> Option { match self { - VideoInfo::Simple { .. } | VideoInfo::Submission { .. } => None, // 不能从简单视频信息中构造格式化参数 - VideoInfo::Detail { + VideoInfo::Collection { .. } | VideoInfo::Submission { .. } => None, // 不能从简单视频信息中构造格式化参数 + VideoInfo::Favorite { title, bvid, upper, @@ -164,7 +164,7 @@ impl VideoInfo { "pubtime": pubtime.format(&CONFIG.time_format).to_string(), "fav_time": fav_time.format(&CONFIG.time_format).to_string(), })), - VideoInfo::View { + VideoInfo::Detail { title, bvid, upper, @@ -184,31 +184,12 @@ impl VideoInfo { } } - pub fn video_key(&self) -> String { - match self { - // 对于合集没有 fav_time,只能用 pubtime 代替 - VideoInfo::Simple { - bvid, pubtime: time, .. - } - | VideoInfo::Detail { - bvid, fav_time: time, .. - } - | VideoInfo::WatchLater { - bvid, fav_time: time, .. - } - | VideoInfo::Submission { bvid, ctime: time, .. 
} => id_time_key(bvid, time), - // 详情接口返回的数据仅用于填充详情,不会被作为 video_key - _ => unreachable!(), - } - } - - pub fn bvid(&self) -> &str { + pub fn release_datetime(&self) -> &DateTime { match self { - VideoInfo::Simple { bvid, .. } - | VideoInfo::Detail { bvid, .. } - | VideoInfo::WatchLater { bvid, .. } - | VideoInfo::Submission { bvid, .. } => bvid, - // 同上 + VideoInfo::Collection { pubtime: time, .. } + | VideoInfo::Favorite { fav_time: time, .. } + | VideoInfo::WatchLater { fav_time: time, .. } + | VideoInfo::Submission { ctime: time, .. } => time, _ => unreachable!(), } } diff --git a/crates/bili_sync/src/utils/mod.rs b/crates/bili_sync/src/utils/mod.rs index a5ce98a..a765e88 100644 --- a/crates/bili_sync/src/utils/mod.rs +++ b/crates/bili_sync/src/utils/mod.rs @@ -4,7 +4,6 @@ pub mod model; pub mod nfo; pub mod status; -use chrono::{DateTime, Utc}; use tracing_subscriber::util::SubscriberInitExt; pub fn init_logger(log_level: &str) { @@ -17,8 +16,3 @@ pub fn init_logger(log_level: &str) { .try_init() .expect("初始化日志失败"); } - -/// 生成视频的唯一标记,均由 bvid 和时间戳构成 -pub fn id_time_key(bvid: &String, time: &DateTime) -> String { - format!("{}-{}", bvid, time.timestamp()) -} diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 63ba0bb..a8a24f4 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -27,49 +27,62 @@ pub async fn process_video_list( path: &Path, connection: &DatabaseConnection, ) -> Result<()> { + // 从参数中获取视频列表的 Model 与视频流 let (video_list_model, video_streams) = video_list_from(args, path, bili_client, connection).await?; - let video_list_model = refresh_video_list(video_list_model, video_streams, connection).await?; - let video_list_model = fetch_video_details(bili_client, video_list_model, connection).await?; + // 从视频流中获取新视频的简要信息,写入数据库 + refresh_video_list(video_list_model.as_ref(), video_streams, connection).await?; + // 单独请求视频详情接口,获取视频的详情信息与所有的分页,写入数据库 + fetch_video_details(bili_client, 
video_list_model.as_ref(), connection).await?; if ARGS.scan_only { warn!("已开启仅扫描模式,跳过视频下载..."); - return Ok(()); + } else { + // 从数据库中查找所有未下载的视频与分页,下载并处理 + download_unprocessed_videos(bili_client, video_list_model.as_ref(), connection).await?; } - download_unprocessed_videos(bili_client, video_list_model, connection).await + Ok(()) } /// 请求接口,获取视频列表中所有新添加的视频信息,将其写入数据库 pub async fn refresh_video_list<'a>( - video_list_model: Box, + video_list_model: &dyn VideoListModel, video_streams: Pin + 'a>>, connection: &DatabaseConnection, -) -> Result> { +) -> Result<()> { video_list_model.log_refresh_video_start(); - let mut video_streams = video_streams.chunks(10); - let mut got_count = 0; - let mut new_count = video_list_model.video_count(connection).await?; + let latest_row_at = video_list_model.get_latest_row_at().and_utc(); + let mut max_datetime = latest_row_at; + let mut video_streams = video_streams + .take_while(|v| { + // 虽然 video_streams 是从新到旧的,但由于此处是分页请求,极端情况下可能发生访问完第一页时插入了两整页视频的情况 + // 此时获取到的第二页视频比第一页的还要新,因此为了确保正确,理应对每一页的第一个视频进行时间比较 + // 但在 streams 的抽象下,无法判断具体是在哪里分页的,所以暂且对每个视频都进行比较,希望不会有太大性能损失 + let release_datetime = v.release_datetime(); + if release_datetime > &max_datetime { + max_datetime = *release_datetime; + } + futures::future::ready(release_datetime > &latest_row_at) + }) + .chunks(10); + let mut count = 0; while let Some(videos_info) = video_streams.next().await { - got_count += videos_info.len(); - let exist_labels = video_list_model.exist_labels(&videos_info, connection).await?; - // 如果发现有视频的收藏时间和 bvid 和数据库中重合,说明到达了上次处理到的地方,可以直接退出 - let should_break = videos_info.iter().any(|v| exist_labels.contains(&v.video_key())); - // 将视频信息写入数据库 - create_videos(&videos_info, video_list_model.as_ref(), connection).await?; - if should_break { - info!("到达上一次处理的位置,提前中止"); - break; - } + count += videos_info.len(); + create_videos(&videos_info, video_list_model, connection).await?; } - new_count = video_list_model.video_count(connection).await? 
- new_count; - video_list_model.log_refresh_video_end(got_count, new_count); - Ok(video_list_model) + if max_datetime != latest_row_at { + video_list_model + .update_latest_row_at(max_datetime.naive_utc(), connection) + .await?; + } + video_list_model.log_refresh_video_end(count); + Ok(()) } /// 筛选出所有未获取到全部信息的视频,尝试补充其详细信息 pub async fn fetch_video_details( bili_client: &BiliClient, - video_list_model: Box, + video_list_model: &dyn VideoListModel, connection: &DatabaseConnection, -) -> Result> { +) -> Result<()> { video_list_model.log_fetch_video_start(); let videos_model = video_list_model.unfilled_videos(connection).await?; for video_model in videos_model { @@ -79,13 +92,13 @@ pub async fn fetch_video_details( .await?; } video_list_model.log_fetch_video_end(); - Ok(video_list_model) + Ok(()) } /// 下载所有未处理成功的视频 pub async fn download_unprocessed_videos( bili_client: &BiliClient, - video_list_model: Box, + video_list_model: &dyn VideoListModel, connection: &DatabaseConnection, ) -> Result<()> { video_list_model.log_download_video_start(); diff --git a/crates/bili_sync_entity/src/entities/collection.rs b/crates/bili_sync_entity/src/entities/collection.rs index 620d8d3..4272090 100644 --- a/crates/bili_sync_entity/src/entities/collection.rs +++ b/crates/bili_sync_entity/src/entities/collection.rs @@ -13,6 +13,7 @@ pub struct Model { pub r#type: i32, pub path: String, pub created_at: String, + pub latest_row_at: DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/bili_sync_entity/src/entities/favorite.rs b/crates/bili_sync_entity/src/entities/favorite.rs index 1654598..be75351 100644 --- a/crates/bili_sync_entity/src/entities/favorite.rs +++ b/crates/bili_sync_entity/src/entities/favorite.rs @@ -12,6 +12,7 @@ pub struct Model { pub name: String, pub path: String, pub created_at: String, + pub latest_row_at: DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git 
a/crates/bili_sync_entity/src/entities/submission.rs b/crates/bili_sync_entity/src/entities/submission.rs index f456f5a..5cd7bb4 100644 --- a/crates/bili_sync_entity/src/entities/submission.rs +++ b/crates/bili_sync_entity/src/entities/submission.rs @@ -11,6 +11,7 @@ pub struct Model { pub upper_name: String, pub path: String, pub created_at: String, + pub latest_row_at: DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/bili_sync_entity/src/entities/watch_later.rs b/crates/bili_sync_entity/src/entities/watch_later.rs index 66066f3..615f016 100644 --- a/crates/bili_sync_entity/src/entities/watch_later.rs +++ b/crates/bili_sync_entity/src/entities/watch_later.rs @@ -9,6 +9,7 @@ pub struct Model { pub id: i32, pub path: String, pub created_at: String, + pub latest_row_at: DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/bili_sync_migration/src/lib.rs b/crates/bili_sync_migration/src/lib.rs index 59e70ef..7e82418 100644 --- a/crates/bili_sync_migration/src/lib.rs +++ b/crates/bili_sync_migration/src/lib.rs @@ -4,6 +4,7 @@ mod m20240322_000001_create_table; mod m20240505_130850_add_collection; mod m20240709_130914_watch_later; mod m20240724_161008_submission; +mod m20250122_062926_add_latest_row_at; pub struct Migrator; @@ -15,6 +16,7 @@ impl MigratorTrait for Migrator { Box::new(m20240505_130850_add_collection::Migration), Box::new(m20240709_130914_watch_later::Migration), Box::new(m20240724_161008_submission::Migration), + Box::new(m20250122_062926_add_latest_row_at::Migration), ] } } diff --git a/crates/bili_sync_migration/src/m20250122_062926_add_latest_row_at.rs b/crates/bili_sync_migration/src/m20250122_062926_add_latest_row_at.rs new file mode 100644 index 0000000..2d6d21a --- /dev/null +++ b/crates/bili_sync_migration/src/m20250122_062926_add_latest_row_at.rs @@ -0,0 +1,122 @@ +use sea_orm_migration::prelude::*; +use sea_orm_migration::schema::*; + +#[derive(DeriveMigrationName)] +pub 
struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // 为四张 video list 表添加 latest_row_at 字段,表示该列表处理到的最新时间 + manager + .alter_table( + Table::alter() + .table(Favorite::Table) + .add_column(timestamp(Favorite::LatestRowAt).default("1970-01-01 00:00:00")) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(Collection::Table) + .add_column(timestamp(Collection::LatestRowAt).default("1970-01-01 00:00:00")) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(WatchLater::Table) + .add_column(timestamp(WatchLater::LatestRowAt).default("1970-01-01 00:00:00")) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(Submission::Table) + .add_column(timestamp(Submission::LatestRowAt).default("1970-01-01 00:00:00")) + .to_owned(), + ) + .await?; + // 手动写 SQL 更新这四张表的 latest 字段到当前取值 + let db = manager.get_connection(); + db.execute_unprepared( + "UPDATE favorite SET latest_row_at = (SELECT IFNULL(MAX(favtime), '1970-01-01 00:00:00') FROM video WHERE favorite_id = favorite.id)", + ) + .await?; + db.execute_unprepared( + "UPDATE collection SET latest_row_at = (SELECT IFNULL(MAX(pubtime), '1970-01-01 00:00:00') FROM video WHERE collection_id = collection.id)", + ) + .await?; + db.execute_unprepared( + "UPDATE watch_later SET latest_row_at = (SELECT IFNULL(MAX(favtime), '1970-01-01 00:00:00') FROM video WHERE watch_later_id = watch_later.id)", + ) + .await?; + db.execute_unprepared( + "UPDATE submission SET latest_row_at = (SELECT IFNULL(MAX(ctime), '1970-01-01 00:00:00') FROM video WHERE submission_id = submission.id)", + ) + .await?; + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Favorite::Table) + .drop_column(Favorite::LatestRowAt) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() 
+ .table(Collection::Table) + .drop_column(Collection::LatestRowAt) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(WatchLater::Table) + .drop_column(WatchLater::LatestRowAt) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(Submission::Table) + .drop_column(Submission::LatestRowAt) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Favorite { + Table, + LatestRowAt, +} + +#[derive(DeriveIden)] +enum Collection { + Table, + LatestRowAt, +} + +#[derive(DeriveIden)] +enum WatchLater { + Table, + LatestRowAt, +} + +#[derive(DeriveIden)] +enum Submission { + Table, + LatestRowAt, +}