Skip to content

Commit

Permalink
chore: check home_dir in init and clean (#489)
Browse files Browse the repository at this point in the history
  • Loading branch information
v3g42 authored Dec 30, 2022
1 parent 8a0e2ed commit f86ada0
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 23 deletions.
9 changes: 8 additions & 1 deletion dozer-orchestrator/src/cli/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum Commands {
App(App),
Connector(Connector),
Clean,
Init,
Init(Init),
}

#[derive(Debug, Args)]
Expand All @@ -33,6 +33,13 @@ pub struct Api {
pub command: ApiCommands,
}

#[derive(Debug, Args)]
#[command(args_conflicts_with_subcommands = true)]
pub struct Init {
#[arg(short = 'f')]
pub force: Option<Option<String>>,
}

#[derive(Debug, Args)]
#[command(args_conflicts_with_subcommands = true)]
pub struct App {
Expand Down
4 changes: 2 additions & 2 deletions dozer-orchestrator/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use dozer_types::{serde_yaml, thiserror};
pub enum OrchestrationError {
#[error("Failed to write config yaml: {0:?}")]
FailedToWriteConfigYaml(#[source] serde_yaml::Error),
#[error("Failed to initialize dozer config..")]
InitializationFailed,
#[error("Failed to initialize. {0}[/api/generated,/cache] are not empty. Use -f to clean the directory and overwrite. Warning! there will be data loss.")]
InitializationFailed(String),
#[error("Failed to generate token: {0:?}")]
GenerateTokenFailed(String),
#[error("Failed to initialize api server..")]
Expand Down
2 changes: 1 addition & 1 deletion dozer-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod test_utils;
mod utils;

pub trait Orchestrator {
fn init(&mut self) -> Result<(), OrchestrationError>;
fn init(&mut self, force: bool) -> Result<(), OrchestrationError>;
fn clean(&mut self) -> Result<(), OrchestrationError>;
fn run_api(&mut self, running: Arc<AtomicBool>) -> Result<(), OrchestrationError>;
fn run_apps(
Expand Down
17 changes: 15 additions & 2 deletions dozer-orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use dozer_orchestrator::{ConnectorError, Orchestrator};
use dozer_types::crossbeam::channel;
use dozer_types::log::{error, info};
use dozer_types::prettytable::{row, Table};
use dozer_types::tracing::warn;
use std::borrow::BorrowMut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -99,7 +100,10 @@ fn run() -> Result<(), OrchestrationError> {
Ok(())
}
},
Commands::Init => dozer.init(),
Commands::Init(init) => {
let force = init.force.is_some();
dozer.init(force)
}
Commands::Clean => dozer.clean(),
}
} else {
Expand All @@ -109,7 +113,16 @@ fn run() -> Result<(), OrchestrationError> {

let (tx, rx) = channel::unbounded::<bool>();

dozer.init()?;
if let Err(e) = dozer.init(false) {
if let OrchestrationError::InitializationFailed(_) = e {
warn!(
"{} is already present. Skipping initialisation..",
dozer.config.home_dir.to_owned()
)
} else {
return Err(e);
}
}

let pipeline_thread = thread::spawn(move || {
if let Err(e) = dozer.borrow_mut().run_apps(running, Some(tx)) {
Expand Down
2 changes: 1 addition & 1 deletion dozer-orchestrator/src/pipeline/source_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ mod tests {
name: "prices_history".to_string(),
table_name: "prices_history".to_string(),
columns: vec!["id".to_string()],
connection: Some(events2_conn.clone()),
connection: Some(events2_conn),
refresh_config: None,
app_id: None,
},
Expand Down
24 changes: 16 additions & 8 deletions dozer-orchestrator/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,24 @@ impl Orchestrator for SimpleOrchestrator {
))
}

fn init(&mut self) -> Result<(), OrchestrationError> {
fn init(&mut self, force: bool) -> Result<(), OrchestrationError> {
self.write_internal_config()
.map_err(|e| InternalError(Box::new(e)))?;
let pipeline_home_dir = get_pipeline_dir(self.config.to_owned());
let api_dir = get_api_dir(self.config.to_owned());
let cache_dir = get_cache_dir(self.config.to_owned());

if api_dir.exists() || pipeline_home_dir.exists() || cache_dir.exists() {
if force {
self.clean()?;
} else {
return Err(OrchestrationError::InitializationFailed(
self.config.home_dir.to_string(),
));
}
}
// Ingestion channel
let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
let cache_dir = get_cache_dir(self.config.to_owned());

let cache_endpoints: Vec<CacheEndpoint> = self.get_cache_endpoints(cache_dir)?;

Expand Down Expand Up @@ -254,13 +263,12 @@ impl Orchestrator for SimpleOrchestrator {
Ok(())
}

// Cleaning the entire folder as there will be inconsistencies
// between pipeline, cache and generated proto files.
fn clean(&mut self) -> Result<(), OrchestrationError> {
self.write_internal_config()
.map_err(|e| InternalError(Box::new(e)))?;
let api_dir = get_api_dir(self.config.to_owned());
let generated_path = api_dir.join("generated");
if generated_path.exists() {
fs::remove_dir_all(&generated_path).map_err(|e| InternalError(Box::new(e)))?;
let home_dir = PathBuf::from(self.config.home_dir.clone());
if home_dir.exists() {
fs::remove_dir_all(&home_dir).map_err(|e| InternalError(Box::new(e)))?;
};
Ok(())
}
Expand Down
9 changes: 1 addition & 8 deletions dozer-orchestrator/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,7 @@ pub fn get_cache_dir(config: Config) -> PathBuf {
}

pub fn get_api_dir(config: Config) -> PathBuf {
PathBuf::from(
config
.api
.unwrap_or_default()
.api_internal
.unwrap_or_default()
.home_dir,
)
PathBuf::from(format!("{:}/api", config.home_dir))
}
pub fn get_grpc_config(config: Config) -> ApiGrpc {
config.api.unwrap_or_default().grpc.unwrap_or_default()
Expand Down

0 comments on commit f86ada0

Please sign in to comment.