refactor: reduce catalog locks when getting chunks
The main refactor changes the ChunkContainer trait to take the
DatabaseSchema and TableDefinition types directly in its signature,
instead of the database and table names, which previously required an
additional catalog lock and lookups for both entities. Those lookups
are already handled upstream in the QueryTable, so there is no need to
do them again.
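
For reference, a sketch of the signature change on
ChunkContainer::get_table_chunks (the "after" form is the one in the
influxdb3_write/src/lib.rs diff below; this is an illustrative excerpt,
not a standalone compilable snippet):

    // Before: names were passed in, so implementations had to take another
    // catalog lock and look up the database and table definitions again.
    fn get_table_chunks(
        &self,
        database_name: &str,
        table_name: &str,
        filters: &[Expr],
        projection: Option<&Vec<usize>>,
        ctx: &dyn Session,
    ) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError>;

    // After: the already-resolved schema and table definition are passed in,
    // along with a pre-built ChunkFilter, so no extra catalog access is needed.
    fn get_table_chunks(
        &self,
        db_schema: Arc<DatabaseSchema>,
        table_def: Arc<TableDefinition>,
        filter: &ChunkFilter,
        projection: Option<&Vec<usize>>,
        ctx: &dyn Session,
    ) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError>;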

This required adding a test helper in influxdb3_write::test_helpers
that provides convenience methods for getting record batches from the
WriteBuffer. We had been implementing such a method manually in several
places, so it is nice to have it unified. The helper is a blanket impl,
so anything implementing WriteBuffer gets the methods.
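
A minimal usage sketch from inside this crate's tests (the buffer and
context setup and the database and table names here are hypothetical;
the trait and its methods are the ones added in this commit):

    use crate::{test_helpers::WriteBufferTester, WriteBuffer};
    use iox_query::exec::IOxSessionContext;

    async fn example(buffer: &impl WriteBuffer, ctx: &IOxSessionContext) {
        // Blanket impl: any WriteBuffer implementation gets these methods.
        let batches = buffer
            .get_record_batches_unchecked("test_db", "cpu", ctx)
            .await;
        assert!(!batches.is_empty());
    }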

Some other house cleaning was included.
hiltontj committed Jan 22, 2025
1 parent d39a4a2 commit 57d5ab9
Showing 11 changed files with 228 additions and 194 deletions.
8 changes: 4 additions & 4 deletions influxdb3_catalog/src/catalog.rs
@@ -744,7 +744,7 @@ impl UpdateDatabaseSchema for CatalogOp {
}
}

impl UpdateDatabaseSchema for influxdb3_wal::TableDefinition {
impl UpdateDatabaseSchema for influxdb3_wal::WalTableDefinition {
fn update_schema<'a>(
&self,
mut database_schema: Cow<'a, DatabaseSchema>,
@@ -1039,7 +1039,7 @@ impl TableDefinition {
}

/// Create a new table definition from a catalog op
pub fn new_from_op(table_definition: &influxdb3_wal::TableDefinition) -> Self {
pub fn new_from_op(table_definition: &influxdb3_wal::WalTableDefinition) -> Self {
let mut columns = Vec::with_capacity(table_definition.field_definitions.len());
for field_def in &table_definition.field_definitions {
columns.push((
@@ -1059,7 +1059,7 @@ impl TableDefinition {

pub(crate) fn check_and_add_new_fields(
&self,
table_definition: &influxdb3_wal::TableDefinition,
table_definition: &influxdb3_wal::WalTableDefinition,
) -> Result<Cow<'_, Self>> {
// validate the series key is the same
if table_definition.key != self.series_key {
@@ -1942,7 +1942,7 @@ mod tests {
let db_name = Arc::from("foo");
let table_id = TableId::new();
let table_name = Arc::from("bar");
let table_definition = influxdb3_wal::TableDefinition {
let table_definition = influxdb3_wal::WalTableDefinition {
database_id: db_id,
database_name: Arc::clone(&db_name),
table_name: Arc::clone(&table_name),
39 changes: 21 additions & 18 deletions influxdb3_server/src/query_executor/mod.rs
@@ -20,11 +20,11 @@ use datafusion_util::config::DEFAULT_SCHEMA;
use datafusion_util::MemoryStream;
use influxdb3_cache::distinct_cache::{DistinctCacheFunction, DISTINCT_CACHE_UDTF_NAME};
use influxdb3_cache::last_cache::{LastCacheFunction, LAST_CACHE_UDTF_NAME};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition};
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::WriteBuffer;
use influxdb3_write::{ChunkFilter, WriteBuffer};
use influxdb_influxql_parser::statement::Statement;
use iox_query::exec::{Executor, IOxSessionContext, QueryConfig};
use iox_query::provider::ProviderBuilder;
@@ -37,7 +37,6 @@ use iox_query::{QueryChunk, QueryNamespace};
use iox_query_params::StatementParams;
use metric::Registry;
use observability_deps::tracing::{debug, info};
use schema::Schema;
use std::any::Any;
use std::cmp::Ordering;
use std::collections::HashMap;
@@ -542,12 +541,11 @@ impl Database {
async fn query_table(&self, table_name: &str) -> Option<Arc<QueryTable>> {
let table_name: Arc<str> = table_name.into();
self.db_schema
.table_schema(Arc::clone(&table_name))
.map(|schema| {
.table_definition(Arc::clone(&table_name))
.map(|table_def| {
Arc::new(QueryTable {
db_schema: Arc::clone(&self.db_schema),
table_name,
schema: schema.clone(),
table_def,
write_buffer: Arc::clone(&self.write_buffer),
})
})
@@ -671,8 +669,7 @@ impl SchemaProvider for Database {
#[derive(Debug)]
pub struct QueryTable {
db_schema: Arc<DatabaseSchema>,
table_name: Arc<str>,
schema: Schema,
table_def: Arc<TableDefinition>,
write_buffer: Arc<dyn WriteBuffer>,
}

@@ -684,10 +681,13 @@ impl QueryTable {
filters: &[Expr],
_limit: Option<usize>,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
let buffer_filter = ChunkFilter::new(&self.table_def, filters)
.map_err(|error| DataFusionError::External(Box::new(error)))?;

self.write_buffer.get_table_chunks(
&self.db_schema.name,
&self.table_name,
filters,
Arc::clone(&self.db_schema),
Arc::clone(&self.table_def),
&buffer_filter,
projection,
ctx,
)
@@ -701,7 +701,7 @@ impl TableProvider for QueryTable {
}

fn schema(&self) -> SchemaRef {
self.schema.as_arrow()
self.table_def.schema.as_arrow()
}

fn table_type(&self) -> TableType {
@@ -729,17 +729,20 @@ impl TableProvider for QueryTable {
?limit,
"QueryTable as TableProvider::scan"
);
let mut builder = ProviderBuilder::new(Arc::clone(&self.table_name), self.schema.clone());
let mut builder = ProviderBuilder::new(
Arc::clone(&self.table_def.table_name),
self.table_def.schema.clone(),
);

let chunks = self.chunks(ctx, projection, &filters, limit)?;
for chunk in chunks {
builder = builder.add_chunk(chunk);
}

let provider = match builder.build() {
Ok(provider) => provider,
Err(e) => panic!("unexpected error: {e:?}"),
};
// NOTE: this build method is, at time of writing, infallible, but handle the error anyway.
let provider = builder
.build()
.map_err(|e| DataFusionError::Internal(format!("unexpected error: {e:?}")))?;

provider.scan(ctx, projection, &filters, limit).await
}
2 changes: 1 addition & 1 deletion influxdb3_wal/src/create.rs
@@ -94,7 +94,7 @@ pub fn create_table_op(
fields: impl IntoIterator<Item = FieldDefinition>,
key: impl IntoIterator<Item = ColumnId>,
) -> CatalogOp {
CatalogOp::CreateTable(TableDefinition {
CatalogOp::CreateTable(WalTableDefinition {
database_id: db_id,
database_name: db_name.into(),
table_name: table_name.into(),
4 changes: 2 additions & 2 deletions influxdb3_wal/src/lib.rs
@@ -329,7 +329,7 @@ impl OrderedCatalogBatch {
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum CatalogOp {
CreateDatabase(DatabaseDefinition),
CreateTable(TableDefinition),
CreateTable(WalTableDefinition),
AddFields(FieldAdditions),
CreateDistinctCache(DistinctCacheDefinition),
DeleteDistinctCache(DistinctCacheDelete),
@@ -368,7 +368,7 @@ pub struct DeleteTableDefinition {
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TableDefinition {
pub struct WalTableDefinition {
pub database_id: DbId,
pub database_name: Arc<str>,
pub table_name: Arc<str>,
99 changes: 85 additions & 14 deletions influxdb3_write/src/lib.rs
@@ -113,15 +113,15 @@ pub trait Bufferer: Debug + Send + Sync + 'static {

/// Returns the parquet files for a given database and table
fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
self.parquet_files_filtered(db_id, table_id, &BufferFilter::default())
self.parquet_files_filtered(db_id, table_id, &ChunkFilter::default())
}

/// Returns the parquet files for a given database and table that satisfy the given filter
fn parquet_files_filtered(
&self,
db_id: DbId,
table_id: TableId,
filter: &BufferFilter,
filter: &ChunkFilter,
) -> Vec<ParquetFile>;

/// A channel to watch for when new persisted snapshots are created
@@ -133,9 +133,9 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
pub trait ChunkContainer: Debug + Send + Sync + 'static {
fn get_table_chunks(
&self,
database_name: &str,
table_name: &str,
filters: &[Expr],
db_schema: Arc<DatabaseSchema>,
table_def: Arc<TableDefinition>,
filter: &ChunkFilter,
projection: Option<&Vec<usize>>,
ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError>;
@@ -428,14 +428,85 @@ pub(crate) fn guess_precision(timestamp: i64) -> Precision {

#[cfg(test)]
mod test_helpers {
use crate::write_buffer::validator::WriteValidator;
use crate::Precision;
use crate::{write_buffer::validator::WriteValidator, WriteBuffer};
use crate::{ChunkFilter, Precision};
use arrow::array::RecordBatch;
use data_types::NamespaceName;
use datafusion::prelude::Expr;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_wal::{Gen1Duration, WriteBatch};
use iox_query::exec::IOxSessionContext;
use iox_time::Time;
use std::sync::Arc;

/// Helper trait for getting [`RecordBatch`]es from a [`WriteBuffer`] implementation in tests
#[async_trait::async_trait]
pub trait WriteBufferTester {
/// Get record batches for the given database and table, using the provided filter `Expr`s
async fn get_record_batches_filtered_unchecked(
&self,
database_name: &str,
table_name: &str,
filters: &[Expr],
ctx: &IOxSessionContext,
) -> Vec<RecordBatch>;

/// Get record batches for the given database and table
async fn get_record_batches_unchecked(
&self,
database_name: &str,
table_name: &str,
ctx: &IOxSessionContext,
) -> Vec<RecordBatch>;
}

#[async_trait::async_trait]
impl<T> WriteBufferTester for T
where
T: WriteBuffer,
{
async fn get_record_batches_filtered_unchecked(
&self,
database_name: &str,
table_name: &str,
filters: &[Expr],
ctx: &IOxSessionContext,
) -> Vec<RecordBatch> {
let db_schema = self
.catalog()
.db_schema(database_name)
.expect("database should exist");
let table_def = db_schema
.table_definition(table_name)
.expect("table should exist");
let filter =
ChunkFilter::new(&table_def, filters).expect("filter expressions should be valid");
let chunks = self
.get_table_chunks(db_schema, table_def, &filter, None, &ctx.inner().state())
.expect("should get query chunks");
let mut batches = vec![];
for chunk in chunks {
batches.extend(
chunk
.data()
.read_to_batches(chunk.schema(), ctx.inner())
.await,
);
}
batches
}

async fn get_record_batches_unchecked(
&self,
database_name: &str,
table_name: &str,
ctx: &IOxSessionContext,
) -> Vec<RecordBatch> {
self.get_record_batches_filtered_unchecked(database_name, table_name, &[], ctx)
.await
}
}

#[allow(dead_code)]
pub(crate) fn lp_to_write_batch(
catalog: Arc<Catalog>,
@@ -496,19 +496,19 @@ pub(crate) mod test_help {

/// A derived set of filters that are used to prune data in the buffer when serving queries
#[derive(Debug, Default)]
pub struct BufferFilter {
pub struct ChunkFilter {
time_lower_bound_ns: Option<i64>,
time_upper_bound_ns: Option<i64>,
guarantees: HashMap<ColumnId, BufferGuarantee>,
guarantees: HashMap<ColumnId, HashedLiteralGuarantee>,
}

#[derive(Debug)]
pub struct BufferGuarantee {
pub struct HashedLiteralGuarantee {
pub guarantee: Guarantee,
pub literal_hashes: HashSet<u64>,
}

impl BufferFilter {
impl ChunkFilter {
/// Create a new `BufferFilter` given a [`TableDefinition`] and set of filter [`Expr`]s from
/// a logical query plan.
///
@@ -615,7 +686,7 @@ impl BufferFilter {
// if there are multiple Expr's that lead to multiple guarantees on a given column.
guarantees
.entry(column_id)
.and_modify(|e: &mut BufferGuarantee| {
.and_modify(|e: &mut HashedLiteralGuarantee| {
debug!(current = ?e.guarantee, incoming = ?guarantee, ">>> updating existing guarantee");
use Guarantee::*;
match (e.guarantee, guarantee) {
@@ -630,7 +701,7 @@ impl BufferFilter {
}
}
})
.or_insert(BufferGuarantee {
.or_insert(HashedLiteralGuarantee {
guarantee,
literal_hashes: literals,
});
@@ -676,7 +747,7 @@ impl BufferFilter {
}
}

pub fn guarantees(&self) -> impl Iterator<Item = (&ColumnId, &BufferGuarantee)> {
pub fn guarantees(&self) -> impl Iterator<Item = (&ColumnId, &HashedLiteralGuarantee)> {
self.guarantees.iter()
}
}
4 changes: 2 additions & 2 deletions influxdb3_write/src/persister.rs
@@ -443,7 +443,7 @@ mod tests {
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_wal::{
CatalogBatch, CatalogOp, FieldDataType, FieldDefinition, SnapshotSequenceNumber,
TableDefinition, WalFileSequenceNumber,
WalFileSequenceNumber, WalTableDefinition,
};
use object_store::memory::InMemory;
use observability_deps::tracing::info;
@@ -495,7 +495,7 @@ mod tests {
database_id: db_schema.id,
database_name: Arc::clone(&db_schema.name),
time_ns: 5000,
ops: vec![CatalogOp::CreateTable(TableDefinition {
ops: vec![CatalogOp::CreateTable(WalTableDefinition {
database_id: db_schema.id,
database_name: Arc::clone(&db_schema.name),
table_name: name.into(),