From c83a12987653bf473b8e7066385b83282cf95e61 Mon Sep 17 00:00:00 2001
From: "Michael S. Huang"
Date: Sun, 30 Jan 2022 14:42:18 +0800
Subject: [PATCH 1/3] shell for "schema"; make stats::FieldType public

---
 src/cmd/mod.rs    |   1 +
 src/cmd/schema.rs | 165 ++++++++++++++++++++++++++++++++++++++++++++++
 src/cmd/stats.rs  |  19 +++++-
 src/main.rs       |   5 +-
 src/mainlite.rs   |   5 +-
 5 files changed, 190 insertions(+), 5 deletions(-)
 create mode 100644 src/cmd/schema.rs

diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs
index 50bd4c881..c3faf4e40 100644
--- a/src/cmd/mod.rs
+++ b/src/cmd/mod.rs
@@ -32,6 +32,7 @@ pub mod rename;
 pub mod replace;
 pub mod reverse;
 pub mod sample;
+pub mod schema;
 pub mod search;
 pub mod searchset;
 pub mod select;
diff --git a/src/cmd/schema.rs b/src/cmd/schema.rs
new file mode 100644
index 000000000..631e61d6d
--- /dev/null
+++ b/src/cmd/schema.rs
@@ -0,0 +1,165 @@
+use crate::config::{Config, Delimiter};
+use crate::util;
+use crate::CliError;
+use crate::CliResult;
+use crate::cmd::stats::{FieldType, FieldType::*};
+use anyhow::{anyhow, Result};
+use csv::ByteRecord;
+use indicatif::{ProgressBar, ProgressDrawTarget};
+use jsonschema::{output::BasicOutput, JSONSchema};
+use log::{debug, info};
+use serde::Deserialize;
+use serde_json::{value::Number, Map, Value};
+use stats::Frequencies;
+use std::{env, fs::File, io::BufReader, io::BufWriter, io::Read, io::Write, ops::Add};
+
+macro_rules! fail {
+    ($mesg:expr) => {
+        Err(CliError::Other($mesg))
+    };
+}
+
+static USAGE: &str = "
+Infer schema from CSV data and output in JSON Schema format.
+
+Example output file from `mydata.csv`. If piped from stdin, then filename is `stdin.csv`.
+
+* mydata.csv.schema.json
+
+Usage:
+    qsv schema [options] [<input>]
+
+fetch options:
+    --no-nulls                 Skip NULL values in type inference
+
+Common options:
+    -h, --help                 Display this message
+    -n, --no-headers           When set, the first row will not be interpreted
+                               as headers. Namely, it will be treated as a
+                               regular data row. Otherwise, the first row will
+                               always appear as the header row in the output.
+    -d, --delimiter <arg>      The field delimiter for reading CSV data.
+                               Must be a single character. [default: ,]
+    -q, --quiet                Don't show progress bars.
+";
+
+#[derive(Deserialize, Debug)]
+struct Args {
+    flag_no_nulls: bool,
+    flag_no_headers: bool,
+    flag_delimiter: Option<Delimiter>,
+    flag_quiet: bool,
+    arg_input: Option<String>,
+}
+
+pub fn run(argv: &[&str]) -> CliResult<()> {
+    let args: Args = util::get_args(USAGE, argv)?;
+
+    // dbg!(&args);
+
+    let rconfig = Config::new(&args.arg_input)
+        .delimiter(args.flag_delimiter)
+        .no_headers(args.flag_no_headers);
+
+    let mut rdr = rconfig.reader()?;
+
+    let headers = rdr.byte_headers()?.clone();
+
+    let input_path: &str = &args.arg_input.unwrap_or_else(|| "stdin.csv".to_string());
+
+    let mut schema_output_file = File::create(input_path.to_owned() + ".schema.json")
+        .expect("unable to create schema output file");
+
+    // prep progress bar
+    let progress = ProgressBar::new(0);
+    if !args.flag_quiet {
+        let record_count = util::count_rows(&rconfig.flexible(true));
+        util::prep_progress(&progress, record_count);
+    } else {
+        progress.set_draw_target(ProgressDrawTarget::hidden());
+    }
+
+    // amortize memory allocation by reusing record
+    #[allow(unused_assignments)]
+    let mut record = csv::ByteRecord::new();
+
+    let mut row_index: u32 = 0;
+    let mut invalid_count: u32 = 0;
+
+    // array of frequency tables
+    let mut frequency_tables: Vec<_> = (0..(headers.len() as u32)).map(|_| Frequencies::<FieldType>::new()).collect();
+
+    // iterate over each CSV field and determine type
+    let headers_iter = headers.iter().enumerate();
+
+    while rdr.read_byte_record(&mut record)? {
+        row_index = row_index.add(1);
+
+        // dbg!(&record);
+
+        for (i, header) in headers_iter.clone() {
+            // convert csv header to string
+            let header_string = std::str::from_utf8(header).unwrap().to_string();
+            // convert csv value to string; trim whitespace
+            let value_string = std::str::from_utf8(&record[i]).unwrap().trim().to_string();
+
+            let sample_type = FieldType::from_sample(value_string.as_bytes());
+
+            debug!("{}[{}]: val={}, type={}", &header_string, &row_index, &value_string, &sample_type);
+
+            match sample_type {
+                FieldType::TNull => {
+                    if args.flag_no_nulls {
+                        // skip
+                        debug!("Skipped: {}[{}]", &header_string, &row_index);
+                    } else {
+                        frequency_tables[i].add(FieldType::TNull);
+                    }
+                }
+                FieldType::TUnknown => {
+                    // default to String
+                    frequency_tables[i].add(FieldType::TUnicode);
+                }
+                x => {
+                    frequency_tables[i].add(x);
+                }
+            }
+        }
+
+        if !args.flag_quiet {
+            progress.inc(1);
+        }
+    } // end main while loop over csv records
+
+    // dbg!(&frequency_tables);
+
+    // get most frequent type for each header column
+    for (i, header) in headers_iter {
+        let most_frequent = frequency_tables[i].most_frequent();
+        let inferred_type = match most_frequent.get(0) {
+            Some(tuple) => tuple,
+            None => &(&FieldType::TNull, 0)
+        };
+        let header_string = std::str::from_utf8(header).unwrap().to_string();
+        print!("{:?}: {:?}\n", header_string, inferred_type);
+    }
+
+    // flush schema file; file gets closed automagically when out-of-scope
+    schema_output_file.flush().unwrap();
+
+    use thousands::Separable;
+
+    if !args.flag_quiet {
+        progress.set_message(format!(
+            " processed {} records.",
+            progress.length().separate_with_commas()
+        ));
+        util::finish_progress(&progress);
+    }
+
+    Ok(())
+}
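The heart of the new command is already visible in the loop above: classify
every cell, tally the classification into one frequency table per column, and
let the most frequent type win. Here is a minimal standalone sketch of that
flow, using the same `stats::Frequencies` calls the command relies on; the
`classify` closure is an illustrative stand-in for `FieldType::from_sample`:

    use stats::Frequencies;

    fn main() {
        // stand-in for FieldType::from_sample: name a type for each cell
        let classify = |cell: &str| -> &'static str {
            if cell.is_empty() {
                "null"
            } else if cell.parse::<i64>().is_ok() {
                "integer"
            } else {
                "string"
            }
        };

        // one frequency table per column, as in the command
        let rows = vec![vec!["1", "alice"], vec!["2", ""], vec!["3", "carol"]];
        let mut tables: Vec<Frequencies<&str>> =
            (0..2).map(|_| Frequencies::new()).collect();

        for row in &rows {
            for (col, cell) in row.iter().enumerate() {
                tables[col].add(classify(cell));
            }
        }

        // most_frequent() returns (value, count) pairs, highest count first
        for (col, table) in tables.iter().enumerate() {
            let (ty, count) = table.most_frequent()[0];
            println!("column {col}: {ty} ({count} of {} rows)", rows.len());
        }
    }

Because `most_frequent()` sorts by descending count, element 0 is the winner;
an empty table (no data rows at all) is the edge case a later patch in this
series guards against.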
diff --git a/src/cmd/stats.rs b/src/cmd/stats.rs
index 9bad5099d..3f475422e 100644
--- a/src/cmd/stats.rs
+++ b/src/cmd/stats.rs
@@ -530,8 +530,8 @@ impl Commute for Stats {
     }
 }
 
-#[derive(Clone, Copy, PartialEq)]
-enum FieldType {
+#[derive(Clone, Copy, PartialEq, Eq, Hash)]
+pub enum FieldType {
     TUnknown,
     TNull,
     TUnicode,
     TFloat,
     TInteger,
     TDate,
 }
 
 impl FieldType {
-    fn from_sample(sample: &[u8]) -> FieldType {
+    pub fn from_sample(sample: &[u8]) -> FieldType {
         if sample.is_empty() {
             return TNull;
         }
@@ -616,6 +616,19 @@ impl fmt::Display for FieldType {
     }
 }
 
+impl fmt::Debug for FieldType {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match *self {
+            TUnknown => write!(f, "Unknown"),
+            TNull => write!(f, "NULL"),
+            TUnicode => write!(f, "Unicode"),
+            TFloat => write!(f, "Float"),
+            TInteger => write!(f, "Integer"),
+            TDate => write!(f, "Date"),
+        }
+    }
+}
+
 /// TypedSum keeps a rolling sum of the data seen.
 ///
 /// It sums integers until it sees a float, at which point it sums floats.
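The new `Eq` and `Hash` derives are what allow `FieldType` to serve as the
key of a `stats::Frequencies` table, which is hash-map-backed and so needs
`Eq + Hash` keys; `PartialEq` alone is not enough. A toy counter makes the
bound visible (all names below are illustrative, not part of qsv or the
stats crate):

    use std::collections::HashMap;
    use std::hash::Hash;

    // toy frequency table; HashMap keys must be Eq + Hash, which is
    // exactly why the derives were added to FieldType
    struct Counter<T: Eq + Hash> {
        counts: HashMap<T, u64>,
    }

    impl<T: Eq + Hash> Counter<T> {
        fn new() -> Self {
            Counter { counts: HashMap::new() }
        }

        fn add(&mut self, v: T) {
            *self.counts.entry(v).or_insert(0) += 1;
        }

        // (value, count) pair with the highest count, if any
        fn most_frequent(&self) -> Option<(&T, u64)> {
            self.counts.iter().map(|(k, &c)| (k, c)).max_by_key(|&(_, c)| c)
        }
    }

    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    enum Ty { Null, Integer, Unicode }

    fn main() {
        let mut c = Counter::new();
        for t in [Ty::Integer, Ty::Integer, Ty::Unicode, Ty::Null] {
            c.add(t);
        }
        assert_eq!(c.most_frequent(), Some((&Ty::Integer, 2)));
    }

The manual `Debug` impl, meanwhile, just gives the variants friendlier names
in log output ("Integer" instead of the derived "TInteger").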
diff --git a/src/main.rs b/src/main.rs
index ae238d187..ac9c952b1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -67,10 +67,11 @@ macro_rules! command_list {
     partition   Partition CSV data based on a column value
     pseudo      Pseudonymise the values of a column
     py*         Evaluate a Python expression on CSV data
-    sample      Randomly sample CSV data
     rename      Rename the columns of CSV data efficiently
     replace     Replace patterns in CSV data
     reverse     Reverse rows of CSV data
+    sample      Randomly sample CSV data
+    schema      Infer schema from CSV data
     search      Search CSV data with a regex
     searchset   Search CSV data with a regex set
     select      Select, re-order, duplicate or drop columns
@@ -251,6 +252,7 @@ enum Command {
     Replace,
     Reverse,
     Sample,
+    Schema,
     Search,
     SearchSet,
     Select,
@@ -313,6 +315,7 @@ impl Command {
             Command::Replace => cmd::replace::run(argv),
             Command::Reverse => cmd::reverse::run(argv),
            Command::Sample => cmd::sample::run(argv),
+            Command::Schema => cmd::schema::run(argv),
             Command::Search => cmd::search::run(argv),
             Command::SearchSet => cmd::searchset::run(argv),
             Command::Select => cmd::select::run(argv),
diff --git a/src/mainlite.rs b/src/mainlite.rs
index 6e07e0038..a7f51a3a7 100644
--- a/src/mainlite.rs
+++ b/src/mainlite.rs
@@ -60,10 +60,11 @@ macro_rules! command_list {
     jsonl       Convert newline-delimited JSON files to CSV
     partition   Partition CSV data based on a column value
     pseudo      Pseudonymise the values of a column
-    sample      Randomly sample CSV data
     rename      Rename the columns of CSV data efficiently
     replace     Replace patterns in CSV data
     reverse     Reverse rows of CSV data
+    sample      Randomly sample CSV data
+    schema      Infer schema from CSV data
     search      Search CSV data with a regex
     searchset   Search CSV data with a regex set
     select      Select, re-order, duplicate or drop columns
@@ -217,6 +218,7 @@ enum Command {
     Replace,
     Reverse,
     Sample,
+    Schema,
     Search,
     SearchSet,
     Select,
@@ -271,6 +273,7 @@ impl Command {
             Command::Replace => cmd::replace::run(argv),
             Command::Reverse => cmd::reverse::run(argv),
             Command::Sample => cmd::sample::run(argv),
+            Command::Schema => cmd::schema::run(argv),
             Command::Search => cmd::search::run(argv),
             Command::SearchSet => cmd::searchset::run(argv),
             Command::Select => cmd::select::run(argv),
From 4230d695281b55b5b9ad72ce12d9d46a3a96167c Mon Sep 17 00:00:00 2001
From: "Michael S. Huang"
Date: Tue, 1 Feb 2022 17:59:37 +0800
Subject: [PATCH 2/3] can generate correct schema

---
 src/cmd/schema.rs | 95 +++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 83 insertions(+), 12 deletions(-)

diff --git a/src/cmd/schema.rs b/src/cmd/schema.rs
index 631e61d6d..ad8924535 100644
--- a/src/cmd/schema.rs
+++ b/src/cmd/schema.rs
@@ -9,7 +9,7 @@ use indicatif::{ProgressBar, ProgressDrawTarget};
 use jsonschema::{output::BasicOutput, JSONSchema};
 use log::{debug, info};
 use serde::Deserialize;
-use serde_json::{value::Number, Map, Value};
+use serde_json::{value::Number, Map, Value, json};
 use stats::Frequencies;
 use std::{env, fs::File, io::BufReader, io::BufWriter, io::Read, io::Write, ops::Add};
 
@@ -83,15 +83,16 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
     #[allow(unused_assignments)]
     let mut record = csv::ByteRecord::new();
 
-    let mut row_index: u32 = 0;
-    let mut invalid_count: u32 = 0;
+    let mut row_index: u64 = 0;
 
-    // array of frequency tables
+    // array of frequency tables to track non-NULL type occurrences
     let mut frequency_tables: Vec<_> = (0..(headers.len() as u32)).map(|_| Frequencies::<FieldType>::new()).collect();
+    // array of boolean to track if column in NULLABLE
+    let mut nullable_flags: Vec<bool> = vec![false; headers.len()];
 
     // iterate over each CSV field and determine type
     let headers_iter = headers.iter().enumerate();
-
+
     while rdr.read_byte_record(&mut record)? {
         row_index = row_index.add(1);
 
@@ -114,7 +115,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
                         // skip
                         debug!("Skipped: {}[{}]", &header_string, &row_index);
                     } else {
-                        frequency_tables[i].add(FieldType::TNull);
+                        // only count NULL once, so it dominate frequency table when value is optional
+                        if nullable_flags[i] == false {
+                            frequency_tables[i].add(FieldType::TNull);
+                        }
+                        nullable_flags[i] = true;
                     }
                 }
                 FieldType::TUnknown => {
@@ -133,18 +138,84 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
         }
     } // end main while loop over csv records
 
-    // dbg!(&frequency_tables);
+    debug!("freq tables: {:?}", &frequency_tables);
+
+    // map holds "properties" object of json schema
+    let mut properties_map: Map<String, Value> = Map::new();
 
     // get most frequent type for each header column
     for (i, header) in headers_iter {
         let most_frequent = frequency_tables[i].most_frequent();
-        let inferred_type = match most_frequent.get(0) {
-            Some(tuple) => tuple,
-            None => &(&FieldType::TNull, 0)
+        let (inferred_type, count) = match most_frequent.get(0) {
+            Some(tuple) => *tuple,
+            None => (&FieldType::TNull, 0)
         };
+        dbg!(&inferred_type, count, row_index);
+
         let header_string = std::str::from_utf8(header).unwrap().to_string();
-        print!("{:?}: {:?}\n", header_string, inferred_type);
-    }
+        let required: bool = if *inferred_type != FieldType::TNull &&
+            *inferred_type != FieldType::TUnknown &&
+            count >= row_index {
+            true
+        } else {
+            false
+        };
+        debug!("{}: {:?} {}\n", header_string, inferred_type, required);
+
+        let mut type_list: Vec<Value> = Vec::new();
+
+        match inferred_type {
+            FieldType::TUnicode => {
+                type_list.push(Value::String("string".to_string()));
+            },
+            FieldType::TDate => {
+                type_list.push(Value::String("string".to_string()));
+            },
+            FieldType::TInteger => {
+                type_list.push(Value::String("integer".to_string()));
+            },
+            FieldType::TFloat => {
+                type_list.push(Value::String("number".to_string()));
+            },
+            FieldType::TNull => {
+                type_list.push(Value::String("null".to_string()));
+            },
+            _ => {
+                // defaults to JSON String
+                type_list.push(Value::String("string".to_string()));
+            },
+        }
+
+        // "null" type denotes optional value
+        // to be compatible with "validate" command, has to come after the real type, and only if type is not NULL
+        if !required && *inferred_type != FieldType::TNull {
+            type_list.push(Value::String("null".to_string()));
+        }
+
+        let mut field_map: Map<String, Value> = Map::new();
+        field_map.insert("type".to_string(), Value::Array(type_list));
+        properties_map.insert(header_string, Value::Object(field_map));
+    } // end for loop over headers
+
+    print!("\n");
+
+    let properties = Value::Object(properties_map);
+
+    let schema = json!({
+        "$schema": "https://json-schema.org/draft-07/schema",
+        "title": format!("JSON Schema for {}", input_path),
+        "description": "Inferred JSON Schema from QSV schema command",
+        "type": "object",
+        "properties": properties
+    });
+
+    let schema_pretty = serde_json::to_string_pretty(&schema).expect("prettify schema json");
+
+    println!("{}\n", &schema_pretty);
+
+    schema_output_file
+        .write_all(schema_pretty.as_bytes())
+        .expect("unable to write schema file");
 
     // flush schema file; file gets closed automagically when out-of-scope
     schema_output_file.flush().unwrap();
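Given the `json!` template above, a hypothetical two-column `mydata.csv`, with
an `id` column that always parses as an integer and a `name` column that is
sometimes empty, should produce a schema of roughly the following shape. This
is hand-built to mirror the template, not captured output:

    use serde_json::json;

    fn main() {
        // expected shape: a mandatory integer `id` and an optional string
        // `name`; "null" trails the real type so the validate command will
        // accept rows where the value is missing
        let expected = json!({
            "$schema": "https://json-schema.org/draft-07/schema",
            "title": "JSON Schema for mydata.csv",
            "description": "Inferred JSON Schema from QSV schema command",
            "type": "object",
            "properties": {
                "id":   { "type": ["integer"] },
                "name": { "type": ["string", "null"] }
            }
        });
        println!("{}", serde_json::to_string_pretty(&expected).unwrap());
    }

Note that `required` is computed only to decide whether "null" is appended to
a property's type list; the generated schema does not (yet) emit a top-level
"required" array.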
From 3db565cedbab0be283a9ee42fb3d78e5d67b84b4 Mon Sep 17 00:00:00 2001
From: "Michael S. Huang"
Date: Wed, 2 Feb 2022 21:06:23 +0800
Subject: [PATCH 3/3] cleanup code and add some error handling

---
 src/cmd/schema.rs | 122 ++++++++++++++++++++++++----------------------
 1 file changed, 64 insertions(+), 58 deletions(-)

diff --git a/src/cmd/schema.rs b/src/cmd/schema.rs
index ad8924535..2fd0a4640 100644
--- a/src/cmd/schema.rs
+++ b/src/cmd/schema.rs
@@ -2,20 +2,18 @@ use crate::config::{Config, Delimiter};
 use crate::util;
 use crate::CliError;
 use crate::CliResult;
-use crate::cmd::stats::{FieldType, FieldType::*};
-use anyhow::{anyhow, Result};
+use crate::cmd::stats::{FieldType};
 use csv::ByteRecord;
 use indicatif::{ProgressBar, ProgressDrawTarget};
-use jsonschema::{output::BasicOutput, JSONSchema};
-use log::{debug, info};
+use log::{debug, error};
 use serde::Deserialize;
-use serde_json::{value::Number, Map, Value, json};
+use serde_json::{Map, Value, json};
 use stats::Frequencies;
-use std::{env, fs::File, io::BufReader, io::BufWriter, io::Read, io::Write, ops::Add};
+use std::{fs::File, io::Write, ops::Add};
 
 macro_rules! fail {
     ($mesg:expr) => {
-        Err(CliError::Other($mesg))
+        return Err(CliError::Other($mesg));
     };
 }
 
@@ -29,8 +27,8 @@ Example output file from `mydata.csv`. If piped from stdin, then filename is `st
 Usage:
     qsv schema [options] [<input>]
 
-fetch options:
-    --no-nulls                 Skip NULL values in type inference
+schema options:
+
 
 Common options:
     -h, --help                 Display this message
@@ -45,7 +43,6 @@ Common options:
 
 #[derive(Deserialize, Debug)]
 struct Args {
-    flag_no_nulls: bool,
     flag_no_headers: bool,
     flag_delimiter: Option<Delimiter>,
     flag_quiet: bool,
@@ -81,13 +78,13 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
     // amortize memory allocation by reusing record
     #[allow(unused_assignments)]
-    let mut record = csv::ByteRecord::new();
+    let mut record = ByteRecord::new();
 
-    let mut row_index: u64 = 0;
+    let mut row_index: u32 = 0;
 
     // array of frequency tables to track non-NULL type occurrences
     let mut frequency_tables: Vec<_> = (0..(headers.len() as u32)).map(|_| Frequencies::<FieldType>::new()).collect();
-    // array of boolean to track if column in NULLABLE
+    // array of boolean to track if column is NULLABLE
     let mut nullable_flags: Vec<bool> = vec![false; headers.len()];
 
     // iterate over each CSV field and determine type
@@ -98,36 +95,32 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
 
         // dbg!(&record);
 
+        for col_index in 0..headers.len() {
 
-        for (i, header) in headers_iter.clone() {
-            // convert csv header to string
-            let header_string = std::str::from_utf8(header).unwrap().to_string();
-            // convert csv value to string; trim whitespace
-            let value_string = std::str::from_utf8(&record[i]).unwrap().trim().to_string();
+            // since from_sample() parses byte slice to string, no need to do it here
+            let value_slice: &[u8] = &record[col_index];
 
-            let sample_type = FieldType::from_sample(value_string.as_bytes());
+            let inferred_type: FieldType = FieldType::from_sample(value_slice);
 
-            debug!("{}[{}]: val={}, type={}", &header_string, &row_index, &value_string, &sample_type);
+            debug!("column_{col_index}[{row_index}]: val={value_slice:?}, type={inferred_type}");
 
-            match sample_type {
+            // update frequency table for this column
+            match inferred_type {
                 FieldType::TNull => {
-                    if args.flag_no_nulls {
-                        // skip
-                        debug!("Skipped: {}[{}]", &header_string, &row_index);
-                    } else {
-                        // only count NULL once, so it dominate frequency table when value is optional
-                        if nullable_flags[i] == false {
-                            frequency_tables[i].add(FieldType::TNull);
-                        }
-                        nullable_flags[i] = true;
+
+                    // only count NULL once, so it won't dominate frequency table when value is optional
+                    if nullable_flags[col_index] == false {
+                        frequency_tables[col_index].add(FieldType::TNull);
                     }
+                    nullable_flags[col_index] = true;
+
                 }
                 FieldType::TUnknown => {
                     // default to String
-                    frequency_tables[i].add(FieldType::TUnicode);
+                    frequency_tables[col_index].add(FieldType::TUnicode);
                 }
                 x => {
-                    frequency_tables[i].add(x);
+                    frequency_tables[col_index].add(x);
                 }
             }
 
@@ -138,30 +131,57 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
         }
     } // end main while loop over csv records
 
-    debug!("freq tables: {:?}", &frequency_tables);
+    use thousands::Separable;
+
+    if !args.flag_quiet {
+        progress.set_message(format!(
+            " processed {} records.",
+            progress.length().separate_with_commas()
+        ));
+        util::finish_progress(&progress);
+    }
+
+    debug!("freq tables: {frequency_tables:?}");
 
     // map holds "properties" object of json schema
     let mut properties_map: Map<String, Value> = Map::new();
 
-    // get most frequent type for each header column
+    // iterate through headers again and get most frequent type for each column
     for (i, header) in headers_iter {
         let most_frequent = frequency_tables[i].most_frequent();
         let (inferred_type, count) = match most_frequent.get(0) {
             Some(tuple) => *tuple,
-            None => (&FieldType::TNull, 0)
+            None => {
+                // not good, no type info for this column
+                let msg = format!("Cannot determine type for column '{header:?}'");
+                error!("{msg}");
+                fail!(msg);
+            }
         };
-        dbg!(&inferred_type, count, row_index);
 
-        let header_string = std::str::from_utf8(header).unwrap().to_string();
+        // convert csv header to string
+        let header_string: String = match std::str::from_utf8(header){
+            Ok(s) => {
+                s.to_string()
+            },
+            Err(e) => {
+                let msg = format!("Can't read header from column {i} as utf8: {e}");
+                error!("{msg}");
+                fail!(msg);
+            }
+        };
+
         let required: bool = if *inferred_type != FieldType::TNull &&
             *inferred_type != FieldType::TUnknown &&
-            count >= row_index {
+            count as u32 >= row_index {
             true
         } else {
            false
        };
-        debug!("{}: {:?} {}\n", header_string, inferred_type, required);
+        debug!("{header_string} has most frequent type of {inferred_type:?}, required={required}");
+
+        // use list since optional columns get a "null" type appended
 
         let mut type_list: Vec<Value> = Vec::new();
 
@@ -187,7 +207,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
         }
 
         // "null" type denotes optional value
-        // to be compatible with "validate" command, has to come after the real type, and only if type is not NULL
+        // to be compatible with "validate" command, has to come after the real type, and only if type is not already JSON Null
         if !required && *inferred_type != FieldType::TNull {
             type_list.push(Value::String("null".to_string()));
         }
@@ -195,24 +215,20 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
         let mut field_map: Map<String, Value> = Map::new();
         field_map.insert("type".to_string(), Value::Array(type_list));
         properties_map.insert(header_string, Value::Object(field_map));
-    } // end for loop over headers
-
-    print!("\n");
 
-    let properties = Value::Object(properties_map);
+    } // end for loop over all columns
 
+    // create final JSON object for output
     let schema = json!({
         "$schema": "https://json-schema.org/draft-07/schema",
-        "title": format!("JSON Schema for {}", input_path),
+        "title": format!("JSON Schema for {input_path}"),
         "description": "Inferred JSON Schema from QSV schema command",
         "type": "object",
-        "properties": properties
+        "properties": Value::Object(properties_map)
     });
 
     let schema_pretty = serde_json::to_string_pretty(&schema).expect("prettify schema json");
 
-    println!("{}\n", &schema_pretty);
-
     schema_output_file
         .write_all(schema_pretty.as_bytes())
         .expect("unable to write schema file");
 
@@ -220,16 +236,6 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
     // flush schema file; file gets closed automagically when out-of-scope
     schema_output_file.flush().unwrap();
 
-    use thousands::Separable;
-
-    if !args.flag_quiet {
-        progress.set_message(format!(
-            " processed {} records.",
-            progress.length().separate_with_commas()
-        ));
-        util::finish_progress(&progress);
-    }
-
     Ok(())
 }
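Two rules interact to produce the final property types. NULL is tallied at
most once per column, so the real type of a mostly-empty optional column
still wins the frequency contest; and a column is marked `required` only when
the count of its winning, non-null type covers every row, which the once-only
NULL tally deliberately prevents for optional columns. Distilled into a
standalone sketch (illustrative names and enum, not qsv code):

    #[derive(Clone, Copy, PartialEq, Debug)]
    enum Ty { Null, Unknown, Integer, Unicode }

    // decide (json_types, required) for one column from its winning type,
    // that type's tally, and the total row count
    fn column_schema(winner: Ty, winner_count: u32, row_count: u32) -> (Vec<&'static str>, bool) {
        // required only when a real type covers every row; an optional
        // column's winner falls short of row_count because some rows were
        // empty, so `required` correctly drops to false
        let required = winner != Ty::Null && winner != Ty::Unknown && winner_count >= row_count;
        let mut types = vec![match winner {
            Ty::Integer => "integer",
            Ty::Null => "null",
            // the command folds TUnknown into TUnicode while tallying,
            // so the catch-all arm is defensive
            _ => "string",
        }];
        // "null" comes after the real type, as the validate command expects
        if !required && winner != Ty::Null {
            types.push("null");
        }
        (types, required)
    }

    fn main() {
        // 10 rows: `id` is an integer in all 10; `name` a string in 7, empty in 3
        assert_eq!(column_schema(Ty::Integer, 10, 10), (vec!["integer"], true));
        assert_eq!(column_schema(Ty::Unicode, 7, 10), (vec!["string", "null"], false));
        assert_eq!(column_schema(Ty::Unknown, 5, 5), (vec!["string", "null"], false));
        assert_eq!(column_schema(Ty::Null, 1, 1), (vec!["null"], false));
    }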