add exception handling to zip operations
mki-c2c committed Jan 31, 2025
1 parent fd970b8 commit d7f8aaf
Showing 3 changed files with 125 additions and 25 deletions.
96 changes: 77 additions & 19 deletions backend/maelstro/core/clone.py
@@ -6,6 +6,7 @@
from requests.exceptions import HTTPError
from fastapi import HTTPException
from geonetwork import GnApi
from geonetwork.exceptions import ParameterException
from geoservercloud.services import RestService as GeoServerService # type: ignore
from maelstro.metadata import Meta
from maelstro.config import ConfigError, app_config as config
@@ -25,7 +26,8 @@ def __init__(self, err_dict: dict[str, str]):

class AuthError(MaelstroException):
def __init__(self, err_dict: dict[str, str]):
super().__init__({**err_dict, "status_code": "401"})
# status 501 instead of 401 because the response is truncated by the gateway
super().__init__({**err_dict, "status_code": "501"})


class UrlError(MaelstroException):
@@ -38,23 +40,25 @@ def __init__(self, err_dict: dict[str, str]):
super().__init__({**err_dict, "status_code": "406"})


# TODO: handle requests.exceptions.ConnectTimeout exception
class TimeoutError(MaelstroException):
def __init__(self, err_dict: dict[str, str]):
super().__init__({**err_dict, "status_code": "406"})


class CloneDataset:
def __init__(self, src_name: str, dst_name: str, uuid: str, dry: bool = False):
self.op_logger = OpLogger()
self.src_name = src_name
self.dst_name = dst_name
self.set_uuid(uuid)
self.uuid = uuid
self.copy_meta = False
self.copy_layers = False
self.copy_styles = False
self.dry = dry

def set_uuid(self, uuid: str) -> None:
self.uuid = uuid
if uuid:
gn = get_gn_service(self.src_name, True, self.op_logger)
zipdata = gn.get_record_zip(uuid).read()
self.meta = Meta(zipdata)

def clone_dataset(
self,
@@ -63,23 +67,73 @@ def clone_dataset(
copy_styles: bool,
output_format: str = "text/plain",
) -> str | list[Any]:
if self.meta is None:
return []
self.copy_meta = copy_meta
self.copy_layers = copy_layers
self.copy_styles = copy_styles

if copy_layers or copy_styles:
self.clone_layers()

if copy_meta:
gn_dst = get_gn_service(self.dst_name, False, self.op_logger)
mapping: dict[str, list[str]] = {
"sources": config.get_gs_sources(),
"destinations": [src["gs_url"] for src in config.get_destinations()],
}
self.meta.update_geoverver_urls(mapping)
gn_dst.put_record_zip(BytesIO(self.meta.xml_bytes))
try:
if self.uuid:
gn = get_gn_service(self.src_name, True, self.op_logger)
zipdata = gn.get_record_zip(self.uuid).read()
self.meta = Meta(zipdata)

if self.meta is None:
return []

if copy_layers or copy_styles:
self.clone_layers()

if copy_meta:
gn_dst = get_gn_service(self.dst_name, False, self.op_logger)
mapping: dict[str, list[str]] = {
"sources": config.get_gs_sources(),
"destinations": [
src["gs_url"] for src in config.get_destinations()
],
}
pre_info, post_info = self.meta.update_geoverver_urls(mapping)
self.op_logger.log_operation(
"Update of geoserver links in zip archive",
str({"before": pre_info, "after": post_info}),
"GnApi",
"clone",
)
try:
results = gn_dst.put_record_zip(BytesIO(self.meta.get_zip()))
except ParameterException as err:
self.op_logger.log_operation(
"Record creation failed ({err.args[0]['code']})",
err.args[0]["details"],
"GnApi",
"clone",
)
raise HTTPException(406, self.op_logger.get_operations()) from err
self.op_logger.log_operation(
results["msg"],
results["detail"],
"GnApi",
"clone",
)
except HTTPError as err:
self.op_logger.log_operation(
"Failed",
err.response.url,
"requests",
"clone",
)
status_code = err.response.status_code
if status_code in [401, 403, 404]:
# prevent output truncation by gateway
status_code += 100
raise HTTPException(status_code, self.op_logger.get_operations()) from err
except ParameterException as err:
self.op_logger.log_operation(
err.args[0]["msg"],
err.args[0]["code"],
"GnApi",
"clone",
)
raise HTTPException(406, self.op_logger.get_operations()) from err

if output_format == "text/plain":
return self.op_logger.format_operations()
@@ -166,6 +220,10 @@ def clone_styles(
) -> None:
default_style = layer_data["layer"]["defaultStyle"]
additional_styles = layer_data["layer"].get("styles", {}).get("style", [])
if isinstance(additional_styles, dict):
# when the list contains a single element, the API may provide it
# as a dict; it must be converted to a list of dicts
additional_styles = [additional_styles]
all_styles = {
style["name"]: {
"workspace": style.get("workspace"),
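
As an aside on the error handling introduced in clone.py above, the following minimal sketch (the helper names and the shape of the operation log are illustrative assumptions; only the err.args[0] keys and the +100 status shift come from the diff) shows how a geonetwork ParameterException can be logged and re-raised as a FastAPI HTTPException, and how status codes that the gateway would otherwise rewrite are shifted by 100 so the response body reaches the client:

from typing import Any

from fastapi import HTTPException
from geonetwork.exceptions import ParameterException


def gateway_safe_status(status_code: int) -> int:
    # 401/403/404 responses are rewritten and truncated by the gateway,
    # so shift them by 100 to keep the JSON operation log intact
    return status_code + 100 if status_code in (401, 403, 404) else status_code


def raise_parameter_error(err: ParameterException, operations: list[dict[str, Any]]) -> None:
    # ParameterException carries a dict payload in err.args[0] with
    # "code", "msg" and "details" keys, as used in clone_dataset above
    operations.append(
        {
            "operation": f"Record creation failed ({err.args[0]['code']})",
            "detail": err.args[0]["details"],
        }
    )
    raise HTTPException(406, operations) from err
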
20 changes: 15 additions & 5 deletions backend/maelstro/main.py
@@ -78,8 +78,9 @@ def debug_page(request: Request) -> dict[str, Any]:
"root_path",
]
},
"data": request.body(),
"method": request.method,
"url": request.url,
"url": str(request.url),
"headers": dict(request.headers),
"query_params": request.query_params.multi_items(),
}
@@ -116,7 +117,13 @@ def get_layers(src_name: str, uuid: str) -> list[dict[str, str]]:
return meta.get_ogc_geoserver_layers()


@app.put("/copy")
@app.put(
"/copy",
responses={
200: {"content": {"text/plain": {}, "application/json": {}}},
406: {"description": "should be 404, but 404 is rewritten by the gateway"},
},
)
def put_dataset_copy(
src_name: str,
dst_name: str,
@@ -125,16 +132,19 @@ def put_dataset_copy(
copy_layers: bool = True,
copy_styles: bool = True,
dry_run: bool = False,
accept: Annotated[str, Header()] = "text/plain",
accept: Annotated[str, Header(include_in_schema=False)] = "text/plain",
) -> Any:
if accept not in ["text/plain", "application/json"]:
raise HTTPException(
status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
f"Unsupported media type: {accept}. "
'Accepts "text/plain" or "application/json"',
)
clone_ds = CloneDataset(src_name, dst_name, metadataUuid, dry_run)
logged_ops = clone_ds.clone_dataset(copy_meta, copy_layers, copy_styles, accept)
try:
clone_ds = CloneDataset(src_name, dst_name, metadataUuid, dry_run)
logged_ops = clone_ds.clone_dataset(copy_meta, copy_layers, copy_styles, accept)
except HTTPException as err:
raise err
if accept == "application/json":
return logged_ops
return PlainTextResponse(logged_ops)
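
For context, a hedged usage example of the updated /copy endpoint; the base URL, instance names and metadata UUID below are placeholders rather than values taken from the repository:

import requests

# placeholder values: adapt the base URL, instance names and UUID to your deployment
response = requests.put(
    "http://localhost:8000/copy",
    params={
        "src_name": "source-instance",
        "dst_name": "destination-instance",
        "metadataUuid": "00000000-0000-0000-0000-000000000000",
        "copy_meta": True,
        "copy_layers": True,
        "copy_styles": True,
        "dry_run": False,
    },
    headers={"Accept": "application/json"},  # "text/plain" returns the formatted log
    timeout=60,
)
if response.status_code == 406:
    # record creation failed upstream; the body contains the logged operations
    print(response.json())
else:
    response.raise_for_status()
    print(response.json())
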
34 changes: 33 additions & 1 deletion backend/maelstro/metadata/meta.py
@@ -61,7 +61,7 @@ def get_gslayer_from_gn_link(
workspace_name=ows_url.lstrip("/").split("/")[0], layer_name=layer_name
)

def update_geoverver_urls(self, mapping: dict[str, list[str]]) -> None:
def update_geoverver_urls(self, mapping: dict[str, list[str]]) -> tuple[str, str]:
xml_root = etree.parse(BytesIO(self.xml_bytes))
for url_node in xml_root.findall(
f".//{self.prefix}:CI_OnlineResource/{self.prefix}:linkage/",
@@ -75,7 +75,10 @@ def update_geoverver_urls(self, mapping: dict[str, list[str]]) -> None:
b_io = BytesIO()
xml_root.write(b_io)
b_io.seek(0)
pre = len(self.xml_bytes)
self.xml_bytes = b_io.read()
post = len(self.xml_bytes)
return f"Before: {pre} bytes", f"Before: {post} bytes"

def is_ogc_layer(self, link_node: etree._Element) -> bool:
link_protocol = self.protocol_from_link(link_node)
@@ -125,3 +128,32 @@ def __init__(self, zipfile: bytes):
schema = self.properties.get("schema", "iso19139")

super().__init__(xml_bytes, schema)

def update_geoverver_urls(self, mapping: dict[str, list[str]]) -> tuple[str, str]:
super().update_geoverver_urls(mapping)
new_bytes = BytesIO(b"")
with ZipFile(BytesIO(self.zipfile), "r") as zf_src:
# get the compression type from the non-directory elements of the zip archive
compression = next(
fi.compress_type for fi in zf_src.infolist() if not fi.is_dir()
)
md_filepath = f"{self.properties['uuid']}/metadata/metadata.xml"
pre_info = zf_src.getinfo(md_filepath)
with ZipFile(new_bytes, "w", compression=compression) as zf_dst:
for file_info in zf_src.infolist():
if file_info.is_dir():
zf_dst.mkdir(file_info)
else:
file_path = file_info.filename
with zf_dst.open(file_path, "w") as zb:
if file_path == md_filepath:
zb.write(self.xml_bytes)
else:
zb.write(zf_src.read(file_path))
post_info = zf_dst.getinfo(md_filepath)
new_bytes.seek(0)
self.zipfile = new_bytes.read()
return str(pre_info), str(post_info)

def get_zip(self) -> bytes:
return self.zipfile
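
The member replacement performed by ZippedMeta.update_geoverver_urls follows a general pattern that can be sketched as a standalone helper (the function name and signature are illustrative, not part of the codebase): copy every entry of an in-memory archive and substitute the payload of the single file being updated.

from io import BytesIO
from zipfile import ZIP_DEFLATED, ZipFile


def replace_zip_member(zip_bytes: bytes, member_path: str, new_payload: bytes) -> bytes:
    # rebuild the archive in memory, swapping in new_payload for member_path
    out = BytesIO()
    with ZipFile(BytesIO(zip_bytes), "r") as src, ZipFile(out, "w", ZIP_DEFLATED) as dst:
        for info in src.infolist():
            if info.is_dir():
                continue  # explicit directory entries are skipped; file paths keep the layout
            data = new_payload if info.filename == member_path else src.read(info.filename)
            # reusing the source ZipInfo preserves each member's original compression type
            dst.writestr(info, data)
    return out.getvalue()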
