Skip to content

Commit

Permalink
Merge pull request #12 from kfirgoldberg/kfir/feature/glob_rglob_iterdir
Browse files Browse the repository at this point in the history
replacing "listdir" with glob, rglob, and iterdir
  • Loading branch information
kfirgoldberg authored Jun 16, 2024
2 parents 9b7ba9d + d9771e2 commit d01aef3
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 87 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ local_file_path = my_file.copy(force_overwrite=False) # Returns the path of the
my_dir = AnyPath("https://account_name.blob.core.windows.net/container_name/path/to/dir")
my_dir.exists() # True if my_dir exists, otherwise False
parent, name, stem = my_dir.parent, my_dir.name, my_dir.stem
files_in_dir: List[AnyPath] = my_dir.listdir() # List of AnyPath instances for files in the directory
files_in_dir: List[AnyPath] = my_dir.rglob('*') # List of AnyPath instances for everything found recursively under the directory

my_file = AnyPath("s3://bucket/path/to/file.txt")
my_file.is_file() # True if my_file exists, otherwise False
Expand Down
12 changes: 9 additions & 3 deletions anypathlib/anypath.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ def is_file(self) -> bool:
def exists(self) -> bool:
return self.path_handler.exists(self.base_path)

def listdir(self) -> List['AnyPath']:
return [AnyPath(p) for p in self.path_handler.listdir(self.base_path)]

def remove(self):
self.path_handler.remove(self.base_path)

Expand All @@ -114,6 +111,15 @@ def stem(self) -> str:
def name(self) -> str:
return self.path_handler.name(self.base_path)

def iterdir(self) -> List['AnyPath']:
    """Return the entries the path handler lists for this path, each wrapped as an ``AnyPath``."""
    listed_paths = self.path_handler.iterdir(self.base_path)
    return list(map(AnyPath, listed_paths))

def glob(self, pattern: str) -> List['AnyPath']:
    """Return the path handler's ``glob`` results for ``pattern``, each wrapped as an ``AnyPath``."""
    matched_paths = self.path_handler.glob(self.base_path, pattern)
    return list(map(AnyPath, matched_paths))

def rglob(self, pattern: str) -> List['AnyPath']:
    """Return the path handler's ``rglob`` results for ``pattern``, each wrapped as an ``AnyPath``."""
    matched_paths = self.path_handler.rglob(self.base_path, pattern)
    return list(map(AnyPath, matched_paths))

def __get_local_path(self, target_path: Optional[Path] = None, force_overwrite: bool = False,
verbose: bool = False) -> Optional[Path]:
if target_path is None:
Expand Down
6 changes: 3 additions & 3 deletions anypathlib/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ def exists(path):

@click.command()
@click.option('-p', 'path', required=True, type=click.STRING, help='Path to list')
def listdir(path):
def iterdir(path):
"""List the directory. """
click.echo(AnyPath(path).listdir())
click.echo(AnyPath(path).iterdir())


@click.command()
Expand All @@ -42,7 +42,7 @@ def remove(path):

cli.add_command(copy)
cli.add_command(exists)
cli.add_command(listdir)
cli.add_command(iterdir)
cli.add_command(remove)

if __name__ == '__main__':
Expand Down
114 changes: 65 additions & 49 deletions anypathlib/path_handlers/azure_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import fnmatch
import os
import shutil
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, List, Tuple
from urllib.parse import urlparse
Expand All @@ -11,7 +11,7 @@

from azure.identity import DefaultAzureCredential
from azure.mgmt.storage import StorageManagementClient
from azure.storage.blob import BlobServiceClient
from azure.storage.blob import BlobServiceClient, ContainerClient

from loguru import logger

Expand All @@ -24,20 +24,38 @@ class AzureStoragePath:
container_name: str
blob_name: str
connection_string: Optional[str] = None
_blob_service_client: Optional[BlobServiceClient] = field(init=False, default=None)
_container_client: Optional[ContainerClient] = field(init=False, default=None)

