Skip to content

Commit

Permalink
Support realtime 2023a data format (SplitInIcePulses) (#244)
Browse files Browse the repository at this point in the history
* Changed pulse tag to 'SplitInIcePulses'

* <bot> update dependencies*.log files(s)

* add mapping for input pulses name

* determine pulses name from realtime version + refactoring

* formatting

* typing

* add pulse series as argument, replace literals with cfg

* add

* more

* pulses name in state dict

* <bot> update dependencies*.log files(s)

* copy pulses from specified pulses_name

* revert to default input pulses name

* followup changes

* revert to default pulses name downstream

* typing issues

* log keys

* fix case of no version

* remove stray arg

* static pulses name

* revert to original pulses name so single-pixel tests do not fail

* temporarily toggle

* fix

* pin rabbitmq myself

* pin rabbitmq/2

* <bot> update dependencies*.log files(s)

* Added json of event 138632 31747601

* test new format

* printout baseline gcd file

* try staging gcd

* correct filename

* wrong method

* fix path

* emergency data source

* pass filename not path

* datastager client side

* datastager client side/2

* typing consistency

* re-organize datastager

* backward compatibility

* revert temporary remote source

* a bit more logging

* increase log

* more logs/2

* force debug logging

* restore remote path

* pass baseline gcd filename instead of full path

* fix gcd again

* <bot> update dependencies*.log files(s)

* copy over the TimeRange and update config vars

* fix typo. copy m-wilks result for new test event

* del TimeRange

* revert input pulseseries name and add m-original reference

* copy the splinempe reco seed if it doesn't exist by default

* pass Keys named argument

* add reference for splinempe

* remove pulses_name=None and use single underscore

* missed one

---------

Co-authored-by: G-Sommani <[email protected]>
Co-authored-by: wipacdevbot <[email protected]>
Co-authored-by: Tianlu Yuan <[email protected]>
  • Loading branch information
4 people authored Jan 8, 2024
1 parent dd68883 commit 4dba8f2
Show file tree
Hide file tree
Showing 15 changed files with 479 additions and 76 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ jobs:
eventfile: [
hese_event_01.json,
run00136766-evt000007637140-GOLD.pkl,
run00136662-evt000035405932-BRONZE.pkl
run00136662-evt000035405932-BRONZE.pkl,
138632_31747601.json
]
exclude:
# splinempe should not run on HESE
Expand Down
17 changes: 13 additions & 4 deletions skymap_scanner/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import logging
from pathlib import Path
from ..utils.data_handling import get_gcd_datastager

import ewms_pilot
from wipac_dev_tools import argparse_tools, logging_tools
Expand Down Expand Up @@ -57,18 +58,26 @@ def main() -> None:
with open(args.client_startup_json, "rb") as f:
startup_json_dict = json.load(f)
with open("GCDQp_packet.json", "w") as f:
json.dump(startup_json_dict["GCDQp_packet"], f)
json.dump(startup_json_dict[cfg.STATEDICT_GCDQP_PACKET], f)

datastager = get_gcd_datastager()

baseline_gcd_file = Path(startup_json_dict["baseline_GCD_file"])

datastager.stage_files([baseline_gcd_file.name])

baseline_gcd_file = Path(datastager.get_filepath(baseline_gcd_file.name))

# check if baseline GCD file is reachable
if not Path(startup_json_dict["baseline_GCD_file"]).exists():
raise FileNotFoundError(startup_json_dict["baseline_GCD_file"])
if not baseline_gcd_file.exists():
raise FileNotFoundError(baseline_gcd_file)

cmd = (
"python -m skymap_scanner.client.reco_icetray "
" --in-pkl {{INFILE}}" # no f-string b/c want to preserve '{{..}}'
" --out-pkl {{OUTFILE}}" # ^^^
" --gcdqp-packet-json GCDQp_packet.json"
f" --baseline-gcd-file {startup_json_dict['baseline_GCD_file']}"
f" --baseline-gcd-file {baseline_gcd_file}"
)

# go!
Expand Down
36 changes: 31 additions & 5 deletions skymap_scanner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,49 @@
# True constants
#

DEFAULT_GCD_DIR: Path = Path("/opt/i3-data/baseline_gcds")

# Local data sources. These are assumed to be filesystem paths and are expected to have the same directory structure.
LOCAL_DATA_SOURCES: Final[List[Path]] = [
Path("/opt/i3-data"),
Path("/cvmfs/icecube.opensciencegrid.org/data"),
]
# Directory path under a local data source to fetch spline data from.
LOCAL_SPLINE_SUBDIR: Final[str] = "photon-tables/splines"

# HTTP source to download data from.
REMOTE_DATA_SOURCE: Final[str] = "http://prod-exe.icecube.wisc.edu"
REMOTE_SPLINE_SUBDIR: Final[str] = "spline-tables"

# Local ephemeral directory to stage files.
LOCAL_DATA_CACHE: Final[Path] = Path("./data-staging-cache")

# Directory path under a local data source to fetch spline data from.
LOCAL_SPLINE_SUBDIR: Final[str] = "photon-tables/splines"
REMOTE_SPLINE_SUBDIR: Final[str] = "spline-tables"

# GCD data sources.
LOCAL_GCD_DATA_SOURCES: Final[List[Path]] = [
Path("/opt/i3-data/baseline_gcds"),
Path("/cvmfs/icecube.opensciencegrid.org/users/RealTime/GCD/PoleBaseGCDs"),
]

DEFAULT_GCD_DIR = LOCAL_GCD_DATA_SOURCES[0]

# Since the container and CVMFS have GCD files in different subdirectories,
# we put the complete path in LOCAL_GCD_DATA_SOURCES and use no subdir.
LOCAL_GCD_SUBDIR = ""

REMOTE_GCD_DATA_SOURCE: Final[
str
] = "http://prod-exe.icecube.wisc.edu/baseline_gcds"


# physics strings
INPUT_PULSES_NAME: Final = "SplitUncleanedInIcePulses"
INPUT_PULSES_NAME_MAP: Final[dict[str, str]] = {
"2021a": "SplitUncleanedInIcePulses",
"2023a": "SplitInIcePulses",
}
DEFAULT_INPUT_PULSES_NAME: Final = "SplitUncleanedInIcePulses"

INPUT_PULSES_NAME = "SplitUncleanedInIcePulses"

INPUT_TIME_NAME: Final = "SeedVertexTime"
INPUT_POS_NAME: Final = "SeedVertexPos"
OUTPUT_PARTICLE_NAME: Final = "MillipedeSeedParticle"
Expand All @@ -46,6 +71,7 @@
STATEDICT_GCDQP_PACKET: Final = "GCDQp_packet"
STATEDICT_BASELINE_GCD_FILE: Final = "baseline_GCD_file"
STATEDICT_NSIDES: Final = "nsides"
STATEDICT_INPUT_PULSES: Final = "input_pulses_name"
#
MSG_KEY_RECO_ALGO: Final = "reco_algo"
MSG_KEY_PFRAME: Final = "pframe"
Expand Down
4 changes: 2 additions & 2 deletions skymap_scanner/recos/millipede_original.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def get_vertex_variations() -> List[dataclasses.I3Position]:

@classmethod
@icetray.traysegment
def prepare_frames(cls, tray, name, logger, pulsesName):
def prepare_frames(cls, tray, name, logger):
# If VHESelfVeto is already present, copy over the output to the names used by Skymap Scanner for seeding the vertices.
def extract_seed(frame):
seed_prefix = "HESE_VHESelfVeto"
Expand All @@ -86,7 +86,7 @@ def extract_seed(frame):
# If HESE_VHESelfVeto is already in the frame, it is likely already implicitly using a VertexThreshold of 250. To be determined when this is not the case.
tray.AddModule('VHESelfVeto', 'selfveto',
VertexThreshold=2,
Pulses=pulsesName+'HLC',
Pulses=cfg.INPUT_PULSES_NAME+'HLC',
OutputBool='HESE_VHESelfVeto',
OutputVertexTime=cfg.INPUT_TIME_NAME,
OutputVertexPos=cfg.INPUT_POS_NAME,
Expand Down
8 changes: 4 additions & 4 deletions skymap_scanner/recos/millipede_wilks.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def setup_reco(self):

@classmethod
@icetray.traysegment
def prepare_frames(cls, tray, name, logger, pulsesName):
def prepare_frames(cls, tray, name, logger):
# Generates the vertex seed for the initial scan.
# Only run if HESE_VHESelfVeto is not present in the frame.
# VertexThreshold is 250 in the original HESE analysis (Tianlu)
Expand All @@ -98,7 +98,7 @@ def extract_seed(frame):

tray.AddModule('VHESelfVeto', 'selfveto',
VertexThreshold=250,
Pulses=pulsesName+'HLC',
Pulses=cfg.INPUT_PULSES_NAME+'HLC',
OutputBool='HESE_VHESelfVeto',
OutputVertexTime=cfg.INPUT_TIME_NAME,
OutputVertexPos=cfg.INPUT_POS_NAME,
Expand All @@ -107,14 +107,14 @@ def extract_seed(frame):
# this only runs if the previous module did not return anything
tray.AddModule('VHESelfVeto', 'selfveto-emergency-lowen-settings',
VertexThreshold=5,
Pulses=pulsesName+'HLC',
Pulses=cfg.INPUT_PULSES_NAME+'HLC',
OutputBool='VHESelfVeto_meaningless_lowen',
OutputVertexTime=cfg.INPUT_TIME_NAME,
OutputVertexPos=cfg.INPUT_POS_NAME,
If=lambda frame: not frame.Has("HESE_VHESelfVeto"))


tray.Add(mask_deepcore, origpulses=pulsesName, maskedpulses=cls.pulsesName)
tray.Add(mask_deepcore, origpulses=cfg.INPUT_PULSES_NAME, maskedpulses=cls.pulsesName)

@staticmethod
def makeSurePulsesExist(frame, pulsesName) -> None:
Expand Down
6 changes: 5 additions & 1 deletion skymap_scanner/recos/splinempe.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def get_steps():

@classmethod
@traysegment
def prepare_frames(cls, tray, name: str, logger, pulsesName) -> None:
def prepare_frames(cls, tray, name: str, logger) -> None:
# =========================================================
# PULSE CLEANING
# From "SplitUncleanedInIcePulses" to "CleanedMuonPulses".
Expand Down Expand Up @@ -191,6 +191,10 @@ def notify_muex(frame):
def log_frame(frame):
logger.debug(f"{repr(frame)}/{frame}")

tray.Add("Copy",
Keys=["l2_online_BestFit", cls.energy_reco_seed],
If=lambda f: f.Has("l2_online_BestFit") and not f.Has(cls.energy_reco_seed))

# From icetray/filterscript/python/onlinel2filter.py
tray.AddModule(
"muex",
Expand Down
3 changes: 2 additions & 1 deletion skymap_scanner/server/start_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,16 +691,17 @@ def _nside_and_pixelextension(val: str) -> Tuple[int, int]:
event_contents = fetch_event_contents_from_file(args.event_file)

# get inputs (load event_id + state_dict cache)
LOGGER.info("Extracting event...")
event_metadata, state_dict = extract_json_message.extract_json_message(
event_contents,
reco_algo=args.reco_algo,
is_real_event=args.real_event,
cache_dir=str(args.cache_dir),
GCD_dir=str(args.gcd_dir),
pulsesName=cfg.INPUT_PULSES_NAME,
)

# write startup files for client-spawning
LOGGER.info("Writing startup JSON...")
scan_id = write_startup_json(
args.client_startup_json,
event_metadata,
Expand Down
2 changes: 1 addition & 1 deletion skymap_scanner/server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async def fetch_event_contents_from_skydriver() -> Any:
return manifest["event_i3live_json_dict"]


def fetch_event_contents_from_file(event_file: Optional[Path]) -> Any:
def fetch_event_contents_from_file(event_file: Optional[Path]) -> dict:
"""Fetch event contents from file (.json or .pkl)."""
if not event_file:
raise RuntimeError(
Expand Down
12 changes: 10 additions & 2 deletions skymap_scanner/utils/data_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def stage_files(self, file_list: List[str]):
LOGGER.debug("File is available on staging path.")
else:
LOGGER.debug("Staging from HTTP source.")
self.stage_file(basename)
self.fetch_file(basename)

else:
LOGGER.debug(f"File {basename} is available at {filepath}.")

def stage_file(self, basename: str):
def fetch_file(self, basename: str):
"""Retrieves a file from the HTTP source.
Args:
Expand Down Expand Up @@ -120,3 +120,11 @@ def get_local_filepath(self, filename: str) -> str:
LOGGER.debug(f"-> fail.")
# File was not found in local paths.
raise FileNotFoundError(f"File {filename} is not available on any local path.")


def get_gcd_datastager() -> DataStager:
    """Build the `DataStager` configured for GCD files.

    All settings come from the `cfg` module:

    - `cfg.LOCAL_GCD_DATA_SOURCES` is passed as `local_paths`,
    - `cfg.LOCAL_GCD_SUBDIR` is passed as `local_subdir`,
    - `cfg.REMOTE_GCD_DATA_SOURCE` is passed as `remote_path`.

    Returns:
        A new `DataStager` instance built from the settings above.
    """
    gcd_stager_settings = dict(
        local_paths=cfg.LOCAL_GCD_DATA_SOURCES,
        local_subdir=cfg.LOCAL_GCD_SUBDIR,
        remote_path=cfg.REMOTE_GCD_DATA_SOURCE,
    )
    return DataStager(**gcd_stager_settings)
Loading

0 comments on commit 4dba8f2

Please sign in to comment.