Skip to content

Commit

Permalink
Adding quantitative comparisons to CompareAdam
Browse files Browse the repository at this point in the history
This is a major update and extension to the "comparison" framework in CompareAdam.

First, an extension to CompareAdam has been created, that allows for interchangeable "generators" of
pairwise read-comparison metrics.  We've added our own implementations of a few basic metrics, found
in DefaultComparisons, for things like duplicate flags, position of alignment, base qualities, and map
qualities.  These metrics are then aggregated using an aggregator (of which the HistogramAggregator
is the prominent example at the moment).

CompareAdam now runs a set of these metrics over the reads from a given pair of files, and writes output
suitable for graphical report generation using the plots.R script.

The code for this traversal-and-metric-generation has been oved to a class in adam-core, called
ComparisonTraversalEngine.

Finally, a new CLI tool has been added, "find_reads", supported by the class FindReads.  The find_reads
command is written to facilitate diagnostics on ADAM files which differ in one or more metrics.  It takes a
pair of ADAM files, the names of one or more generators, and filter expressions on each of those generators,
and produces the set of read names from the pair of files whose generated metric values fall within the
corresponding filters.
  • Loading branch information
tdanford authored and massie committed Feb 10, 2014
1 parent 8753063 commit bf60cb9
Show file tree
Hide file tree
Showing 21 changed files with 1,946 additions and 188 deletions.
9 changes: 8 additions & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ ADAM Changelog
Trunk (not yet released)

NEW FEATURES
* Added ability to load and merge multiple ADAM files into a single RDD.

* Added ability to load and merge multiple ADAM files into a single RDD.

* Pairwise, quantitative ADAM file comparisons: the CompareAdam command has been extended to calculate
metrics on pairs of ADAM files which contain the same reads processed in two different ways (e.g.
two different implementations of a pre-processing pipeline). This can be used to compare different
pipelines based on their read-by-read concordance across a number of fields: position, alignment,
mapping and base quality scores, and can be extended to support new metrics or aggregations.

OPTIMIZATIONS

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ object AdamMain extends Logging {
ComputeVariants,
Bam2Adam,
Adam2Vcf,
Vcf2Adam)
Vcf2Adam,
FindReads)

private def printCommands() {
println("\n")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2013 Genome Bridge LLC
/*
* Copyright 2013, 2014 Genome Bridge LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,49 +15,234 @@
*/
package edu.berkeley.cs.amplab.adam.cli

import edu.berkeley.cs.amplab.adam.util._
import edu.berkeley.cs.amplab.adam.projections.ADAMRecordField._
import edu.berkeley.cs.amplab.adam.projections.FieldValue
import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
import org.kohsuke.args4j.Argument
import org.apache.spark.SparkContext
import edu.berkeley.cs.amplab.adam.util._
import org.apache.hadoop.fs.{Path, FileSystem}

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.kohsuke.args4j.{Option => Args4jOption, Argument}

import scala.collection.Seq
import java.util.logging.Level
import edu.berkeley.cs.amplab.adam.rdd.compare.{CompareAdam => CoreCompareAdam}
import java.io.{PrintWriter, OutputStreamWriter}
import edu.berkeley.cs.amplab.adam.rdd.comparisons.ComparisonTraversalEngine
import edu.berkeley.cs.amplab.adam.metrics.{DefaultComparisons, CombinedComparisons, Collection, BucketComparisons}
import edu.berkeley.cs.amplab.adam.metrics.aggregators.{HistogramAggregator, CombinedAggregator, AggregatedCollection, Writable}

/**
* CompareAdam is a tool for pairwise comparison of ADAM files (or merged sets of ADAM files, see the
* note on the -recurse{1,2} optional parameters, below).
*
* The canonical use-case for CompareAdam involves a single input file run through (for example) two
* different implementations of the same pipeline, producing two comparable ADAM files at the end.
*
* CompareAdam will load these ADAM files and perform a read-name-based equi-join. It then computes
* one or more metrics (embodied as BucketComparisons values) across the joined records, as specified
* on the command-line, and aggregates each metric into a histogram (although, this can be modified if
* other aggregations are required in the future) and outputs the resulting histograms to a specified
* directory as text files.
*
* There is an R script in the adam-scripts module to process those outputs into a figure.
*
* The available metrics to be calculated are defined, by name, in the DefaultComparisons object.
*
* A subsequent tool like FindReads can be used to track down which reads give rise to particular aggregated
* bins in the output histograms, if further diagnosis is needed.
*/
object CompareAdam extends AdamCommandCompanion with Serializable {

val commandName: String = "compare"
val commandDescription: String = "Compare two ADAM files based on read name"

def apply(cmdLine: Array[String]): AdamCommand = {
new CompareAdam(Args4j[CompareAdamArgs](cmdLine))
}
}

type GeneratedResults[A] = RDD[(CharSequence, Seq[A])]

/**
* @see CompareAdamArgs.recurse1, CompareAdamArgs.recurse2
*/
def setupTraversalEngine(sc: SparkContext,
input1Path: String,
recurse1: String,
input2Path: String,
recurse2: String,
generator: BucketComparisons[Any]): ComparisonTraversalEngine = {

val schemas = Seq[FieldValue](
recordGroupId,
readName,
readMapped,
primaryAlignment,
readPaired,
firstOfPair) ++ generator.schemas

new ComparisonTraversalEngine(schemas, sc.findFiles(new Path(input1Path), recurse1), sc.findFiles(new Path(input2Path), recurse2))(sc)
}

def parseGenerators(nameList : String) : Seq[BucketComparisons[Any]] = {
nameList match {
case null => DefaultComparisons.comparisons
case s => parseGenerators(s.split(","))
}
}

def parseGenerators(names : Seq[String]) : Seq[BucketComparisons[Any]] =
names.map(DefaultComparisons.findComparison)
}

