diff --git a/influxdb3/tests/server/write.rs b/influxdb3/tests/server/write.rs
index 161564ae286..26ceb00d0d0 100644
--- a/influxdb3/tests/server/write.rs
+++ b/influxdb3/tests/server/write.rs
@@ -1,4 +1,5 @@
 use hyper::StatusCode;
+use influxdb3_client::Precision;
 use pretty_assertions::assert_eq;
 
 use crate::TestServer;
@@ -272,3 +273,50 @@ async fn api_v2_write_round_trip() {
 +------------------+-------------------------------+------+-------+"
     );
 }
+
+/// Reproducer for [#25006][issue]
+///
+/// [issue]: https://github.com/influxdata/influxdb/issues/25006
+#[tokio::test]
+async fn writes_with_different_schema_should_fail() {
+    let server = TestServer::spawn().await;
+    // send a valid write request with the field t0_f0 as an integer:
+    server
+        .write_lp_to_db(
+            "foo",
+            "\
+            t0,t0_tag0=initTag t0_f0=0i 1715694000\n\
+            t0,t0_tag0=initTag t0_f0=1i 1715694001\n\
+            t0,t0_tag1=initTag t0_f0=0i 1715694000",
+            Precision::Second,
+        )
+        .await
+        .expect("writes LP with integer field");
+
+    // send another write request to the same db, but with the field t0_f0 as an unsigned integer:
+    let error = server
+        .write_lp_to_db(
+            "foo",
+            "\
+            t0,t0_tag0=initTag t0_f0=0u 1715694000\n\
+            t0,t0_tag0=initTag t0_f0=1u 1715694001\n\
+            t0,t0_tag1=initTag t0_f0=0u 1715694000",
+            Precision::Second,
+        )
+        .await
+        .expect_err("should fail when writing LP with the same field as an unsigned integer");
+
+    println!("error: {error:#?}");
+
+    // the request should have failed with an API error indicating an incorrect schema for the field:
+    assert!(
+        matches!(
+            error,
+            influxdb3_client::Error::ApiError {
+                code: StatusCode::BAD_REQUEST,
+                message: _
+            }
+        ),
+        "the request should have failed with an API error"
+    );
+}
diff --git a/influxdb3_write/src/write_buffer/flusher.rs b/influxdb3_write/src/write_buffer/flusher.rs
index 0fa1b0dacf0..3566eb4895e 100644
--- a/influxdb3_write/src/write_buffer/flusher.rs
+++ b/influxdb3_write/src/write_buffer/flusher.rs
@@ -85,6 +85,12 @@ impl WriteBufferFlusher {
         &self,
         segmented_data: Vec<ValidSegmentedData>,
     ) -> crate::write_buffer::Result<()> {
+        // Check for the presence of valid segment data; without this early return, the
+        // await on the response receiver below would hang on an empty write.
+        if segmented_data.is_empty() {
+            return Ok(());
+        }
+
         let (response_tx, response_rx) = oneshot::channel();
 
         self.buffer_tx
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index f9c277b0ff4..a66b3110d5a 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -469,25 +469,22 @@ pub(crate) fn parse_validate_and_update_schema(
     let mut valid_parsed_and_raw_lines: Vec<(ParsedLine, &str)> = vec![];
 
     for (line_idx, maybe_line) in parse_lines(lp).enumerate() {
-        let line = match maybe_line {
+        let line = match maybe_line
+            .map_err(|e| WriteLineError {
+                // This unwrap is fine because we're moving line by line
+                // alongside the output from parse_lines
+                original_line: lp_lines.next().unwrap().to_string(),
+                line_number: line_idx + 1,
+                error_message: e.to_string(),
+            })
+            .and_then(|l| validate_line_schema(line_idx, l, schema))
+        {
             Ok(line) => line,
             Err(e) => {
                 if !accept_partial {
-                    return Err(Error::ParseError(WriteLineError {
-                        // This unwrap is fine because we're moving line by line
-                        // alongside the output from parse_lines
-                        original_line: lp_lines.next().unwrap().to_string(),
-                        line_number: line_idx + 1,
-                        error_message: e.to_string(),
-                    }));
+                    return Err(Error::ParseError(e));
                 } else {
-                    errors.push(WriteLineError {
-                        original_line: lp_lines.next().unwrap().to_string(),
-                        // This unwrap is fine because we're moving line by line
-                        // alongside the output from parse_lines
-                        line_number: line_idx + 1,
-                        error_message: e.to_string(),
-                    });
+                    errors.push(e);
                 }
                 continue;
             }
@@ -512,6 +509,41 @@ pub(crate) fn parse_validate_and_update_schema(
     })
 }
+
+/// Validate a line of line protocol against the given schema definition
+///
+/// This is for scenarios where a write comes in for a table that already exists, but may
+/// have invalid field types based on the pre-existing schema.
+fn validate_line_schema<'a>(
+    line_number: usize,
+    line: ParsedLine<'a>,
+    schema: &DatabaseSchema,
+) -> Result<ParsedLine<'a>, WriteLineError> {
+    let table_name = line.series.measurement.as_str();
+    if let Some(table_schema) = schema.get_table_schema(table_name) {
+        for (field_name, field_val) in line.field_set.iter() {
+            if let Some(schema_col_type) = table_schema.field_type_by_name(field_name) {
+                let field_col_type = column_type_from_field(field_val);
+                if field_col_type != schema_col_type {
+                    let field_name = field_name.to_string();
+                    return Err(WriteLineError {
+                        original_line: line.to_string(),
+                        line_number: line_number + 1,
+                        error_message: format!(
+                            "invalid field value in line protocol for field '{field_name}' on line \
+                            {line_number}: expected type {expected}, but got {got}",
+                            line_number = line_number + 1,
+                            expected = ColumnType::from(schema_col_type),
+                            got = field_col_type,
+                        ),
+                    });
+                }
+            }
+        }
+    }
+
+    Ok(line)
+}
 
 /// Takes parsed lines, validates their schema. If new tables or columns are defined, they
 /// are passed back as a new DatabaseSchema as part of the ValidationResult. Lines are split
 /// into partitions and the validation result contains the data that can then be serialized
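
Note on the flusher change: the early return guards the oneshot round trip. Assuming the
buffer loop on the other end of `buffer_tx` never replies for a batch it never receives,
awaiting `response_rx` on an empty write would simply never complete. A minimal standalone
sketch of that hang (the spawned task is a hypothetical stand-in for the buffer loop;
requires tokio with the rt, sync, time, and macros features):

use std::time::Duration;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (response_tx, response_rx) = oneshot::channel::<()>();

    // Hypothetical stand-in for the buffer loop: it holds the sender open but,
    // having received no segment data, never sends a response on it.
    tokio::spawn(async move {
        let _held_open = response_tx;
        std::future::pending::<()>().await;
    });

    // Without the empty-data early return, this is the await that would hang.
    tokio::select! {
        _ = response_rx => unreachable!("no response is ever sent"),
        _ = tokio::time::sleep(Duration::from_millis(100)) => {
            println!("receiver still pending: this is the hang the early return avoids");
        }
    }
}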
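
Note on the mod.rs refactor: parse errors and schema-validation errors are now both mapped
into `WriteLineError` before the `match`, so a single `Err` arm serves both failure paths.
A self-contained sketch of that `map_err(..).and_then(..)` chaining, where `parse` and
`check_schema` are hypothetical stand-ins for `parse_lines` and `validate_line_schema`, and
`WriteLineError` mirrors the real struct:

#[derive(Debug)]
struct WriteLineError {
    original_line: String,
    line_number: usize,
    error_message: String,
}

// pretend-parser: split "field=value" into its two halves
fn parse(line: &str) -> Result<(&str, &str), String> {
    line.split_once('=').ok_or_else(|| "missing '='".to_string())
}

// pretend-schema check: field t0_f0 must be an integer (suffix 'i')
fn check_schema<'a>(
    line_idx: usize,
    parsed: (&'a str, &'a str),
) -> Result<(&'a str, &'a str), WriteLineError> {
    let (field, value) = parsed;
    if field == "t0_f0" && !value.ends_with('i') {
        return Err(WriteLineError {
            original_line: format!("{field}={value}"),
            line_number: line_idx + 1,
            error_message: format!("expected type i64 for field '{field}'"),
        });
    }
    Ok(parsed)
}

fn main() {
    for (line_idx, raw) in ["t0_f0=0i", "t0_f0=0u", "bogus"].iter().enumerate() {
        // both error paths funnel into the same WriteLineError type, so the
        // caller can handle them in one place, as in the refactor above
        let result = parse(raw)
            .map_err(|e| WriteLineError {
                original_line: raw.to_string(),
                line_number: line_idx + 1,
                error_message: e,
            })
            .and_then(|parsed| check_schema(line_idx, parsed));
        println!("line {}: {result:?}", line_idx + 1);
    }
}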