From 63bd5096f541445a347b8533f415772f082a5ad4 Mon Sep 17 00:00:00 2001
From: Michael Gattozzi
Date: Thu, 23 Jan 2025 10:02:26 -0500
Subject: [PATCH] feat: loosen 72 hour query/write restriction (#25890)

This commit does a few key things:
- Removes the 72 hour query and write restrictions in Core
- Limits queries to a default number of parquet files. We chose 432, as
  this is about 72 hours' worth of files using the default settings for
  the gen1 timeblock
- The file limit can be increased, but the help text and the error
  message shown when it is exceeded note that query performance will
  likely be degraded as a result
- We warn users who hit this query error to use smaller time ranges if
  possible

With this we eliminate the hard restriction that was in place and replace
it with a soft one that users can choose to exceed, taking the performance
hit. If they can't take that hit, it's recommended that they upgrade to
Enterprise, which has the compactor built in to keep historical queries
performant.
---
 influxdb3/src/commands/serve.rs               |  10 +
 influxdb3_processing_engine/src/lib.rs        |   1 +
 influxdb3_server/src/lib.rs                   |   1 +
 influxdb3_server/src/query_executor/mod.rs    | 236 +++++++++++++++++-
 influxdb3_write/src/lib.rs                    |   3 -
 influxdb3_write/src/write_buffer/mod.rs       |  29 ++-
 .../src/write_buffer/persisted_files.rs       |  75 ++----
 .../src/write_buffer/queryable_buffer.rs      |  11 +-
 influxdb3_write/src/write_buffer/validator.rs |   8 -
 9 files changed, 295 insertions(+), 79 deletions(-)

diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs
index 88c573fcf13..f528cd133c1 100644
--- a/influxdb3/src/commands/serve.rs
+++ b/influxdb3/src/commands/serve.rs
@@ -345,6 +345,15 @@ pub struct Config {
         action
     )]
     pub telemetry_endpoint: String,
+
+    /// Set the limit for number of parquet files allowed in a query. Defaults
+    /// to 432 which is about 3 days worth of files using default settings.
+    /// This number can be increased to allow more files to be queried, but
+    /// query performance will likely suffer, RAM usage will spike, and the
+    /// process might be OOM killed as a result. It would be better to specify
+    /// smaller time ranges if possible in a query.
+ #[clap(long = "query-file-limit", env = "INFLUXDB3_QUERY_FILE_LIMIT", action)] + pub query_file_limit: Option, } /// Specified size of the Parquet cache in megabytes (MB) @@ -541,6 +550,7 @@ pub async fn command(config: Config) -> Result<()> { parquet_cache, metric_registry: Arc::clone(&metrics), snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep, + query_file_limit: config.query_file_limit, }) .await .map_err(|e| Error::WriteBufferInit(e.into()))?; diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index a0ac0f8ee8b..b11c6da5e18 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -1207,6 +1207,7 @@ mod tests { parquet_cache: None, metric_registry: Arc::clone(&metric_registry), snapshotted_wal_files_to_keep: 10, + query_file_limit: None, }) .await .unwrap(); diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 0fc0bb03d28..301fbef40a9 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -779,6 +779,7 @@ mod tests { parquet_cache: Some(parquet_cache), metric_registry: Arc::clone(&metrics), snapshotted_wal_files_to_keep: 100, + query_file_limit: None, }, ) .await diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 005cf528c9f..9dfae427ff1 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -776,6 +776,7 @@ mod tests { use metric::Registry; use object_store::{local::LocalFileSystem, ObjectStore}; use parquet_file::storage::{ParquetStorage, StorageId}; + use pretty_assertions::assert_eq; use super::CreateQueryExecutorArgs; @@ -801,7 +802,9 @@ mod tests { )) } - pub(crate) async fn setup() -> ( + pub(crate) async fn setup( + query_file_limit: Option, + ) -> ( Arc, QueryExecutorImpl, Arc, @@ -841,6 +844,7 @@ mod tests { parquet_cache: Some(parquet_cache), metric_registry: Default::default(), snapshotted_wal_files_to_keep: 1, + query_file_limit, }) .await .unwrap(); @@ -874,7 +878,7 @@ mod tests { #[test_log::test(tokio::test)] async fn system_parquet_files_success() { - let (write_buffer, query_executor, time_provider, _) = setup().await; + let (write_buffer, query_executor, time_provider, _) = setup(None).await; // Perform some writes to multiple tables let db_name = "test_db"; // perform writes over time to generate WAL files and some snapshots @@ -982,4 +986,232 @@ mod tests { assert_batches_sorted_eq!(t.expected, &batches); } } + + #[test_log::test(tokio::test)] + async fn query_file_limits_default() { + let (write_buffer, query_executor, time_provider, _) = setup(None).await; + // Perform some writes to multiple tables + let db_name = "test_db"; + // perform writes over time to generate WAL files and some snapshots + // the time provider is bumped to trick the system into persisting files: + for i in 0..1298 { + let time = i * 10; + let _ = write_buffer + .write_lp( + NamespaceName::new(db_name).unwrap(), + "\ + cpu,host=a,region=us-east usage=250\n\ + mem,host=a,region=us-east usage=150000\n\ + ", + Time::from_timestamp_nanos(time), + false, + influxdb3_write::Precision::Nanosecond, + ) + .await + .unwrap(); + + time_provider.set(Time::from_timestamp(time + 1, 0).unwrap()); + } + + // bump time again and sleep briefly to ensure time to persist things + time_provider.set(Time::from_timestamp(20, 0).unwrap()); + tokio::time::sleep(Duration::from_millis(500)).await; + + struct TestCase<'a> { + query: &'a str, + expected: &'a 
[&'a str], + } + + let test_cases = [ + TestCase { + query: "\ + SELECT COUNT(*) \ + FROM system.parquet_files \ + WHERE table_name = 'cpu'", + expected: &[ + "+----------+", + "| count(*) |", + "+----------+", + "| 432 |", + "+----------+", + ], + }, + TestCase { + query: "\ + SELECT Count(host) \ + FROM cpu", + expected: &[ + "+-----------------+", + "| count(cpu.host) |", + "+-----------------+", + "| 1298 |", + "+-----------------+", + ], + }, + ]; + + for t in test_cases { + let batch_stream = query_executor + .query_sql(db_name, t.query, None, None, None) + .await + .unwrap(); + let batches: Vec = batch_stream.try_collect().await.unwrap(); + assert_batches_sorted_eq!(t.expected, &batches); + } + + // put us over the parquet limit + let time = 12990; + let _ = write_buffer + .write_lp( + NamespaceName::new(db_name).unwrap(), + "\ + cpu,host=a,region=us-east usage=250\n\ + mem,host=a,region=us-east usage=150000\n\ + ", + Time::from_timestamp_nanos(time), + false, + influxdb3_write::Precision::Nanosecond, + ) + .await + .unwrap(); + + time_provider.set(Time::from_timestamp(time + 1, 0).unwrap()); + + // bump time again and sleep briefly to ensure time to persist things + time_provider.set(Time::from_timestamp(20, 0).unwrap()); + tokio::time::sleep(Duration::from_millis(500)).await; + + match query_executor + .query_sql(db_name, "SELECT COUNT(host) FROM CPU", None, None, None) + .await { + Ok(_) => panic!("expected to exceed parquet file limit, yet query succeeded"), + Err(err) => assert_eq!(err.to_string(), "error while planning query: External error: Query would exceed file limit of 432 parquet files. Please specify a smaller time range for your query. You can increase the file limit with the `--query-file-limit` option in the serve command, however, query performance will be slower and the server may get OOM killed or become unstable as a result".to_string()) + } + + // Make sure if we specify a smaller time range that queries will still work + query_executor + .query_sql( + db_name, + "SELECT COUNT(host) FROM CPU WHERE time < '1970-01-01T00:00:00.000000010Z'", + None, + None, + None, + ) + .await + .unwrap(); + } + + #[test_log::test(tokio::test)] + async fn query_file_limits_configured() { + let (write_buffer, query_executor, time_provider, _) = setup(Some(3)).await; + // Perform some writes to multiple tables + let db_name = "test_db"; + // perform writes over time to generate WAL files and some snapshots + // the time provider is bumped to trick the system into persisting files: + for i in 0..11 { + let time = i * 10; + let _ = write_buffer + .write_lp( + NamespaceName::new(db_name).unwrap(), + "\ + cpu,host=a,region=us-east usage=250\n\ + mem,host=a,region=us-east usage=150000\n\ + ", + Time::from_timestamp_nanos(time), + false, + influxdb3_write::Precision::Nanosecond, + ) + .await + .unwrap(); + + time_provider.set(Time::from_timestamp(time + 1, 0).unwrap()); + } + + // bump time again and sleep briefly to ensure time to persist things + time_provider.set(Time::from_timestamp(20, 0).unwrap()); + tokio::time::sleep(Duration::from_millis(500)).await; + + struct TestCase<'a> { + query: &'a str, + expected: &'a [&'a str], + } + + let test_cases = [ + TestCase { + query: "\ + SELECT COUNT(*) \ + FROM system.parquet_files \ + WHERE table_name = 'cpu'", + expected: &[ + "+----------+", + "| count(*) |", + "+----------+", + "| 3 |", + "+----------+", + ], + }, + TestCase { + query: "\ + SELECT Count(host) \ + FROM cpu", + expected: &[ + "+-----------------+", + "| count(cpu.host) |", + 
"+-----------------+", + "| 11 |", + "+-----------------+", + ], + }, + ]; + + for t in test_cases { + let batch_stream = query_executor + .query_sql(db_name, t.query, None, None, None) + .await + .unwrap(); + let batches: Vec = batch_stream.try_collect().await.unwrap(); + assert_batches_sorted_eq!(t.expected, &batches); + } + + // put us over the parquet limit + let time = 120; + let _ = write_buffer + .write_lp( + NamespaceName::new(db_name).unwrap(), + "\ + cpu,host=a,region=us-east usage=250\n\ + mem,host=a,region=us-east usage=150000\n\ + ", + Time::from_timestamp_nanos(time), + false, + influxdb3_write::Precision::Nanosecond, + ) + .await + .unwrap(); + + time_provider.set(Time::from_timestamp(time + 1, 0).unwrap()); + + // bump time again and sleep briefly to ensure time to persist things + time_provider.set(Time::from_timestamp(20, 0).unwrap()); + tokio::time::sleep(Duration::from_millis(500)).await; + + match query_executor + .query_sql(db_name, "SELECT COUNT(host) FROM CPU", None, None, None) + .await { + Ok(_) => panic!("expected to exceed parquet file limit, yet query succeeded"), + Err(err) => assert_eq!(err.to_string(), "error while planning query: External error: Query would exceed file limit of 3 parquet files. Please specify a smaller time range for your query. You can increase the file limit with the `--query-file-limit` option in the serve command, however, query performance will be slower and the server may get OOM killed or become unstable as a result".to_string()) + } + + // Make sure if we specify a smaller time range that queries will still work + query_executor + .query_sql( + db_name, + "SELECT COUNT(host) FROM CPU WHERE time < '1970-01-01T00:00:00.000000010Z'", + None, + None, + None, + ) + .await + .unwrap(); + } } diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index b53c6ebf6d8..73e091db3e2 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -47,9 +47,6 @@ use thiserror::Error; use twox_hash::XxHash64; use write_buffer::INDEX_HASH_SEED; -/// Used to determine if writes are older than what we can accept or query -pub const THREE_DAYS: Duration = Duration::from_secs(60 * 60 * 24 * 3); - #[derive(Debug, Error)] pub enum Error { #[error("object store path error: {0}")] diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 7e272df8ae2..7467e3fc513 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -146,6 +146,8 @@ pub struct WriteBufferImpl { metrics: WriteMetrics, distinct_cache: Arc, last_cache: Arc, + /// The number of files we will accept for a query + query_file_limit: usize, } /// The maximum number of snapshots to load on start @@ -163,6 +165,7 @@ pub struct WriteBufferImplArgs { pub parquet_cache: Option>, pub metric_registry: Arc, pub snapshotted_wal_files_to_keep: u64, + pub query_file_limit: Option, } impl WriteBufferImpl { @@ -178,6 +181,7 @@ impl WriteBufferImpl { parquet_cache, metric_registry, snapshotted_wal_files_to_keep, + query_file_limit, }: WriteBufferImplArgs, ) -> Result> { // load snapshots and replay the wal into the in memory buffer @@ -199,7 +203,6 @@ impl WriteBufferImpl { } let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots( - Arc::clone(&time_provider), persisted_snapshots, )); let queryable_buffer = Arc::new(QueryableBuffer::new(QueryableBufferArgs { @@ -209,7 +212,6 @@ impl WriteBufferImpl { last_cache_provider: Arc::clone(&last_cache), distinct_cache_provider: 
Arc::clone(&distinct_cache), persisted_files: Arc::clone(&persisted_files), - time_provider: Arc::clone(&time_provider), parquet_cache: parquet_cache.clone(), })); @@ -239,6 +241,7 @@ impl WriteBufferImpl { persisted_files, buffer: queryable_buffer, metrics: WriteMetrics::new(&metric_registry), + query_file_limit: query_file_limit.unwrap_or(432), }); Ok(result) } @@ -327,6 +330,21 @@ impl WriteBufferImpl { self.persisted_files .get_files_filtered(db_schema.id, table_def.table_id, filter); + if parquet_files.len() > self.query_file_limit { + return Err(DataFusionError::External( + format!( + "Query would exceed file limit of {} parquet files. \ + Please specify a smaller time range for your \ + query. You can increase the file limit with the \ + `--query-file-limit` option in the serve command, however, \ + query performance will be slower and the server may get \ + OOM killed or become unstable as a result", + self.query_file_limit + ) + .into(), + )); + } + let mut chunk_order = chunks.len() as i64; for parquet_file in parquet_files { @@ -723,7 +741,7 @@ impl DatabaseManager for WriteBufferImpl { "int64" => FieldDataType::Integer, "bool" => FieldDataType::Boolean, "utf8" => FieldDataType::String, - _ => todo!(), + _ => unreachable!(), }, }); } @@ -957,6 +975,7 @@ mod tests { parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), snapshotted_wal_files_to_keep: 10, + query_file_limit: None, }) .await .unwrap(); @@ -1048,6 +1067,7 @@ mod tests { parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), snapshotted_wal_files_to_keep: 10, + query_file_limit: None, }) .await .unwrap(); @@ -1119,6 +1139,7 @@ mod tests { parquet_cache: wbuf.parquet_cache.clone(), metric_registry: Default::default(), snapshotted_wal_files_to_keep: 10, + query_file_limit: None, }) .await .unwrap() @@ -1359,6 +1380,7 @@ mod tests { parquet_cache: write_buffer.parquet_cache.clone(), metric_registry: Default::default(), snapshotted_wal_files_to_keep: 10, + query_file_limit: None, }) .await .unwrap(); @@ -3043,6 +3065,7 @@ mod tests { parquet_cache, metric_registry: Arc::clone(&metric_registry), snapshotted_wal_files_to_keep: 10, + query_file_limit: None, }) .await .unwrap(); diff --git a/influxdb3_write/src/write_buffer/persisted_files.rs b/influxdb3_write/src/write_buffer/persisted_files.rs index c28983b69b8..ece6b6efaae 100644 --- a/influxdb3_write/src/write_buffer/persisted_files.rs +++ b/influxdb3_write/src/write_buffer/persisted_files.rs @@ -8,35 +8,24 @@ use hashbrown::HashMap; use influxdb3_id::DbId; use influxdb3_id::TableId; use influxdb3_telemetry::ParquetMetrics; -use iox_time::TimeProvider; use parking_lot::RwLock; -use std::sync::Arc; type DatabaseToTables = HashMap; type TableToFiles = HashMap>; -#[derive(Debug)] +#[derive(Debug, Default)] pub struct PersistedFiles { - /// The time provider to check if something is older than 3 days - time_provider: Arc, inner: RwLock, } impl PersistedFiles { - pub fn new(time_provider: Arc) -> Self { - Self { - time_provider, - inner: Default::default(), - } + pub fn new() -> Self { + Default::default() } /// Create a new `PersistedFiles` from a list of persisted snapshots - pub fn new_from_persisted_snapshots( - time_provider: Arc, - persisted_snapshots: Vec, - ) -> Self { + pub fn new_from_persisted_snapshots(persisted_snapshots: Vec) -> Self { let inner = Inner::new_from_persisted_snapshots(persisted_snapshots); Self { - time_provider, inner: RwLock::new(inner), } } @@ -61,20 +50,16 @@ impl PersistedFiles { 
table_id: TableId, filter: &ChunkFilter, ) -> Vec { - let three_days_ago = (self.time_provider.now() - crate::THREE_DAYS).timestamp_nanos(); - let mut files = { - let inner = self.inner.read(); - inner - .files - .get(&db_id) - .and_then(|tables| tables.get(&table_id)) - .cloned() - .unwrap_or_default() - .into_iter() - .filter(|file| filter.test_time_stamp_min_max(file.min_time, file.max_time)) - .filter(|file| file.min_time > three_days_ago) - .collect::>() - }; + let inner = self.inner.read(); + let mut files = inner + .files + .get(&db_id) + .and_then(|tables| tables.get(&table_id)) + .cloned() + .unwrap_or_default() + .into_iter() + .filter(|file| filter.test_time_stamp_min_max(file.min_time, file.max_time)) + .collect::>(); files.sort_by(|a, b| b.min_time.cmp(&a.min_time)); @@ -190,11 +175,10 @@ mod tests { use influxdb3_catalog::catalog::TableDefinition; use influxdb3_id::ColumnId; use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber}; - use iox_time::MockProvider; - use iox_time::Time; use observability_deps::tracing::info; use pretty_assertions::assert_eq; use schema::InfluxColumnType; + use std::sync::Arc; use crate::ParquetFileId; @@ -203,12 +187,8 @@ mod tests { #[test_log::test(test)] fn test_get_metrics_after_initial_load() { let all_persisted_snapshot_files = build_persisted_snapshots(); - let time_provider: Arc = - Arc::new(MockProvider::new(Time::from_timestamp(0, 0).unwrap())); - let persisted_file = PersistedFiles::new_from_persisted_snapshots( - time_provider, - all_persisted_snapshot_files, - ); + let persisted_file = + PersistedFiles::new_from_persisted_snapshots(all_persisted_snapshot_files); let (file_count, size_in_mb, row_count) = persisted_file.get_metrics(); @@ -221,12 +201,8 @@ mod tests { #[test_log::test(test)] fn test_get_metrics_after_update() { let all_persisted_snapshot_files = build_persisted_snapshots(); - let time_provider: Arc = - Arc::new(MockProvider::new(Time::from_timestamp(0, 0).unwrap())); - let persisted_file = PersistedFiles::new_from_persisted_snapshots( - time_provider, - all_persisted_snapshot_files, - ); + let persisted_file = + PersistedFiles::new_from_persisted_snapshots(all_persisted_snapshot_files); let parquet_files = build_parquet_files(5); let new_snapshot = build_snapshot(parquet_files, 1, 1, 1); persisted_file.add_persisted_snapshot_files(new_snapshot); @@ -255,12 +231,8 @@ mod tests { .cloned() .unwrap(); - let time_provider: Arc = - Arc::new(MockProvider::new(Time::from_timestamp(0, 0).unwrap())); - let persisted_file = PersistedFiles::new_from_persisted_snapshots( - time_provider, - all_persisted_snapshot_files, - ); + let persisted_file = + PersistedFiles::new_from_persisted_snapshots(all_persisted_snapshot_files); let mut parquet_files = build_parquet_files(4); info!(all_persisted_files = ?persisted_file, "Full persisted file"); info!(already_existing_file = ?already_existing_file, "Existing file"); @@ -300,10 +272,7 @@ mod tests { }) .collect(); let persisted_snapshots = vec![build_snapshot(parquet_files, 0, 0, 0)]; - let persisted_files = PersistedFiles::new_from_persisted_snapshots( - Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))), - persisted_snapshots, - ); + let persisted_files = PersistedFiles::new_from_persisted_snapshots(persisted_snapshots); struct TestCase<'a> { filter: &'a [Expr], diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 9ad2f621edb..b1ab80932b3 100644 --- 
a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -25,7 +25,6 @@ use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges}; use iox_query::exec::Executor; use iox_query::frontend::reorg::ReorgPlanner; use iox_query::QueryChunk; -use iox_time::TimeProvider; use object_store::path::Path; use observability_deps::tracing::{error, info}; use parking_lot::RwLock; @@ -47,7 +46,6 @@ pub struct QueryableBuffer { persisted_files: Arc, buffer: Arc>, parquet_cache: Option>, - time_provider: Arc, /// Sends a notification to this watch channel whenever a snapshot info is persisted persisted_snapshot_notify_rx: tokio::sync::watch::Receiver>, persisted_snapshot_notify_tx: tokio::sync::watch::Sender>, @@ -61,7 +59,6 @@ pub struct QueryableBufferArgs { pub distinct_cache_provider: Arc, pub persisted_files: Arc, pub parquet_cache: Option>, - pub time_provider: Arc, } impl QueryableBuffer { @@ -74,7 +71,6 @@ impl QueryableBuffer { distinct_cache_provider, persisted_files, parquet_cache, - time_provider, }: QueryableBufferArgs, ) -> Self { let buffer = Arc::new(RwLock::new(BufferState::new(Arc::clone(&catalog)))); @@ -89,7 +85,6 @@ impl QueryableBuffer { persisted_files, buffer, parquet_cache, - time_provider, persisted_snapshot_notify_rx, persisted_snapshot_notify_tx, } @@ -118,9 +113,6 @@ impl QueryableBuffer { .partitioned_record_batches(Arc::clone(&table_def), buffer_filter) .map_err(|e| DataFusionError::Execution(format!("error getting batches {}", e)))? .into_iter() - .filter(|(_, (ts_min_max, _))| { - ts_min_max.min > (self.time_provider.now() - crate::THREE_DAYS).timestamp_nanos() - }) .map(|(gen_time, (ts_min_max, batches))| { let row_count = batches.iter().map(|b| b.num_rows()).sum::(); let chunk_stats = create_chunk_statistics( @@ -801,8 +793,7 @@ mod tests { Arc::clone(&catalog), ) .unwrap(), - time_provider: Arc::clone(&time_provider), - persisted_files: Arc::new(PersistedFiles::new(Arc::clone(&time_provider))), + persisted_files: Arc::new(PersistedFiles::new()), parquet_cache: None, }; let queryable_buffer = QueryableBuffer::new(queryable_buffer_args); diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs index ead88a31527..4ce101d5b13 100644 --- a/influxdb3_write/src/write_buffer/validator.rs +++ b/influxdb3_write/src/write_buffer/validator.rs @@ -242,14 +242,6 @@ fn validate_and_qualify_v1_line( .map(|ts| apply_precision_to_timestamp(precision, ts)) .unwrap_or(ingest_time.timestamp_nanos()); - if timestamp_ns < (ingest_time - crate::THREE_DAYS).timestamp_nanos() { - return Err(WriteLineError { - original_line: line.to_string(), - line_number: line_number + 1, - error_message: "line contained a date that was more than 3 days ago".into(), - }); - } - fields.push(Field::new(time_col_id, FieldData::Timestamp(timestamp_ns))); // if we have new columns defined, add them to the db_schema table so that subsequent lines
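A usage sketch for the new knob, for reference: the flag name, environment variable, and default of 432 files all come from the diff above, while every other `influxdb3 serve` argument is left as a placeholder for whatever your existing invocation already uses.

    # Allow roughly twice the default window: 864 files is about 6 days of
    # gen1 files, since 432 files is about 72 hours (6 files per hour) with
    # the default settings.
    influxdb3 serve --query-file-limit 864 <your existing serve options>

    # Equivalent, using the environment variable added in this patch:
    INFLUXDB3_QUERY_FILE_LIMIT=864 influxdb3 serve <your existing serve options>

If a query would touch more parquet files than the configured limit, it fails with the "Query would exceed file limit of N parquet files" planning error exercised in the tests above; narrowing the query's time range is the recommended alternative to raising the limit.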