Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(*): add namespace as a parameter of the internal storage #65

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=0.20.0-SNAPSHOT
kestraVersion=[0.18,)
kestraVersion=[0.20,)
33 changes: 18 additions & 15 deletions src/main/java/io/kestra/storage/azure/AzureStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Mono;

import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -39,6 +40,7 @@
import java.util.stream.Stream;

import static io.kestra.core.utils.Rethrow.throwFunction;

@Builder
@Jacksonized
@NoArgsConstructor
Expand Down Expand Up @@ -86,12 +88,12 @@ private BlobAsyncClient blob(URI uri) {
}

@Override
public InputStream get(String tenantId, URI uri) throws IOException {
return this.getWithMetadata(tenantId, uri).inputStream();
public InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException {
return this.getWithMetadata(tenantId, namespace, uri).inputStream();
}

@Override
public StorageObject getWithMetadata(String tenantId, URI uri) throws IOException {
public StorageObject getWithMetadata(String tenantId, @Nullable String namespace, URI uri) throws IOException {
try {
BlobAsyncClient blobClient = this.blob(getURI(tenantId, uri));

Expand All @@ -108,7 +110,7 @@ public StorageObject getWithMetadata(String tenantId, URI uri) throws IOExceptio
}

@Override
public List<URI> allByPrefix(String tenantId, URI prefix, boolean includeDirectories) {
public List<URI> allByPrefix(String tenantId, @Nullable String namespace, URI prefix, boolean includeDirectories) {
String path = getPath(tenantId, prefix);
String prefixPath = prefix.getPath();
Stream<String> allKeys = keysForPrefix(path, true, includeDirectories);
Expand All @@ -119,7 +121,7 @@ public List<URI> allByPrefix(String tenantId, URI prefix, boolean includeDirecto
}

@Override
public List<FileAttributes> list(String tenantId, URI uri) throws IOException {
public List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException {
String path = getPath(tenantId, uri);
String prefix = (path.endsWith("/")) ? path : path + "/";

Expand Down Expand Up @@ -160,7 +162,7 @@ private Stream<String> keysForPrefix(String prefix, boolean recursive, boolean i
}

@Override
public boolean exists(String tenantId, URI uri) {
public boolean exists(String tenantId, @Nullable String namespace, URI uri) {
try {
URI uriToCheck = uri;
if (uri.getPath().endsWith("/")) {
Expand All @@ -186,7 +188,7 @@ private boolean exists(String path) {
}

@Override
public FileAttributes getAttributes(String tenantId, URI uri) throws IOException {
public FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException {
String path = getPath(tenantId, uri);
return getFileAttributes(path);
}
Expand All @@ -212,7 +214,7 @@ private FileAttributes getFileAttributes(String path) throws FileNotFoundExcepti
}

@Override
public URI put(String tenantId, URI uri, StorageObject storageObject) throws IOException {
public URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException {
try {
URI path = getURI(tenantId, uri);
BlobAsyncClient blobClient = this.blob(path);
Expand All @@ -232,10 +234,11 @@ public URI put(String tenantId, URI uri, StorageObject storageObject) throws IOE
}
}

public boolean delete(String tenantId, URI uri) throws IOException {
@Override
public boolean delete(String tenantId, @Nullable String namespace, URI uri) throws IOException {
String path = getPath(tenantId, uri);
if (this.dirExists(path)) {
return !deleteByPrefix(tenantId, uri).isEmpty();
return !deleteByPrefix(tenantId, namespace, uri).isEmpty();
}
BlobAsyncClient blobClient = this.blobContainerClient.getBlobAsyncClient(path);
if (!block(blobClient.exists())) {
Expand All @@ -246,7 +249,7 @@ public boolean delete(String tenantId, URI uri) throws IOException {
}

@Override
public URI createDirectory(String tenantId, URI uri) throws IOException {
public URI createDirectory(String tenantId, @Nullable String namespace, URI uri) throws IOException {
String path = getPath(tenantId, uri);
if (!StringUtils.endsWith(path, "/")) {
path += "/";
Expand Down Expand Up @@ -282,8 +285,8 @@ private void mkdirs(String path) throws IOException {
}

@Override
public URI move(String tenantId, URI from, URI to) throws IOException {
if (!exists(tenantId, from)) {
public URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException {
if (!exists(tenantId, namespace, from)) {
throw new FileNotFoundException(from + " (File not found)");
}

Expand All @@ -305,12 +308,12 @@ public URI move(String tenantId, URI from, URI to) throws IOException {
Duration.ofSeconds(1)).getSyncPoller();
poller.waitForCompletion();
}
deleteByPrefix(tenantId, from);
deleteByPrefix(tenantId, namespace, from);
return URI.create("kestra://" + from.getPath());
}

@Override
public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOException {
public List<URI> deleteByPrefix(String tenantId, @Nullable String namespace, URI storagePrefix) throws IOException {
try {
String path = getPath(tenantId, storagePrefix);
if (!path.endsWith("/")) {
Expand Down
Loading