From c83c24f3ee6c85ee17f518f8b06baafebf168648 Mon Sep 17 00:00:00 2001
From: Ed Summers
Date: Fri, 28 Jun 2024 17:43:13 -0400
Subject: [PATCH] Use JSON Lines

This commit moves our sul_pub, dimensions and openalex harvesters over to
writing records as JSON Lines (jsonl). This preserves the dictionary and
list structures we receive from the APIs, so we can query them later, e.g.
for ORCID IDs embedded in author objects.

Reading the JSON Lines back in can be done with Pandas:

```python
pandas.read_json('data.jsonl', orient='records', lines=True)
```

and with Polars:

```python
polars.read_ndjson('data.jsonl')
```

or lazily:

```python
polars.scan_ndjson('data.jsonl')
```
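Because the author objects survive intact, the nested data can be queried
directly. A minimal sketch with Polars, assuming each record carries an
`authors` list of objects with an `orcid` key (the exact field names vary
by API, so adjust to the actual payloads):

```python
import polars as pl

# Hypothetical query: collect the distinct ORCID IDs found in the
# `authors` structs of a harvested snapshot. The `orcid` field name
# is an assumption, not something this patch guarantees.
orcids = (
    pl.scan_ndjson("dimensions-pubs.jsonl")
    .explode("authors")                               # one row per author
    .select(pl.col("authors").struct.field("orcid"))  # pull out the ORCID
    .drop_nulls()
    .unique()
    .collect()
)
```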
""" - csv_file = Path(snapshot_dir) / "openalex-pubs.csv" - openalex.publications_csv(dois, csv_file) - return str(csv_file) + jsonl_file = Path(snapshot_dir) / "openalex-pubs.csv" + openalex.publications_jsonl(dois, jsonl_file) + return str(jsonl_file) @task() def merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir): @@ -129,19 +128,19 @@ def publish(dataset): authors_csv = find_authors_csv() - sul_pub = sul_pub_harvest(snapshot_dir) + sul_pub_pubs = sul_pub_harvest(snapshot_dir) dimensions_dois = dimensions_harvest_dois(authors_csv, snapshot_dir) openalex_dois = openalex_harvest_dois(authors_csv, snapshot_dir) - dois = doi_set(dimensions_dois, openalex_dois, sul_pub) + dois = doi_set(dimensions_dois, openalex_dois, sul_pub_pubs) dimensions_pubs = dimensions_harvest_pubs(dois, snapshot_dir) openalex_pubs = openalex_harvest_pubs(dois, snapshot_dir) - pubs = merge_publications(sul_pub, openalex_pubs, dimensions_pubs, snapshot_dir) + pubs = merge_publications(sul_pub_pubs, openalex_pubs, dimensions_pubs, snapshot_dir) pubs_authors = join_authors(pubs, authors_csv) diff --git a/rialto_airflow/harvest/dimensions.py b/rialto_airflow/harvest/dimensions.py index fe69bab..f019051 100644 --- a/rialto_airflow/harvest/dimensions.py +++ b/rialto_airflow/harvest/dimensions.py @@ -1,4 +1,4 @@ -import csv +import json import logging import os import pickle @@ -58,13 +58,11 @@ def doi_orcids_pickle(authors_csv, pickle_file, limit=None) -> None: pickle.dump(invert_dict(orcid_dois), handle, protocol=pickle.HIGHEST_PROTOCOL) -def publications_csv(dois, csv_file) -> None: - with open(csv_file, "w") as output: - writer = csv.DictWriter(output, publication_fields()) - writer.writeheader() - for pub in publications_from_dois(dois): - logging.info(f"writing metadata for {pub.get('doi')}") - writer.writerow(pub) +def publications_jsonl(dois, jsonl_file) -> None: + with open(jsonl_file, "w") as output: + for record in publications_from_dois(dois): + logging.info(f"writing metadata for {record.get('doi')}") + output.write(json.dumps(record, ensure_ascii=False) + "\n") def publications_from_dois(dois: list, batch_size=200): diff --git a/rialto_airflow/harvest/doi_set.py b/rialto_airflow/harvest/doi_set.py index 6bfc199..add6620 100644 --- a/rialto_airflow/harvest/doi_set.py +++ b/rialto_airflow/harvest/doi_set.py @@ -3,11 +3,13 @@ import pickle -def create_doi_set(dimensions: str, openalex: str, sul_pub_csv: str) -> list: +import polars as pl + +def create_doi_set(dimensions: str, openalex: str, sul_pub_jsonl: str) -> list: """Get DOIs from each source and dedupe.""" dimensions_dois = dois_from_pickle(dimensions) openalex_dois = dois_from_pickle(openalex) - sul_pub_dois = get_sul_pub_dois(sul_pub_csv) + sul_pub_dois = get_sul_pub_dois(sul_pub_jsonl) unique_dois = list(set(dimensions_dois + openalex_dois + sul_pub_dois)) logging.info(f"found {len(unique_dois)}") @@ -23,10 +25,7 @@ def dois_from_pickle(pickle_file: str) -> list: return dois -def get_sul_pub_dois(sul_pub_csv: str) -> list: +def get_sul_pub_dois(sul_pub_jsonl: str) -> list: """Extract DOIs from sul_pub CSV and remove empty values.""" - with open(sul_pub_csv, "r") as file: - reader = csv.DictReader(file) - doi_column = [row["doi"] for row in reader if row["doi"]] - - return doi_column + df = pl.read_ndjson(sul_pub_jsonl) + return df['doi'].to_list() diff --git a/rialto_airflow/harvest/merge_pubs.py b/rialto_airflow/harvest/merge_pubs.py index 3477210..d9da8c2 100644 --- a/rialto_airflow/harvest/merge_pubs.py +++ 
+++ b/rialto_airflow/harvest/merge_pubs.py
@@ -44,7 +44,6 @@ def dimensions_pubs_df(dimensions_pubs):
     # Create a LazyFrame of dimension pubs to avoid loading all data into memory
     """
-    # Polars is inferring volume is an integer, but it should be a string e.g. "97-B"
-    df = pl.scan_csv(dimensions_pubs, schema_overrides={"volume": pl.String})
+    df = pl.scan_ndjson(dimensions_pubs)
     df = df.select(
         pl.col(
             "authors",
@@ -69,7 +68,7 @@ def openalex_pubs_df(openalex_pubs):
     """
     Create an openalex pubs LazyFrame and rename columns
     """
-    df = pl.scan_csv(openalex_pubs)
+    df = pl.scan_ndjson(openalex_pubs)
     df = df.select(
         pl.col("doi").str.replace("https://doi.org/", ""),
         pl.col(
@@ -84,7 +83,7 @@ def sulpub_df(sul_pub):
     """
     Create a sulpub LazyFrame and rename columns
     """
-    df = pl.scan_csv(sul_pub)
+    df = pl.scan_ndjson(sul_pub)
     df = df.drop_nulls("doi")
     df = df.with_columns(pl.col("doi").str.replace("https://doi.org/", ""))
     df = df.rename(lambda column_name: "sul_pub_" + column_name)
diff --git a/rialto_airflow/harvest/openalex.py b/rialto_airflow/harvest/openalex.py
index e1d9eeb..d658e02 100644
--- a/rialto_airflow/harvest/openalex.py
+++ b/rialto_airflow/harvest/openalex.py
@@ -1,4 +1,5 @@
 import csv
+import json
 import logging
 import os
 import pickle
@@ -67,15 +68,13 @@ def dois_from_orcid(orcid: str, limit=None):
         yield pub.get("doi").replace("https://doi.org/", "")
 
 
-def publications_csv(dois: list, csv_file: str) -> None:
+def publications_jsonl(dois: list, jsonl_file: str) -> None:
     """
-    Get publication records for a list of DOIs and create a CSV file.
+    Get publication records for a list of DOIs and create a JSONL file.
     """
-    with open(csv_file, "w") as output:
-        writer = csv.DictWriter(output, fieldnames=FIELDS)
-        writer.writeheader()
-        for pub in publications_from_dois(dois):
-            writer.writerow(pub)
+    with open(jsonl_file, "w") as output:
+        for record in publications_from_dois(dois):
+            output.write(json.dumps(record, ensure_ascii=False) + "\n")
 
 
 def publications_from_dois(dois: list, batch_size=75):
diff --git a/rialto_airflow/harvest/sul_pub.py b/rialto_airflow/harvest/sul_pub.py
index 9395f1e..7b8463a 100644
--- a/rialto_airflow/harvest/sul_pub.py
+++ b/rialto_airflow/harvest/sul_pub.py
@@ -1,44 +1,14 @@
-import csv
+import json
 import logging
 
 import requests
 
-SUL_PUB_FIELDS = [
-    "authorship",
-    "title",
-    "abstract",
-    "author",
-    "year",
-    "type",
-    "mesh_headings",
-    "publisher",
-    "journal",
-    "provenance",
-    "doi",
-    "issn",
-    "sulpubid",
-    "sw_id",
-    "pmid",
-    "identifier",
-    "last_updated",
-    "pages",
-    "date",
-    "country",
-    "booktitle",
-    "edition",
-    "series",
-    "chapter",
-    "editor",
-]
-
-
-def sul_pub_csv(csv_file, host, key, since=None, limit=None):
-    with open(csv_file, "w") as csvfile:
-        writer = csv.DictWriter(csvfile, fieldnames=SUL_PUB_FIELDS)
-        writer.writeheader()
-        for row in harvest(host, key, since, limit):
-            writer.writerow(row)
+def publications_jsonl(jsonl_file, host, key, since=None, limit=None):
+    with open(jsonl_file, "w") as output:
+        for record in harvest(host, key, since, limit):
+            json.dump(record, output, ensure_ascii=False)
+            output.write("\n")
 
 
 def harvest(host, key, since, limit):
@@ -73,10 +43,9 @@ def harvest(host, key, since, limit):
                 more = False
                 break
 
-            pub = {key: record[key] for key in record if key in SUL_PUB_FIELDS}
-            pub["doi"] = extract_doi(record)
+            record["doi"] = extract_doi(record)
 
-            yield pub
+            yield record
 
 
 def extract_doi(record):
diff --git a/rialto_airflow/utils.py b/rialto_airflow/utils.py
index 4a48597..94fe230 100644
--- a/rialto_airflow/utils.py
+++ b/rialto_airflow/utils.py
@@ -1,4 +1,5 @@
 import csv
+import json
 import datetime
 
 from pathlib import Path
@@ -54,3 +55,12 @@ def invert_dict(dict):
         inverted_dict[i] = [k for k, v in dict.items() if i in v]
 
     return inverted_dict
+
+
+def write_jsonl(filename, records):
+    """
+    Write a list of dictionaries to a file as JSON Lines.
+    """
+    with open(filename, "w") as output:
+        for record in records:
+            output.write(json.dumps(record, ensure_ascii=False) + "\n")
diff --git a/test/harvest/test_dimensions.py b/test/harvest/test_dimensions.py
index 9aff083..53731fa 100644
--- a/test/harvest/test_dimensions.py
+++ b/test/harvest/test_dimensions.py
@@ -43,13 +43,13 @@ def test_publication_fields():
     assert "title" in fields
 
 
-def test_publications_csv(tmpdir):
-    pubs_csv = tmpdir / "dimensions-pubs.csv"
-    dimensions.publications_csv(
-        ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_csv
+def test_publications_jsonl(tmpdir):
+    pubs_jsonl = tmpdir / "dimensions-pubs.jsonl"
+    dimensions.publications_jsonl(
+        ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_jsonl
     )
 
-    df = pandas.read_csv(pubs_csv)
+    df = pandas.read_json(pubs_jsonl, orient="records", lines=True)
 
     assert len(df) == 2
diff --git a/test/harvest/test_doi_set.py b/test/harvest/test_doi_set.py
index c3265d4..16b0c74 100644
--- a/test/harvest/test_doi_set.py
+++ b/test/harvest/test_doi_set.py
@@ -1,9 +1,9 @@
-import csv
 import pickle
 
 import pytest
 
 from rialto_airflow.harvest.doi_set import create_doi_set
+from rialto_airflow.utils import write_jsonl
 
 
 @pytest.fixture
@@ -33,18 +33,20 @@ def openalex_pickle(tmp_path):
 
 
 @pytest.fixture
-def sul_pub_csv(tmp_path):
-    fixture_file = tmp_path / "sul_pub.csv"
-    with open(fixture_file, "w", newline="") as csvfile:
-        writer = csv.writer(csvfile)
-        writer.writerow(["sunetid", "title", "doi"])
-        writer.writerow(["author1", "A Publication", "10.0000/aaaa"])
-        writer.writerow(["author2", "A Research Article", "10.0000/1234"])
+def sul_pub_jsonl(tmp_path):
+    fixture_file = tmp_path / "sul_pub.jsonl"
+    write_jsonl(
+        fixture_file,
+        [
+            {"sunetid": "author1", "title": "A Publication", "doi": "10.0000/aaaa"},
+            {"sunetid": "author2", "title": "A Research Article", "doi": "10.0000/1234"},
+        ],
+    )
 
     return fixture_file
 
 
-def test_doi_set(dimensions_pickle, openalex_pickle, sul_pub_csv):
-    dois = create_doi_set(dimensions_pickle, openalex_pickle, sul_pub_csv)
+def test_doi_set(dimensions_pickle, openalex_pickle, sul_pub_jsonl):
+    dois = create_doi_set(dimensions_pickle, openalex_pickle, sul_pub_jsonl)
     assert len(dois) == 4
     assert set(dois) == set(
         ["10.0000/1234", "10.0000/aaaa", "10.0000/cccc", "10.0000/zzzz"]
     )
diff --git a/test/harvest/test_merge_pubs.py b/test/harvest/test_merge_pubs.py
index b4d62fa..0056f03 100644
--- a/test/harvest/test_merge_pubs.py
+++ b/test/harvest/test_merge_pubs.py
@@ -1,143 +1,118 @@
-import csv
+import json
 
 import polars as pl
 import pytest
 
 from rialto_airflow.harvest import merge_pubs
-
+from rialto_airflow.utils import write_jsonl
 
 
 @pytest.fixture
-def dimensions_pubs_csv(tmp_path):
-    fixture_file = tmp_path / "dimensions-pubs.csv"
-    with open(fixture_file, "w") as csvfile:
-        writer = csv.writer(csvfile)
-        header = [
-            "bogus",
-            "volume",
-            "authors",
-            "document_type",
-            "doi",
-            "funders",
-            "funding_section",
-            "open_access",
-            "publisher",
-            "research_orgs",
-            "researchers",
-            "title",
-            "type",
-            "year",
-        ]
-        writer.writerow(header)
-        writer.writerow(
-            [
-                "a",
-                "1",
-                "[]",
-                "ARTICLE",
-                "10.0000/aaaa",
-                "[]",
-                "[]",
-                "True",
-                "publisher",
-                "[]",
-                "[]",
-                "A Publication",
-                "article",
-                "2024",
-            ]
-        )
-        writer.writerow(
-            [
-                "b",
-                "2",
-                "[]",
-                "ARTICLE",
-                "10.0000/1234",
-                "[]",
-                "[]",
-                "True",
-                "publisher",
-                "[]",
-                "[]",
-                "A Research Article",
-                "article",
-                "2024",
-            ]
-        )
+def dimensions_pubs_jsonl(tmp_path):
+    fixture_file = tmp_path / "dimensions-pubs.jsonl"
+    write_jsonl(
+        fixture_file,
+        [
+            {
+                "bogus": "a",
"volume": "1", + "authors": [], + "document_type": "ARTICLE", + "doi": "10.0000/aaaa", + "funders": [], + "funding_section": [], + "open_access": "True", + "publisher": "publisher", + "research_orgs": [], + "researchers": [], + "title": "A Publication", + "type": "article", + "year": "2024", + }, + { + "bogus": "b", + "volume": "2", + "authors": [], + "document_type": "ARTICLE", + "doi": "10.0000/1234", + "funders": [], + "funding_section": [], + "open_access": "True", + "publisher": "publisher", + "research_orgs": [], + "researchers": [], + "title": "A Research Article", + "type": "article", + "year": "2024", + } ] - writer.writerow(header) - writer.writerow( - [ - "a", - "1", - "[]", - "ARTICLE", - "10.0000/aaaa", - "[]", - "[]", - "True", - "publisher", - "[]", - "[]", - "A Publication", - "article", - "2024", - ] - ) - writer.writerow( - [ - "b", - "2", - "[]", - "ARTICLE", - "10.0000/1234", - "[]", - "[]", - "True", - "publisher", - "[]", - "[]", - "A Research Article", - "article", - "2024", - ] - ) + ) + return fixture_file @pytest.fixture -def openalex_pubs_csv(tmp_path): - fixture_file = tmp_path / "openalex-pubs.csv" - with open(fixture_file, "w") as csvfile: - writer = csv.writer(csvfile) - header = [ - "bogus", - "apc_paid", - "authorships", - "grants", - "publication_year", - "title", - "type", - "doi", +def openalex_pubs_jsonl(tmp_path): + fixture_file = tmp_path / "openalex-pubs.jsonl" + write_jsonl( + fixture_file, + [ + { + "bogus": "blah", + "apc_paid": 10, + "authorships": [], + "grants": [], + "publication_year": "2024", + "title": "A Publication", + "type": "article", + "doi": "https://doi.org/10.0000/cccc" + }, + { + "bogus": "blah", + "apc_paid": 0, + "authorships": [], + "grants": [], + "publication_year": "2024", + "title": "A Research Article", + "type": "article", + "doi": "https://doi.org/10.0000/1234", + } ] - writer.writerow(header) - writer.writerow( - [ - "blah", - 10, - "[]", - "[]", - "2024", - "A Publication", - "article", - "https://doi.org/10.0000/cccc", - ] - ) - writer.writerow( - [ - "blah", - 0, - "[]", - "[]", - "2024", - "A Research Article", - "article", - "https://doi.org/10.0000/1234", - ] - ) + ) + return fixture_file @pytest.fixture -def sul_pubs_csv(tmp_path): - fixture_file = tmp_path / "sulpub.csv" - with open(fixture_file, "w") as csvfile: - writer = csv.writer(csvfile) - header = ["authorship", "title", "year", "doi"] - writer.writerow(header) - writer.writerow(["[]", "A Publication", "2024", "10.0000/cccc"]) - writer.writerow( - [ - "[]", - "A Research Article", - "2024", - ] - ) - writer.writerow( - [ - "[]", - "A Published Research Article", - "2024", - "https://doi.org/10.0000/dddd", - ] - ) +def sul_pubs_jsonl(tmp_path): + fixture_file = tmp_path / "sulpub.jsonl" + write_jsonl( + fixture_file, + [ + { + "authorship": [], + "title": "A Publication", + "year": "2024", + "doi": "10.0000/cccc" + }, + { + "authorship": [], + "title": "A Research Article", + "year": "2024" + }, + { + "authorship": [], + "title": "A Published Research Article", + "year": "2024", + "doi": "https://doi.org/10.0000/dddd", + } + ] + ) + return fixture_file -def test_dimensions_pubs_df(dimensions_pubs_csv): - lazy_df = merge_pubs.dimensions_pubs_df(dimensions_pubs_csv) +def test_dimensions_pubs_df(dimensions_pubs_jsonl): + lazy_df = merge_pubs.dimensions_pubs_df(dimensions_pubs_jsonl) assert type(lazy_df) == pl.lazyframe.frame.LazyFrame df = lazy_df.collect() assert df.shape[0] == 2 @@ -145,8 +120,8 @@ def test_dimensions_pubs_df(dimensions_pubs_csv): assert 
df["dim_doi"].to_list() == ["10.0000/aaaa", "10.0000/1234"] -def test_openalex_pubs_df(openalex_pubs_csv): - lazy_df = merge_pubs.openalex_pubs_df(openalex_pubs_csv) +def test_openalex_pubs_df(openalex_pubs_jsonl): + lazy_df = merge_pubs.openalex_pubs_df(openalex_pubs_jsonl) assert type(lazy_df) == pl.lazyframe.frame.LazyFrame df = lazy_df.collect() assert df.shape[0] == 2 @@ -154,8 +129,8 @@ def test_openalex_pubs_df(openalex_pubs_csv): assert df["openalex_doi"].to_list() == ["10.0000/cccc", "10.0000/1234"] -def test_sulpub_df(sul_pubs_csv): - lazy_df = merge_pubs.sulpub_df(sul_pubs_csv) +def test_sulpub_df(sul_pubs_jsonl): + lazy_df = merge_pubs.sulpub_df(sul_pubs_jsonl) assert type(lazy_df) == pl.lazyframe.frame.LazyFrame df = lazy_df.collect() assert df.shape[0] == 2, "Row without a doi has been dropped" @@ -168,9 +143,9 @@ def test_sulpub_df(sul_pubs_csv): assert df["sul_pub_doi"].to_list() == ["10.0000/cccc", "10.0000/dddd"] -def test_merge(tmp_path, sul_pubs_csv, openalex_pubs_csv, dimensions_pubs_csv): +def test_merge(tmp_path, sul_pubs_jsonl, openalex_pubs_jsonl, dimensions_pubs_jsonl): output = tmp_path / "merged_pubs.parquet" - merge_pubs.merge(sul_pubs_csv, openalex_pubs_csv, dimensions_pubs_csv, output) + merge_pubs.merge(sul_pubs_jsonl, openalex_pubs_jsonl, dimensions_pubs_jsonl, output) assert output.is_file(), "output file has been created" df = pl.read_parquet(output) assert df.shape[0] == 4 diff --git a/test/harvest/test_openalex.py b/test/harvest/test_openalex.py index fdd054c..1bea165 100644 --- a/test/harvest/test_openalex.py +++ b/test/harvest/test_openalex.py @@ -55,13 +55,13 @@ def test_publications_from_dois(): assert len(pubs[1].keys()) == 51, "second publication has 51 columns" -def test_publications_csv(tmp_path): - pubs_csv = tmp_path / "openalex-pubs.csv" - openalex.publications_csv( - ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_csv +def test_publications_jsonl(tmp_path): + pubs_jsonl = tmp_path / "openalex-pubs.jsonl" + openalex.publications_jsonl( + ["10.48550/arxiv.1706.03762", "10.1145/3442188.3445922"], pubs_jsonl ) - df = pandas.read_csv(pubs_csv) + df = pandas.read_json(pubs_jsonl, orient="records", lines=True) assert len(df) == 2 diff --git a/test/harvest/test_sul_pub.py b/test/harvest/test_sul_pub.py index 7fbad27..367d776 100644 --- a/test/harvest/test_sul_pub.py +++ b/test/harvest/test_sul_pub.py @@ -4,7 +4,7 @@ import pandas import pytest -from rialto_airflow.harvest.sul_pub import sul_pub_csv +from rialto_airflow.harvest import sul_pub dotenv.load_dotenv() @@ -15,12 +15,12 @@ @pytest.mark.skipif(no_auth, reason="no sul_pub key") -def test_sul_pub_csv(tmpdir): - csv_file = tmpdir / "sul_pub.csv" - sul_pub_csv(csv_file, sul_pub_host, sul_pub_key, limit=2000) - assert csv_file.isfile() +def test_publications_jsonl(tmpdir): + jsonl_file = tmpdir / "sul_pub.jsonl" + sul_pub.publications_jsonl(jsonl_file, sul_pub_host, sul_pub_key, limit=2000) + assert jsonl_file.isfile() - df = pandas.read_csv(csv_file) + df = pandas.read_json(jsonl_file, orient='records', lines=True) assert len(df) == 2000 assert "title" in df.columns