Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rsc: Track hidden output dirs #1651

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/entity/src/output_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct Model {
pub mode: i32,
pub job_id: Uuid,
pub created_at: DateTime,
pub hidden: bool,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
2 changes: 2 additions & 0 deletions rust/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod m20240731_152842_create_job_size_proc;
mod m20240731_201632_create_job_blob_timestamp_index;
mod m20240805_163520_create_blob_id_fk_indexes;
mod m20240819_193352_add_output_indexes;
mod m20240919_214610_add_hidden_to_output_dir;

pub struct Migrator;

Expand All @@ -35,6 +36,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240731_201632_create_job_blob_timestamp_index::Migration),
Box::new(m20240805_163520_create_blob_id_fk_indexes::Migration),
Box::new(m20240819_193352_add_output_indexes::Migration),
Box::new(m20240919_214610_add_hidden_to_output_dir::Migration),
]
}
}
36 changes: 36 additions & 0 deletions rust/migration/src/m20240919_214610_add_hidden_to_output_dir.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use sea_orm_migration::prelude::*;

/// Schema migration that adds the `Hidden` column to the `OutputDir` table
/// (see `up`) and removes it again on rollback (see `down`).
#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    /// Applies the migration: alters the `OutputDir` table by adding a
    /// non-null boolean `Hidden` column whose default value is `false`.
    ///
    /// # Errors
    ///
    /// Returns whatever `DbErr` the schema manager reports for the
    /// `ALTER TABLE` statement.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        // Describe the new column first so the ALTER statement reads cleanly.
        let mut hidden_column = ColumnDef::new(OutputDir::Hidden);
        hidden_column.boolean().not_null().default(false);

        let mut statement = Table::alter();
        statement
            .table(OutputDir::Table)
            .add_column(&mut hidden_column);

        manager.alter_table(statement).await
    }

    /// Reverts the migration: drops the `Hidden` column from the
    /// `OutputDir` table.
    ///
    /// # Errors
    ///
    /// Returns whatever `DbErr` the schema manager reports for the
    /// `ALTER TABLE` statement.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let mut statement = Table::alter();
        statement
            .table(OutputDir::Table)
            .drop_column(OutputDir::Hidden);

        manager.alter_table(statement).await
    }
}

/// Identifiers this migration uses to refer to the output directory table
/// and the column it adds.
///
/// NOTE(review): the concrete SQL names are produced by `DeriveIden`
/// (presumably `output_dir` and `hidden`) — confirm against the entity.
#[derive(DeriveIden)]
enum OutputDir {
    /// The table being altered.
    Table,
    /// The boolean column added by `up` and dropped by `down`.
    Hidden,
}

