Skip to content

Commit

Permalink
Merge pull request #237 from RobokopU24/Issue236
Browse files Browse the repository at this point in the history
Fix for get_latest_source_version being called unnecessarily.
  • Loading branch information
EvanDietzMorris authored Jan 21, 2025
2 parents b5ab1b6 + 39cb957 commit 4719a0a
Show file tree
Hide file tree
Showing 14 changed files with 598 additions and 355 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ jobs:

- name: create env params
run: |
echo "PYTHONPATH=$PWD:$PWD/robokop-genetics" >> $GITHUB_ENV
echo "ROBOKOP_HOME=$PWD" >> $GITHUB_ENV
mkdir -p $PWD/tests/logs
mkdir -p $PWD/tests/storage
echo "ORION_LOGS=$PWD/tests/logs" >> $GITHUB_ENV
echo "ORION_STORAGE=$PWD/tests/storage" >> $GITHUB_ENV
mkdir -p $PWD/tests/workspace/logs
mkdir -p $PWD/tests/workspace/storage
mkdir -p $PWD/tests/workspace/graphs
echo "ORION_LOGS=$PWD/tests/workspace/logs" >> $GITHUB_ENV
echo "ORION_STORAGE=$PWD/tests/workspace/storage" >> $GITHUB_ENV
echo "ORION_GRAPHS=$PWD/tests/workspace/graphs" >> $GITHUB_ENV
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
Expand Down
471 changes: 251 additions & 220 deletions Common/build_manager.py

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions Common/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@


class GraphSpecError(Exception):
    """Error raised when a graph spec is invalid or cannot be resolved.

    Carries a human-readable message and, optionally, the underlying
    exception that triggered it.
    """

    def __init__(self, error_message: str, actual_error: Exception = None):
        # Populate Exception.args so repr(), pickling, and generic
        # exception handling behave normally.
        super().__init__(error_message)
        self.error_message = error_message
        self.actual_error = actual_error

    def __str__(self):
        return self.error_message


class DataVersionError(Exception):
    """Error raised when a data source version cannot be determined.

    Used by the load manager when checking the latest source version fails.
    """

    def __init__(self, error_message: str):
        # Populate Exception.args so repr(), pickling, and generic
        # exception handling behave normally.
        super().__init__(error_message)
        self.error_message = error_message

    def __str__(self):
        return self.error_message
