Skip to content

Commit

Permalink
refactor(katana-db): update mdbx implementations (#1250)
Browse files Browse the repository at this point in the history
* refactor(katana-db): update mdbx implementations

* fix doc
  • Loading branch information
kariy authored Dec 11, 2023
1 parent 4f5861a commit 79317c3
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 100 deletions.
2 changes: 1 addition & 1 deletion crates/katana/storage/db/src/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::error::CodecError;

/// A trait for encoding the key of a table.
pub trait Encode {
type Encoded: AsRef<[u8]>;
type Encoded: AsRef<[u8]> + Into<Vec<u8>>;
fn encode(self) -> Self::Encoded;
}

Expand Down
4 changes: 2 additions & 2 deletions crates/katana/storage/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub mod models;
pub mod tables;
pub mod utils;

use mdbx::{DbEnv, EnvKind};
use mdbx::{DbEnv, DbEnvKind};
use utils::is_database_empty;

/// Initialize the database at the given path and returning a handle to the its
Expand All @@ -34,7 +34,7 @@ pub fn init_db<P: AsRef<Path>>(path: P) -> anyhow::Result<DbEnv> {

/// Open the database at the given `path` in read-write mode.
pub fn open_db<P: AsRef<Path>>(path: P) -> anyhow::Result<DbEnv> {
DbEnv::open(path.as_ref(), EnvKind::RW).with_context(|| {
DbEnv::open(path.as_ref(), DbEnvKind::RW).with_context(|| {
format!("Opening database in read-write mode at path {}", path.as_ref().display())
})
}
237 changes: 202 additions & 35 deletions crates/katana/storage/db/src/mdbx/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
//! Cursor wrapper for libmdbx-sys.
use std::borrow::Cow;
use std::marker::PhantomData;

use libmdbx::{self, TransactionKind, WriteFlags, RW};

use crate::codecs::{Compress, Encode};
use crate::error::DatabaseError;
use crate::tables::{DupSort, Table};
use crate::utils::{decode_one, decode_value, KeyValue};
use crate::utils::{decode_one, decode_value, decoder, KeyValue};

/// Cursor wrapper to access KV items.
/// Takes key/value pair from the database and decodes it appropriately.
macro_rules! decode {
($v:expr) => {
$v.map_err($crate::error::DatabaseError::Read)?.map($crate::utils::decoder::<T>).transpose()
};
}

/// Cursor for navigating the items within a database.
#[derive(Debug)]
pub struct Cursor<K: TransactionKind, T: Table> {
/// Inner `libmdbx` cursor.
pub(crate) inner: libmdbx::Cursor<K>,
inner: libmdbx::Cursor<K>,
/// Phantom data to enforce encoding/decoding.
_dbi: PhantomData<T>,
}
Expand All @@ -24,65 +32,90 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
}
}

/// Takes `(key, value)` from the database and decodes it appropriately.
#[macro_export]
macro_rules! decode {
($v:expr) => {
$v.map_err($crate::error::DatabaseError::Read)?.map($crate::utils::decoder::<T>).transpose()
};
}

#[allow(clippy::should_implement_trait)]
impl<K: TransactionKind, T: DupSort> Cursor<K, T> {
impl<K: TransactionKind, T: Table> Cursor<K, T> {
/// Retrieves the first key/value pair, positioning the cursor at the first key/value pair in
/// the table.
pub fn first(&mut self) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::first(&mut self.inner))
}

pub fn seek_exact(
&mut self,
key: <T as Table>::Key,
) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::set_key(&mut self.inner, key.encode().as_ref()))
}

pub fn seek(&mut self, key: <T as Table>::Key) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::set_range(&mut self.inner, key.encode().as_ref()))
/// Retrieves key/value pair at current cursor position.
pub fn current(&mut self) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::get_current(&mut self.inner))
}

/// Retrieves the next key/value pair, positioning the cursor at the next key/value pair in
/// the table.
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::next(&mut self.inner))
}

/// Retrieves the previous key/value pair, positioning the cursor at the previous key/value pair
/// in the table.
pub fn prev(&mut self) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::prev(&mut self.inner))
}

/// Retrieves the last key/value pair, positioning the cursor at the last key/value pair in
/// the table.
pub fn last(&mut self) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::last(&mut self.inner))
}

pub fn current(&mut self) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::get_current(&mut self.inner))
/// Set the cursor to the specified key, returning and positioning the cursor at the item if
/// found.
pub fn set(&mut self, key: <T as Table>::Key) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::set_key(&mut self.inner, key.encode().as_ref()))
}