def __post_init__(self):
    """Resolve a missing connection string and clear the cached Azure clients."""
    no_connection_string = self.connection_string is None
    if no_connection_string:
        # Nothing was supplied by the caller: look one up for this storage account.
        self.connection_string = AzureHandler.get_connection_string(self.storage_account)
    # Both client slots start empty.
    self._container_client = self._blob_service_client = None

@property
def http_url(self) -> str:
return f'https://{self.storage_account}.blob.core.windows.net/{self.container_name}/{self.blob_name}'
return f'https://{self.storage_account}.{AzureHandler.AZURE_URL_SUFFIX}/{self.container_name}/{self.blob_name}'

@property
def blob_service_client(self) -> BlobServiceClient:
    """Return the ``BlobServiceClient`` for this path, building it on first access.

    The client is created from ``self.connection_string`` and stored in
    ``self._blob_service_client`` so later accesses reuse the same object.
    """
    client = self._blob_service_client
    if client is not None:
        return client
    client = BlobServiceClient.from_connection_string(self.connection_string)
    self._blob_service_client = client
    return client

@property
def container_client(self) -> ContainerClient:
    """Return the ``ContainerClient`` for ``self.container_name``, building it on first access.

    The client is obtained from ``self.blob_service_client`` and stored in
    ``self._container_client`` so later accesses reuse the same object.
    """
    # A property receives the instance, so the parameter is named ``self``
    # (it was previously, misleadingly, called ``cls``).
    if self._container_client is None:
        self._container_client = self.blob_service_client.get_container_client(self.container_name)

    return self._container_client


class AzureHandler(BasePathHandler):
DEFAULT_SUBSCRIPTION_ID = os.environ.get('AZURE_SUBSCRIPTION_ID', None)

DEFAULT_GROUP_NAME = os.environ.get('AZURE_RESOURCE_GROUP_NAME', None)
AZURE_URL_SUFFIX = r'blob.core.windows.net'

@classmethod
def refresh_credentials(cls):
Expand All @@ -58,9 +76,7 @@ def is_dir(cls, url: str) -> bool:
@classmethod
def is_file(cls, url: str) -> bool:
storage_path = cls.http_to_storage_params(url)
blob_service_client = BlobServiceClient(
account_url=f"https://{storage_path.storage_account}.blob.core.windows.net")
container_client = blob_service_client.get_container_client(storage_path.container_name)
container_client = storage_path.container_client
blob_client = container_client.get_blob_client(storage_path.blob_name)

try:
Expand All @@ -73,9 +89,7 @@ def is_file(cls, url: str) -> bool:
@classmethod
def exists(cls, url: str) -> bool:
storage_path = cls.http_to_storage_params(url)

blob_service_client = BlobServiceClient.from_connection_string(storage_path.connection_string)
container_client = blob_service_client.get_container_client(container=storage_path.container_name)
container_client = storage_path.container_client
return len([p for p in container_client.list_blobs(name_starts_with=storage_path.blob_name)]) > 0

@classmethod
Expand Down Expand Up @@ -165,7 +179,7 @@ def download_file(cls, url: str, target_path: Path, force_overwrite: bool = True
azure_storage_path = cls.http_to_storage_params(url)
# Construct the Blob Service Client
blob_service_client = BlobServiceClient(
account_url=f"https://{azure_storage_path.storage_account}.blob.core.windows.net")
account_url=f"https://{azure_storage_path.storage_account}.{cls.AZURE_URL_SUFFIX}")