9 changes: 6 additions & 3 deletions Common/kgx_file_merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ def merge(self,
primary_sources = []
secondary_sources = []
for graph_source in chain(graph_spec.sources, graph_spec.subgraphs):
if graph_source.merge_strategy == 'default':
if not graph_source.merge_strategy:
primary_sources.append(graph_source)
elif graph_source.merge_strategy == 'connected_edge_subset':
secondary_sources.append(graph_source)
else:
return {'merge_error': f'Unsupported merge strategy specified: {graph_source.merge_strategy}'}

# TODO we should be able to process a single primary source more efficiently (ie copy and paste it)
# if len(primary_sources) == 1:
Expand All @@ -73,8 +75,9 @@ def merge(self,
all_source_ids = [graph_source.id for graph_source in chain(graph_spec.sources, graph_spec.subgraphs)]
missing_data_sets = [source_id for source_id in all_source_ids if
source_id not in merge_metadata['sources'].keys()]
self.logger.error(f"Error merging graph {graph_spec.graph_id}! could not merge: {missing_data_sets}")

error_message = f"Error merging graph {graph_spec.graph_id}! could not merge: {missing_data_sets}"
self.logger.error(error_message)
merge_metadata["merge_error"] = error_message
return merge_metadata

def merge_primary_sources(self,
Expand Down
48 changes: 37 additions & 11 deletions Common/kgxmodel.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from dataclasses import dataclass
from Common.biolink_constants import NAMED_THING
from Common.metadata import GraphMetadata
from Common.metadata import GraphMetadata, get_source_release_version
from Common.normalization import NormalizationScheme


class kgxnode:
def __init__(self,
identifier,
Expand Down Expand Up @@ -60,21 +61,31 @@ def get_metadata_representation(self):
@dataclass
class GraphSource:
id: str
version: str = None
merge_strategy: str = 'default'
merge_strategy: str = None
file_paths: list = None

# Version may be generated when requested and differs for subclasses of GraphSource.
def __getattribute__(self, name):
if name == "version":
return self.generate_version()
else:
return object.__getattribute__(self, name)


@dataclass
class SubGraphSource(GraphSource):
graph_version: str = None
graph_metadata: GraphMetadata = None

def get_metadata_representation(self):
return {'graph_id': self.id,
'release_version': self.version,
'graph_version': self.graph_version,
'merge_strategy:': self.merge_strategy,
'graph_metadata': self.graph_metadata.metadata if self.graph_metadata else None}

def generate_version(self):
return self.graph_version


@dataclass
class DataSource(GraphSource):
Expand All @@ -86,14 +97,29 @@ class DataSource(GraphSource):

def get_metadata_representation(self):
metadata = {'source_id': self.id,
'source_version': self.source_version,
'release_version': self.version,
'parsing_version': self.parsing_version,
'supplementation_version': self.supplementation_version,
'normalization_scheme': self.normalization_scheme.get_metadata_representation(),
'merge_strategy': self.merge_strategy}
'source_version': self.source_version,
'parsing_version': self.parsing_version,
'supplementation_version': self.supplementation_version,
'normalization_scheme': self.normalization_scheme.get_metadata_representation(),
'release_version': self.generate_version(),
'merge_strategy': self.merge_strategy}
if self.release_info:
metadata.update(self.release_info)
return metadata


# We can use generate_version to see if a source_version was already set. If not, we don't try to generate an
# overall version because we can't. Typical usage would be a lazy instantiation approach, first setting
# source_version to None, then checking this and retrieving/setting the source_version if needed,
# after which the overall version can be generated.
#
# We use get_source_release_version to generate versions for data sources the same deterministic way that
# the data source pipeline uses, so a version generated by a graph spec will match the release version generated by
# previous runs of the pipeline.
def generate_version(self):
    """Return this data source's deterministic release version, or None.

    A release version can only be derived once source_version is known;
    a None return signals that source_version still needs to be fetched.
    """
    if self.source_version is None:
        return None
    composite_normalization = self.normalization_scheme.get_composite_normalization_version()
    return get_source_release_version(self.id,
                                      self.source_version,
                                      self.parsing_version,
                                      composite_normalization,
                                      self.supplementation_version)
41 changes: 20 additions & 21 deletions Common/load_manager.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
import argparse
import datetime
import time
from collections import defaultdict

from Common.data_sources import SourceDataLoaderClassFactory, RESOURCE_HOGS, get_available_data_sources
from Common.exceptions import DataVersionError
from Common.utils import LoggingUtil, GetDataPullError
from Common.kgx_file_normalizer import KGXFileNormalizer
from Common.normalization import NormalizationScheme, NodeNormalizer, EdgeNormalizer, NormalizationFailedError
Expand Down Expand Up @@ -124,7 +126,7 @@ def run_fetch_stage(self, source_id: str, source_version: str):
self.logger.info(f"Fetching source data for {source_id} (version: {source_version})...")
return self.fetch_source(source_id, source_version=source_version)

def get_latest_source_version(self, source_id: str, retries: int=0):
def get_latest_source_version(self, source_id: str, retries: int = 0):
if source_id in self.latest_source_version_lookup:
return self.latest_source_version_lookup[source_id]

Expand All @@ -136,19 +138,18 @@ def get_latest_source_version(self, source_id: str, retries: int=0):
self.latest_source_version_lookup[source_id] = latest_source_version
return latest_source_version
except GetDataPullError as failed_error:
self.logger.error(
f"Error while checking for latest source version for {source_id}: {failed_error.error_message}")
error_message = f"Error while checking for latest source version for {source_id}: " \
f"{failed_error.error_message}"
self.logger.error(error_message)
if retries < 2:
time.sleep(3)
return self.get_latest_source_version(source_id, retries=retries+1)
else:
# TODO what should we do here?
# no great place to write an error in metadata because metadata is specific to source versions
# source_metadata.set_version_checking_error(failed_error.error_message)
return None
raise DataVersionError(error_message=error_message)
except Exception as e:
self.logger.error(
f"Error while checking for latest source version for {source_id}: {repr(e)}-{str(e)}")
return None
error_message = f"Error while checking for latest source version for {source_id}: {repr(e)}-{str(e)}"
self.logger.error(error_message)
raise DataVersionError(error_message=error_message)

def fetch_source(self, source_id: str, source_version: str='latest', retries: int=0):

Expand Down Expand Up @@ -503,20 +504,18 @@ def run_qc_and_metadata_stage(self,
parsing_version: str,
supplementation_version: str,
normalization_scheme: NormalizationScheme):
# source data QC here
source_metadata = self.get_source_metadata(source_id, source_version)
normalization_version = normalization_scheme.get_composite_normalization_version()
# source data QC should go here

self.logger.info(f'Generating release for {source_id}')
source_metadata = self.get_source_metadata(source_id, source_version)
loader = SOURCE_DATA_LOADER_CLASSES[source_id](test_mode=self.test_mode)
source_meta_information = loader.get_source_meta_information()
source_metadata.generate_release_metadata(parsing_version=parsing_version,
supplementation_version=supplementation_version,
normalization_version=normalization_version,
source_meta_information=source_meta_information)
return source_metadata.get_release_version(parsing_version=parsing_version,
supplementation_version=supplementation_version,
normalization_version=normalization_version)
normalization_version = normalization_scheme.get_composite_normalization_version()
release_version = source_metadata.generate_release_metadata(parsing_version=parsing_version,
supplementation_version=supplementation_version,
normalization_version=normalization_version,
source_meta_information=source_meta_information)
self.logger.info(f'Generating release version for {source_id}: {release_version}')
return release_version

def get_source_metadata(self, source_id: str, source_version):
if source_id not in self.source_metadata or source_version not in self.source_metadata[source_id]:
Expand Down
60 changes: 20 additions & 40 deletions Common/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ def init_metadata(self):
raise NotImplementedError()

def save_metadata(self):
    """Write the in-memory metadata dict to its JSON file.

    Creates the parent directory first if it does not exist.
    """
    # exist_ok avoids the TOCTOU race between an isdir() check and
    # makedirs(), which could raise FileExistsError under concurrency.
    os.makedirs(os.path.dirname(self.metadata_file_path), exist_ok=True)
    with open(self.metadata_file_path, 'w') as meta_json_file:
        json.dump(self.metadata, meta_json_file, indent=4)

Expand Down Expand Up @@ -295,31 +297,18 @@ def has_supplemental_data(self, parsing_version: str, normalization_version: str
except KeyError:
return False

def get_release_version(self,
parsing_version: str,
normalization_version: str,
supplementation_version: str):
if "releases" in self.metadata:
for release_version, release in self.metadata["releases"].items():
if ((release["parsing_version"] == parsing_version) and
(release["normalization_version"] == normalization_version) and
(release["supplementation_version"] == supplementation_version)):
return release_version
return None

def generate_release_metadata(self,
parsing_version: str,
normalization_version: str,
supplementation_version: str,
source_meta_information: dict):
if "releases" not in self.metadata:
self.metadata["releases"] = {}
release_info = "".join([self.source_id,
self.source_version,
parsing_version,
normalization_version,
supplementation_version])
release_version = xxh64_hexdigest(release_info)
release_version = get_source_release_version(self.source_id,
self.source_version,
parsing_version,
normalization_version,
supplementation_version)
if release_version not in self.metadata["releases"]:
self.metadata["releases"][release_version] = {
"source_version": self.source_version,
Expand All @@ -329,31 +318,22 @@ def generate_release_metadata(self,
}
self.metadata["releases"][release_version].update(source_meta_information)
self.save_metadata()
return release_version

def get_release_info(self, release_version: str):
if 'releases' in self.metadata and release_version in self.metadata['releases']:
return self.metadata['releases'][release_version]
return None

'''
these need to be updated for the new versioning format, but we may not need them
def get_final_node_count(self):
try:
node_count = 0
node_count += self.metadata['normalization_info']['final_normalized_nodes']
if self.has_supplemental_data():
node_count += self.metadata['supplementation_info']['normalization_info']['final_normalized_nodes']
return node_count
except KeyError as k:
raise RuntimeError(f'Required metadata was not available: {k}')
def get_final_edge_count(self):
try:
node_count = 0
node_count += self.metadata['normalization_info']['final_normalized_edges']
if self.has_supplemental_data():
node_count += self.metadata['supplementation_info']['normalization_info']['final_normalized_edges']
return node_count
except KeyError as k:
raise RuntimeError(f'Required metadata was not available: {k}')
'''

def get_source_release_version(source_id,
                               source_version,
                               parsing_version,
                               normalization_version,
                               supplementation_version):
    """Derive a deterministic release version hash for a data source.

    Joins the identifying version components with underscores and hashes
    the result, so the same inputs always yield the same release version.
    """
    version_components = (source_id,
                          source_version,
                          parsing_version,
                          normalization_version,
                          supplementation_version)
    return xxh64_hexdigest("_".join(version_components))
Loading

0 comments on commit 4719a0a

Please sign in to comment.