Skip to content

Commit

Permalink
Several pending code quality control changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
DFINITYManu committed Jul 29, 2024
1 parent e732408 commit d1d7d64
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 121 deletions.
7 changes: 7 additions & 0 deletions rollout-dashboard/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rollout-dashboard/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ axum-server = "0.6.0"
chrono = { version = "0.4.38", features = ["serde"] }
env_logger = "0.11.3"
futures = "0.3.30"
lazy_static = "1.5.0"
log = "0.4.22"
regex = "1.10.5"
reqwest = { version = "0.12.5", features = ["json", "cookies"] }
Expand Down
137 changes: 111 additions & 26 deletions rollout-dashboard/server/src/airflow_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
// FIXME remove all use of unwrap().
// FIXME tolerate other types of error not just AirflowError.
// FIXME make AirflowError more explanatory, not just ::Other()

use async_recursion::async_recursion;
use chrono::{DateTime, TimeDelta, Utc};
use log::{debug, trace, warn};
Expand All @@ -13,10 +9,12 @@ use serde::{Deserialize, Deserializer};
use std::cmp::min;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::f64;
use std::fmt::Display;
use std::future::Future;
use std::string::FromUtf8Error;
use std::sync::Arc;
use std::time::Duration;
use std::{f64, fmt};
use std::{vec, vec::Vec};
use urlencoding::decode;
/// Default maximum batch size for paged requests in Airflow.
Expand Down Expand Up @@ -50,7 +48,9 @@ pub struct DagRunsResponseItem {
pub dag_run_id: String,
pub dag_id: String,
pub logical_date: DateTime<Utc>,
#[allow(dead_code)]
pub start_date: Option<DateTime<Utc>>,
#[allow(dead_code)]
pub end_date: Option<DateTime<Utc>>,
pub last_scheduling_decision: Option<DateTime<Utc>>,
pub state: DagRunState,
Expand Down Expand Up @@ -95,12 +95,18 @@ impl Pageable for DagRunsResponse {

#[derive(Debug, Deserialize)]
pub struct XComEntryResponse {
#[allow(dead_code)]
pub key: String,
#[allow(dead_code)]
pub timestamp: DateTime<Utc>,
#[allow(dead_code)]
pub execution_date: DateTime<Utc>,
#[serde(deserialize_with = "negative_is_none")]
#[allow(dead_code)]
pub map_index: Option<usize>,
#[allow(dead_code)]
pub task_id: String,
#[allow(dead_code)]
pub dag_id: String,
pub value: String,
}
Expand Down Expand Up @@ -158,20 +164,29 @@ where
#[derive(Debug, Deserialize, Clone)]
pub struct TaskInstancesResponseItem {
pub task_id: String,
#[allow(dead_code)]
pub task_display_name: String,
pub dag_id: String,
pub dag_run_id: String,
#[allow(dead_code)]
pub execution_date: DateTime<Utc>,
#[allow(dead_code)]
pub start_date: Option<DateTime<Utc>>,
pub end_date: Option<DateTime<Utc>>,
#[allow(dead_code)]
pub duration: Option<f64>,
pub state: Option<TaskInstanceState>,
#[allow(dead_code)]
pub try_number: usize,
#[serde(deserialize_with = "negative_is_none")]
pub map_index: Option<usize>,
#[allow(dead_code)]
pub max_tries: usize,
#[allow(dead_code)]
pub operator: Option<String>,
#[allow(dead_code)]
pub rendered_map_index: Option<String>,
#[allow(dead_code)]
pub note: Option<String>,
}

Expand Down Expand Up @@ -215,26 +230,57 @@ impl Pageable for TaskInstancesResponse {
}
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum TriggerRule {
AllSuccess,
AllFailed,
AllDone,
AllDoneSetupSuccess,
OneSuccess,
OneFailed,
OneDone,
NoneFailed,
NoneSkipped,
NoneFailedOrSkipped,
NoneFailedMinOneSuccess,
Dummy,
AllSkipped,
Always,
}

#[derive(Debug, Deserialize, Clone)]
pub struct TasksResponseItem {
pub task_id: String,
#[allow(dead_code)]
pub task_display_name: String,
#[allow(dead_code)]
pub owner: String,
#[allow(dead_code)]
pub start_date: Option<DateTime<Utc>>,
#[allow(dead_code)]
pub end_date: Option<DateTime<Utc>>,
// FIXME: make this into a TriggerRule enum.
// https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_tasks
pub trigger_rule: String,
#[allow(dead_code)]
pub trigger_rule: TriggerRule,
#[allow(dead_code)]
pub is_mapped: bool,
#[allow(dead_code)]
pub wait_for_downstream: bool,
#[allow(dead_code)]
pub retries: f64,
#[serde(deserialize_with = "airflow_timedelta")]
#[allow(dead_code)]
pub execution_timeout: Option<TimeDelta>,
#[serde(deserialize_with = "airflow_timedelta")]
#[allow(dead_code)]
pub retry_delay: Option<TimeDelta>,
#[allow(dead_code)]
pub retry_exponential_backoff: bool,
#[allow(dead_code)]
pub ui_color: String,
#[allow(dead_code)]
pub ui_fgcolor: String,
#[allow(dead_code)]
pub template_fields: Vec<String>,
pub downstream_task_ids: Vec<String>,
}
Expand Down Expand Up @@ -322,13 +368,16 @@ where
}
};
// Then the order by.
if order_by.is_some() {
suburl = suburl.clone()
+ match suburl.find('?') {
Some(_) => "&",
None => "?",
}
+ format!("{}={}", "order_by", order_by.clone().unwrap()).as_str()
match &order_by {
None => (),
Some(order_by) => {
suburl = suburl.clone()
+ match suburl.find('?') {
Some(_) => "&",
None => "?",
}
+ format!("{}={}", "order_by", order_by.as_str()).as_str();
}
}

debug!(target: "paged_get",
Expand Down Expand Up @@ -382,35 +431,68 @@ where
Ok(results)
}

#[derive(Debug)]
pub enum AirflowClientCreationError {
Reqwest(reqwest::Error),
Utf8(FromUtf8Error),
}

impl From<reqwest::Error> for AirflowClientCreationError {
fn from(err: reqwest::Error) -> Self {
Self::Reqwest(err)
}
}

impl From<FromUtf8Error> for AirflowClientCreationError {
fn from(err: FromUtf8Error) -> Self {
Self::Utf8(err)
}
}

impl Display for AirflowClientCreationError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Cannot create Airflow client: {}",
match self {
Self::Reqwest(e) => {
format!("{}", e)
}
Self::Utf8(e) => {
format!("{}", e)
}
}
)
}
}

