Release v2.35 (#94)
[2023-01-30 19:45] Fix for Arome and added SlowAPI rate limiter

FIXED:
 - Due to the unusual selection of files for the KDP, requests for moments earlier than the oldest currently available dataset file could return that oldest file rather than no file at all. This has now been accounted for (see the sketch below).
Signed-off-by: Raoul Linnenbank <[email protected]>
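
Conceptually, the corrected lookup behaves like the minimal sketch below. Note that select_dataset_file, its arguments, and the example moments are hypothetical illustrations of the rule, not the repository's actual code.

from datetime import datetime
from typing import List, Optional


def select_dataset_file(file_moments: List[datetime], requested: datetime) -> Optional[datetime]:
    """Return the newest file moment that does not lie after the requested moment.

    Hypothetical helper: a request predating the whole dataset now yields None
    instead of silently falling back to the oldest available file.
    """
    candidates = [moment for moment in file_moments if moment <= requested]
    return max(candidates) if candidates else None


# A request earlier than every available file now comes up empty:
available = [datetime(2023, 1, 28, 6), datetime(2023, 1, 29, 6), datetime(2023, 1, 30, 6)]
assert select_dataset_file(available, datetime(2023, 1, 27, 0)) is None
assert select_dataset_file(available, datetime(2023, 1, 29, 12)) == datetime(2023, 1, 29, 6)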

ADDED:
 - Basic SlowAPI support was added with some initial settings; a minimal standalone sketch of the wiring follows below. In the future we'll expand on this and move the settings to the app config.
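
For reference, the SlowAPI wiring this commit introduces boils down to the self-contained sketch below; the /ping endpoint and the exact limits are illustrative placeholders.

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# One shared limiter, keyed on the caller's remote address; 5 requests/minute by default.
limiter = Limiter(key_func=get_remote_address, default_limits=["5/minute"])

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.get("/ping")
@limiter.limit("20/minute")  # Per-endpoint override of the default limit.
async def ping(request: Request):
    # SlowAPI requires the Request parameter to resolve the client address.
    return {"status": "ok"}

Once a caller exceeds a limit, SlowAPI raises RateLimitExceeded and the registered handler answers with HTTP 429 Too Many Requests.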
rflinnenbank authored Jan 30, 2023
1 parent 76e5101 commit ca0e929
Showing 7 changed files with 480 additions and 396 deletions.
468 changes: 244 additions & 224 deletions poetry.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "weather_provider_api"
version = "2.34.0"
version = "2.35.0"
description = "Weather Provider Libraries and API"
authors = ["Verbindingsteam", "Raoul Linnenbank <[email protected]>"]
license = "MPL-2.0"
@@ -30,7 +30,7 @@ pandas = "^1.5.2"
xarray = "^2022.12.0"
cfgrib = "^0.9.10.3"
uvicorn = "^0.20.0"
gevent = "^22.10.2"
slowapi = "^0.1.7"

[tool.poetry.group.dev.dependencies]
pytest = "^7.2.0"
@@ -39,6 +39,7 @@ pytest-cov = "^4.0.0"
isort = "^5.10.1"
black = "^22.10.0"
jupyter = "^1.0.0"
pylint = "^2.15.10"


[tool.poetry.scripts]
@@ -50,6 +51,9 @@ wpla_clear_era5land = "weather_provider_api.scripts.erase_era5land_repository:main"
wpla_clear_arome = "weather_provider_api.scripts.erase_arome_repository:main"
wpla_run_api = "weather_provider_api.main:main"

[tool.pylint]
max-line-length = 120

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
10 changes: 10 additions & 0 deletions weather_provider_api/core/initializers/rate_limiter.py
@@ -0,0 +1,10 @@
# SPDX-FileCopyrightText: 2019-2023 Alliander N.V.
# SPDX-License-Identifier: MPL-2.0

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from slowapi import Limiter
from slowapi.util import get_remote_address


API_RATE_LIMITER = Limiter(key_func=get_remote_address, default_limits=["5/minute"])
14 changes: 11 additions & 3 deletions weather_provider_api/main.py
@@ -15,6 +15,8 @@
import structlog
import uvicorn
from fastapi import FastAPI
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from starlette.responses import RedirectResponse

from weather_provider_api.app_config import get_setting
@@ -26,19 +28,25 @@
from weather_provider_api.core.initializers.monitoring import \
initialize_prometheus_middleware
from weather_provider_api.core.initializers.mounting import mount_api_version
from weather_provider_api.core.initializers.rate_limiter import API_RATE_LIMITER
from weather_provider_api.core.initializers.validation import \
initialize_validation_middleware
from weather_provider_api.versions.v1 import app as v1
from weather_provider_api.versions.v2 import app as v2

app = FastAPI(version=get_setting("APP_VERSION"), title=get_setting("APP_NAME"))

# Add rate limiter
app.state.limiter = API_RATE_LIMITER
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# Enable logging
initialize_logging()
logger = structlog.get_logger(__name__)
logger.info(f'--------------------------------------', datetime=datetime.utcnow())
logger.info(f'Booting Weather Provider API Systems..', datetime=datetime.utcnow())
logger.info(f'--------------------------------------', datetime=datetime.utcnow())
logger.info('--------------------------------------', datetime=datetime.utcnow())
logger.info('Booting Weather Provider API Systems..', datetime=datetime.utcnow())
logger.info('--------------------------------------', datetime=datetime.utcnow())

# Create and configure new application instance
initialize_error_handling(app)
45 changes: 21 additions & 24 deletions weather_provider_api/routers/weather/api_view_v2.py
@@ -5,19 +5,14 @@
# SPDX-License-Identifier: MPL-2.0

"""
TODO:
- Decouple weather factors from the get_weather function
- Add the async process for e.g. CDS and Harmonie
- Improve datetime slicing
- Fix datetime validation in Swagger UI on IE and Safari
- Improve output / error messages when no data is available
"""

from typing import List

import accept_types
from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException
from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException, Request

from weather_provider_api.core.initializers.rate_limiter import API_RATE_LIMITER
from weather_provider_api.routers.weather.api_models import (
WeatherContentRequestQuery,
WeatherFormattingRequestQuery,
@@ -45,7 +40,7 @@ def header_accept_type(accept: str = Header(None)) -> str:
# @weather_provider_api.get("/sources", response_model=List[WeatherSource], tags=["sync", "async"])
@app.get("/sources", response_model=List[WeatherSource], tags=["sync"])
async def get_sources(): # pragma: no cover
"""<B>List all of the Weather Sources available</B>"""
"""<B>List all the Weather Sources available</B>"""
"""
An API function that returns all of the Sources available from the WeatherController
Args:
@@ -59,7 +54,7 @@ async def get_sources(): # pragma: no cover
# @weather_provider_api.get("/sources/{source_id}", response_model=WeatherSource, tags=["sync", "async"])
@app.get("/sources/{source_id}", response_model=WeatherSource, tags=["sync"])
async def get_source(source_id: str): # pragma: no cover
"""<B>List all of the Models available for the Source</B>"""
"""<B>List all the Models available for the Source</B>"""
"""
An API function that returns all of the models available for the given source
Args:
@@ -89,13 +84,15 @@ async def get_sync_models(source_id: str): # pragma: no cover


@app.get("/sources/{source_id}/models/{model_id}", tags=["sync"])
@API_RATE_LIMITER.limit("20/minute")
async def get_sync_weather(
source_id: str,
model_id: str,
cleanup_tasks: BackgroundTasks,
ret_args: WeatherContentRequestQuery = Depends(),
fmt_args: WeatherFormattingRequestQuery = Depends(),
accept: str = Depends(header_accept_type),
request: Request,
source_id: str,
model_id: str,
cleanup_tasks: BackgroundTasks,
ret_args: WeatherContentRequestQuery = Depends(),
fmt_args: WeatherFormattingRequestQuery = Depends(),
accept: str = Depends(header_accept_type),
): # pragma: no cover
"""
<B>Request weather data for a specific Model using the given settings (location, period, weather factors, e.g.). <BR>
@@ -105,16 +102,16 @@ async def get_sync_weather(
that sometimes the 'begin' and 'end' values will be altered to match these restrictions.)</I>
"""
"""
An API function that retrieves specific weather data for a specific Weather Model and returns it as the
An API function that retrieves specific weather data for a specific Weather Model and returns it as the
requested output format and units.
Args:
source_id: The Source ID of the Source to request the weather data from.
model_id: The Model ID for the Model to request the weather data from.
cleanup_tasks: A BackgroundTasks object to hold any pending cleanup tasks for when the data request is
cleanup_tasks: A BackgroundTasks object to hold any pending cleanup tasks for when the data request is
finished.
ret_args: A WeatherContentRequestQuery object holding the parameters for the weather data request to
use.
fmt_args: A WeatherFormattingRequestQuery object holding the parameters for the output format and
fmt_args: A WeatherFormattingRequestQuery object holding the parameters for the output format and
units to use.
accept: Header type to use for the output file.
Rounded to the most likely value using header_accept_type().
@@ -189,12 +186,12 @@ async def get_alarm(): # pragma: no cover
# Handler for requests with multiple locations:
@app.get("/sources/{source_id}/models/{model_id}/multiple-locations/", tags=["sync"])
async def get_sync_weather_multi_loc(
source_id: str,
model_id: str,
cleanup_tasks: BackgroundTasks,
ret_args: WeatherContentRequestMultiLocationQuery = Depends(),
fmt_args: WeatherFormattingRequestQuery = Depends(),
accept: str = Depends(header_accept_type),
source_id: str,
model_id: str,
cleanup_tasks: BackgroundTasks,
ret_args: WeatherContentRequestMultiLocationQuery = Depends(),
fmt_args: WeatherFormattingRequestQuery = Depends(),
accept: str = Depends(header_accept_type),
): # pragma: no cover
source_id = source_id.lower()
model_id = model_id.lower()
@@ -1,13 +1,16 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# SPDX-FileCopyrightText: 2019-2022 Alliander N.V.
# SPDX-License-Identifier: MPL-2.0
import glob
import re
import sys
import tarfile
import tempfile
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Tuple
from typing import List, Tuple

import cfgrib
import numpy as np
@@ -24,9 +27,6 @@
from weather_provider_api.routers.weather.utils.grid_helpers import round_coordinates_to_wgs84_grid


# SPDX-FileCopyrightText: 2019-2022 Alliander N.V.
# SPDX-License-Identifier: MPL-2.0

class HarmonieAromeRepository(WeatherRepositoryBase):
""" The Weather Repository class for the 'KNMI - Harmonie Arome' dataset
@@ -91,6 +91,8 @@ def update(self):
# Cleanup the repository
self.cleanup()

self.logger.info(f'KNMI Arome Update - Storage in: {self.repository_folder} ')

# Configure initial settings for the update
start_of_update = datetime.utcnow()
no_of_items_processed = 0
@@ -101,6 +103,9 @@
self.logger.info(f'- A forced end time of [{forced_end_of_update}] was set.')

prediction_to_evaluate = self.get_most_recent_prediction_moment
download_folder = Path(tempfile.gettempdir()).joinpath(self.dataset_name)
knmi_downloader = KNMIDownloader(self.dataset_name, self.dataset_version, str(download_folder), True)
files_in_dataset = {item['filename']: item['size'] for item in knmi_downloader.get_all_available_files()}

while prediction_to_evaluate >= self.first_day_of_repo:
if no_of_items_processed != 0:
@@ -117,17 +122,16 @@
file_to_download = f'harm40_v1_p1_{prediction_to_evaluate.year}' \
f'{str(prediction_to_evaluate.month).zfill(2)}' \
f'{str(prediction_to_evaluate.day).zfill(2)}' \
f'{str(prediction_to_evaluate.hour).zfill(2)}'

download_folder = Path(tempfile.gettempdir()).joinpath(self.dataset_name)
downloader = KNMIDownloader(
self.dataset_name, self.dataset_version, file_to_download, 1, download_folder
)
self._clear_temp_folder(download_folder)
files_saved = downloader.knmi_download_request()
f'{str(prediction_to_evaluate.hour).zfill(2)}.tar'

self._process_downloaded_files(download_folder, files_saved, prediction_to_evaluate)
no_of_items_processed += 1
if file_to_download in files_in_dataset.keys():
# File exists
knmi_downloader.download_specific_file(file_to_download, files_in_dataset[file_to_download])
self._process_downloaded_file(download_folder, file_to_download, prediction_to_evaluate)
no_of_items_processed += 1
else:
self.logger.info(f'The expected file [{file_to_download}] was not found within the KNMI dataset. '
f'Moving on to the next file!')
else:
self.logger.debug(f'The prediction for [{prediction_to_evaluate}] is already stored in the repository.')

@@ -148,27 +152,25 @@ def _prediction_already_available(self, prediction_moment: datetime):

return False

def _process_downloaded_files(
self, download_folder: Path, downloaded_files: List[Dict], predication_time: datetime
def _process_downloaded_file(
self, download_folder: Path, filename: str, prediction_time: datetime
):
""" The function that processes a number of downloaded files into repository files.
"""
for downloaded_file in downloaded_files:
filename = downloaded_file['filename']
stage = 'Started unpacking files..'
try:
self._unpack_downloaded_file(download_folder, filename)
stage = 'Files were unpacked'
self._convert_unpacked_data_to_netcdf4_files(download_folder)
stage = 'Data was converted to NetCDF4'
self._fuse_hourly_netcdf4_files(download_folder, predication_time)
stage = 'NetCDF4 files were properly fused together'
self._clear_temp_folder(download_folder)
except Exception as e:
self.logger.warning(f'Processing did not get past stage: {stage}', datetime=datetime.utcnow())
self.logger.warning(f'The downloaded data could not be properly downloaded: {e}',
datetime=datetime.utcnow())
stage = 'Started unpacking files..'
try:
self._unpack_downloaded_file(download_folder, filename)
stage = 'Files were unpacked'
self._convert_unpacked_data_to_netcdf4_files(download_folder, prediction_time)
stage = 'Data was converted to NetCDF4'
self._fuse_hourly_netcdf4_files(download_folder, prediction_time)
stage = 'NetCDF4 files were properly fused together'
self._clear_temp_folder(download_folder)
except Exception as e:
self.logger.warning(f'Processing did not get past stage: {stage}', datetime=datetime.utcnow())
self.logger.warning(f'The downloaded data could not be properly processed: {e}',
datetime=datetime.utcnow())

def _clear_temp_folder(self, download_folder: Path):
""" A function that cleans up the temporary download folder to prevent issues with partially written files.
@@ -197,7 +199,7 @@ def _unpack_downloaded_file(self, download_folder: Path, file_name: str):
self.logger.error(f'The tarfile [{file_name}] could not be unpacked!', datetime=datetime.utcnow())
raise e

def _convert_unpacked_data_to_netcdf4_files(self, download_folder: Path):
def _convert_unpacked_data_to_netcdf4_files(self, download_folder: Path, prediction_time: datetime):
""" This function converts any unpacked data files into NetCDF4 files
"""
@@ -208,7 +210,9 @@ def _convert_unpacked_data_to_netcdf4_files(self, download_folder: Path):
self.logger.error('CFGRIB was not properly installed. Cannot access GRIB files.')
raise e

grib_files_available = glob.glob(str(download_folder.joinpath('HA40_N25_*_GB')))
grib_files_available = glob.glob(str(download_folder.joinpath(
f'HA40_N25_{prediction_time.strftime("%Y%m%d%H")}00_*_GB'))
)

for grib_file in grib_files_available:
self.logger.debug(f'Processing GRIB file: {grib_file}')
@@ -369,8 +373,6 @@ def _fuse_hourly_netcdf4_files(self, download_folder: Path, prediction_moment: datetime):
encoding = {v: {'zlib': True, 'complevel': 4} for v in fused_dataset.variables}
fused_dataset.to_netcdf(filename_to_save_to, format='NETCDF4', engine='netcdf4', encoding=encoding)

self._safely_delete_file(f'{filename_to_save_to}_UC')

def _build_lat_lon_grid(self, grib_message: cfgrib.Message) -> Tuple[List[float], List[float]]:
""" This function uses an existing GRIB file to extract the dimensions of the 'regular_ll' grid and format
those into a list of latitudes and a list of longitudes that together make up the grid.
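
Stepping back, the reworked Arome update loop fetches the dataset's file listing once and downloads only files that actually exist, rather than blindly requesting every expected filename. The sketch below is a hypothetical, self-contained illustration of that guard: FakeKNMIDownloader is a stand-in, and only get_all_available_files and download_specific_file mirror calls from the real KNMIDownloader.

import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("arome-update-sketch")


class FakeKNMIDownloader:
    """Stand-in for the real KNMIDownloader; only the two calls used below are mimicked."""

    def __init__(self, listing: List[Dict]):
        self._listing = listing

    def get_all_available_files(self) -> List[Dict]:
        return self._listing

    def download_specific_file(self, filename: str, size: int) -> None:
        logger.info("Downloading %s (%d bytes)", filename, size)


# Fetch the listing once, then only download files that are actually present.
downloader = FakeKNMIDownloader([{"filename": "harm40_v1_p1_2023013006.tar", "size": 123}])
files_in_dataset = {item["filename"]: item["size"] for item in downloader.get_all_available_files()}

for expected in ("harm40_v1_p1_2023013006.tar", "harm40_v1_p1_2023013000.tar"):
    if expected in files_in_dataset:
        downloader.download_specific_file(expected, files_in_dataset[expected])
    else:
        logger.info("The expected file [%s] was not found within the KNMI dataset. Moving on!", expected)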