1 change: 1 addition & 0 deletions rust/rsc/src/bin/rsc/add_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub async fn add_job(
created_at: NotSet,
path: Set(dir.path),
mode: Set(dir.mode),
hidden: Set(dir.hidden.unwrap_or(false)),
job_id: Set(job_id),
})
.collect(),
Expand Down
1 change: 1 addition & 0 deletions rust/rsc/src/bin/rsc/read_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ pub async fn read_job(
.map(|m| Dir {
path: m.path,
mode: m.mode,
hidden: Some(m.hidden),
})
.collect();

Expand Down
2 changes: 2 additions & 0 deletions rust/rsc/src/bin/rsc/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub struct File {
/// An output directory of a job as exchanged over the API.
pub struct Dir {
    /// Path of the directory.
    pub path: String,
    /// Mode bits of the directory.
    pub mode: i32,
    /// Whether the directory is hidden. Kept optional so that clients which
    /// do not send the field yet keep working during the rollout: `add_job`
    /// stores `false` when this is `None`, and `read_job` always fills it
    /// with `Some(..)` from the database row.
    pub hidden: Option<bool>
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down
14 changes: 12 additions & 2 deletions share/wake/lib/system/remote_cache_api.wake
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ tuple CacheSearchOutputDirectory =
Path: String
# The mode on disk of the directory
Mode: Integer
# True if the directory is hidden, False if it is published to downstream users
Hidden: Boolean

# A symlink created by a cached job
tuple CacheSearchOutputSymlink =
Expand Down Expand Up @@ -134,6 +136,8 @@ tuple CachePostRequestOutputDirectory =
Path: String
# The mode on disk of the directory
Mode: Integer
# True if the directory is hidden, False if it is published to downstream users
Hidden: Boolean

# A symlink created by a job
tuple CachePostRequestOutputSymlink =
Expand Down Expand Up @@ -793,10 +797,11 @@ def getCachePostRequestJson (req: CachePostRequest): JValue =
"blob_id" :-> JString blobId,
)

def mkOutputDirJson (CachePostRequestOutputDirectory path mode) =
def mkOutputDirJson (CachePostRequestOutputDirectory path mode hidden) =
JObject (
"path" :-> JString path,
"mode" :-> JInteger mode,
"hidden" :-> JBoolean hidden,
)

def mkOutputSymlinkJson (CachePostRequestOutputSymlink path link) =
Expand Down Expand Up @@ -906,7 +911,12 @@ def mkCacheSearchResponse (json: JValue): Result CacheSearchResponse Error =
failWithError
"rsc: JSON response has incorrect schema. output_directories[x] must have integer key 'mode'"

CacheSearchOutputDirectory path mode
require Pass (JBoolean hidden) = jField v "hidden"
else
failWithError
"rsc: JSON response has incorrect schema. output_directories[x] must have boolean key 'hidden'"

CacheSearchOutputDirectory path mode hidden
| Pass

def mkOutputFile (v: JValue): Result CacheSearchOutputFile Error =
Expand Down
53 changes: 31 additions & 22 deletions share/wake/lib/system/remote_cache_runner.wake
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput =>
def stdoutDownload = rscApiGetStringBlob stdoutBlob
def stderrDownload = rscApiGetStringBlob stderrBlob

def doMakeDirectory (CacheSearchOutputDirectory path mode) =
def doMakeDirectory (CacheSearchOutputDirectory path mode _hidden) =
# wake-format off
def cmd =
"mkdir",
Expand Down Expand Up @@ -188,6 +188,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput =>

def resolvedDirectories =
outputDirs
| filter (!_.getCacheSearchOutputDirectoryHidden)
| map getCacheSearchOutputDirectoryPath

def outputs = resolvedOutputs ++ resolvedDirectories ++ resolvedSymlinks
Expand Down Expand Up @@ -350,30 +351,25 @@ def thirdBy (acceptFn: a => ThirdByGroup): (list: List a) => Triple (one: List a

# Posts a completed job to the remote cache
def postJob (rscApi: RemoteCacheApi) (job: Job) (_wakeroot: String) (hidden: String) (input: RunnerInput) (output: RunnerOutput): Result Unit Error =
require Pass stdout = job.getJobFailedStdoutRaw
require Pass stderr = job.getJobFailedStderrRaw

def allOutputs = output.getRunnerOutputOutputs

def rmapStat path = match (unsafe_stat path)
Fail x -> Fail x
Pass x -> Pass (Pair path x)

require Pass outputsStat =
allOutputs
| map rmapStat
| findFail
| addErrorContext "rsc: Failed to stat files to upload"
def filteredOutputs = output.getRunnerOutputOutputs
def cleanableOutputs = output.getRunnerOutputCleanableOutputs
def hiddenOutputs = subtract scmp cleanableOutputs filteredOutputs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do this subtract after the grouping so one of the lists is smaller?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that helps: we have to stat the full union of filteredOutputs and cleanableOutputs in order to determine which are symlinks, and I think that will be the majority of the time spent in the function. (FWIW, we were always stat-ing the full list, so it shouldn't be a time regression.)


# Maps the path type of a `Stat` to the bucket used by `thirdBy`:
# regular files go to the first group, directories to the second and
# symlinks to the third. Any other path type cannot be uploaded and panics.
def statToGroup (Stat t _ _) = match t
    PathTypeRegularFile -> ThirdByGroupFirst
    PathTypeDirectory -> ThirdByGroupSecond
    PathTypeSymlink -> ThirdByGroupThird
    _ -> panic "rsc: unsupported filetype: {format t}"

def (Triple regFiles directories symlinks) =
outputsStat
| thirdBy (\x x.getPairSecond.statToGroup)
# Stats every path in `paths` with `unsafe_stat`, pairing each path with its
# stat result, then groups the pairs with `thirdBy` according to `statToGroup`
# applied to the stat (the second element of each pair).
#
# A failure is reported with the added context "rsc: Failed to stat files".
# NOTE(review): presumably `findFail` turns the list of per-path results into
# a single result that fails if any stat failed — confirm against its definition.
def makeStatTripleThunk paths =
    paths
    | map (\p unsafe_stat p |< Pair p)
    | findFail
    | addErrorContext "rsc: Failed to stat files"
    |< thirdBy (_.getPairSecond.statToGroup)

def filteredOutputsStatThunk = makeStatTripleThunk filteredOutputs
def hiddenOutputsStatThunk = makeStatTripleThunk hiddenOutputs

def uploadAndMakeFile (Pair path (Stat _ mode _)) =
def doUpload =
Expand Down Expand Up @@ -405,8 +401,10 @@ def postJob (rscApi: RemoteCacheApi) (job: Job) (_wakeroot: String) (hidden: Str
CachePostRequestOutputSymlink path link
| Pass

def makeDirectory (Pair path (Stat _ mode _)) =
CachePostRequestOutputDirectory path (mode | mode2bits)
def makeDirectory hidden (Pair path (Stat _ mode _)) =
CachePostRequestOutputDirectory path (mode | mode2bits) hidden

require Pass (Triple regFiles directories symlinks) = filteredOutputsStatThunk

def fileUploads =
regFiles
Expand All @@ -416,9 +414,20 @@ def postJob (rscApi: RemoteCacheApi) (job: Job) (_wakeroot: String) (hidden: Str
symlinks
| map makeSymlink

def directoriesUpload =
def publishedDirectoriesUpload =
directories
| map makeDirectory
| map (makeDirectory False)

require Pass (Triple _ hiddenDirs _) = hiddenOutputsStatThunk

def hiddenDirectoriesUpload =
hiddenDirs
| map (makeDirectory True)

def directoriesUpload = publishedDirectoriesUpload ++ hiddenDirectoriesUpload

require Pass stdout = job.getJobFailedStdoutRaw
require Pass stderr = job.getJobFailedStderrRaw

def stdoutUpload =
rscApi
Expand Down
Loading