Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for CRAM index generation #317

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/cljam/algo/cram_indexer.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
(ns cljam.algo.cram-indexer
(:require [cljam.io.crai :as crai]
[cljam.io.cram.core :as cram.core]
[cljam.io.cram.reader :as cram.reader]
[cljam.io.sam.util.header :as sam.header]))

(defn create-index
"Creates a CRAM index file from the given CRAM file.

Takes the following options:
- skip-sort-order-check?: By default, the CRAM indexer checks if the header
is declared as `SO:coordinate` and raises an error if not.
If this option is set to true, the CRAM indexer will skip the header check
and create an index file regardless of the header declaration."
[in-cram out-crai & {:keys [skip-sort-order-check?]}]
(with-open [r (cram.core/reader in-cram {})
w (crai/writer out-crai)]
(let [header @(.-header r)]
(when (and (not skip-sort-order-check?)
(not= (sam.header/sort-order header) sam.header/order-coordinate))
(throw
(ex-info "Cannot create CRAM index file for CRAM file not declared as sorted by coordinate"
{:sort-order (sam.header/sort-order header)})))
(crai/write-index-entries w (cram.reader/generate-index-entries r)))))
82 changes: 70 additions & 12 deletions src/cljam/io/crai.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,86 @@
(:require [cljam.util :as util]
[cljam.util.intervals :as intervals]
[clojure.java.io :as io]
[clojure.string :as str]))
[clojure.string :as str])
(:import [java.io Closeable OutputStreamWriter PrintWriter]))

(defn- read-index-entries [rdr]
(->> (line-seq rdr)
(map (fn [line]
(let [[^long seq-id start span container-offset slice-offset size]
(map #(Long/parseLong %) (str/split line #"\t"))
unmapped? (neg? seq-id)]
{:ref-seq-id seq-id
:start (if unmapped? 0 start)
:span (if unmapped? 0 span)
:container-offset container-offset
:slice-offset slice-offset
:size size})))))

(defn read-index
"Reads a CRAI file `f` and creates an index."
[f refs]
(let [refs (vec refs)]
(with-open [rdr (io/reader (util/compressor-input-stream f))]
(->> (line-seq rdr)
(map (fn [line]
(let [[^long seq-id ^long start ^long span container-offset slice-offset size]
(map #(Long/parseLong %) (str/split line #"\t"))
unmapped? (neg? seq-id)]
{:chr (if unmapped? "*" (:name (nth refs seq-id)))
:start (if unmapped? 0 start)
:end (if unmapped? 0 (+ start span))
:container-offset container-offset
:slice-offset slice-offset
:size size})))
(->> (read-index-entries rdr)
(map (fn [{:keys [^long ref-seq-id ^long start ^long span] :as entry}]
(-> entry
(assoc :chr
(if (neg? ref-seq-id)
"*"
(:name (nth refs ref-seq-id)))
:end (+ start span))
(dissoc :ref-seq-id :span))))
intervals/index-intervals))))

(defn find-overlapping-entries
"Finds and returns all entries from the index that overlap with the specified
region."
[idx chr start end]
(intervals/find-overlap-intervals idx chr start end))

(deftype CRAIWriter [^PrintWriter writer]
Closeable
(close [_]
(.close writer)))

(defn writer
"Creates and returns a CRAI writer for the file `f`."
^Closeable [f]
(-> (util/compressor-output-stream f :gzip)
OutputStreamWriter.
PrintWriter.
->CRAIWriter))

(defn- write-index-entry
[^CRAIWriter wtr {:keys [^long ref-seq-id start span container-offset slice-offset size]}]
(let [unmapped? (neg? ref-seq-id)]
(doto ^PrintWriter (.-writer wtr)
(.print ref-seq-id)
(.print \tab)
(.print (if unmapped? 0 (long start)))
(.print \tab)
(.print (if unmapped? 0 (long span)))
(.print \tab)
(.print (long container-offset))
(.print \tab)
(.print (long slice-offset))
(.print \tab)
(.print (long size))
(.print \newline))))

(defn write-index-entries
"Writes CRAM index entries.
Each CRAM index entry consists of the following keys:
- ref-seq-id: Reference sequence id (-1 for slices containing only unmapped records)
- start: Alignment start
- span: Alignment span
- container-offset: Absolute byte offset of the container header in the file
- slice-offset: Relative byte offset of the slice header block
- size: Slice size in bytes
Note that if :ref-seq-id equals to -1 (unmapped), the CRAI writer will ignore
the actual values of :start/:span and write zeros for both fields."
[^CRAIWriter wtr index-entries]
(run! (partial write-index-entry wtr) index-entries))
13 changes: 10 additions & 3 deletions src/cljam/io/cram.clj
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,16 @@
The function also takes an optional argument `option`, which is a map that
consists of:
- reference: A string representing the path to the reference file, or
a sequence reader that reads sequences from the reference file.
This may be omitted only when the CRAM file to be read does not
require a reference file."
a sequence reader that reads sequences from the reference file.
This may be omitted only when the CRAM file to be read does not require
a reference file.
- create-index?: If true, creates a .crai index file in the course of CRAM
file writing.
- skip-sort-order-check?: When creating a CRAM index for the CRAM file,
the CRAM writer, by default, checks if the header is declared as
`SO:coordinate` and raises an error if not.
If this option is set to true, the CRAM writer will skip the header check
and create an index file regardless of the header declaration."
(^CRAMWriter [f] (writer f {}))
(^CRAMWriter [f option] (cram/writer f option)))

Expand Down
19 changes: 14 additions & 5 deletions src/cljam/io/cram/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
[clojure.string :as str])
(:import [cljam.io.cram.reader CRAMReader]
[cljam.io.cram.writer CRAMWriter]
[java.io FileNotFoundException]
[java.io DataOutputStream FileNotFoundException]
[java.net URL]
[java.nio.channels FileChannel]
[java.nio.file OpenOption StandardOpenOption]))
Expand Down Expand Up @@ -78,14 +78,23 @@
"Creates a new CRAM writer that writes to a CRAM file f.

