Merge pull request #43 from lsst/tickets/DM-42606
DM-42606: Provide self-consistent alert schema version numbers
bsmartradio authored Aug 27, 2024
2 parents 80b54cb + ee93394 commit dfe88ab
Showing 8 changed files with 195 additions and 106 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -15,4 +15,4 @@ WORKDIR /app
RUN pip install --no-cache-dir .

ENTRYPOINT ["sh", "-c"]
CMD "syncLatestSchemaToRegistry.py --help"
CMD "syncAllSchemasToRegistry.py --help"
13 changes: 9 additions & 4 deletions README.rst
@@ -21,7 +21,11 @@ Alert schemas are located in the ``schema`` directory.

Schemas are filed according to their version number, following a ``MAJOR.MINOR`` scheme.
We maintain ``FORWARD_TRANSITIVE`` compatibility within a major version, per the `Confluent compatibility model`_.
The latest version of the schema may always be found in ``schema/latest.txt``.
However, for all schemas to exist in the same registry this is not enforced and the schema registry compatibility is set
to ``NONE``. The latest version of the schema may always be found in ``schema/latest.txt``.

Within the schema registry, versions are registered with space for two digits in the minor version. For example, 7.1
becomes 701 and 13.10 becomes 1310.

.. _Confluent compatibility model: https://docs.confluent.io/current/schema-registry/docs/avro.html#forward-compatibility

@@ -68,11 +72,12 @@ Update Schema Registry

When a pull request is made for a change to ``alert_packet``, a docker container image is created which needs to be synced to
the alert stream. The new docker image is present at `lsstdm/lsst_alert_packet <https://hub.docker.com/r/lsstdm/lsst_alert_packet/tags>`_ on dockerhub.
The image will apear with the same tag as the ticket branch you created to update ``alert_packet``.
The image will appear with the same tag as the ticket branch you created to update ``alert_packet``.

* Go to ``alert-stream-schema-registry`` in the ``alert-stream-broker`` charts folder in the ``phalanx`` github repository.
* In the `values.yaml <https://github.com/lsst-sqre/phalanx/blob/main/applications/alert-stream-broker/charts/alert-stream-schema-registry/values.yaml>` file, update the `tag` in `schemaSync` to match the docker image tag.
* If you have access to ``argocd``, sync the ``schema-registry`` instance. If the application will not sync, a full restart may be required of the `alert-stream` application.
* In the `values.yaml <https://github.com/lsst-sqre/phalanx/blob/main/applications/alert-stream-broker/charts/alert-stream-schema-registry/values.yaml>`_ file,
update the `tag` in `schemaSync` to match the docker image tag. If the image is not updating in argo, use the image digest to force an update.
* If you have access to ``argocd``, click ``sync`` at the top. A full sync is needed to load the registry with the schemas. If the application will not sync, a full restart may be required of the `alert-stream` application.
* If you do not have access, alert whoever is in charge of the ``alert-stream`` so that they can sync the registry.

More thorough instructions for updating the schema registry can be found in `DMTN-214 <https://dmtn-214.lsst.io/#updating-the-alert-schema>`_.
136 changes: 136 additions & 0 deletions python/lsst/alert/packet/bin/syncAllSchemasToRegistry.py
@@ -0,0 +1,136 @@
#!/usr/bin/env python
#
# This file is part of alert_packet.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import argparse
import json
import fastavro
import requests

import lsst.alert.packet


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--schema-registry-url",
        type=str,
        default="http://alert-schemas.localhost",
        help="URL of a Confluent Schema Registry service",
    )
    parser.add_argument(
        "--subject",
        type=str,
        default="alert-packet",
        help="Schema Registry subject name to use",
    )
    return parser.parse_args()


def upload_schemas(registry_url, subject, schema_registry):
    """Parse schema registry and upload all schemas.
    """
    for schema_id in schema_registry.known_ids:
        schema = schema_registry.get_by_id(schema_id)
        normalized_schema = fastavro.schema.to_parsing_canonical_form(
            schema.definition)
        confluent_schema = {"version": schema_id,
                            "id": schema_id, "schema": normalized_schema}
        payload = json.dumps(confluent_schema)
        headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
        url = f"{registry_url}/subjects/{subject}/versions"
        print(f"uploading schema to {url}")
        response = requests.post(url=url, data=payload, headers=headers)
        response.raise_for_status()
        print(f"done, status={response.status_code}")
        print(f"response text={response.text}")


def clear_schema_registry_for_import(registry_url, subject):
    """Delete schemas in the registry and then remake it in import mode"""
    # Define the URLs
    url_mode = f"{registry_url}/mode/{subject}"
    url_schemas = f"{registry_url}/subjects/{subject}"
    url_schema_versions = f"{registry_url}/subjects/{subject}/versions"
    response = requests.get(url_schema_versions)

    # Schema registry must be empty to put it in import mode. If it exists,
    # remove it and remake the schema. If not, continue.
    if response.status_code == 200:
        print('The schema will be deleted and remade in import mode.')
        response = requests.delete(url_schemas)
        print('Status Code:', response.status_code)
        print('Response Text:', response.text)
    else:
        print('The schema does not exist. Creating in import mode.')

    # Switch registry to import mode.
    headers = {
        'Content-Type': 'application/json'
    }

    # Define the data to send
    data = {
        'mode': 'IMPORT'
    }

    # Perform the PUT request
    response = requests.put(url_mode, headers=headers, data=json.dumps(data))

    # Check the status code and response
    print('Status Code:', response.status_code)
    print('Response Text:', response.text)


def close_schema_registry(registry_url, subject):
    """Return the schema registry from import mode to readwrite.
    """
    data = {
        "mode": "READWRITE"
    }

    # Headers to specify the content type
    headers = {
        'Content-Type': 'application/json'
    }

    url_mode = f"{registry_url}/mode/{subject}"
    # Send the PUT request
    response = requests.put(url_mode, json=data, headers=headers)
    print(f'Status Code: {response.status_code}')
    print(f'Response Text: {response.text}')


def main():
    args = parse_args()
    clear_schema_registry_for_import(args.schema_registry_url, args.subject)
    schema_registry = lsst.alert.packet.schemaRegistry.SchemaRegistry().all_schemas_from_filesystem()
    upload_schemas(
        args.schema_registry_url,
        subject=args.subject,
        schema_registry=schema_registry
    )
    close_schema_registry(args.schema_registry_url, args.subject)


if __name__ == "__main__":
    main()
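
The order of registry calls made by the script above can be sketched without touching the network. This is an illustration only: `plan_import_requests` is a hypothetical helper that does not exist in `alert_packet`, the schema strings are placeholders, and sorting the IDs is an assumption added here for a stable order (the script iterates over `known_ids` as-is).

```python
import json


def plan_import_requests(registry_url, subject, schemas):
    """Return (method, url, body) tuples in the order the script issues them.

    ``schemas`` maps integer schema IDs to schema strings. Hypothetical
    helper for illustration; it performs no network I/O.
    """
    mode_url = f"{registry_url}/mode/{subject}"
    subject_url = f"{registry_url}/subjects/{subject}"
    versions_url = f"{subject_url}/versions"
    steps = [
        # Check whether the subject already exists.
        ("GET", versions_url, None),
        # The script only sends this DELETE when the GET returned 200.
        ("DELETE", subject_url, None),
        # Import mode lets the client choose the schema ID and version.
        ("PUT", mode_url, json.dumps({"mode": "IMPORT"})),
    ]
    for schema_id, schema in sorted(schemas.items()):
        body = json.dumps(
            {"version": schema_id, "id": schema_id, "schema": schema})
        steps.append(("POST", versions_url, body))
    # Return the subject to normal operation once all schemas are loaded.
    steps.append(("PUT", mode_url, json.dumps({"mode": "READWRITE"})))
    return steps


for method, url, _ in plan_import_requests(
        "http://alert-schemas.localhost", "alert-packet",
        {701: "placeholder-a", 700: "placeholder-b"}):
    print(method, url)
```

Keeping the plan as data makes the sequence easy to inspect: the subject is emptied, switched to `IMPORT`, filled with one `POST` per schema, and then switched back to `READWRITE`.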
79 changes: 0 additions & 79 deletions python/lsst/alert/packet/bin/syncLatestSchemaToRegistry.py

This file was deleted.

14 changes: 13 additions & 1 deletion python/lsst/alert/packet/schema.py
@@ -26,11 +26,13 @@

import io
import tempfile
import re
import fastavro
from importlib import resources
from pathlib import PurePath

from lsst.resources import ResourcePath

import fastavro

__all__ = ["get_schema_root", "get_latest_schema_version", "get_schema_path",
"Schema", "get_path_to_latest_schema", "get_schema_root_uri",
@@ -428,3 +430,13 @@ def from_file(cls, filename=None):
if schema['name'] == root_name)

return cls(schema_definition)

    def get_schema_id(self):
        """Retrieve the schema id used in the schema registry.
        """
        numbers = re.findall(r'\d+', self.definition['name'])
        assert (len(numbers) == 2)
        numbers[1] = str(numbers[1]).zfill(2)
        schema_id = int(''.join(numbers))

        return schema_id
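
The ID scheme used by `get_schema_id` can be tried in isolation. The following is a minimal sketch rather than the repository's code: `schema_name_to_id` is a hypothetical name, and it assumes the schema name contains exactly two integers (major and minor), as in `lsst.v7_1.alert`.

```python
import re


def schema_name_to_id(name):
    """Hypothetical helper: MAJOR followed by a zero-padded two-digit MINOR."""
    numbers = re.findall(r"\d+", name)
    if len(numbers) != 2:
        raise ValueError(f"expected MAJOR and MINOR in {name!r}")
    major, minor = numbers
    # Pad the minor version to two digits so that 7.1 and 7.10 differ.
    return int(major + minor.zfill(2))


print(schema_name_to_id("lsst.v7_1.alert"))    # 701
print(schema_name_to_id("lsst.v13_10.alert"))  # 1310
```

One consequence of the two-digit padding is that IDs are only unambiguous while the minor version stays below 100.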
49 changes: 30 additions & 19 deletions python/lsst/alert/packet/schemaRegistry.py
@@ -19,12 +19,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Provide a lookup table for alert schemas.
"""Provide a lookup table for versioned alert schemas.
"""

import json
import os
import zlib

__all__ = ["SchemaRegistry"]

@@ -38,6 +36,7 @@ class SchemaRegistry:
def __init__(self):
self._version_to_id = {}
self._id_to_schema = {}
self._ids = []

def register_schema(self, schema, version):
"""Register a new schema in the registry.
@@ -61,8 +60,9 @@ def register_schema(self, schema, version):
schema_id : `int`
The ID that has been allocated to the schema.
"""
schema_id = self.calculate_id(schema)
schema_id = schema.get_schema_id()
self._version_to_id[version] = schema_id
self._ids.append(schema_id)
self._id_to_schema[schema_id] = schema
return schema_id

@@ -108,27 +108,19 @@ def known_versions(self):
"""
return set(self._version_to_id)

@staticmethod
def calculate_id(schema):
"""Calculate an ID for the given schema.
Parameters
----------
schema : `lsst.alert.packet.Schema`
Schema for which an ID will be derived.
@property
def known_ids(self):
"""Return all the schema ids tracked by this registry.
Returns
-------
schema_id : `int`
The calculated ID.
schemas : `set` of `int`
Set of schema ids.
"""
# Significant risk of collisions with more than a few schemas;
# CRC32 is ok for prototyping but isn't sensible in production.
return zlib.crc32(json.dumps(schema.definition,
sort_keys=True).encode('utf-8'))
return set(self._ids)

@classmethod
def from_filesystem(cls, root=None, schema_root="lsst.v7_0.alert"):
def from_filesystem(cls, root=None, schema_root="lsst.v7_1.alert"):
"""Populate a schema registry based on the filesystem.
Walk the directory tree from the root provided, locating files named
@@ -147,3 +139,22 @@ def from_filesystem(cls, root=None, schema_root="lsst.v7_0.alert"):
version = ".".join(root.split("/")[-2:])
registry.register_schema(schema, version)
return registry

    @classmethod
    def all_schemas_from_filesystem(cls, root=None):
        """Populate a schema registry based on the filesystem.

        Walk the directory tree from the root provided, locating all schemas.
        """
        from .schema import Schema
        from .schema import get_schema_root
        if not root:
            root = get_schema_root()
        registry = cls()
        for root, dirs, files in os.walk(root, followlinks=False):
            for file in files:
                if "alert.avsc" in file:
                    schema = Schema.from_file(os.path.join(root, file))
                    version = ".".join(root.split("/")[-2:])
                    registry.register_schema(schema, version)
        return registry
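
The version string registered for each schema comes from the last two components of the directory that holds the `.avsc` file. A small sketch of that derivation follows; `version_from_schema_dir` is a hypothetical name, and the `schema/MAJOR/MINOR` layout in the example path is an assumption made for illustration.

```python
from pathlib import PurePosixPath


def version_from_schema_dir(directory):
    """Hypothetical helper: join the last two path components as MAJOR.MINOR."""
    parts = PurePosixPath(directory).parts
    return ".".join(parts[-2:])


print(version_from_schema_dir("/repo/schema/7/1"))  # 7.1
```

On POSIX-style paths this gives the same result as `".".join(root.split("/")[-2:])`.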
2 changes: 1 addition & 1 deletion setup.cfg
@@ -35,7 +35,7 @@ lsst.alert.packet =
console_scripts =
validateAvroRoundTrip.py = lsst.alert.packet.bin.validateAvroRoundTrip:main
simulateAlerts.py = lsst.alert.packet.bin.simulateAlerts:main
syncLatestSchemaToRegistry.py = lsst.alert.packet.bin.syncLatestSchemaToRegistry:main
syncAllSchemasToRegistry.py = lsst.alert.packet.bin.syncAllSchemasToRegistry:main

[flake8]
max-line-length = 110
6 changes: 5 additions & 1 deletion test/test_schemaRegistry.py
@@ -33,7 +33,7 @@ def write_schema(root_dir, filename, version_major, version_minor):
# Generate a new schema for each version to avoid ID collisions.
schema = {
"name": "example",
"namespace": "lsst",
"namespace": f"lsst.v{version_major}_{version_minor}",
"type": "record",
"fields": [
{"name": "field%s%s" % (version_major, version_minor),
@@ -75,3 +75,7 @@ def test_from_filesystem(self):
        for version in versions:
            registry.get_by_version(version)
        self.assertRaises(KeyError, registry.get_by_version, "2.2")

        for schema_id in registry.known_ids:
            registry.get_by_id(schema_id)
        # Schema ids are integers, so look up an unregistered integer id.
        self.assertRaises(KeyError, registry.get_by_id, 202)
