Adding Web of Science harvest
Closes #165
edsu committed Mar 5, 2025
1 parent f3943b0 commit d5c8af1
Showing 5 changed files with 395 additions and 2 deletions.
1 change: 1 addition & 0 deletions compose.yaml
@@ -82,6 +82,7 @@ x-airflow-common:
    AIRFLOW_VAR_MAIS_SECRET: ${AIRFLOW_VAR_MAIS_SECRET}
    AIRFLOW_VAR_SUL_PUB_HOST: ${AIRFLOW_VAR_SUL_PUB_HOST}
    AIRFLOW_VAR_SUL_PUB_KEY: ${AIRFLOW_VAR_SUL_PUB_KEY}
    AIRFLOW_VAR_WOS_KEY: ${AIRFLOW_VAR_WOS_KEY}
    AIRFLOW_VAR_DEV_LIMIT: ${AIRFLOW_VAR_DEV_LIMIT}
    AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
    AIRFLOW_VAR_PUBLISH_DIR: /opt/airflow/data/latest
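Like the other AIRFLOW_VAR_* entries above, the new key is substituted from the host environment by Docker Compose. A minimal sketch of supplying it via the .env file Compose reads (the value is a placeholder, not part of this commit):

# .env (placeholder value, supplied per deployment)
AIRFLOW_VAR_WOS_KEY=your-clarivate-wos-api-key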
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -28,7 +28,8 @@ dependencies = [
[tool.pytest.ini_options]
pythonpath = ["."]
markers = "mais_tests: Tests requiring MAIS access"
addopts = "-v --cov --cov-report=html --cov-report=term"
addopts = "-v --cov --cov-report=html --cov-report=term --log-level INFO --log-file test.log"


[tool.coverage.run]
omit = ["test/*"]
14 changes: 13 additions & 1 deletion rialto_airflow/dags/harvest.py
@@ -6,7 +6,7 @@
from airflow.decorators import dag, task
from airflow.models import Variable

from rialto_airflow.harvest import authors, dimensions, merge_pubs, openalex
from rialto_airflow.harvest import authors, dimensions, merge_pubs, openalex, wos
from rialto_airflow.harvest.doi_sunet import create_doi_sunet_pickle
from rialto_airflow.harvest import sul_pub
from rialto_airflow.harvest.contribs import create_contribs
@@ -74,6 +74,15 @@ def openalex_harvest(snapshot):

        return jsonl_file

    @task()
    def wos_harvest(snapshot):
        """
        Fetch the data by ORCID from Web of Science.
        """
        jsonl_file = wos.harvest(snapshot, limit=dev_limit)

        return jsonl_file

    @task()
    def sul_pub_harvest(snapshot):
        """
@@ -150,6 +159,9 @@ def publish(pubs_to_contribs, merge_publications):

    openalex_jsonl = openalex_harvest(snapshot)

    # TODO: use the return value here to hook into the workflow
    wos_harvest(snapshot)

    doi_sunet = create_doi_sunet(
        dimensions_dois,
        openalex_jsonl,
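Regarding the TODO in the hunk above: a minimal sketch of how the task's return value could eventually be hooked into the workflow, assuming a downstream task is later extended to accept the Web of Science JSONL path (the wos_jsonl name and the downstream call are hypothetical, not part of this commit):

    # hypothetical wiring inside the DAG function, mirroring openalex_harvest
    wos_jsonl = wos_harvest(snapshot)

    # passing the returned path into a later task, e.g. some_downstream_task(wos_jsonl),
    # is what would let Airflow's TaskFlow API infer the dependency automatically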
192 changes: 192 additions & 0 deletions rialto_airflow/harvest/wos.py
@@ -0,0 +1,192 @@
import json
import logging
import os
import re
from pathlib import Path

import requests
from typing import Generator, Optional, Dict, Union
from sqlalchemy.dialects.postgresql import insert

from rialto_airflow.database import (
    Author,
    Publication,
    get_session,
    pub_author_association,
)
from rialto_airflow.snapshot import Snapshot
from rialto_airflow.utils import normalize_doi

Params = Dict[str, Union[int, str]]


def harvest(snapshot: Snapshot, limit=None) -> Path:
    """
    Walk through all the Author ORCIDs and generate publications for them.
    """
    jsonl_file = snapshot.path / "wos.jsonl"
    count = 0
    stop = False

    with jsonl_file.open("w") as jsonl_output:
        with get_session(snapshot.database_name).begin() as select_session:
            # get all authors that have an ORCID
            # TODO: should we just pull the relevant bits back into memory since
            # that's what's going on with our client-side buffering connection
            # and there aren't that many of them?
            for author in (
                select_session.query(Author).where(Author.orcid.is_not(None)).all()  # type: ignore
            ):
                if stop is True:
                    logging.info(f"Reached limit of {limit} publications stopping")
                    break

                for wos_pub in orcid_publications(author.orcid):
                    count += 1
                    if limit is not None and count > limit:
                        stop = True
                        break

                    doi = get_doi(wos_pub)

                    with get_session(snapshot.database_name).begin() as insert_session:
                        # if there's a DOI constraint violation we need to update instead of insert
                        pub_id = insert_session.execute(
                            insert(Publication)
                            .values(
                                doi=doi,
                                wos_json=wos_pub,
                            )
                            .on_conflict_do_update(
                                constraint="publication_doi_key",
                                set_=dict(wos_json=wos_pub),
                            )
                            .returning(Publication.id)
                        ).scalar_one()

                        # a constraint violation is ok here, since it means we
                        # already know that the publication is by the author
                        insert_session.execute(
                            insert(pub_author_association)
                            .values(publication_id=pub_id, author_id=author.id)
                            .on_conflict_do_nothing()
                        )

                    jsonl_output.write(json.dumps(wos_pub) + "\n")

    return jsonl_file


def orcid_publications(orcid) -> Generator[dict, None, None]:
    """
    A generator that returns publications associated with a given ORCID.
    """

    # For API details see: https://api.clarivate.com/swagger-ui/

    # WoS doesn't recognize the ORCID URIs that are stored in the Author table
    if m := re.match(r"^https?://orcid.org/(.+)$", orcid):
        orcid = m.group(1)

    wos_key = os.environ.get("AIRFLOW_VAR_WOS_KEY")
    base_url = "https://wos-api.clarivate.com/api/wos"
    headers = {"Accept": "application/json", "X-ApiKey": wos_key}

    # the number of records to get in each request (100 is max)
    batch_size = 100

    params: Params = {
        "databaseId": "WOK",
        "usrQuery": f"AI=({orcid})",
        "count": batch_size,
        "firstRecord": 1,
    }

    http = requests.Session()

    # get the initial set of results, which also gives us a Query ID to fetch
    # subsequent pages of results if there are any

    logging.info(f"fetching {base_url} with {params}")
    resp: requests.Response = http.get(base_url, params=params, headers=headers)

    if not check_status(resp):
        return

    results = get_json(resp)
    if results is None:
        return

    if results["QueryResult"]["RecordsFound"] == 0:
        logging.info(f"No results found for ORCID {orcid}")
        return

    yield from results["Data"]["Records"]["records"]["REC"]

    # get subsequent results using the Query ID

    query_id = results["QueryResult"]["QueryID"]
    records_found = results["QueryResult"]["RecordsFound"]
    first_record = batch_size + 1  # since the initial set included 100

    # if there aren't any more results to fetch this loop will never be entered

    logging.info(f"{records_found} records found")
    while first_record < records_found:
        page_params: Params = {"firstRecord": first_record, "count": batch_size}
        logging.info(f"fetching {base_url}/query/{query_id} with {page_params}")

        resp = http.get(
            f"{base_url}/query/{query_id}", params=page_params, headers=headers
        )

        if not check_status(resp):
            return

        records = get_json(resp)
        if records is None:
            break

        yield from records["Records"]["records"]["REC"]

        # move the offset along in the results list
        first_record += batch_size


def get_json(resp: requests.Response) -> Optional[dict]:
    """
    Return the JSON payload of a response, or None if the body is empty.
    """
    try:
        return resp.json()
    except requests.exceptions.JSONDecodeError as e:
        if resp.text == "":
            # some items seem to return 200 OK but with empty JSON payloads, e.g.
            # curl -i --header "application/json" --header 'X-ApiKey: API_KEY' 'https://wos-api.clarivate.com/api/wos?databaseId=WOK&usrQuery=AI%3D%280000-0002-3271-7861%29&count=100&firstRecord=1'
            logging.error(
                f"got empty string instead of JSON when looking up {resp.url}"
            )
            return None
        else:
            logging.error(f"uhoh, instead of JSON we got: {resp.text}")
            raise e


def check_status(resp):
    """
    Return True if the response is usable, or False for the WoS "Customization
    error" 500s; any other HTTP error is raised.
    """
    # see https://github.com/sul-dlss/rialto-airflow/issues/208
    if (
        resp.status_code == 500
        and resp.headers.get("Content-Type") == "application/json"
        and "Customization error" in resp.json().get("message", "")
    ):
        logging.error(f"got a 500 Customization Error when looking up {resp.url}")
        return False
    else:
        resp.raise_for_status()
        return True


def get_doi(pub) -> Optional[str]:
    """
    Extract and normalize the DOI from a WoS publication record, if present.
    """
    ids = pub.get("cluster_related", {}).get("identifiers", {}).get("identifier", [])
    for id in ids:
        if id["type"] == "doi":
            return normalize_doi(id["value"])

    return None
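Since orcid_publications wraps both the initial query and the Query ID pagination, it can be exercised outside of Airflow. A minimal sketch, assuming AIRFLOW_VAR_WOS_KEY is available in the environment; the ORCID and key values below are placeholders, not part of this commit:

import os

from rialto_airflow.harvest.wos import orcid_publications

# placeholder key; in practice this is already set for the Airflow deployment
os.environ.setdefault("AIRFLOW_VAR_WOS_KEY", "placeholder-key")

for pub in orcid_publications("https://orcid.org/0000-0002-1825-0097"):
    # each item is one WoS REC dict as returned by the API
    print(pub.get("UID"))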