Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add send_recording python api #9148

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions crates/top/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1572,10 +1572,20 @@ impl RecordingStream {
}
}

/// Records a single [`Chunk`].
/// Logs multiple [`Chunk`]s.
///
/// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
/// for that use [`Self::log_chunk`].
/// for that use [`Self::log_chunks`].
pub fn log_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
for chunk in chunks {
self.log_chunk(chunk);
}
}

/// Records a single [`Chunk`].
///
/// Will inject `log_tick` and `log_time` timeline columns into the chunk.
/// If you don't want to inject these, use [`Self::send_chunks`] instead.
#[inline]
pub fn send_chunk(&self, chunk: Chunk) {
let f = move |inner: &RecordingStreamInner| {
Expand All @@ -1587,6 +1597,16 @@ impl RecordingStream {
}
}

/// Records multiple [`Chunk`]s.
///
/// This will _not_ inject `log_tick` and `log_time` timeline columns into the chunk,
/// for that use [`Self::log_chunks`].
pub fn send_chunks(&self, chunks: impl IntoIterator<Item = Chunk>) {
for chunk in chunks {
self.send_chunk(chunk);
}
}

/// Swaps the underlying sink for a new one.
///
/// This guarantees that:
Expand Down
12 changes: 12 additions & 0 deletions docs/snippets/all/concepts/send_recording.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Send a dataframe to a new recording stream."""

import sys

import rerun as rr

path_to_rrd = sys.argv[1]

recording = rr.dataframe.load_recording(path_to_rrd)

rr.init(recording.application_id(), recording_id=recording.recording_id(), spawn=True)
rr.send_recording(recording)
35 changes: 35 additions & 0 deletions docs/snippets/all/concepts/send_recording.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//! Send a `.rrd` to a new recording stream.

use rerun::external::re_chunk_store::{ChunkStore, ChunkStoreConfig};
use rerun::external::re_log_types::{LogMsg, SetStoreInfo};
use rerun::external::re_tuid::Tuid;
use rerun::VersionPolicy;

fn main() -> Result<(), Box<dyn std::error::Error>> {
// Get the filename from the command-line args.
let filename = std::env::args().nth(2).ok_or("Missing filename argument")?;

// Load the chunk store from the file.
let (store_id, store) =
ChunkStore::from_rrd_filepath(&ChunkStoreConfig::DEFAULT, filename, VersionPolicy::Warn)?
.into_iter()
.next()
.ok_or("Expected exactly one recording in the archive")?;

// Use the same app and recording IDs as the original.
if let Some(info) = store.info().cloned() {
let new_recording = rerun::RecordingStreamBuilder::new(info.application_id.clone())
.recording_id(store_id.to_string())
.spawn()?;

new_recording.record_msg(LogMsg::SetStoreInfo(SetStoreInfo {
row_id: Tuid::new(),
info,
}));

// Forward all chunks to the new recording stream.
new_recording.send_chunks(store.iter_chunks().map(|chunk| (**chunk).clone()));
}

Ok(())
}
4 changes: 4 additions & 0 deletions docs/snippets/snippets.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ features = [
"cpp",
"rust",
]
"concepts/send_recording" = [ # No output
"cpp",
]
"concepts/static/log_static" = [ # pseudo-code
"py",
"cpp",
Expand Down Expand Up @@ -312,6 +315,7 @@ quick_start = [ # These examples don't have exactly the same implementation.

# `$config_dir` will be replaced with the absolute path of `docs/snippets`.
[extra_args]
"concepts/send_recording" = ["$config_dir/../../tests/assets/rrd/dna.rrd"]
"archetypes/asset3d_simple" = ["$config_dir/../../tests/assets/cube.glb"]
"archetypes/asset3d_out_of_tree" = ["$config_dir/../../tests/assets/cube.glb"]
"archetypes/video_auto_frames" = [
Expand Down
Binary file modified examples/assets/example.rrd
Binary file not shown.
1 change: 1 addition & 0 deletions rerun_py/docs/gen_common_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class Section:
"disconnect",
"save",
"send_blueprint",
"send_recording",
"serve",
"serve_web",
"spawn",
Expand Down
1 change: 1 addition & 0 deletions rerun_py/rerun_sdk/rerun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
disconnect as disconnect,
save as save,
send_blueprint as send_blueprint,
send_recording as send_recording,
serve as serve,
serve_web as serve_web,
spawn as spawn,
Expand Down
27 changes: 27 additions & 0 deletions rerun_py/rerun_sdk/rerun/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from typing_extensions import deprecated # type: ignore[misc, unused-ignore]

from rerun.blueprint.api import BlueprintLike, create_in_memory_blueprint
from rerun.dataframe import Recording
from rerun.recording_stream import RecordingStream, get_application_id

from ._spawn import _spawn_viewer

Expand Down Expand Up @@ -488,6 +490,31 @@ def send_blueprint(
)


def send_recording(embedded_recording: Recording, recording: RecordingStream | None = None) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This absolutely needs to also be a RecordingStream method.

Also, I don't know if this has been discussed, but our naming is all over the place, as this signature makes it explicit.

As they say, in for a penny, in for a pound. My actionable (if drastic) suggestion is either:

  • either rename all recording: RecordingStream arguments of the stateful API to stream: RecordingStream,
  • or preferably remove those arguments altogether! (if you want to act on an existing recording stream, just use one of its method)

I'm strongly advocating for the latter option.

cc @nikolausWest @jleibs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is relevant: #9187

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm late to the party on this one, but we should really think about the flow between the Recording and a RecordingStream object a bit more.

For example: the MemoryRecording would ideally go away and just be a Recording. Would be so nice to be able to log to the in-memory recording and then query it without needing to re-load it from an rrd.

"""
Send a recording from a `Recording` and send it to the `RecordingStream`.

Parameters
----------
embedded_recording:
A recording loaded from an rrd file.
recording:
Specifies the [`rerun.RecordingStream`][] to use.
If left unspecified, defaults to the current active data recording, if there is one.
See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].

