Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sampling for unexpected sequence reports #30

Merged
merged 15 commits into from
Feb 6, 2024
21 changes: 16 additions & 5 deletions src/main/scala/org/broadinstitute/gpp/poolq3/PoolQ.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ import cats.syntax.all._
import org.broadinstitute.gpp.poolq3.PoolQConfig.synthesizeArgs
import org.broadinstitute.gpp.poolq3.barcode.{BarcodePolicy, Barcodes, barcodeSource}
import org.broadinstitute.gpp.poolq3.parser.{BarcodeSet, CloseableIterable, ReferenceData}
import org.broadinstitute.gpp.poolq3.process.{Consumer, NoOpConsumer, PoolQProcess, ScoringConsumer}
import org.broadinstitute.gpp.poolq3.process.{
Consumer,
NoOpConsumer,
PoolQProcess,
ScoringConsumer,
UnexpectedSequenceTracker
}
import org.broadinstitute.gpp.poolq3.reference.{ExactReference, Reference, referenceFor}
import org.broadinstitute.gpp.poolq3.reports.{
BarcodeCountsWriter,
Expand Down Expand Up @@ -121,7 +127,7 @@ object PoolQ {
val barcodes: CloseableIterable[Barcodes] =
barcodeSource(config.input, rowBarcodePolicy, revRowBarcodePolicyOpt, colBarcodePolicyOrLength, umiInfo.map(_._2))

lazy val unexpectedSequenceCacheDir: Option[Path] =
lazy val unexpectedSequenceCacheDirOpt: Option[Path] =
if (config.skipUnexpectedSequenceReport) None
else {
val ret = config.unexpectedSequenceCacheDir.map(Files.createDirectories(_)).orElse {
Expand All @@ -132,6 +138,9 @@ object PoolQ {
ret
}

lazy val unexpectedSequenceTrackerOpt: Option[UnexpectedSequenceTracker] =
unexpectedSequenceCacheDirOpt.map(new UnexpectedSequenceTracker(_, colReference))

val consumer =
if (config.noopConsumer) new NoOpConsumer
else
Expand All @@ -141,7 +150,7 @@ object PoolQ {
config.countAmbiguous,
config.alwaysCountColumnBarcodes,
umiInfo.map(_._1),
unexpectedSequenceCacheDir,
unexpectedSequenceTrackerOpt,
config.isPairedEnd
)

Expand Down Expand Up @@ -183,16 +192,18 @@ object PoolQ {
)
_ = log.info(s"Writing correlation file ${config.output.correlationFile}")
cfto <- CorrelationFileWriter.write(config.output.correlationFile, normalizedCounts, rowReference, colReference)
usfto <- unexpectedSequenceCacheDir.fold(Try(Option.empty[OutputFileType])) { dir =>
usfto <- unexpectedSequenceCacheDirOpt.fold(Try(Option.empty[OutputFileType])) { dir =>
log.info(s"Writing unexpected sequence report ${config.output.unexpectedSequencesFile}")
val ret =
UnexpectedSequenceWriter
.write(
config.output.unexpectedSequencesFile,
dir,
unexpectedSequenceTrackerOpt.map(_.unexpectedBarcodeCounts).getOrElse(Map.empty),
config.unexpectedSequencesToReport,
colReference,
globalReference
globalReference,
config.unexpectedSequenceSamplePct
)
.as(UnexpectedSequencesFileType.some)
if (config.removeUnexpectedSequenceCache) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ final case class PoolQConfig(
unexpectedSequenceCacheDir: Option[Path] = None,
removeUnexpectedSequenceCache: Boolean = true,
unexpectedSequencesToReport: Int = 100,
unexpectedSequenceSamplePct: Float = 0.02f,
mtomko marked this conversation as resolved.
Show resolved Hide resolved
skipShortReads: Boolean = false,
reportsDialect: ReportsDialect = PoolQ3Dialect,
alwaysCountColumnBarcodes: Boolean = false,
Expand Down Expand Up @@ -287,6 +288,10 @@ object PoolQConfig {
else failure(s"Unexpected sequence threshold must be greater than 0, got: $n")
}

val _ = opt[Double]("unexpected-sequence-sample-pct").valueName("<pct>").action { (f, c) =>
c.copy(unexpectedSequenceSamplePct = f.toFloat)
}

val _ = opt[Path]("unexpected-sequences").valueName("<file>").action { (f, c) =>
c.copy(output = c.output.copy(unexpectedSequencesFile = f))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.broadinstitute.gpp.poolq3.process

import java.nio.file.Path
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

import org.broadinstitute.gpp.poolq3.barcode.{Barcodes, FoundBarcode}
Expand All @@ -21,7 +20,7 @@ final class ScoringConsumer(
countAmbiguous: Boolean,
alwaysCountColumnBarcodes: Boolean,
umiReference: Option[BarcodeSet],
unexpectedReportCache: Option[Path],
unexpectedSequenceTrackerOpt: Option[UnexpectedSequenceTracker],
pairedEndMode: Boolean
) extends Consumer {

Expand All @@ -30,9 +29,6 @@ final class ScoringConsumer(
private[this] val unexpectedSequenceQueue: ArrayBlockingQueue[(Array[Char], Array[Char])] =
new ArrayBlockingQueue(1000)

private[this] val unexpectedSequenceTrackerOpt: Option[UnexpectedSequenceTracker] =
unexpectedReportCache.map(new UnexpectedSequenceTracker(_))

// used to tell the unexpected sequence tracker thread when processing is done
@volatile private[this] var done = false

Expand Down Expand Up @@ -65,11 +61,11 @@ final class ScoringConsumer(
}

override def start(): Unit = {
unexpectedReportCache.foreach(_ => unexpectedSequenceTrackerThread.start())
unexpectedSequenceTrackerOpt.foreach(_ => unexpectedSequenceTrackerThread.start())
}

override def close(): Unit =
unexpectedReportCache.foreach { _ =>
unexpectedSequenceTrackerOpt.foreach { _ =>
done = true
unexpectedSequenceTrackerThread.join()
unexpectedSequenceTrackerOpt.foreach(_.close())
Expand Down Expand Up @@ -103,7 +99,9 @@ final class ScoringConsumer(
// if we are tracking unexpected sequences and we matched the column barcode to the reference data but didn't
// match the row barcode to the reference data, and the row barcode doesn't have an N in it, then queue the
// row barcode for inclusion in the unexpected sequence report
if (unexpectedReportCache.isDefined && colBc.nonEmpty && rowBc.isEmpty && !containsN(parsedRow.barcode)) {
if (
unexpectedSequenceTrackerOpt.isDefined && colBc.nonEmpty && rowBc.isEmpty && !containsN(parsedRow.barcode)
) {
unexpectedSequenceQueue.put((parsedRow.barcode, parsedCol.barcode))
}
}
Expand All @@ -128,7 +126,9 @@ final class ScoringConsumer(
// if we are tracking unexpected sequences and we matched the column barcode to the reference data but didn't
// match the row barcode to the reference data, and the row barcode doesn't have an N in it, then queue the
// row barcode for inclusion in the unexpected sequence report
if (unexpectedReportCache.isDefined && colBc.nonEmpty && rowBc.isEmpty && !containsN(parsedRow.barcode)) {
if (
unexpectedSequenceTrackerOpt.isDefined && colBc.nonEmpty && rowBc.isEmpty && !containsN(parsedRow.barcode)
) {
unexpectedSequenceQueue.put((parsedRow.barcode, parsedCol.barcode))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,43 @@ package org.broadinstitute.gpp.poolq3.process
import java.io.{BufferedWriter, Closeable, OutputStreamWriter}
import java.nio.file.{Files, Path}

import scala.collection.mutable
import scala.util.control.NonFatal

import it.unimi.dsi.fastutil.objects.{Object2ObjectMap, Object2ObjectRBTreeMap}
import org.broadinstitute.gpp.poolq3.seq.Bases
import org.broadinstitute.gpp.poolq3.process.UnexpectedSequenceTracker.nameFor
import org.broadinstitute.gpp.poolq3.reference.Reference
import org.log4s.{Logger, getLogger}

final class UnexpectedSequenceTracker(reportDir: Path) extends Closeable {
final class UnexpectedSequenceTracker(cacheDir: Path, colReference: Reference) extends Closeable {

private[this] val log: Logger = getLogger

private[this] val fileExtension: String = ".txt"
private[this] val unexpectedCountsByColBarcode: mutable.Map[String, Int] = mutable.HashMap()

private[this] val outputFileWriters: Object2ObjectMap[String, BufferedWriter] = new Object2ObjectRBTreeMap()
private[this] val log: Logger = getLogger

// prep the directory and create file writers
Files.createDirectories(reportDir)
enumeratePrefixes.foreach(shard => outputFileWriters.put(shard, newWriterFor(shard)))
private[this] val outputFileWriters: Map[String, BufferedWriter] = {
val _ = Files.createDirectories(cacheDir)
colReference.allBarcodes.map(barcode => barcode -> newWriterFor(barcode)).toMap
}

def reportUnexpected(barcodes: (Array[Char], Array[Char])): Unit = {
val (rowBarcode, columnBarcode) = barcodes

val rowBc = new String(rowBarcode)
val colBc = new String(columnBarcode)

val writer = outputFileWriters(rowBc.substring(0, 4))
writer.write(rowBc + "," + colBc + "\n")
val writer = outputFileWriters(colBc)
writer.write(rowBc + "\n")
mtomko marked this conversation as resolved.
Show resolved Hide resolved
val _ = unexpectedCountsByColBarcode.updateWith(colBc) {
case None => Some(1)
case Some(pc) => Some(pc + 1)
}
}

def unexpectedBarcodeCounts: Map[String, Int] = unexpectedCountsByColBarcode.toMap

override def close(): Unit =
outputFileWriters.values().forEach { writer =>
outputFileWriters.values.foreach { writer =>
try {
writer.close()
} catch {
Expand All @@ -46,16 +53,16 @@ final class UnexpectedSequenceTracker(reportDir: Path) extends Closeable {
}

private[this] def newWriterFor(shard: String): BufferedWriter = {
val name = reportDir.resolve(s"unexpected-$shard$fileExtension")
val name = cacheDir.resolve(nameFor(shard))
new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(name)))
}

private[this] def enumeratePrefixes: Iterable[String] =
for {
b1 <- Bases
b2 <- Bases
b3 <- Bases
b4 <- Bases
} yield s"$b1$b2$b3$b4"
}

object UnexpectedSequenceTracker {

private[this] val fileExtension: String = ".txt"

def nameFor(shard: String) = s"unexpected-$shard$fileExtension"

}
Loading