From 7a5ef757e421e713424d1aec5b422484ed7ec950 Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Wed, 21 Dec 2022 12:59:55 -0500 Subject: [PATCH] fix: don't load all of an i2b2 file into memory The primary change in this commit is to stop loading i2b2 input files all at once, but rather stream them in, in chunks determined by the --batch-size parameter. But this commit also includes several small fixes: - Fixes location of MS tool during CI - Adds comma-formatting to a lot of progress-count prints - Continues ETL even if cTAKES can't process one message (just logs the error instead) - Changes default batch size from 10M to 200k. This works more reliably for small-memory (8G) machines. The previous number was optimized for the size of the resulting parquet files. This number is optimized for memory during the run, which feels like a safer default. - When using --input-format=ndjson and pointing at a local folder, we now still use a temporary folder and copy in just the resource ndjson files we want. This is to speed up the MS deid tool, so it doesn't have to read all possible ndjson inputs. - Add better progress messaging while reading i2b2 files. - Separate out race & ethnicity from i2b2, which combines them --- .github/workflows/ci.yaml | 2 +- cumulus/common.py | 15 ---- cumulus/config.py | 2 +- cumulus/ctakes.py | 6 +- cumulus/deid/mstool.py | 3 +- cumulus/etl.py | 6 +- cumulus/loaders/fhir/fhir_ndjson.py | 20 ++--- cumulus/loaders/i2b2/external_mappings.py | 75 +++++++++++++++++++ cumulus/loaders/i2b2/extract.py | 64 +++++++--------- cumulus/loaders/i2b2/loader.py | 21 ++++-- cumulus/loaders/i2b2/oracle/query.py | 2 +- .../i2b2/resources/external_mappings.py | 59 --------------- cumulus/loaders/i2b2/transform.py | 39 +++++----- docs/howtos/run-cumulus-etl.md | 2 +- tests/test_etl.py | 6 +- tests/test_i2b2_oracle_extract.py | 2 +- tests/test_i2b2_oracle_query.py | 2 +- 17 files changed, 170 insertions(+), 156 deletions(-) create mode 100644 cumulus/loaders/i2b2/external_mappings.py delete mode 100644 cumulus/loaders/i2b2/resources/external_mappings.py diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c786ea41..a07d9c8a 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -36,7 +36,7 @@ jobs: --runtime=linux-x64 \ --configuration=Release \ -p:PublishSingleFile=true \ - --output=~/.local/bin \ + --output=$HOME/.local/bin \ mstool/FHIR/src/Microsoft.Health.Fhir.Anonymizer.R4.CommandLineTool - name: Test with pytest diff --git a/cumulus/common.py b/cumulus/common.py index 75e0024f..79556cf5 100644 --- a/cumulus/common.py +++ b/cumulus/common.py @@ -9,7 +9,6 @@ from urllib.parse import urlparse import fsspec -import pandas from fhirclient.models.resource import Resource from fhirclient.models.fhirabstractbase import FHIRAbstractBase @@ -58,20 +57,6 @@ def find_by_name(folder, path_contains='filemask', progress_bar=1000) -> list: return found -def extract_csv(path_csv: str, sample=1.0) -> pandas.DataFrame: - """ - :param path_csv: /path/to/file.csv - :param sample: %percentage of file to read - :return: pandas Dataframe - """ - logging.info('Reading csv %s ...', path_csv) - df = pandas.read_csv(path_csv, dtype=str, na_filter=False) - if sample != 1.0: - df = df.sample(frac=sample) - logging.info('Done reading %s .', path_csv) - return df - - def fake_id(category: str) -> str: """ Randomly generate a linked Patient identifier diff --git a/cumulus/config.py b/cumulus/config.py index 48616399..d0946bd7 100644 --- a/cumulus/config.py +++ b/cumulus/config.py @@ -87,7 +87,7 @@ def success_rate(self, show_every=1000 * 10) -> float: prct = float(self.success) / float(self.attempt) if 0 == self.attempt % show_every: - print(f'success = {self.success} rate % {prct}') + print(f'success = {self.success:,} rate % {prct}') return prct diff --git a/cumulus/ctakes.py b/cumulus/ctakes.py index fa130a2f..c32eaaca 100644 --- a/cumulus/ctakes.py +++ b/cumulus/ctakes.py @@ -42,7 +42,11 @@ def symptoms(cache: store.Root, docref: DocumentReference) -> List[Observation]: logging.warning('No text/plain content for symptoms') # ideally would print identifier, but it's PHI... return [] - ctakes_json = extract(cache, physician_note) + try: + ctakes_json = extract(cache, physician_note) + except Exception as exc: # pylint: disable=broad-except + logging.error('Could not extract symptoms: %s', exc) + return [] observations = [] for match in ctakes_json.list_sign_symptom(ctakesclient.typesystem.Polarity.pos): diff --git a/cumulus/deid/mstool.py b/cumulus/deid/mstool.py index a9d79230..187e7bb0 100644 --- a/cumulus/deid/mstool.py +++ b/cumulus/deid/mstool.py @@ -8,7 +8,7 @@ import subprocess # nosec: B404 import sys -from cumulus import errors +from cumulus import common, errors MSTOOL_CMD = 'Microsoft.Health.Fhir.Anonymizer.R4.CommandLineTool' @@ -23,6 +23,7 @@ def run_mstool(input_dir: str, output_dir: str) -> None: The input must be in ndjson format. And the output will be as well. """ + common.print_header('De-identifying data...') try: # The following call only points at some temporary directory names (which we generate), # so it should be safe, and we thus disable the security linter warning about validating inputs. diff --git a/cumulus/etl.py b/cumulus/etl.py index 88a0a95c..853d566a 100644 --- a/cumulus/etl.py +++ b/cumulus/etl.py @@ -366,9 +366,9 @@ def main(args: List[str]): help='input format (default is ndjson)') parser.add_argument('--output-format', default='parquet', choices=['json', 'ndjson', 'parquet'], help='output format (default is parquet)') - parser.add_argument('--batch-size', type=int, metavar='SIZE', default=10000000, + parser.add_argument('--batch-size', type=int, metavar='SIZE', default=200000, help='how many entries to process at once and thus ' - 'how many to put in one output file (default is 10M)') + 'how many to put in one output file (default is 200k)') parser.add_argument('--comment', help='add the comment to the log file') parser.add_argument('--s3-region', help='if using S3 paths (s3://...), this is their region') parser.add_argument('--s3-kms-key', help='if using S3 paths (s3://...), this is the KMS key ID to use') @@ -397,7 +397,7 @@ def main(args: List[str]): job_datetime = common.datetime_now() # grab timestamp before we do anything if args.input_format == 'i2b2': - config_loader = loaders.I2b2Loader(root_input) + config_loader = loaders.I2b2Loader(root_input, args.batch_size) else: config_loader = loaders.FhirNdjsonLoader(root_input, client_id=args.smart_client_id, jwks=args.smart_jwks) diff --git a/cumulus/loaders/fhir/fhir_ndjson.py b/cumulus/loaders/fhir/fhir_ndjson.py index da109a28..8aacb0f2 100644 --- a/cumulus/loaders/fhir/fhir_ndjson.py +++ b/cumulus/loaders/fhir/fhir_ndjson.py @@ -35,16 +35,18 @@ def load_all(self, resources: List[str]) -> tempfile.TemporaryDirectory: if self.root.protocol in ['http', 'https']: return self._load_from_bulk_export(resources) - # Are we reading from a local directory? - if self.root.protocol == 'file': - # We can actually just re-use the input dir without copying the files, since everything is local. - class Dir: - name: str = self.root.path - return Dir() # once we drop python3.7, we can have load_all return a Protocol for proper typing - - # Fall back to copying from a remote directory (like S3 buckets) to a local one + # Copy the resources we need from the remote directory (like S3 buckets) to a local one. + # + # We do this even if the files are local, because the next step in our pipeline is the MS deid tool, + # and it will just process *everything* in a directory. So if there are other *.ndjson sitting next to our + # target resources, they'll get processed by the MS tool and that slows down running a single task with + # "--task" a lot. + # + # This uses more disk space temporarily (copied files will get deleted once the MS tool is done and this + # TemporaryDirectory gets discarded), but that seems reasonable. tmpdir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with - self.root.get(self.root.joinpath('*.ndjson'), f'{tmpdir.name}/') + for resource in resources: + self.root.get(self.root.joinpath(f'*{resource}*.ndjson'), f'{tmpdir.name}/') return tmpdir def _load_from_bulk_export(self, resources: List[str]) -> tempfile.TemporaryDirectory: diff --git a/cumulus/loaders/i2b2/external_mappings.py b/cumulus/loaders/i2b2/external_mappings.py new file mode 100644 index 00000000..c744def8 --- /dev/null +++ b/cumulus/loaders/i2b2/external_mappings.py @@ -0,0 +1,75 @@ +# This file contains a mapping of various external coding systems to the concepts they represent + + +# PHIN VADS 1000-9, Race & Ethnicity - CDC +# https://phinvads.cdc.gov/vads/ViewCodeSystemConcept.action?oid=2.16.840.1.113883.6.238&code=1000-9 +# https://hl7.org/fhir/us/core/StructureDefinition-us-core-race.html +CDC_RACE = { + 'White': ('urn:oid:2.16.840.1.113883.6.238', '2106-3'), + 'Black or African American': ('urn:oid:2.16.840.1.113883.6.238', '2054-5'), + 'American Indian or Alaska Native': ('urn:oid:2.16.840.1.113883.6.238', '1002-5'), + 'Asian': ('urn:oid:2.16.840.1.113883.6.238', '2028-9'), + 'Native Hawaiian or Other Pacific Islander': ('urn:oid:2.16.840.1.113883.6.238', '2076-8'), + 'Other': ('urn:oid:2.16.840.1.113883.6.238', '2131-1'), + 'Declined to Answer': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'ASKU'), + 'Unable to Answer': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'ASKU'), + 'Unknown': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'UNK'), +} + +# https://hl7.org/fhir/us/core/StructureDefinition-us-core-ethnicity.html +CDC_ETHNICITY = { + 'Hispanic or Latino': ('urn:oid:2.16.840.1.113883.6.238', '2135-2'), + 'Not Hispanic or Latino': ('urn:oid:2.16.840.1.113883.6.238', '2186-5'), + 'Declined to Answer': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'ASKU'), + 'Unable to Answer': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'ASKU'), + 'Unknown': ('http://terminology.hl7.org/CodeSystem/v3-NullFlavor', 'UNK'), +} + +# FHIR AdministrativeGender code (not a full gender spectrum, but quite limited) +# https://www.hl7.org/fhir/valueset-administrative-gender.html +# Anything not in this dictionary maps should map to 'other' +FHIR_GENDER = { + 'F': 'female', + 'M': 'male', + 'U': 'unknown', +} + + +# BCH internal lab codes mapping to international covid-19 codes +# system: http://loinc.org +LOINC_COVID_LAB_TESTS = { + 'LAB:1043473617': '94500-6', + 'LAB:1044804335': '94500-6', + 'LAB:1044704735': '94500-6', + 'LAB:1134792565': '95406-5', + 'LAB:1148157467': '95406-5', + 'LAB:467288722': '85477-8', + 'LAB:152831642': '85476-0', + 'LAB:467288694': '85478-6', + 'LAB:467288700': '85479-4', + 'LAB:13815125': '62462-7' +} + + +# PHIN VADS General adjectival modifier (qualifier value) {106234000 , SNOMED-CT } +# Subset of codes related to evaluating lab results +# system: http://snomed.info/sct +SNOMED_LAB_RESULT = { + 'Positive': '10828004', + 'Negative': '260385009', + 'Absent': '272519000' +} + + +# PHIN VADS Admission statuses {308277006, SNOMED-CT } +# Subset of codes related to means of admission +# system: http://snomed.info/sct +# https://terminology.hl7.org/5.0.0/ValueSet-v3-ActEncounterCode.html +# Other terms seen: +# - "Recurring Outpatient Series" (AMB?) +# - "Day Surgery" (AMB?) +SNOMED_ADMISSION = { + 'Emergency': 'EMER', + 'Inpatient': 'IMP', + 'Outpatient': 'AMB', +} diff --git a/cumulus/loaders/i2b2/extract.py b/cumulus/loaders/i2b2/extract.py index 47bbf889..1f1feb79 100644 --- a/cumulus/loaders/i2b2/extract.py +++ b/cumulus/loaders/i2b2/extract.py @@ -1,68 +1,58 @@ """Read files into data structures""" -from typing import List import logging +from typing import Iterator + import pandas -from cumulus import common + from cumulus.loaders.i2b2.schema import ObservationFact, PatientDimension, VisitDimension -def extract_csv(path_csv: str, sample=1.0) -> pandas.DataFrame: +def extract_csv(path_csv: str, batch_size: int) -> Iterator[dict]: """ :param path_csv: /path/to/i2b2_formatted_file.csv - :param sample: %percentage of file to read - :return: pandas Dataframe + :param batch_size: how many entries to load into memory at once + :return: an iterator over each row from the file """ - return common.extract_csv(path_csv, sample) + print(f'Reading csv {path_csv}...') + count = 0 + with pandas.read_csv(path_csv, dtype=str, na_filter=False, chunksize=batch_size) as reader: + for chunk in reader: + print(f' Read {count:,} entries...') + for _, row in chunk.iterrows(): + yield dict(row) + count += batch_size + print(f'Done reading {path_csv} .') -def extract_csv_observation_facts(path_csv: str, - sample=1.0) -> List[ObservationFact]: +def extract_csv_observation_facts(path_csv: str, batch_size: int) -> Iterator[ObservationFact]: """ :param path_csv: /path/to/file.csv - :param sample: %percentage of file to read + :param batch_size: how many entries to load into memory at once :return: i2b2 ObservationFact table """ - df = extract_csv(path_csv, sample) - logging.info('Transforming text into List[ObservationFact]') - facts = [] - for _, row in df.iterrows(): - facts.append(ObservationFact(row)) - - logging.info('Ready List[ObservationFact]') - return facts + for row in extract_csv(path_csv, batch_size): + yield ObservationFact(row) -def extract_csv_patients(path_csv: str, sample=1.0) -> List[PatientDimension]: +def extract_csv_patients(path_csv: str, batch_size: int) -> Iterator[PatientDimension]: """ :param path_csv: /path/to/file.csv - :param sample: %percentage of file to read + :param batch_size: how many entries to load into memory at once :return: List i2b2 patient dimension table """ - df = extract_csv(path_csv, sample) - logging.info('Transforming text into List[PatientDimension]') - patients = [] - for _, row in df.iterrows(): - patients.append(PatientDimension(row)) - - logging.info('Ready List[PatientDimension]') - return patients + for row in extract_csv(path_csv, batch_size): + yield PatientDimension(row) -def extract_csv_visits(path_csv: str, sample=1.0) -> List[VisitDimension]: +def extract_csv_visits(path_csv: str, batch_size: int) -> Iterator[VisitDimension]: """ :param path_csv: /path/to/file.csv - :param sample: %percentage of file to read + :param batch_size: how many entries to load into memory at once :return: List i2b2 visit dimension table """ - df = extract_csv(path_csv, sample) - logging.info('Transforming text into List[VisitDimension]') - visits = [] - for _, row in df.iterrows(): - visits.append(VisitDimension(row)) - - logging.info('Ready List[VisitDimension]') - return visits + for row in extract_csv(path_csv, batch_size): + yield VisitDimension(row) diff --git a/cumulus/loaders/i2b2/loader.py b/cumulus/loaders/i2b2/loader.py index 31899084..43214fdd 100644 --- a/cumulus/loaders/i2b2/loader.py +++ b/cumulus/loaders/i2b2/loader.py @@ -8,6 +8,7 @@ from fhirclient.models.resource import Resource +from cumulus import store from cumulus.loaders.base import Loader from cumulus.loaders.i2b2 import extract, schema, transform from cumulus.loaders.i2b2.oracle import extract as oracle_extract @@ -31,6 +32,15 @@ class I2b2Loader(Loader): - csv_visit """ + def __init__(self, root: store.Root, batch_size: int): + """ + Initialize a new I2b2Loader class + :param root: the base location to read data from + :param batch_size: the most entries to keep in memory at once + """ + super().__init__(root) + self.batch_size = batch_size + def load_all(self, resources: List[str]) -> tempfile.TemporaryDirectory: if self.root.protocol in ['tcp']: return self._load_all_from_oracle(resources) @@ -117,13 +127,14 @@ def _load_all_from_csv(self, resources: List[str]) -> tempfile.TemporaryDirector return self._load_all_with_extractors( resources, conditions=partial(extract.extract_csv_observation_facts, - os.path.join(path, 'observation_fact_diagnosis.csv')), + os.path.join(path, 'observation_fact_diagnosis.csv'), self.batch_size), observations=partial(extract.extract_csv_observation_facts, - os.path.join(path, 'observation_fact_lab_views.csv')), + os.path.join(path, 'observation_fact_lab_views.csv'), self.batch_size), documentreferences=partial(extract.extract_csv_observation_facts, - os.path.join(path, 'observation_fact_notes.csv')), - patients=partial(extract.extract_csv_patients, os.path.join(path, 'patient_dimension.csv')), - encounters=partial(extract.extract_csv_visits, os.path.join(path, 'visit_dimension.csv')), + os.path.join(path, 'observation_fact_notes.csv'), self.batch_size), + patients=partial(extract.extract_csv_patients, os.path.join(path, 'patient_dimension.csv'), + self.batch_size), + encounters=partial(extract.extract_csv_visits, os.path.join(path, 'visit_dimension.csv'), self.batch_size), ) ################################################################################################################### diff --git a/cumulus/loaders/i2b2/oracle/query.py b/cumulus/loaders/i2b2/oracle/query.py index 6a8e7710..a9db4728 100644 --- a/cumulus/loaders/i2b2/oracle/query.py +++ b/cumulus/loaders/i2b2/oracle/query.py @@ -37,7 +37,7 @@ def sql_visit() -> str: import_date = format_date('IMPORT_DATE') cols_dates = f'{start_date}, {end_date}, {import_date}, LENGTH_OF_STAY' - cols = 'ENCOUNTER_NUM, PATIENT_NUM, LOCATION_CD, INOUT_CD, LOCATION_CD, ' \ + cols = 'ENCOUNTER_NUM, PATIENT_NUM, LOCATION_CD, INOUT_CD, ' \ f'{cols_dates}' return f'select {cols} \n from {Table.visit.value}' diff --git a/cumulus/loaders/i2b2/resources/external_mappings.py b/cumulus/loaders/i2b2/resources/external_mappings.py deleted file mode 100644 index 5da353c8..00000000 --- a/cumulus/loaders/i2b2/resources/external_mappings.py +++ /dev/null @@ -1,59 +0,0 @@ -# This file contains a mapping of various external coding systems to the concepts they represent - - -# PHIN VADS 1000-9, Race & Ethnicity - CDC -# https://phinvads.cdc.gov/vads/ViewCodeSystemConcept.action?oid=2.16.840.1.113883.6.238&code=1000-9 -CDC_RACE = { - 'White': '2106-3', - 'Black or African American': '2054-5', - 'American Indian or Alaska Native': '1002-5', - 'Asian': '2028-9', - 'Native Hawaiian or Other Pacific Islander': '2076-8', - 'Hispanic or Latino': '2135-2', - 'Not Hispanic or Latino': '2186-5' -} - - -# FHIR AdministrativeGender code (not a full gender spectrum, but quite limited) -# https://www.hl7.org/fhir/valueset-administrative-gender.html -# Anything not in this dictionary maps should map to 'other' -FHIR_GENDER = { - 'F': 'female', - 'M': 'male', - 'U': 'unknown', -} - - -# BCH internal lab codes mapping to international covid-19 codes -# system: http://loinc.org -LOINC_COVID_LAB_TESTS = { - 'LAB:1043473617': '94500-6', - 'LAB:1044804335': '94500-6', - 'LAB:1044704735': '94500-6', - 'LAB:1134792565': '95406-5', - 'LAB:1148157467': '95406-5', - 'LAB:467288722': '85477-8', - 'LAB:152831642': '85476-0', - 'LAB:467288694': '85478-6', - 'LAB:467288700': '85479-4', - 'LAB:13815125': '62462-7' -} - - -# PHIN VADS General adjectival modifier (qualifier value) {106234000 , SNOMED-CT } -# Subset of codes related to evaluating lab results -# system: http://snomed.info/sct -SNOMED_LAB_RESULT = { - 'Positive': '10828004', - 'Negative': '260385009', - 'Absent': '272519000' -} - - -# PHIN VADS Admission statuses {308277006, SNOMED-CT } -# Subset of codes related to means of admition -# system: http://snomed.info/sct -SNOMED_ADMISSION = { - 'Inpatient': 'IMP', - 'Emergency': 'EMER' -} diff --git a/cumulus/loaders/i2b2/transform.py b/cumulus/loaders/i2b2/transform.py index 3618c7c3..5f2573ec 100644 --- a/cumulus/loaders/i2b2/transform.py +++ b/cumulus/loaders/i2b2/transform.py @@ -20,7 +20,7 @@ from fhirclient.models.period import Period from cumulus import fhir_common -from cumulus.loaders.i2b2.resources import external_mappings +from cumulus.loaders.i2b2 import external_mappings from cumulus.loaders.i2b2.schema import PatientDimension, VisitDimension, ObservationFact @@ -58,7 +58,8 @@ def to_fhir_patient(patient: PatientDimension) -> Patient: })] if patient.race_cd: - race_code = parse_race(patient.race_cd) + # race_cd can be either a race or an ethnicity. In FHIR, those are two different extensions. + race_code = external_mappings.CDC_RACE.get(patient.race_cd) if race_code is not None: subject.extension = [Extension({ 'url': 'http://hl7.org/fhir/us/core/StructureDefinition/us-core-race', @@ -66,8 +67,24 @@ def to_fhir_patient(patient: PatientDimension) -> Patient: { 'url': 'ombCategory', 'valueCoding': { - 'system': 'urn:oid:2.16.840.1.113883.6.238', - 'code': race_code, + 'system': race_code[0], + 'code': race_code[1], + 'display': patient.race_cd, + }, + }, + ], + })] + + ethnicity_code = external_mappings.CDC_ETHNICITY.get(patient.race_cd) + if ethnicity_code is not None: + subject.extension = [Extension({ + 'url': 'http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity', + 'extension': [ + { + 'url': 'ombCategory', + 'valueCoding': { + 'system': ethnicity_code[0], + 'code': ethnicity_code[1], 'display': patient.race_cd, }, }, @@ -115,9 +132,7 @@ def to_fhir_encounter(visit: VisitDimension) -> Encounter: 'code': external_mappings.SNOMED_ADMISSION.get(visit.inout_cd) }) else: - logging.warning( - 'skipping encounter.class_fhir.code for i2b2 ' - 'INOUT_CD : %s', visit.inout_cd) + logging.debug('skipping encounter.class_fhir.code for i2b2 INOUT_CD : %s', visit.inout_cd) return encounter @@ -268,16 +283,6 @@ def parse_gender(i2b2_sex_cd) -> Optional[str]: return external_mappings.FHIR_GENDER.get(i2b2_sex_cd, 'other') -def parse_race(i2b2_race_cd) -> Optional[str]: - """ - :param i2b2_race_cd: - :return: CDC R5 Race codes or None - """ - if i2b2_race_cd and isinstance(i2b2_race_cd, str): - if i2b2_race_cd in external_mappings.CDC_RACE: - return external_mappings.CDC_RACE[i2b2_race_cd] - - def parse_fhir_duration(i2b2_length_of_stay) -> float: """ :param i2b2_length_of_stay: usually an integer like "days" diff --git a/docs/howtos/run-cumulus-etl.md b/docs/howtos/run-cumulus-etl.md index 3c5baccb..ec4ec44d 100644 --- a/docs/howtos/run-cumulus-etl.md +++ b/docs/howtos/run-cumulus-etl.md @@ -179,7 +179,7 @@ docker compose -f $CUMULUS_REPO_PATH/compose.yaml run --rm\ --comment="Any interesting logging data you like, like which user launched this" \ --input-format=ndjson \ --output-format=parquet \ - --batch-size=10000000 \ + --batch-size=200000 \ --s3-region=us-east-2 \ s3://my-us-east-2-input-bucket/ \ s3://my-cumulus-prefix-99999999999-us-east-2/subdir1/ \ diff --git a/tests/test_etl.py b/tests/test_etl.py index 79ec4937..739aa1c4 100644 --- a/tests/test_etl.py +++ b/tests/test_etl.py @@ -204,7 +204,7 @@ def test_unknown_task(self): def test_single_task(self): # Grab all observations before we mock anything - observations = loaders.I2b2Loader(store.Root(self.input_path)).load_all(['Observation']) + observations = loaders.I2b2Loader(store.Root(self.input_path), 5).load_all(['Observation']) def fake_load_all(internal_self, resources): del internal_self @@ -221,7 +221,7 @@ def fake_load_all(internal_self, resources): def test_multiple_tasks(self): # Grab all observations before we mock anything - loaded = loaders.I2b2Loader(store.Root(self.input_path)).load_all(['Observation', 'Patient']) + loaded = loaders.I2b2Loader(store.Root(self.input_path), 5).load_all(['Observation', 'Patient']) def fake_load_all(internal_self, resources): del internal_self @@ -439,7 +439,7 @@ def test_stores_cached_json(self): self.run_etl(output_format='parquet') notes_csv_path = os.path.join(self.input_path, 'observation_fact_notes.csv') - facts = extract.extract_csv_observation_facts(notes_csv_path) + facts = list(extract.extract_csv_observation_facts(notes_csv_path, 5)) for index, checksum in enumerate(self.expected_checksums): self.assertEqual( diff --git a/tests/test_i2b2_oracle_extract.py b/tests/test_i2b2_oracle_extract.py index cfd03d15..eb67a271 100644 --- a/tests/test_i2b2_oracle_extract.py +++ b/tests/test_i2b2_oracle_extract.py @@ -79,7 +79,7 @@ def test_loader(self, mock_extract): mock_extract.list_visit.return_value = [i2b2_mock_data.encounter_dim()] root = store.Root('tcp://localhost/foo') - oracle_loader = loader.I2b2Loader(root) + oracle_loader = loader.I2b2Loader(root, 5) tmpdir = oracle_loader.load_all(['Condition', 'Encounter', 'Patient']) # Check results diff --git a/tests/test_i2b2_oracle_query.py b/tests/test_i2b2_oracle_query.py index 31d67e2f..a507c22a 100644 --- a/tests/test_i2b2_oracle_query.py +++ b/tests/test_i2b2_oracle_query.py @@ -57,7 +57,7 @@ def test_sql_visit(self): pretty(query.sql_visit() + query.limit(20)) pretty(query.count_by_date_group(schema.Table.visit)) self.assertEqual( - 'select ENCOUNTER_NUM, PATIENT_NUM, LOCATION_CD, INOUT_CD, LOCATION_CD, ' + 'select ENCOUNTER_NUM, PATIENT_NUM, LOCATION_CD, INOUT_CD, ' "to_char(cast(START_DATE as date), 'YYYY-MM-DD') as START_DATE, " "to_char(cast(END_DATE as date), 'YYYY-MM-DD') as END_DATE, " "to_char(cast(IMPORT_DATE as date), 'YYYY-MM-DD') as IMPORT_DATE, "