Skip to content

Commit

Permalink
updating/fixing SNPEFF
Browse files Browse the repository at this point in the history
switch to downloading snpeff from renci server,
changing provenance infores to robokop-snpeff,
adding original snpeff effect property
  • Loading branch information
EvanDietzMorris committed Feb 4, 2025
1 parent 9b65b10 commit 9b5eaa2
Showing 1 changed file with 52 additions and 30 deletions.
82 changes: 52 additions & 30 deletions Common/supplementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
from Common.kgx_file_writer import KGXFileWriter
from Common.kgx_file_normalizer import KGXFileNormalizer

SNPEFF_PROVENANCE = "infores:robokop-snpeff"

# These are terms from Sequence Ontology that SNPEFF uses for annotations. SNPEFF doesn't provide the SO identifiers,
# so here we map the terms, preferably to the exact corresponding SO identifier curie, but sometimes directly to
# biolink predicates (when the SO curie would not successfully map to it). If SO mappings were fully implemented in
# biolink we could use all SO terms here.
SNPEFF_SO_PREDICATES = {
"3_prime_UTR_variant": "biolink:is_non_coding_variant_of", # SO:0001624
"5_prime_UTR_premature_start_codon_gain_variant": "biolink:is_non_coding_variant_of", # SO:0001988
Expand Down Expand Up @@ -52,7 +57,7 @@ def __init__(self, error_message: str, actual_error: str = ''):

class SequenceVariantSupplementation:

SUPPLEMENTATION_VERSION = "1.0"
SUPPLEMENTATION_VERSION = "1.1"

def __init__(self):

Expand All @@ -64,8 +69,17 @@ def __init__(self):
# if the snpEff dir exists, assume we already downloaded it
self.snpeff_dir = path.join(workspace_dir, "snpEff")
if not path.isdir(self.snpeff_dir):
self.logger.info('SNPEFF not found, downloading and installing..')

# TODO
# Snpeff is building their latest versions with Java 21 which is not compatible with the docker
# container ORION uses (due to neo4j running on older versions of Java). We need to either run a different
# java container for snpeff, or build new versions of snpeff with older java, then we could start using the
# latest snpeff again.
# snpeff_url = 'https://snpeff.blob.core.windows.net/versions/snpEff_latest_core.zip'

# otherwise fetch and unzip SNPEFF
snpeff_url = 'https://snpeff.blob.core.windows.net/versions/snpEff_latest_core.zip'
snpeff_url = 'https://stars.renci.org/var/data_services/snpeff/snpEff_5_1d.zip'
with urlopen(snpeff_url) as snpeff_resource:
with ZipFile(BytesIO(snpeff_resource.read())) as snpeff_zip:
snpeff_zip.extractall(workspace_dir)
Expand All @@ -92,15 +106,15 @@ def find_supplemental_data(self,
self.run_snpeff(vcf_file_path,
annotated_vcf_path)

self.logger.debug('Converting annotated VCF to KGX File..')
self.logger.info('Converting annotated VCF to KGX File..')
supplementation_metadata = self.convert_snpeff_to_kgx(annotated_vcf_path,
supp_nodes_file_path,
supp_edges_file_path)

os.remove(vcf_file_path)
os.remove(annotated_vcf_path)

self.logger.debug('Normalizing Supplemental KGX File..')
self.logger.info('Normalizing Supplemental KGX File..')
file_normalizer = KGXFileNormalizer(source_nodes_file_path=supp_nodes_file_path,
nodes_output_file_path=supp_nodes_norm_file_path,
node_norm_map_file_path=supp_node_norm_map_file_path,
Expand All @@ -114,7 +128,6 @@ def find_supplemental_data(self,
process_in_memory=False)
supp_normalization_info = file_normalizer.normalize_kgx_files()
supplementation_metadata['supplementation_normalization_info'] = supp_normalization_info

return supplementation_metadata

def run_snpeff(self,
Expand Down Expand Up @@ -143,7 +156,7 @@ def convert_snpeff_to_kgx(self,
kgx_nodes_path: str,
kgx_edges_path: str):
supplementation_info = {}
gene_biotypes_to_ignore = set()
# gene_biotypes_to_ignore = set()

with open(annotated_vcf_path, 'r') as snpeff_output, \
KGXFileWriter(nodes_output_file_path=kgx_nodes_path,
Expand All @@ -160,36 +173,46 @@ def convert_snpeff_to_kgx(self,
info_field = vcf_line_split[7].split(';')
for info in info_field:
if info.startswith('ANN='):
annotations_to_write = defaultdict(set)
gene_distances = {}
annotations_to_write = defaultdict(list)
annotations = info[4:].split(',')
for annotation in annotations:
annotation_info = annotation.split('|')
gene_biotype = annotation_info[7]
if gene_biotype not in gene_biotypes_to_ignore:
effects = annotation_info[1].split("&")
gene_ids = annotation_info[4].split('-')
distance_info = annotation_info[14]
for gene_id in gene_ids:
gene_curie = f'ENSEMBL:{gene_id}'
gene_distances[gene_curie] = distance_info
for effect in effects:
effect_predicate = SNPEFF_SO_PREDICATES.get(effect, FALLBACK_EDGE_PREDICATE)
annotations_to_write[effect_predicate].add(gene_curie)
for effect_predicate, gene_ids in annotations_to_write.items():
# annotation_info[7] is gene_biotype
# if annotation_info[7] not in gene_biotypes_to_ignore:
effects = annotation_info[1].split("&")
gene_ids = annotation_info[4].split('-')
distance_info = annotation_info[14]
for gene_id in gene_ids:
gene_curie = f'ENSEMBL:{gene_id}'
for effect in effects:
effect_predicate = SNPEFF_SO_PREDICATES.get(effect, FALLBACK_EDGE_PREDICATE)
# The effect property should probably be a list, so that if we merged multiple
# edges with the same variant/gene/predicate the different effects would
# accumulate instead of being overwritten, but that adds more work for the
# merging phase and probably isn't worth it.
# TODO see if the above comment is true,
# run it both ways and see how many mergers end up with multiple effects
annotations_to_write[effect_predicate].append({
'gene_id': gene_curie,
'distance': distance_info,
'effect': effect
})
for effect_predicate, annotations in annotations_to_write.items():
for annotation in annotations:
gene_curie = annotation['gene_id']
edge_props = {KNOWLEDGE_LEVEL: PREDICATION,
AGENT_TYPE: COMPUTATIONAL_MODEL}
if gene_distances[gene_id]:
try:
edge_props['distance_to_feature']: int(gene_distances[gene_id])
except ValueError:
pass
output_file_writer.write_node(gene_id, "", [NAMED_THING, GENE])
AGENT_TYPE: COMPUTATIONAL_MODEL,
'snpeff_effect': annotation['effect']}
try:
distance = annotation['distance']
edge_props['distance_to_feature'] = int(distance)
except ValueError:
pass
output_file_writer.write_node(gene_curie, "", [NAMED_THING])
output_file_writer.write_edge(subject_id=variant_id,
object_id=gene_id,
object_id=gene_curie,
predicate=effect_predicate,
primary_knowledge_source='infores:snpeff',
primary_knowledge_source=SNPEFF_PROVENANCE,
edge_properties=edge_props)
break

Expand Down Expand Up @@ -232,7 +255,6 @@ def create_vcf_from_variant_nodes(self,
'PASS',
''])
vcf_file.write(f'{current_variant_line}\n')
break
except json.JSONDecodeError as e:
norm_error_msg = f'Error decoding json from {nodes_file_path} on line number {e.lineno}'
raise SupplementationFailedError(error_message=norm_error_msg, actual_error=e.msg)

0 comments on commit 9b5eaa2

Please sign in to comment.