Takes an option map as the second argument. An option map consists of:
- reference: a string representing the path to a reference file"
^CRAMWriter [f {:keys [reference]}]
- reference: A string representing the path to a reference file
- create-index?: If true, creates a .crai index file in the course of CRAM
file writing.
- skip-sort-order-check?: When creating a CRAM index for the CRAM file,
the CRAM writer, by default, checks if the header is declared as
`SO:coordinate` and raises an error if not.
If this option is set to true, the CRAM writer will skip the header check
and create an index file regardless of the header declaration."
^CRAMWriter [f {:keys [reference create-index?] :as opts}]
(let [file (cio/file f)
url (cio/as-url file)
url' (str url)
file-id (subs url' 0 (min 20 (count url')))
out (cio/output-stream file)
out (DataOutputStream. (cio/output-stream file))
index-writer (when create-index?
(crai/writer (util/as-url (str url' ".crai"))))
seq-resolver (some-> reference resolver/seq-resolver resolver/cached-resolver)
wtr (writer/->CRAMWriter url out seq-resolver)]
wtr (writer/->CRAMWriter url out seq-resolver index-writer opts)]
(writer/write-file-definition wtr file-id)
wtr))
62 changes: 47 additions & 15 deletions src/cljam/io/cram/encode/alignment_stats.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
(ns cljam.io.cram.encode.alignment-stats)
(ns cljam.io.cram.encode.alignment-stats
(:import [java.util HashMap]))

(defn- update-single-alignment-span! [^longs cur-span ^long pos ^long end]
(let [cur-pos (aget cur-span 0)]
(when (pos? pos)
(when (or (zero? cur-pos) (< pos cur-pos))
(aset cur-span 0 pos))
(when (< (aget cur-span 1) end)
(aset cur-span 1 end)))))

