Skip to content

Commit

Permalink
adding retries for node norm 5xx errors, turning on drug chemical con…
Browse files Browse the repository at this point in the history
…flate, bumping normalization code version, general clean up
  • Loading branch information
EvanDietzMorris committed Jan 30, 2024
1 parent 71ab0be commit 8f03972
Showing 1 changed file with 45 additions and 32 deletions.
77 changes: 45 additions & 32 deletions Common/normalization.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import os
import logging
import requests
import time

from robokop_genetics.genetics_normalization import GeneticsNormalizer
from Common.node_types import *
from Common.utils import LoggingUtil

NORMALIZATION_CODE_VERSION = '1.0'
NORMALIZATION_CODE_VERSION = '1.1'


class NodeNormalizer:
Expand Down Expand Up @@ -63,6 +64,37 @@ def __init__(self,
self.sequence_variant_normalizer = None
self.variant_node_types = None

def hit_node_norm_service(self, curies, retries=0, max_retries=3):
    """
    POST a batch of curies to the node normalization service and return the parsed json.

    Server errors (5xx) are retried with a linearly increasing back-off
    (3s, 6s, 9s, ...). Any other non-200 response is treated as fatal.

    :param curies: the list of curies to send in the request body
    :param retries: number of retries already attempted before this call (normally left at 0)
    :param max_retries: how many times a 5xx response is retried before giving up
    :return: the decoded json body of a 200 response
    :raises requests.exceptions.HTTPError: when the service responds with anything other than 200
        and either the status is not retryable or the retries are exhausted
    """
    url = f'{self.node_norm_endpoint}get_normalized_nodes'
    payload = {'curies': curies,
               'conflate': self.conflate_node_types,
               'drug_chemical_conflate': self.conflate_node_types,
               'description': True}

    # loop instead of recursing so that the retry depth never grows the call stack
    while True:
        resp: requests.models.Response = requests.post(url, json=payload)
        if resp.status_code == 200:
            # if successful return the json as an object
            return resp.json()

        error_message = f'Node norm response code: {resp.status_code}'
        if resp.status_code >= 500:
            retries += 1
            # compare with <= rather than an equality test so that a caller-supplied
            # starting value above the limit can not cause endless retrying
            if retries <= max_retries:
                error_message += f', retrying.. (attempt {retries})'
                # log before sleeping so the message shows up when the failure happens
                self.logger.error(error_message)
                time.sleep(retries * 3)
                continue
            error_message += f', retried {max_retries} times, giving up..'
        elif resp.status_code == 422:
            # we should never get a legitimate 4xx response from node norm,
            # include the curies in the message for troubleshooting
            error_message += f'(curies: {curies})'

        self.logger.error(error_message)
        resp.raise_for_status()
        # raise_for_status() only raises for 4xx and 5xx codes; any other non-200 status
        # would otherwise fall through and return None, which callers would interpret
        # as "none of the curies normalized"
        raise requests.exceptions.HTTPError(error_message, response=resp)

def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list:
"""
This method calls the NodeNormalization web service to get the normalized identifier and name of the node.
Expand All @@ -76,12 +108,9 @@ def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list:

self.logger.debug(f'Start of normalize_node_data. items: {len(node_list)}')

# init the cache list - this accumulates all of the results from the node norm service
# init the cache - this accumulates all the results from the node norm service
cached_node_norms: dict = {}

# save the node list count to avoid grabbing it over and over
node_count: int = len(node_list)

# create a unique set of node ids
tmp_normalize: set = set([node['id'] for node in node_list])

Expand Down Expand Up @@ -111,32 +140,16 @@ def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list:
# collect a slice of records from the data frame
data_chunk: list = to_normalize[start_index: end_index]

# get the data
resp: requests.models.Response = requests.post(f'{self.node_norm_endpoint}get_normalized_nodes',
json={'curies': data_chunk,
'conflate': self.conflate_node_types,
'description': True})

# did we get a good status code
if resp.status_code == 200:
# convert json to dict
rvs: dict = resp.json()

if rvs:
# merge this list with what we have gotten so far
cached_node_norms.update(**rvs)
else:
# this is a quick fix for the API returning empty dict instead of nulls when
# none of the curies normalize
empty_responses = {curie: None for curie in data_chunk}
cached_node_norms.update(empty_responses)
# hit the node norm api
normalization_json = self.hit_node_norm_service(curies=data_chunk)
if normalization_json:
# merge the normalization results with what we have gotten so far
cached_node_norms.update(**normalization_json)
else:
# we should never get a legitimate non-200 response from node norm here, just crash with an error
error_message = f'Node norm response code: {resp.status_code}'
if resp.status_code == 422:
error_message += f'(curies: {data_chunk})'
self.logger.error(error_message)
resp.raise_for_status()
# this shouldn't happen but if the API returns an empty dict instead of nulls,
# assume none of the curies normalize
empty_responses = {curie: None for curie in data_chunk}
cached_node_norms.update(empty_responses)

# move on down the list
start_index += block_size
Expand All @@ -155,8 +168,8 @@ def normalize_node_data(self, node_list: list, block_size: int = 1000) -> list:
self.biolink_compliant_node_types = biolink_lookup.get_valid_node_types()

# for each node update the node with normalized information
# store the normalized IDs for later look up
while node_idx < node_count:
# store the normalized IDs in self.node_normalization_lookup for later look up
while node_idx < len(node_list):

# get the next node list item by index
current_node = node_list[node_idx]
Expand Down

0 comments on commit 8f03972

Please sign in to comment.