Skip to content

Commit

Permalink
Merge pull request #21 from monarch-initiative/edge-fields-to-label
Browse files Browse the repository at this point in the history
Add additional param that labels with node properties but not full closure expansion
  • Loading branch information
kevinschaper authored Nov 16, 2024
2 parents fdd7b4a + b8b5978 commit e3f9d8d
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 69 deletions.
3 changes: 3 additions & 0 deletions closurizer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
@click.option('--additional-node-constraints', required=False,
help='additional where clause constraints to apply to the generation of the denormalized nodes output')
@click.option('--edge-fields', multiple=True, help='edge fields to expand with closure IDs, labels, etc')
@click.option('--edge-fields-to-label', multiple=True, help='edge fields to with category, label, etc but not full closure exansion')
@click.option('--node-fields', multiple=True, help='node fields to expand with closure IDs, labels, etc')
@click.option('--grouping-fields', multiple=True, help='fields to populate a single value grouping_key field')
@click.option('--dry-run', is_flag=True, help='A dry run will not write the output file, but will print the SQL query')
Expand All @@ -21,11 +22,13 @@ def main(kg: str,
additional_node_constraints: str = None,
dry_run: bool = False,
edge_fields: List[str] = None,
edge_fields_to_label: List[str] = None,
node_fields: List[str] = None,
grouping_fields: List[str] = None):
add_closure(kg_archive=kg,
closure_file=closure,
edge_fields=edge_fields,
edge_fields_to_label=edge_fields_to_label,
node_fields=node_fields,
edges_output_file=edges_output,
nodes_output_file=nodes_output,
Expand Down
29 changes: 11 additions & 18 deletions closurizer/closurizer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from typing import List
from typing import List, Optional

import os
import tarfile
import duckdb

def edge_columns(field):
def edge_columns(field: str, include_closure_fields: bool =True):
column_text = f"""
{field}.name as {field}_label,
{field}.category as {field}_category,
Expand All @@ -19,14 +19,14 @@ def edge_columns(field):
"""
return column_text

def edge_joins(field):
def edge_joins(field: str, include_closure_joins: bool =True):
return f"""
left outer join nodes as {field} on edges.{field} = {field}.id
left outer join closure_id as {field}_closure on {field}.id = {field}_closure.id
left outer join closure_label as {field}_closure_label on {field}.id = {field}_closure_label.id
"""

def evidence_sum(evidence_fields):
def evidence_sum(evidence_fields: List[str]):
""" Sum together the length of each field after splitting on | """
evidence_count_sum = "+".join([f"ifnull(len(split({field}, '|')),0)" for field in evidence_fields])
return f"{evidence_count_sum} as evidence_count,"
Expand Down Expand Up @@ -73,29 +73,20 @@ def add_closure(kg_archive: str,
closure_file: str,
nodes_output_file: str,
edges_output_file: str,
node_fields: List[str] = None,
node_fields: List[str] = [],
edge_fields: List[str] = ['subject', 'object'],
additional_node_constraints: str = None,
edge_fields_to_label: List[str] = [],
additional_node_constraints: Optional[str] = None,
dry_run: bool = False,
evidence_fields: List[str] = None,
grouping_fields: List[str] = None
evidence_fields: List[str] = ['has_evidence', 'publications'],
grouping_fields: List[str] = ['subject', 'negated', 'predicate', 'object']
):
print("Generating closure KG...")
print(f"kg_archive: {kg_archive}")
print(f"closure_file: {closure_file}")

db = duckdb.connect(database='monarch-kg.duckdb')

if edge_fields is None or len(edge_fields) == 0:
edge_fields = ['subject', 'object']

if evidence_fields is None or len(evidence_fields) == 0:
evidence_fields = ['has_evidence', 'publications']

if grouping_fields is None or len(grouping_fields) == 0:
grouping_fields = ['subject', 'negated', 'predicate', 'object']


if not dry_run:
print(f"fields: {','.join(edge_fields)}")
print(f"output_file: {edges_output_file}")
Expand Down Expand Up @@ -139,10 +130,12 @@ def add_closure(kg_archive: str,
create or replace table denormalized_edges as
select edges.*,
{"".join([edge_columns(field) for field in edge_fields])}
{"".join([edge_columns(field, include_closure_fields=False) for field in edge_fields_to_label])}
{evidence_sum(evidence_fields)}
{grouping_key(grouping_fields)}
from edges
{"".join([edge_joins(field) for field in edge_fields])}
{"".join([edge_joins(field, include_closure_joins=False) for field in edge_fields_to_label])}
"""

print(edges_query)
Expand Down
Loading

0 comments on commit e3f9d8d

Please sign in to comment.