Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support realtime 2023a data format (SplitInIcePulses) #244

Merged
merged 65 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 63 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
806d1e9
Changed pulse tag to 'SplitInIcePulses'
G-Sommani Nov 29, 2023
5249593
<bot> update dependencies*.log files(s)
wipacdevbot Nov 29, 2023
709af7c
add mapping for input pulses name
mlincett Nov 30, 2023
7dc335e
determine pulses name from realtime version + refactoring
mlincett Nov 30, 2023
12ef8e0
formatting
mlincett Nov 30, 2023
4ea055b
typing
mlincett Nov 30, 2023
4427f1c
add pulse series as argument, replace literals with cfg
mlincett Nov 30, 2023
f2ad4b1
add
mlincett Nov 30, 2023
a47b270
more
mlincett Nov 30, 2023
6ced070
pulses name in state dict
mlincett Nov 30, 2023
7e71ba0
<bot> update dependencies*.log files(s)
wipacdevbot Nov 30, 2023
9c5f7ed
copy pulses from specified pulses_name
mlincett Dec 1, 2023
2651270
revert to default input pulses name
mlincett Dec 1, 2023
b840fd9
followup changes
mlincett Dec 1, 2023
cf74c33
revert to default pulses name downstream
mlincett Dec 1, 2023
563e714
typing issues
mlincett Dec 1, 2023
aa22e65
log keys
mlincett Dec 1, 2023
c325c0c
fix case of no version
mlincett Dec 1, 2023
27b8c36
remove stray arg
mlincett Dec 1, 2023
418e10d
static pulses name
mlincett Dec 1, 2023
97243f8
Merge branch 'main' into realtime-2023a
mlincett Dec 1, 2023
c9bf034
revert to original pulses name so single-pixel tests do not fail
mlincett Dec 1, 2023
2c3d5bc
temporarily toggle
mlincett Dec 1, 2023
1c550ba
fix
mlincett Dec 1, 2023
4f785a6
pin rabbitmq myself
mlincett Dec 1, 2023
67de2b7
pin rabbitmq/2
mlincett Dec 1, 2023
0f9d65c
Merge branch 'main' into pulse-tag-new-filters
mlincett Dec 2, 2023
63bcdd1
<bot> update dependencies*.log files(s)
wipacdevbot Dec 2, 2023
e6c219b
Added json of event 138632 31747601
G-Sommani Dec 2, 2023
2f328d8
test new format
mlincett Dec 2, 2023
70d15a4
printout baseline gcd file
mlincett Dec 2, 2023
4e2d246
Merge branch 'main' into pulse-tag-new-filters
mlincett Dec 2, 2023
01d3a30
try staging gcd
mlincett Dec 2, 2023
9fafbfd
correct filename
mlincett Dec 2, 2023
00ce2ac
wrong method
mlincett Dec 2, 2023
e0b831f
fix path
mlincett Dec 2, 2023
2805452
emergency data source
mlincett Dec 2, 2023
233afe2
pass filename not path
mlincett Dec 2, 2023
0287fea
datastager client side
mlincett Dec 2, 2023
87ffbe0
datastager client side/2
mlincett Dec 2, 2023
11059e1
typing consistency
mlincett Dec 3, 2023
4642e84
re-organize datastager
mlincett Dec 3, 2023
77efe80
backward compatibility
mlincett Dec 3, 2023
141d428
revert temporary remote source
mlincett Dec 3, 2023
d8d4593
a bit more logging
mlincett Dec 3, 2023
0290b8e
increase log
mlincett Dec 3, 2023
b3d712d
more logs/2
mlincett Dec 3, 2023
89f986a
force debug logging
mlincett Dec 3, 2023
70262c6
restore remote paht
mlincett Dec 3, 2023
de89bcb
pass baseline gcd filename instead of full path
mlincett Dec 3, 2023
615bb85
fix gcd again
mlincett Dec 3, 2023
6f109e3
Merge branch 'main' into realtime-2023a
mlincett Dec 11, 2023
e602f6e
Merge branch 'main' into pulse-tag-new-filters
mlincett Dec 11, 2023
cdf5daa
merge dev branch
mlincett Dec 11, 2023
e851740
<bot> update dependencies*.log files(s)
wipacdevbot Dec 11, 2023
c64b954
Merge branch 'main' into realtime-2023a
tianluyuan Jan 5, 2024
347409b
copy over the TimeRange and update config vars
tianluyuan Jan 5, 2024
1cc67f3
fix typo. copy m-wilks result for new test event
tianluyuan Jan 5, 2024
1550022
del TimeRange
tianluyuan Jan 5, 2024
ebcab7b
revert input pulseseries name and add m-original reference
tianluyuan Jan 5, 2024
b981d6a
copy the splinempe reco seed if it doesn't exist by default
tianluyuan Jan 5, 2024
538ebf4
pass Keys named argument
tianluyuan Jan 5, 2024
7a038a8
add reference for splinempe
tianluyuan Jan 5, 2024
ba98917
remove pulses_name=None and use single underscore
tianluyuan Jan 8, 2024
becd30c
missed one
tianluyuan Jan 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ jobs:
eventfile: [
hese_event_01.json,
run00136766-evt000007637140-GOLD.pkl,
run00136662-evt000035405932-BRONZE.pkl
run00136662-evt000035405932-BRONZE.pkl,
138632_31747601.json
]
exclude:
# splinempe should not run on HESE
Expand Down
17 changes: 13 additions & 4 deletions skymap_scanner/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import logging
from pathlib import Path
from ..utils.data_handling import get_gcd_datastager

