From 21a31c410d87b8b2a4efe9c727033ee32e34b99d Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Thu, 26 Oct 2023 14:59:54 -0700 Subject: [PATCH 01/14] Parallelize TestOrbit.range_observations with ray --- thor/orbit.py | 178 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 115 insertions(+), 63 deletions(-) diff --git a/thor/orbit.py b/thor/orbit.py index d3fa2435..3db97368 100644 --- a/thor/orbit.py +++ b/thor/orbit.py @@ -6,6 +6,7 @@ import pyarrow as pa import pyarrow.compute as pc import quivr as qv +import ray from adam_core.coordinates import ( CartesianCoordinates, CometaryCoordinates, @@ -51,6 +52,79 @@ class TestOrbitEphemeris(qv.Table): observer = Observers.as_column() +def range_observations_worker( + observations: Observations, ephemeris: TestOrbitEphemeris, state_id: int +) -> RangedPointSourceDetections: + """ + Range observations for a single state given the orbit's ephemeris for that state. + + Parameters + ---------- + observations + Observations to range. + ephemeris + Ephemeris from which to extract the test orbit's aberrated state (we + use this state to get the test orbit's heliocentric distance). + state_id + The ID for this particular state. + + Returns + ------- + ranged_point_source_detections + The detections assuming they are located at the same heliocentric distance + as the test orbit. + """ + observations_state = observations.select("state_id", state_id) + ephemeris_state = ephemeris.select("id", state_id) + detections_state = observations_state.detections + + # Get the heliocentric position vector of the object at the time of the exposure + r = ephemeris_state.ephemeris.aberrated_coordinates.r[0] + + # Get the observer's heliocentric coordinates + observer_i = ephemeris_state.observer + + # Create an array of observatory codes for the detections + num_detections = len(observations_state) + observatory_codes = np.repeat( + observations_state.observatory_code[0].as_py(), num_detections + ) + + # The following can be replaced with: + # coords = observations_state.to_spherical(observatory_codes) + # Start replacement: + sigma_data = np.vstack( + [ + pa.nulls(num_detections, pa.float64()), + detections_state.ra_sigma.to_numpy(zero_copy_only=False), + detections_state.dec_sigma.to_numpy(zero_copy_only=False), + pa.nulls(num_detections, pa.float64()), + pa.nulls(num_detections, pa.float64()), + pa.nulls(num_detections, pa.float64()), + ] + ).T + coords = SphericalCoordinates.from_kwargs( + lon=detections_state.ra, + lat=detections_state.dec, + time=detections_state.time, + covariance=CoordinateCovariances.from_sigmas(sigma_data), + origin=Origin.from_kwargs(code=observatory_codes), + frame="equatorial", + ) + # End replacement (only once + # https://github.com/B612-Asteroid-Institute/adam_core/pull/45 is merged) + + return RangedPointSourceDetections.from_kwargs( + id=detections_state.id, + exposure_id=detections_state.exposure_id, + coordinates=assume_heliocentric_distance(r, coords, observer_i.coordinates), + state_id=observations_state.state_id, + ) + + +range_observations_remote = ray.remote(range_observations_worker) + + class TestOrbit: def __init__( self, @@ -314,73 +388,51 @@ def range_observations( observations, propagator=propagator, max_processes=max_processes ) - # Link the ephemeris to the observations - link = qv.Linkage( - ephemeris, - observations, - left_keys=ephemeris.id, - right_keys=observations.state_id, - ) - - # Do a sorted iteration over the unique state IDs - rpsds = [] - state_ids = observations.state_id.unique().sort() 
- for state_id in state_ids: - - # Select the ephemeris and observations for this state - ephemeris_i = link.select_left(state_id) - observations_i = link.select_right(state_id) - detections_i = observations_i.detections - - # Get the heliocentric position vector of the object at the time of the exposure - r = ephemeris_i.ephemeris.aberrated_coordinates.r[0] - - # Get the observer's heliocentric coordinates - observer_i = ephemeris_i.observer + ranged_detections_list = [] + if max_processes is None or max_processes > 1: + if not ray.is_initialized(): + ray.init(num_cpus=max_processes) + + if isinstance(observations, ray.ObjectRef): + observations_ref = observations + observations = ray.get(observations_ref) + else: + observations_ref = ray.put(observations) + + if isinstance(ephemeris, ray.ObjectRef): + ephemeris_ref = ephemeris + else: + ephemeris_ref = ray.put(ephemeris) + + # Get state IDs + state_ids = observations.state_id.unique().sort() + futures = [] + for state_id in state_ids: + futures.append( + range_observations_remote.remote( + observations_ref, ephemeris_ref, state_id + ) + ) - # Create an array of observatory codes for the detections - num_detections = len(observations_i) - observatory_codes = np.repeat( - observations_i.observatory_code[0].as_py(), num_detections - ) + while futures: + finished, futures = ray.wait(futures, num_returns=1) + ranged_detections_list.append(ray.get(finished[0])) - # The following can be replaced with: - # coords = observations_i.to_spherical(observatory_codes) - # Start replacement: - sigma_data = np.vstack( - [ - pa.nulls(num_detections, pa.float64()), - detections_i.ra_sigma.to_numpy(zero_copy_only=False), - detections_i.dec_sigma.to_numpy(zero_copy_only=False), - pa.nulls(num_detections, pa.float64()), - pa.nulls(num_detections, pa.float64()), - pa.nulls(num_detections, pa.float64()), - ] - ).T - coords = SphericalCoordinates.from_kwargs( - lon=detections_i.ra, - lat=detections_i.dec, - time=detections_i.time, - covariance=CoordinateCovariances.from_sigmas(sigma_data), - origin=Origin.from_kwargs(code=observatory_codes), - frame="equatorial", - ) - # End replacement (only once - # https://github.com/B612-Asteroid-Institute/adam_core/pull/45 is merged) - - rpsds.append( - RangedPointSourceDetections.from_kwargs( - id=detections_i.id, - exposure_id=detections_i.exposure_id, - coordinates=assume_heliocentric_distance( - r, coords, observer_i.coordinates - ), - state_id=observations_i.state_id, + else: + # Get state IDs + state_ids = observations.state_id.unique().sort() + + for state_id in state_ids: + ranged_detections_list.append( + range_observations_worker( + observations.select("state_id", state_id), + ephemeris.select("id", state_id), + state_id, + ) ) - ) - ranged_detections = qv.concatenate(rpsds) - return ranged_detections + ranged_point_source_detections = qv.concatenate(ranged_detections_list) + return ranged_point_source_detections.sort_by(by=["state_id"]) def assume_heliocentric_distance( From a1774f7879dde1f3e66a85afe2e28f63031ebb02 Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Thu, 26 Oct 2023 15:01:28 -0700 Subject: [PATCH 02/14] Parallelize range_and_transform with ray --- thor/main_2.py | 130 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 100 insertions(+), 30 deletions(-) diff --git a/thor/main_2.py b/thor/main_2.py index 853673eb..9443bca5 100644 --- a/thor/main_2.py +++ b/thor/main_2.py @@ -3,6 +3,7 @@ import pandas as pd import pyarrow.compute as pc import quivr as qv +import ray from 
adam_core.coordinates import ( CartesianCoordinates, OriginCodes, @@ -18,7 +19,7 @@ ) from .observations.filters import ObservationFilter, TestOrbitRadiusObservationFilter from .observations.observations import Observations, ObserversWithStates -from .orbit import TestOrbit +from .orbit import TestOrbit, TestOrbitEphemeris from .projections import GnomonicCoordinates @@ -28,6 +29,55 @@ class TransformedDetections(qv.Table): state_id = qv.Int64Column() +def range_and_transform_worker( + ranged_detections: CartesianCoordinates, + observations: Observations, + ephemeris: TestOrbitEphemeris, + state_id: int, +) -> TransformedDetections: + """ + Given ranged detections and their original observations, transform these to the gnomonic tangent + plane centered on the motion of the test orbit for a single state. + + Parameters + ---------- + ranged_detections + Cartesian detections ranged so that their heliocentric distance is the same as the test orbit + for each state + observations + The observations from which the ranged detections originate. These should be sorted one-to-one + with the ranged detections + ephemeris + Ephemeris from which to extract the test orbit's aberrated state. + state_id + The ID for this particular state. + + Returns + ------- + transformed_detections + Detections transformed to a gnomonic tangent plane centered on the motion of the + test orbit. + """ + # Select the detections and ephemeris for this state id + mask = pc.equal(state_id, observations.state_id) + ranged_detections_state = ranged_detections.apply_mask(mask) + ephemeris_state = ephemeris.select("id", state_id) + observations_state = observations.select("state_id", state_id) + + # Transform the detections into the co-rotating frame + return TransformedDetections.from_kwargs( + id=observations_state.detections.id, + coordinates=GnomonicCoordinates.from_cartesian( + ranged_detections_state, + center_cartesian=ephemeris_state.ephemeris.aberrated_coordinates, + ), + state_id=observations_state.state_id, + ) + + +range_and_transform_remote = ray.remote(range_and_transform_worker) + + def range_and_transform( test_orbit: TestOrbit, observations: Observations, @@ -80,38 +130,58 @@ def range_and_transform( origin_out=OriginCodes.SUN, ) - # Link the ephemeris and observations by state id - link = qv.Linkage( - ephemeris, - observations, - left_keys=ephemeris.id, - right_keys=observations.state_id, - ) - - # Transform the detections into the co-rotating frame transformed_detection_list = [] - state_ids = observations.state_id.unique().sort() - for state_id in state_ids: - # Select the detections and ephemeris for this state id - mask = pc.equal(state_id, observations.state_id) - ranged_detections_cartesian_i = ranged_detections_cartesian.apply_mask(mask) - ephemeris_i = link.select_left(state_id) - observations_i = link.select_right(state_id) - - # Transform the detections into the co-rotating frame - transformed_detections_i = TransformedDetections.from_kwargs( - id=observations_i.detections.id, - coordinates=GnomonicCoordinates.from_cartesian( - ranged_detections_cartesian_i, - center_cartesian=ephemeris_i.ephemeris.aberrated_coordinates, - ), - state_id=observations_i.state_id, - ) - - transformed_detection_list.append(transformed_detections_i) + if max_processes is None or max_processes > 1: + + if not ray.is_initialized(): + ray.init(num_cpus=max_processes) + + if isinstance(observations, ray.ObjectRef): + observations_ref = observations + observations = ray.get(observations_ref) + else: + observations_ref = 
ray.put(observations) + + if isinstance(ephemeris, ray.ObjectRef): + ephemeris_ref = ephemeris + else: + ephemeris_ref = ray.put(ephemeris) + + ranged_detections_cartesian_ref = ray.put(ranged_detections_cartesian) + + # Get state IDs + state_ids = observations.state_id.unique().sort() + futures = [] + for state_id in state_ids: + futures.append( + range_and_transform_remote.remote( + ranged_detections_cartesian_ref, + observations_ref, + ephemeris_ref, + state_id, + ) + ) + + while futures: + finished, futures = ray.wait(futures, num_returns=1) + transformed_detection_list.append(ray.get(finished[0])) + + else: + # Get state IDs + state_ids = observations.state_id.unique().sort() + for state_id in state_ids: + mask = pc.equal(state_id, observations.state_id) + transformed_detection_list.append( + range_and_transform_worker( + ranged_detections_cartesian.apply_mask(mask), + observations.select("state_id", state_id), + ephemeris.select("id", state_id), + state_id, + ) + ) transformed_detections = qv.concatenate(transformed_detection_list) - return transformed_detections + return transformed_detections.sort_by(by=["state_id"]) def _observations_to_observations_df(observations: Observations) -> pd.DataFrame: From 167df8ed35ff4a83d224e180680c84f1a182e604 Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Thu, 26 Oct 2023 15:09:39 -0700 Subject: [PATCH 03/14] Remove horizons.py --- thor/orbits/orbits.py | 42 +------- thor/utils/__init__.py | 1 - thor/utils/horizons.py | 217 ----------------------------------------- 3 files changed, 1 insertion(+), 259 deletions(-) delete mode 100644 thor/utils/horizons.py diff --git a/thor/orbits/orbits.py b/thor/orbits/orbits.py index 3c63be70..28894718 100644 --- a/thor/orbits/orbits.py +++ b/thor/orbits/orbits.py @@ -1,13 +1,12 @@ import ast import logging -from typing import Any, Tuple import numpy as np import pandas as pd from astropy import units as u from astropy.time import Time -from ..utils import _checkTime, getHorizonsVectors +from ..utils import _checkTime CARTESIAN_COLS = ["x", "y", "z", "vx", "vy", "vz"] CARTESIAN_UNITS = [u.au, u.au, u.au, u.au / u.d, u.au / u.d, u.au / u.d] @@ -353,45 +352,6 @@ def assignOrbitClasses(self): self.orbit_class = classes return - @staticmethod - def fromHorizons(obj_ids, t0): - """ - Query Horizons for state vectors for each object ID at time t0. - This is a convenience function and should not be used to query for state - vectors for many objects or at many times. - - Parameters - ---------- - obj_ids : `~numpy.ndarray` (N) - Object IDs / designations recognizable by HORIZONS. - t0 : `~astropy.core.time.Time` (1) - Astropy time object at which to gather state vectors. - - Return - ------ - `~thor.orbits.orbits.Orbits` - THOR Orbits class - """ - - if len(t0) != 1: - err = "t0 should be a single time." 
- raise ValueError(err) - - horizons_vectors = getHorizonsVectors( - obj_ids, t0, location="@sun", id_type="smallbody", aberrations="geometric" - ) - - orbits = Orbits( - horizons_vectors[["x", "y", "z", "vx", "vy", "vz"]].values, - t0 + np.zeros(len(obj_ids)), - orbit_type="cartesian", - orbit_units=CARTESIAN_UNITS, - ids=horizons_vectors["targetname"].values, - H=horizons_vectors["H"].values, - G=horizons_vectors["G"].values, - ) - return orbits - @staticmethod def fromMPCOrbitCatalog(mpcorb): diff --git a/thor/utils/__init__.py b/thor/utils/__init__.py index 4bf18ac1..e5f0e3a9 100644 --- a/thor/utils/__init__.py +++ b/thor/utils/__init__.py @@ -1,6 +1,5 @@ from .ades import * from .astropy import * -from .horizons import * from .linkages import * from .logging import * from .multiprocessing import * diff --git a/thor/utils/horizons.py b/thor/utils/horizons.py deleted file mode 100644 index 464ce0b8..00000000 --- a/thor/utils/horizons.py +++ /dev/null @@ -1,217 +0,0 @@ -import pandas as pd -from astroquery.jplhorizons import Horizons - -from .astropy import _checkTime - -__all__ = [ - "getHorizonsVectors", - "getHorizonsElements", - "getHorizonsEphemeris", - "getHorizonsObserverState", -] - - -def getHorizonsVectors( - obj_ids, - times, - location="@sun", - id_type="smallbody", - aberrations="geometric", -): - """ - Query JPL Horizons (through astroquery) for an object's - state vectors at the given times. - - Parameters - ---------- - obj_ids : `~numpy.ndarray` (N) - Object IDs / designations recognizable by HORIZONS. - times : `~astropy.core.time.Time` (M) - Astropy time object at which to gather state vectors. - location : str, optional - Location of the origin typically a NAIF code. - ('0' or '@ssb' for solar system barycenter, '10' or '@sun' for heliocenter) - [Default = '@sun'] - id_type : {'majorbody', 'smallbody', 'designation', - 'name', 'asteroid_name', 'comet_name', 'id'} - ID type, Horizons will find closest match under any given type. - aberrations : {'geometric', 'astrometric', 'apparent'} - Adjust state for one of three different aberrations. - - Returns - ------- - vectors : `~pandas.DataFrame` - Dataframe containing the full cartesian state, r, r_rate, delta, delta_rate and light time - of the object at each time. - """ - _checkTime(times, "times") - dfs = [] - for obj_id in obj_ids: - obj = Horizons( - id=obj_id, - epochs=times.tdb.mjd, - location=location, - id_type=id_type, - ) - vectors = obj.vectors( - refplane="ecliptic", aberrations=aberrations, cache=False - ).to_pandas() - dfs.append(vectors) - - vectors = pd.concat(dfs, ignore_index=True) - return vectors - - -def getHorizonsElements(obj_ids, times, location="@sun", id_type="smallbody"): - """ - Query JPL Horizons (through astroquery) for an object's - elements at the given times. - - Parameters - ---------- - obj_ids : `~numpy.ndarray` (N) - Object IDs / designations recognizable by HORIZONS. - times : `~astropy.core.time.Time` - Astropy time object at which to gather state vectors. - location : str, optional - Location of the origin typically a NAIF code. - ('0' or '@ssb' for solar system barycenter, '10' or '@sun' for heliocenter) - [Default = '@sun'] - id_type : {'majorbody', 'smallbody', 'designation', - 'name', 'asteroid_name', 'comet_name', 'id'} - ID type, Horizons will find closest match under any given type. - - Returns - ------- - elements : `~pandas.DataFrame` - Dataframe containing the full cartesian state, r, r_rate, delta, delta_rate and light time - of the object at each time. 
- """ - _checkTime(times, "times") - dfs = [] - for obj_id in obj_ids: - obj = Horizons( - id=obj_id, - epochs=times.tdb.mjd, - location=location, - id_type=id_type, - ) - elements = obj.elements( - refsystem="J2000", refplane="ecliptic", tp_type="absolute", cache=False - ).to_pandas() - dfs.append(elements) - - elements = pd.concat(dfs, ignore_index=True) - return elements - - -def getHorizonsEphemeris(obj_ids, observers, id_type="smallbody"): - """ - Query JPL Horizons (through astroquery) for an object's - ephemerides at the given times viewed from the given location. - - Parameters - ---------- - obj_ids : `~numpy.ndarray` (N) - Object IDs / designations recognizable by HORIZONS. - observers : dict or `~pandas.DataFrame` - A dictionary with observatory/location codes as keys and observation_times (`~astropy.time.core.Time`) as values. - Location of the observer which can be an MPC observatory code (eg, 'I41', 'I11') - or a NAIF code ('0' for solar system barycenter, '10' for heliocenter) - id_type : {'majorbody', 'smallbody', 'designation', - 'name', 'asteroid_name', 'comet_name', 'id'} - ID type, Horizons will find closest match under any given type. - - Returns - ------- - ephemeris : `~pandas.DataFrame` - Dataframe containing the RA, DEC, r, r_rate, delta, delta_rate and light time - of the object at each time. - """ - dfs = [] - for orbit_id, obj_id in enumerate(obj_ids): - for observatory_code, observation_times in observers.items(): - _checkTime(observation_times, "observation_times") - obj = Horizons( - id=obj_id, - epochs=observation_times.utc.mjd, - location=observatory_code, - id_type=id_type, - ) - ephemeris = obj.ephemerides( - # RA, DEC, r, r_rate, delta, delta_rate, lighttime - # quantities="1, 2, 19, 20, 21", - extra_precision=True - ).to_pandas() - ephemeris["orbit_id"] = [orbit_id for i in range(len(ephemeris))] - ephemeris["observatory_code"] = [ - observatory_code for i in range(len(ephemeris)) - ] - ephemeris["mjd_utc"] = observation_times.utc.mjd - - dfs.append(ephemeris) - - ephemeris = pd.concat(dfs) - ephemeris.sort_values( - by=["orbit_id", "observatory_code", "datetime_jd"], - inplace=True, - ignore_index=True, - ) - return ephemeris - - -def getHorizonsObserverState( - observatory_codes, observation_times, origin="heliocenter", aberrations="geometric" -): - """ - Query JPL Horizons (through astroquery) for an object's - elements at the given times. - - Parameters - ---------- - observatory_codes : list or `~numpy.ndarray` - MPC observatory codes. - observation_times : `~astropy.time.core.Time` - Epochs for which to find the observatory locations. - origin : {'barycenter', 'heliocenter'} - Return observer state with heliocentric or barycentric origin. - aberrations : {'geometric', 'astrometric', 'apparent'} - Adjust state for one of three different aberrations. - - Returns - ------- - vectors : `~pandas.DataFrame` - Dataframe containing the full cartesian state, r, r_rate, delta, delta_rate and light time - of the object at each time. 
- """ - _checkTime(observation_times, "observation_times") - - if origin == "heliocenter": - origin_horizons = "sun" - elif origin == "barycenter": - origin_horizons = "ssb" - else: - err = "origin should be one of {'heliocenter', 'barycenter'}" - raise ValueError(err) - - dfs = [] - for code in observatory_codes: - obj = Horizons( - id=origin_horizons, - epochs=observation_times.tdb.mjd, - location=code, - id_type="majorbody", - ) - vectors = obj.vectors( - refplane="ecliptic", - aberrations=aberrations, - cache=False, - ).to_pandas() - - vectors = vectors.drop(columns="targetname") - vectors.insert(0, "observatory_code", [code for i in range(len(vectors))]) - vectors.loc[:, ["x", "y", "z", "vx", "vy", "vz"]] *= -1 - dfs.append(vectors) - - vectors = pd.concat(dfs, ignore_index=True) - return vectors From 753952cc422241287aba3e1eaec413870a5fe96b Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Thu, 26 Oct 2023 16:31:14 -0700 Subject: [PATCH 04/14] Set time linking precision to ms in GnomonicCoordinates.from_cartesian --- thor/projections/gnomonic.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/thor/projections/gnomonic.py b/thor/projections/gnomonic.py index 713ef598..73d5ab68 100644 --- a/thor/projections/gnomonic.py +++ b/thor/projections/gnomonic.py @@ -131,12 +131,12 @@ def from_cartesian( [cartesian[0].origin for _ in range(num_unique_times)] ), ) - link = cartesian.time.link(center_cartesian.time, precision="us") + link = cartesian.time.link(center_cartesian.time, precision="ms") elif center_cartesian is not None and cartesian.time is not None: assert cartesian.time.scale == center_cartesian.time.scale - link = cartesian.time.link(center_cartesian.time, precision="us") + link = cartesian.time.link(center_cartesian.time, precision="ms") else: raise ValueError( @@ -146,21 +146,24 @@ def from_cartesian( # Round the times to the nearest microsecond and use those to select # the cartesian coordinates and center cartesian coordinates - rounded_cartesian_times = cartesian.time.rounded(precision="us") # type: ignore - rounded_center_cartesian_times = center_cartesian.time.rounded(precision="us") # type: ignore + rounded_cartesian_times = cartesian.time.rounded(precision="ms") # type: ignore + rounded_center_cartesian_times = center_cartesian.time.rounded(precision="ms") # type: ignore gnomonic_coords = [] for key, time_i, center_time_i in link.iterate(): cartesian_i = cartesian.apply_mask( - rounded_cartesian_times.equals_scalar(key[0], key[1], precision="us") + rounded_cartesian_times.equals_scalar(key[0], key[1], precision="ms") ) center_cartesian_i = center_cartesian.apply_mask( rounded_center_cartesian_times.equals_scalar( - key[0], key[1], precision="us" + key[0], key[1], precision="ms" ) ) + if len(center_cartesian_i) == 0: + raise ValueError("No center cartesian coordinates found for this time.") + coords_gnomonic, M = cartesian_to_gnomonic( cartesian_i.values, center_cartesian=center_cartesian_i.values[0], From fa4ea0a32d71febe5e4e94b6b5c2087a6985f2db Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Thu, 26 Oct 2023 16:48:05 -0700 Subject: [PATCH 05/14] Delete runTHOROrbit and runTHOR --- runTHOR.py | 55 ---- thor/main.py | 691 +------------------------------------------------ thor/main_2.py | 10 +- 3 files changed, 6 insertions(+), 750 deletions(-) delete mode 100644 runTHOR.py diff --git a/runTHOR.py b/runTHOR.py deleted file mode 100644 index e653fee3..00000000 --- a/runTHOR.py +++ /dev/null @@ -1,55 +0,0 @@ -import argparse -import 
logging - -import pandas as pd - -if __name__ == "__main__": - - parser = argparse.ArgumentParser( - description="Run Tracklet-less Heliocentric Orbit Recovery" - ) - parser.add_argument( - "preprocessed_observations", type=str, help="Preprocessed observations." - ) - parser.add_argument("test_orbits", type=str, help="Path to test orbits.") - parser.add_argument( - "out_dir", - type=str, - ) - parser.add_argument( - "--config", - type=str, - default=None, - ) - args = parser.parse_args() - - from thor import runTHOR - from thor.config import Config - from thor.orbits import Orbits - - if not isinstance(args.config, str): - config = Config - else: - config = Config.fromYaml(args.config) - - # Read observations - preprocessed_observations = pd.read_csv( - args.preprocessed_observations, index_col=False, dtype={"obs_id": str} - ) - - # Read test orbits - test_orbits = Orbits.from_csv(args.test_orbits) - - # Run THOR - test_orbits_, recovered_orbits, recovered_orbit_members = runTHOR( - preprocessed_observations, - test_orbits, - range_shift_config=config.RANGE_SHIFT_CONFIG, - cluster_link_config=config.CLUSTER_LINK_CONFIG, - iod_config=config.IOD_CONFIG, - od_config=config.OD_CONFIG, - odp_config=config.ODP_CONFIG, - out_dir=args.out_dir, - if_exists="continue", - logging_level=logging.INFO, - ) diff --git a/thor/main.py b/thor/main.py index d37fdc4f..e293aa6d 100644 --- a/thor/main.py +++ b/thor/main.py @@ -9,27 +9,18 @@ import concurrent.futures as cf import logging import multiprocessing as mp -import shutil import time import uuid from functools import partial import numpy as np import pandas as pd -import yaml from astropy.time import Time from .cell import Cell from .clusters import filter_clusters_by_length, find_clusters -from .config import Config, Configuration from .orbit import TestOrbit -from .orbits import ( - Orbits, - differentialCorrection, - generateEphemeris, - initialOrbitDetermination, - mergeAndExtendOrbits, -) +from .orbits import generateEphemeris from .utils import _checkParallel, _initWorker logger = logging.getLogger("thor") @@ -40,8 +31,6 @@ "clusterVelocity", "clusterVelocity_worker", "clusterAndLink", - "runTHOROrbit", - "runTHOR", ] @@ -776,681 +765,3 @@ def clusterAndLink( ) return clusters, cluster_members - - -def runTHOROrbit( - preprocessed_observations, - orbit, - range_shift_config=Config.RANGE_SHIFT_CONFIG, - cluster_link_config=Config.CLUSTER_LINK_CONFIG, - iod_config=Config.IOD_CONFIG, - od_config=Config.OD_CONFIG, - odp_config=Config.ODP_CONFIG, - out_dir=None, - if_exists="continue", - logging_level=logging.INFO, -): - logger = logging.getLogger("thor") - logger.setLevel(logging_level) - - # Build the configuration class which stores the run parameters - config = Configuration( - range_shift_config=range_shift_config, - cluster_link_config=cluster_link_config, - iod_config=iod_config, - od_config=od_config, - odp_config=odp_config, - ) - status = { - "rangeAndShift": False, - "clusterAndLink": False, - "initialOrbitDetermination": False, - "differentialCorrection": False, - "mergeAndExtendOrbits": False, - "complete": False, - } - - continue_ = False - if out_dir is not None: - if not os.path.exists(out_dir): - os.mkdir(out_dir) - logger.debug("Created {} directory.".format(out_dir)) - - else: - if if_exists == "continue": - logger.warning( - "{} directory already exists, attempting to continue previous run.".format( - out_dir - ) - ) - continue_ = True - elif if_exists == "erase": - logger.warning( - "{} directory already exists, removing previous 
results.".format( - out_dir - ) - ) - shutil.rmtree(out_dir) - os.mkdir(out_dir) - logger.debug("Created {} directory.".format(out_dir)) - else: - err = "if_exists should be one of {'continue', 'erase'}." - raise ValueError(err) - - file_handler = logging.FileHandler( - os.path.join(out_dir, "thor.log"), encoding="utf-8", delay=False - ) - file_handler.setLevel(logging.DEBUG) - file_format = logging.Formatter( - "%(asctime)s.%(msecs)03d [%(levelname)s] [%(thread)s] %(message)s (%(filename)s, %(funcName)s, %(lineno)d)", - datefmt="%Y-%m-%d %H:%M:%S", - ) - file_handler.setFormatter(file_format) - logger.addHandler(file_handler) - - # The primary files which will be used to determine if the run - # can be continued from a previous state and, if so, from where - # to continue the run - config_file = os.path.join(out_dir, "config.yml") - test_orbit_file = os.path.join(out_dir, "test_orbit.csv") - status_file = os.path.join(out_dir, "status.yml") - config_eq = False - test_orbit_eq = False - save_orbit = True - save_config = True - - if continue_: - - if not os.path.exists(config_file): - logger.warning("No previous configuration file found.") - save_config = True - else: - logger.info("Previous configuration file found. Comparing settings...") - - config_prev = Configuration.fromYaml(config_file) - if config_prev != config: - logger.warning( - "Previous configuration does not match current configuration. Processing will not continue from previous state." - ) - else: - config_eq = True - save_config = False - logger.info("Previous configuration matches current configuration.") - - if not os.path.exists(test_orbit_file): - logger.warning("No previous test orbit file found.") - save_orbit = True - else: - logger.info("Previous test orbit file found.") - - test_orbit_prev = Orbits.from_csv( - test_orbit_file, - ) - if test_orbit_prev != orbit: - logger.warning( - "Previous test orbit does not match current test orbit." 
- ) - else: - test_orbit_eq = True - save_orbit = False - logger.info("Previous test orbit matches current test orbit.") - - if not os.path.exists(status_file): - logger.warning("No previous status file found.") - else: - if test_orbit_eq and config_eq: - with open(status_file, "r") as status_in: - status = yaml.load(status_in, Loader=yaml.FullLoader) - logger.info("Previous status file found.") - - if save_config: - config.toYaml(config_file) - logger.debug("Saved config.yml.") - - if save_orbit: - orbit.to_csv(test_orbit_file) - logger.debug("Saved test_orbit.csv.") - - if status["complete"]: - logger.info("Orbit has already finished processing.") - - if not status["complete"]: - if not status["rangeAndShift"]: - projected_observations = rangeAndShift( - preprocessed_observations, orbit, **range_shift_config - ) - if out_dir is not None: - projected_observations.to_csv( - os.path.join(out_dir, "projected_observations.csv"), - index=False, - float_format="%.15e", - ) - logger.debug("Saved projected_observations.csv.") - - else: - logger.info("Range and shift completed previously.") - projected_observations = pd.read_csv( - os.path.join(out_dir, "projected_observations.csv"), - index_col=False, - dtype={"obs_id": str}, - float_precision="round_trip", - ) - logger.debug("Read projected_observations.csv.") - - status["rangeAndShift"] = True - if out_dir is not None: - with open(status_file, "w") as status_out: - yaml.safe_dump(status, status_out) - logger.debug("Updated status.yml.") - - if not status["clusterAndLink"]: - clusters, cluster_members = clusterAndLink( - projected_observations, **cluster_link_config - ) - if out_dir is not None: - clusters.to_csv( - os.path.join(out_dir, "clusters.csv"), - index=False, - float_format="%.15e", - ) - logger.debug("Saved clusters.csv.") - - cluster_members.to_csv( - os.path.join(out_dir, "cluster_members.csv"), - index=False, - float_format="%.15e", - ) - logger.debug("Saved cluster_members.csv.") - else: - logger.info("Clustering completed previously.") - clusters = pd.read_csv( - os.path.join(out_dir, "clusters.csv"), index_col=False - ) - logger.debug("Read clusters.csv.") - - cluster_members = pd.read_csv( - os.path.join(out_dir, "cluster_members.csv"), - index_col=False, - dtype={"obs_id": str}, - float_precision="round_trip", - ) - logger.debug("Read cluster_members.csv.") - - status["clusterAndLink"] = True - if out_dir is not None: - with open(status_file, "w") as status_out: - yaml.safe_dump(status, status_out) - logger.debug("Updated status.yml.") - - if not status["initialOrbitDetermination"]: - iod_orbits, iod_orbit_members = initialOrbitDetermination( - projected_observations, cluster_members, **iod_config - ) - if out_dir is not None: - Orbits.from_df(iod_orbits).to_csv( - os.path.join(out_dir, "iod_orbits.csv") - ) - logger.debug("Saved iod_orbits.csv.") - - iod_orbit_members.to_csv( - os.path.join(out_dir, "iod_orbit_members.csv"), - index=False, - float_format="%.15e", - ) - logger.debug("Saved iod_orbit_members.csv.") - else: - logger.info("Initial orbit determination completed previously.") - iod_orbits = Orbits.from_csv( - os.path.join(out_dir, "iod_orbits.csv"), - ).to_df(include_units=False) - logger.debug("Read iod_orbits.csv.") - - iod_orbit_members = pd.read_csv( - os.path.join(out_dir, "iod_orbit_members.csv"), - index_col=False, - dtype={"obs_id": str}, - float_precision="round_trip", - ) - logger.debug("Read iod_orbit_members.csv.") - - status["initialOrbitDetermination"] = True - if out_dir is not None: - with 
open(status_file, "w") as status_out: - yaml.safe_dump(status, status_out) - logger.debug("Updated status.yml.") - - iod_orbits = iod_orbits[ - ["orbit_id", "mjd_tdb", "x", "y", "z", "vx", "vy", "vz"] - ] - iod_orbit_members = iod_orbit_members[iod_orbit_members["outlier"] == 0][ - ["orbit_id", "obs_id"] - ] - iod_orbits = iod_orbits[ - iod_orbits["orbit_id"].isin(iod_orbit_members["orbit_id"].unique()) - ] - for df in [iod_orbits, iod_orbit_members]: - df.reset_index(inplace=True, drop=True) - - if not status["differentialCorrection"]: - od_orbits, od_orbit_members = differentialCorrection( - iod_orbits, iod_orbit_members, projected_observations, **od_config - ) - if out_dir is not None: - Orbits.from_df(od_orbits).to_csv(os.path.join(out_dir, "od_orbits.csv")) - logger.debug("Saved od_orbits.csv.") - - od_orbit_members.to_csv( - os.path.join(out_dir, "od_orbit_members.csv"), - index=False, - float_format="%.15e", - ) - logger.debug("Saved od_orbit_members.csv.") - else: - logger.info("Differential correction completed previously.") - od_orbits = Orbits.from_csv( - os.path.join(out_dir, "od_orbits.csv"), - ).to_df(include_units=False) - logger.debug("Read od_orbits.csv.") - - od_orbit_members = pd.read_csv( - os.path.join(out_dir, "od_orbit_members.csv"), - index_col=False, - dtype={"obs_id": str}, - float_precision="round_trip", - ) - logger.debug("Read od_orbit_members.csv.") - - status["differentialCorrection"] = True - if out_dir is not None: - with open(status_file, "w") as status_out: - yaml.safe_dump(status, status_out) - logger.debug("Updated status.yml.") - - od_orbit_members = od_orbit_members[od_orbit_members["outlier"] == 0][ - ["orbit_id", "obs_id"] - ] - od_orbits = od_orbits[ - od_orbits["orbit_id"].isin(od_orbit_members["orbit_id"].unique()) - ] - for df in [od_orbits, od_orbit_members]: - df.reset_index(inplace=True, drop=True) - - if not status["mergeAndExtendOrbits"]: - - recovered_orbits, recovered_orbit_members = mergeAndExtendOrbits( - od_orbits, od_orbit_members, projected_observations, **odp_config - ) - if out_dir is not None: - Orbits.from_df(recovered_orbits).to_csv( - os.path.join(out_dir, "recovered_orbits.csv") - ) - logger.debug("Saved recovered_orbits.csv.") - - recovered_orbit_members.to_csv( - os.path.join(out_dir, "recovered_orbit_members.csv"), - index=False, - float_format="%.15e", - ) - logger.debug("Saved recovered_orbit_members.csv.") - else: - logger.info("Orbit extension and merging completed previously.") - recovered_orbits = Orbits.from_csv( - os.path.join(out_dir, "recovered_orbits.csv"), - ).to_df(include_units=False) - logger.debug("Read recovered_orbits.csv.") - - recovered_orbit_members = pd.read_csv( - os.path.join(out_dir, "recovered_orbit_members.csv"), - index_col=False, - dtype={"obs_id": str}, - float_precision="round_trip", - ) - logger.debug("Read recovered_orbit_members.csv.") - - status["mergeAndExtendOrbits"] = True - status["complete"] = True - if out_dir is not None: - with open(status_file, "w") as status_out: - yaml.safe_dump(status, status_out) - logger.debug("Updated status.yml.") - - else: - logger.info("Orbit previously completed processing.") - recovered_orbits = Orbits.from_csv( - os.path.join(out_dir, "recovered_orbits.csv"), - ).to_df(include_units=False) - logger.debug("Read recovered_orbits.csv.") - - recovered_orbit_members = pd.read_csv( - os.path.join(out_dir, "recovered_orbit_members.csv"), - index_col=False, - dtype={"obs_id": str}, - float_precision="round_trip", - ) - logger.debug("Read 
recovered_orbit_members.csv.") - - if out_dir is not None: - logger.removeHandler(file_handler) - return recovered_orbits, recovered_orbit_members - - -def runTHOR( - preprocessed_observations, - test_orbits, - range_shift_config=Config.RANGE_SHIFT_CONFIG, - cluster_link_config=Config.CLUSTER_LINK_CONFIG, - iod_config=Config.IOD_CONFIG, - od_config=Config.OD_CONFIG, - odp_config=Config.ODP_CONFIG, - out_dir=None, - if_exists="continue", - logging_level=logging.INFO, -): - logger = logging.getLogger("thor") - logger.setLevel(logging_level) - - # Connect to ray cluster if enabled - enable_ray = False - configs = [ - range_shift_config, - cluster_link_config, - iod_config, - od_config, - odp_config, - ] - for conf in configs: - if conf["parallel_backend"] == "ray": - enable_ray = True - - if enable_ray: - import ray - - if not ray.is_initialized(): - ray.init(address="auto") - - # Build the configuration class which stores the run parameters - config = Configuration( - range_shift_config=range_shift_config, - cluster_link_config=cluster_link_config, - iod_config=iod_config, - od_config=od_config, - odp_config=odp_config, - ) - - orbits_completed = [] - continue_ = False - if_exists_ = if_exists - if out_dir is not None: - if not os.path.exists(out_dir): - os.mkdir(out_dir) - logger.debug("Created {} directory.".format(out_dir)) - - else: - if if_exists == "continue": - logger.warning( - "{} directory already exists, attempting to continue previous run.".format( - out_dir - ) - ) - continue_ = True - elif if_exists == "erase": - logger.warning( - "{} directory already exists, removing previous results.".format( - out_dir - ) - ) - shutil.rmtree(out_dir) - os.mkdir(out_dir) - logger.debug("Created {} directory.".format(out_dir)) - else: - err = "if_exists should be one of {'continue', 'erase'}." - raise ValueError(err) - - # The primary files which will be used to determine if the run - # can be continued from a previous state and, if so, from where - # to continue the run - config_file = os.path.join(out_dir, "config.yml") - test_orbits_in_file = os.path.join(out_dir, "test_orbits_in.csv") - status_file = os.path.join(out_dir, "status.txt") - config_eq = False - test_orbits_eq = False - save_orbits = True - save_config = True - - # Add summary file for test_orbits that tracks number of recovered orbits and number of observations - # linked in addition to the test_orbit_id used by THOR - test_orbits_out_file = os.path.join(out_dir, "test_orbits_out.csv") - if continue_: - if not os.path.exists(config_file): - logger.warning("No previous configuration file found.") - save_config = True - if_exists_ = "erase" - else: - logger.info("Previous configuration file found. Comparing settings...") - - config_prev = Configuration.fromYaml(config_file) - if config_prev != config: - logger.warning( - "Previous configuration does not match current configuration. Processing will not continue from previous state." - ) - if_exists_ = "erase" - else: - config_eq = True - save_config = False - logger.info("Previous configuration matches current configuration.") - - if not os.path.exists(test_orbits_in_file): - logger.warning("No previous test orbits file found.") - save_orbits = True - else: - logger.info("Previous test orbits file found.") - - test_orbits_prev = Orbits.from_csv( - test_orbits_in_file, - ) - if test_orbits_prev != test_orbits: - logger.warning( - "Previous test orbits do not match current test orbits." 
- ) - else: - test_orbits_eq = True - save_orbits = False - test_orbits_df = test_orbits_prev.to_df(include_units=False) - logger.info("Previous test orbits match current test orbits.") - - if not os.path.exists(status_file): - logger.warning("No previous status file found.") - else: - if test_orbits_eq and config_eq: - orbits_completed = np.loadtxt( - os.path.join(out_dir, "status.txt"), - delimiter="\n", - dtype=str, - ndmin=1, - ) - logger.info("Previous status file found.") - - if (not test_orbits_eq or not config_eq) and continue_: - if if_exists == "continue": - logger.critical("Previous run cannot continue from previous state.") - raise ValueError( - "Previous run cannot continue from previous state. Set if_exists to 'erase' or change/delete the output directory." - ) - elif if_exists == "erase": - shutil.rmtree(out_dir) - os.mkdir(out_dir) - logger.debug("Created {} directory.".format(out_dir)) - else: - pass - - if save_config: - config.toYaml(config_file) - logger.debug("Saved config.yml.") - - if save_orbits: - test_orbits.to_csv(test_orbits_in_file) - logger.debug("Saved test_orbits_in.csv.") - - preprocessed_observations.to_csv( - os.path.join(out_dir, "preprocessed_observations.csv"), - index=False, - float_format="%.15e", - ) - logger.debug("Saved preprocessed_observations.csv.") - - test_orbit_dfs = [] - recovered_orbits_dfs = [] - recovered_orbit_members_dfs = [] - obs_ids_linked = [] - num_orbits = len(test_orbits) - if num_orbits != len(orbits_completed): - test_orbits_split = test_orbits[len(orbits_completed) :].split(1) - else: - test_orbits_split = [] - - # If orbits have previously completed, read the results and continue iterating - # through orbits not previously completed. - id_offset = 0 - if len(orbits_completed) > 0: - logger.info( - "{}/{} orbits have previously finished processing.".format( - len(orbits_completed), num_orbits - ) - ) - - test_orbits_df = Orbits.from_csv( - test_orbits_out_file, - ).to_df(include_units=False) - logger.debug("Read previous test_orbits_out.csv.") - - recovered_orbits = Orbits.from_csv( - os.path.join(out_dir, "recovered_orbits.csv"), - ).to_df(include_units=False) - logger.debug("Read previous recovered_orbits.csv.") - - recovered_orbit_members = pd.read_csv( - os.path.join(out_dir, "recovered_orbit_members.csv"), - index_col=False, - dtype={"obs_id": str}, - float_precision="round_trip", - ) - logger.debug("Read previous recovered_orbit_members.csv.") - - test_orbit_dfs = [test_orbits_df] - recovered_orbits_dfs = [recovered_orbits] - recovered_orbit_members_dfs = [recovered_orbit_members] - obs_ids_linked = recovered_orbit_members["obs_id"].values - id_offset = len(orbits_completed) - - if len(test_orbits_split) != 0: - for i, orbit_i in enumerate(test_orbits_split): - - time_start = time.time() - orbit_id = "{:08d}".format(i + id_offset) - - logger.info( - "Processing orbit {} ({}/{})...".format( - orbit_id, i + 1 + id_offset, num_orbits - ) - ) - - if out_dir is not None: - orbit_dir = os.path.join(out_dir, "orbit_{}".format(orbit_id)) - else: - orbit_dir = None - - linked_mask = ~preprocessed_observations["obs_id"].isin(obs_ids_linked) - - recovered_orbits_i, recovered_orbit_members_i = runTHOROrbit( - preprocessed_observations[linked_mask], - orbit_i, - range_shift_config=range_shift_config, - cluster_link_config=cluster_link_config, - iod_config=iod_config, - od_config=od_config, - odp_config=odp_config, - out_dir=orbit_dir, - if_exists=if_exists_, - logging_level=logging_level, - ) - - time_end = time.time() - - if 
len(recovered_orbits_i) > 0: - recovered_orbits_i.insert(0, "test_orbit_id", orbit_id) - recovered_orbit_members_i.insert(0, "test_orbit_id", orbit_id) - obs_ids_linked_i = recovered_orbit_members_i["obs_id"].unique() - obs_ids_linked = np.concatenate([obs_ids_linked, obs_ids_linked_i]) - - orbits_recovered = len(recovered_orbits_i) - observations_linked = len(obs_ids_linked_i) - else: - orbits_recovered = 0 - observations_linked = 0 - - test_orbit_i = orbit_i.to_df(include_units=False) - test_orbit_i["test_orbit_id"] = orbit_id - test_orbit_i["orbits_recovered"] = orbits_recovered - test_orbit_i["observations_linked"] = observations_linked - test_orbit_i["processing_time"] = time_end - time_start - test_orbit_dfs.append(test_orbit_i) - - logger.info( - "Completed processing orbit {} in {:.3f} seconds.".format( - orbit_id, time_end - time_start - ) - ) - - recovered_orbits_dfs.append(recovered_orbits_i) - recovered_orbit_members_dfs.append(recovered_orbit_members_i) - - test_orbits_df = pd.concat(test_orbit_dfs, ignore_index=True) - recovered_orbits = pd.concat(recovered_orbits_dfs, ignore_index=True) - recovered_orbit_members = pd.concat( - recovered_orbit_members_dfs, ignore_index=True - ) - - if out_dir is not None: - Orbits.from_df(test_orbits_df).to_csv(test_orbits_out_file) - logger.debug("Saved test_orbits_out.csv.") - - Orbits.from_df(recovered_orbits).to_csv( - os.path.join(out_dir, "recovered_orbits.csv") - ) - logger.debug("Saved recovered_orbits.csv.") - - recovered_orbit_members.to_csv( - os.path.join(out_dir, "recovered_orbit_members.csv"), - index=False, - float_format="%.15e", - ) - logger.debug("Saved recovered_orbit_members.csv.") - - orbits_completed = np.concatenate([orbits_completed, np.array([orbit_id])]) - if out_dir is not None: - with open(os.path.join(out_dir, "status.txt"), "w") as status_out: - np.savetxt(status_out, orbits_completed, delimiter="\n", fmt="%s") - logger.info("Saved status.txt.") - - else: - - logger.info("Run completed previously.") - test_orbits_df = Orbits.from_csv( - test_orbits_out_file, - ).to_df(include_units=False) - logger.debug("Read test_orbits_out.csv.") - - recovered_orbits = Orbits.from_csv( - os.path.join(out_dir, "recovered_orbits.csv"), - ).to_df(include_units=False) - logger.debug("Read recovered_orbits.csv.") - - recovered_orbit_members = pd.read_csv( - os.path.join(out_dir, "recovered_orbit_members.csv"), - index_col=False, - dtype={"obs_id": str}, - float_precision="round_trip", - ) - logger.debug("Read recovered_orbit_members.csv.") - - return test_orbits_df, recovered_orbits, recovered_orbit_members diff --git a/thor/main_2.py b/thor/main_2.py index 9443bca5..7e43e174 100644 --- a/thor/main_2.py +++ b/thor/main_2.py @@ -11,15 +11,15 @@ ) from adam_core.propagator import PYOORB, Propagator -from .main import ( - clusterAndLink, +from .main import clusterAndLink +from .observations.filters import ObservationFilter, TestOrbitRadiusObservationFilter +from .observations.observations import Observations, ObserversWithStates +from .orbit import TestOrbit, TestOrbitEphemeris +from .orbits import ( differentialCorrection, initialOrbitDetermination, mergeAndExtendOrbits, ) -from .observations.filters import ObservationFilter, TestOrbitRadiusObservationFilter -from .observations.observations import Observations, ObserversWithStates -from .orbit import TestOrbit, TestOrbitEphemeris from .projections import GnomonicCoordinates From 52e4b2b75bc11ac42fd6ed7eaf022f68302688c8 Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Thu, 26 Oct 
2023 16:54:22 -0700 Subject: [PATCH 06/14] Add new Config dataclass --- thor/config.py | 382 +++++--------------------------------- thor/main_2.py | 83 ++++++++- thor/tests/test_config.py | 252 ------------------------- 3 files changed, 120 insertions(+), 597 deletions(-) delete mode 100644 thor/tests/test_config.py diff --git a/thor/config.py b/thor/config.py index ed44c20d..dc345465 100644 --- a/thor/config.py +++ b/thor/config.py @@ -1,336 +1,46 @@ -import copy -import io -import logging -import os - -import yaml - -__all__ = ["_handleUserConfig", "Configuration", "Config"] - -logger = logging.getLogger("thor") - -### DEFAULT CONFIGURATION - -MIN_OBS: int = 5 -MIN_ARC_LENGTH: float = 1.0 -CONTAMINATION_PERCENTAGE: float = 20.0 -BACKEND: str = "PYOORB" -BACKEND_KWARGS: dict = {} -NUM_JOBS: str = "auto" -PARALLEL_BACKEND: str = "cf" - -DEFAULT_RANGE_SHIFT_CONFIG = { - "cell_area": 1000, - "num_jobs": NUM_JOBS, - "backend": BACKEND, - "backend_kwargs": BACKEND_KWARGS, - "parallel_backend": PARALLEL_BACKEND, -} - -DEFAULT_CLUSTER_LINK_CONFIG = { - "vx_range": [-0.1, 0.1], - "vy_range": [-0.1, 0.1], - "vx_bins": 300, - "vy_bins": 300, - "vx_values": None, - "vy_values": None, - "eps": 0.005, - "min_obs": MIN_OBS, - "min_arc_length": MIN_ARC_LENGTH, - "num_jobs": NUM_JOBS, - "alg": "hotspot_2d", - "parallel_backend": PARALLEL_BACKEND, -} - -DEFAULT_IOD_CONFIG = { - "min_obs": MIN_OBS, - "min_arc_length": MIN_ARC_LENGTH, - "contamination_percentage": CONTAMINATION_PERCENTAGE, - "rchi2_threshold": 100000, - "observation_selection_method": "combinations", - "iterate": False, - "light_time": True, - "linkage_id_col": "cluster_id", - "identify_subsets": False, - "chunk_size": 10, - "num_jobs": NUM_JOBS, - "backend": BACKEND, - "backend_kwargs": BACKEND_KWARGS, - "parallel_backend": PARALLEL_BACKEND, -} - -DEFAULT_OD_CONFIG = { - "min_obs": MIN_OBS, - "min_arc_length": MIN_ARC_LENGTH, - "contamination_percentage": CONTAMINATION_PERCENTAGE, - "rchi2_threshold": 10, - "delta": 1e-6, - "max_iter": 10, - "method": "central", - "fit_epoch": False, - "test_orbit": None, - "chunk_size": 10, - "num_jobs": NUM_JOBS, - "backend": BACKEND, - "backend_kwargs": BACKEND_KWARGS, - "parallel_backend": PARALLEL_BACKEND, -} - -DEFAULT_ODP_CONFIG = { - "min_obs": MIN_OBS, - "min_arc_length": MIN_ARC_LENGTH, - "contamination_percentage": 0.0, - "rchi2_threshold": 10, - "eps": 1 / 3600, - "delta": 1e-8, - "max_iter": 5, - "method": "central", - "fit_epoch": False, - "orbits_chunk_size": 10, - "observations_chunk_size": 1000000, - "num_jobs": NUM_JOBS, - "backend": BACKEND, - "backend_kwargs": BACKEND_KWARGS, - "parallel_backend": PARALLEL_BACKEND, -} - -DEFAULT_TASKQUEUE_CONFIG = { - "hostname": "rabbit.c.thor-moeyens-dev.internal", - "port": 5672, - "username": "thor", - "password": None, - "queue": "thor_tasks", - "bucket": "thor_taskqueue_v1", -} - - -def _handleUserConfig(user_dict, default_dict): - out_dict = copy.deepcopy(user_dict) - for key, value in default_dict.items(): - if key not in out_dict.keys(): - out_dict[key] = value - return out_dict - - -class Configuration: - def __init__( - self, - range_shift_config=None, - cluster_link_config=None, - iod_config=None, - od_config=None, - odp_config=None, - taskqueue_config=None, - min_obs=None, - min_arc_length=None, - contamination_percentage=None, - backend=None, - backend_kwargs=None, - num_jobs=None, - parallel_backend=None, - ): - - if range_shift_config is None: - self.RANGE_SHIFT_CONFIG = DEFAULT_RANGE_SHIFT_CONFIG - else: - self.RANGE_SHIFT_CONFIG = 
_handleUserConfig( - range_shift_config, DEFAULT_RANGE_SHIFT_CONFIG - ) - - if cluster_link_config is None: - self.CLUSTER_LINK_CONFIG = DEFAULT_CLUSTER_LINK_CONFIG - else: - self.CLUSTER_LINK_CONFIG = _handleUserConfig( - cluster_link_config, DEFAULT_CLUSTER_LINK_CONFIG - ) - - if iod_config is None: - self.IOD_CONFIG = DEFAULT_IOD_CONFIG - else: - self.IOD_CONFIG = _handleUserConfig(iod_config, DEFAULT_IOD_CONFIG) - - if od_config is None: - self.OD_CONFIG = DEFAULT_OD_CONFIG - else: - self.OD_CONFIG = _handleUserConfig(od_config, DEFAULT_OD_CONFIG) - - if odp_config is None: - self.ODP_CONFIG = DEFAULT_ODP_CONFIG - else: - self.ODP_CONFIG = _handleUserConfig(odp_config, DEFAULT_ODP_CONFIG) - - if taskqueue_config is None: - self.TASKQUEUE_CONFIG = DEFAULT_TASKQUEUE_CONFIG - else: - self.TASKQUEUE_CONFIG = _handleUserConfig( - taskqueue_config, - DEFAULT_TASKQUEUE_CONFIG, - ) - - self._conf = {} - if min_obs is not None: - self.MIN_OBS = min_obs - self._conf["MIN_OBS"] = self.MIN_OBS - components = [ - self.CLUSTER_LINK_CONFIG, - self.IOD_CONFIG, - self.OD_CONFIG, - self.ODP_CONFIG, - ] - self.updateConfigs("min_obs", min_obs, components) - - if min_arc_length is not None: - self.MIN_ARC_LENGTH = min_arc_length - self._conf["MIN_ARC_LENGTH"] = self.MIN_ARC_LENGTH - components = [ - self.CLUSTER_LINK_CONFIG, - self.IOD_CONFIG, - self.OD_CONFIG, - self.ODP_CONFIG, - ] - self.updateConfigs("min_arc_length", min_arc_length, components) - - if contamination_percentage is not None: - self.CONTAMINATION_PERCENTAGE = contamination_percentage - self._conf["CONTAMINATION_PERCENTAGE"] = self.CONTAMINATION_PERCENTAGE - components = [self.IOD_CONFIG, self.OD_CONFIG, self.ODP_CONFIG] - self.updateConfigs( - "contamination_percentage", contamination_percentage, components - ) - - if backend is not None: - self.BACKEND = backend - self._conf["BACKEND"] = self.BACKEND - components = [ - self.RANGE_SHIFT_CONFIG, - self.IOD_CONFIG, - self.OD_CONFIG, - self.ODP_CONFIG, - ] - self.updateConfigs("backend", backend, components) - - if backend_kwargs is not None: - self.BACKEND_KWARGS = backend_kwargs - self._conf["BACKEND_KWARGS"] = self.BACKEND_KWARGS - components = [ - self.RANGE_SHIFT_CONFIG, - self.IOD_CONFIG, - self.OD_CONFIG, - self.ODP_CONFIG, - ] - self.updateConfigs("backend_kwargs", backend_kwargs, components) - - if num_jobs is not None: - self.NUM_JOBS = num_jobs - self._conf["NUM_JOBS"] = self.NUM_JOBS - components = [ - self.RANGE_SHIFT_CONFIG, - self.CLUSTER_LINK_CONFIG, - self.IOD_CONFIG, - self.OD_CONFIG, - self.ODP_CONFIG, - ] - self.updateConfigs("num_jobs", num_jobs, components) - - if parallel_backend is not None: - self.PARALLEL_BACKEND = parallel_backend - self._conf["PARALLEL_BACKEND"] = self.PARALLEL_BACKEND - components = [ - self.RANGE_SHIFT_CONFIG, - self.CLUSTER_LINK_CONFIG, - self.IOD_CONFIG, - self.OD_CONFIG, - self.ODP_CONFIG, - ] - self.updateConfigs("parallel_backend", parallel_backend, components) - - self._conf["RANGE_SHIFT_CONFIG"] = self.RANGE_SHIFT_CONFIG - self._conf["CLUSTER_LINK_CONFIG"] = self.CLUSTER_LINK_CONFIG - self._conf["IOD_CONFIG"] = self.IOD_CONFIG - self._conf["OD_CONFIG"] = self.OD_CONFIG - self._conf["ODP_CONFIG"] = self.ODP_CONFIG - self._conf["TASKQUEUE_CONFIG"] = self.TASKQUEUE_CONFIG - return - - def __eq__(self, other): - """ - Make sure all pipeline component dictionaries are the equal. 
- """ - eq = True - if self.RANGE_SHIFT_CONFIG != other.RANGE_SHIFT_CONFIG: - eq = False - if self.CLUSTER_LINK_CONFIG != other.CLUSTER_LINK_CONFIG: - eq = False - if self.IOD_CONFIG != other.IOD_CONFIG: - eq = False - if self.OD_CONFIG != other.OD_CONFIG: - eq = False - if self.ODP_CONFIG != other.ODP_CONFIG: - eq = False - return eq - - def updateConfigs(self, name, value, components): - logger.warning(f"Setting pipeline components to use {name}: {value}.") - for config in components: - config[name] = value - return - - def toYaml(self, file_name): - """ - Save configuration to a YAML file. - - Parameters - ---------- - file_name : str - Path to file. - """ - with open(file_name, "w") as config_out: - yaml.dump(self._conf, config_out, sort_keys=False) - return - - def toYamlString(self): - """ - Save configuration to a YAML string. - """ - return yaml.dump(self._conf, stream=None, sort_keys=False) - - @classmethod - def fromYaml(cls, file_name): - """ - Read configuration from YAML file. - - Parameters - ---------- - file_name : str - Path to file. - """ - with open(file_name, "r") as config_in: - conf_in = yaml.load(config_in, Loader=yaml.FullLoader) - - conf = {} - for key, values in conf_in.items(): - conf[key.lower()] = values - - config = Configuration(**conf) - return config - - @classmethod - def fromYamlString(cls, s): - """ - Read configuration from a YAML string. - - Parameters - ---------- - s : str - The string containing YAML data. - """ - conf_in = yaml.load(io.StringIO(s), Loader=yaml.FullLoader) - conf = {} - for key, values in conf_in.items(): - conf[key.lower()] = values - - config = Configuration(**conf) - return config - - -Config = Configuration() +from dataclasses import dataclass +from typing import Literal, Optional + +import numpy as np +import numpy.typing as npt + + +@dataclass +class Config: + max_processes: Optional[int] = None + propagator: str = "PYOORB" + parallel_backend: Literal["cf"] = "cf" + cell_radius: float = 10 + vx_min: float = -0.1 + vx_max: float = 0.1 + vy_min: float = -0.1 + vy_max: float = 0.1 + vx_bins: int = 300 + vy_bins: int = 300 + vx_values: Optional[npt.NDArray[np.float64]] = None + vy_values: Optional[npt.NDArray[np.float64]] = None + cluster_radius: float = 0.005 + cluster_min_obs: int = 6 + cluster_min_arc_length: float = 1.0 + cluster_algorithm: Literal["hotspot_2d", "dbscan"] = "dbscan" + iod_min_obs: int = 6 + iod_min_arc_length: float = 1.0 + iod_contamination_percentage: float = 20.0 + iod_rchi2_threshold: float = 100000 + iod_observation_selection_method: Literal[ + "combinations", "first+middle+last" + ] = "combinations" + iod_chunk_size: int = 10 + od_min_obs: int = 6 + od_min_arc_length: float = 1.0 + od_contamination_percentage: float = 20.0 + od_rchi2_threshold: float = 10 + od_delta: float = 1e-6 + od_max_iter: int = 10 + od_chunk_size: int = 10 + arc_extension_min_obs: int = 6 + arc_extension_min_arc_length: float = 1.0 + arc_extension_contamination_percentage: float = 0.0 + arc_extension_rchi2_threshold: float = 10 + arc_extension_radius: float = 1 / 3600 + arc_extension_chunk_size: int = 100 diff --git a/thor/main_2.py b/thor/main_2.py index 7e43e174..9cd79076 100644 --- a/thor/main_2.py +++ b/thor/main_2.py @@ -11,6 +11,7 @@ ) from adam_core.propagator import PYOORB, Propagator +from .config import Config from .main import clusterAndLink from .observations.filters import ObservationFilter, TestOrbitRadiusObservationFilter from .observations.observations import Observations, ObserversWithStates @@ -82,7 +83,7 @@ def 
range_and_transform( test_orbit: TestOrbit, observations: Observations, propagator: Propagator = PYOORB(), - max_processes: int = 1, + max_processes: Optional[int] = 1, ) -> TransformedDetections: """ Range observations for a single test orbit and transform them into a @@ -285,11 +286,8 @@ def _transformed_detections_to_transformed_detections_df( def link_test_orbit( test_orbit: TestOrbit, observations: Observations, - filters: Optional[List[ObservationFilter]] = [ - TestOrbitRadiusObservationFilter(radius=10.0) - ], - propagator: Propagator = PYOORB(), - max_processes: int = 1, + filters: Optional[List[ObservationFilter]] = None, + config: Optional[Config] = None, ) -> Iterator[Any]: """ Run THOR for a single test orbit on the given observations. This function will yield @@ -314,18 +312,30 @@ def link_test_orbit( max_processes : int, optional Maximum number of processes to use for parallelization. """ + if config is None: + config = Config() + + if config.propagator == "PYOORB": + propagator = PYOORB() + else: + raise ValueError(f"Unknown propagator: {config.propagator}") + # Apply filters to the observations filtered_observations = observations if filters is not None: for filter_i in filters: filtered_observations = filter_i.apply(filtered_observations, test_orbit) + else: + filters = [TestOrbitRadiusObservationFilter(radius=config.cell_radius)] + for filter_i in filters: + filtered_observations = filter_i.apply(filtered_observations, test_orbit) # Range and transform the observations transformed_detections = range_and_transform( test_orbit, filtered_observations, propagator=propagator, - max_processes=max_processes, + max_processes=config.max_processes, ) yield transformed_detections @@ -347,6 +357,18 @@ def link_test_orbit( # Run clustering clusters, cluster_members = clusterAndLink( transformed_detections_df, + vx_range=[config.vx_min, config.vx_max], + vy_range=[config.vy_min, config.vy_max], + vx_bins=config.vx_bins, + vy_bins=config.vy_bins, + vx_values=config.vx_values, + vy_values=config.vy_values, + eps=config.cluster_radius, + min_obs=config.cluster_min_obs, + min_arc_length=config.cluster_min_arc_length, + alg=config.cluster_algorithm, + num_jobs=config.max_processes, + parallel_backend=config.parallel_backend, ) yield clusters, cluster_members @@ -354,8 +376,21 @@ def link_test_orbit( iod_orbits, iod_orbit_members = initialOrbitDetermination( observations_df, cluster_members, + min_obs=config.iod_min_obs, + min_arc_length=config.iod_min_arc_length, + contamination_percentage=config.iod_contamination_percentage, + rchi2_threshold=config.iod_rchi2_threshold, + observation_selection_method=config.iod_observation_selection_method, + backend=config.propagator, + backend_kwargs={}, + chunk_size=config.iod_chunk_size, + num_jobs=config.max_processes, + parallel_backend=config.parallel_backend, + # TODO: investigate whether these should be configurable + iterate=False, identify_subsets=False, - rchi2_threshold=1e10, + light_time=True, + linkage_id_col="cluster_id", ) yield iod_orbits, iod_orbit_members @@ -364,7 +399,21 @@ def link_test_orbit( iod_orbits, iod_orbit_members, observations_df, - rchi2_threshold=1e10, + min_obs=config.od_min_obs, + min_arc_length=config.od_min_arc_length, + contamination_percentage=config.od_contamination_percentage, + rchi2_threshold=config.od_rchi2_threshold, + delta=config.od_delta, + max_iter=config.od_max_iter, + backend=config.propagator, + backend_kwargs={}, + chunk_size=config.od_chunk_size, + num_jobs=config.max_processes, + 
parallel_backend=config.parallel_backend, + # TODO: investigate whether these should be configurable + method="central", + fit_epoch=False, + test_orbit=None, ) yield od_orbits, od_orbit_members @@ -373,5 +422,21 @@ def link_test_orbit( od_orbits, od_orbit_members, observations_df, + min_obs=config.arc_extension_min_obs, + min_arc_length=config.arc_extension_min_arc_length, + contamination_percentage=config.arc_extension_contamination_percentage, + rchi2_threshold=config.arc_extension_rchi2_threshold, + eps=config.arc_extension_radius, + delta=config.od_delta, + max_iter=config.od_max_iter, + backend=config.propagator, + backend_kwargs={}, + orbits_chunk_size=config.arc_extension_chunk_size, + num_jobs=config.max_processes, + parallel_backend=config.parallel_backend, + # TODO: investigate whether these should be configurable + method="central", + fit_epoch=False, + observations_chunk_size=100000, ) yield recovered_orbits, recovered_orbit_members diff --git a/thor/tests/test_config.py b/thor/tests/test_config.py deleted file mode 100644 index e8cb2af9..00000000 --- a/thor/tests/test_config.py +++ /dev/null @@ -1,252 +0,0 @@ -from ..config import Configuration, _handleUserConfig - - -def test__handleUserConfig(): - - # Define some arbitrary defaultr dictionary - default_config = { - "a": 1, - "b": 2, - "c": 3, - } - - user_config = {"a": 4} - - config = _handleUserConfig(user_config, default_config) - - assert config["a"] == 4 - assert config["b"] == 2 - assert config["c"] == 3 - return - - -def test_configuration_rangeAndShift(): - Config = Configuration() - - # Configuration should be a dictionary - assert isinstance(Config.RANGE_SHIFT_CONFIG, dict) - kwargs = ["cell_area", "num_jobs", "backend", "backend_kwargs", "parallel_backend"] - - # Make sure all rangeAndShift kwargs are included in the dictionary - for kwarg in kwargs: - assert kwarg in Config.RANGE_SHIFT_CONFIG.keys() - assert len(Config.RANGE_SHIFT_CONFIG.keys()) == len(kwargs) - - return - - -def test_configuration_clusterAndLink(): - Config = Configuration() - - # Configuration should be a dictionary - assert isinstance(Config.CLUSTER_LINK_CONFIG, dict) - - # Make sure all clusterAndLink kwargs are included in the dictionary - kwargs = [ - "vx_range", - "vy_range", - "vx_bins", - "vy_bins", - "vx_values", - "vy_values", - "eps", - "min_obs", - "min_arc_length", - "alg", - "num_jobs", - "parallel_backend", - ] - for kwarg in kwargs: - assert kwarg in Config.CLUSTER_LINK_CONFIG.keys() - assert len(Config.CLUSTER_LINK_CONFIG.keys()) == len(kwargs) - - return - - -def test_configuration_iod(): - Config = Configuration() - - # Configuration should be a dictionary - assert isinstance(Config.IOD_CONFIG, dict) - - # Make sure all initialOrbitDetermination kwargs are included in the dictionary - kwargs = [ - "min_obs", - "min_arc_length", - "contamination_percentage", - "rchi2_threshold", - "observation_selection_method", - "iterate", - "light_time", - "linkage_id_col", - "identify_subsets", - "backend", - "backend_kwargs", - "chunk_size", - "num_jobs", - "parallel_backend", - ] - for kwarg in kwargs: - assert kwarg in Config.IOD_CONFIG.keys() - assert len(Config.IOD_CONFIG.keys()) == len(kwargs) - - return - - -def test_configuration_od(): - Config = Configuration() - - # Configuration should be a dictionary - assert isinstance(Config.OD_CONFIG, dict) - - # Make sure all differentialCorrection kwargs are included in the dictionary - kwargs = [ - "min_obs", - "min_arc_length", - "contamination_percentage", - "rchi2_threshold", - 
"delta", - "max_iter", - "method", - "fit_epoch", - "test_orbit", - "backend", - "backend_kwargs", - "chunk_size", - "num_jobs", - "parallel_backend", - ] - for kwarg in kwargs: - assert kwarg in Config.OD_CONFIG.keys() - assert len(Config.OD_CONFIG.keys()) == len(kwargs) - - return - - -def test_configuration_odp(): - Config = Configuration() - - # Configuration should be a dictionary - assert isinstance(Config.ODP_CONFIG, dict) - - # Make sure all mergeAndExtendOrbits kwargs are included in the dictionary - kwargs = [ - "min_obs", - "min_arc_length", - "contamination_percentage", - "rchi2_threshold", - "eps", - "delta", - "max_iter", - "method", - "fit_epoch", - "backend", - "backend_kwargs", - "orbits_chunk_size", - "observations_chunk_size", - "num_jobs", - "parallel_backend", - ] - for kwarg in kwargs: - assert kwarg in Config.ODP_CONFIG.keys() - assert len(Config.ODP_CONFIG.keys()) == len(kwargs) - - return - - -def test_configuration_min_obs_override(): - val = 1234 - Config = Configuration(min_obs=val) - - assert "min_obs" not in Config.RANGE_SHIFT_CONFIG.keys() - assert Config.CLUSTER_LINK_CONFIG["min_obs"] == val - assert Config.IOD_CONFIG["min_obs"] == val - assert Config.OD_CONFIG["min_obs"] == val - assert Config.ODP_CONFIG["min_obs"] == val - assert Config.MIN_OBS == val - - return - - -def test_configuration_min_arc_length_override(): - val = 999.999 - Config = Configuration(min_arc_length=val) - - assert "min_arc_length" not in Config.RANGE_SHIFT_CONFIG.keys() - assert Config.CLUSTER_LINK_CONFIG["min_arc_length"] == val - assert Config.IOD_CONFIG["min_arc_length"] == val - assert Config.OD_CONFIG["min_arc_length"] == val - assert Config.ODP_CONFIG["min_arc_length"] == val - assert Config.MIN_ARC_LENGTH == val - - return - - -def test_configuration_contamination_percentage_override(): - val = 100.0 - Config = Configuration(contamination_percentage=val) - - assert "contamination_percentage" not in Config.RANGE_SHIFT_CONFIG.keys() - assert "contamination_percentage" not in Config.CLUSTER_LINK_CONFIG.keys() - assert Config.IOD_CONFIG["contamination_percentage"] == val - assert Config.OD_CONFIG["contamination_percentage"] == val - assert Config.ODP_CONFIG["contamination_percentage"] == val - assert Config.CONTAMINATION_PERCENTAGE == val - - return - - -def test_configuration_backend_override(): - val = "propagator" - Config = Configuration(backend=val) - - assert Config.RANGE_SHIFT_CONFIG["backend"] == val - assert "backend" not in Config.CLUSTER_LINK_CONFIG.keys() - assert Config.IOD_CONFIG["backend"] == val - assert Config.OD_CONFIG["backend"] == val - assert Config.ODP_CONFIG["backend"] == val - assert Config.BACKEND == val - - return - - -def test_configuration_backend_kwargs_override(): - val = {"dynamical_model": "test"} - Config = Configuration(backend_kwargs=val) - - assert Config.RANGE_SHIFT_CONFIG["backend_kwargs"] == val - assert "backend_kwargs" not in Config.CLUSTER_LINK_CONFIG.keys() - assert Config.IOD_CONFIG["backend_kwargs"] == val - assert Config.OD_CONFIG["backend_kwargs"] == val - assert Config.ODP_CONFIG["backend_kwargs"] == val - assert Config.BACKEND_KWARGS == val - - return - - -def test_configuration_num_jobs_override(): - val = 123 - Config = Configuration(num_jobs=val) - - assert Config.RANGE_SHIFT_CONFIG["num_jobs"] == val - assert Config.CLUSTER_LINK_CONFIG["num_jobs"] == val - assert Config.IOD_CONFIG["num_jobs"] == val - assert Config.OD_CONFIG["num_jobs"] == val - assert Config.ODP_CONFIG["num_jobs"] == val - assert Config.NUM_JOBS == val - - 
return - - -def test_configuration_parallel_backend_override(): - val = "parallelizer" - Config = Configuration(parallel_backend=val) - - assert Config.RANGE_SHIFT_CONFIG["parallel_backend"] == val - assert Config.CLUSTER_LINK_CONFIG["parallel_backend"] == val - assert Config.IOD_CONFIG["parallel_backend"] == val - assert Config.OD_CONFIG["parallel_backend"] == val - assert Config.ODP_CONFIG["parallel_backend"] == val - assert Config.PARALLEL_BACKEND == val - - return From dd20cac6fca6f0d31e116b7417bd1c14677935a5 Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Fri, 27 Oct 2023 08:15:27 -0700 Subject: [PATCH 07/14] Add logging statements to range_and_transform, link_test_orbit, and TestOrbitRadiusObservationFilter --- thor/main_2.py | 44 ++++++++++++++++++++++++++++++------ thor/observations/filters.py | 18 ++++++++++++++- 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/thor/main_2.py b/thor/main_2.py index 9cd79076..78ac0daa 100644 --- a/thor/main_2.py +++ b/thor/main_2.py @@ -1,3 +1,5 @@ +import logging +import time from typing import Any, Iterator, List, Optional import pandas as pd @@ -23,6 +25,8 @@ ) from .projections import GnomonicCoordinates +logger = logging.getLogger(__name__) + class TransformedDetections(qv.Table): id = qv.StringColumn() @@ -108,6 +112,10 @@ def range_and_transform( The transformed detections as gnomonic coordinates of the observations in the co-rotating frame. """ + time_start = time.perf_counter() + logger.info("Running range and transform...") + logger.info(f"Assuming r = {test_orbit.orbit.coordinates.r[0]} au") + logger.info(f"Assuming v = {test_orbit.orbit.coordinates.v[0]} au/d") # Compute the ephemeris of the test orbit (this will be cached) ephemeris = test_orbit.generate_ephemeris_from_observations( observations, @@ -135,6 +143,9 @@ def range_and_transform( if max_processes is None or max_processes > 1: if not ray.is_initialized(): + logger.debug( + f"Ray is not initialized. Initializing with {max_processes}..." + ) ray.init(num_cpus=max_processes) if isinstance(observations, ray.ObjectRef): @@ -182,7 +193,14 @@ def range_and_transform( ) transformed_detections = qv.concatenate(transformed_detection_list) - return transformed_detections.sort_by(by=["state_id"]) + transformed_detections = transformed_detections.sort_by(by=["state_id"]) + + time_end = time.perf_counter() + logger.info(f"Transformed {len(transformed_detections)} observations.") + logger.info( + f"Range and transform completed in {time_end - time_start:.3f} seconds." + ) + return transformed_detections def _observations_to_observations_df(observations: Observations) -> pd.DataFrame: @@ -312,6 +330,9 @@ def link_test_orbit( max_processes : int, optional Maximum number of processes to use for parallelization. """ + time_start = time.perf_counter() + logger.info("Running test orbit...") + if config is None: config = Config() @@ -320,15 +341,21 @@ def link_test_orbit( else: raise ValueError(f"Unknown propagator: {config.propagator}") + if config.max_processes is None or config.max_processes > 1: + # Initialize ray + if not ray.is_initialized(): + logger.debug( + f"Ray is not initialized. Initializing with {config.max_processes}..." 
+ ) + ray.init(num_cpus=config.max_processes) + # Apply filters to the observations filtered_observations = observations - if filters is not None: - for filter_i in filters: - filtered_observations = filter_i.apply(filtered_observations, test_orbit) - else: + if filters is None: + # By default we always filter by radius from the predicted position of the test orbit filters = [TestOrbitRadiusObservationFilter(radius=config.cell_radius)] - for filter_i in filters: - filtered_observations = filter_i.apply(filtered_observations, test_orbit) + for filter_i in filters: + filtered_observations = filter_i.apply(filtered_observations, test_orbit) # Range and transform the observations transformed_detections = range_and_transform( @@ -440,3 +467,6 @@ def link_test_orbit( observations_chunk_size=100000, ) yield recovered_orbits, recovered_orbit_members + + time_end = time.perf_counter() + logger.info(f"Test orbit completed in {time_end-time_start:.3f} seconds.") diff --git a/thor/observations/filters.py b/thor/observations/filters.py index ea346638..1afb945c 100644 --- a/thor/observations/filters.py +++ b/thor/observations/filters.py @@ -1,4 +1,6 @@ import abc +import logging +import time from typing import TYPE_CHECKING import numpy as np @@ -11,6 +13,9 @@ from .observations import Observations +logger = logging.getLogger(__name__) + + class ObservationFilter(abc.ABC): """An ObservationFilter is reduces a collection of observations to a subset of those observations. @@ -73,6 +78,9 @@ def apply( filtered_observations : `~thor.observations.Observations` The filtered observations. """ + time_start = time.perf_counter() + logger.info("Applying TestOrbitRadiusObservationFilter...") + logger.info(f"Using radius = {self.radius:.5f} deg") # Generate an ephemeris for every observer time/location in the dataset test_orbit_ephemeris = test_orbit.generate_ephemeris_from_observations( observations @@ -113,7 +121,15 @@ def apply( # Update the mask mask[idx_within] = True - return observations.apply_mask(mask) + observations_masked = observations.apply_mask(mask) + time_end = time.perf_counter() + logger.info( + f"Filtered {len(observations)} observations to {len(observations_masked)} observations." + ) + logger.info( + f"TestOrbitRadiusObservationFilter completed in {time_end - time_start:.3f} seconds." + ) + return observations_masked def _within_radius( From 64102cc39e9ea9b30ab140462f92b31c846be221 Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Fri, 27 Oct 2023 08:21:07 -0700 Subject: [PATCH 08/14] Add Config.set_min_obs and .set_min_arc_length --- thor/config.py | 28 ++++++++++++++++++++++++++++ thor/tests/test_config.py | 19 +++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 thor/tests/test_config.py diff --git a/thor/config.py b/thor/config.py index dc345465..c296b6dc 100644 --- a/thor/config.py +++ b/thor/config.py @@ -44,3 +44,31 @@ class Config: arc_extension_rchi2_threshold: float = 10 arc_extension_radius: float = 1 / 3600 arc_extension_chunk_size: int = 100 + + def set_min_obs(self, min_obs: int): + """ + Set the minimum number of observations for all stages of the pipeline. + + Parameters + ---------- + min_obs + The minimum number of observations. + """ + self.cluster_min_obs = min_obs + self.iod_min_obs = min_obs + self.od_min_obs = min_obs + self.arc_extension_min_obs = min_obs + + def set_min_arc_length(self, min_arc_length: int): + """ + Set the minimum arc length for all stages of the pipeline. + + Parameters + ---------- + min_arc_length + The minimum arc length. 
+ """ + self.cluster_min_arc_length = min_arc_length + self.iod_min_arc_length = min_arc_length + self.od_min_arc_length = min_arc_length + self.arc_extension_min_arc_length = min_arc_length diff --git a/thor/tests/test_config.py b/thor/tests/test_config.py new file mode 100644 index 00000000..c40a17eb --- /dev/null +++ b/thor/tests/test_config.py @@ -0,0 +1,19 @@ +from ..config import Config + + +def test_Config_set_min_obs(): + config = Config() + config.set_min_obs(3) + assert config.cluster_min_obs == 3 + assert config.iod_min_obs == 3 + assert config.od_min_obs == 3 + assert config.arc_extension_min_obs == 3 + + +def test_Config_set_min_arc_length(): + config = Config() + config.set_min_arc_length(3) + assert config.cluster_min_arc_length == 3 + assert config.iod_min_arc_length == 3 + assert config.od_min_arc_length == 3 + assert config.arc_extension_min_arc_length == 3 From b02e34ddf986261ba2eb22bc2f15d829b5c45440 Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Fri, 27 Oct 2023 10:19:31 -0700 Subject: [PATCH 09/14] Parallelize TestOrbitRadiusObservationFilter with ray --- thor/main_2.py | 8 +- thor/observations/filters.py | 170 +++++++++++++++++++++++++---------- 2 files changed, 131 insertions(+), 47 deletions(-) diff --git a/thor/main_2.py b/thor/main_2.py index 78ac0daa..33b3b4e5 100644 --- a/thor/main_2.py +++ b/thor/main_2.py @@ -104,7 +104,9 @@ def range_and_transform( Propagator to use to propagate the test orbit and generate ephemerides. max_processes : int, optional - Maximum number of processes to use for parallelization. + Maximum number of processes to use for parallelization. If + an existing ray cluster is already running, this parameter + will be ignored if larger than 1 or not None. Returns ------- @@ -355,7 +357,9 @@ def link_test_orbit( # By default we always filter by radius from the predicted position of the test orbit filters = [TestOrbitRadiusObservationFilter(radius=config.cell_radius)] for filter_i in filters: - filtered_observations = filter_i.apply(filtered_observations, test_orbit) + filtered_observations = filter_i.apply( + filtered_observations, test_orbit, max_processes=config.max_processes + ) # Range and transform the observations transformed_detections = range_and_transform( diff --git a/thor/observations/filters.py b/thor/observations/filters.py index 1afb945c..5a5650cd 100644 --- a/thor/observations/filters.py +++ b/thor/observations/filters.py @@ -1,13 +1,14 @@ import abc import logging import time -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional import numpy as np import quivr as qv +import ray from adam_core.observations import PointSourceDetections -from ..orbit import TestOrbit +from ..orbit import TestOrbit, TestOrbitEphemeris if TYPE_CHECKING: from .observations import Observations @@ -16,6 +17,54 @@ logger = logging.getLogger(__name__) +def TestOrbitRadiusObservationFilter_worker( + observations: "Observations", + ephemeris: TestOrbitEphemeris, + state_id: int, + radius: float, +) -> "Observations": + """ + Apply the filter to a collection of observations for a particular state. + + Parameters + ---------- + observations : `~thor.observations.Observations` + The observations to filter. + ephemeris : `~thor.orbit.TestOrbitEphemeris` + The ephemeris to use for filtering. + state_id : int + The state ID. + radius : float + The radius in degrees. + + Returns + ------- + filtered_observations : `~thor.observations.Observations` + The filtered observations. 
+ """ + # Select the ephemeris and observations for this state + ephemeris_state = ephemeris.select("id", state_id) + observations_state = observations.select("state_id", state_id) + detections_state = observations_state.detections + + assert ( + len(ephemeris_state) == 1 + ), "there should be exactly one ephemeris per exposure" + + ephem_ra = ephemeris_state.ephemeris.coordinates.lon[0].as_py() + ephem_dec = ephemeris_state.ephemeris.coordinates.lat[0].as_py() + + # Return the observations within the radius for this particular state + return observations_state.apply_mask( + _within_radius(detections_state, ephem_ra, ephem_dec, radius) + ) + + +TestOrbitRadiusObservationFilter_remote = ray.remote( + TestOrbitRadiusObservationFilter_worker +) + + class ObservationFilter(abc.ABC): """An ObservationFilter is reduces a collection of observations to a subset of those observations. @@ -24,7 +73,10 @@ class ObservationFilter(abc.ABC): @abc.abstractmethod def apply( - self, observations: "Observations", test_orbit: TestOrbit + self, + observations: "Observations", + test_orbit: TestOrbit, + max_processes: Optional[int] = 1, ) -> "Observations": """ Apply the filter to a collection of observations. @@ -35,6 +87,10 @@ def apply( The observations to filter. test_orbit : `~thor.orbit.TestOrbit` The test orbit to use for filtering. + max_processes : int, optional + Maximum number of processes to use for parallelization. If + an existing ray cluster is already running, this parameter + will be ignored if larger than 1 or not None. Returns ------- @@ -61,7 +117,10 @@ def __init__(self, radius: float): self.radius = radius def apply( - self, observations: "Observations", test_orbit: TestOrbit + self, + observations: "Observations", + test_orbit: TestOrbit, + max_processes: Optional[int] = 1, ) -> "Observations": """ Apply the filter to a collection of observations. @@ -72,64 +131,85 @@ def apply( The observations to filter. test_orbit : `~thor.orbit.TestOrbit` The test orbit to use for filtering. + max_processes : int, optional + Maximum number of processes to use for parallelization. If + an existing ray cluster is already running, this parameter + will be ignored if larger than 1 or not None. Returns ------- filtered_observations : `~thor.observations.Observations` - The filtered observations. + The filtered observations. This will return a copy of the original + observations. """ time_start = time.perf_counter() logger.info("Applying TestOrbitRadiusObservationFilter...") logger.info(f"Using radius = {self.radius:.5f} deg") - # Generate an ephemeris for every observer time/location in the dataset - test_orbit_ephemeris = test_orbit.generate_ephemeris_from_observations( - observations - ) - # Link the ephemeris to the observations - link = qv.Linkage( - test_orbit_ephemeris, - observations, - left_keys=test_orbit_ephemeris.id, - right_keys=observations.state_id, + # Generate an ephemeris for every observer time/location in the dataset + ephemeris = test_orbit.generate_ephemeris_from_observations(observations) + + filtered_observations_list = [] + if max_processes is None or max_processes > 1: + + if not ray.is_initialized(): + logger.debug( + f"Ray is not initialized. Initializing with {max_processes}..." 
+ ) + ray.init(num_cpus=max_processes) + + if isinstance(observations, ray.ObjectRef): + observations_ref = observations + observations = ray.get(observations_ref) + else: + observations_ref = ray.put(observations) + + if isinstance(ephemeris, ray.ObjectRef): + ephemeris_ref = ephemeris + else: + ephemeris_ref = ray.put(ephemeris) + + state_ids = observations.state_id.unique().sort() + futures = [] + for state_id in state_ids: + futures.append( + TestOrbitRadiusObservationFilter_remote.remote( + observations_ref, + ephemeris_ref, + state_id, + self.radius, + ) + ) + + while futures: + finished, futures = ray.wait(futures, num_returns=1) + filtered_observations_list.append(ray.get(finished[0])) + + else: + + state_ids = observations.state_id.unique().sort() + for state_id in state_ids: + filtered_observations = TestOrbitRadiusObservationFilter_worker( + observations, + ephemeris, + state_id, + self.radius, + ) + filtered_observations_list.append(filtered_observations) + + observations_filtered = qv.concatenate(filtered_observations_list) + observations_filtered = observations_filtered.sort_by( + ["detections.time.days", "detections.time.nanos", "observatory_code"] ) - # Loop over states and build a mask of detections within the radius - state_ids = observations.state_id.to_numpy(zero_copy_only=False) - mask = np.zeros(len(observations), dtype=bool) - for state_id in np.unique(state_ids): - # Compute the indices for observations belonging to this state - idx_state = np.where(state_ids == state_id)[0] - - # Select the ephemeris and observations for this state - ephemeris_i = link.select_left(state_id) - observations_i = link.select_right(state_id) - detections_i = observations_i.detections - - assert ( - len(ephemeris_i) == 1 - ), "there should be exactly one ephemeris per exposure" - - ephem_ra = ephemeris_i.ephemeris.coordinates.lon[0].as_py() - ephem_dec = ephemeris_i.ephemeris.coordinates.lat[0].as_py() - - # Compute indices for the detections within the radius - idx_within = idx_state[ - _within_radius(detections_i, ephem_ra, ephem_dec, self.radius) - ] - - # Update the mask - mask[idx_within] = True - - observations_masked = observations.apply_mask(mask) time_end = time.perf_counter() logger.info( - f"Filtered {len(observations)} observations to {len(observations_masked)} observations." + f"Filtered {len(observations)} observations to {len(observations_filtered)} observations." ) logger.info( f"TestOrbitRadiusObservationFilter completed in {time_end - time_start:.3f} seconds." 
) - return observations_masked + return observations_filtered def _within_radius( From 53d771d5b2c6e9ab0017d9adaa160b14364f877d Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Fri, 27 Oct 2023 12:02:48 -0700 Subject: [PATCH 10/14] Allow ray objects to be passed by reference to range_and_transform --- thor/main_2.py | 25 +++++++++++++++++++++++-- thor/observations/filters.py | 4 ++-- thor/orbit.py | 7 +++++-- thor/tests/test_main.py | 35 +++++++++++++++++++++++++++-------- 4 files changed, 57 insertions(+), 14 deletions(-) diff --git a/thor/main_2.py b/thor/main_2.py index 33b3b4e5..69de7044 100644 --- a/thor/main_2.py +++ b/thor/main_2.py @@ -1,6 +1,6 @@ import logging import time -from typing import Any, Iterator, List, Optional +from typing import Any, Iterator, List, Optional, Union import pandas as pd import pyarrow.compute as pc @@ -85,7 +85,7 @@ def range_and_transform_worker( def range_and_transform( test_orbit: TestOrbit, - observations: Observations, + observations: Union[Observations, ray.ObjectRef], propagator: Propagator = PYOORB(), max_processes: Optional[int] = 1, ) -> TransformedDetections: @@ -118,6 +118,7 @@ def range_and_transform( logger.info("Running range and transform...") logger.info(f"Assuming r = {test_orbit.orbit.coordinates.r[0]} au") logger.info(f"Assuming v = {test_orbit.orbit.coordinates.v[0]} au/d") + # Compute the ephemeris of the test orbit (this will be cached) ephemeris = test_orbit.generate_ephemeris_from_observations( observations, @@ -343,6 +344,7 @@ def link_test_orbit( else: raise ValueError(f"Unknown propagator: {config.propagator}") + use_ray = False if config.max_processes is None or config.max_processes > 1: # Initialize ray if not ray.is_initialized(): @@ -351,6 +353,11 @@ def link_test_orbit( ) ray.init(num_cpus=config.max_processes) + if not isinstance(observations, ray.ObjectRef): + observations = ray.put(observations) + + use_ray = True + # Apply filters to the observations filtered_observations = observations if filters is None: @@ -361,6 +368,15 @@ def link_test_orbit( filtered_observations, test_orbit, max_processes=config.max_processes ) + # Defragment the observations + filtered_observations = qv.defragment(filtered_observations) + + # Observations are no longer needed, so we can delete them + del observations + + if use_ray: + filtered_observations = ray.put(filtered_observations) + # Range and transform the observations transformed_detections = range_and_transform( test_orbit, @@ -370,6 +386,11 @@ def link_test_orbit( ) yield transformed_detections + # TODO: ray support for the rest of the pipeline has not yet been implemented + # so we will convert the ray objects to regular objects for now + if use_ray: + filtered_observations = ray.get(filtered_observations) + # Convert quivr tables to dataframes used by the rest of the pipeline observations_df = _observations_to_observations_df(filtered_observations) observers_df = _observers_with_states_to_observers_df( diff --git a/thor/observations/filters.py b/thor/observations/filters.py index 5a5650cd..89e29e4a 100644 --- a/thor/observations/filters.py +++ b/thor/observations/filters.py @@ -1,7 +1,7 @@ import abc import logging import time -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Union import numpy as np import quivr as qv @@ -118,7 +118,7 @@ def __init__(self, radius: float): def apply( self, - observations: "Observations", + observations: Union["Observations", ray.ObjectRef], test_orbit: TestOrbit, max_processes: Optional[int] = 1, ) -> 
"Observations": diff --git a/thor/orbit.py b/thor/orbit.py index 3db97368..3231b680 100644 --- a/thor/orbit.py +++ b/thor/orbit.py @@ -289,7 +289,7 @@ def generate_ephemeris( def generate_ephemeris_from_observations( self, - observations: Observations, + observations: Union[Observations, ray.ObjectRef], propagator: Propagator = PYOORB(), max_processes: Optional[int] = 1, ): @@ -322,6 +322,9 @@ def generate_ephemeris_from_observations( ValueError If the observations are empty. """ + if isinstance(observations, ray.ObjectRef): + observations = ray.get(observations) + if len(observations) == 0: raise ValueError("Observations must not be empty.") @@ -360,7 +363,7 @@ def generate_ephemeris_from_observations( def range_observations( self, - observations: Observations, + observations: Union[Observations, ray.ObjectRef], propagator: Propagator = PYOORB(), max_processes: Optional[int] = 1, ) -> RangedPointSourceDetections: diff --git a/thor/tests/test_main.py b/thor/tests/test_main.py index 3da9c240..8692bee9 100644 --- a/thor/tests/test_main.py +++ b/thor/tests/test_main.py @@ -2,6 +2,7 @@ import pytest from adam_core.utils.helpers import make_observations, make_real_orbits +from ..config import Config from ..main_2 import link_test_orbit, range_and_transform from ..observations import Observations from ..observations.filters import TestOrbitRadiusObservationFilter @@ -54,6 +55,21 @@ def orbits(): return make_real_orbits() +@pytest.fixture +def ray_cluster(): + import ray + + ray_initialized = False + if not ray.is_initialized(): + ray.init( + num_cpus=4, include_dashboard=False, namespace="THOR Integration Tests" + ) + ray_initialized = True + yield + if ray_initialized: + ray.shutdown() + + def test_Orbit_generate_ephemeris_from_observations_empty(orbits): # Test that when passed empty observations, TestOrbit.generate_ephemeris_from_observations # returns a Value Error @@ -134,8 +150,15 @@ def test_range_and_transform(object_id, orbits, observations): ] + OBJECT_IDS[9:], ) +@pytest.mark.parametrize("parallelized", [True, False]) @pytest.mark.integration -def test_link_test_orbit(object_id, orbits, observations): +def test_link_test_orbit(object_id, orbits, observations, parallelized, ray_cluster): + + config = Config() + if parallelized: + config.max_processes = 4 + else: + config.max_processes = 1 orbit = orbits.select("object_id", object_id) exposures, detections, associations = observations @@ -170,20 +193,16 @@ def test_link_test_orbit(object_id, orbits, observations): observations = Observations.from_detections_and_exposures(detections_i, exposures_i) if object_id in TOLERANCES: - tolerance = TOLERANCES[object_id] + config.cell_radius = TOLERANCES[object_id] else: - tolerance = TOLERANCES["default"] - - # Set a filter to include observations within 1 arcsecond of the predicted position - # of the test orbit - filters = [TestOrbitRadiusObservationFilter(radius=tolerance)] + config.cell_radius = TOLERANCES["default"] # Create a test orbit for this object test_orbit = THORbit.from_orbits(orbit) # Run link_test_orbit and make sure we get the correct observations back for i, results in enumerate( - link_test_orbit(test_orbit, observations, filters=filters) + link_test_orbit(test_orbit, observations, config=config) ): if i == 4: od_orbits, od_orbit_members = results From f48f70207eb524f307226beb6aeb137d932dced2 Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Fri, 27 Oct 2023 12:17:03 -0700 Subject: [PATCH 11/14] Add ray as a dependency --- .github/workflows/conda-build-lint-test.yml | 7 
+++--- recipe/meta.yaml | 24 ++++++++++----------- requirements.txt | 23 +++++++++++--------- setup.cfg | 23 +++++++++----------- 4 files changed, 38 insertions(+), 39 deletions(-) diff --git a/.github/workflows/conda-build-lint-test.yml b/.github/workflows/conda-build-lint-test.yml index ac438612..cd62fd52 100644 --- a/.github/workflows/conda-build-lint-test.yml +++ b/.github/workflows/conda-build-lint-test.yml @@ -28,17 +28,18 @@ jobs: activate-environment: "thor" auto-update-conda: true python-version: ${{ matrix.python-version }} - - name: Install dependencies (excluding adam_core since it is not available via conda) + - name: Install dependencies (excluding adam_core and quivr since they are not available via conda) run: | - tail +2 requirements.txt > requirements_conda.txt + tail +3 requirements.txt > requirements_conda.txt conda install -c defaults -c conda-forge -c astropy -c moeyensj --file requirements_conda.txt --yes - name: Update OBSCODE.dat run: | cd $CONDA_PREFIX/share/oorb && ./updateOBSCODE cp OBSCODE.dat $CONDA_PREFIX/share/openorb/OBSCODE.dat - - name: Install adam_core + - name: Install adam_core and quivr run: | pip install adam-core@git+https://github.com/B612-Asteroid-Institute/adam_core@main + pip install quivr>=0.7.1 - name: Build and install run: pip install . --no-deps - name: Lint diff --git a/recipe/meta.yaml b/recipe/meta.yaml index 3bee55f0..3cacaeda 100644 --- a/recipe/meta.yaml +++ b/recipe/meta.yaml @@ -11,27 +11,25 @@ requirements: host: - python {{ python }} - pip - - setuptools >=45 - - setuptools_scm >=6.0 + - setuptools >= 45 + - setuptools_scm >= 6.0 - wheel run: - python {{ python }} + - astropy >= 5.3.1 + - astroquery + - difi + - healpy + - jax - numpy - - pyarrow >= 13.0.0 - numba - pandas - - difi - - astropy - - astroquery - - scipy + - pyarrow >= 13.0.0 + - pyyaml >= 5.1 + - ray-default - scikit-learn - - pyyaml - - openorb + - scipy - spiceypy - - matplotlib - - seaborn - - plotly - - ipykernel - pytest - pytest-cov - pre-commit diff --git a/requirements.txt b/requirements.txt index ebefa0ff..d2ead654 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,20 +1,23 @@ adam-core @ git+https://github.com/B612-Asteroid-Institute/adam_core@main +quivr >= 0.7.1 +astropy >= 5.3.1 +astroquery +difi +healpy +jax numpy -pyarrow>=13.0.0 numba +openorb pandas -difi -astropy>=5.3.1 -astroquery -scipy +pyarrow >= 13.0.0 +pyyaml >= 5.1 +ray-default scikit-learn -healpy -pyyaml>=5.1 -openorb +scipy spiceypy pytest pytest-cov pre-commit -setuptools>=45 +setuptools >= 45 wheel -setuptools_scm>=6.0 +setuptools_scm >= 6.0 diff --git a/setup.cfg b/setup.cfg index 35fcd159..e43988e6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,24 +32,21 @@ setup_requires = setuptools_scm >= 6.0 install_requires = adam-core @ git+https://github.com/B612-Asteroid-Institute/adam_core@main - numpy - pyarrow >= 13.0.0 - numba - pandas - difi astropy >= 5.3.1 astroquery - jax - quivr>=0.7.1 - scipy - scikit-learn + difi healpy + jax + numpy + numba + pandas + pyarrow >= 13.0.0 pyyaml >= 5.1 + quivr >= 0.7.1 + ray[default] + scikit-learn + scipy spiceypy - matplotlib - seaborn - plotly - ipykernel [options.extras_require] tests = From 0f4c512e935e84a03e1818e7634569f25c985e95 Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Fri, 27 Oct 2023 14:51:21 -0700 Subject: [PATCH 12/14] Handle case when there are no observations after filtering --- thor/main_2.py | 174 ++++++++++++++++++++----------------- thor/orbits/attribution.py | 16 ++-- thor/orbits/iod.py | 42 ++++----- 
thor/orbits/od.py | 8 +- 4 files changed, 128 insertions(+), 112 deletions(-) diff --git a/thor/main_2.py b/thor/main_2.py index 69de7044..fcdacc1f 100644 --- a/thor/main_2.py +++ b/thor/main_2.py @@ -119,84 +119,91 @@ def range_and_transform( logger.info(f"Assuming r = {test_orbit.orbit.coordinates.r[0]} au") logger.info(f"Assuming v = {test_orbit.orbit.coordinates.v[0]} au/d") - # Compute the ephemeris of the test orbit (this will be cached) - ephemeris = test_orbit.generate_ephemeris_from_observations( - observations, - propagator=propagator, - max_processes=max_processes, - ) + if isinstance(observations, ray.ObjectRef): + observations = ray.get(observations) + + if len(observations) > 0: + # Compute the ephemeris of the test orbit (this will be cached) + ephemeris = test_orbit.generate_ephemeris_from_observations( + observations, + propagator=propagator, + max_processes=max_processes, + ) - # Assume that the heliocentric distance of all point sources in - # the observations are the same as that of the test orbit - ranged_detections_spherical = test_orbit.range_observations( - observations, - propagator=propagator, - max_processes=max_processes, - ) + # Assume that the heliocentric distance of all point sources in + # the observations are the same as that of the test orbit + ranged_detections_spherical = test_orbit.range_observations( + observations, + propagator=propagator, + max_processes=max_processes, + ) - # Transform from spherical topocentric to cartesian heliocentric coordinates - ranged_detections_cartesian = transform_coordinates( - ranged_detections_spherical.coordinates, - representation_out=CartesianCoordinates, - frame_out="ecliptic", - origin_out=OriginCodes.SUN, - ) + # Transform from spherical topocentric to cartesian heliocentric coordinates + ranged_detections_cartesian = transform_coordinates( + ranged_detections_spherical.coordinates, + representation_out=CartesianCoordinates, + frame_out="ecliptic", + origin_out=OriginCodes.SUN, + ) - transformed_detection_list = [] - if max_processes is None or max_processes > 1: + transformed_detection_list = [] + if max_processes is None or max_processes > 1: - if not ray.is_initialized(): - logger.debug( - f"Ray is not initialized. Initializing with {max_processes}..." - ) - ray.init(num_cpus=max_processes) + if not ray.is_initialized(): + logger.debug( + f"Ray is not initialized. Initializing with {max_processes}..." 
+ ) + ray.init(num_cpus=max_processes) + + if isinstance(observations, ray.ObjectRef): + observations_ref = observations + observations = ray.get(observations_ref) + else: + observations_ref = ray.put(observations) + + if isinstance(ephemeris, ray.ObjectRef): + ephemeris_ref = ephemeris + else: + ephemeris_ref = ray.put(ephemeris) + + ranged_detections_cartesian_ref = ray.put(ranged_detections_cartesian) + + # Get state IDs + state_ids = observations.state_id.unique().sort() + futures = [] + for state_id in state_ids: + futures.append( + range_and_transform_remote.remote( + ranged_detections_cartesian_ref, + observations_ref, + ephemeris_ref, + state_id, + ) + ) - if isinstance(observations, ray.ObjectRef): - observations_ref = observations - observations = ray.get(observations_ref) - else: - observations_ref = ray.put(observations) + while futures: + finished, futures = ray.wait(futures, num_returns=1) + transformed_detection_list.append(ray.get(finished[0])) - if isinstance(ephemeris, ray.ObjectRef): - ephemeris_ref = ephemeris else: - ephemeris_ref = ray.put(ephemeris) - - ranged_detections_cartesian_ref = ray.put(ranged_detections_cartesian) - - # Get state IDs - state_ids = observations.state_id.unique().sort() - futures = [] - for state_id in state_ids: - futures.append( - range_and_transform_remote.remote( - ranged_detections_cartesian_ref, - observations_ref, - ephemeris_ref, - state_id, + # Get state IDs + state_ids = observations.state_id.unique().sort() + for state_id in state_ids: + mask = pc.equal(state_id, observations.state_id) + transformed_detection_list.append( + range_and_transform_worker( + ranged_detections_cartesian.apply_mask(mask), + observations.select("state_id", state_id), + ephemeris.select("id", state_id), + state_id, + ) ) - ) - while futures: - finished, futures = ray.wait(futures, num_returns=1) - transformed_detection_list.append(ray.get(finished[0])) + transformed_detections = qv.concatenate(transformed_detection_list) + transformed_detections = transformed_detections.sort_by(by=["state_id"]) else: - # Get state IDs - state_ids = observations.state_id.unique().sort() - for state_id in state_ids: - mask = pc.equal(state_id, observations.state_id) - transformed_detection_list.append( - range_and_transform_worker( - ranged_detections_cartesian.apply_mask(mask), - observations.select("state_id", state_id), - ephemeris.select("id", state_id), - state_id, - ) - ) - - transformed_detections = qv.concatenate(transformed_detection_list) - transformed_detections = transformed_detections.sort_by(by=["state_id"]) + transformed_detections = TransformedDetections.empty() time_end = time.perf_counter() logger.info(f"Transformed {len(transformed_detections)} observations.") @@ -369,7 +376,8 @@ def link_test_orbit( ) # Defragment the observations - filtered_observations = qv.defragment(filtered_observations) + if len(filtered_observations) > 0: + filtered_observations = qv.defragment(filtered_observations) # Observations are no longer needed, so we can delete them del observations @@ -391,20 +399,24 @@ def link_test_orbit( if use_ray: filtered_observations = ray.get(filtered_observations) - # Convert quivr tables to dataframes used by the rest of the pipeline - observations_df = _observations_to_observations_df(filtered_observations) - observers_df = _observers_with_states_to_observers_df( - filtered_observations.get_observers() - ) - transformed_detections_df = _transformed_detections_to_transformed_detections_df( - transformed_detections - ) + if 
len(filtered_observations) > 0: + # Convert quivr tables to dataframes used by the rest of the pipeline + observations_df = _observations_to_observations_df(filtered_observations) + observers_df = _observers_with_states_to_observers_df( + filtered_observations.get_observers() + ) + transformed_detections_df = ( + _transformed_detections_to_transformed_detections_df(transformed_detections) + ) - # Merge dataframes together - observations_df = observations_df.merge(observers_df, on="state_id") - transformed_detections_df = transformed_detections_df.merge( - observations_df[["obs_id", "mjd_utc", "observatory_code"]], on="obs_id" - ) + # Merge dataframes together + observations_df = observations_df.merge(observers_df, on="state_id") + transformed_detections_df = transformed_detections_df.merge( + observations_df[["obs_id", "mjd_utc", "observatory_code"]], on="obs_id" + ) + else: + transformed_detections_df = pd.DataFrame() + observations_df = pd.DataFrame() # Run clustering clusters, cluster_members = clusterAndLink( diff --git a/thor/orbits/attribution.py b/thor/orbits/attribution.py index d88e4468..b6902dce 100644 --- a/thor/orbits/attribution.py +++ b/thor/orbits/attribution.py @@ -379,16 +379,20 @@ def mergeAndExtendOrbits( time_start = time.time() logger.info("Running orbit extension and merging...") - orbits_iter, orbit_members_iter = sortLinkages( - orbits, orbit_members, observations, linkage_id_col="orbit_id" - ) - observations_iter = observations.copy() + if len(observations) > 0: + orbits_iter, orbit_members_iter = sortLinkages( + orbits, orbit_members, observations, linkage_id_col="orbit_id" + ) + + else: + orbits_iter = orbits.copy() + orbit_members_iter = orbit_members.copy() iterations = 0 odp_orbits_dfs = [] odp_orbit_members_dfs = [] - - if len(orbits_iter) > 0: + observations_iter = observations.copy() + if len(orbits_iter) > 0 and len(observations_iter) > 0: converged = False while not converged: diff --git a/thor/orbits/iod.py b/thor/orbits/iod.py index 9eb6340b..6b63575e 100644 --- a/thor/orbits/iod.py +++ b/thor/orbits/iod.py @@ -785,6 +785,27 @@ def initialOrbitDetermination( iod_orbits = pd.concat(iod_orbits_dfs, ignore_index=True) iod_orbit_members = pd.concat(iod_orbit_members_dfs, ignore_index=True) + for col in ["num_obs"]: + iod_orbits[col] = iod_orbits[col].astype(int) + for col in ["gauss_sol", "outlier"]: + iod_orbit_members[col] = iod_orbit_members[col].astype(int) + + logger.info("Found {} initial orbits.".format(len(iod_orbits))) + + if identify_subsets and len(iod_orbits) > 0: + iod_orbits, iod_orbit_members = identifySubsetLinkages( + iod_orbits, iod_orbit_members, linkage_id_col="orbit_id" + ) + logger.info( + "{} subset orbits identified.".format( + len(iod_orbits[~iod_orbits["subset_of"].isna()]) + ) + ) + + iod_orbits, iod_orbit_members = sortLinkages( + iod_orbits, iod_orbit_members, observations, linkage_id_col="orbit_id" + ) + else: iod_orbits = pd.DataFrame( columns=[ @@ -815,27 +836,6 @@ def initialOrbitDetermination( ] ) - for col in ["num_obs"]: - iod_orbits[col] = iod_orbits[col].astype(int) - for col in ["gauss_sol", "outlier"]: - iod_orbit_members[col] = iod_orbit_members[col].astype(int) - - logger.info("Found {} initial orbits.".format(len(iod_orbits))) - - if identify_subsets and len(iod_orbits) > 0: - iod_orbits, iod_orbit_members = identifySubsetLinkages( - iod_orbits, iod_orbit_members, linkage_id_col="orbit_id" - ) - logger.info( - "{} subset orbits identified.".format( - len(iod_orbits[~iod_orbits["subset_of"].isna()]) - ) - ) - - 
iod_orbits, iod_orbit_members = sortLinkages( - iod_orbits, iod_orbit_members, observations, linkage_id_col="orbit_id" - ) - time_end = time.time() logger.info( "Initial orbit determination completed in {:.3f} seconds.".format( diff --git a/thor/orbits/od.py b/thor/orbits/od.py index 052ada66..04efc590 100644 --- a/thor/orbits/od.py +++ b/thor/orbits/od.py @@ -794,6 +794,10 @@ def differentialCorrection( for col in ["outlier"]: od_orbit_members[col] = od_orbit_members[col].astype(int) + od_orbits, od_orbit_members = sortLinkages( + od_orbits, od_orbit_members, observations, linkage_id_col="orbit_id" + ) + else: od_orbits = pd.DataFrame( columns=[ @@ -828,10 +832,6 @@ def differentialCorrection( ] ) - od_orbits, od_orbit_members = sortLinkages( - od_orbits, od_orbit_members, observations, linkage_id_col="orbit_id" - ) - time_end = time.time() logger.info("Differentially corrected {} orbits.".format(len(od_orbits))) logger.info( From e560ecec0a53947f9427e840c760f9312492d285 Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Mon, 30 Oct 2023 09:21:13 -0700 Subject: [PATCH 13/14] Remove vx/vy_values --- thor/config.py | 2 -- thor/main.py | 75 +++++++------------------------------------------- thor/main_2.py | 2 -- 3 files changed, 10 insertions(+), 69 deletions(-) diff --git a/thor/config.py b/thor/config.py index c296b6dc..27b783fa 100644 --- a/thor/config.py +++ b/thor/config.py @@ -17,8 +17,6 @@ class Config: vy_max: float = 0.1 vx_bins: int = 300 vy_bins: int = 300 - vx_values: Optional[npt.NDArray[np.float64]] = None - vy_values: Optional[npt.NDArray[np.float64]] = None cluster_radius: float = 0.005 cluster_min_obs: int = 6 cluster_min_arc_length: float = 1.0 diff --git a/thor/main.py b/thor/main.py index e293aa6d..561c491b 100644 --- a/thor/main.py +++ b/thor/main.py @@ -408,8 +408,6 @@ def clusterAndLink( vy_range=[-0.1, 0.1], vx_bins=100, vy_bins=100, - vx_values=None, - vy_values=None, eps=0.005, min_obs=5, min_arc_length=1.0, @@ -427,30 +425,18 @@ def clusterAndLink( DataFrame containing post-range and shift observations. vx_range : {None, list or `~numpy.ndarray` (2)} Maximum and minimum velocity range in x. - Will not be used if vx_values are specified. [Default = [-0.1, 0.1]] vy_range : {None, list or `~numpy.ndarray` (2)} Maximum and minimum velocity range in y. - Will not be used if vy_values are specified. [Default = [-0.1, 0.1]] vx_bins : int, optional Length of x-velocity grid between vx_range[0] - and vx_range[-1]. Will not be used if vx_values are - specified. + and vx_range[-1]. [Default = 100] vy_bins: int, optional Length of y-velocity grid between vy_range[0] - and vy_range[-1]. Will not be used if vy_values are - specified. + and vy_range[-1]. [Default = 100] - vx_values : {None, `~numpy.ndarray`}, optional - Values of velocities in x at which to cluster - and link. - [Default = None] - vy_values : {None, `~numpy.ndarray`}, optional - Values of velocities in y at which to cluster - and link. - [Default = None] eps : float, optional The maximum distance between two samples for them to be considered as in the same neighborhood. 
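For reference, a minimal sketch of the velocity-grid enumeration that clusterAndLink performs once the user-supplied vx_values/vy_values paths are removed. The names and default ranges below simply mirror the simplified keyword arguments documented above; this is an illustration of how the grid size follows from vx_bins and vy_bins, not part of the patch itself.

import numpy as np

# Enumerate the full Cartesian product of the two velocity linspaces.
vx_range, vy_range = [-0.1, 0.1], [-0.1, 0.1]
vx_bins, vy_bins = 100, 100

vx = np.linspace(*vx_range, num=vx_bins)
vy = np.linspace(*vy_range, num=vy_bins)
vxx, vyy = np.meshgrid(vx, vy)
vxx, vyy = vxx.flatten(), vyy.flatten()

# Every (vxx[i], vyy[i]) pair is one trial velocity, so the grid size is
# always vx_bins * vy_bins (10,000 with the defaults above).
assert vxx.size == vx_bins * vy_bins
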
@@ -491,58 +477,17 @@ def clusterAndLink( time_start_cluster = time.time() logger.info("Running velocity space clustering...") - if vx_values is None and vx_range is not None: - vx = np.linspace(*vx_range, num=vx_bins) - elif vx_values is None and vx_range is None: - raise ValueError("Both vx_values and vx_range cannot be None.") - else: - vx = vx_values - vx_range = [vx_values[0], vx_values[-1]] - vx_bins = len(vx) - - if vy_values is None and vy_range is not None: - vy = np.linspace(*vy_range, num=vy_bins) - elif vy_values is None and vy_range is None: - raise ValueError("Both vy_values and vy_range cannot be None.") - else: - vy = vy_values - vy_range = [vy_values[0], vy_values[-1]] - vy_bins = len(vy) - - if vx_values is None and vy_values is None: - vxx, vyy = np.meshgrid(vx, vy) - vxx = vxx.flatten() - vyy = vyy.flatten() - elif vx_values is not None and vy_values is not None: - vxx = vx - vyy = vy - else: - raise ValueError("") + vx = np.linspace(*vx_range, num=vx_bins) + vy = np.linspace(*vy_range, num=vy_bins) + vxx, vyy = np.meshgrid(vx, vy) + vxx = vxx.flatten() + vyy = vyy.flatten() logger.debug("X velocity range: {}".format(vx_range)) - if vx_values is not None: - logger.debug("X velocity values: {}".format(vx_bins)) - else: - logger.debug("X velocity bins: {}".format(vx_bins)) - + logger.debug("X velocity bins: {}".format(vx_bins)) logger.debug("Y velocity range: {}".format(vy_range)) - if vy_values is not None: - logger.debug("Y velocity values: {}".format(vy_bins)) - else: - logger.debug("Y velocity bins: {}".format(vy_bins)) - if vx_values is not None: - logger.debug("User defined x velocity values: True") - else: - logger.debug("User defined x velocity values: False") - if vy_values is not None: - logger.debug("User defined y velocity values: True") - else: - logger.debug("User defined y velocity values: False") - - if vx_values is None and vy_values is None: - logger.debug("Velocity grid size: {}".format(vx_bins * vy_bins)) - else: - logger.debug("Velocity grid size: {}".format(vx_bins)) + logger.debug("Y velocity bins: {}".format(vy_bins)) + logger.debug("Velocity grid size: {}".format(vx_bins)) logger.info("Max sample distance: {}".format(eps)) logger.info("Minimum samples: {}".format(min_obs)) diff --git a/thor/main_2.py b/thor/main_2.py index fcdacc1f..5e359cb7 100644 --- a/thor/main_2.py +++ b/thor/main_2.py @@ -425,8 +425,6 @@ def link_test_orbit( vy_range=[config.vy_min, config.vy_max], vx_bins=config.vx_bins, vy_bins=config.vy_bins, - vx_values=config.vx_values, - vy_values=config.vy_values, eps=config.cluster_radius, min_obs=config.cluster_min_obs, min_arc_length=config.cluster_min_arc_length, From 0336738f63337518e4be52a7f2c53d7ddac1017f Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Mon, 30 Oct 2023 14:40:26 -0700 Subject: [PATCH 14/14] Move integration tests to separate chron-scheduled workflow --- .../workflows/pip-build-integration-test.yml | 43 +++++++++++++++++++ .../pip-build-lint-test-coverage.yml | 2 - setup.cfg | 2 + thor/tests/test_main.py | 8 ++++ 4 files changed, 53 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/pip-build-integration-test.yml diff --git a/.github/workflows/pip-build-integration-test.yml b/.github/workflows/pip-build-integration-test.yml new file mode 100644 index 00000000..c34a1738 --- /dev/null +++ b/.github/workflows/pip-build-integration-test.yml @@ -0,0 +1,43 @@ +name: pip - Build and Integration Test + +on: + schedule: + # Run every day at 20:00 UTC + - cron: '0 20 * * * ' + +jobs: + 
build-lint-test-coverage: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest] + python-version: ["3.10"] + defaults: + run: + shell: bash -l {0} + + steps: + - name: Checkout git repo + uses: actions/checkout@v3 + - name: Get git tags + run: git fetch --prune --unshallow --tags + - name: Set up miniconda + uses: conda-incubator/setup-miniconda@v2 + with: + activate-environment: "thor" + auto-update-conda: true + python-version: ${{ matrix.python-version }} + - name: Install openorb using conda + run: conda install -c defaults -c conda-forge openorb --yes + - name: Update OBSCODE.dat + run: | + cd $CONDA_PREFIX/share/oorb && ./updateOBSCODE + cp OBSCODE.dat $CONDA_PREFIX/share/openorb/OBSCODE.dat + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + cache: 'pip' # caching pip dependencies + - name: Build and install + run: pip install .[tests] + - name: Integration Tests + run: pytest . -m "integration" diff --git a/.github/workflows/pip-build-lint-test-coverage.yml b/.github/workflows/pip-build-lint-test-coverage.yml index 5d8cc951..de770ea1 100644 --- a/.github/workflows/pip-build-lint-test-coverage.yml +++ b/.github/workflows/pip-build-lint-test-coverage.yml @@ -44,8 +44,6 @@ jobs: run: pre-commit run --all-files - name: Test run: pytest . --cov --cov-report xml - - name: Integration Tests - run: pytest . -m "integration" - name: Coverage uses: coverallsapp/github-action@v2.0.0 with: diff --git a/setup.cfg b/setup.cfg index e43988e6..a2626180 100644 --- a/setup.cfg +++ b/setup.cfg @@ -65,6 +65,8 @@ test=pytest [tool:pytest] python_functions = test_* addopts = -m "not integration" +markers = + integration: Mark a test as an integration test. [isort] profile = black diff --git a/thor/tests/test_main.py b/thor/tests/test_main.py index 8692bee9..edddeeed 100644 --- a/thor/tests/test_main.py +++ b/thor/tests/test_main.py @@ -160,6 +160,14 @@ def test_link_test_orbit(object_id, orbits, observations, parallelized, ray_clus else: config.max_processes = 1 + # Reduce the clustering grid size to speed up the test + config.vx_bins = 10 + config.vy_bins = 10 + config.vx_min = -0.01 + config.vx_max = 0.01 + config.vy_min = -0.01 + config.vy_max = 0.01 + orbit = orbits.select("object_id", object_id) exposures, detections, associations = observations
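
Taken together, these patches replace the old Configuration machinery with the Config dataclass and route all stage-specific keyword arguments through link_test_orbit. The sketch below shows one way the resulting API could be driven, under stated assumptions: the import paths mirror the files touched above, while the `orbit` and `observations` objects (and the TestOrbit import name, aliased as THORbit in the tests) are placeholders for illustration only.

from thor.config import Config
from thor.main_2 import link_test_orbit
from thor.orbit import TestOrbit  # assumed equivalent to the THORbit alias used in the tests

config = Config(max_processes=4, cell_radius=10.0)
config.set_min_obs(6)            # applied to clustering, IOD, OD and arc extension
config.set_min_arc_length(1.0)

# `orbit` is assumed to be a single-row adam_core Orbits table and
# `observations` a thor Observations table built elsewhere.
test_orbit = TestOrbit.from_orbits(orbit)

# link_test_orbit is a generator yielding one intermediate product per stage:
# transformed detections, clusters, IOD orbits, OD orbits, recovered orbits.
(
    transformed_detections,
    (clusters, cluster_members),
    (iod_orbits, iod_orbit_members),
    (od_orbits, od_orbit_members),
    (recovered_orbits, recovered_orbit_members),
) = tuple(link_test_orbit(test_orbit, observations, config=config))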