(defprotocol IAlignmentStatsBuilder
(update! [_ ri pos end nbases nrecords])
Expand All @@ -9,39 +18,33 @@
(deftype AlignmentStatsBuilder
[^:unsynchronized-mutable first?
^:unsynchronized-mutable ^long ref-index
^:unsynchronized-mutable ^long start
^:unsynchronized-mutable ^long end
^:unsynchronized-mutable ^long nbases
^:unsynchronized-mutable ^long nrecords]
^:unsynchronized-mutable ^long nrecords
^longs span]
IAlignmentStatsBuilder
(build [_]
(let [ri (if first? -1 ref-index)]
(->AlignmentStats ri
(if (neg? ri) 0 start)
(if (neg? ri) 0 end)
(if (neg? ri) 0 (aget span 0))
(if (neg? ri) 0 (aget span 1))
nbases nrecords)))
(update! [this ri pos end nbases nrecords]
(let [ri (long ri)
pos (long pos)
end (long end)]
(let [ri (long ri)]
(cond first?
(do (set! ref-index ri)
(set! first? false))

(not= ri ref-index)
(set! ref-index -2))
(when (and (>= ref-index 0) (pos? pos))
(when (or (zero? (.-start this)) (< pos (.-start this)))
(set! (.-start this) pos))
(when (< (.-end this) end)
(set! (.-end this) end)))
(when (>= ref-index 0)
(update-single-alignment-span! span pos end))
(set! (.-nbases this) (+ (.-nbases this) (long nbases)))
(set! (.-nrecords this) (+ (.-nrecords this) (long nrecords))))))

(defn make-alignment-stats-builder
"Creates a new alignment stats builder."
[]
(->AlignmentStatsBuilder true 0 0 0 0 0))
(->AlignmentStatsBuilder true 0 0 0 (long-array 2)))

(defn merge-stats
"Merges multiple alignment stats into one."
Expand All @@ -51,3 +54,32 @@
(update! builder ri start end nbases nrecords))
ss)
(build builder)))

(defprotocol IAlignmentSpansBuilder
(build-spans [_])
(update-span! [_ ri pos end]))

(defn- ref-index-comparator [^long ri1 ^long ri2]
(cond (= ri1 ri2) 0
(= ri1 -1) 1
(= ri2 -1) -1
:else (compare ri1 ri2)))

(defn make-alignment-spans-builder
"Creates a new alignment spans builder."
[]
(let [ri->span (HashMap.)]
(reify IAlignmentSpansBuilder
(build-spans [_]
(into (sorted-map-by ref-index-comparator)
(map (fn [[ri ^longs span]]
(let [start (aget span 0)
end (aget span 1)]
[ri {:start start, :span (inc (- end start))}])))
ri->span))
(update-span! [_ ri pos end]
(let [span (or (get ri->span ri)
(let [span (long-array 2)]
(.put ri->span ri span)
span))]
(update-single-alignment-span! span pos end))))))
100 changes: 51 additions & 49 deletions src/cljam/io/cram/encode/record.clj
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,40 @@
(BA (aget bs i)))
(encode-qual record QS))))

(defn- build-cram-record-encoder
[cram-header rname->idx tag-dict {:keys [BF CF] :as ds-encoders} tag-encoders stats-builder]
(let [pos-encoder (build-positional-data-encoder cram-header ds-encoders)
name-encoder (build-read-name-encoder ds-encoders)
mate-encoder (build-mate-read-encoder rname->idx ds-encoders)
tags-encoder (build-auxiliary-tags-encoder tag-dict ds-encoders tag-encoders)
mapped-encoder (build-mapped-read-encoder ds-encoders)
unmapped-encoder (build-unmapped-read-encoder ds-encoders)]
(fn [record]
(let [bf (bit-and (long (:flag record))
(bit-not (sam.flag/encoded #{:next-reversed :next-unmapped})))]
(stats/update! stats-builder
(::ref-index record) (:pos record) (::end record)
(count (:seq record)) 1)
(BF bf)
(CF (::flag record))
(pos-encoder record)
(name-encoder record)
(mate-encoder record)
(tags-encoder record)
(if (sam.flag/unmapped? (:flag record))
(unmapped-encoder record)
(mapped-encoder record))))))

(defn encode-slice-records
"Encodes CRAM records in a slice all at once using the given data series encoders
and tag encoders. Returns the alignment stats for this slice."
[cram-header rname->idx tag-dict ds-encoders tag-encoders records]
(let [stats-builder (stats/make-alignment-stats-builder)
record-encoder (build-cram-record-encoder cram-header rname->idx tag-dict
ds-encoders tag-encoders stats-builder)]
(run! record-encoder records)
(stats/build stats-builder)))

(defn- add-mismatches
[n subst-mat ^bytes ref-bases rpos ^bytes read-bases ^bytes qs spos fs]
(loop [i (long n), rpos (long rpos), spos (long spos), fs fs]
Expand Down Expand Up @@ -163,52 +197,20 @@
\P (recur more rpos spos (conj! fs {:code :padding :pos pos :len n}))))
[(persistent! fs) (dec rpos)])))))