/// Returns the next `(key, value)` pair of a DUPSORT table.
pub fn next_dup(&mut self) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::next_dup(&mut self.inner))
/// Search for a `key` in a table, returning and positioning the cursor at the first item whose
/// key is greater than or equal to `key`.
pub fn seek(&mut self, key: <T as Table>::Key) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::set_range(&mut self.inner, key.encode().as_ref()))
}

/// Returns the next `(key, value)` pair skipping the duplicates.
pub fn next_no_dup(&mut self) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::next_nodup(&mut self.inner))
/// Creates a walker to iterate over the table items.
///
/// If `start_key` is `None`, the walker will start at the first item of the table. Otherwise,
/// it will start at the first item whose key is greater than or equal to `start_key`.
pub fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, K, T>, DatabaseError> {
let start = if let Some(start_key) = start_key {
self.inner
.set_range(start_key.encode().as_ref())
.map_err(DatabaseError::Read)?
.map(decoder::<T>)
} else {
self.first().transpose()
};

Ok(Walker::new(self, start))
}
}

impl<K: TransactionKind, T: DupSort> Cursor<K, T> {
/// Positions the cursor at next data item of current key, returning the next `key-value`
/// pair of a DUPSORT table.
pub fn next_dup(&mut self) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::next_dup(&mut self.inner))
}

/// Returns the next `value` of a duplicate `key`.
/// Similar to [`Self::next_dup()`], but instead of returning a `key-value` pair, it returns
/// only the `value`.
pub fn next_dup_val(&mut self) -> Result<Option<<T as Table>::Value>, DatabaseError> {
libmdbx::Cursor::next_dup(&mut self.inner)
.map_err(DatabaseError::Read)?
.map(decode_value::<T>)
.transpose()
}

/// Returns the next key/value pair skipping the duplicates.
pub fn next_no_dup(&mut self) -> Result<Option<KeyValue<T>>, DatabaseError> {
decode!(libmdbx::Cursor::next_nodup(&mut self.inner))
}

/// Search for a `key` and `subkey` pair in a DUPSORT table. Positioning the cursor at the first
/// item whose `subkey` is greater than or equal to the specified `subkey`.
pub fn seek_by_key_subkey(
&mut self,
key: <T as Table>::Key,
Expand All @@ -97,16 +130,63 @@ impl<K: TransactionKind, T: DupSort> Cursor<K, T> {
.map(decode_one::<T>)
.transpose()
}

/// Depending on its arguments, returns an iterator starting at:
/// - Some(key), Some(subkey): a `key` item whose data is >= than `subkey`
/// - Some(key), None: first item of a specified `key`
/// - None, Some(subkey): like first case, but in the first key
/// - None, None: first item in the table
/// of a DUPSORT table.
pub fn walk_dup(
&mut self,
key: Option<T::Key>,
subkey: Option<T::SubKey>,
) -> Result<DupWalker<'_, K, T>, DatabaseError> {
let start = match (key, subkey) {
(Some(key), Some(subkey)) => {
// encode key and decode it after.
let key: Vec<u8> = key.encode().into();
self.inner
.get_both_range(key.as_ref(), subkey.encode().as_ref())
.map_err(DatabaseError::Read)?
.map(|val| decoder::<T>((Cow::Owned(key), val)))
}

(Some(key), None) => {
let key: Vec<u8> = key.encode().into();
self.inner
.set(key.as_ref())
.map_err(DatabaseError::Read)?
.map(|val| decoder::<T>((Cow::Owned(key), val)))
}

(None, Some(subkey)) => {
if let Some((key, _)) = self.first()? {
let key: Vec<u8> = key.encode().into();
self.inner
.get_both_range(key.as_ref(), subkey.encode().as_ref())
.map_err(DatabaseError::Read)?
.map(|val| decoder::<T>((Cow::Owned(key), val)))
} else {
Some(Err(DatabaseError::Read(libmdbx::Error::NotFound)))
}
}

(None, None) => self.first().transpose(),
};

Ok(DupWalker::new(self, start))
}
}

