Skip to content

Commit

Permalink
SDK: Fix closing getLink streams (#2275)
Browse files Browse the repository at this point in the history
Signed-off-by: Avgustin Marinov <[email protected]>
  • Loading branch information
avgustinmm authored Feb 13, 2025
1 parent df6f0e7 commit 1e9d105
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@
package org.eclipse.hawkbit.sdk;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.lang.ref.Cleaner;
import java.lang.ref.SoftReference;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
Expand All @@ -39,9 +37,9 @@
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -52,7 +50,6 @@
import feign.Request;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import feign.Response;
import feign.codec.Decoder;
import feign.codec.Encoder;
import feign.codec.ErrorDecoder;
Expand Down Expand Up @@ -150,16 +147,16 @@ public <T> T ddiService(final Class<T> serviceType, final Tenant tenantPropertie
}

/**
* Downloads a link. If the returned type (linkType) is {@link ClassicHttpResponse} or {@link InputStream} then the caller is responsible
* to close the response. Otherwise, it is assumed and json object, it is deserialized and returned.
* Downloads a link. After the handler is called, the steam and all resources are closed.
*/
@SuppressWarnings("unchecked")
public static <T> T getLink(final Link link, final Class<T> linkType, final Tenant tenant, final Controller controller) throws IOException {
public static <T, R> R getLink(
final Link link, final Class<T> linkType, final Tenant tenant, final Controller controller,
final Function<T, R> handler) throws IOException {
final String url = link.getHref();
final HttpClientKey key = new HttpClientKey(
url.startsWith("https://"), controller == null ? null : controller.getCertificate(), tenant.getTenantCA());
final HttpClient httpClient = httpClient(key);
final AtomicBoolean delegatedRelease = new AtomicBoolean(false);
try {
final HttpGet request = new HttpGet(url);
final String gatewayToken = tenant.getGatewayToken();
Expand All @@ -177,55 +174,19 @@ public static <T> T getLink(final Link link, final Class<T> linkType, final Tena
throw new IllegalStateException("Unexpected status code: " + response.getCode());
}

final T result;
if (linkType.isAssignableFrom(ClassicHttpResponse.class)) {
delegatedRelease.set(true);
return (T)Proxy.newProxyInstance(
ClassicHttpResponse.class.getClassLoader(), new Class<?>[] { ClassicHttpResponse.class },
(proxy, method, args) -> {
if (ObjectUtils.isEmpty(method.getParameterTypes()) && method.getName().equals("close")) {
response.close();
key.release();
return null;
} else {
try {
return method.invoke(response, args);
} catch (final InvocationTargetException e) {
throw e.getCause() == null ? e : e.getCause();
}
}
});
result = (T)response;
} else if (linkType == InputStream.class) {
final InputStream is = response.getEntity().getContent();
delegatedRelease.set(true);
return (T)new InputStream() {

@Override
public int read() throws IOException {
return is.read();
}

@Override
public int read(final byte[] b, final int off, final int read) throws IOException {
return is.read(b, off, read);
}

@Override
public void close() throws IOException {
try {
is.close();
} finally {
key.release();
}
}
};
result = (T)response.getEntity().getContent();
} else {
return new ObjectMapper().readValue(response.getEntity().getContent(), linkType);
result = new ObjectMapper().readValue(response.getEntity().getContent(), linkType);
}

return handler.apply(result);
});
} finally {
if (!delegatedRelease.get()) {
key.release();
}
key.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ private UpdateStatus readAndCheckDownloadUrl(final Link link, final DdiArtifactH
final Validator sizeValidator = sizeValidator(size);
final Validator hashValidator = hashValidator(hash);
final ArtifactHandler.DownloadHandler downloadHandler = artifactHandler.getDownloadHandler(link.getHref());
try (final InputStream is = HawkbitClient.getLink(link, InputStream.class, ddiController.getTenant(), ddiController.getController())) {
return HawkbitClient.getLink(link, InputStream.class, ddiController.getTenant(), ddiController.getController(), is -> {
try {
final byte[] buff = new byte[32 * 1024];
for (int read; (read = is.read(buff)) != -1; ) {
Expand Down Expand Up @@ -292,7 +292,7 @@ private UpdateStatus readAndCheckDownloadUrl(final Link link, final DdiArtifactH
downloadHandler.finished(ArtifactHandler.DownloadHandler.Status.ERROR);
return new UpdateStatus(UpdateStatus.Status.FAILURE, List.of(message));
}
}
});
}

private interface Validator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private UpdateStatus readAndCheckDownloadUrl(final Link link, final DmfArtifactH
final Validator hashValidator = hashValidator(hash);
final ArtifactHandler.DownloadHandler downloadHandler = artifactHandler.getDownloadHandler(link.getHref());

try (final InputStream is = HawkbitClient.getLink(link, InputStream.class, dmfController.getTenant(), dmfController.getController())) {
return HawkbitClient.getLink(link, InputStream.class, dmfController.getTenant(), dmfController.getController(), is -> {
try {
final byte[] buff = new byte[32 * 1024];
for (int read; (read = is.read(buff)) != -1; ) {
Expand Down Expand Up @@ -297,7 +297,7 @@ private UpdateStatus readAndCheckDownloadUrl(final Link link, final DmfArtifactH
downloadHandler.finished(ArtifactHandler.DownloadHandler.Status.ERROR);
return new UpdateStatus(DmfActionStatus.ERROR, List.of(message));
}
}
});
}

private interface Validator {
Expand Down

0 comments on commit 1e9d105

Please sign in to comment.