Merge pull request #121 from monarch-initiative/dadis_sync
DADIS synchronization workflow
Showing 8 changed files with 562 additions and 3 deletions.
@@ -0,0 +1,32 @@
name: Initial matching of VBO to DADIS
on:
  workflow_dispatch:
  #schedule:
  # - cron: '0 0 * * *'
jobs:
  dadis:
    runs-on: ubuntu-latest
    container: obolibrary/odkfull:v1.4.3
    strategy:
      max-parallel: 1
    steps:
      - name: Checkout main branch
        uses: actions/checkout@v2
        with:
          ref: ${{ github.event.pull_request.head.ref }}
      - name: work around permission issue
        run: git config --global --add safe.directory "$GITHUB_WORKSPACE"
      - name: Match VBO transboundary entries
        run: cd src/ontology/ && make ROBOT_ENV='ROBOT_JAVA_ARGS=-Xmx6G' GITHUB_ACTION=true IMP=false MIR=false dadis-transboundary-sync
        env:
          DADIS_API_KEY: ${{ secrets.DADIS_API_KEY }}
      - name: Match VBO breed-country entries
        run: cd src/ontology/ && make ROBOT_ENV='ROBOT_JAVA_ARGS=-Xmx6G' GITHUB_ACTION=true IMP=false MIR=false dadis-local-sync
        env:
          DADIS_API_KEY: ${{ secrets.DADIS_API_KEY }}
      - name: Commit changes
        uses: EndBug/add-and-commit@v9
        with:
          message: 'Update local breed file'
          add: 'src/ontology/components/dadisbreedcountry.tsv'
@@ -0,0 +1,3 @@
from .client import DadisClient

__all__ = ["DadisClient"]
@@ -0,0 +1,46 @@
from requests import Session, Response

from .schemas.responses import ApiResponse, Species, BreedResponse, TransboundaryNamesResponse

DEV_URL = "https://us-central1-fao-dadis-dev.cloudfunctions.net/api/v1/"
PROD_URL = "https://us-central1-dadis-ws.cloudfunctions.net/api/v1/"


class DadisClient:
    _session: Session
    base_url: str

    def __init__(self, *, api_key: str, prod: bool = True):
        if prod:
            self.base_url = PROD_URL
        else:
            self.base_url = DEV_URL
        self._session = Session()
        self._session.headers["Authorization"] = api_key

    def get(self, path, **kwargs) -> Response:
        return self._session.get(self.base_url + path, **kwargs)

    def get_all_species(self) -> ApiResponse[list[Species]]:
        resp = self.get("species")
        return ApiResponse[list[Species]](**resp.json())

    def get_species_by_id(self, species_id: int) -> ApiResponse[Species]:
        resp = self.get(f"species/{species_id}")
        return ApiResponse[Species](**resp.json())

    def get_all_breeds(self) -> BreedResponse:
        resp = self.get("breeds", params={"classification": "all"})
        return BreedResponse(**resp.json())

    def get_all_local_breeds(self) -> BreedResponse:
        resp = self.get("breeds", params={"classification": "local"})
        return BreedResponse(**resp.json())

    def get_all_transboundary_breeds(self) -> BreedResponse:
        resp = self.get("breeds", params={"classification": "transboundary"})
        return BreedResponse(**resp.json())

    def get_all_transboundary_names(self) -> TransboundaryNamesResponse:
        resp = self.get("transboundary")
        return TransboundaryNamesResponse(**resp.json())
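The client is a thin typed wrapper over the DADIS REST endpoints, so usage amounts to constructing it with an API key and calling the accessor methods. A minimal sketch, assuming the package is importable as dadis_client and the key is supplied via the DADIS_API_KEY environment variable (as in the workflow above):

import os

from dadis_client import DadisClient

# Sketch only: illustrates the client defined above, not a prescribed usage pattern
client = DadisClient(api_key=os.environ["DADIS_API_KEY"])  # prod=True by default, i.e. PROD_URL
breeds = client.get_all_breeds()   # BreedResponse = ApiResponse[list[Breed]]
print(breeds.status, breeds.message)
print(len(breeds.response), "breeds returned")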
Empty file.
@@ -0,0 +1,43 @@
from typing import Generic, Optional, TypeVar
from pydantic import BaseModel, validator


Data = TypeVar("Data")


class ApiResponse(BaseModel, Generic[Data]):
    status: int
    message: str
    response: Data


class Species(BaseModel):
    id: int
    name: dict[str, str]


class Breed(BaseModel):
    id: str
    name: str
    iso3: str
    speciesId: int
    transboundaryId: str
    # updatedAt uses empty str for null values
    updatedAt: int | None

    @validator("updatedAt", pre=True)
    def empty_updated(cls, v):
        if isinstance(v, str):
            return None
        return v


class TransboundaryName(BaseModel):
    id: Optional[str] = None
    speciesId: int
    name: str


BreedResponse = ApiResponse[list[Breed]]

TransboundaryNamesResponse = ApiResponse[list[TransboundaryName]]
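The pre-validator on updatedAt exists because DADIS returns an empty string rather than null when a breed has no update timestamp. The sketch below shows that parsing behaviour with invented field values (only updatedAt matters here), assuming the models are importable as dadis_client.schemas.responses:

from dadis_client.schemas.responses import Breed

# Illustration only: the field values are made up, not real DADIS records
raw = {
    "id": "b-001",
    "name": "Example breed",
    "iso3": "FRA",
    "speciesId": 1,
    "transboundaryId": "t-001",
    "updatedAt": "",   # DADIS sends "" instead of null
}
breed = Breed(**raw)
assert breed.updatedAt is None   # empty_updated maps any string value to None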
@@ -0,0 +1,182 @@
import argparse
import csv
import logging
import os
from typing import Optional, TextIO

import pandas as pd

from dadis_client import DadisClient

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")


def full_local_match_workflow(
    input_filename: str, output_filename: str, dadis_api_key: str,
    dadis_match_filename: Optional[str] = None
) -> pd.DataFrame:
    """
    Perform the full matching workflow:
    - Read VBO data from input_filename
    - Match to DADIS to get DADIS ids
    - Save to a new TSV file at output_filename
    """
    client = DadisClient(api_key=dadis_api_key)
    logger.info(f"Reading VBO entries from {input_filename}")
    vbo_data = read_vbo_data(input_filename)
    logger.info("Matching to DADIS data")
    matched_breeds = match_vbo_breeds(vbo_data=vbo_data, client=client)

    logger.info(f"Writing output file to {output_filename}:")
    output_file = create_output_tsv(
        input_filename=input_filename,
        output_filename=output_filename,
        extra_cols=[
            "dadis_breed_id",
            "dadis_transboundary_id",
            "dadis_update_date",
        ],
    )
    matched_breeds.to_csv(output_file, sep="\t", index=False, header=False)
    output_file.close()
    logger.info("Output written.")

    if dadis_match_filename is not None:
        logger.info("Finding unmatched DADIS entries")
        dadis_unmatched = find_unmatched_dadis(vbo_output=matched_breeds, client=client)
        logger.info(f"Writing unmatched DADIS entries to {dadis_match_filename}")
        dadis_unmatched.to_csv(dadis_match_filename, sep="\t", index=False, header=True)
    return matched_breeds


def read_vbo_data(filename: str) -> pd.DataFrame:
    vbo_breeds = pd.read_table(
        filename, sep="\t", skiprows=[1], low_memory=False
    ).convert_dtypes()
    return vbo_breeds


def get_dadis_species(client: DadisClient) -> pd.DataFrame:
    resp = client.get_all_species()
    all_species = []
    for s in resp.response:
        species = {"dadis_species_id": s.id, "dadis_species_name": s.name["en"]}
        all_species.append(species)
    return pd.DataFrame.from_records(all_species)


def get_dadis_all_breeds(client: DadisClient) -> pd.DataFrame:
    resp = client.get_all_breeds()
    df = (
        pd.DataFrame.from_records([breed.model_dump() for breed in resp.response])
        .convert_dtypes()
        .rename(
            columns={
                "id": "dadis_breed_id",
                "name": "dadis_breed_name",
                "iso3": "dadis_iso3_code",
                "speciesId": "dadis_species_id",
                "transboundaryId": "dadis_transboundary_id",
                "updatedAt": "dadis_update_date",
            }
        )
    )
    df["dadis_update_date"] = df["dadis_update_date"].map(
        lambda d: pd.to_datetime(d, unit="ms")
    )
    # Merge species information
    species_df = get_dadis_species(client)
    df = df.merge(species_df, how="left", on="dadis_species_id")
    return df


def match_vbo_breeds(vbo_data: pd.DataFrame, client: DadisClient) -> pd.DataFrame:
    """
    Match VBO breed entries to DADIS, based on breed name, species, and country (ISO3 code)
    """
    logger.info("Fetching DADIS breeds")
    dadis_all = get_dadis_all_breeds(client=client)
    merged = vbo_data.merge(
        dadis_all,
        how="left",
        left_on=["dadis_name", "dadis_species_name", "dadis_iso3_code"],
        right_on=["dadis_breed_name", "dadis_species_name", "dadis_iso3_code"],
        sort=False,
        indicator=True,
    )
    n_matched = merged["_merge"].eq("both").sum()
    n_total = len(merged["_merge"])
    logger.info(f"{n_matched} / {n_total} VBO breeds successfully matched to DADIS IDs")
    merged = merged.drop(columns=["_merge", "dadis_breed_name", "dadis_species_id"])
    return merged


def find_unmatched_dadis(vbo_output: pd.DataFrame, client: DadisClient) -> pd.DataFrame:
    """
    Merge all DADIS breeds with the already matched VBO data, to see how many DADIS entries
    match in the other direction
    """
    dadis_all = get_dadis_all_breeds(client)
    dadis_unmatched = (
        dadis_all
        .merge(vbo_output[["dadis_breed_id", "vbo_id"]], on="dadis_breed_id", how="left", indicator=True)
        .loc[lambda x: x._merge == "left_only"]
        .drop(columns=["_merge"])
    )
    return dadis_unmatched


def create_output_tsv(
    input_filename: str, output_filename: str, extra_cols: list[str] = None
) -> TextIO:
    """
    Copy the 2 header lines from the input file to the output file. Return
    a file object for the output file, so pandas can write the rest of the file
    """
    file_out = open(output_filename, "w")
    csv_out = csv.writer(file_out, dialect="excel-tab")
    with open(input_filename) as file_in:
        csv_in = csv.reader(file_in, dialect="excel-tab")
        for index in range(2):
            header = next(csv_in)
            if extra_cols is not None:
                if index == 0:
                    header += extra_cols
                if index == 1:
                    header += ["" for i in range(len(extra_cols))]
            csv_out.writerow(header)
    return file_out


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Find DADIS entries matching VBO breeds"
    )
    parser.add_argument("--log", help="Logging level", default="INFO")
    parser.add_argument(
        "--input_filename", help="Spreadsheet (TSV) with VBO local breeds"
    )
    parser.add_argument("--output_filename", help="Filename to save the updated TSV to")
    parser.add_argument(
        "--dadis_api_key",
        help="API key for DADIS API (private: should be stored in Github Secrets)",
        default=os.getenv("DADIS_API_KEY")
    )
    parser.add_argument(
        "--dadis_match_filename", help="Optional filename to write unmatched DADIS entries to",
        default=None
    )
    args = parser.parse_args()

    if args.dadis_api_key is None:
        raise ValueError("DADIS API key not set. Set the DADIS_API_KEY environment variable or use the --dadis_api_key argument")

    logger.setLevel(args.log.upper())
    full_local_match_workflow(
        input_filename=args.input_filename,
        output_filename=args.output_filename,
        dadis_api_key=args.dadis_api_key,
        dadis_match_filename=args.dadis_match_filename
    )
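Besides the argparse entry point, the matching workflow can be driven directly from Python. A minimal sketch with placeholder file names, assuming the script above is importable as a module named match_to_dadis (the actual module path is not shown in this diff):

import os

# "match_to_dadis" is an assumed module name; the file names below are placeholders
from match_to_dadis import full_local_match_workflow

matched = full_local_match_workflow(
    input_filename="vbo_breeds.tsv",            # VBO TSV with the usual two header rows
    output_filename="vbo_breeds_matched.tsv",   # gets dadis_breed_id etc. appended
    dadis_api_key=os.environ["DADIS_API_KEY"],
    dadis_match_filename="dadis_unmatched.tsv", # optional report of unmatched DADIS breeds
)
print(matched[["dadis_breed_id", "dadis_update_date"]].head())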