Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New API for tasks #53

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ actix-session = { version = "0.9", features = ["cookie-session"] }
actix-web = "4.6"
age = { version = "0.10", features = ["armor"] }
argon2 = "0.5"
async-recursion = "1.1"
async-stream = "0.3"
async-trait = "0.1"
clap = { version = "4.5", features = ["derive", "env"] }
Expand Down
1 change: 0 additions & 1 deletion typhon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ edition.workspace = true
typhon-types.workspace = true
age.workspace = true
argon2.workspace = true
async-recursion.workspace = true
async-stream.workspace = true
async-trait.workspace = true
diesel.workspace = true
Expand Down
70 changes: 33 additions & 37 deletions typhon-core/src/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,45 +179,41 @@ impl Action {
) -> Result<(), error::Error> {
use crate::log_event;

let run = {
let self_ = self.clone();
move |sender| async move {
action(
&projects::Project {
refresh_task: None, // FIXME?
project: self_.project.clone(),
},
&self_.action.path,
&self_.action.name,
&Value::from_str(&self_.action.input).unwrap(),
sender,
)
.await
.map_err(|e| e.into())
}
};

let finish = {
let handle = self.handle();
move |res: Option<Result<String, error::Error>>| {
let status = match res {
Some(Err(_)) => {
let _ = finish(None);
TaskStatusKind::Failure
}
Some(Ok(stdout)) => finish(Some(stdout)),
None => {
let _ = finish(None);
TaskStatusKind::Canceled
}
};
(status, Event::ActionFinished(handle))
}
};

log_event(Event::ActionNew(self.handle()));

self.task.run(conn, run, finish)?;
let action_handle = self.handle();
let self_ = self.clone();
self.task.run(conn, |handle, sender| async move {
let res = handle
.spawn(async move {
action(
&projects::Project {
refresh_task: None, // FIXME?
project: self_.project.clone(),
},
&self_.action.path,
&self_.action.name,
&Value::from_str(&self_.action.input).unwrap(),
sender,
)
.await
.map_err(Into::<error::Error>::into)
})
.await;
let status_kind = tokio::task::spawn_blocking(|| match res {
Some(Err(_)) => {
let _ = finish(None);
TaskStatusKind::Failure
}
Some(Ok(stdout)) => finish(Some(stdout)),
None => {
let _ = finish(None);
TaskStatusKind::Canceled
}
})
.await?;
Ok((status_kind, Some(Event::ActionFinished(action_handle))))
})?;

Ok(())
}
Expand Down
41 changes: 17 additions & 24 deletions typhon-core/src/build_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,35 +111,28 @@ impl State {
};
self.join_set.spawn(abort);

let run = {
let drv = drv.clone();
let sender = sender.clone();
move |sender_log| run_build(drv, sender, sender_log)
};
let finish = {
let drv = drv.clone();
let handle = build.handle();
let sender = sender.clone();
|res| {
let status = finish_build(drv, sender, res);
(status, Event::BuildFinished(handle))
}
};
build.task.run(&mut self.conn, run, finish)?;
let build_handle = build.handle();
let sender = sender.clone();
build
.task
.run(&mut self.conn, move |handle, sender_log| async move {
let drv_bis = drv.clone();
let res = handle
.spawn(run_build(drv_bis, sender.clone(), sender_log))
.await;
let _ = sender.send(Msg::Finished(drv, res.clone()));
let status_kind = match res {
Some(Some(())) => TaskStatusKind::Success,
Some(None) => TaskStatusKind::Failure,
None => TaskStatusKind::Canceled,
};
Ok((status_kind, Some(Event::BuildFinished(build_handle))))
})?;

Ok(build.build.id)
}
}

fn finish_build(drv: DrvPath, sender: mpsc::UnboundedSender<Msg>, res: Output) -> TaskStatusKind {
let _ = sender.send(Msg::Finished(drv, res.clone()));
match res {
Some(Some(())) => TaskStatusKind::Success,
Some(None) => TaskStatusKind::Failure,
None => TaskStatusKind::Canceled,
}
}

async fn run_build(
drv: DrvPath,
sender: mpsc::UnboundedSender<Msg>,
Expand Down
9 changes: 9 additions & 0 deletions typhon-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub enum Error {
LoginError,
TaskError(task_manager::Error),
BadWebhookOutput,
TokioError,
}

impl Error {
Expand Down Expand Up @@ -83,6 +84,7 @@ impl std::fmt::Display for Error {
UnexpectedTimeError(e) => write!(f, "Time error: {}", e),
TaskError(e) => write!(f, "Task error: {}", e),
BadWebhookOutput => write!(f, "Bad webhook output"),
TokioError => write!(f, "Tokio error"),
}
}
}
Expand Down Expand Up @@ -117,6 +119,12 @@ impl From<task_manager::Error> for Error {
}
}

