Merge pull request #846 from mgcam/cache_no_merge
Cache no merge
srl147 authored Jun 12, 2024
2 parents 9351315 + 593d819 commit eef2b72
Showing 8 changed files with 433 additions and 57 deletions.
41 changes: 33 additions & 8 deletions README.md
@@ -1,8 +1,8 @@
# NPG Pipelines for Processing Illumina Sequencing Data

This software provides the Sanger NPG team's automation for analysing and
internally archiving Illumina sequencing on behalf of DNA Pipelines for their
customers.
internally archiving Illumina sequencing data on behalf of DNA Pipelines for
their customers.

There are two main pipelines:

@@ -18,16 +18,16 @@ sequencing flowcell, or each tagged library (within a pool on the flowcell).
## Batch Processing and Dependency Tracking with LSF or wr

With this system, all of a pipeline's jobs for its steps are submitted for
execution to the LSF, or wr, batch/job processing system as the pipeline is
execution to the `LSF` or `wr` batch/job processing system as the pipeline is
initialised. As such, a _submitted_ pipeline does not have an orchestration
script or daemon running: managing the runtime dependencies of jobs within an
instance of a pipeline is delegated to the batch/job processing system.

How is this done? The job representing the start point of a graph is submitted
to LSF, or wr, in a suspended state and is resumed once all other jobs have been
to `LSF` or `wr` in a suspended state and is resumed once all other jobs have been
submitted thus ensuring that the execution starts only if all steps are
successfully submitted to LSF, or wr. If an error occurs at any point during job
submissions, all submitted jobs, apart from the start job, are killed.
successfully submitted. If an error occurs at any point during job submissions,
all submitted jobs, apart from the start job, are killed.
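
The submission order described above can be sketched as follows. This is an illustration only: `submit`, `submit_pipeline` and the in-memory `%state` hash are hypothetical stand-ins for a batch system and are not part of the `LSF` or `wr` interfaces or of this package.

```perl
use strict;
use warnings;

# Hypothetical in-memory model of a batch system (not the LSF or wr API).
# Maps a job name to 'suspended', 'pending' or 'killed'.
our %state;

# Pretend to submit a job; dies if asked to simulate a failure.
sub submit {
  my ($name, %opt) = @_;
  die "submission of $name failed\n" if $opt{fail};
  $state{$name} = $opt{suspended} ? 'suspended' : 'pending';
  return $name;
}

# $start is the name of the start job; each step is [name, fail_flag].
sub submit_pipeline {
  my ($start, @steps) = @_;
  %state = ();
  submit($start, suspended => 1);      # start job is held back
  my @submitted;
  for my $step (@steps) {
    my ($name, $fail) = @{$step};
    my $ok = eval { submit($name, fail => $fail); 1 };
    if (!$ok) {
      # Kill everything submitted so far, apart from the start job.
      $state{$_} = 'killed' for @submitted;
      return 0;
    }
    push @submitted, $name;
  }
  $state{$start} = 'pending';          # resume: execution can begin
  return 1;
}

submit_pipeline('start', ['align', 0], ['qc', 1]);
print "$_=$state{$_}\n" for sort keys %state;
```

In this model a failed submission leaves the start job suspended, so none of the dependent jobs can ever begin executing.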

## Pipeline Creation

@@ -84,8 +84,8 @@ The input for an instance of the pipeline is the instrument output run folder
(BCL and associated files) and LIMS information which drives appropriate
processing.

The key data products are aligned CRAM files and indexes, or unaligned CRAM
files. However per study (a LIMS datum) pipeline configuration allows for the
The key data products are aligned or unaligned CRAM files and indexes.
However per study (a LIMS datum) pipeline configuration allows for the
creation of GATK gVCF files, or the running of an external tool/pipeline, e.g.
ncov2019-artic-nf.

@@ -135,3 +135,28 @@ flow DAGs.

Also, the [npg_irods](https://github.com/wtsi-npg/npg_irods) system is essential
for the internal archival of data products.

## Data Merging across Lanes of a Flowcell

If the same library is sequenced in different lanes of a flowcell, under certain
conditions the pipeline will automatically merge all data for a library into
a single end product. Data for spiked-in PhiX libraries and data not assigned to
any tag (tag zero) are not merged. The following scenarios trigger the merge:

* NovaSeq Standard flowcell - a merge across all lanes (two or four) is performed.

* Any flowcell run on a NovaSeqX instrument - if multiple lanes belong to the
same pool, the data from individual libraries will be merged across those
lanes. Thus the output of a NovaSeqX run might contain a mixture of merged and
unmerged products.

If the data quality in a lane is poor, the lane should be excluded from the merge.
The `--process_separately_lanes` pipeline option is used to list lanes like this.
Usually this option is used when running the analysis pipeline. The pipeline caches
the supplied lane numbers so that the archival pipeline can generate a list of data
products that is consistent with the list generated by the analysis pipeline. The
same applies to the `npg_run_is_deletable` script. The cached value is retrieved
only if the `--process_separately_lanes` argument is not set when any of these
scripts is invoked.
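
The lookup order described above can be sketched as follows. This is a minimal illustration rather than the pipeline's own code: the function `resolve_separate_lanes` is hypothetical, the core `JSON::PP` module is used only to keep the example self-contained, and the file is created in a temporary directory instead of a real run folder.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json encode_json);
use File::Temp qw(tempdir);
use File::Spec::Functions qw(catfile);

# Hypothetical helper: returns the lanes to be excluded from the merge.
# A non-empty command-line value wins; otherwise the cached value is
# used; if neither exists, no lanes are excluded.
sub resolve_separate_lanes {
  my ($cli_lanes, $cache_path) = @_;
  return $cli_lanes if @{$cli_lanes};
  if (-f $cache_path) {
    open my $fh, '<', $cache_path or die "Cannot read $cache_path: $!";
    my $json = do { local $/; <$fh> };   # slurp the whole file
    close $fh;
    my $options = decode_json($json);
    return $options->{process_separately_lanes} || [];
  }
  return [];
}

# Simulate the analysis pipeline caching lanes 2 and 5.
my $dir  = tempdir(CLEANUP => 1);
my $path = catfile($dir, 'analysis_options.json');
open my $out, '>', $path or die "Cannot write $path: $!";
print {$out} encode_json({process_separately_lanes => [2, 5]});
close $out or die "Cannot close $path: $!";

# Option not given (e.g. by the archival pipeline): cached value is used.
print join(q[,], @{resolve_separate_lanes([], $path)}), "\n";   # prints 2,5
# Option given explicitly: the cache is not consulted.
print join(q[,], @{resolve_separate_lanes([3], $path)}), "\n";  # prints 3
```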


67 changes: 62 additions & 5 deletions lib/npg_pipeline/base.pm
@@ -7,7 +7,10 @@ use POSIX qw(strftime);
use Math::Random::Secure qw{irand};
use List::MoreUtils qw{any uniq};
use File::Basename;
use JSON;
use Perl6::Slurp;
use Readonly;
use Try::Tiny;

use npg_tracking::glossary::rpt;
use npg_tracking::glossary::composition::factory::rpt_list;
@@ -23,6 +26,7 @@ with qw{
WTSI::DNAP::Utilities::Loggable
npg_tracking::util::pipeline_config
npg_pipeline::base::options
npg_pipeline::runfolder_scaffold
};

Readonly::Array my @NO_SCRIPT_ARG_ATTRS => qw/
@@ -234,22 +238,32 @@ sub _build_merge_by_library {

=head2 process_separately_lanes
An array of lane (position) numbers, which should not be merged with anyother
An array of lane (position) numbers, which should not be merged with any other
lanes. To be used in conjunction with C<merge_lanes> or C<merge_by_library>
attributes. Does not have any impact if both of these attributes are false.
attributes. A consistency check is triggered when the value is set in order
to prevent this setting from being cached if no merge is intended.
Defaults to an empty array value, meaning that all possible entities will be
merged.
merged.
=cut

has q{process_separately_lanes} => (
isa => q{ArrayRef},
is => q{ro},
default => sub { return []; },
trigger => \&_validate_process_separately_lanes,
documentation => q{Array of lane numbers, which have to be excluded from } .
q{a merge},
);
sub _validate_process_separately_lanes {
my ($self, $new_value) = @_;
if (!$self->merge_lanes && !$self->merge_by_library && (@{$new_value} != 0)) {
$self->logcroak('One of merge options should be enabled if ' .
'process_separately_lanes is set');
}
return;
}

=head2 lims
@@ -348,6 +362,21 @@ zero products, hashed under the 'data_products' key.
If product_rpt_list attribute is set, the 'lanes' key maps to an empty
array.
While computing the lists of data products, we examine whether data in any
of the lanes can be merged across lanes. Some of the lanes might be explicitly
excluded from the merge by setting the `process_separately_lanes` attribute
from the command line. This is likely to be done when the analysis pipeline
is run manually. Then the same lanes have to be excluded from the merge by
the archival pipeline and by the script that evaluates whether the run folder
can be deleted. To enable this, the value of the `process_separately_lanes`
attribute is saved to the metadata_cache_<ID_RUN> directory immediately after
the pipeline establishes the location of the samplesheet file or generates a
new samplesheet.
This method looks at the `process_separately_lanes` attribute first. If the
`process_separately_lanes` array is empty, an attempt to retrieve the cached
value is made.
=cut

has q{products} => (
@@ -373,9 +402,14 @@ sub _build_products {
}

if ($self->merge_lanes || $self->merge_by_library) {
my $attr_name = 'process_separately_lanes';
my $separate_lanes = $self->$attr_name;
if (@{$separate_lanes} == 0) {
$separate_lanes = $self->_cached_process_separately_lanes($attr_name);
}

my $all_lims = $self->lims->aggregate_libraries(
\@lane_lims, $self->process_separately_lanes);
\@lane_lims, $separate_lanes);
@data_lims = @{$all_lims->{'singles'}}; # Might be empty.

# merge_lanes option implies a merge across all lanes.
@@ -483,6 +517,27 @@ sub _check_lane_merge_is_viable {
return 1;
}

sub _cached_process_separately_lanes {
my ($self, $key) = @_;
$key or $self->logcroak('Key should be defined');

my $path = $self->analysis_options_file_path();
if (-f $path) {
my $options;
try {
$options = decode_json(slurp($path));
} catch {
$self->logcroak("Error reading or parsing ${path} : $_");
};
if ($options->{$key}) {
$self->info("Found $key analysis option in $path: " .
join q[, ], @{$options->{$key}});
return $options->{$key};
}
}
return [];
}

__PACKAGE__->meta->make_immutable;

1;
@@ -511,6 +566,8 @@ __END__
=item File::Basename
=item JSON
=item Readonly
=item npg_tracking::glossary::rpt
@@ -538,7 +595,7 @@ Marina Gourtovaia
=head1 LICENSE AND COPYRIGHT
Copyright (C) 2014,2015,2016,2017,2018,2019,2020,2023 Genome Research Ltd.
Copyright (C) 2014,2015,2016,2017,2018,2019,2020,2023,2024 Genome Research Ltd.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
5 changes: 2 additions & 3 deletions lib/npg_pipeline/pluggable.pm
@@ -22,8 +22,7 @@ use npg_pipeline::pluggable::registry;
extends q{npg_pipeline::base};

with qw{ MooseX::AttributeCloner
npg_pipeline::executor::options
npg_pipeline::runfolder_scaffold };
npg_pipeline::executor::options };

our $VERSION = '0';

@@ -883,7 +882,7 @@ __END__
=head1 LICENSE AND COPYRIGHT
Copyright (C) 2014,2015,2016,2017,2018,2019,2020,2021 Genome Research Ltd.
Copyright (C) 2014,2015,2016,2017,2018,2019,2020,2021,2024 Genome Research Ltd.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
48 changes: 46 additions & 2 deletions lib/npg_pipeline/pluggable/central.pm
@@ -3,6 +3,8 @@ package npg_pipeline::pluggable::central;
use Moose;
use MooseX::StrictConstructor;
use namespace::autoclean;
use JSON;
use File::Slurp qw(read_file write_file);

extends 'npg_pipeline::pluggable';

@@ -29,13 +31,17 @@ Pipeline runner for the analysis pipeline.
Inherits from parent's method. Sets all paths needed during the lifetime
of the analysis runfolder. Creates any of the paths that do not exist.
Saves lane numbers given by the `process_separately_lanes` option to a
JSON file.
=cut

override 'prepare' => sub {
my $self = shift;

$self->_scaffold('create_top_level');
super(); # Corect order
super(); # Correct order, sets up a samplesheet.
$self->_save_merge_options();
$self->_scaffold('create_product_level');

return;
@@ -56,6 +62,40 @@ sub _scaffold {
return;
}

sub _save_merge_options {
my $self = shift;

my $attr_name = 'process_separately_lanes';
my @given_lanes = sort {$a <=> $b} @{$self->$attr_name};
if (@given_lanes) {
my $cached_options = {};
my $found = 0;
my $path = $self->analysis_options_file_path();
if (-f $path) {
$cached_options = decode_json(read_file($path));
if ($cached_options->{$attr_name} && @{$cached_options->{$attr_name}}) {
my $sep = q[, ];
my $cached_lanes = join $sep, @{$cached_options->{$attr_name}};
$self->info("Found cached merge options in $path: " .
"lanes $cached_lanes should not be merged.");
if ($cached_lanes ne join $sep, @given_lanes) {
$self->logcroak('Lane list from process_separately_lanes attribute ' .
'is inconsistent with cached value');
}
$found = 1;
}
}

if (!$found) {
$cached_options->{$attr_name} = \@given_lanes;
write_file($path, encode_json($cached_options)) or
$self->logcroak("Failed to write to $path");
}
}

return;
}

__PACKAGE__->meta->make_immutable;

1;
@@ -76,6 +116,10 @@ __END__
=item namespace::autoclean
=item JSON
=item File::Slurp
=back
=head1 INCOMPATIBILITIES
@@ -89,7 +133,7 @@ Marina Gourtovaia
=head1 LICENSE AND COPYRIGHT
Copyright (C) 2018 Genome Research Limited
Copyright (C) 2018,2024 Genome Research Ltd.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
13 changes: 12 additions & 1 deletion lib/npg_pipeline/runfolder_scaffold.pm
@@ -15,6 +15,7 @@ Readonly::Scalar my $ANALYSIS_PATH_COMPONENT => q[/analysis/];
Readonly::Scalar my $LOG_DIR_NAME => q[log];
Readonly::Scalar my $STATUS_FILES_DIR_NAME => q[status];
Readonly::Scalar my $METADATA_CACHE_DIR_NAME => q[metadata_cache_];
Readonly::Scalar my $ANALYSIS_OPTIONS_FILE_NAME => q[analysis_options.json];
Readonly::Scalar my $TILEVIZ_INDEX_DIR_NAME => q[tileviz];
Readonly::Scalar my $TILEVIZ_INDEX_FILE_NAME => q[index.html];
Readonly::Scalar my $IRODS_PUBLISHER_RSART_DIR_NAME => q[irods_publisher_restart_files];
@@ -149,6 +150,11 @@ sub metadata_cache_dir_path {
return catdir($apath, $METADATA_CACHE_DIR_NAME . $self->id_run());
}

sub analysis_options_file_path {
my $self = shift;
return catfile($self->metadata_cache_dir_path, $ANALYSIS_OPTIONS_FILE_NAME);
}

sub irods_publisher_rstart_dir_path {
my $self = shift;
my $apath = $self->analysis_path;
@@ -303,6 +309,11 @@ is empty. Can be called both as an instance and a class method.
=head2 metadata_cache_dir_path
=head2 analysis_options_file_path
A full path for a JSON file, which captures lane numbers given by the
C<process_separately_lanes> pipeline attribute and other analysis options.
=head2 irods_publisher_rstart_dir_path
=head2 irods_locations_dir_path
@@ -355,7 +366,7 @@ Given a path in analysis directory changes it to outgoing directory.
=head1 LICENSE AND COPYRIGHT
Copyright (C) 2018,2019,2020,2022 Genome Research Ltd.
Copyright (C) 2018,2019,2020,2022,2024 Genome Research Ltd.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by