class CompareAdamArgs extends Args4jBase with SparkArgs with ParquetArgs with Serializable {

@Argument(required = true, metaVar = "INPUT1", usage = "The first ADAM file to compare", index = 0)
val input1Path: String = null

@Argument(required = true, metaVar = "INPUT2", usage = "The second ADAM file to compare", index = 1)
val input2Path: String = null

@Args4jOption(required = false, name = "-recurse1", metaVar="REGEX",
usage = "Optional regex; if specified, INPUT1 is recursively searched for directories matching this " +
"pattern, whose contents are loaded and merged prior to running the comparison")
val recurse1: String = null

@Args4jOption(required = false, name = "-recurse2", metaVar="REGEX",
usage = "Optional regex; if specified, INPUT2 is recursively searched for directories matching this " +
"pattern, whose contents are loaded and merged prior to running the comparison")
val recurse2: String = null

@Args4jOption(required = false, name = "-comparisons", usage = "Comma-separated list of comparisons to run")
val comparisons: String = null

@Args4jOption(required = false, name = "-list_comparisons",
usage = "If specified, lists all the comparisons that are available")
val listComparisons: Boolean = false

@Args4jOption(required = false, name = "-output", metaVar = "DIRECTORY",
usage = "Directory to generate the comparison output files (default: output to STDOUT)")
val directory: String = null
}

