Skip to content

Commit

Permalink
Merge pull request databendlabs#2629 from ZhiHanZ/stateful-test
Browse files (browse the repository at this point in the history)
[CLI] more robust load interface
  • Loading branch information
databend-bot authored Nov 3, 2021
2 parents 4464dfb + f2af275 commit afea2df
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ miri:
cargo miri setup
MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test

cluster: build
mkdir -p ./.databend/local/bin/test
cp ./target/release/databend-query ./.databend/local/bin/test/databend-query
cp ./target/release/databend-meta ./.databend/local/bin/test/databend-meta
./target/release/bendctl cluster create --databend_dir ./.databend --group local --version test --force
run: build
bash ./scripts/deploy/databend-query-standalone.sh release

Expand Down
1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ async-std = "1.10.0"
tryhard = "0.4.0"
rayon = "1.5.1"
tokio-util = "0.6.9"
csv = "1.1"

[dev-dependencies]
pretty_assertions = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion cli/docs/ontime/build_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,4 @@ CREATE TABLE ontime
Div5LongestGTime String,
Div5WheelsOff String,
Div5TailNum String
) ENGINE = CSV location = {{ .csv_location }};
) ENGINE = FUSE;
106 changes: 55 additions & 51 deletions cli/src/cmds/loads/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,14 @@ use clap::App;
use clap::AppSettings;
use clap::Arg;
use clap::ArgMatches;
use common_base::tokio::fs::File;
use common_base::tokio::io::AsyncBufReadExt;
use common_base::tokio::io::AsyncRead;
use common_base::tokio::io::BufReader;
use common_base::tokio::macros::support::Pin;
use common_base::tokio::time;
// Lets us call into_async_read() to convert a futures::stream::Stream into a
// futures::io::AsyncRead.
use futures::stream::TryStreamExt;
use itertools::Itertools;
use lexical_util::num::AsPrimitive;
use num_format::Locale;
use num_format::ToFormattedString;
use rayon::prelude::*;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::cmds::clusters::cluster::ClusterProfile;
use crate::cmds::command::Command;
Expand Down Expand Up @@ -151,11 +144,11 @@ impl LoadCommand {
.required(false),
)
.arg(
Arg::new("skip-head-lines").long("skip-head-lines")
.about("skip head line in file for example: \
bendctl load test.csv --skip-head-lines 10 would ignore the first ten lines in csv file")
.takes_value(true)
.required(false),
Arg::new("with_header").long("with_header")
.about("state on whether CSV has dataset header for example: \
bendctl load test.csv --with_header true would ignore the first ten lines in csv file")
.required(false)
.takes_value(false),
)
.arg(
Arg::new("table").long("table")
Expand All @@ -169,17 +162,9 @@ impl LoadCommand {
async fn local_exec_match(&self, writer: &mut Writer, args: &ArgMatches) -> Result<()> {
match self.local_exec_precheck(args).await {
Ok(_) => {
let mut reader = build_reader(args.value_of("load")).await.lines();
for _ in 0..args
.value_of("skip-head-lines")
.unwrap_or("0")
.parse::<usize>()
.unwrap()
{
if reader.next_line().await?.is_none() {
return Ok(());
}
}
let mut reader =
build_reader(args.value_of("load"), args.value_of("with_header")).await;
let mut record = reader.records();
let table = args.value_of("table").unwrap();
let schema = args.value_of("schema");
let table_format = match schema {
Expand All @@ -202,9 +187,17 @@ impl LoadCommand {
let mut batch = vec![];
// possible optimization is to run iterator in parallel
for _ in 0..100_000 {
if let Some(line) = reader.next_line().await? {
batch.push(line);
count += 1;
if let Some(line) = record.next() {
if let Ok(line) = line {
batch.push(line);
count += 1;
} else {
writer.write_err(format!(
"cannot read csv line {}, error: {}",
count,
line.unwrap_err()
))
}
} else {
break;
}
Expand All @@ -215,6 +208,17 @@ impl LoadCommand {
let values = batch
.into_iter()
.par_bridge()
.map(|s| {
s.iter()
.map(|i| {
if i.trim().is_empty() {
"null".to_string()
} else {
"'".to_owned() + i + &*"'".to_owned()
}
})
.join(",")
})
.map(|e| format!("({})", e.trim()))
.filter(|e| !e.trim().is_empty())
.reduce_with(|a, b| format!("{}, {}", a, b));
Expand All @@ -228,6 +232,7 @@ impl LoadCommand {
}
}
}

let elapsed = start.elapsed();
let time = elapsed.as_millis() as f64 / 1000f64;
writer.write_ok(format!(
Expand All @@ -238,6 +243,7 @@ impl LoadCommand {
.to_formatted_string(&Locale::en),
time
));

Ok(())
}
Err(e) => {
Expand Down Expand Up @@ -281,43 +287,41 @@ impl LoadCommand {
}
}

async fn build_reader(load: Option<&str>) -> BufReader<Pin<Box<dyn AsyncRead + Send>>> {
async fn build_reader(
load: Option<&str>,
header: Option<&str>,
) -> csv::Reader<Box<dyn std::io::Read + Send + Sync>> {
let header = header.is_some();
match load {
Some(val) => {
if Path::new(val).exists() {
let f = File::open(val)
.await
.expect("cannot open file: permission denied");
BufReader::new(Box::pin(f))
let f = std::fs::File::open(val).expect("cannot open file: permission denied");
csv::ReaderBuilder::new()
.has_headers(header)
.from_reader(Box::new(f))
} else if val.contains("://") {
// Attempt to download the data to load from the remote URL.
let target = reqwest::get(val)
.await
.expect("cannot connect to target url")
.error_for_status()
.expect("return code is not OK"); // generate an error if server didn't respond OK

// Convert the body of the response into a futures::io::Stream.
let target_stream = target.bytes_stream();

// Convert the stream into an futures::io::AsyncRead.
// We must first convert the reqwest::Error into an futures::io::Error.
let target_stream = target_stream
.map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
.into_async_read();

// Convert the futures::io::AsyncRead into a tokio::io::AsyncRead.
let target_stream = target_stream.compat();

BufReader::new(Box::pin(target_stream))
.expect("return code is not OK")
.text()
.await
.expect("cannot fetch for target"); // generate an error if server didn't respond
csv::ReaderBuilder::new()
.has_headers(header)
.from_reader(Box::new(Cursor::new(target)))
} else {
let bytes = val.to_string();
BufReader::new(Box::pin(Cursor::new(bytes.as_bytes().to_owned())))
csv::ReaderBuilder::new()
.has_headers(header)
.from_reader(Box::new(Cursor::new(val.to_string().as_bytes().to_owned())))
}
}
None => {
let io = common_base::tokio::io::stdin();
BufReader::new(Box::pin(io))
let io = std::io::stdin();
csv::ReaderBuilder::new()
.has_headers(header)
.from_reader(Box::new(io))
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions cli/src/cmds/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,10 @@ impl LocalRuntime for LocalQueryConfig {
databend_query::configs::config_storage::STORAGE_TYPE,
conf.storage.storage_type,
)
.env(
databend_query::configs::config_meta::META_EMBEDDED_DIR,
"/tmp/embedded",
)
.env(
databend_query::configs::config_storage::DISK_STORAGE_DATA_PATH,
conf.storage.disk.data_path,
Expand Down
3 changes: 2 additions & 1 deletion query/src/configs/config_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::configs::Config;
pub const META_ADDRESS: &str = "META_ADDRESS";
pub const META_USERNAME: &str = "META_USERNAME";
pub const META_PASSWORD: &str = "META_PASSWORD";
pub const META_EMBEDDED_DIR: &str = "META_EMBEDDED_DIR";
pub const META_RPC_TLS_SERVER_ROOT_CA_CERT: &str = "META_RPC_TLS_SERVER_ROOT_CA_CERT";
pub const META_RPC_TLS_SERVICE_DOMAIN_NAME: &str = "META_RPC_TLS_SERVICE_DOMAIN_NAME";

Expand All @@ -31,7 +32,7 @@ pub const META_RPC_TLS_SERVICE_DOMAIN_NAME: &str = "META_RPC_TLS_SERVICE_DOMAIN_
#[derive(Clone, serde::Serialize, serde::Deserialize, PartialEq, StructOpt, StructOptToml)]
pub struct MetaConfig {
/// The dir to store persisted meta state for a embedded meta store
#[structopt(long, default_value = "./_meta_embedded")]
#[structopt(long, env = META_EMBEDDED_DIR, default_value = "./_meta_embedded")]
#[serde(default)]
pub meta_embedded_dir: String,

Expand Down

0 comments on commit afea2df

Please sign in to comment.