-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathChannelSummaryPublisher.py
executable file
·150 lines (132 loc) · 5.43 KB
/
ChannelSummaryPublisher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: Ampel-contrib-HU/ampel/contrib/hu/t3/ChannelSummaryPublisher.py
# License: BSD-3-Clause
# Author: m. giomi <[email protected]>
# Date: 13.11.2018
# Last Modified Date: 16.12.2020
# Last Modified By: Jakob van Santen <[email protected]>
import datetime
from collections.abc import Generator
from io import BytesIO
from typing import Any
import backoff
import requests
from astropy.time import Time
from pytz import timezone
from requests.auth import HTTPBasicAuth
from ampel.abstract.AbsPhotoT3Unit import AbsPhotoT3Unit
from ampel.secret.NamedSecret import NamedSecret
from ampel.struct.T3Store import T3Store
from ampel.struct.UnitResult import UnitResult
from ampel.types import ChannelId, T3Send, UBson
from ampel.view.TransientView import TransientView
from .DCachePublisher import type_adapter
class ChannelSummaryPublisher(AbsPhotoT3Unit):
    """
    Create a json file with summary statistics for the channel. For the transients
    detected in the last N days, this json file contains, i.e. coords, RB score,
    first detection, latest detection, and the total number of transients detected
    by the channel. The summary info for each transient is taken from
    T2LightCurveSummary.
    """

    # If True, compute the summary but perform no write operations (MKCOL/PUT)
    dry_run: bool = False
    # Base WebDAV URL; one collection per channel is created under it
    base_url: str = "https://desycloud.desy.de/remote.php/webdav/AMPEL/ZTF"
    # Named secret holding a [username, password] pair for HTTP basic auth
    auth: NamedSecret[list] = NamedSecret[list](label="desycloud/valery")

    def post_init(self) -> None:
        """Initialize per-run state and an authenticated HTTP session."""
        # ztf_name -> summary dict, accumulated over all views seen by process()
        self.summary: dict[str, Any] = {}
        # [min, max] JD of the last detections seen; start inverted so any
        # real JD replaces both bounds
        self._jd_range = [float("inf"), -float("inf")]
        self._channels: set[ChannelId] = set()
        self.session = requests.Session()
        self.session.auth = HTTPBasicAuth(*self.auth.get())
        self._adapter = type_adapter(dict)

    def extract_from_transient_view(
        self, tran_view: TransientView
    ) -> None | dict[str, Any]:
        """
        Given a transient view object, return a dictionary with the desired
        metrics: the ZTF/TNS names plus the content of T2LightCurveSummary.

        Also widens ``self._jd_range`` with the summary's ``last_detection``.
        Returns an empty dict (falsy) when the view carries neither names nor
        a T2LightCurveSummary result.
        """
        out: dict[str, Any] = {}
        assert tran_view.stock
        if names := tran_view.stock.get("name"):
            # First ZTF-prefixed name is the canonical identifier;
            # raises StopIteration if none is present (swallowed in done())
            out["ztf_name"] = next(
                name
                for name in names
                if isinstance(name, str) and name.startswith("ZTF")
            )
            out["tns_names"] = tuple(
                name
                for name in names
                if isinstance(name, str) and name.startswith("TNS")
            )
        # incorporate T2LightCurveSummary
        if summary := tran_view.get_t2_body(unit="T2LightCurveSummary"):
            assert isinstance(summary, dict)
            out.update(summary)
            # KeyError here is intentional: a summary without last_detection
            # indicates an upstream T2 problem
            last_detection = summary["last_detection"]
            self._jd_range[0] = min(last_detection, self._jd_range[0])
            self._jd_range[1] = max(last_detection, self._jd_range[1])
        return out

    def process(
        self, gen: Generator[TransientView, T3Send, None], t3s: None | T3Store = None
    ) -> UBson | UnitResult:
        """
        Load the stats from the alerts, keyed by ZTF name, then upload the
        channel summary via done().

        :raises ValueError: if any view belongs to zero or multiple channels
        """
        for tran_view in gen:
            assert tran_view.stock
            if len(channels := tran_view.stock.get("channel") or []) != 1:
                raise ValueError("Only single-channel views are supported")
            info_dict = self.extract_from_transient_view(tran_view)
            if not info_dict:
                continue
            key = info_dict.pop("ztf_name")
            self.summary[key] = info_dict
            self._channels.add(channels[0])
        self.done()
        return None

    @backoff.on_exception(
        backoff.expo,
        (TimeoutError, requests.exceptions.HTTPError),
        # retry only on timeouts and on the listed (transient/locking) HTTP
        # statuses; give up immediately on any other HTTP error
        giveup=lambda exc: isinstance(exc, requests.exceptions.HTTPError)
        and exc.response.status_code not in {400, 403, 405, 423, 500},
    )
    def done(self) -> None:
        """
        Merge the accumulated summary with any existing summary file for the
        night and upload it to ``{base_url}/{channel}/channel-summary-YYYYMMDD.json``.
        No-op if no transients were seen; retried with exponential backoff on
        transient HTTP errors.
        """
        if len(self._channels) == 0:
            return
        if len(self._channels) > 1:
            raise ValueError(
                f"Got multiple channels ({list(self._channels)}) in summary"
            )

        # Find the date of the most recent observation, in Pacific time
        timestamp = Time(self._jd_range[1], format="jd").to_datetime(
            timezone("US/Pacific")
        )
        # If before noon, it belongs to the night that started yesterday
        if timestamp.hour < 12:
            timestamp -= datetime.timedelta(days=1)
        filename = timestamp.strftime("channel-summary-%Y%m%d.json")

        channel = next(iter(self._channels))
        basedir = f"{self.base_url}/{channel}"
        # Ensure the channel collection exists (WebDAV MKCOL)
        rep = self.session.head(basedir)
        if not (rep.ok or self.dry_run):
            self.session.request("MKCOL", basedir).raise_for_status()
        try:
            # Merge with an existing summary for the same night, giving
            # precedence to the entries gathered in this run. StopIteration
            # can leak out of extract_from_transient_view's next(); treat it
            # like a missing file.
            rep = self.session.get(f"{basedir}/{filename}")
            rep.raise_for_status()
            partial_summary = self._adapter.validate_json(rep.content)
            partial_summary.update(self.summary)
            self.summary = partial_summary
        except (requests.exceptions.HTTPError, StopIteration):
            pass

        outfile = BytesIO()
        outfile.write(self._adapter.dump_json(self.summary))
        outfile.write(b"\n")
        mb = len(outfile.getvalue()) / 2.0**20
        self.logger.info(f"{filename}: {len(self.summary)} transients {mb:.1f} MB")
        if not self.dry_run:
            self.session.put(
                f"{basedir}/{filename}", data=outfile.getvalue()
            ).raise_for_status()