# Get a client to interact with the specified container and blob
blob_client = blob_service_client.get_blob_client(container=azure_storage_path.container_name,
Expand All @@ -182,12 +196,11 @@ def download_file(cls, url: str, target_path: Path, force_overwrite: bool = True

@classmethod
def upload_file(cls, local_path: str, target_url: str):
azure_storage_path = cls.http_to_storage_params(target_url)
"""Upload a single file to Azure Blob Storage."""
blob_service_client = BlobServiceClient.from_connection_string(azure_storage_path.connection_string)

azure_storage_path = cls.http_to_storage_params(target_url)
blob_service_client = azure_storage_path.blob_service_client
container_client = azure_storage_path.container_client
# Check if the container exists and create if it does not
container_client = blob_service_client.get_container_client(azure_storage_path.container_name)
try:
container_client.get_container_properties()
except Exception as e:
Expand All @@ -200,28 +213,11 @@ def upload_file(cls, local_path: str, target_url: str):
with open(local_path, "rb") as data:
blob_client.upload_blob(data, overwrite=True)

@classmethod
def listdir(cls, url: str, with_prefix: bool = True) -> List[str]:
"""List a directory (all blobs with the same prefix) from Azure Blob Storage."""
azure_storage_path = cls.http_to_storage_params(url)
blob_service_client = BlobServiceClient.from_connection_string(azure_storage_path.connection_string)
container_client = blob_service_client.get_container_client(container=azure_storage_path.container_name)
blob_names = []
for blob in container_client.list_blobs(name_starts_with=azure_storage_path.blob_name):
blob_name = blob.name if with_prefix else blob.name.replace(azure_storage_path.blob_name, '')
blob_azure_path = AzureStoragePath(storage_account=azure_storage_path.storage_account,
container_name=azure_storage_path.container_name, blob_name=blob_name,
connection_string=azure_storage_path.connection_string)
blob_names.append(blob_azure_path.http_url)
items = [item for item in blob_names if item != url]
return items

@classmethod
def remove_directory(cls, url: str):
"""Remove a directory (all blobs with the same prefix) from Azure Blob Storage."""
azure_storage_path = cls.http_to_storage_params(url)
blob_service_client = BlobServiceClient.from_connection_string(azure_storage_path.connection_string)
container_client = blob_service_client.get_container_client(container=azure_storage_path.container_name)
container_client = azure_storage_path.container_client
for blob in container_client.list_blobs(name_starts_with=azure_storage_path.blob_name):
container_client.delete_blob(blob.name)

Expand All @@ -232,8 +228,7 @@ def remove(cls, url: str, allow_missing: bool = False):
cls.remove_directory(url)
else:
azure_storage_path = cls.http_to_storage_params(url)
blob_service_client = BlobServiceClient.from_connection_string(azure_storage_path.connection_string)
container_client = blob_service_client.get_container_client(container=azure_storage_path.container_name)
container_client = azure_storage_path.container_client
try:
container_client.delete_blob(azure_storage_path.blob_name)
except ResourceNotFoundError as e:
Expand All @@ -247,8 +242,7 @@ def download_directory(cls, url: str, force_overwrite: bool, target_dir: Path, v
assert target_dir.is_dir()
azure_storage_path = cls.http_to_storage_params(url)

blob_service_client = BlobServiceClient.from_connection_string(azure_storage_path.connection_string)
container_client = blob_service_client.get_container_client(container=azure_storage_path.container_name)
container_client = azure_storage_path.container_client
local_paths = []

if verbose:
Expand All @@ -274,22 +268,20 @@ def download_directory(cls, url: str, force_overwrite: bool, target_dir: Path, v
def upload_directory(cls, local_dir: Path, target_url: str, verbose: bool):
"""Upload a directory to Azure Blob Storage."""
azure_storage_path = cls.http_to_storage_params(target_url)
blob_service_client = BlobServiceClient.from_connection_string(azure_storage_path.connection_string)
# Check if the container exists and create if it does not
container_client = blob_service_client.get_container_client(azure_storage_path.container_name)
container_client = azure_storage_path.container_client
try:
container_client.get_container_properties()
except Exception as e:
# Assuming exception means container does not exist. Create new container
container_client.create_container()

def upload_file_wrapper(local_path: str, blob_name: str):
azure_url = rf'azure://{azure_storage_path.storage_account}/{azure_storage_path.container_name}/{blob_name}'
azure_url = rf'https://{azure_storage_path.storage_account}.{cls.AZURE_URL_SUFFIX}/{azure_storage_path.container_name}/{blob_name}'
cls.upload_file(local_path=local_path, target_url=azure_url)

# Collect all files to upload
files_to_upload = []
# for file_path in local_dir.iterdir():
for file_path in local_dir.rglob('*'):
if not file_path.is_file():
continue
Expand All @@ -314,13 +306,8 @@ def copy(cls, source_url: str, target_url: str):
source_storage_path = cls.http_to_storage_params(source_url)
target_storage_path = cls.http_to_storage_params(target_url)

source_blob_service_client = BlobServiceClient.from_connection_string(
source_storage_path.connection_string)
target_blob_service_client = BlobServiceClient.from_connection_string(
target_storage_path.connection_string)

source_container_client = source_blob_service_client.get_container_client(
source_storage_path.container_name)
target_blob_service_client = target_storage_path.blob_service_client
source_container_client = source_storage_path.container_client

blobs_to_rename = source_container_client.list_blobs(name_starts_with=source_storage_path.blob_name)

Expand Down Expand Up @@ -349,7 +336,7 @@ def parent(cls, url: str) -> str:
if blob_path_parts[-1] == "":
blob_path_parts = blob_path_parts[:-1]
blob_path = '/'.join(blob_path_parts[:-1])
parent_url = f'https://{account_name}.blob.core.windows.net/{container_name}/{blob_path}/'
parent_url = f'https://{account_name}.{cls.AZURE_URL_SUFFIX}/{container_name}/{blob_path}/'
return parent_url

@classmethod
Expand All @@ -369,3 +356,32 @@ def stem(cls, url: str) -> str:
blob_path_parts = blob_path_parts[:-1]
blob_name = blob_path_parts[-1]
return Path(blob_name).stem

@classmethod
def iterdir(cls, url: str) -> List[str]:
    """List the entries under ``url`` by delegating to ``glob`` with the match-all pattern ``'*'``."""
    match_all = '*'
    return cls.glob(url, pattern=match_all)

@classmethod
def glob(cls, url: str, pattern: str) -> List[str]:
    """Return the https URLs of the entries listed under ``url`` that match ``pattern``.

    Entries come from ``walk_blobs`` with ``'/'`` as the delimiter, starting at
    the blob name parsed from ``url``.

    An entry matches when ``pattern`` (``fnmatch`` syntax) matches either its
    final path component or its full https URL. Matching on the entry name
    lets patterns without a leading wildcard (e.g. ``'data_*'``) work, as they
    do for local paths; full-URL matching is kept so that patterns written
    against the previous behavior still match.

    :param url: https URL of the location to search.
    :param pattern: ``fnmatch``-style pattern.
    :return: https URLs of the matching entries, in listing order.
    """
    storage_path = cls.http_to_storage_params(url)
    url_prefix = f"https://{storage_path.storage_account}.{cls.AZURE_URL_SUFFIX}/{storage_path.container_name}/"
    entries = storage_path.container_client.walk_blobs(name_starts_with=storage_path.blob_name, delimiter='/')

    matched_blobs = []
    for entry in entries:
        entry_url = f"{url_prefix}{entry.name}"
        # NOTE(review): prefix ("directory") entries presumably end with the
        # delimiter; strip it so the last component is the entry's own name.
        entry_name = entry.name.rstrip('/').rsplit('/', 1)[-1]
        if fnmatch.fnmatch(entry_name, pattern) or fnmatch.fnmatch(entry_url, pattern):
            matched_blobs.append(entry_url)
    return matched_blobs

@classmethod
def rglob(cls, url: str, pattern: str) -> List[str]:
    """Return the https URLs of all blobs under ``url`` matching ``pattern``, plus their directories.

    Blob names are listed with ``list_blob_names`` starting at the blob name
    parsed from ``url``. A blob matches when ``pattern`` (``fnmatch`` syntax)
    matches either its final path component or its full https URL. Matching
    on the name lets patterns without a leading wildcard (e.g. ``'data_*'``)
    work; full-URL matching is kept so that patterns written against the
    previous behavior still match.

    The parent directory of every matched blob is appended (without trailing
    slash) when it lies under ``url`` and is not ``url`` itself. Directories
    are returned in sorted order so the result is deterministic.

    :param url: https URL of the location to search.
    :param pattern: ``fnmatch``-style pattern.
    :return: matching blob URLs in listing order, followed by directory URLs.
    """
    storage_path = cls.http_to_storage_params(url)
    url_prefix = f"https://{storage_path.storage_account}.{cls.AZURE_URL_SUFFIX}/{storage_path.container_name}/"
    blob_names = storage_path.container_client.list_blob_names(name_starts_with=storage_path.blob_name)

    matched_blobs = []
    for blob_name in blob_names:
        blob_url = f"{url_prefix}{blob_name}"
        file_name = blob_name.rstrip('/').rsplit('/', 1)[-1]
        if fnmatch.fnmatch(file_name, pattern) or fnmatch.fnmatch(blob_url, pattern):
            matched_blobs.append(blob_url)

    # A set removes duplicates when several matches share a parent directory.
    parent_dirs = {cls.parent(matched_url) for matched_url in matched_blobs}
    dirs_under_url = sorted(
        parent_dir.rstrip('/')
        for parent_dir in parent_dirs
        if parent_dir.startswith(url) and parent_dir != url
    )
    return matched_blobs + dirs_under_url
28 changes: 24 additions & 4 deletions anypathlib/path_handlers/base_path_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ class BasePathHandler(ABC):
def download_file(cls, url: str, target_path: Path, force_overwrite: bool = True) -> Path:
pass

@classmethod
@abstractmethod
def listdir(cls, url: str) -> List[str]:
pass

@classmethod
@abstractmethod
Expand Down Expand Up @@ -74,3 +70,27 @@ def name(cls, url: str) -> str:
@abstractmethod
def stem(cls, url: str) -> str:
pass

@classmethod
@abstractmethod
def iterdir(cls, url: str) -> List[str]:
    """Return every file and directory located directly under the directory ``url``."""

@classmethod
@abstractmethod
def glob(cls, url: str, pattern: str) -> List[str]:
    """Return the paths matching ``pattern`` (wildcards allowed) without searching recursively."""

@classmethod
@abstractmethod
def rglob(cls, url: str, pattern: str) -> List[str]:
    """Return the paths matching ``pattern`` (wildcards allowed), searching all subdirectories recursively."""
16 changes: 12 additions & 4 deletions anypathlib/path_handlers/local_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ def download_directory(cls, url: str, force_overwrite: bool, target_dir: Path, v
def download_file(cls, url: str, target_path: Path, force_overwrite: bool = True) -> Path:
return cls.copy_path(url=url, target_path=target_path, force_overwrite=force_overwrite)

@classmethod
def listdir(cls, url: str) -> List[str]:
return [str(p) for p in Path(url).rglob('*')]

@classmethod
def relative_path(cls, url: str) -> str:
return Path(url).relative_to(Path(url).anchor).as_posix()
Expand All @@ -80,3 +76,15 @@ def stem(cls, url: str) -> str:
@classmethod
def name(cls, url: str) -> str:
return Path(url).name

@classmethod
def iterdir(cls, url: str) -> List[str]:
    """Return the entries of the local directory ``url`` as strings."""
    directory = Path(url)
    return list(map(str, directory.iterdir()))

@classmethod
def glob(cls, url: str, pattern: str) -> List[str]:
    """Return, as strings, the paths that ``Path(url).glob(pattern)`` yields."""
    directory = Path(url)
    return list(map(str, directory.glob(pattern)))

@classmethod
def rglob(cls, url: str, pattern: str) -> List[str]:
    """Return, as strings, the paths that ``Path(url).rglob(pattern)`` yields."""
    directory = Path(url)
    return list(map(str, directory.rglob(pattern)))
Loading

0 comments on commit d01aef3

Please sign in to comment.