diff --git a/.github/workflows/build-manually.yaml b/.github/workflows/build-manually.yaml
index cbad7f3..22381ec 100644
--- a/.github/workflows/build-manually.yaml
+++ b/.github/workflows/build-manually.yaml
@@ -5,11 +5,11 @@ on:
rubinenvVersion:
description: 'rubin-env version'
required: true
- default: '7.0.1'
+ default: '8.0.0'
obsLsstVersion:
description: 'Science Pipelines release'
required: true
- default: 'w_2023_41'
+ default: 'w_2024_12'
env:
diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index aa127bf..32f164f 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -11,6 +11,7 @@ env:
ENQUEUE_NAME: embargo-butler-enqueue
INGEST_NAME: embargo-butler-ingest
IDLE_NAME: embargo-butler-idle
+ PRESENCE_NAME: embargo-butler-presence
jobs:
push:
@@ -20,7 +21,7 @@ jobs:
contents: read
steps:
- name: Checkout
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Build enqueue image
run: |
@@ -43,6 +44,13 @@ jobs:
--tag $IDLE_NAME \
--label "runnumber=${GITHUB_RUN_ID}"
+ - name: Build presence image
+ run: |
+ docker build . \
+ -f Dockerfile.presence \
+ --tag $PRESENCE_NAME \
+ --label "runnumber=${GITHUB_RUN_ID}"
+
- name: Log in to GitHub Container Registry
run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $ --password-stdin
@@ -51,6 +59,7 @@ jobs:
ENQUEUE_ID=ghcr.io/${{ github.repository_owner }}/$ENQUEUE_NAME
INGEST_ID=ghcr.io/${{ github.repository_owner }}/$INGEST_NAME
IDLE_ID=ghcr.io/${{ github.repository_owner }}/$IDLE_NAME
+ PRESENCE_ID=ghcr.io/${{ github.repository_owner }}/$PRESENCE_NAME
if [[ "${{ github.ref }}" == "refs/pull/"* ]]; then
VERSION=$(echo "${{ github.head_ref }}" | sed -e 's|.*/||')
@@ -62,6 +71,7 @@ jobs:
echo ENQUEUE_ID=$ENQUEUE_ID
echo INGEST_ID=$INGEST_ID
echo IDLE_ID=$IDLE_ID
+ echo PRESENCE_ID=$PRESENCE_ID
echo VERSION=$VERSION
docker tag $ENQUEUE_NAME $ENQUEUE_ID:$VERSION
docker push $ENQUEUE_ID:$VERSION
@@ -69,3 +79,5 @@ jobs:
docker push $INGEST_ID:$VERSION
docker tag $IDLE_NAME $IDLE_ID:$VERSION
docker push $IDLE_ID:$VERSION
+ docker tag $PRESENCE_NAME $PRESENCE_ID:$VERSION
+ docker push $PRESENCE_ID:$VERSION
diff --git a/Dockerfile.presence b/Dockerfile.presence
new file mode 100644
index 0000000..23ff0c7
--- /dev/null
+++ b/Dockerfile.presence
@@ -0,0 +1,32 @@
+# This file is part of embargo_butler.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+# Dockerfile for presence service.
+
+FROM python:3.11
+RUN pip install redis gunicorn flask
+WORKDIR /presence
+COPY src/presence.py src/utils.py /presence/
+# environment variables that must be set:
+# REDIS_HOST REDIS_PASSWORD
+# optional:
+# DELETE_SEEN
+ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8000", "-w", "2", "presence:app" ]
diff --git a/src/ingest.py b/src/ingest.py
index 08a4896..42a1d93 100644
--- a/src/ingest.py
+++ b/src/ingest.py
@@ -22,10 +22,12 @@
"""
Service to ingest images or LFA objects into per-bucket Butler repos.
"""
+import json
import os
import socket
import time
+import astropy.io.fits
import requests
from lsst.daf.butler import Butler
from lsst.obs.base import DefineVisitsTask, RawIngestTask
@@ -50,6 +52,7 @@
butler_repo = os.environ["BUTLER_REPO"]
webhook_uri = os.environ.get("WEBHOOK_URI", None)
is_lfa = "rubinobs-lfa-" in bucket
+group_lifetime = int(os.environ.get("GROUP_LIFETIME", 86400))
worker_name = socket.gethostname()
worker_queue = f"WORKER:{bucket}:{worker_name}"
@@ -134,6 +137,44 @@ def on_metadata_failure(dataset, exc):
pipe.execute()
+def record_groups(resources: list[ResourcePath]) -> None:
+ """Record the group ids from received FITS files in Redis.
+
+ Parameters
+ ----------
+ resources: `list` [`ResourcePath`]
+ The resources to record group ids from.
+ """
+
+ global r, group_lifetime, logger
+ with r.pipeline() as pipe:
+ for res in resources:
+ if not res.exists():
+ continue
+ json_file = res.updatedExtension("json")
+ header = {}
+ try:
+ with json_file.open("rb") as f:
+ header = json.load(f)
+ except Exception:
+ try:
+ with res.open("rb") as f:
+ header = astropy.io.fits.open(f)[0].header
+ except Exception:
+ logger.exception("Error reading header for %s", res)
+ try:
+ instrument = header["INSTRUME"]
+ groupid = header["GROUPID"]
+ snap_number = int(header["CURINDEX"]) - 1
+ detector = header["RAFTBAY"] + "_" + header["CCDSLOT"]
+ key = f"GROUP:{instrument}:{groupid}:{snap_number}:{detector}"
+ pipe.set(key, str(res))
+ pipe.expire(key, group_lifetime)
+ except Exception:
+ logger.exception("Error reading group for %s", res)
+ pipe.execute()
+
+
def main():
"""Ingest FITS files from a Redis queue."""
logger.info("Initializing Butler from %s", butler_repo)
@@ -162,12 +203,17 @@ def main():
# Ingest if we have resources
if resources:
+
+ if not is_lfa:
+ record_groups(resources)
+
logger.info("Ingesting %s", resources)
refs = None
try:
refs = ingester.run(resources)
except Exception:
logger.exception("Error while ingesting %s", resources)
+ continue
# Define visits if we ingested anything
if not is_lfa and refs:
diff --git a/src/presence.py b/src/presence.py
new file mode 100644
index 0000000..77607e0
--- /dev/null
+++ b/src/presence.py
@@ -0,0 +1,78 @@
+# This file is part of embargo_butler.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""
+Presence service to translate group names to image URIs.
+"""
+import os
+import re
+
+from flask import Flask
+
+from utils import setup_logging, setup_redis
+
+logger = setup_logging(__name__)
+r = setup_redis()
+
+# Delete image key when seen, before it expires, to save space
+delete_seen = os.environ.get("DELETE_SEEN") is not None
+
+app = Flask(__name__)
+
+
+@app.get("/presence////")
+def presence(instrument: str, group_name: str, snap_index: int, detector_name: str) -> dict | tuple:
+ """Return the presence and URI of an image matching the parameters.
+
+ Parameters
+ ----------
+ instrument: `str`
+ Name of the instrument taking the image.
+ group_name: `str`
+ Name of the group (from the GROUPID FITS header).
+ snap_index: `int`
+ Number of the snap (zero-based).
+ detector_name: `str`
+ Name of the detector ("RNN_SNN").
+
+ Returns
+ -------
+ json: `dict`
+ JSON with "error", "present", "uri", and/or "message" keys.
+ """
+
+ try:
+ if instrument not in ("LATISS", "LSSTComCam", "LSSTComCamSim", "LSSTCam", "LSST-TS8"):
+ return ({"error": True, "message": f"Unknown instrument {instrument}"}, 400)
+ if not re.match(r"R\d\d_S\d\d", detector_name):
+ return ({"error": True, "message": f"Unrecognized detector name {detector_name}"}, 400)
+ key = f"GROUP:{instrument}:{group_name}:{snap_index}:{detector_name}"
+ result = r.get(key)
+ if result:
+ logger.info(f"Found key {key}")
+ if delete_seen:
+ r.delete(key)
+ return {"error": False, "present": True, "uri": result.decode()}
+ else:
+ logger.debug(f"No key {key}")
+ return {"error": False, "present": False}
+ except Exception as e:
+ return ({"error": True, "message": str(e)}, 500)