Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

on_commit hooks or other way to defer after commit #938

Open
dpc opened this issue Jan 20, 2025 · 1 comment
Open

on_commit hooks or other way to defer after commit #938

dpc opened this issue Jan 20, 2025 · 1 comment

Comments

@dpc
Copy link

dpc commented Jan 20, 2025

Here is a very common (to me at least) scenario:

The code is processing certain stuff and inserting it into the database. After the data is inserted, the code processing it would like to, potentially conditionally on the data, notify (e.g. send a notification over tokio::watch channel) certain subsystems to do something , but it can't really do it inline, because the side effects are not going to be visible yet. Event if the code does not abort the transaction, the new data will not be visible to any outside new actor yet.

To do it properly, one needs to somehow write/return out/upwards relevant flags to be able to call notifications etc. after the database transaction was committed. This is particularly painful if the logic has multiple layers (e.g. is decomposed into smaller functions).

Yes, there is no guarantee that program does not crash right after the commit, but fundamentally there's no way around lack of atomicity here, and such after-commit side-effects are usually just notification to improve latency and avoid polling. After crash (on program start) internal components can just do a single check if there is something for them to do and then wait for notifications and that guarantees nothing was missed.

In a project I'm working on at dayjob that uses rockdsb, we've built a whole database layer that wraps rocksdb's database transactions and one of the main reasons was adding DatabaseTransaction::on_commit which basically pushes closures to an inner on_commit: Vec<Box<dyn FnOnce()>>. Right after the transaction is committed, all the hooks from on_commit get called.

This allows clean implementation:

dbtx.insert(some_key, some_value)?;
if some_condtion {
   dbtx.on_commit(|| { some_notification_sender.send(some_message); });
}

This has been working beautifully for us, and now that I'm building on top of redb in my own project, I'm missing it a lot.

I wonder if you (or other people) have any feedback or alternative approach.

Would you consider adding something like built-in to redb API? Implementation-wise it's a single extra Vec per transaction, which doesn't even trigger allocation until used.

Sure it can be added "from the outside" by wrapping redb, but it seems to me that almost any non-trivial program must be eventually hitting similar scenarios, and given that I already wrapped redb in redb-bincode and rocksdb at $dajob, I'm aware how much boilerplate and extra hasle this requires.

@dpc dpc changed the title Could we get on_commit hooks? Or are there better ways to defer things after commit. on_commit hooks or better ways to defer after commit. Jan 20, 2025
@dpc dpc changed the title on_commit hooks or better ways to defer after commit. on_commit hooks or other way to defer after commit Jan 20, 2025
@dpc
Copy link
Author

dpc commented Jan 20, 2025

Huh... As usual, only after posting it, I started playing with the problem even more, and it occurred to me that at least in my application, I can just wrap WriteTransaction:

pub struct WriteTransactionCtx {
    dbtx: WriteTransaction,
    on_commit: std::sync::Mutex<Vec<Box<dyn FnOnce() + 'static>>>,
}

impl From<WriteTransaction> for WriteTransactionCtx {
    fn from(dbtx: WriteTransaction) -> Self {
        Self {
            dbtx,
            on_commit: std::sync::Mutex::new(vec![]),
        }
    }
}
impl ops::Deref for WriteTransactionCtx {
    type Target = WriteTransaction;

    fn deref(&self) -> &Self::Target {
        &self.dbtx
    }
}

impl ops::DerefMut for WriteTransactionCtx {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dbtx
    }
}

impl WriteTransactionCtx {
    pub fn on_commit(&self, f: impl FnOnce() + 'static) {
        self.on_commit
            .lock()
            .expect("Locking failed")
            .push(Box::new(f));
    }

    fn commit(self) -> result::Result<(), redb::CommitError> {
        let Self { dbtx, on_commit } = self;

        dbtx.commit()?;

        for hook in on_commit.lock().expect("Locking failed").drain(..) {
            hook();
        }
        Ok(())
    }
}

And since I'm already using a sort-of async-to-blocking adapter:

impl Database {
    pub async fn write_with<T>(
        &self,
        f: impl FnOnce(&'_ WriteTransactionCtx) -> DbResult<T>,
    ) -> DbResult<T> {
        tokio::task::block_in_place(|| {
            let mut dbtx =
                WriteTransactionCtx::from(self.0.begin_write().context(TransactionSnafu)?);
            let res = f(&mut dbtx)?;

            dbtx.commit().context(CommitSnafu)?;

            Ok(res)
        })
    }
}

it works OK for my purposes. I might need to tweak Send and lifetimes to allow more ergonomic use. That Mutex in there was neccessary because WriteTransactionCtx is typically already borrowed (by Tables), so I can't create a mutable reference for the on_commit(&mut self,...). But it's OK.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant