Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Adding CRAM support #19

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 77 additions & 32 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,20 @@ import (
"github.com/googlegenomics/htsget/internal/analytics"
"github.com/googlegenomics/htsget/internal/bam"
"github.com/googlegenomics/htsget/internal/bgzf"
"github.com/googlegenomics/htsget/internal/cram"
"github.com/googlegenomics/htsget/internal/genomics"
"golang.org/x/oauth2"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
)

const (
readsPath = "/reads/"
blockPath = "/block/"
readsPath = "/reads/"
bamBlockPath = "/block/bam/"
cramBlockPath = "/block/cram/"

eofMarkerDataURL = "data:;base64,H4sIBAAAAAAA/wYAQkMCABsAAwAAAAAAAAAAAA=="
bamEOFMarkerDataURL = "data:;base64,H4sIBAAAAAAA/wYAQkMCABsAAwAAAAAAAAAAAA=="
cramEOFMarkerDataURL = "data:;base64,DwAAAP////8P4EVPRgAAAAABAAW92U8AAQAGBgEAAQABAO5jAUs="
)

var (
Expand Down Expand Up @@ -92,7 +95,8 @@ func (server *Server) Whitelist(buckets []string) {
// bytes, though BAM chunks that already exceed this size will not be split.
func (server *Server) Export(mux *http.ServeMux) {
mux.HandleFunc(readsPath, server.serveReads)
mux.HandleFunc(blockPath, server.serveBlocks)
mux.HandleFunc(bamBlockPath, server.serveBAMBlocks)
mux.HandleFunc(cramBlockPath, server.serveCRAMBlocks)
}

func (server *Server) serveReads(w http.ResponseWriter, req *http.Request) {
Expand All @@ -102,8 +106,40 @@ func (server *Server) serveReads(w http.ResponseWriter, req *http.Request) {
track(analytics.Event("Reads", "Reads Request Received", "", nil))

query := req.URL.Query()
if err := parseFormat(query.Get("format")); err != nil {
writeError(w, newUnsupportedFormatError(err))
format := query.Get("format")
if format == "" {
format = "BAM"
}

supportedFormats := map[string]struct {
blockPath string
eofMarkerDataURL string
indexExtension string
getReferenceID func(io.Reader, string) (int32, error)
handle func(*readsRequest) ([]interface{}, error)
}{
"BAM": {
bamBlockPath,
bamEOFMarkerDataURL,
".bai",
bam.GetReferenceID,
func(req *readsRequest) ([]interface{}, error) {
return req.handleBAM(ctx)
},
},
"CRAM": {
cramBlockPath,
cramEOFMarkerDataURL,
".crai",
cram.GetReferenceID,
func(req *readsRequest) ([]interface{}, error) {
return req.handleCRAM(ctx)
},
},
}
formatParams, ok := supportedFormats[format]
if !ok {
writeError(w, newUnsupportedFormatError(fmt.Errorf("unsupported format %q", format)))
return
}

Expand Down Expand Up @@ -131,7 +167,7 @@ func (server *Server) serveReads(w http.ResponseWriter, req *http.Request) {
}
defer data.Close()

region, err := parseRegion(query, data)
region, err := parseRegion(query, data, formatParams.getReferenceID)
if err != nil {
writeError(w, newInvalidInputError("parsing region", err))
return
Expand All @@ -143,12 +179,12 @@ func (server *Server) serveReads(w http.ResponseWriter, req *http.Request) {
}

request := &readsRequest{
indexObject: gcs.Bucket(bucket).Object(object + ".bai"),
indexObject: gcs.Bucket(bucket).Object(object + formatParams.indexExtension),
blockSizeLimit: server.blockSizeLimit,
region: region,
}

chunks, err := request.handle(ctx)
chunks, err := formatParams.handle(request)
if err != nil {
track(analytics.Event("Reads", "Reads Internal Error", "", nil))
writeError(w, err)
Expand All @@ -164,7 +200,7 @@ func (server *Server) serveReads(w http.ResponseWriter, req *http.Request) {
}
base += req.Host
}
base += strings.Replace(req.URL.Path, readsPath, blockPath, 1)
base += strings.Replace(req.URL.Path, readsPath, formatParams.blockPath, 1)

var urls []map[string]interface{}
for _, chunk := range chunks {
Expand All @@ -188,11 +224,11 @@ func (server *Server) serveReads(w http.ResponseWriter, req *http.Request) {
}
urls = append(urls, url)
}
urls = append(urls, map[string]interface{}{"url": eofMarkerDataURL})
urls = append(urls, map[string]interface{}{"url": formatParams.eofMarkerDataURL})

writeJSON(w, http.StatusOK, map[string]interface{}{
"htsget": map[string]interface{}{
"format": "BAM",
"format": format,
"urls": urls,
}})

Expand All @@ -201,8 +237,30 @@ func (server *Server) serveReads(w http.ResponseWriter, req *http.Request) {
track(analytics.Event("Reads", "Reads Response Sent", "", nil))
}

func (server *Server) serveBlocks(w http.ResponseWriter, req *http.Request) {
bucket, object, err := parseID(req.URL.Path[len(blockPath):])
// serveBAMBlocks serves a single BAM data block. The object is named by the
// request path (after bamBlockPath) and the byte range is described by the
// bgzf.Chunk decoded from the raw query string by blockHelper.
func (server *Server) serveBAMBlocks(w http.ResponseWriter, req *http.Request) {
	var chunk bgzf.Chunk
	read := func(object *storage.ObjectHandle) (io.ReadCloser, error) {
		body, err := handleBAMRequest(req.Context(), object, chunk)
		if err != nil {
			// NOTE(review): fmt.Errorf flattens any typed error from
			// handleBAMRequest — confirm writeError does not need the type.
			return nil, fmt.Errorf("handling BAM request: %v", err)
		}
		return body, nil
	}
	server.blockHelper(bamBlockPath, w, req, &chunk, read)
}

// serveCRAMBlocks serves a single CRAM data block. The object is named by the
// request path (after cramBlockPath) and the byte range is described by the
// cram.Chunk decoded from the raw query string by blockHelper.
func (server *Server) serveCRAMBlocks(w http.ResponseWriter, req *http.Request) {
	var chunk cram.Chunk
	read := func(object *storage.ObjectHandle) (io.ReadCloser, error) {
		body, err := handleCRAMRequest(req.Context(), object, chunk)
		if err != nil {
			// NOTE(review): fmt.Errorf flattens any typed error from
			// handleCRAMRequest — confirm writeError does not need the type.
			return nil, fmt.Errorf("handling CRAM request: %v", err)
		}
		return body, nil
	}
	server.blockHelper(cramBlockPath, w, req, &chunk, read)
}

func (server *Server) blockHelper(path string, w http.ResponseWriter, req *http.Request, query interface{}, fn func(*storage.ObjectHandle) (io.ReadCloser, error)) {
bucket, object, err := parseID(req.URL.Path[len(path):])
if err != nil {
writeError(w, newInvalidInputError("parsing readset ID", err))
return
Expand All @@ -213,8 +271,7 @@ func (server *Server) serveBlocks(w http.ResponseWriter, req *http.Request) {
return
}

var chunk bgzf.Chunk
if err := decodeRawQuery(req.URL.RawQuery, &chunk); err != nil {
if err := decodeRawQuery(req.URL.RawQuery, query); err != nil {
writeError(w, fmt.Errorf("decoding raw query: %v", err))
return
}
Expand All @@ -225,14 +282,9 @@ func (server *Server) serveBlocks(w http.ResponseWriter, req *http.Request) {
return
}

request := &blockRequest{
object: gcs.Bucket(bucket).Object(object),
chunk: chunk,
}

response, err := request.handle(req.Context())
response, err := fn(gcs.Bucket(bucket).Object(object))
if err != nil {
writeError(w, err)
writeError(w, fmt.Errorf("block handler: %v", err))
return
}
defer response.Close()
Expand Down Expand Up @@ -275,14 +327,7 @@ func parseID(path string) (string, string, error) {
return "", "", errInvalidOrUnspecifiedID
}

func parseFormat(format string) error {
if format != "" && format != "BAM" {
return fmt.Errorf("unsupported format %q", format)
}
return nil
}

func parseRegion(query url.Values, data io.Reader) (genomics.Region, error) {
func parseRegion(query url.Values, r io.Reader, getReferenceID func(io.Reader, string) (int32, error)) (genomics.Region, error) {
var (
name = query.Get("referenceName")
start = query.Get("start")
Expand All @@ -295,7 +340,7 @@ func parseRegion(query url.Values, data io.Reader) (genomics.Region, error) {
return genomics.Region{}, errMissingReferenceName
}

id, err := bam.GetReferenceID(data, name)
id, err := getReferenceID(r, name)
if err != nil {
return genomics.Region{}, fmt.Errorf("resolving reference %q: %v", name, err)
}
Expand Down
78 changes: 44 additions & 34 deletions internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func TestInvalidInputs(t *testing.T) {
func TestUnsupportedFormats(t *testing.T) {
testCases := []struct{ name, url string }{
{"unknown format", "/reads/bucket/object?format=XYZ"},
{"cram format", "/reads/bucket/object?format=CRAM"},
{"lowercase bam", "/reads/bucket/object?format=bam"},
}
ctx := context.Background()
Expand All @@ -72,41 +71,52 @@ func TestMissingObject(t *testing.T) {
}

func TestSimpleRead(t *testing.T) {
fakeClient := &http.Client{Transport: &fakeGCS{t}}
ctx := context.WithValue(context.Background(), testHTTPClientKey, fakeClient)
resp := testQuery(ctx, t, "/reads/testdata/NA12878.chr20.sample.bam")

if got, want := resp.StatusCode, http.StatusOK; got != want {
t.Errorf("Wrong status code: got %v, want %v", got, want)
testCases := []string{
"/reads/testdata/NA12878.chr20.sample.bam",
"/reads/testdata/NA12878.chr20.sample.bam?format=BAM",
"/reads/testdata/NA12878.chr21.sample.cram?format=CRAM",
"/reads/testdata/NA12878.chr21.sample.cram?format=CRAM&referenceName=chr21",
}

var body struct {
URLs []struct {
URL string `json:"url"`
} `json:"urls"`
}
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
t.Fatalf("Failed to decode response: %v", err)
}

for _, url := range body.URLs {
if url.URL == eofMarkerDataURL {
continue
}

resp := testQuery(ctx, t, url.URL)
if got, want := resp.StatusCode, http.StatusOK; got != want {
t.Errorf("Wrong status code: got %v, want %v", got, want)
continue
}
length, err := io.Copy(ioutil.Discard, resp.Body)
if err != nil {
t.Errorf("Failed to read response body: %v", err)
continue
}
if got, want := length, int64(testBlockSizeLimit); got > want {
t.Errorf("Data block too large: got %v, want at most %v", got, want)
}
for _, tc := range testCases {
t.Run(tc, func(t *testing.T) {
fakeClient := &http.Client{Transport: &fakeGCS{t}}
ctx := context.WithValue(context.Background(), testHTTPClientKey, fakeClient)
resp := testQuery(ctx, t, tc)

if got, want := resp.StatusCode, http.StatusOK; got != want {
t.Errorf("Wrong status code: got %v, want %v", got, want)
}

var body struct {
URLs []struct {
URL string `json:"url"`
} `json:"urls"`
}
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
t.Fatalf("Failed to decode response: %v", err)
}

for _, url := range body.URLs {
if url.URL == bamEOFMarkerDataURL {
continue
}

resp := testQuery(ctx, t, url.URL)
if got, want := resp.StatusCode, http.StatusOK; got != want {
t.Errorf("Wrong status code: got %v, want %v", got, want)
continue
}
length, err := io.Copy(ioutil.Discard, resp.Body)
if err != nil {
t.Errorf("Failed to read response body: %v", err)
continue
}
if got, want := length, int64(testBlockSizeLimit); got > want {
t.Errorf("Data block too large: got %v, want at most %v", got, want)
}
}
})
}
}

Expand Down
28 changes: 17 additions & 11 deletions internal/api/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,18 @@ import (
"io/ioutil"

"cloud.google.com/go/storage"

"github.com/googlegenomics/htsget/internal/bgzf"
"github.com/googlegenomics/htsget/internal/cram"
)

type blockRequest struct {
object *storage.ObjectHandle
chunk bgzf.Chunk
}

func (req *blockRequest) handle(ctx context.Context) (io.ReadCloser, error) {
start, end := req.chunk.Start, req.chunk.End
func handleBAMRequest(ctx context.Context, object *storage.ObjectHandle, chunk bgzf.Chunk) (io.ReadCloser, error) {
start, end := chunk.Start, chunk.End
head, tail := int64(start.BlockOffset()), int64(end.BlockOffset())

// The simple (unlikely) case is when the chunk resides in a single block.
if head == tail {
block, err := req.object.NewRangeReader(ctx, head, bgzf.MaximumBlockSize)
block, err := object.NewRangeReader(ctx, head, bgzf.MaximumBlockSize)
if err != nil {
return nil, newStorageError("opening block", err)
}
Expand All @@ -60,7 +57,7 @@ func (req *blockRequest) handle(ctx context.Context) (io.ReadCloser, error) {

// Read the first block and reconstruct a prefix block.
if start.DataOffset() != 0 {
first, err := req.object.NewRangeReader(ctx, head, bgzf.MaximumBlockSize)
first, err := object.NewRangeReader(ctx, head, bgzf.MaximumBlockSize)
if err != nil {
return nil, newStorageError("opening first block", err)
}
Expand All @@ -82,7 +79,7 @@ func (req *blockRequest) handle(ctx context.Context) (io.ReadCloser, error) {

// Read any intermediate blocks (no modification needed).
if tail-head > 0 {
r, err := req.object.NewRangeReader(ctx, head, tail-head)
r, err := object.NewRangeReader(ctx, head, tail-head)
if err != nil {
return nil, newStorageError("opening body block", err)
}
Expand All @@ -92,7 +89,7 @@ func (req *blockRequest) handle(ctx context.Context) (io.ReadCloser, error) {

// Read the last block and reconstruct a suffix block.
if end.DataOffset() != 0 {
last, err := req.object.NewRangeReader(ctx, tail, bgzf.MaximumBlockSize)
last, err := object.NewRangeReader(ctx, tail, bgzf.MaximumBlockSize)
if err != nil {
return nil, newStorageError("opening last block", err)
}
Expand All @@ -115,6 +112,15 @@ func (req *blockRequest) handle(ctx context.Context) (io.ReadCloser, error) {
}, nil
}

// handleCRAMRequest returns a reader over the byte range of object described
// by chunk. Unlike BAM, no block reconstruction is needed: the chunk maps
// directly onto a contiguous range of the stored CRAM file, so the range
// reader is handed straight back to the caller (who is responsible for
// closing it).
func handleCRAMRequest(ctx context.Context, object *storage.ObjectHandle, chunk cram.Chunk) (io.ReadCloser, error) {
	reader, err := object.NewRangeReader(ctx, int64(chunk.Start), int64(chunk.Length()))
	if err != nil {
		return nil, newStorageError("opening block", err)
	}
	return reader, nil
}

type multiReadCloser struct {
io.Reader

Expand Down
Loading