impl From<tokio::task::JoinError> for Error {
fn from(_: tokio::task::JoinError) -> Error {
Error::TokioError
}
}

impl Into<typhon_types::responses::ResponseError> for Error {
fn into(self) -> typhon_types::responses::ResponseError {
use {typhon_types::responses::ResponseError::*, Error::*};
Expand All @@ -125,6 +133,7 @@ impl Into<typhon_types::responses::ResponseError> for Error {
| UnexpectedDatabaseError(_)
| UnexpectedTimeError(_)
| TaskError(_)
| TokioError
| Todo => InternalError,
EvaluationNotFound(_)
| JobNotFound(_)
Expand Down
55 changes: 32 additions & 23 deletions typhon-core/src/evaluations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::models;
use crate::nix;
use crate::responses;
use crate::schema;
use crate::task_manager::TaskHandle;
use crate::tasks;
use crate::Conn;
use crate::POOL;
Expand Down Expand Up @@ -94,18 +95,6 @@ impl Evaluation {
self.task.cancel()
}

pub fn finish(self, r: Option<Result<nix::NewJobs, nix::Error>>) -> TaskStatusKind {
let mut conn = POOL.get().unwrap();
match r {
Some(Ok(new_jobs)) => match self.create_new_jobs(&mut conn, new_jobs) {
Ok(()) => TaskStatusKind::Success,
Err(_) => TaskStatusKind::Failure,
},
Some(Err(_)) => TaskStatusKind::Failure,
None => TaskStatusKind::Canceled,
}
}

pub fn get(conn: &mut Conn, handle: &handles::Evaluation) -> Result<Self, Error> {
let (evaluation, project, task) = schema::evaluations::table
.inner_join(schema::projects::table)
Expand Down Expand Up @@ -249,20 +238,40 @@ impl Evaluation {

pub async fn run(
self,
handle: TaskHandle,
sender: mpsc::UnboundedSender<String>,
) -> Result<nix::NewJobs, nix::Error> {
let res = nix::eval_jobs(&self.evaluation.url, self.evaluation.flake).await;
match &res {
Err(e) => {
for line in e.to_string().split("\n") {
// TODO: hide internal error messages?
// TODO: error management
let _ = sender.send(line.to_string());
) -> Result<TaskStatusKind, Error> {
let url = self.evaluation.url.clone();
let flake = self.evaluation.flake;
let res = handle
.spawn(async move {
let res = nix::eval_jobs(&url, flake).await;
match &res {
Err(e) => {
for line in e.to_string().split("\n") {
// TODO: hide internal error messages?
// TODO: error management
let _ = sender.send(line.to_string());
}
}
_ => (),
}
res
})
.await;
let status_kind = tokio::task::spawn_blocking(move || {
let mut conn = POOL.get().unwrap();
match res {
Some(Ok(new_jobs)) => match self.create_new_jobs(&mut conn, new_jobs) {
Ok(()) => TaskStatusKind::Success,
Err(_) => TaskStatusKind::Failure,
},
Some(Err(_)) => TaskStatusKind::Failure,
None => TaskStatusKind::Canceled,
}
_ => (),
}
res
})
.await?;
Ok(status_kind)
}

fn create_new_jobs(&self, conn: &mut Conn, new_jobs: nix::NewJobs) -> Result<(), Error> {
Expand Down
24 changes: 9 additions & 15 deletions typhon-core/src/jobsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,17 @@ impl Jobset {
})
})?;

let run = {
let evaluation = evaluation.clone();
move |sender| evaluation.run(sender)
};

let finish = {
let evaluation = evaluation.clone();
move |r| {
let handle = evaluation.handle();
let status = evaluation.finish(r);
(status, Event::EvaluationFinished(handle))
}
};

log_event(Event::EvaluationNew(evaluation.handle()));

evaluation.task.run(conn, run, finish)?;
let evaluation_ = evaluation.clone();
let evaluation_handle = evaluation.handle();
evaluation.task.run(conn, |handle, sender| async move {
let status_kind = evaluation_.run(handle, sender).await?;
Ok((
status_kind,
Some(Event::EvaluationFinished(evaluation_handle)),
))
})?;

gcroots::update(conn);

Expand Down
Loading