Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(repo): Add structured logging for containers #13

Merged
merged 1 commit on
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM python:3.11-slim
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

RUN apt-get update -y && \
apt-get -y --no-install-recommends install git unzip build-essential && \
apt-get -y --no-install-recommends install git build-essential && \
rm -rf /var/lib/apt/lists/*

# Add repository code
Expand All @@ -13,5 +13,7 @@ COPY .git /opt/app/.git

RUN uv sync --no-dev

ENV _TYPER_STANDARD_TRACEBACK=1

ENTRYPOINT ["uv", "run", "--no-dev"]
CMD ["cloudcasting-app"]
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies = [
"fsspec==2024.6.1",
"huggingface-hub==0.28.1",
"hydra-core==1.3.2",
"loguru == 0.7.3",
"numpy==2.1.2",
"ocf-data-sampler==0.1.4",
"ocf_blosc2==0.0.13",
Expand Down
51 changes: 51 additions & 0 deletions src/cloudcasting_app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Setup logging configuration for the application."""

import json
import os
import sys
from datetime import timezone

import loguru

def development_formatter(record: "loguru.Record") -> str:
    """Build the loguru format string for human-readable development logs.

    The bound ``extra`` context is rendered only when the record actually
    carries extra fields, so plain messages stay uncluttered.
    """
    parts = [
        "<green>{time:HH:mm:ss.SSS}</green> ",
        "<level>{level:<7s}</level> [{file}:{line}] | <level>{message}</level> ",
    ]
    if record["extra"]:
        parts.append("<green>{extra}</green>")
    parts.append("\n{exception}")
    return "".join(parts)

def structured_formatter(record: "loguru.Record") -> str:
    """Format a log record as a single-line JSON object (Cloud Logging style).

    The JSON payload is pre-serialized into ``record["extra"]["serialized"]``
    so the returned format string can reference it; loguru substitutes the
    value when the record is emitted.

    Args:
        record: The loguru record being formatted.

    Returns:
        A loguru format string referencing the serialized payload.
    """
    # The format string ends in "Z" (the UTC designator), so convert the
    # record time to UTC first — otherwise a local-time value would be
    # mislabeled as UTC.
    utc_time = record["time"].astimezone(timezone.utc)
    record["extra"]["serialized"] = json.dumps({
        "timestamp": utc_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "severity": record["level"].name,
        "message": record["message"],
        "logging.googleapis.com/labels": {"python_logger": record["name"]},
        "logging.googleapis.com/sourceLocation": {
            "file": record["file"].name,
            "line": record["line"],
            "function": record["function"],
        },
        # Exclude our own "serialized" key so the context never nests a
        # previously rendered payload inside a new one.
        "context": {
            k: v for k, v in record["extra"].items() if k != "serialized"
        },
    })
    return "{extra[serialized]}\n"

# ---------------------------------------------------------------------------
# Configure the package-wide loguru logger at import time.
#
# loguru installs a default stderr handler on import; remove() with no
# arguments drops all existing handlers and, unlike remove(0), cannot raise
# ValueError if the default handler was already removed (e.g. on re-import).
loguru.logger.remove()
if sys.stdout.isatty():
    # Attached to a terminal: human-readable, colorized development output.
    # backtrace/diagnose give rich tracebacks (diagnose can leak variable
    # values, so it is enabled only in this interactive branch).
    # .upper() normalizes e.g. LOGLEVEL=debug, matching the container branch.
    loguru.logger.add(
        sys.stdout, format=development_formatter, diagnose=True,
        level=os.getenv("LOGLEVEL", "DEBUG").upper(), backtrace=True,
        colorize=True,
    )
else:
    # Not a TTY (e.g. running in a container): one JSON object per line,
    # suitable for structured log collectors.
    loguru.logger.add(
        sys.stdout, format=structured_formatter, backtrace=True,
        level=os.getenv("LOGLEVEL", "INFO").upper(),
    )

# Uncomment and change the list to quieten external libraries
# for logger in ["aiobotocore", "cfgrib"]:
#     logging.getLogger(logger).setLevel(logging.WARNING)

86 changes: 37 additions & 49 deletions src/cloudcasting_app/app.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
"""
The main script for running the cloudcasting model in production
"""The main script for running the cloudcasting model in production

This app expects these environmental variables to be available:
SATELLITE_ZARR_PATH (str): The path of the input satellite data
OUTPUT_PREDICTION_DIRECTORY (str): The path of the directory to save the predictions to
"""

from importlib.metadata import PackageNotFoundError, version
import logging
import os
import yaml
import hydra
import typer
import fsspec
from importlib.metadata import PackageNotFoundError, version

import fsspec
import hydra
import pandas as pd
import xarray as xr
import torch

import typer
import xarray as xr
import yaml
from huggingface_hub import snapshot_download
from safetensors.torch import load_model
from loguru import logger

from cloudcasting_app.data import prepare_satellite_data, sat_path, get_input_data
from cloudcasting_app.data import get_input_data, prepare_satellite_data, sat_path

# Get package version
try:
Expand All @@ -30,15 +28,6 @@
__version__ = "v?"

# ---------------------------------------------------------------------------
# GLOBAL SETTINGS

logging.basicConfig(
level=getattr(logging, os.getenv("LOGLEVEL", "INFO")),
format="[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s",
)

# Create a logger
logger = logging.getLogger(__name__)

# Model will use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
Expand All @@ -56,9 +45,8 @@ def app(t0=None):
Args:
t0 (datetime): Datetime at which forecast is made
"""

logger.info(f"Using `cloudcasting_app` version: {__version__}")

logger.info(f"Using `cloudcasting_app` version: {__version__}", version=__version__)

# ---------------------------------------------------------------------------
# 0. If inference datetime is None, round down to last 30 minutes
if t0 is None:
Expand All @@ -72,87 +60,87 @@ def app(t0=None):
# 1. Prepare the input data
logger.info("Downloading satellite data")
prepare_satellite_data(t0)

# ---------------------------------------------------------------------------
# 2. Load model
logger.info("Loading model")

hf_download_dir = snapshot_download(
repo_id=REPO_ID,
revision=REVISION,
)
with open(f"{hf_download_dir}/model_config.yaml", "r", encoding="utf-8") as f:

with open(f"{hf_download_dir}/model_config.yaml", encoding="utf-8") as f:
model = hydra.utils.instantiate(yaml.safe_load(f))

model = model.to(device)

load_model(
model,
filename=f"{hf_download_dir}/model.safetensors",
filename=f"{hf_download_dir}/model.safetensors",
strict=True,
)

model.eval()

# ---------------------------------------------------------------------------
# 3. Get inference inputs
logger.info("Preparing inputs")

# TODO check the spatial dimensions of this zarr
# Get inputs
ds = xr.open_zarr(sat_path)

# Reshape to (channel, time, height, width)
ds = ds.transpose("variable", "time", "y_geostationary", "x_geostationary")

X = get_input_data(ds, t0)

# Convert to tensor, expand into batch dimension, and move to device
X = X[None, ...].to(device)

# ---------------------------------------------------------------------------
# 4. Make predictions
logger.info("Making predictions")

with torch.no_grad():
y_hat = model(X).cpu().numpy()

# ---------------------------------------------------------------------------
# 5. Save predictions
logger.info("Saving predictions")
da_y_hat = xr.DataArray(
y_hat,
dims=["init_time", "variable", "step", "y_geostationary", "x_geostationary"],
y_hat,
dims=["init_time", "variable", "step", "y_geostationary", "x_geostationary"],
coords={
"init_time": [t0],
"variable": ds.variable,
"step": pd.timedelta_range(start="15min", end="180min", freq="15min"),
"y_geostationary": ds.y_geostationary,
"x_geostationary": ds.x_geostationary,
}
},
)

ds_y_hat = da_y_hat.to_dataset(name="sat_pred")
ds_y_hat.sat_pred.attrs.update(ds.data.attrs)

# Save predictions to the latest path and to path with timestring
out_dir = os.environ["OUTPUT_PREDICTION_DIRECTORY"]

latest_zarr_path = f"{out_dir}/latest.zarr"
t0_string_zarr_path = t0.strftime(f"{out_dir}/%Y-%m-%dT%H:%M.zarr")

fs = fsspec.open(out_dir).fs
for path in [latest_zarr_path, t0_string_zarr_path]:

# Remove the path if it exists already
if fs.exists(path):
logger.info(f"Removing path: {path}")
fs.rm(path, recursive=True)

ds_y_hat.to_zarr(path)


def main() -> None:
    """Console-script entrypoint.

    Delegates to typer, which builds a CLI from ``app``'s signature and
    invokes it with the parsed command-line arguments.
    """
    typer.run(app)
Expand Down
Loading