"""
application_id = get_application_id(recording=recording) # NOLINT

if application_id is None:
raise ValueError("No application id found. You must call rerun.init before sending a recording.")

bindings.send_recording(
embedded_recording,
recording=recording.to_native() if recording is not None else None,
)


def spawn(
*,
port: int = 9876,
Expand Down
32 changes: 30 additions & 2 deletions rerun_py/src/python_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ use pyo3::{
};

use re_log::ResultExt;
use re_log_types::LogMsg;
use re_log_types::{BlueprintActivationCommand, EntityPathPart, StoreKind};
use re_log_types::{LogMsg, SetStoreInfo};
use re_sdk::external::re_log_encoding::encoder::encode_ref_as_bytes_local;
use re_sdk::external::re_tuid::Tuid;
use re_sdk::sink::CallbackSink;
use re_sdk::{
sink::{BinaryStreamStorage, MemorySinkStorage},
Expand All @@ -42,6 +43,8 @@ impl PyRuntimeErrorExt for PyRuntimeError {

use once_cell::sync::{Lazy, OnceCell};

use crate::dataframe::PyRecording;

// The bridge needs to have complete control over the lifetimes of the individual recordings,
// otherwise all the recording shutdown machinery (which includes deallocating C, Rust and Python
// data and joining a bunch of threads) can end up running at any time depending on what the
Expand Down Expand Up @@ -169,6 +172,7 @@ fn rerun_bindings(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(log_file_from_contents, m)?)?;
m.add_function(wrap_pyfunction!(send_arrow_chunk, m)?)?;
m.add_function(wrap_pyfunction!(send_blueprint, m)?)?;
m.add_function(wrap_pyfunction!(send_recording, m)?)?;

// misc
m.add_function(wrap_pyfunction!(version, m)?)?;
Expand Down Expand Up @@ -1277,6 +1281,30 @@ fn send_blueprint(
}
}

/// Send a recording to the given recording stream.
///
/// If the `embedded_recording` contains store info, it will be copied to the
/// destination before sending the recording chunks.
#[pyfunction]
#[pyo3(signature = (embedded_recording, recording = None))]
fn send_recording(embedded_recording: &PyRecording, recording: Option<&PyRecordingStream>) {
let Some(recording) = get_data_recording(recording) else {
return;
};

let store = embedded_recording.store.read();
if let Some(info) = store.info().cloned() {
recording.record_msg(LogMsg::SetStoreInfo(SetStoreInfo {
row_id: Tuid::new(),
info,
}));
}

for chunk in store.iter_chunks() {
recording.send_chunk((**chunk).clone());
}
}

// --- Misc ---

/// Return a verbose version string
Expand Down Expand Up @@ -1363,7 +1391,7 @@ fn new_entity_path(parts: Vec<Bound<'_, pyo3::types::PyString>>) -> PyResult<Str

// --- Helpers ---

fn python_version(py: Python<'_>) -> re_log_types::PythonVersion {
pub(crate) fn python_version(py: Python<'_>) -> re_log_types::PythonVersion {
let py_version = py.version_info();
re_log_types::PythonVersion {
major: py_version.major,
Expand Down