Skip to content

Commit

Permalink
Use sampling for unexpected sequence reports
Browse files Browse the repository at this point in the history
* Rename variable

* Instantiate unexpected sequence tracker in PoolQ

* Rework unexpected sequence tracker and writer

* Wire in updated sequence tracker

* Delete irrelevant test

* Update existing test

* Port test to munit

* Add test for sampling

* Set version to 3.9.0-SNAPSHOT

* Update changelog

* Update actions to latest

* PR feedback

* Extra test for unexpected sequence writer + bugfix

* Add some comments to clarify what is going on

* Double for samplePct
  • Loading branch information
mtomko authored Feb 6, 2024
1 parent e33eebd commit ced2bc7
Show file tree
Hide file tree
Showing 15 changed files with 304 additions and 159 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
steps:
- uses: actions/checkout@v3.1.0
- uses: actions/checkout@v4
- uses: jrouly/scalafmt-native-action@v3
- name: Set up JDK 8
uses: actions/setup-java@v3.6.0
uses: actions/setup-java@v4
with:
java-version: '8'
distribution: 'temurin'
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
steps:
- uses: actions/checkout@v3.1.0
- uses: actions/checkout@v4
- run: git config --global user.email "[email protected]"
- run: git config --global user.name "GPP Informatics"
- uses: jrouly/scalafmt-native-action@v3
- name: Set up JDK 8
uses: actions/setup-java@v3.6.0
uses: actions/setup-java@v4
with:
java-version: '8'
distribution: 'temurin'
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 3.9.0
* Use sampling technique for generating unexpected sequence reports

## 3.8.0
* Replace kantan.csv with scala-csv

