From 97beabd9a2116b20bff85432571a1029ec2d9835 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 4 Jun 2024 18:01:18 +0200 Subject: [PATCH] fix shared schema replicatioon (#1437) * allow schema db replication * load master schema first when replicating shared schema db --- libsql-server/src/database/mod.rs | 9 +++++++++ libsql-server/src/namespace/fork.rs | 7 ++++++- libsql-server/src/namespace/mod.rs | 9 ++++++++- libsql-server/src/namespace/store.rs | 3 +++ .../src/replication/replicator_client.rs | 19 +++++++++++++++++-- libsql-server/src/rpc/replication_log.rs | 6 +----- libsql-wal/src/wal.rs | 2 +- libsql-wal/tests/oracle.rs | 2 +- 8 files changed, 46 insertions(+), 11 deletions(-) diff --git a/libsql-server/src/database/mod.rs b/libsql-server/src/database/mod.rs index d66968dd31..5c7049de68 100644 --- a/libsql-server/src/database/mod.rs +++ b/libsql-server/src/database/mod.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use bottomless::SavepointTracker; use crate::connection::{MakeConnection, RequestContext}; +use crate::replication::ReplicationLogger; pub use self::primary::{PrimaryConnection, PrimaryConnectionMaker, PrimaryDatabase}; pub use self::replica::{ReplicaConnection, ReplicaDatabase}; @@ -178,6 +179,14 @@ impl Database { } } + pub fn logger(&self) -> Option> { + match self { + Database::Primary(p) => Some(p.wal_wrapper.wrapper().logger()), + Database::Replica(_) => None, + Database::Schema(s) => Some(s.wal_wrapper.wrapper().logger()), + } + } + pub fn as_primary(&self) -> Option<&PrimaryDatabase> { if let Self::Primary(v) = self { Some(v) diff --git a/libsql-server/src/namespace/fork.rs b/libsql-server/src/namespace/fork.rs index 202e9c1de5..4b8cf2f150 100644 --- a/libsql-server/src/namespace/fork.rs +++ b/libsql-server/src/namespace/fork.rs @@ -18,7 +18,10 @@ use crate::replication::{LogReadError, ReplicationLogger}; use crate::{BLOCKING_RT, LIBSQL_PAGE_SIZE}; use super::meta_store::MetaStoreHandle; -use super::{Namespace, NamespaceBottomlessDbId, NamespaceConfig, NamespaceName, RestoreOption}; +use super::{ + Namespace, NamespaceBottomlessDbId, NamespaceConfig, NamespaceName, NamespaceStore, + RestoreOption, +}; type Result = crate::Result; @@ -62,6 +65,7 @@ pub struct ForkTask<'a> { pub bottomless_db_id: NamespaceBottomlessDbId, pub ns_config: &'a NamespaceConfig, pub resolve_attach: ResolveNamespacePathFn, + pub store: NamespaceStore, } pub struct PointInTimeRestore { @@ -110,6 +114,7 @@ impl<'a> ForkTask<'a> { &self.to_namespace, Box::new(|_op| {}), self.resolve_attach.clone(), + self.store.clone(), ) .await .map_err(|e| ForkError::CreateNamespace(Box::new(e))) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 2109291e32..a8e5f8902a 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -108,6 +108,7 @@ impl Namespace { name: &NamespaceName, reset: ResetCb, resolve_attach_path: ResolveNamespacePathFn, + store: NamespaceStore, ) -> crate::Result { match ns_config.db_kind { DatabaseKind::Primary if db_config.get().is_shared_schema => { @@ -137,6 +138,7 @@ impl Namespace { db_config, reset, resolve_attach_path, + store, ) .await } @@ -460,7 +462,7 @@ impl Namespace { }) } - #[tracing::instrument(skip(config, reset, meta_store_handle, resolve_attach_path))] + #[tracing::instrument(skip_all, fields(name))] #[async_recursion::async_recursion] async fn new_replica( config: &NamespaceConfig, @@ -468,6 +470,7 @@ impl Namespace { meta_store_handle: MetaStoreHandle, reset: ResetCb, resolve_attach_path: ResolveNamespacePathFn, + store: NamespaceStore, ) -> crate::Result { tracing::debug!("creating replica namespace"); let db_path = config.base_path.join("dbs").join(name.as_str()); @@ -480,6 +483,7 @@ impl Namespace { rpc_client, &db_path, meta_store_handle.clone(), + store.clone(), ) .await?; let applied_frame_no_receiver = client.current_frame_no_notifier.subscribe(); @@ -507,6 +511,7 @@ impl Namespace { meta_store_handle, reset, resolve_attach_path, + store, ) .await; } @@ -623,6 +628,7 @@ impl Namespace { to_config: MetaStoreHandle, timestamp: Option, resolve_attach: ResolveNamespacePathFn, + store: NamespaceStore, ) -> crate::Result { let from_config = from_config.get(); match ns_config.db_kind { @@ -664,6 +670,7 @@ impl Namespace { to_config, ns_config, resolve_attach, + store, }; let ns = fork_task.fork().await?; diff --git a/libsql-server/src/namespace/store.rs b/libsql-server/src/namespace/store.rs index 43ab70d073..6a7ca29416 100644 --- a/libsql-server/src/namespace/store.rs +++ b/libsql-server/src/namespace/store.rs @@ -181,6 +181,7 @@ impl NamespaceStore { &namespace, self.make_reset_cb(), self.resolve_attach_fn(), + self.clone(), ) .await?; @@ -286,6 +287,7 @@ impl NamespaceStore { handle.clone(), timestamp, self.resolve_attach_fn(), + self.clone(), ) .await?; @@ -376,6 +378,7 @@ impl NamespaceStore { &namespace, self.make_reset_cb(), self.resolve_attach_fn(), + self.clone(), ) .await?; tracing::info!("loaded namespace: `{namespace}`"); diff --git a/libsql-server/src/replication/replicator_client.rs b/libsql-server/src/replication/replicator_client.rs index 1dff43c871..3b60abf379 100644 --- a/libsql-server/src/replication/replicator_client.rs +++ b/libsql-server/src/replication/replicator_client.rs @@ -15,14 +15,14 @@ use tokio::sync::watch; use tokio_stream::{Stream, StreamExt}; use tonic::metadata::{AsciiMetadataValue, BinaryMetadataValue}; use tonic::transport::Channel; -use tonic::{Code, Request}; +use tonic::{Code, Request, Status}; use crate::connection::config::DatabaseConfig; use crate::metrics::{ REPLICATION_LATENCY, REPLICATION_LATENCY_CACHE_MISS, REPLICATION_LATENCY_OUT_OF_SYNC, }; use crate::namespace::meta_store::MetaStoreHandle; -use crate::namespace::NamespaceName; +use crate::namespace::{NamespaceName, NamespaceStore}; use crate::replication::FrameNo; pub struct Client { @@ -34,6 +34,7 @@ pub struct Client { meta_store_handle: MetaStoreHandle, // the primary current replication index, as reported by the last handshake pub primary_replication_index: Option, + store: NamespaceStore, } impl Client { @@ -42,6 +43,7 @@ impl Client { client: ReplicationLogClient, path: &Path, meta_store_handle: MetaStoreHandle, + store: NamespaceStore, ) -> crate::Result { let (current_frame_no_notifier, _) = watch::channel(None); let meta = WalIndexMeta::open(path).await?; @@ -54,6 +56,7 @@ impl Client { session_token: None, meta_store_handle, primary_replication_index: None, + store, }) } @@ -101,6 +104,18 @@ impl ReplicatorClient for Client { self.session_token.replace(hello.session_token.clone()); if let Some(config) = &hello.config { + // HACK: if we load a shared schema db before the main schema is replicated, + // inserting the new database in the meta store will cause a foreign constraint Error + // because we have a constraint check that ensure shared schema dbs point to a valid + // main schema. To prevent that, we load the main schema first. + if let Some(ref name) = config.shared_schema_name { + let name = NamespaceName::from_string(name.clone()) + .map_err(|_| Status::new(Code::InvalidArgument, "invalid namespace name"))?; + self.store + .with(name, |_| ()) + .await + .map_err(|e| Status::new(Code::Internal, e.to_string()))?; + } self.meta_store_handle .store(DatabaseConfig::from(config)) .await diff --git a/libsql-server/src/rpc/replication_log.rs b/libsql-server/src/rpc/replication_log.rs index 5a93b3331e..2c0f372557 100644 --- a/libsql-server/src/rpc/replication_log.rs +++ b/libsql-server/src/rpc/replication_log.rs @@ -153,12 +153,8 @@ impl ReplicationLogService { .with(namespace, |ns| -> Result<_, Status> { let logger = ns .db - .as_primary() - .ok_or_else(|| Status::invalid_argument("not a primary"))? - .wal_wrapper - .wrapper() .logger() - .clone(); + .ok_or_else(|| Status::invalid_argument("not a primary"))?; let config_changed = ns.config_changed(); let config = ns.config(); let version = ns.config_version(); diff --git a/libsql-wal/src/wal.rs b/libsql-wal/src/wal.rs index c256bc09e6..a59f51d834 100644 --- a/libsql-wal/src/wal.rs +++ b/libsql-wal/src/wal.rs @@ -52,7 +52,7 @@ impl WalManager for LibsqlWalManager { .registry .clone() .open(db_path.as_ref()) - .map_err(|e| dbg!(e.into()))?; + .map_err(|e| e.into())?; let conn_id = self .next_conn_id .fetch_add(1, std::sync::atomic::Ordering::Relaxed); diff --git a/libsql-wal/tests/oracle.rs b/libsql-wal/tests/oracle.rs index 465f77759b..b55c725d6e 100644 --- a/libsql-wal/tests/oracle.rs +++ b/libsql-wal/tests/oracle.rs @@ -119,7 +119,7 @@ fn run_test_sample(path: &Path) -> Result { Ok(_) => (), Err(e) => { let path = tmp.into_path(); - std::fs::rename(dbg!(path), "./failure").unwrap(); + std::fs::rename(path, "./failure").unwrap(); std::panic::resume_unwind(e) } }