import ewms_pilot
from wipac_dev_tools import argparse_tools, logging_tools
Expand Down Expand Up @@ -57,18 +58,26 @@ def main() -> None:
with open(args.client_startup_json, "rb") as f:
startup_json_dict = json.load(f)
with open("GCDQp_packet.json", "w") as f:
json.dump(startup_json_dict["GCDQp_packet"], f)
json.dump(startup_json_dict[cfg.STATEDICT_GCDQP_PACKET], f)

datastager = get_gcd_datastager()

baseline_gcd_file = Path(startup_json_dict["baseline_GCD_file"])

datastager.stage_files([baseline_gcd_file.name])

baseline_gcd_file = Path(datastager.get_filepath(baseline_gcd_file.name))

# check if baseline GCD file is reachable
if not Path(startup_json_dict["baseline_GCD_file"]).exists():
raise FileNotFoundError(startup_json_dict["baseline_GCD_file"])
if not baseline_gcd_file.exists():
raise FileNotFoundError(baseline_gcd_file)

cmd = (
"python -m skymap_scanner.client.reco_icetray "
" --in-pkl {{INFILE}}" # no f-string b/c want to preserve '{{..}}'
" --out-pkl {{OUTFILE}}" # ^^^
" --gcdqp-packet-json GCDQp_packet.json"
f" --baseline-gcd-file {startup_json_dict['baseline_GCD_file']}"
f" --baseline-gcd-file {baseline_gcd_file}"
)

# go!
Expand Down
36 changes: 31 additions & 5 deletions skymap_scanner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,49 @@
# True constants
#

DEFAULT_GCD_DIR: Path = Path("/opt/i3-data/baseline_gcds")

# Local data sources. These are assumed to be filesystem paths and are expected to have the same directory structure.
LOCAL_DATA_SOURCES: Final[List[Path]] = [
Path("/opt/i3-data"),
Path("/cvmfs/icecube.opensciencegrid.org/data"),
]
# Directory path under a local data source to fetch spline data from.
LOCAL_SPLINE_SUBDIR: Final[str] = "photon-tables/splines"

# HTTP source to download data from.
REMOTE_DATA_SOURCE: Final[str] = "http://prod-exe.icecube.wisc.edu"
REMOTE_SPLINE_SUBDIR: Final[str] = "spline-tables"

# Local ephemeral directory to stage files.
LOCAL_DATA_CACHE: Final[Path] = Path("./data-staging-cache")

# Directory path under a local data source to fetch spline data from.
LOCAL_SPLINE_SUBDIR: Final[str] = "photon-tables/splines"
REMOTE_SPLINE_SUBDIR: Final[str] = "spline-tables"

