Skip to content

Commit

Permalink
fix shared schema replicatioon (#1437)
Browse files Browse the repository at this point in the history
* allow schema db replication

* load master schema first when replicating shared schema db
  • Loading branch information
MarinPostma authored Jun 4, 2024
1 parent 07a679e commit 97beabd
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 11 deletions.
9 changes: 9 additions & 0 deletions libsql-server/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
use bottomless::SavepointTracker;

use crate::connection::{MakeConnection, RequestContext};
use crate::replication::ReplicationLogger;

pub use self::primary::{PrimaryConnection, PrimaryConnectionMaker, PrimaryDatabase};
pub use self::replica::{ReplicaConnection, ReplicaDatabase};
Expand Down Expand Up @@ -178,6 +179,14 @@ impl Database {
}
}

pub fn logger(&self) -> Option<Arc<ReplicationLogger>> {
match self {
Database::Primary(p) => Some(p.wal_wrapper.wrapper().logger()),
Database::Replica(_) => None,
Database::Schema(s) => Some(s.wal_wrapper.wrapper().logger()),
}
}

pub fn as_primary(&self) -> Option<&PrimaryDatabase> {
if let Self::Primary(v) = self {
Some(v)
Expand Down
7 changes: 6 additions & 1 deletion libsql-server/src/namespace/fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use crate::replication::{LogReadError, ReplicationLogger};
use crate::{BLOCKING_RT, LIBSQL_PAGE_SIZE};

use super::meta_store::MetaStoreHandle;
use super::{Namespace, NamespaceBottomlessDbId, NamespaceConfig, NamespaceName, RestoreOption};
use super::{
Namespace, NamespaceBottomlessDbId, NamespaceConfig, NamespaceName, NamespaceStore,
RestoreOption,
};

type Result<T> = crate::Result<T, ForkError>;

Expand Down Expand Up @@ -62,6 +65,7 @@ pub struct ForkTask<'a> {
pub bottomless_db_id: NamespaceBottomlessDbId,
pub ns_config: &'a NamespaceConfig,
pub resolve_attach: ResolveNamespacePathFn,
pub store: NamespaceStore,
}

pub struct PointInTimeRestore {
Expand Down Expand Up @@ -110,6 +114,7 @@ impl<'a> ForkTask<'a> {
&self.to_namespace,
Box::new(|_op| {}),
self.resolve_attach.clone(),
self.store.clone(),
)
.await
.map_err(|e| ForkError::CreateNamespace(Box::new(e)))
Expand Down
9 changes: 8 additions & 1 deletion libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl Namespace {
name: &NamespaceName,
reset: ResetCb,
resolve_attach_path: ResolveNamespacePathFn,
store: NamespaceStore,
) -> crate::Result<Self> {
match ns_config.db_kind {
DatabaseKind::Primary if db_config.get().is_shared_schema => {
Expand Down Expand Up @@ -137,6 +138,7 @@ impl Namespace {
db_config,
reset,
resolve_attach_path,
store,
)
.await
}
Expand Down Expand Up @@ -460,14 +462,15 @@ impl Namespace {
})
}

#[tracing::instrument(skip(config, reset, meta_store_handle, resolve_attach_path))]
#[tracing::instrument(skip_all, fields(name))]
#[async_recursion::async_recursion]
async fn new_replica(
config: &NamespaceConfig,
name: NamespaceName,
meta_store_handle: MetaStoreHandle,
reset: ResetCb,
resolve_attach_path: ResolveNamespacePathFn,
store: NamespaceStore,
) -> crate::Result<Self> {
tracing::debug!("creating replica namespace");
let db_path = config.base_path.join("dbs").join(name.as_str());
Expand All @@ -480,6 +483,7 @@ impl Namespace {
rpc_client,
&db_path,
meta_store_handle.clone(),
store.clone(),
)
.await?;
let applied_frame_no_receiver = client.current_frame_no_notifier.subscribe();
Expand Down Expand Up @@ -507,6 +511,7 @@ impl Namespace {
meta_store_handle,
reset,
resolve_attach_path,
store,
)
.await;
}
Expand Down Expand Up @@ -623,6 +628,7 @@ impl Namespace {
to_config: MetaStoreHandle,
timestamp: Option<NaiveDateTime>,
resolve_attach: ResolveNamespacePathFn,
store: NamespaceStore,
) -> crate::Result<Namespace> {
let from_config = from_config.get();
match ns_config.db_kind {
Expand Down Expand Up @@ -664,6 +670,7 @@ impl Namespace {
to_config,
ns_config,
resolve_attach,
store,
};

let ns = fork_task.fork().await?;
Expand Down
3 changes: 3 additions & 0 deletions libsql-server/src/namespace/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ impl NamespaceStore {
&namespace,
self.make_reset_cb(),
self.resolve_attach_fn(),
self.clone(),
)
.await?;

Expand Down Expand Up @@ -286,6 +287,7 @@ impl NamespaceStore {
handle.clone(),
timestamp,
self.resolve_attach_fn(),
self.clone(),
)
.await?;

Expand Down Expand Up @@ -376,6 +378,7 @@ impl NamespaceStore {
&namespace,
self.make_reset_cb(),
self.resolve_attach_fn(),
self.clone(),
)
.await?;
tracing::info!("loaded namespace: `{namespace}`");
Expand Down
19 changes: 17 additions & 2 deletions libsql-server/src/replication/replicator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ use tokio::sync::watch;
use tokio_stream::{Stream, StreamExt};
use tonic::metadata::{AsciiMetadataValue, BinaryMetadataValue};
use tonic::transport::Channel;
use tonic::{Code, Request};
use tonic::{Code, Request, Status};

use crate::connection::config::DatabaseConfig;
use crate::metrics::{
REPLICATION_LATENCY, REPLICATION_LATENCY_CACHE_MISS, REPLICATION_LATENCY_OUT_OF_SYNC,
};
use crate::namespace::meta_store::MetaStoreHandle;
use crate::namespace::NamespaceName;
use crate::namespace::{NamespaceName, NamespaceStore};
use crate::replication::FrameNo;

pub struct Client {
Expand All @@ -34,6 +34,7 @@ pub struct Client {
meta_store_handle: MetaStoreHandle,
// the primary current replication index, as reported by the last handshake
pub primary_replication_index: Option<FrameNo>,
store: NamespaceStore,
}

impl Client {
Expand All @@ -42,6 +43,7 @@ impl Client {
client: ReplicationLogClient<Channel>,
path: &Path,
meta_store_handle: MetaStoreHandle,
store: NamespaceStore,
) -> crate::Result<Self> {
let (current_frame_no_notifier, _) = watch::channel(None);
let meta = WalIndexMeta::open(path).await?;
Expand All @@ -54,6 +56,7 @@ impl Client {
session_token: None,
meta_store_handle,
primary_replication_index: None,
store,
})
}

Expand Down Expand Up @@ -101,6 +104,18 @@ impl ReplicatorClient for Client {
self.session_token.replace(hello.session_token.clone());

if let Some(config) = &hello.config {
// HACK: if we load a shared schema db before the main schema is replicated,
// inserting the new database in the meta store will cause a foreign constraint Error
// because we have a constraint check that ensure shared schema dbs point to a valid
// main schema. To prevent that, we load the main schema first.
if let Some(ref name) = config.shared_schema_name {
let name = NamespaceName::from_string(name.clone())
.map_err(|_| Status::new(Code::InvalidArgument, "invalid namespace name"))?;
self.store
.with(name, |_| ())
.await
.map_err(|e| Status::new(Code::Internal, e.to_string()))?;
}
self.meta_store_handle
.store(DatabaseConfig::from(config))
.await
Expand Down
6 changes: 1 addition & 5 deletions libsql-server/src/rpc/replication_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,8 @@ impl ReplicationLogService {
.with(namespace, |ns| -> Result<_, Status> {
let logger = ns
.db
.as_primary()
.ok_or_else(|| Status::invalid_argument("not a primary"))?
.wal_wrapper
.wrapper()
.logger()
.clone();
.ok_or_else(|| Status::invalid_argument("not a primary"))?;
let config_changed = ns.config_changed();
let config = ns.config();
let version = ns.config_version();
Expand Down
2 changes: 1 addition & 1 deletion libsql-wal/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<FS: Io> WalManager for LibsqlWalManager<FS> {
.registry
.clone()
.open(db_path.as_ref())
.map_err(|e| dbg!(e.into()))?;
.map_err(|e| e.into())?;
let conn_id = self
.next_conn_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Expand Down
2 changes: 1 addition & 1 deletion libsql-wal/tests/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ fn run_test_sample(path: &Path) -> Result {
Ok(_) => (),
Err(e) => {
let path = tmp.into_path();
std::fs::rename(dbg!(path), "./failure").unwrap();
std::fs::rename(path, "./failure").unwrap();
std::panic::resume_unwind(e)
}
}
Expand Down

0 comments on commit 97beabd

Please sign in to comment.