pub struct AirflowClient {
url: reqwest::Url,
username: String,
password: String,
client: Arc<reqwest::Client>,
}

impl AirflowClient {
pub fn new(airflow_url: reqwest::Url) -> Self {
pub fn new(airflow_url: reqwest::Url) -> Result<Self, AirflowClientCreationError> {
let jar = Jar::default();
let arcjar = Arc::new(jar);
let c = reqwest::Client::builder()
.timeout(Duration::from_secs(PER_REQUEST_TIMEOUT))
.cookie_provider(arcjar.clone())
.build()
.unwrap();
let username = decode(airflow_url.username()).unwrap().into_owned();
let password = decode(airflow_url.password().unwrap_or(""))
.unwrap()
.into_owned();
.build()?;
let username = decode(airflow_url.username())?.into_owned();
let password = decode(airflow_url.password().unwrap_or(""))?.into_owned();
let mut censored_url = airflow_url.clone();
censored_url.set_username("").unwrap();
censored_url.set_password(None).unwrap();
let _ = censored_url.set_username("");
let _ = censored_url.set_password(None);

Self {
Ok(Self {
client: Arc::new(c),
url: censored_url,
username,
password,
}
})
}

async fn _get_logged_in(&self, suburl: String) -> Result<serde_json::Value, AirflowError> {
Expand All @@ -424,6 +506,7 @@ impl AirflowClient {
suburl: String,
attempt_login: bool,
) -> Result<serde_json::Value, AirflowError> {
// Next one cannot fail because self.url has already succeeded.
let url = self.url.join(suburl.as_str()).unwrap();

let c = self.client.clone();
Expand Down Expand Up @@ -593,6 +676,8 @@ impl AirflowClient {
}

/// Return mapped tasks of a task instance in a DAG run.
/// API function, will likely call in the future.
#[allow(dead_code)]
pub async fn mapped_task_instances(
&self,
dag_id: &str,
Expand Down
Loading

0 comments on commit d1d7d64

Please sign in to comment.