From ede5c8d313f09ee53a9a191501acb278bced704c Mon Sep 17 00:00:00 2001 From: Lubos Mjachky Date: Mon, 27 Nov 2023 19:28:09 +0100 Subject: [PATCH] Address comments from the reviewer [noissue] --- CHANGES/507.feature | 4 +- docs/workflows/host.rst | 31 +- .../0037_create_pull_through_cache_models.py | 33 ++- pulp_container/app/models.py | 27 ++ pulp_container/app/registry.py | 276 +++++++++++++++--- pulp_container/app/registry_api.py | 47 ++- pulp_container/app/serializers.py | 56 +++- .../app/tasks/download_image_data.py | 51 +--- .../functional/api/test_pull_through_cache.py | 111 +++++-- requirements.txt | 2 +- 10 files changed, 460 insertions(+), 178 deletions(-) diff --git a/CHANGES/507.feature b/CHANGES/507.feature index df2a3bbd8..037ccbb01 100644 --- a/CHANGES/507.feature +++ b/CHANGES/507.feature @@ -1,3 +1,3 @@ Added support for pull-through caching. Users can now configure a dedicated distribution and remote -linked to an external registry without specifying a repository name (upstream name). Pulp downloads -missing content automatically if requested and acts as a caching proxy. +linked to an external registry without the need to create and mirror repositories in advance. Pulp +downloads missing content automatically if requested and acts as a caching proxy. diff --git a/docs/workflows/host.rst b/docs/workflows/host.rst index c8225fffe..185cb4014 100644 --- a/docs/workflows/host.rst +++ b/docs/workflows/host.rst @@ -123,32 +123,35 @@ Pull-Through Caching -------------------- The Pull-Through Caching feature offers an alternative way to host content by leveraging a **remote -registry** as the source of truth. This eliminates the need for repository synchronization, reducing -storage overhead, and ensuring up-to-date images. Pulp acts as a **caching proxy** and stores images -in a local repository. +registry** as the source of truth. This eliminates the need for in-advance repository +synchronization because Pulp acts as a **caching proxy** and stores images, after they have been +pulled by an end client, in a local repository. -Administering the caching:: +Configuring the caching:: # initialize a pull-through remote (the concept of upstream-name is not applicable here) REMOTE_HREF=$(http ${BASE_ADDR}/pulp/api/v3/remotes/container/pull-through/ name=docker-cache url=https://registry-1.docker.io | jq -r ".pulp_href") - # create a specialized distribution linked to the initialized remote + # create a pull-through distribution linked to the initialized remote http ${BASE_ADDR}/pulp/api/v3/distributions/container/pull-through/ remote=${REMOTE_HREF} name=docker-cache base_path=docker-cache -Downloading content:: +Pulling content:: podman pull localhost:24817/docker-cache/library/busybox -In the example above, the image "busybox" is pulled from the "docker-cache" distribution, acting as -a transparent caching layer. +In the example above, the image "busybox" is pulled from *DockerHub* through the "docker-cache" +distribution, acting as a transparent caching layer. -By incorporating the Pull-Through Caching feature, administrators can **reduce external network -dependencies**, and ensure a more reliable and responsive container deployment system in production -environments. +By incorporating the Pull-Through Caching feature into standard workflows, users **do not need** to +pre-configure a new repository and sync it to facilitate the retrieval of the actual content. This +speeds up the whole process of shipping containers from its early management stages to distribution. 
+Similarly to on-demand syncing, the feature also **reduces external network dependencies**, and +ensures a more reliable container deployment system in production environments. .. note:: - Pulp creates repositories that maintain a single repository version for user-pulled images. + Pulp creates repositories that maintain a single repository version for pulled images. Thus, only the latest repository version is retained. For instance, when pulling "debian:10," - a "debian" repository with the "10" tag is established. Subsequent pulls such as "debian:11" + a "debian" repository with the "10" tag is created. Subsequent pulls such as "debian:11" result in a new repository version that incorporates both tags while removing the previous - version. Repositories and their content remain manageable through standard API endpoints. + version. Repositories and their content remain manageable through standard Pulp API endpoints. + With that, no content can be pushed to these repositories. diff --git a/pulp_container/app/migrations/0037_create_pull_through_cache_models.py b/pulp_container/app/migrations/0037_create_pull_through_cache_models.py index b247a6cfa..f9ffe2cc2 100644 --- a/pulp_container/app/migrations/0037_create_pull_through_cache_models.py +++ b/pulp_container/app/migrations/0037_create_pull_through_cache_models.py @@ -1,4 +1,4 @@ -# Generated by Django 4.2.6 on 2023-10-25 20:04 +# Generated by Django 4.2.7 on 2023-12-08 16:14 from django.db import migrations, models import django.db.models.deletion @@ -8,32 +8,45 @@ class Migration(migrations.Migration): dependencies = [ - ('core', '0108_task_versions'), + ('core', '0116_alter_remoteartifact_md5_alter_remoteartifact_sha1_and_more'), ('container', '0036_containerpushrepository_pending_blobs_manifests'), ] operations = [ migrations.CreateModel( - name='ContainerPullThroughDistribution', + name='ContainerPullThroughRemote', fields=[ - ('distribution_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.distribution')), + ('remote_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.remote')), ], options={ - 'permissions': [('manage_roles_containerpullthroughdistribution', 'Can manage role assignments on pull-through cache distribution')], + 'permissions': [('manage_roles_containerpullthroughremote', 'Can manage role assignments on pull-through container remote')], 'default_related_name': '%(app_label)s_%(model_name)s', }, - bases=('core.distribution', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + bases=('core.remote', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + ), + migrations.AddField( + model_name='containerrepository', + name='pending_blobs', + field=models.ManyToManyField(to='container.blob'), + ), + migrations.AddField( + model_name='containerrepository', + name='pending_manifests', + field=models.ManyToManyField(to='container.manifest'), ), migrations.CreateModel( - name='ContainerPullThroughRemote', + name='ContainerPullThroughDistribution', fields=[ - ('remote_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.remote')), + ('distribution_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.distribution')), + 
('private', models.BooleanField(default=False, help_text='Restrict pull access to explicitly authorized users. Related distributions inherit this value. Defaults to unrestricted pull access.')), + ('description', models.TextField(null=True)), + ('namespace', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='container_pull_through_distributions', to='container.containernamespace')), ], options={ - 'permissions': [('manage_roles_containerpullthroughremote', 'Can manage role assignments on pull-through container remote')], + 'permissions': [('manage_roles_containerpullthroughdistribution', 'Can manage role assignments on pull-through cache distribution')], 'default_related_name': '%(app_label)s_%(model_name)s', }, - bases=('core.remote', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + bases=('core.distribution', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), ), migrations.AddField( model_name='containerdistribution', diff --git a/pulp_container/app/models.py b/pulp_container/app/models.py index dea2db436..1014559f0 100644 --- a/pulp_container/app/models.py +++ b/pulp_container/app/models.py @@ -609,6 +609,8 @@ class ContainerRepository( manifest_signing_service = models.ForeignKey( ManifestSigningService, on_delete=models.SET_NULL, null=True ) + pending_blobs = models.ManyToManyField(Blob) + pending_manifests = models.ManyToManyField(Manifest) class Meta: default_related_name = "%(app_label)s_%(model_name)s" @@ -632,6 +634,15 @@ def finalize_new_version(self, new_version): """ remove_duplicates(new_version) validate_repo_version(new_version) + self.remove_pending_content(new_version) + + def remove_pending_content(self, repository_version): + """Remove pending blobs and manifests when committing the content to the repository.""" + added_content = repository_version.added( + base_version=repository_version.base_version + ).values_list("pk") + self.pending_blobs.remove(*Blob.objects.filter(pk__in=added_content)) + self.pending_manifests.remove(*Manifest.objects.filter(pk__in=added_content)) class ContainerPushRepository(Repository, AutoAddObjPermsMixin): @@ -695,6 +706,22 @@ class ContainerPullThroughDistribution(Distribution, AutoAddObjPermsMixin): TYPE = "pull-through" + namespace = models.ForeignKey( + ContainerNamespace, + on_delete=models.CASCADE, + related_name="container_pull_through_distributions", + null=True, + ) + private = models.BooleanField( + default=False, + help_text=_( + "Restrict pull access to explicitly authorized users. " + "Related distributions inherit this value. " + "Defaults to unrestricted pull access." 
+ ), + ) + description = models.TextField(null=True) + class Meta: default_related_name = "%(app_label)s_%(model_name)s" permissions = [ diff --git a/pulp_container/app/registry.py b/pulp_container/app/registry.py index e8b7d7bbc..8262552cf 100644 --- a/pulp_container/app/registry.py +++ b/pulp_container/app/registry.py @@ -1,27 +1,28 @@ -import time import json import logging import os from asgiref.sync import sync_to_async +from tempfile import NamedTemporaryFile +from contextlib import suppress from urllib.parse import urljoin from aiohttp import web -from aiohttp.web_exceptions import HTTPTooManyRequests from django_guid import set_guid from django_guid.utils import generate_guid from django.conf import settings from django.core.exceptions import ObjectDoesNotExist +from django.db import IntegrityError from multidict import MultiDict from pulpcore.plugin.content import Handler, PathNotResolved -from pulpcore.plugin.models import Content, ContentArtifact, Task +from pulpcore.plugin.models import Artifact, RemoteArtifact, Content, ContentArtifact from pulpcore.plugin.content import ArtifactResponse from pulpcore.plugin.tasking import dispatch from pulp_container.app.cache import RegistryContentCache -from pulp_container.app.models import ContainerDistribution, Tag, Blob +from pulp_container.app.models import ContainerDistribution, Tag, Blob, Manifest, BlobManifest from pulp_container.app.schema_convert import Schema2toSchema1ConverterWrapper from pulp_container.app.tasks import download_image_data from pulp_container.app.utils import ( @@ -34,10 +35,6 @@ log = logging.getLogger(__name__) -v2_headers = MultiDict() -v2_headers["Docker-Distribution-API-Version"] = "registry/2.0" - - class Registry(Handler): """ A set of handlers for the Container Registry v2 API. 
@@ -121,6 +118,7 @@ async def get_tag(self, request): distribution = await sync_to_async(self._match_distribution)(path, add_trailing_slash=False) await sync_to_async(self._permit)(request, distribution) repository_version = await sync_to_async(distribution.get_repository_version)() + repository = await repository_version.repository.acast() if not repository_version: raise PathNotResolved(tag_name) accepted_media_types = get_accepted_media_types(request.headers) @@ -130,7 +128,8 @@ async def get_tag(self, request): pk__in=await sync_to_async(repository_version.get_content)(), name=tag_name ) except ObjectDoesNotExist: - if distribution.remote: + distribution = await distribution.acast() + if distribution.remote and distribution.pull_through_distribution_id: remote = await distribution.remote.acast() relative_url = "/v2/{name}/manifests/{tag}".format( @@ -141,7 +140,7 @@ async def get_tag(self, request): response = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS}) set_guid(generate_guid()) - task = await sync_to_async(dispatch)( + await sync_to_async(dispatch)( download_image_data, exclusive_resources=[repository_version.repository], kwargs={ @@ -152,45 +151,84 @@ async def get_tag(self, request): }, ) - # waiting shortly for the task to be completed since a container client could - # request related content (i.e., manifests and blobs) and halt the pull operation - # if the content was not immediately served - for dummy in range(3): - time.sleep(1) - task = await Task.objects.aget(pk=task.pk) - if task.state == "completed": - await task.adelete() - break - elif task.state in ["waiting", "running"]: - continue - else: - error = task.error - await task.adelete() - raise Exception(str(error)) - else: - raise HTTPTooManyRequests() - try: manifest_data = json.loads(response.data) except json.decoder.JSONDecodeError: raise PathNotResolved(tag_name) else: - encoded_data = response.data.encode("utf-8") - digest = calculate_digest(encoded_data) media_type = determine_media_type(manifest_data, response) + if media_type in (MEDIA_TYPE.MANIFEST_V1_SIGNED, MEDIA_TYPE.MANIFEST_V1): + encoded_data = response.data.encode("utf-8") + digest = calculate_digest(encoded_data) + else: + # digest = response.artifact_attributes["sha256"] + encoded_data = response.data.encode("utf-8") + digest = calculate_digest(encoded_data) + + if media_type not in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI): + await self.save_manifest_and_blobs( + digest, manifest_data, media_type, remote, repository, response + ) response_headers = { "Content-Type": media_type, "Docker-Content-Digest": digest, + "Docker-Distribution-API-Version": "registry/2.0", } - # at this time, the manifest artifact was already established, and we can return it - # as it is; meanwhile, the dispatched task has created Manifest/Blob objects and - # relations between them; the said content units are streamed/downloaded on demand - # to a client on a next run return web.Response(text=response.data, headers=response_headers) else: raise PathNotResolved(tag_name) + else: + if distribution.remote and distribution.pull_through_distribution_id: + # check if the content was updated on the remove and stream it back + remote = await distribution.remote.acast() + relative_url = "/v2/{name}/manifests/{tag}".format( + name=remote.namespaced_upstream_name, tag=tag_name + ) + tag_url = urljoin(remote.url, relative_url) + downloader = remote.get_in_memory_downloader(url=tag_url) + response = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS}) + 
+ try: + manifest_data = json.loads(response.data) + except json.decoder.JSONDecodeError: + raise PathNotResolved(tag_name) + + media_type = determine_media_type(manifest_data, response) + if media_type in (MEDIA_TYPE.MANIFEST_V1_SIGNED, MEDIA_TYPE.MANIFEST_V1): + encoded_data = response.data.encode("utf-8") + digest = calculate_digest(encoded_data) + else: + # TODO: in_memory_downloader does not have artifact_attributes + encoded_data = response.data.encode("utf-8") + digest = calculate_digest(encoded_data) + + if tag.tagged_manifest.digest != digest: + set_guid(generate_guid()) + await sync_to_async(dispatch)( + download_image_data, + exclusive_resources=[repository_version.repository], + kwargs={ + "repository_pk": repository_version.repository.pk, + "remote_pk": remote.pk, + "tag_name": tag_name, + "response_data": response.data, + }, + ) + + if media_type not in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI): + await self.save_manifest_and_blobs( + digest, manifest_data, media_type, remote, repository, response + ) + + response_headers = { + "Content-Type": media_type, + "Docker-Content-Digest": digest, + "Docker-Distribution-API-Version": "registry/2.0", + } + + return web.Response(text=response.data, headers=response_headers) # we do not convert OCI to docker oci_mediatypes = [MEDIA_TYPE.MANIFEST_OCI, MEDIA_TYPE.INDEX_OCI] @@ -226,9 +264,121 @@ async def get_tag(self, request): # convert if necessary return await Registry.dispatch_converted_schema(tag, accepted_media_types, path) + async def save_manifest_and_blobs( + self, digest, manifest_data, media_type, remote, repository, response + ): + config_digest = manifest_data["config"]["digest"] + config_blob = await self.save_config_blob(config_digest, remote) + await sync_to_async(repository.pending_blobs.add)(config_blob) + + manifest = Manifest( + digest=digest, + schema_version=2, + media_type=media_type, + config_blob=config_blob, + ) + try: + await manifest.asave() + except IntegrityError: + manifest = await Manifest.objects.aget(digest=manifest.digest) + await sync_to_async(manifest.touch)() + await sync_to_async(repository.pending_manifests.add)(manifest) + + for layer in manifest_data["layers"]: + digest = layer["digest"] + + blob = Blob(digest=digest) + try: + await blob.asave() + except IntegrityError: + blob = await Blob.objects.aget(digest=digest) + await sync_to_async(blob.touch)() + await sync_to_async(repository.pending_blobs.add)(blob) + + bm_rel = BlobManifest(manifest=manifest, manifest_blob=blob) + with suppress(IntegrityError): + await bm_rel.asave() + + ca = ContentArtifact( + content=blob, + artifact=None, + relative_path=digest, + ) + with suppress(IntegrityError): + await ca.asave() + + relative_url = "/v2/{name}/blobs/{digest}".format( + name=remote.namespaced_upstream_name, digest=digest + ) + blob_url = urljoin(remote.url, relative_url) + ra = RemoteArtifact( + url=blob_url, + sha256=digest[len("sha256:") :], + content_artifact=ca, + remote=remote, + ) + with suppress(IntegrityError): + await ra.asave() + + with NamedTemporaryFile(mode="w", dir=settings.WORKING_DIRECTORY, delete=False) as tmp_file: + tmp_file.write(response.data) + tmp_file.flush() + try: + artifact = Artifact.init_and_validate(tmp_file.name) + await artifact.asave() + assert artifact.sha256 == manifest.digest[len("sha256:") :] + except IntegrityError: + artifact = await Artifact.objects.aget(sha256=artifact.sha256) + await sync_to_async(artifact.touch)() + + content_artifact = ContentArtifact( + artifact=artifact, content=manifest, 
relative_path=manifest.digest + ) + with suppress(IntegrityError): + await content_artifact.asave() + + async def save_config_blob(self, config_digest, remote): + relative_url = "/v2/{name}/blobs/{digest}".format( + name=remote.namespaced_upstream_name, digest=config_digest + ) + blob_url = urljoin(remote.url, relative_url) + downloader = remote.get_in_memory_downloader(url=blob_url) + response = await downloader.run() + + with NamedTemporaryFile(mode="w", dir=settings.WORKING_DIRECTORY, delete=False) as tmp_file: + tmp_file.write(response.data) + tmp_file.flush() + try: + config_blob_artifact = Artifact.init_and_validate(tmp_file.name) + await config_blob_artifact.asave() + assert config_blob_artifact.sha256 == config_digest[len("sha256:") :] + except IntegrityError: + config_blob_artifact = await Artifact.objects.aget( + sha256=config_blob_artifact.sha256 + ) + await sync_to_async(config_blob_artifact.touch)() + + config_blob = Blob(digest=config_digest) + try: + await config_blob.asave() + except IntegrityError: + config_blob = await Blob.objects.aget(digest=config_digest) + await sync_to_async(config_blob.touch)() + + content_artifact = ContentArtifact( + content=config_blob, + artifact=config_blob_artifact, + relative_path=config_digest, + ) + with suppress(IntegrityError): + await content_artifact.asave() + + return config_blob + async def dispatch_tag(self, request, tag, response_headers): """ - Finds an artifact associated with a Tag and sends it to the client. + Finds an artifact associated with a Tag and sends it to the client, otherwise tries + to stream it. Args: request(:class:`~aiohttp.web.Request`): The request to prepare a response for. @@ -241,8 +391,13 @@ async def dispatch_tag(self, request, tag, response_headers): streamed back to the client. 
""" - artifact = await sync_to_async(tag.tagged_manifest._artifacts.get)() - return await Registry._dispatch(artifact, response_headers) + try: + artifact = await tag.tagged_manifest._artifacts.aget() + except ObjectDoesNotExist: + ca = await sync_to_async(lambda x: x[0])(tag.tagged_manifest.contentartifact_set.all()) + return await self._stream_content_artifact(request, web.StreamResponse(), ca) + else: + return await Registry._dispatch(artifact, response_headers) @staticmethod async def dispatch_converted_schema(tag, accepted_media_types, path): @@ -299,11 +454,10 @@ async def get_by_digest(self, request): content = await sync_to_async(repository_version.get_content)() repository = await sync_to_async(repository_version.repository.cast)() - if repository.PUSH_ENABLED: - pending_blobs = repository.pending_blobs.values_list("pk") - pending_manifests = repository.pending_manifests.values_list("pk") - pending_content = pending_blobs.union(pending_manifests) - content |= Content.objects.filter(pk__in=pending_content) + pending_blobs = repository.pending_blobs.values_list("pk") + pending_manifests = repository.pending_manifests.values_list("pk") + pending_content = pending_blobs.union(pending_manifests) + content |= Content.objects.filter(pk__in=pending_content) ca = await ContentArtifact.objects.select_related("artifact", "content").aget( content__in=content, relative_path=digest @@ -318,7 +472,43 @@ async def get_by_digest(self, request): "Docker-Content-Digest": ca_content.digest, } except ObjectDoesNotExist: - raise PathNotResolved(path) + distribution = await distribution.acast() + if distribution.remote and distribution.pull_through_distribution_id: + remote = await distribution.remote.acast() + relative_url = "/v2/{name}/manifests/{digest}".format( + name=remote.namespaced_upstream_name, digest=digest + ) + tag_url = urljoin(remote.url, relative_url) + downloader = remote.get_in_memory_downloader(url=tag_url) + response = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS}) + + try: + manifest_data = json.loads(response.data) + except json.decoder.JSONDecodeError: + raise PathNotResolved(digest) + + media_type = determine_media_type(manifest_data, response) + if media_type in (MEDIA_TYPE.MANIFEST_V1_SIGNED, MEDIA_TYPE.MANIFEST_V1): + encoded_data = response.data.encode("utf-8") + digest = calculate_digest(encoded_data) + else: + # TODO: in_memory_downloader does not have artifact_attributes + encoded_data = response.data.encode("utf-8") + digest = calculate_digest(encoded_data) + + if media_type not in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI): + await self.save_manifest_and_blobs( + digest, manifest_data, media_type, remote, repository, response + ) + + response_headers = { + "Content-Type": media_type, + "Docker-Content-Digest": digest, + "Docker-Distribution-API-Version": "registry/2.0", + } + return web.Response(text=response.data, headers=response_headers) + else: + raise PathNotResolved(path) else: artifact = ca.artifact if artifact: diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py index 7af3b5c92..28365fd3d 100644 --- a/pulp_container/app/registry_api.py +++ b/pulp_container/app/registry_api.py @@ -948,13 +948,10 @@ def handle_safe_method(self, request, path, pk): if pk == EMPTY_BLOB: return redirects.redirect_to_content_app("blobs", pk) repository = repository.cast() - if repository.PUSH_ENABLED: - try: - blob = repository.pending_blobs.get(digest=pk) - blob.touch() - except models.Blob.DoesNotExist: - raise 
BlobNotFound(digest=pk) - else: + try: + blob = repository.pending_blobs.get(digest=pk) + blob.touch() + except models.Blob.DoesNotExist: raise BlobNotFound(digest=pk) return redirects.issue_blob_redirect(blob) @@ -993,9 +990,13 @@ def handle_safe_method(self, request, path, pk): try: tag = models.Tag.objects.get(name=pk, pk__in=repository_version.content) except models.Tag.DoesNotExist: - if distribution.remote: + distribution = distribution.cast() + if distribution.remote and distribution.pull_through_distribution_id: remote = distribution.remote.cast() repository = distribution.repository.cast() + # issue a head request first to ensure that the content exists on the remote + # source; we want to prevent immediate "not found" error responses from + # content-app: 302 (api-app) -> 404 (content-app) manifest = self.fetch_manifest(remote, repository_version, repository, pk) if manifest is None: return redirects.redirect_to_content_app("manifests", pk) @@ -1017,23 +1018,21 @@ def handle_safe_method(self, request, path, pk): manifest = models.Manifest.objects.get(digest=pk, pk__in=repository_version.content) except models.Manifest.DoesNotExist: repository = repository.cast() - if repository.PUSH_ENABLED: - # the manifest might be a part of listed manifests currently being uploaded - try: - manifest = repository.pending_manifests.get(digest=pk) - manifest.touch() - except models.Manifest.DoesNotExist: - raise ManifestNotFound(reference=pk) + # the manifest might be a part of listed manifests currently being uploaded + # or saved during the pull-through caching + try: + manifest = repository.pending_manifests.get(digest=pk) + manifest.touch() + except models.Manifest.DoesNotExist: + pass + + distribution = distribution.cast() + if distribution.remote and distribution.pull_through_distribution_id: + remote = distribution.remote.cast() + self.fetch_manifest(remote, repository_version, repository, pk) + return redirects.redirect_to_content_app("manifests", pk) else: - if distribution.remote: - remote = distribution.remote.cast() - manifest = self.fetch_manifest(remote, repository_version, repository, pk) - if manifest is None: - return redirects.redirect_to_content_app("manifests", pk) - - raise ManifestNotFound(reference=pk) - else: - raise ManifestNotFound(reference=pk) + raise ManifestNotFound(reference=pk) return redirects.issue_manifest_redirect(manifest) diff --git a/pulp_container/app/serializers.py b/pulp_container/app/serializers.py index 19c6f2642..54a0e9eb5 100644 --- a/pulp_container/app/serializers.py +++ b/pulp_container/app/serializers.py @@ -282,15 +282,10 @@ class ContainerPullThroughRemoteSerializer(RemoteSerializer): A serializer for a remote used in the pull-through distribution. 
""" - policy = serializers.ChoiceField( - help_text="The policy always mimics the on_demand behaviour when performing pull-through.", - choices=((models.Remote.ON_DEMAND, "When syncing, download just the metadata.")), - default=models.Remote.ON_DEMAND, - ) - class Meta: fields = RemoteSerializer.Meta.fields model = models.ContainerPullThroughRemote + read_only_fields = ["policy"] class ContainerDistributionSerializer(DistributionSerializer, GetOrCreateSerializerMixin): @@ -329,7 +324,7 @@ class ContainerDistributionSerializer(DistributionSerializer, GetOrCreateSeriali required=False, help_text=_("Remote that can be used to fetch content when using pull-through caching."), view_name_pattern=r"remotes(-.*/.*)?-detail", - queryset=models.ContainerRemote.objects.all(), + read_only=True, ) def validate(self, data): @@ -399,6 +394,19 @@ class ContainerPullThroughDistributionSerializer(DistributionSerializer): view_name_pattern=r"remotes(-.*/.*)-detail", queryset=models.ContainerPullThroughRemote.objects.all(), ) + namespace = RelatedField( + required=False, + read_only=True, + view_name="pulp_container/namespaces-detail", + help_text=_("Namespace this distribution belongs to."), + ) + content_guard = DetailRelatedField( + required=False, + help_text=_("An optional content-guard. If none is specified, a default one will be used."), + view_name=r"contentguards-container/content-redirect-detail", + queryset=ContentRedirectContentGuard.objects.all(), + allow_null=False, + ) distributions = DetailRelatedField( many=True, help_text="Distributions created after pulling content through cache", @@ -406,10 +414,36 @@ class ContainerPullThroughDistributionSerializer(DistributionSerializer): queryset=models.ContainerDistribution.objects.all(), required=False, ) + description = serializers.CharField( + help_text=_("An optional description."), required=False, allow_null=True + ) + + def validate(self, data): + validated_data = super().validate(data) + + if "content_guard" not in validated_data: + validated_data["content_guard"] = ContentRedirectContentGuardSerializer.get_or_create( + {"name": "content redirect"} + ) + + base_path = validated_data.get("base_path") + if base_path: + namespace_name = base_path.split("/")[0] + validated_data["namespace"] = ContainerNamespaceSerializer.get_or_create( + {"name": namespace_name} + ) + + return validated_data class Meta: model = models.ContainerPullThroughDistribution - fields = DistributionSerializer.Meta.fields + ("remote", "distributions") + fields = tuple(set(DistributionSerializer.Meta.fields) - {"base_url"}) + ( + "remote", + "distributions", + "namespace", + "private", + "description", + ) class TagOperationSerializer(ValidateFieldsMixin, serializers.Serializer): @@ -758,6 +792,12 @@ class ContainerRepositorySyncURLSerializer(RepositorySyncURLSerializer): Serializer for Container Sync. """ + remote = DetailRelatedField( + required=False, + view_name_pattern=r"remotes(-.*/.*)-detail", + queryset=models.ContainerRemote.objects.all(), + help_text=_("A remote to sync from. 
This will override a remote set on repository."), + ) signed_only = serializers.BooleanField( required=False, default=False, diff --git a/pulp_container/app/tasks/download_image_data.py b/pulp_container/app/tasks/download_image_data.py index f4076a391..4ba894bc9 100644 --- a/pulp_container/app/tasks/download_image_data.py +++ b/pulp_container/app/tasks/download_image_data.py @@ -9,32 +9,24 @@ from django.db import IntegrityError from pulpcore.plugin.models import Artifact -from pulpcore.plugin.stages import ( - ArtifactDownloader, - ArtifactSaver, - DeclarativeContent, - DeclarativeVersion, - RemoteArtifactSaver, - ResolveContentFutures, - QueryExistingArtifacts, - QueryExistingContents, -) +from pulpcore.plugin.stages import DeclarativeContent from pulp_container.app.models import ContainerRemote, ContainerRepository, Tag from pulp_container.app.utils import determine_media_type_from_json from pulp_container.constants import MEDIA_TYPE -from .sync_stages import ContainerContentSaver, ContainerFirstStage +from .synchronize import ContainerDeclarativeVersion +from .sync_stages import ContainerFirstStage log = logging.getLogger(__name__) def download_image_data(repository_pk, remote_pk, tag_name, response_data): repository = ContainerRepository.objects.get(pk=repository_pk) - remote = ContainerRemote.objects.get(pk=remote_pk).cast() + remote = ContainerRemote.objects.get(pk=remote_pk) log.info("Pulling cache: repository={r} remote={p}".format(r=repository.name, p=remote.name)) first_stage = ContainerPullThroughFirstStage(remote, tag_name, response_data) - dv = ContainerPullThroughCacheDeclarativeVersion(first_stage, repository, mirror=False) + dv = ContainerDeclarativeVersion(first_stage, repository) return dv.create() @@ -52,7 +44,7 @@ async def run(self): tag_dc = DeclarativeContent(Tag(name=self.tag_name)) content_data = json.loads(self.response_data) - with NamedTemporaryFile("w") as temp_file: + with NamedTemporaryFile(dir=".", mode="w", delete=False) as temp_file: temp_file.write(self.response_data) temp_file.flush() @@ -114,34 +106,3 @@ async def run(self): tagged_manifest_dc = tag_dc.extra_data["tagged_manifest_dc"] tag_dc.content.tagged_manifest = await tagged_manifest_dc.resolution() await self.put(tag_dc) - - -class ContainerPullThroughCacheDeclarativeVersion(DeclarativeVersion): - """ - Subclassed Declarative version that creates a pipeline for caching remote content. - """ - - def pipeline_stages(self, new_version): - """ - Define the "architecture" of caching remote content. - - Args: - new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The - new repository version that is going to be built. 
- - Returns: - list: List of :class:`~pulpcore.plugin.stages.Stage` instances - - """ - pipeline = [ - self.first_stage, - QueryExistingArtifacts(), - ArtifactDownloader(), - ArtifactSaver(), - QueryExistingContents(), - ContainerContentSaver(), - RemoteArtifactSaver(), - ResolveContentFutures(), - ] - - return pipeline diff --git a/pulp_container/tests/functional/api/test_pull_through_cache.py b/pulp_container/tests/functional/api/test_pull_through_cache.py index bc0bb7244..c8ad66c5c 100644 --- a/pulp_container/tests/functional/api/test_pull_through_cache.py +++ b/pulp_container/tests/functional/api/test_pull_through_cache.py @@ -1,3 +1,4 @@ +import time import subprocess import pytest @@ -7,6 +8,7 @@ REGISTRY_V2, REGISTRY_V2_FEED_URL, PULP_HELLO_WORLD_REPO, + PULP_FIXTURE_1, ) @@ -27,7 +29,8 @@ def pull_through_distribution( return distribution -def test_image_pull( +@pytest.fixture +def pull_and_verify( add_to_cleanup, container_pull_through_distribution_api, container_distribution_api, @@ -36,51 +39,97 @@ def test_image_pull( container_tag_api, registry_client, local_registry, - pull_through_distribution, ): - remote_image_path = f"{REGISTRY_V2}/{PULP_HELLO_WORLD_REPO}" - registry_client.pull(f"{remote_image_path}:latest") - remote_image = registry_client.inspect(remote_image_path) + def _pull_and_verify(image_path, pull_through_distribution): + remote_image_path = f"{REGISTRY_V2}/{image_path}" + local_image_path = f"{pull_through_distribution.base_path}/{image_path}" - local_image_path = f"{pull_through_distribution.base_path}/{PULP_HELLO_WORLD_REPO}" - local_registry.pull(f"{local_image_path}:latest") - local_image = local_registry.inspect(local_image_path) + # 1. pull remote content through the Pulp distribution + local_registry.pull(local_image_path) + local_image = local_registry.inspect(local_image_path) + + # when the client pulls the image, a repository, distribution, and remote is created in + # the background; therefore, scheduling the cleanup for these entities is necessary in case + # further assertion fails + path = local_image_path.split(":")[0] + repository = container_repository_api.list(name=path).results[0] + add_to_cleanup(container_repository_api, repository.pulp_href) + remote = container_remote_api.list(name=path).results[0] + add_to_cleanup(container_remote_api, remote.pulp_href) + distribution = container_distribution_api.list(name=path).results[0] + add_to_cleanup(container_distribution_api, distribution.pulp_href) + + # 2. verify if the pulled content is the same as on the remote + registry_client.pull(remote_image_path) + remote_image = registry_client.inspect(remote_image_path) + assert local_image[0]["Id"] == remote_image[0]["Id"] + + pull_through_distribution = container_pull_through_distribution_api.list( + name=pull_through_distribution.name + ).results[0] + assert [distribution.pulp_href] == pull_through_distribution.distributions + + for _ in range(5): + repository = container_repository_api.list(name=path).results[0] + if f"{repository.pulp_href}versions/1/" == repository.latest_version_href: + break + + # there might be still the saving process running + time.sleep(1) + else: + assert False, "The repository was not updated with the cached content." + + # 3. 
test if pulling the same content twice works + local_registry.pull(local_image_path) - # when the client pulls the image, a repository, distribution, and remote is created in - # the background; therefore, scheduling the cleanup for these entities is necessary - repository = container_repository_api.list(name=local_image_path).results[0] - add_to_cleanup(container_repository_api, repository.pulp_href) - remote = container_remote_api.list(name=local_image_path).results[0] - add_to_cleanup(container_remote_api, remote.pulp_href) - distribution = container_distribution_api.list(name=local_image_path).results[0] - add_to_cleanup(container_distribution_api, distribution.pulp_href) + return path + + return _pull_and_verify - assert local_image[0]["Id"] == remote_image[0]["Id"] +def test_manifest_list_pull( + local_registry, + container_repository_api, + container_tag_api, + pull_through_distribution, + pull_and_verify, +): + image_path = f"{PULP_HELLO_WORLD_REPO}:latest" + local_image_path = pull_and_verify(image_path, pull_through_distribution) + + repository = container_repository_api.list(name=local_image_path).results[0] tags = container_tag_api.list(repository_version=repository.latest_version_href).results assert ["latest"] == [tag.name for tag in tags] - pull_through_distribution = container_pull_through_distribution_api.list( - name=pull_through_distribution.name - ).results[0] - assert [distribution.pulp_href] == pull_through_distribution.distributions + # test if pulling new content results into a new version, preserving the old content + local_registry.pull(f"{local_image_path}:linux") - assert f"{repository.pulp_href}versions/1/" == repository.latest_version_href + for _ in range(5): + repository = container_repository_api.list(name=local_image_path).results[0] + if f"{repository.pulp_href}versions/2/" == repository.latest_version_href: + break - # 1. test if pulling the same content twice works - local_registry.pull(f"{local_image_path}:latest") + # there might be still the saving process running + time.sleep(1) + else: + assert False, "The repository was not updated with the cached content." - repository = container_repository_api.list(name=local_image_path).results[0] - assert f"{repository.pulp_href}versions/1/" == repository.latest_version_href + tags = container_tag_api.list(repository_version=repository.latest_version_href).results + assert ["latest", "linux"] == sorted([tag.name for tag in tags]) - # 2. 
test if pulling new content results into a new version, preserving the old content - local_registry.pull(f"{local_image_path}:linux") - repository = container_repository_api.list(name=local_image_path).results[0] - assert f"{repository.pulp_href}versions/2/" == repository.latest_version_href +def test_manifest_pull( + container_repository_api, + container_tag_api, + pull_through_distribution, + pull_and_verify, +): + image_path = f"{PULP_FIXTURE_1}:manifest_a" + local_image_path = pull_and_verify(image_path, pull_through_distribution) + repository = container_repository_api.list(name=local_image_path).results[0] tags = container_tag_api.list(repository_version=repository.latest_version_href).results - assert ["latest", "linux"] == sorted([tag.name for tag in tags]) + assert ["manifest_a"] == [tag.name for tag in tags] def test_conflicting_names_and_paths( diff --git a/requirements.txt b/requirements.txt index 4e8130902..e78158fea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ ecdsa>=0.14,<=0.18.0 jsonschema>=4.4,<4.21 -pulpcore>=3.40.3,<3.55 +pulpcore>=3.43.0,<3.55 pyjwkest>=1.4,<=1.4.2 pyjwt[crypto]>=2.4,<2.9
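
A minimal end-to-end sketch of the workflow this patch enables, combining the docs example with the behavior exercised by the new functional tests. It assumes the "docker-cache" pull-through distribution from docs/workflows/host.rst, the default API address in ${BASE_ADDR}, and that the cache names the backing repository after the pulled path (here "docker-cache/library/debian"); these names and addresses depend on the local deployment::

    # pull two tags of the same image through the cache
    podman pull localhost:24817/docker-cache/library/debian:10
    podman pull localhost:24817/docker-cache/library/debian:11

    # Pulp created the repository, remote, and distribution on demand;
    # they remain manageable through the standard API endpoints
    http ${BASE_ADDR}/pulp/api/v3/repositories/container/container/?name=docker-cache/library/debian
    http ${BASE_ADDR}/pulp/api/v3/distributions/container/container/?name=docker-cache/library/debian

    # only the latest repository version is kept, but it should contain both tags ("10" and "11")
    http ${BASE_ADDR}/pulp/api/v3/content/container/tags/?repository_version=<latest_version_href>

The single-version behavior and the repository naming follow the note added to the docs and the assertions in test_pull_through_cache.py; <latest_version_href> is a placeholder for the latest_version_href reported by the repository endpoint.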