From 1e9d105c7439a6ccb0fa5c40dbfcf414287593f8 Mon Sep 17 00:00:00 2001 From: Avgustin Marinov Date: Thu, 13 Feb 2025 11:20:32 +0200 Subject: [PATCH] SDK: Fix closing getLink streams (#2275) Signed-off-by: Avgustin Marinov --- .../eclipse/hawkbit/sdk/HawkbitClient.java | 63 ++++--------------- .../hawkbit/sdk/device/UpdateHandler.java | 4 +- .../hawkbit/sdk/dmf/UpdateHandler.java | 4 +- 3 files changed, 16 insertions(+), 55 deletions(-) diff --git a/hawkbit-sdk/hawkbit-sdk-commons/src/main/java/org/eclipse/hawkbit/sdk/HawkbitClient.java b/hawkbit-sdk/hawkbit-sdk-commons/src/main/java/org/eclipse/hawkbit/sdk/HawkbitClient.java index 199621452c..f110dfd958 100644 --- a/hawkbit-sdk/hawkbit-sdk-commons/src/main/java/org/eclipse/hawkbit/sdk/HawkbitClient.java +++ b/hawkbit-sdk/hawkbit-sdk-commons/src/main/java/org/eclipse/hawkbit/sdk/HawkbitClient.java @@ -10,13 +10,11 @@ package org.eclipse.hawkbit.sdk; import java.io.BufferedOutputStream; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.annotation.Annotation; import java.lang.ref.Cleaner; -import java.lang.ref.SoftReference; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; @@ -39,9 +37,9 @@ import java.util.Objects; import java.util.Random; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -52,7 +50,6 @@ import feign.Request; import feign.RequestInterceptor; import feign.RequestTemplate; -import feign.Response; import feign.codec.Decoder; import feign.codec.Encoder; import feign.codec.ErrorDecoder; @@ -150,16 +147,16 @@ public T ddiService(final Class serviceType, final Tenant tenantPropertie } /** - * Downloads a link. If the returned type (linkType) is {@link ClassicHttpResponse} or {@link InputStream} then the caller is responsible - * to close the response. Otherwise, it is assumed and json object, it is deserialized and returned. + * Downloads a link. After the handler is called, the steam and all resources are closed. */ @SuppressWarnings("unchecked") - public static T getLink(final Link link, final Class linkType, final Tenant tenant, final Controller controller) throws IOException { + public static R getLink( + final Link link, final Class linkType, final Tenant tenant, final Controller controller, + final Function handler) throws IOException { final String url = link.getHref(); final HttpClientKey key = new HttpClientKey( url.startsWith("https://"), controller == null ? null : controller.getCertificate(), tenant.getTenantCA()); final HttpClient httpClient = httpClient(key); - final AtomicBoolean delegatedRelease = new AtomicBoolean(false); try { final HttpGet request = new HttpGet(url); final String gatewayToken = tenant.getGatewayToken(); @@ -177,55 +174,19 @@ public static T getLink(final Link link, final Class linkType, final Tena throw new IllegalStateException("Unexpected status code: " + response.getCode()); } + final T result; if (linkType.isAssignableFrom(ClassicHttpResponse.class)) { - delegatedRelease.set(true); - return (T)Proxy.newProxyInstance( - ClassicHttpResponse.class.getClassLoader(), new Class[] { ClassicHttpResponse.class }, - (proxy, method, args) -> { - if (ObjectUtils.isEmpty(method.getParameterTypes()) && method.getName().equals("close")) { - response.close(); - key.release(); - return null; - } else { - try { - return method.invoke(response, args); - } catch (final InvocationTargetException e) { - throw e.getCause() == null ? e : e.getCause(); - } - } - }); + result = (T)response; } else if (linkType == InputStream.class) { - final InputStream is = response.getEntity().getContent(); - delegatedRelease.set(true); - return (T)new InputStream() { - - @Override - public int read() throws IOException { - return is.read(); - } - - @Override - public int read(final byte[] b, final int off, final int read) throws IOException { - return is.read(b, off, read); - } - - @Override - public void close() throws IOException { - try { - is.close(); - } finally { - key.release(); - } - } - }; + result = (T)response.getEntity().getContent(); } else { - return new ObjectMapper().readValue(response.getEntity().getContent(), linkType); + result = new ObjectMapper().readValue(response.getEntity().getContent(), linkType); } + + return handler.apply(result); }); } finally { - if (!delegatedRelease.get()) { - key.release(); - } + key.release(); } } diff --git a/hawkbit-sdk/hawkbit-sdk-device/src/main/java/org/eclipse/hawkbit/sdk/device/UpdateHandler.java b/hawkbit-sdk/hawkbit-sdk-device/src/main/java/org/eclipse/hawkbit/sdk/device/UpdateHandler.java index ffe5b9f5e8..bff0e4d303 100644 --- a/hawkbit-sdk/hawkbit-sdk-device/src/main/java/org/eclipse/hawkbit/sdk/device/UpdateHandler.java +++ b/hawkbit-sdk/hawkbit-sdk-device/src/main/java/org/eclipse/hawkbit/sdk/device/UpdateHandler.java @@ -264,7 +264,7 @@ private UpdateStatus readAndCheckDownloadUrl(final Link link, final DdiArtifactH final Validator sizeValidator = sizeValidator(size); final Validator hashValidator = hashValidator(hash); final ArtifactHandler.DownloadHandler downloadHandler = artifactHandler.getDownloadHandler(link.getHref()); - try (final InputStream is = HawkbitClient.getLink(link, InputStream.class, ddiController.getTenant(), ddiController.getController())) { + return HawkbitClient.getLink(link, InputStream.class, ddiController.getTenant(), ddiController.getController(), is -> { try { final byte[] buff = new byte[32 * 1024]; for (int read; (read = is.read(buff)) != -1; ) { @@ -292,7 +292,7 @@ private UpdateStatus readAndCheckDownloadUrl(final Link link, final DdiArtifactH downloadHandler.finished(ArtifactHandler.DownloadHandler.Status.ERROR); return new UpdateStatus(UpdateStatus.Status.FAILURE, List.of(message)); } - } + }); } private interface Validator { diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/UpdateHandler.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/UpdateHandler.java index b3b24be64b..a2fa58b373 100644 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/UpdateHandler.java +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/UpdateHandler.java @@ -269,7 +269,7 @@ private UpdateStatus readAndCheckDownloadUrl(final Link link, final DmfArtifactH final Validator hashValidator = hashValidator(hash); final ArtifactHandler.DownloadHandler downloadHandler = artifactHandler.getDownloadHandler(link.getHref()); - try (final InputStream is = HawkbitClient.getLink(link, InputStream.class, dmfController.getTenant(), dmfController.getController())) { + return HawkbitClient.getLink(link, InputStream.class, dmfController.getTenant(), dmfController.getController(), is -> { try { final byte[] buff = new byte[32 * 1024]; for (int read; (read = is.read(buff)) != -1; ) { @@ -297,7 +297,7 @@ private UpdateStatus readAndCheckDownloadUrl(final Link link, final DmfArtifactH downloadHandler.finished(ArtifactHandler.DownloadHandler.Status.ERROR); return new UpdateStatus(DmfActionStatus.ERROR, List.of(message)); } - } + }); } private interface Validator {