diff --git a/ruff.toml b/ruff.toml
index 3a3d2ae..20707d6 100644
--- a/ruff.toml
+++ b/ruff.toml
@@ -45,7 +45,10 @@ convention = "google"
 [per-file-ignores]
 "__init__.py" = ["F401"]
 "__main__.py" = ["B008", "S101"]
-"./**/tests/*.py" = ["S101"] # Use of assert detected.
+"./**/tests/*.py" = [
+    "S101",  # Use of assert detected.
+    "PLR2004",  # Use of magic value in comparison.
+]
 
 [isort]
 force-single-line = true
diff --git a/transforms/tabular-merger-tool/.bumpversion.cfg b/transforms/tabular-merger-tool/.bumpversion.cfg
new file mode 100644
index 0000000..8c84a0a
--- /dev/null
+++ b/transforms/tabular-merger-tool/.bumpversion.cfg
@@ -0,0 +1,27 @@
+[bumpversion]
+current_version = 0.1.3-dev0
+commit = True
+tag = False
+parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<release>[a-z]+)(?P<dev>\d+))?
+serialize =
+	{major}.{minor}.{patch}-{release}{dev}
+	{major}.{minor}.{patch}
+
+[bumpversion:part:release]
+optional_value = _
+first_value = dev
+values =
+	dev
+	_
+
+[bumpversion:part:dev]
+
+[bumpversion:file:pyproject.toml]
+search = version = "{current_version}"
+replace = version = "{new_version}"
+
+[bumpversion:file:plugin.json]
+
+[bumpversion:file:VERSION]
+
+[bumpversion:file:src/polus/images/transforms/tabular/tabular_merger/__init__.py]
diff --git a/transforms/tabular-merger-tool/Dockerfile b/transforms/tabular-merger-tool/Dockerfile
new file mode 100755
index 0000000..a851ae2
--- /dev/null
+++ b/transforms/tabular-merger-tool/Dockerfile
@@ -0,0 +1,21 @@
+FROM polusai/bfio:2.1.9
+
+# environment variables defined in polusai/bfio
+ENV EXEC_DIR="/opt/executables"
+ENV POLUS_IMG_EXT=".ome.tif"
+ENV POLUS_TAB_EXT=".arrow"
+ENV POLUS_LOG="INFO"
+
+# Work directory defined in the base container
+WORKDIR ${EXEC_DIR}
+
+COPY pyproject.toml ${EXEC_DIR}
+COPY VERSION ${EXEC_DIR}
+COPY README.md ${EXEC_DIR}
+RUN pip3 install --index-url https://test.pypi.org/simple/ filepattern==2.2.7
+COPY src ${EXEC_DIR}/src
+
+RUN pip3 install ${EXEC_DIR} --no-cache-dir
+
+ENTRYPOINT ["python3", "-m", "polus.images.transforms.tabular.tabular_merger"]
+CMD ["--help"]
diff --git a/transforms/tabular-merger-tool/README.md b/transforms/tabular-merger-tool/README.md
new file mode 100644
index 0000000..3d42224
--- /dev/null
+++ b/transforms/tabular-merger-tool/README.md
@@ -0,0 +1,54 @@
+# Tabular Merger (v0.1.3-dev0)
+
+This WIPP plugin merges tabular files in any of the vaex-supported file formats below into a single combined file, using either row or column merging.
+
+1. csv
+2. hdf5
+3. parquet
+4. feather
+5. arrow
+
+**row merging with same headers**
+
+If `dim = rows` and `sameColumns` is set, files are assumed to have headers (column names) in the first row. If the headers are not identical across files, the plugin finds the headers common to all files and then performs row merging. An additional column named `file` is created in the output file; it contains the name of the original file associated with each row of data.
+
+**row merging without same headers**
+
+If `dim = rows` and `sameColumns` is not set, files can be merged even when the headers are not exactly the same across files; files that lack a specific column header will have that column filled with `NaN` values. As before, an additional column named `file` records the name of the original file associated with each row of data.
+
+**column merging with same rows**
+
+If `dim = columns` and `sameRows` is set, all files are assumed to have the same number of rows. The filename is added as a prefix to each column name to avoid duplicate column names after merging.
+
+**column merging with unequal rows**
+
+If `dim = columns` and `sameRows` is not set, `mapVar` must be defined in order to join tabular files with unequal numbers of rows. An `indexcolumn` column is created by indexing the `mapVar` values in each tabular file, which allows the files to be joined without duplicating rows.
+
+If `stripExtension` is set to true, then the file extension is removed from the file name in the `file` column.
+
+For more information on WIPP, visit the [official WIPP page](https://isg.nist.gov/deepzoomweb/software/wipp).
+
+## Building
+
+To build the Docker image for this plugin, run
+`./build-docker.sh`.
+
+## Install WIPP Plugin
+
+If WIPP is running, navigate to the plugins page and add a new plugin. Paste the contents of `plugin.json` into the pop-up window and submit.
+
+## Options
+
+This plugin takes the following arguments:
+
+| Name               | Description                                                               | I/O    | Type        |
+|--------------------|---------------------------------------------------------------------------|--------|-------------|
+| `--inpDir`         | Input data collection to be processed by this plugin                      | Input  | genericData |
+| `--filePattern`    | Pattern to parse tabular files                                            | Input  | string      |
+| `--stripExtension` | Whether to remove the file extension from filenames in the `file` column  | Input  | boolean     |
+| `--dim`            | Perform `rows` or `columns` merging                                       | Input  | enum        |
+| `--sameRows`       | Merge tabular files with the same number of rows?                         | Input  | boolean     |
+| `--sameColumns`    | Merge tabular files with the same headers (column names)?                 | Input  | boolean     |
+| `--mapVar`         | Column name used to merge files                                           | Input  | string      |
+| `--outDir`         | Output file                                                               | Output | genericData |
+| `--preview`        | Generate a JSON file with outputs                                         | Output | JSON        |
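A minimal sketch of the row-merge semantics described in the README above, using pandas in place of vaex and hypothetical in-memory frames (the plugin itself does the same thing with vaex on files):

```python
import pandas as pd

a = pd.DataFrame({"x": [1, 2], "y": [3, 4]})
b = pd.DataFrame({"x": [5], "z": [6]})

# Row merge with sameColumns: keep only the headers common to every file,
# then stack rows and record each row's source file in a `file` column.
common = sorted(set(a.columns) & set(b.columns))
frames = [("a.csv", a), ("b.csv", b)]
merged = pd.concat(
    [df[common].assign(file=name) for name, df in frames],
    ignore_index=True,
)

# Row merge without sameColumns: align on all headers and fill gaps with NaN.
merged_all = pd.concat([a, b], ignore_index=True)
```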
diff --git a/transforms/tabular-merger-tool/VERSION b/transforms/tabular-merger-tool/VERSION
new file mode 100644
index 0000000..e40c876
--- /dev/null
+++ b/transforms/tabular-merger-tool/VERSION
@@ -0,0 +1 @@
+0.1.3-dev0
diff --git a/transforms/tabular-merger-tool/plugin.json b/transforms/tabular-merger-tool/plugin.json
new file mode 100644
index 0000000..4a64aa8
--- /dev/null
+++ b/transforms/tabular-merger-tool/plugin.json
@@ -0,0 +1,114 @@
+{
+  "name": "Tabular Merger",
+  "version": "0.1.3-dev0",
+  "title": "Tabular Merger",
+  "description": "Merge vaex-supported tabular files into a single merged file.",
+  "author": "Nicholas Schaub (nick.schaub@nih.gov), Hamdah Shafqat Abbasi (hamdahshafqat.abbasi@nih.gov)",
+  "institution": "National Center for Advancing Translational Sciences, National Institutes of Health",
+  "repository": "https://github.com/PolusAI/polus-plugins",
+  "website": "https://ncats.nih.gov/preclinical/core/informatics",
+  "citation": "",
+  "containerId": "polusai/tabular-merger-tool:0.1.3-dev0",
+  "baseCommand": [
+    "python3",
+    "-m",
+    "polus.images.transforms.tabular.tabular_merger"
+  ],
+  "inputs": [
+    {
+      "name": "inpDir",
+      "type": "genericData",
+      "description": "Input data collection to be processed by this plugin",
+      "required": true
+    },
+    {
+      "name": "filePattern",
+      "type": "string",
+      "description": "Pattern to parse input files",
+      "default": ".+",
+      "required": false
+    },
+    {
+      "name": "stripExtension",
+      "type": "boolean",
+      "description": "Whether the file extension should be removed from filenames in the merged file column",
+      "required": true
+    },
+    {
+      "name": "dim",
+      "type": "enum",
+      "options": {
+        "values": [
+          "rows",
+          "columns",
+          "default"
+        ]
+      },
+      "description": "Merging dimension",
+      "required": true
+    },
+    {
+      "name": "sameRows",
+      "type": "boolean",
+      "description": "Perform column merge on all files with the same number of rows?",
+      "required": false
+    },
+    {
+      "name": "sameColumns",
+      "type": "boolean",
+      "description": "Perform row merge on all files with the same column names",
+      "required": false
+    },
+    {
+      "name": "mapVar",
+      "type": "string",
+      "description": "Column name used to join files column-wise",
+      "required": false
+    }
+  ],
+  "outputs": [
+    {
+      "name": "outDir",
+      "type": "genericData",
+      "description": "Output data collection"
+    }
+  ],
+  "ui": [
+    {
+      "key": "inputs.inpDir",
+      "title": "Input collection",
+      "description": "Input data collection to be processed by this plugin"
+    },
+    {
+      "key": "inputs.filePattern",
+      "title": "filePattern",
+      "description": "Pattern to parse input files",
+      "default": ".+"
+    },
+    {
+      "key": "inputs.stripExtension",
+      "title": "Remove File Extension",
+      "description": "Remove the file extension in the merged file column"
+    },
+    {
+      "key": "inputs.dim",
+      "title": "Merging dimension",
+      "description": "Merge along rows or columns?"
+    },
+    {
+      "key": "inputs.sameRows",
+      "title": "Merge files with equal rows:",
+      "description": "Merge only files with a matching number of rows?"
+    },
+    {
+      "key": "inputs.sameColumns",
+      "title": "Merge files with same columns:",
+      "description": "Merge only files with common columns?"
+    },
+    {
+      "key": "inputs.mapVar",
+      "title": "Column name used to merge files",
+      "description": "Column name used to join files column-wise"
+    }
+  ]
+}
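The manifest is easy to sanity-check mechanically. A small stdlib-only sketch, run from the tool directory, that verifies the UI keys and the container tag stay in sync with the declared inputs and version:

```python
import json
from pathlib import Path

manifest = json.loads(Path("plugin.json").read_text())

# Every UI entry should point at a declared input.
input_names = {inp["name"] for inp in manifest["inputs"]}
for entry in manifest["ui"]:
    name = entry["key"].split(".", 1)[1]
    assert name in input_names, f"UI key {entry['key']} has no matching input"

# The container tag should match the manifest version.
assert manifest["containerId"].endswith(manifest["version"])
```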
"default" + ] + }, + "description": "Merging dimension", + "required": true + }, + { + "name": "sameRows", + "type": "boolean", + "description": "Perform column merge on all files with the same number of rows?", + "required": false + }, + { + "name": "sameColumns", + "type": "boolean", + "description": "Perform row merge on all files with the same column names", + "required": false + }, + { + "name": "mapVar", + "type": "string", + "description": "Column name to join files column wise", + "required": false + } + ], + "outputs": [ + { + "name": "outDir", + "type": "genericData", + "description": "Output data collection" + } + ], + "ui": [ + { + "key": "inputs.inpDir", + "title": "Input collection", + "description": "Input image collection to be processed by this plugin" + }, + { + "key": "inputs.filePattern", + "title": "filePattern", + "description": "Pattern to parse input files", + "default": ".+" + }, + { + "key": "inputs.stripExtension", + "title": "Remove File Extension", + "description": "Remove file extension in the merged file column" + }, + { + "key": "inputs.dim", + "title": "Merging dimension", + "description": "Merge along rows or columns?" + }, + { + "key": "inputs.sameRows", + "title": "Merge files with equal rows:", + "description": "Merge only files with matching number of rows?" + }, + { + "key": "inputs.sameColumns", + "title": "Merge CSVs with same columns:", + "description": "Merge files with with common columns between files?" + }, + { + "key": "inputs.mapVar", + "title": "Column name use to merge files", + "description": "Column name use to merge files" + } + ] +} diff --git a/transforms/tabular-merger-tool/pyproject.toml b/transforms/tabular-merger-tool/pyproject.toml new file mode 100644 index 0000000..ae20ebf --- /dev/null +++ b/transforms/tabular-merger-tool/pyproject.toml @@ -0,0 +1,34 @@ +[tool.poetry] +name = "polus-images-transforms-tabular-tabular-merger" +version = "0.1.3-dev0" +description = "Merge vaex supported tabular file format into a single merged file." +authors = [ +"Nick Schaub ", +"Hamdah Shafqat abbasi " +] +readme = "README.md" +packages = [{include = "polus", from = "src"}] + +[tool.poetry.dependencies] +python = ">=3.9" +filepattern = "^2.0.0" +typer = "^0.7.0" +blake3 = "^0.3.3" +llvmlite = "^0.39.1" +fastapi = "^0.92.0" +astropy = "5.2.1" +vaex = "^4.17.0" +tqdm = "^4.65.0" + + +[tool.poetry.group.dev.dependencies] +bump2version = "^1.0.1" +pre-commit = "^3.1.0" +black = "^23.1.0" +flake8 = "^6.0.0" +mypy = "^1.0.1" +pytest = "^7.2.1" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/transforms/tabular-merger-tool/run-plugin.sh b/transforms/tabular-merger-tool/run-plugin.sh new file mode 100644 index 0000000..faedd9b --- /dev/null +++ b/transforms/tabular-merger-tool/run-plugin.sh @@ -0,0 +1,30 @@ +#!/bin/bash +version=$( None: + """CLI for the tool.""" + logger.info(f"inpDir = {inp_dir}") + logger.info(f"outDir = {out_dir}") + logger.info(f"filePattern = {file_pattern}") + logger.info(f"stripExtension = {strip_extension}") + logger.info(f"dim= {dim}") + logger.info(f"sameRows= {same_rows}") + logger.info(f"sameColumns= {same_columns}") + logger.info(f"mapVar= {map_var}") + + inp_dir = inp_dir.resolve() + out_dir = out_dir.resolve() + + assert inp_dir.exists(), f"{inp_dir} doesnot exists!! Please check input path again" + assert ( + out_dir.exists() + ), f"{out_dir} doesnot exists!! 
Please check output path again" + + # By default it ingests all input files if not file_pattern is defined + file_pattern = ".*" + file_pattern + + fps = fp.FilePattern(inp_dir, file_pattern) + + if preview: + with pathlib.Path(out_dir, "preview.json").open("w") as jfile: + out_json: dict[str, Any] = { + "filepattern": file_pattern, + "outDir": [], + } + for file in fps: + out_name = str(file[1][0].name.split(".")[0]) + POLUS_TAB_EXT + out_json["outDir"].append(out_name) + json.dump(out_json, jfile, indent=2) + + inp_dir_files = [f[1][0] for f in fps] + st_time = time.time() + tm.merge_files( + inp_dir_files, + strip_extension, + dim, + same_rows, + same_columns, + map_var, + out_dir, + ) + + exec_time = time.time() - st_time + logger.info(f"Execution time: {time.strftime('%H:%M:%S', time.gmtime(exec_time))}") + logger.info("Finished Merging of files!") + + +if __name__ == "__main__": + app() diff --git a/transforms/tabular-merger-tool/src/polus/images/transforms/tabular/tabular_merger/tabular_merger.py b/transforms/tabular-merger-tool/src/polus/images/transforms/tabular/tabular_merger/tabular_merger.py new file mode 100644 index 0000000..18c653c --- /dev/null +++ b/transforms/tabular-merger-tool/src/polus/images/transforms/tabular/tabular_merger/tabular_merger.py @@ -0,0 +1,265 @@ +"""Tabular Merger.""" +import enum +import functools as ft +import logging +import os +import pathlib +from collections import Counter +from typing import Optional + +import numpy as np +import vaex +from tqdm import tqdm + +logger = logging.getLogger(__name__) +logger.setLevel(os.environ.get("POLUS_LOG", logging.INFO)) +POLUS_TAB_EXT = os.environ.get("POLUS_TAB_EXT", ".arrow") + + +class Dimensions(str, enum.Enum): + """File format of an output combined file.""" + + Rows = "rows" + Columns = "columns" + Default = "rows" + + +def sorted_dataframe_list( + x: list[vaex.dataframe.DataFrameLocal], +) -> list[vaex.dataframe.DataFrameLocal]: + """Reordering of list of dataframes based on the size. + + Args: + x: List of vaex dataframes. + + Returns: + sorted list of vaex dataFrame based on the size. + """ + my_dict = dict(zip(x, [x[i].shape[0] for i in range(len(x))])) + occurrences = dict(Counter(my_dict.values()).items()) + + for k, v in my_dict.items(): + count = occurrences[v] + if count > 1: + increment = v + 1 + my_dict[k] = increment + occurrences[v] = count - 1 + + sorted_values = sorted(my_dict.values(), reverse=True) + i = 0 + status = "Unknown" + prf = [] + while status != "true": + for o in sorted_values: + for k, v in my_dict.items(): + if v == o: + status = "true" + prf.append(k) + i += 1 + + return prf + + +def remove_files(curr_dir: pathlib.Path) -> None: + """Delete intermediate hdf5 and yaml files in a working directory. + + Args: + curr_dir: Path to the working directory. + """ + for f in curr_dir.iterdir(): + if f.suffix in [".hdf5", ".yaml"]: + f.unlink() + + +def merge_files( # noqa: PLR0915 PLR0912 PLR0913 C901 + inp_dir_files: list, + strip_extension: bool, + dim: Dimensions, + same_rows: Optional[bool], + same_columns: Optional[bool], + map_var: Optional[str], + out_dir: str, +) -> None: + """Merge several tabular data files into a single file. + + Merge tabular files with vaex supported file formats into a single combined + file using either row or column merging. + + The merged file can be saved into any of the vaex supported file format. + + Args: + inp_dir_files: List of an input files. + file_pattern : Pattern to parse input files. 
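Assuming the option names above (they mirror plugin.json), the CLI can be smoke-tested without Docker via typer's bundled test runner; the directories here are hypothetical and must already exist:

```python
from typer.testing import CliRunner

from polus.images.transforms.tabular.tabular_merger.__main__ import app

runner = CliRunner()
result = runner.invoke(
    app,
    [
        "--inpDir", "tests/data/input",
        "--filePattern", ".*.csv",
        "--dim", "rows",
        "--sameColumns",
        "--outDir", "tests/data/output",
    ],
)
assert result.exit_code == 0, result.output
```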
diff --git a/transforms/tabular-merger-tool/src/polus/images/transforms/tabular/tabular_merger/tabular_merger.py b/transforms/tabular-merger-tool/src/polus/images/transforms/tabular/tabular_merger/tabular_merger.py
new file mode 100644
index 0000000..18c653c
--- /dev/null
+++ b/transforms/tabular-merger-tool/src/polus/images/transforms/tabular/tabular_merger/tabular_merger.py
@@ -0,0 +1,225 @@
+"""Tabular Merger."""
+import enum
+import functools as ft
+import logging
+import os
+import pathlib
+from typing import Optional
+
+import numpy as np
+import vaex
+from tqdm import tqdm
+
+logger = logging.getLogger(__name__)
+logger.setLevel(os.environ.get("POLUS_LOG", logging.INFO))
+POLUS_TAB_EXT = os.environ.get("POLUS_TAB_EXT", ".arrow")
+
+
+class Dimensions(str, enum.Enum):
+    """Dimension along which input files are merged."""
+
+    Rows = "rows"
+    Columns = "columns"
+    Default = "default"
+
+
+def sorted_dataframe_list(
+    x: list[vaex.dataframe.DataFrameLocal],
+) -> list[vaex.dataframe.DataFrameLocal]:
+    """Reorder a list of dataframes by size.
+
+    Args:
+        x: List of vaex dataframes.
+
+    Returns:
+        List of vaex dataframes sorted by row count, largest first.
+    """
+    # The chained left joins in merge_files should start from the dataframe
+    # with the most rows, so sort by row count in descending order.
+    return sorted(x, key=lambda df: df.shape[0], reverse=True)
+
+
+def remove_files(curr_dir: pathlib.Path) -> None:
+    """Delete intermediate hdf5 and yaml files in a working directory.
+
+    Args:
+        curr_dir: Path to the working directory.
+    """
+    for f in curr_dir.iterdir():
+        if f.suffix in [".hdf5", ".yaml"]:
+            f.unlink()
+
+
+def merge_files(  # noqa: PLR0915 PLR0912 PLR0913 C901
+    inp_dir_files: list,
+    strip_extension: bool,
+    dim: Dimensions,
+    same_rows: Optional[bool],
+    same_columns: Optional[bool],
+    map_var: Optional[str],
+    out_dir: pathlib.Path,
+) -> None:
+    """Merge several tabular data files into a single file.
+
+    Merge tabular files in vaex-supported file formats into a single combined
+    file using either row or column merging.
+
+    The merged file can be saved in any of the vaex-supported file formats.
+
+    Args:
+        inp_dir_files: List of input files.
+        strip_extension: True to remove the file extension from filenames
+            in the output file column.
+        dim: Perform merging either `rows` or `columns` wise.
+        same_rows: Only merge files with the same number of rows.
+        same_columns: Find common headers and merge only the columns
+            common to all files.
+        map_var: Variable name used to join files column-wise.
+        out_dir: Path to the output directory.
+    """
+    # Generate the path to the output file
+    out_path = pathlib.Path(out_dir).joinpath(f"merged{POLUS_TAB_EXT}")
+    curr_dir = pathlib.Path.cwd()
+
+    # Case One: merging by columns with the same number of rows
+    if dim == "columns" and same_rows:
+        logger.info("Merging data with identical number of rows...")
+        dfs = []
+        headers = []
+        for in_file in tqdm(
+            inp_dir_files,
+            total=len(inp_dir_files),
+            desc="Vaex loading of file",
+        ):
+            if in_file.suffix == ".csv":
+                df = vaex.from_csv(in_file, chunk_size=100_000, convert=True)
+            else:
+                df = vaex.open(in_file, convert="bigdata.hdf5")
+            # Prefix every column with the filename so column names stay
+            # unique after merging.
+            for col in list(df.columns):
+                df.rename(col, in_file.stem + "_" + col)
+            headers.append(df.get_column_names())
+            dfs.append(df)
+        duplicate_columns = len(list(set(headers[0]).intersection(*headers)))
+
+        if duplicate_columns == 0:
+            df_final = ft.reduce(
+                lambda left, right: left.join(right, how="left"),
+                dfs,
+            )
+            df_final.export(out_path)
+        else:
+            msg = "Duplicated column names in dataframes"
+            raise ValueError(msg)
+
+    # Case Two: merging by columns with different numbers of rows
+    elif dim == "columns" and not same_rows:
+        if not map_var:
+            msg = f"mapVar name should be defined {map_var}"
+            raise ValueError(msg)
+
+        dfs = []
+        headers = []
+        for in_file in tqdm(
+            inp_dir_files,
+            total=len(inp_dir_files),
+            desc="Vaex loading of file",
+        ):
+            if in_file.suffix == ".csv":
+                df = vaex.from_csv(in_file, chunk_size=100_000, convert=True)
+            else:
+                df = vaex.open(in_file)
+            for col in list(df.columns):
+                if col != map_var:
+                    df.rename(col, in_file.stem + "_" + col)
+            # Pair each map_var value with its row position so repeated
+            # values stay unique and the joins below cannot duplicate rows.
+            df.add_column(
+                "indexcolumn",
+                np.array(
+                    [
+                        str(i) + "_" + str(p)
+                        for i, p in enumerate(df[map_var].values)
+                    ],
+                ),
+            )
+            df.rename(map_var, in_file.stem + "_" + map_var)
+            headers.append(df.get_column_names())
+            dfs.append(df)
+        dfs = sorted_dataframe_list(dfs)
+        duplicate_columns = len(list(set(headers[0]).intersection(*headers)))
+        if duplicate_columns == 1:
+            df_final = ft.reduce(
+                lambda left, right: left.join(
+                    right,
+                    how="left",
+                    left_on="indexcolumn",
+                    right_on="indexcolumn",
+                    allow_duplication=False,
+                ),
+                dfs,
+            )
+            df_final.export(out_path)
+        else:
+            msg = "Duplicated column names in dataframes"
+            raise ValueError(msg)
+
+    # Case Three: merging along rows with common headers only
+    elif dim == "rows" and same_columns:
+        # Get the column headers common to all input files
+        logger.info("Getting all common headers in input files...")
+        headers = []
+        for in_file in inp_dir_files:
+            df = vaex.open(in_file, convert="bigdata.hdf5")
+            headers.append(list(df.columns))
+        headers = list(set(headers[0]).intersection(*headers))
+        logger.info(f"Common headers: {headers}")
+        logger.info("Merging the data along rows...")
+        dfs = []
+        for in_file in tqdm(
+            inp_dir_files,
+            total=len(inp_dir_files),
+            desc="Vaex loading of file",
+        ):
+            if in_file.suffix == ".csv":
+                df = vaex.from_csv(in_file, chunk_size=100_000, convert=True)
+            else:
+                df = vaex.open(in_file, convert="bigdata.hdf5")
+            df = df[list(headers)]
+            # Drop any pre-existing `file` column before tagging each row
+            # with the name of its source file.
+            if "file" in df.get_column_names():
+                df = df.drop("file")
+            outname = in_file.stem if strip_extension else in_file.name
+            df["file"] = np.repeat(outname, df.shape[0])
+            dfs.append(df)
+        df_final = vaex.concat(dfs)
+        df_final = df_final[["file"] + [f for f in df_final.get_names() if f != "file"]]
+        df_final.export(out_path)
+
+    # Case Four: merging along rows without common headers
+    else:
+        logger.info("Merging the data along rows...")
+        dfs = []
+        for in_file in tqdm(
+            inp_dir_files,
+            total=len(inp_dir_files),
+            desc="Vaex loading of file",
+        ):
+            logger.info(f"loading file {in_file}")
+            if in_file.suffix == ".csv":
+                df = vaex.from_csv(in_file, chunk_size=100_000, convert=True)
+            else:
+                df = vaex.open(in_file)
+            if "file" in df.get_column_names():
+                df = df.drop("file")
+            outname = in_file.stem if strip_extension else in_file.name
+            df["file"] = np.repeat(outname, df.shape[0])
+            dfs.append(df)
+        df_final = vaex.concat(dfs)
+        df_final = df_final[["file"] + [f for f in df_final.get_names() if f != "file"]]
+        df_final.export(out_path)
+
+    # Delete intermediate files in the working directory
+    remove_files(curr_dir)
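A pandas sketch of the `indexcolumn` scheme used in Case Two above (hypothetical frames): the row position is paired with the `map_var` value, so repeated key values stay unique and the left joins cannot duplicate rows, at the cost of requiring files to list their key values in the same order:

```python
import pandas as pd

left = pd.DataFrame({"A": [10, 20, 30], "area": [1.0, 2.0, 3.0]})
right = pd.DataFrame({"A": [10, 20], "label": ["a", "b"]})

for df in (left, right):
    # Same construction as merge_files: "<row position>_<key value>".
    df["indexcolumn"] = [f"{i}_{v}" for i, v in enumerate(df["A"])]

# Rows 0 and 1 match; row 2 of `left` gets NaN for `label`.
joined = left.merge(right.drop(columns="A"), on="indexcolumn", how="left")
print(joined)
```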
diff --git a/transforms/tabular-merger-tool/tests/__init__.py b/transforms/tabular-merger-tool/tests/__init__.py
new file mode 100644
index 0000000..55378e1
--- /dev/null
+++ b/transforms/tabular-merger-tool/tests/__init__.py
@@ -0,0 +1 @@
+"""Testing Tabular Merger."""
diff --git a/transforms/tabular-merger-tool/tests/test_main.py b/transforms/tabular-merger-tool/tests/test_main.py
new file mode 100644
index 0000000..90c9f58
--- /dev/null
+++ b/transforms/tabular-merger-tool/tests/test_main.py
@@ -0,0 +1,246 @@
+"""Testing Tabular Merger."""
+import pathlib
+import string
+import typing
+
+import filepattern as fp
+import numpy as np
+import pandas as pd
+import pytest
+import vaex
+from polus.images.transforms.tabular.tabular_merger import tabular_merger as tm
+
+
+class Generatedata:
+    """Generate tabular data in several different file formats."""
+
+    def __init__(
+        self,
+        file_pattern: str,
+        out_name: str,
+        same_rows: typing.Optional[bool],
+        trunc_columns: typing.Optional[bool],
+    ) -> None:
+        """Define instance attributes."""
+        self.data_dir = pathlib.Path.cwd().parent.joinpath("data")
+
+        self.inp_dir = pathlib.Path(self.data_dir, "input")
+        if not self.inp_dir.exists():
+            self.inp_dir.mkdir(exist_ok=True, parents=True)
+
+        self.out_dir = pathlib.Path(self.data_dir, "output")
+        if not self.out_dir.exists():
+            self.out_dir.mkdir(exist_ok=True, parents=True)
+
+        self.file_pattern = file_pattern
+        self.same_rows = same_rows
+        self.trunc_columns = trunc_columns
+        self.out_name = out_name
+        self.df = self.create_dataframe()
+
+    def get_inp_dir(self) -> pathlib.Path:
+        """Get the input directory."""
+        return self.inp_dir
+
+    def get_out_dir(self) -> pathlib.Path:
+        """Get the output directory."""
+        return self.out_dir
+
+    def create_dataframe(self) -> pd.DataFrame:
+        """Create a pandas dataframe."""
+        df_size = 100 if self.same_rows else 200
+        rng = np.random.default_rng()
+        letters = list(string.ascii_lowercase)
+
+        diction_1 = {
+            "A": list(range(df_size)),
+            "B": [rng.choice(letters) for _ in range(df_size)],
+            "C": rng.integers(low=1, high=100, size=df_size),
+            "D": rng.normal(0.0, 1.0, size=df_size),
+        }
+
+        if self.trunc_columns:
+            diction_1 = {k: v for k, v in diction_1.items() if k not in ["A", "B"]}
+
+        return pd.DataFrame(diction_1)
+
+    def csv_func(self) -> None:
+        """Write the pandas dataframe in csv file format."""
+        self.df.to_csv(pathlib.Path(self.inp_dir, self.out_name), index=False)
+
+    def parquet_func(self) -> None:
+        """Write the pandas dataframe in parquet file format."""
+        self.df.to_parquet(
+            pathlib.Path(self.inp_dir, self.out_name),
+            engine="auto",
+            compression=None,
+        )
+
+    def feather_func(self) -> None:
+        """Write the pandas dataframe in feather file format."""
+        self.df.to_feather(pathlib.Path(self.inp_dir, self.out_name))
+
+    def arrow_func(self) -> None:
+        """Write the pandas dataframe in Arrow file format (Feather v2 is Arrow IPC)."""
+        self.df.to_feather(pathlib.Path(self.inp_dir, self.out_name))
+
+    def hdf_func(self) -> None:
+        """Write the pandas dataframe in hdf5 file format."""
+        v_df = vaex.from_pandas(self.df, copy_index=False)
+        v_df.export(pathlib.Path(self.inp_dir, self.out_name))
+
+    def __call__(self) -> None:
+        """Make the class callable."""
+        data_ext = {
+            ".hdf5": self.hdf_func,
+            ".csv": self.csv_func,
+            ".parquet": self.parquet_func,
+            ".feather": self.feather_func,
+            ".arrow": self.arrow_func,
+        }
+
+        return data_ext[self.file_pattern]()
+
+    def clean_directories(self) -> None:
+        """Remove files."""
+        for f in self.get_out_dir().iterdir():
+            f.unlink()
+        for f in self.get_inp_dir().iterdir():
+            f.unlink()
+
+
+FILE_EXT = [[".hdf5", ".parquet", ".csv", ".feather", ".arrow"]]
+
+
+@pytest.fixture(params=FILE_EXT)
+def poly(request: pytest.FixtureRequest) -> list[str]:
+    """Get the parameter of the fixture."""
+    return request.param
+
+
+def test_mergingfiles_row_wise_samerows(poly: list[str]) -> None:
+    """Test merging of tabular data by rows with an equal number of rows."""
+    for i in poly:
+        d1 = Generatedata(i, out_name=f"data_1{i}", same_rows=True, trunc_columns=False)
+        d2 = Generatedata(i, out_name=f"data_2{i}", same_rows=True, trunc_columns=False)
+        d3 = Generatedata(i, out_name=f"data_3{i}", same_rows=True, trunc_columns=False)
+        d1()
+        d2()
+        d3()
+        pattern = f".*{i}"
+        fps = fp.FilePattern(d1.get_inp_dir(), pattern)
+        inp_dir_files = [f[1][0] for f in fps()]
+        tm.merge_files(
+            inp_dir_files,
+            strip_extension=True,
+            dim="rows",
+            same_rows=True,
+            same_columns=False,
+            map_var="A",
+            out_dir=d1.get_out_dir(),
+        )
+
+        outfile = [f for f in d1.get_out_dir().iterdir() if f.suffix == ".arrow"][0]
+        merged = vaex.open(outfile)
+        assert len(merged["file"].unique()) == 3
+        d1.clean_directories()
+
+
+def test_mergingfiles_row_wise_unequalrows(poly: list[str]) -> None:
+    """Test merging of tabular data by rows with an unequal number of rows."""
+    for i in poly:
+        d1 = Generatedata(i, out_name=f"data_1{i}", same_rows=True, trunc_columns=False)
+        d2 = Generatedata(
+            i,
+            out_name=f"data_2{i}",
+            same_rows=False,
+            trunc_columns=False,
+        )
+        d3 = Generatedata(
+            i,
+            out_name=f"data_3{i}",
+            same_rows=False,
+            trunc_columns=False,
+        )
+        d1()
+        d2()
+        d3()
+        pattern = f".*{i}"
+        fps = fp.FilePattern(d1.get_inp_dir(), pattern)
+        inp_dir_files = [f[1][0] for f in fps()]
+        tm.merge_files(
+            inp_dir_files,
+            strip_extension=True,
+            dim="rows",
+            same_rows=True,
+            same_columns=False,
+            map_var="A",
+            out_dir=d1.get_out_dir(),
+        )
+        outfile = [f for f in d1.get_out_dir().iterdir() if f.suffix == ".arrow"][0]
+        merged = vaex.open(outfile)
+        assert len(merged["file"].unique()) == 3
+        assert merged.shape[0] > 300
+        d1.clean_directories()
".arrow"][0] + merged = vaex.open(outfile) + assert len(merged["file"].unique()) == 3 + assert merged.shape[0] > 300 + d1.clean_directories() + + +def test_mergingfiles_column_wise_equalrows(poly: list[str]) -> None: + """Testing of merging of tabular data by columns with equal number of rows.""" + for i in poly: + d1 = Generatedata(i, out_name=f"data_1{i}", same_rows=True, trunc_columns=False) + d2 = Generatedata(i, out_name=f"data_2{i}", same_rows=True, trunc_columns=False) + d3 = Generatedata(i, out_name=f"data_3{i}", same_rows=True, trunc_columns=False) + d1() + d2() + d3() + pattern = f".*{i}" + fps = fp.FilePattern(d1.get_inp_dir(), pattern) + inp_dir_files = [f[1][0] for f in fps()] + tm.merge_files( + inp_dir_files, + strip_extension=True, + dim="columns", + same_rows=True, + same_columns=False, + map_var="A", + out_dir=d1.get_out_dir(), + ) + outfile = [f for f in d1.get_out_dir().iterdir() if f.suffix == ".arrow"][0] + merged = vaex.open(outfile) + assert len(merged.get_column_names()) == 12 + assert merged.shape[0] == 100 + d1.clean_directories() + + +def test_mergingfiles_column_wise_unequalrows(poly: list[str]) -> None: + """Testing of merging of tabular data by columns with unequal number of rows.""" + for i in poly: + d1 = Generatedata(i, out_name=f"data_1{i}", same_rows=True, trunc_columns=False) + d2 = Generatedata(i, out_name=f"data_2{i}", same_rows=True, trunc_columns=False) + d3 = Generatedata( + i, + out_name=f"data_3{i}", + same_rows=False, + trunc_columns=False, + ) + d1() + d2() + d3() + pattern = f".*{i}" + fps = fp.FilePattern(d1.get_inp_dir(), pattern) + inp_dir_files = [f[1][0] for f in fps()] + tm.merge_files( + inp_dir_files, + strip_extension=True, + dim="columns", + same_rows=False, + same_columns=False, + map_var="A", + out_dir=d1.get_out_dir(), + ) + outfile = [f for f in d1.get_out_dir().iterdir() if f.suffix == ".arrow"][0] + merged = vaex.open(outfile) + assert len(merged.get_column_names()) == 13 + assert "indexcolumn" in merged.get_column_names() + assert merged.shape[0] == 200 + d1.clean_directories()