# GCD data sources.
LOCAL_GCD_DATA_SOURCES: Final[List[Path]] = [
Path("/opt/i3-data/baseline_gcds"),
Path("/cvmfs/icecube.opensciencegrid.org/users/RealTime/GCD/PoleBaseGCDs"),
]

DEFAULT_GCD_DIR = LOCAL_GCD_DATA_SOURCES[0]

# Since the container and CVFMS have GCD files in different subdirectories
# we put the complete path in LOCAL_GCD_DATA_SOURCES and use no subdir.
LOCAL_GCD_SUBDIR = ""

REMOTE_GCD_DATA_SOURCE: Final[
str
] = "http://prod-exe.icecube.wisc.edu/baseline_gcds"


# physics strings
INPUT_PULSES_NAME: Final = "SplitUncleanedInIcePulses"
INPUT_PULSES_NAME_MAP: Final[dict[str, str]] = {
"2021a": "SplitUncleanedInIcePulses",
"2023a": "SplitInIcePulses",
}
DEFAULT_INPUT_PULSES_NAME: Final = "SplitUncleanedInIcePulses"

INPUT_PULSES_NAME = "SplitUncleanedInIcePulses"

INPUT_TIME_NAME: Final = "SeedVertexTime"
INPUT_POS_NAME: Final = "SeedVertexPos"
OUTPUT_PARTICLE_NAME: Final = "MillipedeSeedParticle"
Expand All @@ -46,6 +71,7 @@
STATEDICT_GCDQP_PACKET: Final = "GCDQp_packet"
STATEDICT_BASELINE_GCD_FILE: Final = "baseline_GCD_file"
STATEDICT_NSIDES: Final = "nsides"
STATEDICT_INPUT_PULSES: Final = "input_pulses_name"
#
MSG_KEY_RECO_ALGO: Final = "reco_algo"
MSG_KEY_PFRAME: Final = "pframe"
Expand Down
4 changes: 2 additions & 2 deletions skymap_scanner/recos/millipede_original.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def get_vertex_variations() -> List[dataclasses.I3Position]:

@classmethod
@icetray.traysegment
def prepare_frames(cls, tray, name, logger, pulsesName):
def prepare_frames(cls, tray, name, logger):
# If VHESelfVeto is already present, copy over the output to the names used by Skymap Scanner for seeding the vertices.
def extract_seed(frame):
seed_prefix = "HESE_VHESelfVeto"
Expand All @@ -86,7 +86,7 @@ def extract_seed(frame):
# If HESE_VHESelfVeto is already in the frame, is likely using implicitly a VertexThreshold of 250 already. To be determined when this is not the case.
tray.AddModule('VHESelfVeto', 'selfveto',
VertexThreshold=2,
Pulses=pulsesName+'HLC',
Pulses=cfg.INPUT_PULSES_NAME+'HLC',
OutputBool='HESE_VHESelfVeto',
OutputVertexTime=cfg.INPUT_TIME_NAME,
OutputVertexPos=cfg.INPUT_POS_NAME,
Expand Down
8 changes: 4 additions & 4 deletions skymap_scanner/recos/millipede_wilks.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def setup_reco(self):

@classmethod
@icetray.traysegment
def prepare_frames(cls, tray, name, logger, pulsesName):
def prepare_frames(cls, tray, name, logger):
# Generates the vertex seed for the initial scan.
# Only run if HESE_VHESelfVeto is not present in the frame.
# VertexThreshold is 250 in the original HESE analysis (Tianlu)
Expand All @@ -98,7 +98,7 @@ def extract_seed(frame):

tray.AddModule('VHESelfVeto', 'selfveto',
VertexThreshold=250,
Pulses=pulsesName+'HLC',
Pulses=cfg.INPUT_PULSES_NAME+'HLC',
OutputBool='HESE_VHESelfVeto',
OutputVertexTime=cfg.INPUT_TIME_NAME,
OutputVertexPos=cfg.INPUT_POS_NAME,
Expand All @@ -107,14 +107,14 @@ def extract_seed(frame):
# this only runs if the previous module did not return anything
tray.AddModule('VHESelfVeto', 'selfveto-emergency-lowen-settings',
VertexThreshold=5,
Pulses=pulsesName+'HLC',
Pulses=cfg.INPUT_PULSES_NAME+'HLC',
OutputBool='VHESelfVeto_meaningless_lowen',
OutputVertexTime=cfg.INPUT_TIME_NAME,
OutputVertexPos=cfg.INPUT_POS_NAME,
If=lambda frame: not frame.Has("HESE_VHESelfVeto"))


tray.Add(mask_deepcore, origpulses=pulsesName, maskedpulses=cls.pulsesName)
tray.Add(mask_deepcore, origpulses=cfg.INPUT_PULSES_NAME, maskedpulses=cls.pulsesName)

@staticmethod
def makeSurePulsesExist(frame, pulsesName) -> None:
Expand Down
6 changes: 5 additions & 1 deletion skymap_scanner/recos/splinempe.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def get_steps():

@classmethod
@traysegment
def prepare_frames(cls, tray, name: str, logger, pulsesName) -> None:
def prepare_frames(cls, tray, name: str, logger) -> None:
# =========================================================
# PULSE CLEANING
# From "SplitUncleanedInIcePulses" to "CleanedMuonPulses".
Expand Down Expand Up @@ -191,6 +191,10 @@ def notify_muex(frame):
def log_frame(frame):
logger.debug(f"{repr(frame)}/{frame}")

tray.Add("Copy",
Keys=["l2_online_BestFit", cls.energy_reco_seed],
If=lambda f: f.Has("l2_online_BestFit") and not f.Has(cls.energy_reco_seed))

# From icetray/filterscript/python/onlinel2filter.py
tray.AddModule(
"muex",
Expand Down
6 changes: 5 additions & 1 deletion skymap_scanner/server/start_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,16 +691,20 @@ def _nside_and_pixelextension(val: str) -> Tuple[int, int]:
event_contents = fetch_event_contents_from_file(args.event_file)

# get inputs (load event_id + state_dict cache)
# NOTE: if pulses_name is None, it is automatically determined from the realtime format version
# leave argument to support passing an arbitrary pulses_name in the future
LOGGER.info("Extracting event...")
event_metadata, state_dict = extract_json_message.extract_json_message(
event_contents,
reco_algo=args.reco_algo,
is_real_event=args.real_event,
cache_dir=str(args.cache_dir),
GCD_dir=str(args.gcd_dir),
pulsesName=cfg.INPUT_PULSES_NAME,
pulses_name=None,
ric-evans marked this conversation as resolved.
Show resolved Hide resolved
)

# write startup files for client-spawning
LOGGER.info("Writing startup JSON...")
scan_id = write_startup_json(
args.client_startup_json,
event_metadata,
Expand Down
2 changes: 1 addition & 1 deletion skymap_scanner/server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async def fetch_event_contents_from_skydriver() -> Any:
return manifest["event_i3live_json_dict"]


def fetch_event_contents_from_file(event_file: Optional[Path]) -> Any:
def fetch_event_contents_from_file(event_file: Optional[Path]) -> dict:
"""Fetch event contents from file (.json or .pkl)."""
if not event_file:
raise RuntimeError(
Expand Down
12 changes: 10 additions & 2 deletions skymap_scanner/utils/data_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def stage_files(self, file_list: List[str]):
LOGGER.debug("File is available on staging path.")
else:
LOGGER.debug("Staging from HTTP source.")
self.stage_file(basename)
self.fetch_file(basename)

else:
LOGGER.debug(f"File {basename} is available at {filepath}.")

def stage_file(self, basename: str):
def fetch_file(self, basename: str):
"""Retrieves a file from the HTTP source.
Args:
Expand Down Expand Up @@ -120,3 +120,11 @@ def get_local_filepath(self, filename: str) -> str:
LOGGER.debug(f"-> fail.")
# File was not found in local paths.
raise FileNotFoundError(f"File {filename} is not available on any local path.")


def get_gcd_datastager() -> DataStager:
return DataStager(
local_paths=cfg.LOCAL_GCD_DATA_SOURCES,
local_subdir=cfg.LOCAL_GCD_SUBDIR,
remote_path=cfg.REMOTE_GCD_DATA_SOURCE,
)
Loading