impl<T: Table> Cursor<RW, T> {
/// Database operation that will update an existing row if a specified value already
/// exists in a table, and insert a new row if the specified value doesn't already exist
///
/// For a DUPSORT table, `upsert` will not actually update-or-insert. If the key already exists,
/// it will append the value to the subkey, even if the subkeys are the same. So if you want
/// to properly upsert, you'll need to `seek_exact` & `delete_current` if the key+subkey was
/// found, before calling `upsert`.
/// For a `DUPSORT` table, `upsert` will not actually update-or-insert. If the key already
/// exists, it will append the value to the subkey, even if the subkeys are the same. So if
/// you want to properly upsert, you'll need to `seek_exact` & `delete_current` if the
/// key+subkey was found, before calling `upsert`.
pub fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
let key = Encode::encode(key);
let value = Compress::compress(value);
Expand All @@ -119,6 +199,8 @@ impl<T: Table> Cursor<RW, T> {
})
}

/// Puts a key/value pair into the database. The cursor will be positioned at the new data item,
/// or on failure, usually near it.
pub fn insert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
let key = Encode::encode(key);
let value = Compress::compress(value);
Expand Down Expand Up @@ -150,12 +232,17 @@ impl<T: Table> Cursor<RW, T> {
})
}

/// Deletes the current key/value pair.
pub fn delete_current(&mut self) -> Result<(), DatabaseError> {
libmdbx::Cursor::del(&mut self.inner, WriteFlags::CURRENT).map_err(DatabaseError::Delete)
}
}

impl<T: DupSort> Cursor<RW, T> {
/// Deletes all values for the current key.
///
/// This will delete all values for the current duplicate key of a `DUPSORT` table, including
/// the current item.
pub fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError> {
libmdbx::Cursor::del(&mut self.inner, WriteFlags::NO_DUP_DATA)
.map_err(DatabaseError::Delete)
Expand All @@ -173,3 +260,83 @@ impl<T: DupSort> Cursor<RW, T> {
})
}
}

/// A key-value pair coming from an iterator.
///
/// The `Result` represents that the operation might fail, while the `Option` represents whether or
/// not there is an entry.
pub type IterPairResult<T> = Option<Result<KeyValue<T>, DatabaseError>>;

/// Provides an iterator to a `Cursor` when handling `Table`.
#[derive(Debug)]
pub struct Walker<'c, K: TransactionKind, T: Table> {
/// Cursor to be used to walk through the table.
cursor: &'c mut Cursor<K, T>,
/// Initial position of the dup walker. The value (key/value pair) where to start the walk.
start: IterPairResult<T>,
}

impl<'c, K, T> Walker<'c, K, T>
where
K: TransactionKind,
T: Table,
{
/// Create a new [`Walker`] from a [`Cursor`] and a [`IterPairResult`].
pub(super) fn new(cursor: &'c mut Cursor<K, T>, start: IterPairResult<T>) -> Self {
Self { cursor, start }
}
}

impl<T: Table> Walker<'_, RW, T> {
/// Delete the `key/value` pair item at the current position of the walker.
pub fn delete_current(&mut self) -> Result<(), DatabaseError> {
self.cursor.delete_current()
}
}

impl<K: TransactionKind, T: Table> std::iter::Iterator for Walker<'_, K, T> {
type Item = Result<KeyValue<T>, DatabaseError>;
fn next(&mut self) -> Option<Self::Item> {
if let value @ Some(_) = self.start.take() { value } else { self.cursor.next().transpose() }
}
}

/// A cursor iterator for `DUPSORT` table.
///
/// Similar to [`Walker`], but for `DUPSORT` table.
#[derive(Debug)]
pub struct DupWalker<'c, K: TransactionKind, T: DupSort> {
/// Cursor to be used to walk through the table.
cursor: &'c mut Cursor<K, T>,
/// Initial position of the dup walker. The value (key/value pair) where to start the walk.
start: IterPairResult<T>,
}

impl<'c, K, T> DupWalker<'c, K, T>
where
K: TransactionKind,
T: DupSort,
{
/// Creates a new [`DupWalker`] from a [`Cursor`] and a [`IterPairResult`].
pub(super) fn new(cursor: &'c mut Cursor<K, T>, start: IterPairResult<T>) -> Self {
Self { cursor, start }
}
}

impl<T: DupSort> DupWalker<'_, RW, T> {
/// Delete the item at the current position of the walker.
pub fn delete_current(&mut self) -> Result<(), DatabaseError> {
self.cursor.delete_current()
}
}

impl<K: TransactionKind, T: DupSort> std::iter::Iterator for DupWalker<'_, K, T> {
type Item = Result<KeyValue<T>, DatabaseError>;
fn next(&mut self) -> Option<Self::Item> {
if let value @ Some(_) = self.start.take() {
value
} else {
self.cursor.next_dup().transpose()
}
}
}
Loading

0 comments on commit 79317c3

Please sign in to comment.