class CompareAdam(protected val args: CompareAdamArgs) extends AdamSparkCommand[CompareAdamArgs] with Serializable {

val companion: AdamCommandCompanion = CompareAdam

/**
* prints out a high-level summary of the compared files, including
*
* - the number of reads in each file
*
* - the number of reads which were unique to each file (i.e. not found in the other file, by name)
*
* - for each generator / aggregation created, prints out the total number of reads that went into the
* aggregation as well as the total which represent "identity" between the compared files (right now,
* this second number really only makes sense for the Histogram aggregated value, but that's the only
* aggregator we're using anyway.)
*
* @param engine The ComparisonTraversalEngine used for the aggregated values
* @param writer The PrintWriter to print the summary with.
*/
def printSummary(engine : ComparisonTraversalEngine,
generators : Seq[BucketComparisons[Any]],
aggregateds : Seq[Histogram[Any]],
writer : PrintWriter) {

writer.println("%15s: %s".format("INPUT1", args.input1Path))
writer.println("\t%15s: %d".format("total-reads", engine.named1.count()))
writer.println("\t%15s: %d".format("unique-reads", engine.uniqueToNamed1()))

writer.println("%15s: %s".format("INPUT2", args.input2Path))
writer.println("\t%15s: %d".format("total-reads", engine.named2.count()))
writer.println("\t%15s: %d".format("unique-reads", engine.uniqueToNamed2()))

for( ( generator, aggregated ) <- generators.zip(aggregateds) ) {
writer.println()
writer.println(generator.name)

val count = aggregated.count()
val countIdentity = aggregated.countIdentical()
val diffFrac = (count - countIdentity).toDouble / count.toDouble

writer.println("\t%15s: %d".format("count", count))
writer.println("\t%15s: %d".format("identity", countIdentity))
writer.println("\t%15s: %.5f".format("diff%", 100.0 * diffFrac))
}
}

def run(sc: SparkContext, job: Job): Unit = {

ParquetLogger.hadoopLoggerLevel(Level.SEVERE)

val (comp1, comp2) =
sc.adamCompareFiles(args.input1Path, args.input2Path, CoreCompareAdam.readLocationsMatchPredicate)
if(args.listComparisons) {
println("\nAvailable comparisons:")
DefaultComparisons.comparisons.foreach {
generator =>
println("\t%10s : %s".format(generator.name, generator.description))
}

return
}

val generators: Seq[BucketComparisons[Any]] = CompareAdam.parseGenerators(args.comparisons)
val aggregators = (0 until generators.size).map( i => new HistogramAggregator[Any]() )

val generator = new CombinedComparisons(generators)
val aggregator = new CombinedAggregator(aggregators)

// Separately constructing the traversal engine, since we'll need it to generate the aggregated values
// and the summary statistics separately.
val engine = CompareAdam.setupTraversalEngine(sc,
args.input1Path,
args.recurse1,
args.input2Path,
args.recurse2,
generator)

// generate the raw values...
val generated : CompareAdam.GeneratedResults[Collection[Seq[Any]]] = engine.generate(generator)

// ... and aggregate them.
val values : AggregatedCollection[Any,Histogram[Any]] = ComparisonTraversalEngine.combine(generated, aggregator)

val aggValues = values.asInstanceOf[AggregatedCollection[Any, Histogram[Any]]].values

if (args.directory != null) {
val fs = FileSystem.get(sc.hadoopConfiguration)
val nameOutput = new OutputStreamWriter(fs.create(new Path(args.directory, "files")))
nameOutput.write(args.input1Path)
nameOutput.write("\n")
nameOutput.write(args.input2Path)
nameOutput.write("\n")
nameOutput.close()

val summaryOutput = new PrintWriter(new OutputStreamWriter(fs.create(new Path(args.directory, "summary.txt"))))
printSummary(engine, generators, aggValues, summaryOutput)
summaryOutput.close()

generators.zip(aggValues).foreach {
case (gen, value) => {
val output = new OutputStreamWriter(fs.create(new Path(args.directory, gen.name)))
value.asInstanceOf[Writable].write(output)

output.close()
}
}
} else {

// Just send everything to STDOUT if the directory isn't specified.
val writer = new PrintWriter(System.out)

assert(comp1.matching == comp2.matching, "Matching numbers should be identical for pairwise comparison")
printSummary(engine, generators, aggValues, writer)

println("# Reads in INPUT1: %d".format(comp1.total))
println("# Reads in INPUT2: %d".format(comp2.total))
println("# Reads Unique to INPUT1: %d".format(comp1.unique))
println("# Reads Unique to INPUT2: %d".format(comp2.unique))
println("# Matching Reads: %d".format(comp1.matching))
generators.zip(aggValues).foreach {
case (gen, value) =>
writer.println(gen.name)
writer.println(value)
}
}
}

}
Expand Down
Loading

0 comments on commit bf60cb9

Please sign in to comment.