Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/7 block offset calculation #8

Merged
merged 2 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ tokio-timerfd = "0.2"
uuid = { version = "1", features = ["v7"] }

[workspace.package]
version = "0.9.1"
version = "0.9.2"
edition = "2021"
authors = ["Ivan Kudriavtsev <[email protected]>"]
description = "ReplayDB Service"
Expand Down
44 changes: 22 additions & 22 deletions replaydb/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ mod tests {
async fn test_read_message() -> Result<()> {
let (r, w) = get_channel().await?;

let f0 = gen_properly_filled_frame();
let f0 = gen_properly_filled_frame(true);
let f0_uuid = f0.get_uuid();
let store = MockStore {
messages: vec![
Expand All @@ -672,7 +672,7 @@ mod tests {
Duration::from_millis(10),
),
(
Some((gen_properly_filled_frame().to_message(), vec![], vec![])),
Some((gen_properly_filled_frame(true).to_message(), vec![], vec![])),
Duration::from_millis(100),
),
(None, Duration::from_millis(1)),
Expand Down Expand Up @@ -713,7 +713,7 @@ mod tests {
async fn test_read_no_data() -> Result<()> {
let (r, w) = get_channel().await?;

let f0 = gen_properly_filled_frame();
let f0 = gen_properly_filled_frame(true);
let f0_uuid = f0.get_uuid();
let store = MockStore {
messages: vec![
Expand Down Expand Up @@ -786,7 +786,7 @@ mod tests {
None,
)?;

let m = job.prepare_message(gen_properly_filled_frame().to_message())?;
let m = job.prepare_message(gen_properly_filled_frame(true).to_message())?;
assert!(m.is_some());
let m = m.unwrap();
assert_eq!(
Expand Down Expand Up @@ -839,7 +839,7 @@ mod tests {
None,
)?;

let m = job.prepare_message(gen_properly_filled_frame().to_message())?;
let m = job.prepare_message(gen_properly_filled_frame(true).to_message())?;
assert!(m.is_some());
let m = m.unwrap().as_video_frame().unwrap();
assert_eq!(m.get_source_id(), "resulting_id".to_string());
Expand Down Expand Up @@ -884,13 +884,13 @@ mod tests {
let res = job.check_ts_decrease(&eos)?;
assert_eq!(res, false);

let first = gen_properly_filled_frame().to_message();
let first = gen_properly_filled_frame(true).to_message();

tokio_timerfd::sleep(Duration::from_millis(1)).await?;
let second = gen_properly_filled_frame().to_message();
let second = gen_properly_filled_frame(true).to_message();

tokio_timerfd::sleep(Duration::from_millis(1)).await?;
let third = gen_properly_filled_frame().to_message();
let third = gen_properly_filled_frame(true).to_message();

let res = job.check_ts_decrease(&first)?;
assert_eq!(res, false);
Expand Down Expand Up @@ -932,10 +932,10 @@ mod tests {
None,
)?;

let first = gen_properly_filled_frame().to_message();
let first = gen_properly_filled_frame(true).to_message();

tokio_timerfd::sleep(Duration::from_millis(1)).await?;
let second = gen_properly_filled_frame().to_message();
let second = gen_properly_filled_frame(true).to_message();

let res = job.check_ts_decrease(&second)?;
assert_eq!(res, false);
Expand Down Expand Up @@ -1033,9 +1033,9 @@ mod tests {
let (r, w) = get_channel().await?;

let frames = vec![
gen_properly_filled_frame(),
gen_properly_filled_frame(),
gen_properly_filled_frame(),
gen_properly_filled_frame(true),
gen_properly_filled_frame(true),
gen_properly_filled_frame(true),
];

let store = MockStore {
Expand Down Expand Up @@ -1095,7 +1095,7 @@ mod tests {
let mut frames = vec![];
let n = 20;
for _ in 0..n {
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
frames.push(f);
tokio_timerfd::sleep(Duration::from_millis(30)).await?;
}
Expand Down Expand Up @@ -1173,7 +1173,7 @@ mod tests {
messages: vec![
(
{
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
first_uuid = f.get_uuid();
Some((f.to_message(), vec![], vec![]))
},
Expand All @@ -1188,7 +1188,7 @@ mod tests {
Duration::from_millis(0),
),
(
Some((gen_properly_filled_frame().to_message(), vec![], vec![])),
Some((gen_properly_filled_frame(true).to_message(), vec![], vec![])),
Duration::from_millis(0),
),
(
Expand All @@ -1202,7 +1202,7 @@ mod tests {
(
Some((
{
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
last_uuid = f.get_uuid_u128();
f
}
Expand Down Expand Up @@ -1268,7 +1268,7 @@ mod tests {

let n = 20;
for _ in 0..n {
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
frames.push(f);
tokio_timerfd::sleep(Duration::from_millis(10)).await?;
}
Expand Down Expand Up @@ -1341,7 +1341,7 @@ mod tests {

let n = 20;
for _ in 0..n {
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
frames.push(f);
tokio_timerfd::sleep(Duration::from_millis(30)).await?;
}
Expand Down Expand Up @@ -1425,9 +1425,9 @@ mod tests {
let (r, w) = get_channel().await?;

let frames = vec![
gen_properly_filled_frame(),
gen_properly_filled_frame(),
gen_properly_filled_frame(),
gen_properly_filled_frame(true),
gen_properly_filled_frame(true),
gen_properly_filled_frame(true),
];

let store = MockStore {
Expand Down
8 changes: 4 additions & 4 deletions replaydb/src/job/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ mod tests {

let mut factory =
RocksDbJobFactory::new(store.clone(), 1024u64.try_into()?, Duration::from_secs(30))?;
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
let source_id = f.get_source_id();
store
.lock()
.await
.add_message(&f.to_message(), &[], &[])
.await?;
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
store
.lock()
.await
Expand Down Expand Up @@ -153,14 +153,14 @@ mod tests {

let mut factory =
RocksDbJobFactory::new(store.clone(), 1024u64.try_into()?, Duration::from_secs(30))?;
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);
let source_id = f.get_source_id();
store
.lock()
.await
.add_message(&f.to_message(), &[], &[])
.await?;
let f = gen_properly_filled_frame();
let f = gen_properly_filled_frame(true);

let f_clone = f.clone();
tokio::spawn(async move {
Expand Down
14 changes: 7 additions & 7 deletions replaydb/src/job/stop_condition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,19 @@ mod tests {

#[test]
fn test_last_frame_stop_condition() -> Result<()> {
let frame_before = gen_properly_filled_frame();
let frame_before = gen_properly_filled_frame(true);
thread::sleep(Duration::from_millis(1));
let mut stop_condition = JobStopCondition::last_frame(incremental_uuid_v7().as_u128());
assert!(!stop_condition.check(&frame_before.to_message())?);
thread::sleep(Duration::from_millis(1));
let frame_after = gen_properly_filled_frame();
let frame_after = gen_properly_filled_frame(true);
assert!(stop_condition.check(&frame_after.to_message())?);
Ok(())
}

#[test]
fn test_frame_count_stop_condition() -> Result<()> {
let frame = gen_properly_filled_frame();
let frame = gen_properly_filled_frame(true);
let mut stop_condition = JobStopCondition::frame_count(2);
assert!(!stop_condition.check(&frame.to_message())?);
assert!(stop_condition.check(&frame.to_message())?);
Expand All @@ -163,20 +163,20 @@ mod tests {

#[test]
fn test_key_frame_count_stop_condition() -> Result<()> {
let mut frame = gen_properly_filled_frame();
let mut frame = gen_properly_filled_frame(true);
frame.set_keyframe(Some(true));
let mut stop_condition = JobStopCondition::key_frame_count(2);
assert!(!stop_condition.check(&frame.to_message())?);
frame.set_keyframe(Some(false));
assert!(!stop_condition.check(&frame.to_message())?);
let key_frame = gen_properly_filled_frame();
let key_frame = gen_properly_filled_frame(true);
assert!(stop_condition.check(&key_frame.to_message())?);
Ok(())
}

#[test]
fn test_pts_delta_stop_condition() -> Result<()> {
let mut frame = gen_properly_filled_frame();
let mut frame = gen_properly_filled_frame(true);
frame.set_time_base((1, 1_000_000));
frame.set_pts(1_000_000);
let mut stop_condition = JobStopCondition::pts_delta_sec(1.0);
Expand All @@ -190,7 +190,7 @@ mod tests {

#[test]
fn test_real_time_delta_stop_condition() -> Result<()> {
let frame = gen_properly_filled_frame();
let frame = gen_properly_filled_frame(true);
let mut stop_condition = JobStopCondition::real_time_delta_ms(500);
assert!(!stop_condition.check(&frame.to_message())?);
thread::sleep(Duration::from_millis(600));
Expand Down
4 changes: 2 additions & 2 deletions replaydb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time::SystemTime;
use tokio::sync::Mutex;
use uuid::Uuid;

pub fn gen_properly_filled_frame() -> VideoFrameProxy {
pub fn gen_properly_filled_frame(kf: bool) -> VideoFrameProxy {
let mut f = gen_frame();
let (tbn, tbd) = (1, 1_000_000);
let now_nanos = SystemTime::now()
Expand All @@ -22,7 +22,7 @@ pub fn gen_properly_filled_frame() -> VideoFrameProxy {
f.set_pts(pts);
f.set_creation_timestamp_ns(now_nanos);
f.set_time_base((tbn, tbd));
f.set_keyframe(Some(true));
f.set_keyframe(Some(kf));
f
}

Expand Down
Loading
Loading