diff --git a/nextplot/route.py b/nextplot/route.py index 63bd80d..6c0e645 100644 --- a/nextplot/route.py +++ b/nextplot/route.py @@ -1,6 +1,9 @@ +import dataclasses import json +from collections.abc import Callable import folium +import jsonpath_ng import plotly.graph_objects as go from folium import plugins @@ -14,28 +17,19 @@ # ==================== Pre-configured plot profiles +@dataclasses.dataclass class RoutePlotProfile: """ Pre-configured plot profiles for routes. """ - def __init__( - self, - jpath_route: str = "", - jpath_pos: str = "", - jpath_x: str = "", - jpath_y: str = "", - jpath_unassigned: str = "", - jpath_unassigned_x: str = "", - jpath_unassigned_y: str = "", - ): - self.jpath_route = jpath_route - self.jpath_pos = jpath_pos - self.jpath_x = jpath_x - self.jpath_y = jpath_y - self.jpath_unassigned = jpath_unassigned - self.jpath_unassigned_x = jpath_unassigned_x - self.jpath_unassigned_y = jpath_unassigned_y + jpath_route: str = "" + jpath_pos: str = "" + jpath_x: str = "" + jpath_y: str = "" + jpath_unassigned: str = "" + jpath_unassigned_x: str = "" + jpath_unassigned_y: str = "" def __str__(self): return ( @@ -50,6 +44,28 @@ def __str__(self): ) +@dataclasses.dataclass +class MultiRoutePlotProfile: + """ + Multiple pre-configured plot profiles selected via given input tests. + """ + + profiles: list[tuple[Callable[[dict, dict], bool], RoutePlotProfile]] = dataclasses.field(default_factory=list) + fail_message: str = "No suitable profile found for plotting." + + def unwrap(self, content_route: dict, content_pos: dict) -> RoutePlotProfile: + """ + Tests the given data against the profiles and returns the first matching profile. + """ + for test, profile in self.profiles: + if test(content_route, content_pos): + return profile + raise Exception(self.fail_message) + + def __str__(self): + return "MultiRoutePlotProfile(" + ", ".join([f"{p[0]}: {p[1]}" for p in self.profiles]) + ")" + + # ==================== Route mode argument definition @@ -219,14 +235,8 @@ def arguments(parser): def parse( input_route: str, - jpath_route: str, - jpath_unassigned: str, - jpath_unassigned_x: str, - jpath_unassigned_y: str, input_pos: str, - jpath_pos: str, - jpath_x: str, - jpath_y: str, + profile: MultiRoutePlotProfile | RoutePlotProfile, ) -> tuple[list[list[types.Position]], list[list[types.Position]]]: """ Parses the route data from the file(s). @@ -234,6 +244,17 @@ def parse( # Load json data content_route, content_pos = common.load_data(input_route, input_pos) + # Dynamically set profile, if given + if isinstance(profile, MultiRoutePlotProfile): + profile = profile.unwrap(json.loads(content_route), json.loads(content_pos)) + jpath_route = profile.jpath_route + jpath_pos = profile.jpath_pos + jpath_x = profile.jpath_x + jpath_y = profile.jpath_y + jpath_unassigned = profile.jpath_unassigned + jpath_unassigned_x = profile.jpath_unassigned_x + jpath_unassigned_y = profile.jpath_unassigned_y + # Extract routes points = common.extract_position_groups( content_route, @@ -553,17 +574,7 @@ def plot( profile = nextroute_profile() # Parse data - points, unassigned = parse( - input_route, - profile.jpath_route, - profile.jpath_unassigned, - profile.jpath_unassigned_x, - profile.jpath_unassigned_y, - input_pos, - profile.jpath_pos, - profile.jpath_x, - profile.jpath_y, - ) + points, unassigned = parse(input_route, input_pos, profile) # Quit on no points if len(points) <= 0: @@ -691,13 +702,37 @@ def nextroute_profile() -> RoutePlotProfile: """ Returns the nextroute profile. """ - return RoutePlotProfile( - jpath_route="solutions[-1].vehicles[*].route", - jpath_x="stop.location.lon", - jpath_y="stop.location.lat", - jpath_unassigned="solutions[-1].unplanned[*]", - jpath_unassigned_x="location.lon", - jpath_unassigned_y="location.lat", + base_paths = [ + "solutions[-1]", + "solution", + "output.solutions[-1]", + "output.solution", + ] + + def make_path_exists(path): + def path_exists(content_route, path): + matches = jsonpath_ng.parse(path).find(content_route) + return len(list(matches)) > 0 + + return lambda content_route, _: path_exists(content_route, path) + + return MultiRoutePlotProfile( + [ + ( + make_path_exists(p), + RoutePlotProfile( + jpath_route=f"{p}.vehicles[*].route", + jpath_x="stop.location.lon", + jpath_y="stop.location.lat", + jpath_unassigned=f"{p}.unplanned[*]", + jpath_unassigned_x="location.lon", + jpath_unassigned_y="location.lat", + ), + ) + for p in base_paths + ], + "Input data does not match any known profile for nextroute plotting. Routes are expected at one of:\n" + + "\n".join(f"{p}.vehicles[*].route" for p in base_paths), )