From 8333bc8ab4248c52f1c1de855fbea5f67c480e3a Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Sun, 11 Aug 2024 20:46:14 -0400 Subject: [PATCH] switch from using asyncio to concurrent.futures and add test --- mesmerize_core/algorithms/cnmf.py | 6 --- mesmerize_core/algorithms/cnmfe.py | 6 --- mesmerize_core/algorithms/mcorr.py | 5 -- mesmerize_core/caiman_extensions/common.py | 52 ++++++++++++++------ tests/test_core.py | 56 +++++++++++++++++++++- 5 files changed, 92 insertions(+), 33 deletions(-) diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index 6db0d32..ea23626 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -1,15 +1,12 @@ """Performs CNMF in a separate process""" -import asyncio import click import caiman as cm from caiman.source_extraction.cnmf import cnmf as cnmf from caiman.source_extraction.cnmf.params import CNMFParams -import psutil import numpy as np import traceback from pathlib import Path, PurePosixPath from shutil import move as move_file -import os import time # prevent circular import @@ -24,9 +21,6 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): - asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview)) - -async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): algo_start = time.time() set_parent_raw_data_path(data_path) diff --git a/mesmerize_core/algorithms/cnmfe.py b/mesmerize_core/algorithms/cnmfe.py index 008ffde..1d8c601 100644 --- a/mesmerize_core/algorithms/cnmfe.py +++ b/mesmerize_core/algorithms/cnmfe.py @@ -1,14 +1,11 @@ -import asyncio import click import numpy as np import caiman as cm from caiman.source_extraction.cnmf import cnmf as cnmf from caiman.source_extraction.cnmf.params import CNMFParams -import psutil import traceback from pathlib import Path, PurePosixPath from shutil import move as move_file -import os import time if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess @@ -22,9 +19,6 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): - asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview)) - -async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): algo_start = time.time() set_parent_raw_data_path(data_path) diff --git a/mesmerize_core/algorithms/mcorr.py b/mesmerize_core/algorithms/mcorr.py index 1304e7d..860c58f 100644 --- a/mesmerize_core/algorithms/mcorr.py +++ b/mesmerize_core/algorithms/mcorr.py @@ -1,11 +1,9 @@ import traceback -import asyncio import click import caiman as cm from caiman.source_extraction.cnmf.params import CNMFParams from caiman.motion_correction import MotionCorrect from caiman.summary_images import local_correlations_movie_offline -import psutil import os from pathlib import Path, PurePosixPath import numpy as np @@ -22,9 +20,6 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): - asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview)) - -async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): algo_start = time.time() set_parent_raw_data_path(data_path) diff --git a/mesmerize_core/caiman_extensions/common.py b/mesmerize_core/caiman_extensions/common.py index 35b3d02..462fdad 100644 --- a/mesmerize_core/caiman_extensions/common.py +++ b/mesmerize_core/caiman_extensions/common.py @@ -10,7 +10,7 @@ import time from copy import deepcopy import shlex -import asyncio +from concurrent.futures import ThreadPoolExecutor, Future import numpy as np import pandas as pd @@ -460,12 +460,26 @@ def get_parent(self, index: Union[int, str, UUID]) -> Union[UUID, None]: return r["uuid"] -class DummyProcess: +class Waitable(Protocol): + """An object that we can call "wait" on""" + def wait(self) -> None: ... + + +class DummyProcess(Waitable): """Dummy process for local backend""" - def wait(self): + def wait(self) -> None: pass +class WaitableFuture(Waitable): + """Adaptor for future returned from Executor.submit""" + def __init__(self, future: Future[None]): + self.future = future + + def wait(self) -> None: + return self.future.result() + + @pd.api.extensions.register_series_accessor("caiman") class CaimanSeriesExtensions: """ @@ -474,7 +488,7 @@ class CaimanSeriesExtensions: def __init__(self, s: pd.Series): self._series = s - self.process: Popen = None + self.process: Optional[Waitable] = None def _run_local( self, @@ -484,9 +498,15 @@ def _run_local( data_path: Union[Path, None], dview=None ) -> DummyProcess: - coroutine = self._run_local_async(algo, batch_path, uuid, data_path, dview) - asyncio.run(coroutine) - return DummyProcess() + algo_module = getattr(algorithms, algo) + algo_module.run_algo( + batch_path=str(batch_path), + uuid=str(uuid), + data_path=str(data_path), + dview=dview + ) + self.process = DummyProcess() + return self.process def _run_local_async( self, @@ -495,14 +515,18 @@ def _run_local_async( uuid: UUID, data_path: Union[Path, None], dview=None - ) -> Coroutine: + ) -> WaitableFuture: algo_module = getattr(algorithms, algo) - return algo_module.run_algo_async( - batch_path=str(batch_path), - uuid=str(uuid), - data_path=str(data_path), - dview=dview - ) + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit( + algo_module.run_algo, + batch_path=str(batch_path), + uuid=str(uuid), + data_path=str(data_path), + dview=dview + ) + self.process = WaitableFuture(future) + return self.process def _run_subprocess( self, diff --git a/tests/test_core.py b/tests/test_core.py index 163189e..4e10012 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,5 +1,4 @@ import os - import numpy as np from caiman.utils.utils import load_dict_from_hdf5 from caiman.source_extraction.cnmf import cnmf @@ -12,8 +11,14 @@ CaimanSeriesExtensions, set_parent_raw_data_path, ) -from mesmerize_core.batch_utils import DATAFRAME_COLUMNS, COMPUTE_BACKEND_SUBPROCESS, get_full_raw_data_path +from mesmerize_core.batch_utils import ( + DATAFRAME_COLUMNS, + COMPUTE_BACKEND_SUBPROCESS, + COMPUTE_BACKEND_LOCAL, + COMPUTE_BACKEND_ASYNC, + get_full_raw_data_path) from mesmerize_core.utils import IS_WINDOWS +from mesmerize_core.algorithms._utils import ensure_server from uuid import uuid4 from typing import * import pytest @@ -30,6 +35,8 @@ import tifffile from copy import deepcopy +pytest_plugins = ('pytest_asyncio',) + tmp_dir = Path(os.path.dirname(os.path.abspath(__file__)), "tmp") vid_dir = Path(os.path.dirname(os.path.abspath(__file__)), "videos") ground_truths_dir = Path(os.path.dirname(os.path.abspath(__file__)), "ground_truths") @@ -1254,3 +1261,48 @@ def test_cache(): output2 = df.iloc[1].cnmf.get_output(return_copy=False) assert(hex(id(output)) == hex(id(output2))) assert(hex(id(cnmf.cnmf_cache.get_cache().iloc[-1]["return_val"])) == hex(id(output))) + + +def test_backends(): + """test subprocess, local, and async_local backend""" + set_parent_raw_data_path(vid_dir) + algo = "mcorr" + df, batch_path = _create_tmp_batch() + input_movie_path = get_datafile(algo) + + # make small version of movie for quick testing + movie = tifffile.imread(input_movie_path) + small_movie_path = input_movie_path.parent.joinpath("small_movie.tif") + tifffile.imwrite(small_movie_path, movie[:1001]) + print(input_movie_path) + + # put backends that can run in the background first to save time + backends = [COMPUTE_BACKEND_SUBPROCESS, COMPUTE_BACKEND_ASYNC, COMPUTE_BACKEND_LOCAL] + for backend in backends: + df.caiman.add_item( + algo="mcorr", + item_name=f"test-{backend}", + input_movie_path=small_movie_path, + params=test_params["mcorr"], + ) + + # run using each backend + procs = [] + with ensure_server(None) as (dview, _): + for backend, (_, item) in zip(backends, df.iterrows()): + procs.append(item.caiman.run(backend=backend, dview=dview, wait=False)) + + # wait for all to finish + for proc in procs: + proc.wait() + + # compare results + df = load_batch(batch_path) + for i, item in df.iterrows(): + output = item.mcorr.get_output() + + if i == 0: + # save to compare to other results + first_output = output + else: + numpy.testing.assert_array_equal(output, first_output)