Expand Down
21 changes: 16 additions & 5 deletions src/main/scala/org/broadinstitute/gpp/poolq3/PoolQ.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ import cats.syntax.all._
import org.broadinstitute.gpp.poolq3.PoolQConfig.synthesizeArgs
import org.broadinstitute.gpp.poolq3.barcode.{BarcodePolicy, Barcodes, barcodeSource}
import org.broadinstitute.gpp.poolq3.parser.{BarcodeSet, CloseableIterable, ReferenceData}
import org.broadinstitute.gpp.poolq3.process.{Consumer, NoOpConsumer, PoolQProcess, ScoringConsumer}
import org.broadinstitute.gpp.poolq3.process.{
Consumer,
NoOpConsumer,
PoolQProcess,
ScoringConsumer,
UnexpectedSequenceTracker
}
import org.broadinstitute.gpp.poolq3.reference.{ExactReference, Reference, referenceFor}
import org.broadinstitute.gpp.poolq3.reports.{
BarcodeCountsWriter,
Expand Down Expand Up @@ -121,7 +127,7 @@ object PoolQ {
val barcodes: CloseableIterable[Barcodes] =
barcodeSource(config.input, rowBarcodePolicy, revRowBarcodePolicyOpt, colBarcodePolicyOrLength, umiInfo.map(_._2))

lazy val unexpectedSequenceCacheDir: Option[Path] =
lazy val unexpectedSequenceCacheDirOpt: Option[Path] =
if (config.skipUnexpectedSequenceReport) None
else {
val ret = config.unexpectedSequenceCacheDir.map(Files.createDirectories(_)).orElse {
Expand All @@ -132,6 +138,9 @@ object PoolQ {
ret
}

lazy val unexpectedSequenceTrackerOpt: Option[UnexpectedSequenceTracker] =
unexpectedSequenceCacheDirOpt.map(new UnexpectedSequenceTracker(_, colReference))

val consumer =
if (config.noopConsumer) new NoOpConsumer
else
Expand All @@ -141,7 +150,7 @@ object PoolQ {
config.countAmbiguous,
config.alwaysCountColumnBarcodes,
umiInfo.map(_._1),
unexpectedSequenceCacheDir,
unexpectedSequenceTrackerOpt,
config.isPairedEnd
)

Expand Down Expand Up @@ -183,16 +192,18 @@ object PoolQ {
)
_ = log.info(s"Writing correlation file ${config.output.correlationFile}")
cfto <- CorrelationFileWriter.write(config.output.correlationFile, normalizedCounts, rowReference, colReference)
usfto <- unexpectedSequenceCacheDir.fold(Try(Option.empty[OutputFileType])) { dir =>
usfto <- unexpectedSequenceCacheDirOpt.fold(Try(Option.empty[OutputFileType])) { dir =>
log.info(s"Writing unexpected sequence report ${config.output.unexpectedSequencesFile}")
val ret =
UnexpectedSequenceWriter
.write(
config.output.unexpectedSequencesFile,
dir,
unexpectedSequenceTrackerOpt.map(_.unexpectedBarcodeCounts).getOrElse(Map.empty),
config.unexpectedSequencesToReport,
colReference,
globalReference
globalReference,
config.unexpectedSequenceSamplePct
)
.as(UnexpectedSequencesFileType.some)
if (config.removeUnexpectedSequenceCache) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ final case class PoolQConfig(
unexpectedSequenceCacheDir: Option[Path] = None,
removeUnexpectedSequenceCache: Boolean = true,
unexpectedSequencesToReport: Int = 100,
unexpectedSequenceSamplePct: Double = 0.02,
skipShortReads: Boolean = false,
reportsDialect: ReportsDialect = PoolQ3Dialect,
alwaysCountColumnBarcodes: Boolean = false,
Expand Down Expand Up @@ -287,6 +288,10 @@ object PoolQConfig {
else failure(s"Unexpected sequence threshold must be greater than 0, got: $n")
}

val _ = opt[Double]("unexpected-sequence-sample-pct").valueName("<pct>").action { (f, c) =>
c.copy(unexpectedSequenceSamplePct = f)
}

val _ = opt[Path]("unexpected-sequences").valueName("<file>").action { (f, c) =>
c.copy(output = c.output.copy(unexpectedSequencesFile = f))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.broadinstitute.gpp.poolq3.process

import java.nio.file.Path
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

import org.broadinstitute.gpp.poolq3.barcode.{Barcodes, FoundBarcode}
Expand All @@ -21,7 +20,7 @@ final class ScoringConsumer(
countAmbiguous: Boolean,
alwaysCountColumnBarcodes: Boolean,
umiReference: Option[BarcodeSet],
unexpectedReportCache: Option[Path],
unexpectedSequenceTrackerOpt: Option[UnexpectedSequenceTracker],
pairedEndMode: Boolean
) extends Consumer {

Expand All @@ -30,9 +29,6 @@ final class ScoringConsumer(
private[this] val unexpectedSequenceQueue: ArrayBlockingQueue[(Array[Char], Array[Char])] =
new ArrayBlockingQueue(1000)

private[this] val unexpectedSequenceTrackerOpt: Option[UnexpectedSequenceTracker] =
unexpectedReportCache.map(new UnexpectedSequenceTracker(_))

// used to tell the unexpected sequence tracker thread when processing is done
@volatile private[this] var done = false

Expand Down Expand Up @@ -65,11 +61,11 @@ final class ScoringConsumer(
}

override def start(): Unit = {
unexpectedReportCache.foreach(_ => unexpectedSequenceTrackerThread.start())
unexpectedSequenceTrackerOpt.foreach(_ => unexpectedSequenceTrackerThread.start())
}

override def close(): Unit =
unexpectedReportCache.foreach { _ =>
unexpectedSequenceTrackerOpt.foreach { _ =>
done = true
unexpectedSequenceTrackerThread.join()
unexpectedSequenceTrackerOpt.foreach(_.close())
Expand Down Expand Up @@ -103,7 +99,9 @@ final class ScoringConsumer(
// if we are tracking unexpected sequences and we matched the column barcode to the reference data but didn't
// match the row barcode to the reference data, and the row barcode doesn't have an N in it, then queue the
// row barcode for inclusion in the unexpected sequence report
if (unexpectedReportCache.isDefined && colBc.nonEmpty && rowBc.isEmpty && !containsN(parsedRow.barcode)) {
if (
unexpectedSequenceTrackerOpt.isDefined && colBc.nonEmpty && rowBc.isEmpty && !containsN(parsedRow.barcode)
) {
unexpectedSequenceQueue.put((parsedRow.barcode, parsedCol.barcode))
}
}
Expand All @@ -128,7 +126,9 @@ final class ScoringConsumer(
// if we are tracking unexpected sequences and we matched the column barcode to the reference data but didn't
// match the row barcode to the reference data, and the row barcode doesn't have an N in it, then queue the
// row barcode for inclusion in the unexpected sequence report
if (unexpectedReportCache.isDefined && colBc.nonEmpty && rowBc.isEmpty && !containsN(parsedRow.barcode)) {
if (
unexpectedSequenceTrackerOpt.isDefined && colBc.nonEmpty && rowBc.isEmpty && !containsN(parsedRow.barcode)
) {
unexpectedSequenceQueue.put((parsedRow.barcode, parsedCol.barcode))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,44 @@ package org.broadinstitute.gpp.poolq3.process
import java.io.{BufferedWriter, Closeable, OutputStreamWriter}
import java.nio.file.{Files, Path}

import scala.collection.mutable
import scala.util.control.NonFatal

import it.unimi.dsi.fastutil.objects.{Object2ObjectMap, Object2ObjectRBTreeMap}
import org.broadinstitute.gpp.poolq3.seq.Bases
import org.broadinstitute.gpp.poolq3.process.UnexpectedSequenceTracker.nameFor
import org.broadinstitute.gpp.poolq3.reference.Reference
import org.log4s.{Logger, getLogger}

final class UnexpectedSequenceTracker(reportDir: Path) extends Closeable {
final class UnexpectedSequenceTracker(cacheDir: Path, colReference: Reference) extends Closeable {

private[this] val log: Logger = getLogger

private[this] val fileExtension: String = ".txt"
private[this] val unexpectedCountsByColBarcode: mutable.Map[String, Int] = mutable.HashMap()

private[this] val outputFileWriters: Object2ObjectMap[String, BufferedWriter] = new Object2ObjectRBTreeMap()
private[this] val log: Logger = getLogger

// prep the directory and create file writers
Files.createDirectories(reportDir)
enumeratePrefixes.foreach(shard => outputFileWriters.put(shard, newWriterFor(shard)))
private[this] val outputFileWriters: Map[String, BufferedWriter] = {
val _ = Files.createDirectories(cacheDir)
colReference.allBarcodes.map(barcode => barcode -> newWriterFor(barcode)).toMap
}

def reportUnexpected(barcodes: (Array[Char], Array[Char])): Unit = {
val (rowBarcode, columnBarcode) = barcodes

val rowBc = new String(rowBarcode)
val colBc = new String(columnBarcode)

val writer = outputFileWriters(rowBc.substring(0, 4))
writer.write(rowBc + "," + colBc + "\n")
val writer = outputFileWriters(colBc)
writer.write(rowBc)
writer.write("\n")
val _ = unexpectedCountsByColBarcode.updateWith(colBc) {
case None => Some(1)
case Some(pc) => Some(pc + 1)
}
}

def unexpectedBarcodeCounts: Map[String, Int] = unexpectedCountsByColBarcode.toMap

override def close(): Unit =
outputFileWriters.values().forEach { writer =>
outputFileWriters.values.foreach { writer =>
try {
writer.close()
} catch {
Expand All @@ -46,16 +54,16 @@ final class UnexpectedSequenceTracker(reportDir: Path) extends Closeable {
}

private[this] def newWriterFor(shard: String): BufferedWriter = {
val name = reportDir.resolve(s"unexpected-$shard$fileExtension")
val name = cacheDir.resolve(nameFor(shard))
new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(name)))
}

private[this] def enumeratePrefixes: Iterable[String] =
for {
b1 <- Bases
b2 <- Bases
b3 <- Bases
b4 <- Bases
} yield s"$b1$b2$b3$b4"
}

object UnexpectedSequenceTracker {

private[this] val fileExtension: String = ".txt"

def nameFor(shard: String) = s"unexpected-$shard$fileExtension"

}
Loading

0 comments on commit ced2bc7

Please sign in to comment.