feat: loosen 72 hour query/write restriction
This commit does a few key things:

- Removes the 72 hour query and write restrictions in Core
- Limits queries to a default number of parquet files. We chose 432
  because this is about 72 hours' worth of files using default settings
  for the gen1 timeblock.
- The file limit can be increased, but the help text and the error
  message shown when it is exceeded note that query performance will
  likely be degraded as a result.
- We warn users who hit this query error to use smaller time ranges
  where possible.

With this we eliminate the hard restriction we had in place and replace
it with a soft one that users can opt into by accepting the performance
hit. If they can't take that hit, it's recommended that they upgrade to
Enterprise, which has the compactor built in to make historical queries
performant.
mgattozzi committed Jan 22, 2025
1 parent d1fd155 commit 7a9ade9
Showing 9 changed files with 295 additions and 79 deletions.
10 changes: 10 additions & 0 deletions influxdb3/src/commands/serve.rs
@@ -345,6 +345,15 @@ pub struct Config {
action
)]
pub telemetry_endpoint: String,

/// Set the limit for number of parquet files allowed in a query. Defaults
/// to 432 which is about 3 days worth of files using default settings.
/// This number can be increased to allow more files to be queried, but
/// query performance will likely suffer, RAM usage will spike, and the
/// process might be OOM killed as a result. It would be better to specify
/// smaller time ranges if possible in a query.
#[clap(long = "query-file-limit", env = "INFLUXDB3_QUERY_FILE_LIMIT", action)]
pub query_file_limit: Option<usize>,
}

/// Specified size of the Parquet cache in megabytes (MB)
@@ -541,6 +550,7 @@ pub async fn command(config: Config) -> Result<()> {
parquet_cache,
metric_registry: Arc::clone(&metrics),
snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep,
query_file_limit: config.query_file_limit,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;
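
As a reference for how the optional value resolves, here is a minimal
sketch (a hypothetical helper, not part of this diff; the actual
defaulting happens further down the write buffer stack, which this diff
view does not expand):

/// Hypothetical helper: resolve the effective per-query parquet file
/// limit, falling back to 432 files (roughly 72 hours of gen1 blocks at
/// default settings).
fn effective_query_file_limit(configured: Option<usize>) -> usize {
    configured.unwrap_or(432)
}

At startup this would be applied to `config.query_file_limit` before the
limit is handed to the query planner.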
1 change: 1 addition & 0 deletions influxdb3_processing_engine/src/lib.rs
@@ -1042,6 +1042,7 @@ mod tests {
parquet_cache: None,
metric_registry: Arc::clone(&metric_registry),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
})
.await
.unwrap();
1 change: 1 addition & 0 deletions influxdb3_server/src/lib.rs
@@ -779,6 +779,7 @@ mod tests {
parquet_cache: Some(parquet_cache),
metric_registry: Arc::clone(&metrics),
snapshotted_wal_files_to_keep: 100,
query_file_limit: None,
},
)
.await
236 changes: 234 additions & 2 deletions influxdb3_server/src/query_executor/mod.rs
@@ -773,6 +773,7 @@ mod tests {
use metric::Registry;
use object_store::{local::LocalFileSystem, ObjectStore};
use parquet_file::storage::{ParquetStorage, StorageId};
use pretty_assertions::assert_eq;

use super::CreateQueryExecutorArgs;

@@ -798,7 +799,9 @@
))
}

pub(crate) async fn setup() -> (
pub(crate) async fn setup(
query_file_limit: Option<usize>,
) -> (
Arc<dyn WriteBuffer>,
QueryExecutorImpl,
Arc<MockProvider>,
@@ -838,6 +841,7 @@
parquet_cache: Some(parquet_cache),
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 1,
query_file_limit,
})
.await
.unwrap();
@@ -871,7 +875,7 @@

#[test_log::test(tokio::test)]
async fn system_parquet_files_success() {
let (write_buffer, query_executor, time_provider, _) = setup().await;
let (write_buffer, query_executor, time_provider, _) = setup(None).await;
// Perform some writes to multiple tables
let db_name = "test_db";
// perform writes over time to generate WAL files and some snapshots
@@ -979,4 +983,232 @@
assert_batches_sorted_eq!(t.expected, &batches);
}
}

#[test_log::test(tokio::test)]
async fn query_file_limits_default() {
let (write_buffer, query_executor, time_provider, _) = setup(None).await;
// Perform some writes to multiple tables
let db_name = "test_db";
// perform writes over time to generate WAL files and some snapshots
// the time provider is bumped to trick the system into persisting files:
for i in 0..1298 {
let time = i * 10;
let _ = write_buffer
.write_lp(
NamespaceName::new(db_name).unwrap(),
"\
cpu,host=a,region=us-east usage=250\n\
mem,host=a,region=us-east usage=150000\n\
",
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
)
.await
.unwrap();

time_provider.set(Time::from_timestamp(time + 1, 0).unwrap());
}

// bump time again and sleep briefly to ensure time to persist things
time_provider.set(Time::from_timestamp(20, 0).unwrap());
tokio::time::sleep(Duration::from_millis(500)).await;

struct TestCase<'a> {
query: &'a str,
expected: &'a [&'a str],
}

let test_cases = [
TestCase {
query: "\
SELECT COUNT(*) \
FROM system.parquet_files \
WHERE table_name = 'cpu'",
expected: &[
"+----------+",
"| count(*) |",
"+----------+",
"| 432 |",
"+----------+",
],
},
TestCase {
query: "\
SELECT Count(host) \
FROM cpu",
expected: &[
"+-----------------+",
"| count(cpu.host) |",
"+-----------------+",
"| 1298 |",
"+-----------------+",
],
},
];

for t in test_cases {
let batch_stream = query_executor
.query_sql(db_name, t.query, None, None, None)
.await
.unwrap();
let batches: Vec<RecordBatch> = batch_stream.try_collect().await.unwrap();
assert_batches_sorted_eq!(t.expected, &batches);
}

// put us over the parquet limit
let time = 12990;
let _ = write_buffer
.write_lp(
NamespaceName::new(db_name).unwrap(),
"\
cpu,host=a,region=us-east usage=250\n\
mem,host=a,region=us-east usage=150000\n\
",
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
)
.await
.unwrap();

time_provider.set(Time::from_timestamp(time + 1, 0).unwrap());

// bump time again and sleep briefly to ensure time to persist things
time_provider.set(Time::from_timestamp(20, 0).unwrap());
tokio::time::sleep(Duration::from_millis(500)).await;

match query_executor
.query_sql(db_name, "SELECT COUNT(host) FROM CPU", None, None, None)
.await {
Ok(_) => panic!("expected to exceed parquet file limit, yet query succeeded"),
Err(err) => assert_eq!(err.to_string(), "error while planning query: External error: Query would exceed file limit of 432 parquet files. Please specify a smaller time range for your query. You can increase the file limit with the `--query-file-limit` option in the serve command, however, query performance will be slower and the server may get OOM killed or become unstable as a result".to_string())
}

// Make sure if we specify a smaller time range that queries will still work
query_executor
.query_sql(
db_name,
"SELECT COUNT(host) FROM CPU WHERE time < '1970-01-01T00:00:00.000000010Z'",
None,
None,
None,
)
.await
.unwrap();
}

#[test_log::test(tokio::test)]
async fn query_file_limits_configured() {
let (write_buffer, query_executor, time_provider, _) = setup(Some(3)).await;
// Perform some writes to multiple tables
let db_name = "test_db";
// perform writes over time to generate WAL files and some snapshots
// the time provider is bumped to trick the system into persisting files:
for i in 0..11 {
let time = i * 10;
let _ = write_buffer
.write_lp(
NamespaceName::new(db_name).unwrap(),
"\
cpu,host=a,region=us-east usage=250\n\
mem,host=a,region=us-east usage=150000\n\
",
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
)
.await
.unwrap();

time_provider.set(Time::from_timestamp(time + 1, 0).unwrap());
}

// bump time again and sleep briefly to ensure time to persist things
time_provider.set(Time::from_timestamp(20, 0).unwrap());
tokio::time::sleep(Duration::from_millis(500)).await;

struct TestCase<'a> {
query: &'a str,
expected: &'a [&'a str],
}

let test_cases = [
TestCase {
query: "\
SELECT COUNT(*) \
FROM system.parquet_files \
WHERE table_name = 'cpu'",
expected: &[
"+----------+",
"| count(*) |",
"+----------+",
"| 3 |",
"+----------+",
],
},
TestCase {
query: "\
SELECT Count(host) \
FROM cpu",
expected: &[
"+-----------------+",
"| count(cpu.host) |",
"+-----------------+",
"| 11 |",
"+-----------------+",
],
},
];

for t in test_cases {
let batch_stream = query_executor
.query_sql(db_name, t.query, None, None, None)
.await
.unwrap();
let batches: Vec<RecordBatch> = batch_stream.try_collect().await.unwrap();
assert_batches_sorted_eq!(t.expected, &batches);
}

// put us over the parquet limit
let time = 120;
let _ = write_buffer
.write_lp(
NamespaceName::new(db_name).unwrap(),
"\
cpu,host=a,region=us-east usage=250\n\
mem,host=a,region=us-east usage=150000\n\
",
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
)
.await
.unwrap();

time_provider.set(Time::from_timestamp(time + 1, 0).unwrap());

// bump time again and sleep briefly to ensure time to persist things
time_provider.set(Time::from_timestamp(20, 0).unwrap());
tokio::time::sleep(Duration::from_millis(500)).await;

match query_executor
.query_sql(db_name, "SELECT COUNT(host) FROM CPU", None, None, None)
.await {
Ok(_) => panic!("expected to exceed parquet file limit, yet query succeeded"),
Err(err) => assert_eq!(err.to_string(), "error while planning query: External error: Query would exceed file limit of 3 parquet files. Please specify a smaller time range for your query. You can increase the file limit with the `--query-file-limit` option in the serve command, however, query performance will be slower and the server may get OOM killed or become unstable as a result".to_string())
}

// Make sure if we specify a smaller time range that queries will still work
query_executor
.query_sql(
db_name,
"SELECT COUNT(host) FROM CPU WHERE time < '1970-01-01T00:00:00.000000010Z'",
None,
None,
None,
)
.await
.unwrap();
}
}
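
The error string asserted in the tests above implies a planner-side guard
along these lines; the sketch below is hypothetical (the real check lives
in code this diff view does not expand) and only illustrates the shape of
the check:

/// Hypothetical guard: refuse to plan a query once the number of
/// candidate parquet files exceeds the configured limit.
fn check_query_file_limit(candidate_files: usize, limit: usize) -> Result<(), String> {
    if candidate_files > limit {
        return Err(format!(
            "Query would exceed file limit of {limit} parquet files. \
             Please specify a smaller time range for your query."
        ));
    }
    Ok(())
}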
3 changes: 0 additions & 3 deletions influxdb3_write/src/lib.rs
@@ -47,9 +47,6 @@ use thiserror::Error;
use twox_hash::XxHash64;
use write_buffer::INDEX_HASH_SEED;

/// Used to determine if writes are older than what we can accept or query
pub const THREE_DAYS: Duration = Duration::from_secs(60 * 60 * 24 * 3);

#[derive(Debug, Error)]
pub enum Error {
#[error("object store path error: {0}")]