(defn- build-cram-record-encoder
[seq-resolver cram-header tag-dict subst-mat {:keys [BF CF] :as ds-encoders}
tag-encoders stats-builder]
(let [rname->idx (into {}
(map-indexed (fn [i {:keys [SN]}] [SN i]))
(:SQ cram-header))
pos-encoder (build-positional-data-encoder cram-header ds-encoders)
name-encoder (build-read-name-encoder ds-encoders)
mate-encoder (build-mate-read-encoder rname->idx ds-encoders)
tags-encoder (build-auxiliary-tags-encoder tag-dict ds-encoders tag-encoders)
mapped-encoder (build-mapped-read-encoder ds-encoders)
unmapped-encoder (build-unmapped-read-encoder ds-encoders)]
(fn [record]
(let [bf (bit-and (long (:flag record))
(bit-not (sam.flag/encoded #{:next-reversed :next-unmapped})))
;; these flag bits of CF are hard-coded at the moment:
;; - 0x01: quality scores stored as array (true)
;; - 0x02: detached (true)
;; - 0x04: has mate downstream (false)
cf (cond-> 0x03
(= (:seq record) "*") (bit-or 0x08))
ri (ref-index rname->idx (:rname record))
[fs end] (calculate-read-features&end seq-resolver subst-mat record)
record' (assoc record ::flag cf ::ref-index ri ::features fs)]
(stats/update! stats-builder ri (:pos record') end (count (:seq record')) 1)
(BF bf)
(CF cf)
(pos-encoder record')
(name-encoder record')
(mate-encoder record')
(tags-encoder record')
(if (sam.flag/unmapped? (:flag record'))
(unmapped-encoder record')
(mapped-encoder record'))))))

(defn encode-slice-records
"Encodes CRAM records in a slice all at once using the given data series encoders
and tag encoders. Returns the alignment stats for this slice."
[seq-resolver cram-header tag-dict subst-mat ds-encoders tag-encoders records]
(let [stats-builder (stats/make-alignment-stats-builder)
record-encoder (build-cram-record-encoder seq-resolver
cram-header
tag-dict
subst-mat
ds-encoders
tag-encoders
stats-builder)]
(run! record-encoder records)
(stats/build stats-builder)))
(defn preprocess-slice-records
"Preprocesses slice records to calculate some record fields prior to record
encoding that are necessary for the CRAM writer to generate some header
components."
[seq-resolver rname->idx subst-mat ^objects records]
(dotimes [i (alength records)]
(let [record (aget records i)
;; these flag bits of CF are hard-coded at the moment:
;; - 0x01: quality scores stored as array (true)
;; - 0x02: detached (true)
;; - 0x04: has mate downstream (false)
cf (cond-> 0x03
(= (:seq record) "*") (bit-or 0x08))
ri (ref-index rname->idx (:rname record))
[fs end] (calculate-read-features&end seq-resolver subst-mat record)
record' (assoc record ::flag cf ::ref-index ri ::end end ::features fs)]
(aset records i record'))))
Loading