add reference and header parquet writers
RoriCremer committed Jun 17, 2024
1 parent 7a9d105 commit edd6938
Showing 9 changed files with 482 additions and 193 deletions.

Large diffs are not rendered by default.

RefCreator.java
@@ -1,9 +1,14 @@
package org.broadinstitute.hellbender.tools.gvs.ingest;

import com.google.protobuf.Descriptors;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.variant.variantcontext.VariantContext;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.gvs.common.CommonCode;
import org.broadinstitute.hellbender.tools.gvs.common.GQStateEnum;
@@ -14,12 +19,16 @@
import org.broadinstitute.hellbender.utils.GenomeLocSortedSet;
import org.broadinstitute.hellbender.utils.SimpleInterval;
import org.broadinstitute.hellbender.utils.gvs.bigquery.BigQueryUtils;
import org.broadinstitute.hellbender.utils.gvs.parquet.GvsReferenceParquetFileWriter;
import org.broadinstitute.hellbender.utils.gvs.parquet.GvsVariantParquetFileWriter;
import org.json.JSONObject;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;


public final class RefCreator {
@@ -31,6 +40,7 @@ public final class RefCreator {

private final boolean writeReferenceRanges;
private final Long sampleId;
private GvsReferenceParquetFileWriter refRangesParquetFileWriter = null;
private SimpleInterval previousInterval;
private final Set<GQStateEnum> gqStatesToIgnore;
private final GenomeLocSortedSet coverageLocSortedSet;
@@ -43,7 +53,7 @@ public static boolean doRowsExistFor(CommonCode.OutputType outputType, String pr
return BigQueryUtils.doRowsExistFor(projectId, datasetName, REF_RANGES_FILETYPE_PREFIX + tableNumber, SchemaUtils.SAMPLE_ID_FIELD_NAME, sampleId);
}

public RefCreator(String sampleIdentifierForOutputFileName, Long sampleId, String tableNumber, SAMSequenceDictionary seqDictionary, Set<GQStateEnum> gqStatesToIgnore, final File outputDirectory, final CommonCode.OutputType outputType, final boolean writeReferenceRanges, final String projectId, final String datasetName, final boolean storeCompressedReferences) {
public RefCreator(String sampleIdentifierForOutputFileName, Long sampleId, String tableNumber, SAMSequenceDictionary seqDictionary, Set<GQStateEnum> gqStatesToIgnore, final File outputDirectory, final CommonCode.OutputType outputType, final boolean writeReferenceRanges, final String projectId, final String datasetName, final boolean storeCompressedReferences, final MessageType parquetSchema) {
this.sampleId = sampleId;
this.outputType = outputType;
this.writeReferenceRanges = writeReferenceRanges;
@@ -65,11 +75,16 @@ public RefCreator(String sampleIdentifierForOutputFileName, Long sampleId, Strin
case TSV:
refRangesWriter = new RefRangesTsvWriter(refOutputFile.getCanonicalPath());
break;
case AVRO:
case AVRO: // when do we use this/!?!
refRangesWriter = new RefRangesAvroWriter(refOutputFile.getCanonicalPath());
break;
case PARQUET:
refRangesParquetFileWriter = new GvsReferenceParquetFileWriter(new Path(refOutputFile.toURI()), parquetSchema, false, CompressionCodecName.SNAPPY);
break;
}
}
} catch (final FileAlreadyExistsException fs) {
throw new UserException("This reference parquet file already exists", fs);
} catch (final IOException ioex) {
throw new UserException("Could not create reference range outputs", ioex);
}
@@ -104,20 +119,44 @@ public void apply(VariantContext variant, List<GenomeLoc> intervalsToWrite) thro
int localStart = start;
while ( localStart <= end ) {
int length = Math.min(end - localStart + 1, IngestConstants.MAX_REFERENCE_BLOCK_BASES);
switch (outputType) {
    case BQ:
        try {
            if (storeCompressedReferences) {
                refRangesWriter.writeCompressed(
                        SchemaUtils.encodeCompressedRefBlock(variantChr, localStart, length,
                                getGQStateEnum(variant.getGenotype(0).getGQ()).getCompressedValue()),
                        sampleId
                );
            } else {
                refRangesWriter.write(SchemaUtils.encodeLocation(variantChr, localStart),
                        sampleId,
                        length,
                        getGQStateEnum(variant.getGenotype(0).getGQ()).getValue()
                );
            }
        } catch (IOException ex) {
            throw new IOException("BQ exception", ex);
        }
        break;
    case PARQUET:
        if (storeCompressedReferences) {
            JSONObject record = GvsReferenceParquetFileWriter.writeCompressed(
                    SchemaUtils.encodeCompressedRefBlock(variantChr, localStart, length,
                            getGQStateEnum(variant.getGenotype(0).getGQ()).getCompressedValue()),
                    sampleId
            );
            refRangesParquetFileWriter.write(record);
        } else {
            JSONObject record = GvsReferenceParquetFileWriter.writeJson(
                    SchemaUtils.encodeLocation(variantChr, localStart),
                    sampleId,
                    length,
                    getGQStateEnum(variant.getGenotype(0).getGQ()).getValue());
            refRangesParquetFileWriter.write(record);
        }
        break;
}

localStart = localStart + length;
}

@@ -267,6 +306,13 @@ public void commitData() {
if (writeReferenceRanges && refRangesWriter != null) {
refRangesWriter.commitData();
}
} else if (outputType == CommonCode.OutputType.PARQUET && refRangesParquetFileWriter != null) {
try {
refRangesParquetFileWriter.close();
} catch (IOException exception) {
System.out.println("ERROR CLOSING PARQUET FILE: ");
exception.printStackTrace();
}
}
}

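Taken together, the constructor, apply(), and commitData() changes above give the reference Parquet writer a simple create / write / close lifecycle. The sketch below condenses it, written as if inside RefCreator (so getGQStateEnum, refOutputFile, and sampleId are the class's own); the schema literal and its field names are assumptions, not part of the commit.

// Condensed lifecycle sketch (assumed schema; MessageTypeParser is from org.apache.parquet.schema)
MessageType parquetSchema = MessageTypeParser.parseMessageType(
        "message ref_ranges { required int64 location; required int64 sample_id;"
        + " required int32 length; required int32 state; }");
GvsReferenceParquetFileWriter writer = new GvsReferenceParquetFileWriter(
        new Path(refOutputFile.toURI()), parquetSchema, false, CompressionCodecName.SNAPPY);

int gq = 30; // genotype GQ for one reference block
JSONObject record = GvsReferenceParquetFileWriter.writeJson(
        SchemaUtils.encodeLocation("chr20", 1_000_000), // encoded location
        sampleId,
        100,                                            // reference block length
        getGQStateEnum(gq).getValue());
writer.write(record);                                   // one row per reference block

writer.close(); // as in commitData(): close() flushes row groups and writes the Parquet footer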
VcfHeaderLineScratchCreator.java
@@ -1,12 +1,21 @@
package org.broadinstitute.hellbender.tools.gvs.ingest;

import com.google.protobuf.Descriptors;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.gvs.common.CommonCode;
import org.broadinstitute.hellbender.tools.gvs.common.IngestConstants;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.gvs.bigquery.BigQueryUtils;
import org.broadinstitute.hellbender.utils.gvs.bigquery.PendingBQWriter;
import org.broadinstitute.hellbender.utils.gvs.parquet.GvsHeaderParquetFileWriter;
import org.broadinstitute.hellbender.utils.gvs.parquet.GvsVariantParquetFileWriter;
import org.broadinstitute.hellbender.utils.tsv.SimpleXSVWriter;
import org.json.JSONObject;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -17,9 +26,13 @@ public class VcfHeaderLineScratchCreator {
private final String datasetName;

private PendingBQWriter vcfHeaderBQJsonWriter = null;
private GvsHeaderParquetFileWriter vcfHeaderParquetFileWriter = null;
private static final String NON_SCRATCH_TABLE_NAME = "vcf_header_lines";
private static final String SCRATCH_TABLE_NAME = "vcf_header_lines_scratch";

private static final String HEADER_FILETYPE_PREFIX = "header_";


public static boolean doScratchRowsExistFor(String projectId, String datasetName, Long sampleId) {
return BigQueryUtils.doRowsExistFor(projectId, datasetName, "vcf_header_lines_scratch", "sample_id", sampleId);
}
@@ -36,16 +49,33 @@ private static boolean doNonScratchRowsExistFor(String projectId, String dataset
return BigQueryUtils.doRowsExistFor(projectId, datasetName, NON_SCRATCH_TABLE_NAME, "vcf_header_lines_hash", headerLineHash);
}

public VcfHeaderLineScratchCreator(Long sampleId, String projectId, String datasetName) {
public VcfHeaderLineScratchCreator(Long sampleId, String projectId, String datasetName, File outputDirectory, CommonCode.OutputType outputType, MessageType headersRowSchema) {
try {
this.sampleId = sampleId;
this.projectId = projectId;
this.datasetName = datasetName;

String PREFIX_SEPARATOR = "_"; // TODO should this be moved to a common place?

switch (outputType) {

case BQ:
if (projectId == null || datasetName == null) {
throw new UserException("Must specify project-id and dataset-name when using BQ output mode.");
}
vcfHeaderBQJsonWriter = new PendingBQWriter(projectId, datasetName, SCRATCH_TABLE_NAME);
break;
case PARQUET:
// TODO ensure that there doesn't need to be a table_number or sampleIdentifierForOutputFileName--it's all tables/samples, yes?
final File parquetOutputFile = new File(outputDirectory, HEADER_FILETYPE_PREFIX + ".parquet");
vcfHeaderParquetFileWriter = new GvsHeaderParquetFileWriter(new Path(parquetOutputFile.toURI()), headersRowSchema, false, CompressionCodecName.SNAPPY);
break;

}
}
catch (Exception e) {
throw new UserException("Could not create VCF Header Scratch Table Writer", e);
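For the new PARQUET mode, construction would look roughly like the sketch below. The commit does not show where headersRowSchema is defined, so this MessageType (parsed with Parquet's MessageTypeParser) is an assumption, with field names guessed from GvsHeaderParquetFileWriter.writeJson further down.

// Hedged construction sketch; the schema literal is assumed, not from the commit
MessageType headersRowSchema = MessageTypeParser.parseMessageType(
        "message vcf_header_line { required int64 sample_id; required binary headerLineHash (UTF8); }");
VcfHeaderLineScratchCreator creator = new VcfHeaderLineScratchCreator(
        sampleId, projectId, datasetName, outputDirectory,
        CommonCode.OutputType.PARQUET, headersRowSchema);
// Per the constructor above, output lands in <outputDirectory>/header_.parquet.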
VetCreator.java
@@ -3,6 +3,7 @@
import com.google.protobuf.Descriptors;
import htsjdk.variant.variantcontext.VariantContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
@@ -68,6 +69,8 @@ public VetCreator(String sampleIdentifierForOutputFileName, Long sampleId, Strin
vetParquetFileWriter = new GvsVariantParquetFileWriter(new Path(parquetOutputFile.toURI()), parquetSchema, false, CompressionCodecName.SNAPPY);
break;
}
} catch (final FileAlreadyExistsException fs) {
throw new UserException("This variants parquet file already exists", fs);
} catch (final IOException ioex) {
throw new UserException("Could not create vet outputs", ioex);
}
@@ -149,8 +152,6 @@ public void commitData() {
exception.printStackTrace();
}
}


}

public void closeTool() {
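This catch mirrors the one added to RefCreator: the deprecated ParquetWriter constructor these writers extend opens its output in create (no-overwrite) mode, so rerunning ingest against an existing file fails up front rather than silently clobbering data. A sketch of the behavior, with an illustrative path and an assumed in-scope parquetSchema:

Path out = new Path(new File(outputDirectory, "vet_001.parquet").toURI()); // illustrative name
GvsVariantParquetFileWriter first = new GvsVariantParquetFileWriter(
        out, parquetSchema, false, CompressionCodecName.SNAPPY);
first.close();
try {
    new GvsVariantParquetFileWriter(out, parquetSchema, false, CompressionCodecName.SNAPPY);
} catch (FileAlreadyExistsException e) {
    // surfaced to the user as UserException("This variants parquet file already exists", ...)
}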
GvsHeaderParquetFileWriter.java (new file)
@@ -0,0 +1,92 @@
package org.broadinstitute.hellbender.utils.gvs.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.schema.MessageType;
import org.json.JSONObject;

import java.io.IOException;

public class GvsHeaderParquetFileWriter extends ParquetWriter<JSONObject> {

/**
* This is very deprecated, and we'll need to figure out how to do this from a builder once it works!
* @param file
* @param schema
* @param enableDictionary
* @param codecName
* @throws IOException
*/
public GvsHeaderParquetFileWriter(
Path file,
MessageType schema,
boolean enableDictionary,
CompressionCodecName codecName
) throws FileAlreadyExistsException, IOException {
super(file, new GvsReferenceWriteSupport(schema), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
}

GvsHeaderParquetFileWriter(
Path file,
GvsVariantWriteSupport writeSupport,
CompressionCodecName compressionCodecName,
int blockSize,
int pageSize,
boolean enableDictionary,
boolean enableValidation,
ParquetProperties.WriterVersion writerVersion,
Configuration conf)
throws IOException {
super(
file,
writeSupport,
compressionCodecName,
blockSize,
pageSize,
pageSize,
enableDictionary,
enableValidation,
writerVersion,
conf);
}

public static JSONObject writeJson(Long sampleId, String headerLineHash) {
JSONObject record = new JSONObject();
record.put("sample_id", sampleId);
record.put("headerLineHash", headerLineHash);
return record;
}

public static class Builder extends ParquetWriter.Builder<JSONObject, Builder> {
private MessageType schema = null;

private Builder(Path file) {
super(file);
}

private Builder(OutputFile file) {
super(file);
}

public Builder withType(MessageType type) {
this.schema = type;
return this;
}

@Override
protected Builder self() {
return this;
}

@Override
protected GvsVariantWriteSupport getWriteSupport(Configuration conf) {
return new GvsVariantWriteSupport(schema);
}
}

}
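A short usage sketch for the new writer, pairing the static writeJson helper with the inherited write(); the file name and schema are illustrative, and per the javadoc above the deprecated super-constructor is expected to give way to the Builder eventually.

// Sketch; write() and close() throw IOException, so assume the enclosing method declares it
MessageType schema = MessageTypeParser.parseMessageType(
        "message vcf_header_line { required int64 sample_id; required binary headerLineHash (UTF8); }");
GvsHeaderParquetFileWriter writer = new GvsHeaderParquetFileWriter(
        new Path("vcf_headers.parquet"), schema, false, CompressionCodecName.SNAPPY);
writer.write(GvsHeaderParquetFileWriter.writeJson(12345L, "d41d8cd98f00b204e9800998ecf8427e"));
writer.close();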