diff --git a/pyproject.toml b/pyproject.toml
index 4368a5b..1c538d6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -55,7 +55,7 @@
 dependencies = [
     "pygama>=2",
     "dspeed>=1.6",
     "pylegendmeta==1.2.0a2",
-    "legend-pydataobj>=1.11.4",
+    "legend-pydataobj>=1.11.6",
     "legend-daq2lh5>=1.4",
 ]
diff --git a/workflow/rules/dsp_pars_geds.smk b/workflow/rules/dsp_pars_geds.smk
index 907c32a..c020274 100644
--- a/workflow/rules/dsp_pars_geds.smk
+++ b/workflow/rules/dsp_pars_geds.smk
@@ -44,6 +44,7 @@ rule build_pars_dsp_tau_geds:
         "--datatype {params.datatype} "
         "--timestamp {params.timestamp} "
         "--channel {params.channel} "
+        "--raw-table-name {params.raw_table_name} "
         "--plot-path {output.plots} "
         "--output-file {output.decay_const} "
         "--pulser-file {input.pulser} "
@@ -80,6 +81,7 @@ rule build_pars_evtsel_geds:
         "--datatype {params.datatype} "
         "--timestamp {params.timestamp} "
         "--channel {params.channel} "
+        "--raw-table-name {params.raw_table_name} "
         "--peak-file {output.peak_file} "
         "--pulser-file {input.pulser_file} "
         "--decay-const {input.database} "
@@ -120,6 +122,7 @@ rule build_pars_dsp_nopt_geds:
         "--datatype {params.datatype} "
         "--timestamp {params.timestamp} "
         "--channel {params.channel} "
+        "--raw-table-name {params.raw_table_name} "
         "--inplots {input.inplots} "
         "--plot-path {output.plots} "
         "--dsp-pars {output.dsp_pars_nopt} "
@@ -139,6 +142,9 @@ rule build_pars_dsp_dplms_geds:
         timestamp="{timestamp}",
         datatype="cal",
         channel="{channel}",
+        raw_table_name=lambda wildcards: get_table_name(
+            metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
+        ),
     output:
         dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms")),
         lh5_path=temp(get_pattern_pars_tmp_channel(config, "dsp", extension="lh5")),
@@ -159,6 +165,7 @@ rule build_pars_dsp_dplms_geds:
         "--datatype {params.datatype} "
         "--timestamp {params.timestamp} "
         "--channel {params.channel} "
+        "--raw-table-name {params.raw_table_name} "
         "--dsp-pars {output.dsp_pars} "
         "--lh5-path {output.lh5_path} "
         "--plot-path {output.plots} "
@@ -174,6 +181,9 @@ rule build_pars_dsp_eopt_geds:
         timestamp="{timestamp}",
         datatype="cal",
         channel="{channel}",
+        raw_table_name=lambda wildcards: get_table_name(
+            metadata, config, "cal", wildcards.timestamp, wildcards.channel, "raw"
+        ),
    output:
        dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp_eopt")),
        qbb_grid=temp(
@@ -192,6 +202,7 @@ rule build_pars_dsp_eopt_geds:
         "--datatype {params.datatype} "
         "--timestamp {params.timestamp} "
         "--channel {params.channel} "
+        "--raw-table-name {params.raw_table_name} "
         "--peak-file {input.peak_file} "
         "--inplots {input.inplots} "
         "--decay-const {input.decay_const} "
diff --git a/workflow/rules/tcm.smk b/workflow/rules/tcm.smk
index f4e7b2c..88f86f9 100644
--- a/workflow/rules/tcm.smk
+++ b/workflow/rules/tcm.smk
@@ -47,6 +47,9 @@ rule build_pulser_ids:
         timestamp="{timestamp}",
         datatype="cal",
         channel="{channel}",
+        rawid=lambda wildcards: metadata.channelmap(wildcards.timestamp, system="cal")[
+            wildcards.channel
+        ].daq.rawid,
     output:
         pulser=temp(get_pattern_pars_tmp_channel(config, "tcm", "pulser_ids")),
     log:
@@ -61,6 +64,7 @@
         "--datatype {params.datatype} "
         "--timestamp {params.timestamp} "
         "--channel {params.channel} "
+        "--rawid {params.rawid} "
         "--tcm-files {params.input} "
         "--pulser-file {output.pulser} "
         "--metadata {meta} "
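The raw_table_name params above all resolve through get_table_name at rule-evaluation
time, so the scripts below no longer need the metadata path themselves. A minimal
sketch of the behaviour the helper is expected to provide, assuming the ch{rawid:07}
table convention visible in the code removed from nopt.py further down (the actual
implementation lives in legenddataflow and may also consult config):

    def get_table_name(metadata, config, datatype, timestamp, channel, tier):
        # Look up the DAQ rawid for this detector in the channelmap valid at
        # `timestamp` and build the lh5 table path, e.g. "ch1234567/raw".
        rawid = metadata.channelmap(timestamp, system=datatype)[channel].daq.rawid
        return f"ch{rawid:07}/{tier}"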
diff --git a/workflow/src/legenddataflow/scripts/par/geds/dsp/dplms.py b/workflow/src/legenddataflow/scripts/par/geds/dsp/dplms.py
index 7ff512e..f758dc6 100644
--- a/workflow/src/legenddataflow/scripts/par/geds/dsp/dplms.py
+++ b/workflow/src/legenddataflow/scripts/par/geds/dsp/dplms.py
@@ -11,7 +11,6 @@ from pygama.pargen.dplms_ge_dict import dplms_ge_dict
 
 from .....log import build_log
-from ....table_name import get_table_name
 
 
 def par_geds_dsp_dplms() -> None:
@@ -23,11 +22,13 @@ def par_geds_dsp_dplms() -> None:
     argparser.add_argument("--log", help="log_file", type=str)
 
     argparser.add_argument("--configs", help="configs", type=str, required=True)
-    argparser.add_argument("--metadata", help="metadata", type=str, required=True)
     argparser.add_argument("--datatype", help="Datatype", type=str, required=True)
     argparser.add_argument("--timestamp", help="Timestamp", type=str, required=True)
     argparser.add_argument("--channel", help="Channel", type=str, required=True)
+    argparser.add_argument(
+        "--raw-table-name", help="raw table name", type=str, required=True
+    )
 
     argparser.add_argument("--dsp-pars", help="dsp_pars", type=str, required=True)
     argparser.add_argument("--lh5-path", help="lh5_path", type=str, required=True)
@@ -41,8 +42,6 @@ def par_geds_dsp_dplms() -> None:
     log = build_log(config_dict, args.log)
 
     sto = lh5.LH5Store()
-    channel = get_table_name(args.metadata, args.timestamp, args.datatype, args.channel)
-
     configs = TextDB(args.configs).on(args.timestamp, system=args.datatype)
     dsp_config = config_dict["inputs"]["proc_chain"][args.channel]
@@ -57,10 +56,13 @@ def par_geds_dsp_dplms() -> None:
     t0 = time.time()
     log.info("\nLoad fft data")
-    energies = sto.read(f"{channel}/raw/daqenergy", fft_files)[0]
+    energies = sto.read(f"{args.raw_table_name}/daqenergy", fft_files)[0]
     idxs = np.where(energies.nda == 0)[0]
     raw_fft = sto.read(
-        f"{channel}/raw", fft_files, n_rows=dplms_dict["n_baselines"], idx=idxs
+        args.raw_table_name,
+        fft_files,
+        n_rows=dplms_dict["n_baselines"],
+        idx=idxs,
     )[0]
     t1 = time.time()
     log.info(f"Time to load fft data {(t1-t0):.2f} s, total events {len(raw_fft)}")
@@ -70,14 +72,14 @@ def par_geds_dsp_dplms() -> None:
     # kev_widths = [tuple(kev_width) for kev_width in dplms_dict["kev_widths"]]
     peaks_rounded = [int(peak) for peak in peaks_kev]
-    peaks = sto.read(f"{channel}/raw", args.peak_file, field_mask=["peak"])[0][
+    peaks = sto.read(args.raw_table_name, args.peak_file, field_mask=["peak"])[0][
         "peak"
     ].nda
     ids = np.isin(peaks, peaks_rounded)
     peaks = peaks[ids]
     # idx_list = [np.where(peaks == peak)[0] for peak in peaks_rounded]
 
-    raw_cal = sto.read(f"{channel}/raw", args.peak_file, idx=ids)[0]
+    raw_cal = sto.read(args.raw_table_name, args.peak_file, idx=ids)[0]
     log.info(
         f"Time to run event selection {(time.time()-t1):.2f} s, total events {len(raw_cal)}"
     )
@@ -111,7 +113,7 @@ def par_geds_dsp_dplms() -> None:
     coeffs = out_dict["dplms"].pop("coefficients")
     dplms_pars = Table(col_dict={"coefficients": Array(coeffs)})
     out_dict["dplms"]["coefficients"] = (
-        f"loadlh5('{args.lh5_path}', '{channel}/dplms/coefficients')"
+        f"loadlh5('{args.lh5_path}', '{args.channel}/dplms/coefficients')"
     )
 
     log.info(f"DPLMS creation finished in {(time.time()-t0)/60} minutes")
@@ -129,7 +131,7 @@ def par_geds_dsp_dplms() -> None:
     Path(args.lh5_path).parent.mkdir(parents=True, exist_ok=True)
     sto.write(
         Table(col_dict={"dplms": dplms_pars}),
-        name=channel,
+        name=args.channel,
         lh5_file=args.lh5_path,
         wo_mode="overwrite",
     )
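The script writes the DPLMS filter coefficients under {channel}/dplms/coefficients in
the pars lh5 file, which is exactly the path the loadlh5(...) expression above points
the DSP chain at. A quick read-back check, with hypothetical channel and file names
standing in for args.channel and args.lh5_path:

    import lgdo.lh5 as lh5

    # "ch1234567" and "dplms_pars.lh5" are placeholders
    coeffs = lh5.read("ch1234567/dplms/coefficients", "dplms_pars.lh5")
    print(coeffs.nda.shape)  # numpy array of filter coefficients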
diff --git a/workflow/src/legenddataflow/scripts/par/geds/dsp/eopt.py b/workflow/src/legenddataflow/scripts/par/geds/dsp/eopt.py
index 2244240..bd6cc95 100644
--- a/workflow/src/legenddataflow/scripts/par/geds/dsp/eopt.py
+++ b/workflow/src/legenddataflow/scripts/par/geds/dsp/eopt.py
@@ -19,7 +19,6 @@
 )
 
 from .....log import build_log
-from ....table_name import get_table_name
 
 warnings.filterwarnings(action="ignore", category=RuntimeWarning)
 warnings.filterwarnings(action="ignore", category=np.RankWarning)
@@ -34,11 +33,13 @@ def par_geds_dsp_eopt() -> None:
     argparser.add_argument("--log", help="log_file", type=str)
 
     argparser.add_argument("--configs", help="configs", type=str, required=True)
-    argparser.add_argument("--metadata", help="metadata", type=str, required=True)
     argparser.add_argument("--datatype", help="Datatype", type=str, required=True)
     argparser.add_argument("--timestamp", help="Timestamp", type=str, required=True)
     argparser.add_argument("--channel", help="Channel", type=str, required=True)
+    argparser.add_argument(
+        "--raw-table-name", help="raw table name", type=str, required=True
+    )
 
     argparser.add_argument(
         "--final-dsp-pars", help="final_dsp_pars", type=str, required=True
@@ -59,8 +60,6 @@ def par_geds_dsp_eopt() -> None:
     sto = lh5.LH5Store()
     t0 = time.time()
 
-    channel = get_table_name(args.metadata, args.timestamp, args.datatype, args.channel)
-
     dsp_config = config_dict["inputs"]["processing_chain"][args.channel]
     opt_json = config_dict["inputs"]["optimiser_config"][args.channel]
@@ -107,14 +106,14 @@ def par_geds_dsp_eopt() -> None:
     )
 
     peaks_rounded = [int(peak) for peak in peaks_kev]
-    peaks = sto.read(f"{channel}/raw", args.peak_file, field_mask=["peak"])[0][
+    peaks = sto.read(args.raw_table_name, args.peak_file, field_mask=["peak"])[0][
         "peak"
     ].nda
     ids = np.isin(peaks, peaks_rounded)
     peaks = peaks[ids]
     idx_list = [np.where(peaks == peak)[0] for peak in peaks_rounded]
 
-    tb_data = sto.read(f"{channel}/raw", args.peak_file, idx=ids)[0]
+    tb_data = sto.read(args.raw_table_name, args.peak_file, idx=ids)[0]
 
     t1 = time.time()
     log.info(f"Data Loaded in {(t1-t0)/60} minutes")
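Both dplms.py above and eopt.py use the same two-pass read: pull only the small peak
column first, mask the rows belonging to the calibration peaks of interest, then load
the full waveform table for just those rows. The pattern in isolation, with
hypothetical file names and illustrative peak energies:

    import numpy as np
    import lgdo.lh5 as lh5

    sto = lh5.LH5Store()
    raw_table_name = "ch1234567/raw"  # placeholder for args.raw_table_name
    # first pass: read just the per-event peak label
    peaks = sto.read(raw_table_name, "peaks.lh5", field_mask=["peak"])[0]["peak"].nda
    ids = np.isin(peaks, [1593, 2615])  # keep only the wanted peaks (keV, illustrative)
    # second pass: read full rows for the selected events only
    tb_data = sto.read(raw_table_name, "peaks.lh5", idx=ids)[0]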
diff --git a/workflow/src/legenddataflow/scripts/par/geds/dsp/evtsel.py b/workflow/src/legenddataflow/scripts/par/geds/dsp/evtsel.py
index cec8e37..d1ad256 100644
--- a/workflow/src/legenddataflow/scripts/par/geds/dsp/evtsel.py
+++ b/workflow/src/legenddataflow/scripts/par/geds/dsp/evtsel.py
@@ -17,7 +17,6 @@
 
 from .....log import build_log
 from ....pulser_removal import get_pulser_mask
-from ....table_name import get_table_name
 
 warnings.filterwarnings(action="ignore", category=RuntimeWarning)
@@ -85,10 +84,10 @@ def par_geds_dsp_evtsel() -> None:
     argparser = argparse.ArgumentParser()
     argparser.add_argument("--raw-filelist", help="raw_filelist", type=str)
     argparser.add_argument(
-        "--tcm-filelist", help="tcm_filelist", type=str, required=False
+        "--pulser-file", help="pulser_file", type=str, required=False
     )
     argparser.add_argument(
-        "--pulser-file", help="pulser_file", type=str, required=False
+        "-p", "--no-pulse", help="no pulser present", action="store_true"
     )
     argparser.add_argument("--decay_const", help="decay_const", type=str, required=True)
@@ -98,11 +97,13 @@ def par_geds_dsp_evtsel() -> None:
     argparser.add_argument("--log", help="log_file", type=str)
 
     argparser.add_argument("--configs", help="configs", type=str, required=True)
-    argparser.add_argument("--metadata", help="metadata", type=str, required=True)
     argparser.add_argument("--datatype", help="Datatype", type=str, required=True)
     argparser.add_argument("--timestamp", help="Timestamp", type=str, required=True)
     argparser.add_argument("--channel", help="Channel", type=str, required=True)
+    argparser.add_argument(
+        "--raw-table-name", help="raw table name", type=str, required=True
+    )
     argparser.add_argument("--peak-file", help="peak_file", type=str, required=True)
     args = argparser.parse_args()
@@ -115,8 +116,6 @@ def par_geds_dsp_evtsel() -> None:
     sto = lh5.LH5Store()
     t0 = time.time()
 
-    channel = get_table_name(args.metadata, args.timestamp, args.datatype, args.channel)
-
     dsp_config = config_dict["inputs"]["processing_chain"][args.channel]
     peak_json = config_dict["inputs"]["peak_config"][args.channel]
@@ -134,16 +133,7 @@ def par_geds_dsp_evtsel() -> None:
             files = f.read().splitlines()
             raw_files = sorted(files)
 
-        mask = get_pulser_mask(
-            pulser_file=args.pulser_file,
-            tcm_filelist=args.tcm_filelist,
-            channel=channel,
-            pulser_multiplicity_threshold=peak_dict.get(
-                "pulser_multiplicity_threshold"
-            ),
-        )
-
-        raw_dict = Props.read_from(args.raw_cal)[channel]["pars"]["operations"]
+        raw_dict = Props.read_from(args.raw_cal)[args.channel]["pars"]["operations"]
 
         peaks_kev = peak_dict["peaks"]
         kev_widths = peak_dict["kev_widths"]
@@ -152,7 +142,7 @@ def par_geds_dsp_evtsel() -> None:
         final_cut_field = peak_dict["final_cut_field"]
         energy_parameter = peak_dict.get("energy_parameter", "trapTmax")
 
-        lh5_path = f"{channel}/raw"
+        lh5_path = args.raw_table_name
 
         if not isinstance(kev_widths, list):
             kev_widths = [kev_widths]
@@ -164,6 +154,11 @@ def par_geds_dsp_evtsel() -> None:
             lh5_path, raw_files, field_mask=["daqenergy", "t_sat_lo", "timestamp"]
         )[0]
 
+        if not args.no_pulse:
+            mask = get_pulser_mask(args.pulser_file)
+        else:
+            mask = np.full(len(tb), False)
+
         discharges = tb["t_sat_lo"].nda > 0
         discharge_timestamps = np.where(tb["timestamp"].nda[discharges])[0]
         is_recovering = np.full(len(tb), False, dtype=bool)
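evtsel.py (and tau.py below) no longer rebuild the pulser mask from TCM filelists:
they either load the mask precomputed by the build_pulser_ids rule via --pulser-file,
or, with -p/--no-pulse, fall back to an all-False mask so every event is treated as
physics. A sketch of the slimmed-down helper under that assumption; the real
get_pulser_mask lives in legenddataflow.pulser_removal and its on-disk format is not
shown in this diff:

    import numpy as np
    from dbetto.catalog import Props

    def get_pulser_mask(pulser_file):
        # Assumed layout: the pulser-ids file stores a boolean per-event mask
        pars = Props.read_from(pulser_file)
        return np.array(pars["mask"])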
diff --git a/workflow/src/legenddataflow/scripts/par/geds/dsp/nopt.py b/workflow/src/legenddataflow/scripts/par/geds/dsp/nopt.py
index 337ed2e..814a72c 100644
--- a/workflow/src/legenddataflow/scripts/par/geds/dsp/nopt.py
+++ b/workflow/src/legenddataflow/scripts/par/geds/dsp/nopt.py
@@ -8,7 +8,6 @@
 import pygama.pargen.noise_optimization as pno
 from dbetto import TextDB
 from dbetto.catalog import Props
-from legendmeta import LegendMetadata
 from pygama.pargen.data_cleaning import generate_cuts, get_cut_indexes
 from pygama.pargen.dsp_optimize import run_one_dsp
@@ -24,12 +23,14 @@ def par_geds_dsp_nopt() -> None:
     argparser.add_argument("--inplots", help="inplots", type=str)
 
     argparser.add_argument("--configs", help="configs", type=str, required=True)
-    argparser.add_argument("--metadata", help="metadata", type=str, required=True)
     argparser.add_argument("--log", help="log_file", type=str)
 
     argparser.add_argument("--datatype", help="Datatype", type=str, required=True)
     argparser.add_argument("--timestamp", help="Timestamp", type=str, required=True)
     argparser.add_argument("--channel", help="Channel", type=str, required=True)
+    argparser.add_argument(
+        "--raw-table-name", help="raw table name", type=str, required=True
+    )
 
     argparser.add_argument("--dsp-pars", help="dsp_pars", type=str, required=True)
     argparser.add_argument("--plot-path", help="plot_path", type=str)
@@ -43,10 +44,6 @@ def par_geds_dsp_nopt() -> None:
 
     t0 = time.time()
 
-    meta = LegendMetadata(path=args.metadata)
-    channel_dict = meta.channelmap(args.timestamp, system=args.datatype)
-    channel = f"ch{channel_dict[args.channel].daq.rawid:07}"
-
     dsp_config = config_dict["inputs"]["processing_chain"][args.channel]
     opt_json = config_dict["inputs"]["optimiser_config"][args.channel]
@@ -59,10 +56,12 @@ def par_geds_dsp_nopt() -> None:
 
     raw_files = sorted(files)
 
-    energies = sto.read(f"{channel}/raw/daqenergy", raw_files)[0]
+    energies = sto.read(
+        args.raw_table_name, raw_files, field_mask=["daqenergy"]
+    )[0]["daqenergy"]
     idxs = np.where(energies.nda == 0)[0]
     tb_data = sto.read(
-        f"{channel}/raw", raw_files, n_rows=opt_dict["n_events"], idx=idxs
+        args.raw_table_name, raw_files, n_rows=opt_dict["n_events"], idx=idxs
     )[0]
     t1 = time.time()
     log.info(f"Time to open raw files {t1-t0:.2f} s, n. baselines {len(tb_data)}")
@@ -72,7 +71,7 @@ def par_geds_dsp_nopt() -> None:
     cut_dict = generate_cuts(dsp_data, cut_dict=opt_dict.pop("cut_pars"))
     cut_idxs = get_cut_indexes(dsp_data, cut_dict)
     tb_data = sto.read(
-        f"{channel}/raw",
+        args.raw_table_name,
         raw_files,
         n_rows=opt_dict.pop("n_events"),
         idx=idxs[cut_idxs],
@@ -84,11 +83,16 @@ def par_geds_dsp_nopt() -> None:
 
     if args.plot_path:
         out_dict, plot_dict = pno.noise_optimization(
-            tb_data, dsp_config, db_dict.copy(), opt_dict, channel, display=1
+            tb_data,
+            dsp_config,
+            db_dict.copy(),
+            opt_dict,
+            args.raw_table_name,
+            display=1,
         )
     else:
         out_dict = pno.noise_optimization(
-            raw_files, dsp_config, db_dict.copy(), opt_dict, channel
+            raw_files, dsp_config, db_dict.copy(), opt_dict, args.raw_table_name
         )
 
     t2 = time.time()
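tau.py below also switches the metadata read from LH5Store.read, which in this
codebase returns a tuple indexed with [0], to the module-level lh5.read, which in
recent legend-pydataobj (hence the >=1.11.6 bump) returns the LGDO directly, so
.view_as("pd") follows without the [0]. The idiom on its own, with hypothetical
table and file names:

    import lgdo.lh5 as lh5

    # module-level read returns the object itself, no (obj, n_rows) tuple
    data = lh5.read(
        "ch1234567/raw",  # placeholder for args.raw_table_name
        ["l200-p03-r000-cal-tier_raw.lh5"],  # placeholder filelist
        field_mask=["daqenergy", "timestamp", "t_sat_lo"],
    ).view_as("pd")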
diff --git a/workflow/src/legenddataflow/scripts/par/geds/dsp/tau.py b/workflow/src/legenddataflow/scripts/par/geds/dsp/tau.py
index 4d45037..c14550f 100644
--- a/workflow/src/legenddataflow/scripts/par/geds/dsp/tau.py
+++ b/workflow/src/legenddataflow/scripts/par/geds/dsp/tau.py
@@ -2,6 +2,7 @@
 import pickle as pkl
 from pathlib import Path
 
+import lgdo
 import lgdo.lh5 as lh5
 import numpy as np
 from dbetto import TextDB
@@ -12,18 +13,22 @@
 
 from .....log import build_log
 from ....pulser_removal import get_pulser_mask
-from ....table_name import get_table_name
 
 
 def par_geds_dsp_tau() -> None:
     argparser = argparse.ArgumentParser()
     argparser.add_argument("--configs", help="configs path", type=str, required=True)
-    argparser.add_argument("--metadata", help="metadata", type=str, required=True)
     argparser.add_argument("--log", help="log file", type=str)
+    argparser.add_argument(
+        "-p", "--no-pulse", help="no pulser present", action="store_true"
+    )
 
     argparser.add_argument("--datatype", help="Datatype", type=str, required=True)
     argparser.add_argument("--timestamp", help="Timestamp", type=str, required=True)
     argparser.add_argument("--channel", help="Channel", type=str, required=True)
+    argparser.add_argument(
+        "--raw-table-name", help="raw table name", type=str, required=True
+    )
 
     argparser.add_argument("--plot-path", help="plot path", type=str, required=False)
     argparser.add_argument("--output-file", help="output file", type=str, required=True)
@@ -33,20 +38,15 @@ def par_geds_dsp_tau() -> None:
     )
 
     argparser.add_argument("--raw-files", help="input files", nargs="*", type=str)
-    argparser.add_argument(
-        "--tcm-files", help="tcm_files", nargs="*", type=str, required=False
-    )
     args = argparser.parse_args()
 
     sto = lh5.LH5Store()
 
     configs = TextDB(args.configs, lazy=True).on(args.timestamp, system=args.datatype)
-    config_dict = configs["snakemake_rules"]["pars_dsp_nopt"]
+    config_dict = configs["snakemake_rules"]["pars_dsp_tau"]
 
     log = build_log(config_dict, args.log)
 
-    channel = get_table_name(args.metadata, args.timestamp, args.datatype, args.channel)
-
     channel_dict = config_dict["inputs"]["processing_chain"][args.channel]
     kwarg_dict = config_dict["inputs"]["tau_config"][args.channel]
@@ -65,22 +65,22 @@ def par_geds_dsp_tau() -> None:
     else:
         input_file = args.raw_files
 
-    mask = get_pulser_mask(
-        pulser_file=args.pulser_file,
-        tcm_filelist=args.tcm_files,
-        channel=channel,
-        pulser_multiplicity_threshold=kwarg_dict.get(
-            "pulser_multiplicity_threshold"
-        ),
-    )
-
-    data = sto.read(
-        f"{channel}/raw",
+    log.debug(lgdo.__version__)
+    log.debug(f"Reading Data for {args.raw_table_name} from:")
+    log.debug(input_file)
+
+    data = lh5.read(
+        args.raw_table_name,
         input_file,
         field_mask=["daqenergy", "timestamp", "t_sat_lo"],
-    )[0].view_as("pd")
+    ).view_as("pd")
 
     threshold = kwarg_dict.pop("threshold")
 
+    if not args.no_pulse:
+        mask = get_pulser_mask(args.pulser_file)
+    else:
+        mask = np.full(len(data), False)
+
     discharges = data["t_sat_lo"] > 0
     discharge_timestamps = np.where(data["timestamp"][discharges])[0]
     is_recovering = np.full(len(data), False, dtype=bool)
@@ -98,7 +98,7 @@ def par_geds_dsp_tau() -> None:
     tb_data = sto.read(
-        f"{channel}/raw",
+        args.raw_table_name,
         input_file,
         idx=cuts,
         n_rows=kwarg_dict.pop("n_events"),
diff --git a/workflow/src/legenddataflow/scripts/par/geds/tcm/pulser.py b/workflow/src/legenddataflow/scripts/par/geds/tcm/pulser.py
index 8a4ffb8..9b283ca 100644
--- a/workflow/src/legenddataflow/scripts/par/geds/tcm/pulser.py
+++ b/workflow/src/legenddataflow/scripts/par/geds/tcm/pulser.py
@@ -7,7 +7,6 @@
 from pygama.pargen.data_cleaning import get_tcm_pulser_ids
 
 from .....log import build_log
-from ....table_name import get_table_name
 
 
 def par_geds_tcm_pulser() -> None:
@@ -19,6 +18,7 @@ def par_geds_tcm_pulser() -> None:
     argparser.add_argument("--datatype", help="Datatype", type=str, required=True)
     argparser.add_argument("--timestamp", help="Timestamp", type=str, required=True)
     argparser.add_argument("--channel", help="Channel", type=str, required=True)
+    argparser.add_argument("--rawid", help="rawid", type=str, required=True)
 
     argparser.add_argument(
         "--pulser-file", help="pulser file", type=str, required=False
@@ -35,8 +35,6 @@ def par_geds_tcm_pulser() -> None:
     kwarg_dict = config_dict["inputs"]["pulser_config"]
     kwarg_dict = Props.read_from(kwarg_dict)
 
-    channel = get_table_name(args.metadata, args.timestamp, args.datatype, args.channel)
-
     if (
         isinstance(args.tcm_files, list)
         and args.tcm_files[0].split(".")[-1] == "filelist"
@@ -49,7 +47,7 @@ def par_geds_tcm_pulser() -> None:
     # get pulser mask from tcm files
     tcm_files = sorted(np.unique(tcm_files))
     ids, mask = get_tcm_pulser_ids(
-        tcm_files, channel, kwarg_dict.pop("pulser_multiplicity_threshold")
+        tcm_files, args.rawid, kwarg_dict.pop("pulser_multiplicity_threshold")
     )
 
     Path(args.pulser_file).parent.mkdir(parents=True, exist_ok=True)
diff --git a/workflow/src/legenddataflow/scripts/write_filelist.py b/workflow/src/legenddataflow/scripts/write_filelist.py
index edeba98..efba09f 100644
--- a/workflow/src/legenddataflow/scripts/write_filelist.py
+++ b/workflow/src/legenddataflow/scripts/write_filelist.py
@@ -6,7 +6,7 @@
 print(f"INFO: found {len(snakemake.input)} files")
 if len(snakemake.input) == 0:
     print(
-        f"WARNING: no DAQ files found for the given pattern: {snakemake.wildcards.label}. "
+        f"WARNING: no files found for the given pattern: {snakemake.wildcards.label}. "
         "make sure patterns follows the format: "
        "all-{experiment}-{period}-{run}-{datatype}-{timestamp}-{tier}.gen"
    )
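The pulser rule/script pair now passes the DAQ rawid straight through, so
get_tcm_pulser_ids receives the identifier it actually needs rather than a
reconstructed table name. How the pieces fit together outside Snakemake, with
hypothetical metadata path, timestamp, detector name, filelist and threshold:

    from legendmeta import LegendMetadata
    from pygama.pargen.data_cleaning import get_tcm_pulser_ids

    metadata = LegendMetadata(path="/path/to/legend-metadata")  # placeholder path
    # resolve the detector name to its DAQ rawid, as the rule's params lambda does
    rawid = metadata.channelmap("20230101T000000Z", system="cal")["V01234A"].daq.rawid
    tcm_files = ["l200-p03-r000-cal-tier_tcm.lh5"]  # placeholder filelist
    ids, mask = get_tcm_pulser_ids(sorted(tcm_files), rawid, 10)  # 10: illustrative multiplicity threshold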