From 52f8a68d213b9cc6b2e30d10c7921997a1a3ddc3 Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Wed, 5 Oct 2022 21:03:32 +0900 Subject: [PATCH 1/9] Google Cloud Storage --- pfio/v2/__init__.py | 1 + pfio/v2/fs.py | 3 +++ pfio/v2/gcs.py | 66 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+) create mode 100644 pfio/v2/gcs.py diff --git a/pfio/v2/__init__.py b/pfio/v2/__init__.py index a4d70307..15abcc65 100644 --- a/pfio/v2/__init__.py +++ b/pfio/v2/__init__.py @@ -32,6 +32,7 @@ from .local import Local, LocalFileStat # NOQA from .pathlib import Path # NOQA from .s3 import S3 # NOQA +from .gcs import Gcs from .zip import Zip, ZipFileStat # NOQA local = Local() diff --git a/pfio/v2/fs.py b/pfio/v2/fs.py index c2141162..12ed0628 100644 --- a/pfio/v2/fs.py +++ b/pfio/v2/fs.py @@ -461,6 +461,9 @@ def _from_scheme(scheme, dirname, kwargs, bucket=None): elif scheme == 's3': from .s3 import S3 fs = S3(bucket=bucket, prefix=dirname, **kwargs) + elif scheme == 'gs': + from .gcs import GoogleCloudStorage + fs = GoogleCloudStorage(bucket=bucket, prefix=dirname, **kwargs) else: raise RuntimeError("bug: scheme '{}' is not supported".format(scheme)) diff --git a/pfio/v2/gcs.py b/pfio/v2/gcs.py new file mode 100644 index 00000000..0cb5ac2f --- /dev/null +++ b/pfio/v2/gcs.py @@ -0,0 +1,66 @@ +from google.cloud import storage +from google.cloud.storage.fileio import BlobReader, BlobWriter +from google.oauth2 import service_account + +from .fs import FS, FileStat + +class ObjectStat(FileStat): + def __init__(self, blob): + self.blob = blob + + +class GoogleCloudStorage(FS): + def __init__(self, bucket, prefix=None, key_path=None): + with open(key_path) as kp: + service_account_info = json.load(kp) + credentials = service_account.Credentials.from_service_account_info(service_account_info) + self.client = storage.Client(credentials=credentials, + project=credentials.project_id) + self.bucket = self.client.get_bucket(bucket) + self.bucket_name = bucket + 
self.prefix = prefix + + def open(self, path, mode='r', **kwargs): + blob = self.bucket.blob(os.path.join(self.prefix, path)) + + if 'r' in mode: + return BlobReader(blob, chunk_size=1024*1024, text_mode=(not 'b' in mode)) + + elif 'w' in mode: + return BlobReader(blob, chunk_size=1024*1024, text_mode=(not 'b' in mode)) + + raise RuntimeError("Invalid mode") + + def list(self, prefix, recursive=False, detail=False): + # TODO: recursive + for blob in self.client.list_blobs(self.bucket_name, prefix=self.prefix): + if detail: + yield ObjectStat(blob) + else: + yield blob.path + + def stat(self, path): + return ObjectStat(self.bucket.blob(path)) + + def isdir(self, path): + return False + + def mkdir(self, path): + pass + + def makedirs(self, path): + pass + + def exists(self, path): + return self.bucket.blob(path).exists() + + def rename(self, src, dst): + source_blob = self.bucket.blob(src) + destination_bucket = storage_client.bucket(destination_bucket_name) + + # Returns Blob destination + self.bucket.copy_blob(source_blob, self.bucket, dst) + self.bucket.delete_blob(blob_name) + + def remove(self, path, recursive=False): + self.bucket.delete_blob(path) From 964b6e57996d7ac81b820247a88d7b1961ebfa63 Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Wed, 5 Oct 2022 21:10:25 +0900 Subject: [PATCH 2/9] Add gcs to setup.py --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index bd2fedb0..b26fb69b 100644 --- a/setup.py +++ b/setup.py @@ -42,6 +42,7 @@ 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', 'Topic :: System :: Filesystems', ], @@ -57,7 +58,7 @@ 'hdfs': ['pyarrow>=6.0.0'], }, # When updating install requires, docs/requirements.txt should be updated too - install_requires=['boto3', 'deprecation', 'urllib3'], + install_requires=['boto3', 'deprecation', 'urllib3', 'google-cloud-storage'], 
include_package_data=True, zip_safe=False, From e8e0004a1d27f52a6f712ca571ffcfc21daf4e03 Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Thu, 6 Oct 2022 09:19:41 +0900 Subject: [PATCH 3/9] Setup for test --- pfio/v2/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pfio/v2/__init__.py b/pfio/v2/__init__.py index 15abcc65..2772003a 100644 --- a/pfio/v2/__init__.py +++ b/pfio/v2/__init__.py @@ -28,11 +28,11 @@ ''' from .fs import from_url, lazify, open_url # NOQA +from .gcs import GoogleCloudStorage, GoogleCloudStorageFileStat # NOQA from .hdfs import Hdfs, HdfsFileStat # NOQA from .local import Local, LocalFileStat # NOQA from .pathlib import Path # NOQA from .s3 import S3 # NOQA -from .gcs import Gcs from .zip import Zip, ZipFileStat # NOQA local = Local() From ccd2cd5ea447bbb467297248aa061af1f96c615f Mon Sep 17 00:00:00 2001 From: Kota Uenishi Date: Thu, 27 Jul 2023 23:19:13 +0900 Subject: [PATCH 4/9] update --- pfio/v2/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pfio/v2/__init__.py b/pfio/v2/__init__.py index 2772003a..eefb58fc 100644 --- a/pfio/v2/__init__.py +++ b/pfio/v2/__init__.py @@ -28,7 +28,7 @@ ''' from .fs import from_url, lazify, open_url # NOQA -from .gcs import GoogleCloudStorage, GoogleCloudStorageFileStat # NOQA +from .gcs import GoogleCloudStorage # NOQA from .hdfs import Hdfs, HdfsFileStat # NOQA from .local import Local, LocalFileStat # NOQA from .pathlib import Path # NOQA From ff8b0f67426116d13b8ca63ae86422fdb604eabb Mon Sep 17 00:00:00 2001 From: Kota Uenishi Date: Fri, 28 Jul 2023 21:31:40 +0900 Subject: [PATCH 5/9] wip --- pfio/v2/gcs.py | 40 +++++++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/pfio/v2/gcs.py b/pfio/v2/gcs.py index 0cb5ac2f..00665e1d 100644 --- a/pfio/v2/gcs.py +++ b/pfio/v2/gcs.py @@ -1,24 +1,46 @@ +import base64 +import json + from google.cloud import storage from google.cloud.storage.fileio import BlobReader, 
BlobWriter from google.oauth2 import service_account from .fs import FS, FileStat + class ObjectStat(FileStat): def __init__(self, blob): - self.blob = blob + self.path = blob.path + self.size = blob.size + self.metadata = blob.metadata + self.crc32c = blob.crc32c + self.md5_hash = base64.b64decode(blob.md5_hash).hex() class GoogleCloudStorage(FS): - def __init__(self, bucket, prefix=None, key_path=None): - with open(key_path) as kp: - service_account_info = json.load(kp) - credentials = service_account.Credentials.from_service_account_info(service_account_info) + def __init__(self, bucket: str, prefix=None, key_path=None): + self.bucket_name = bucket + self.prefix = prefix + self.key_path = key_path + self._reset() + + def _reset(self): + if self.key_path is None: + self.client = storage.Client() + else: + with open(self.key_path) as kp: + service_account_info = json.load(kp) + credentials = service_account.Credentials.from_service_account_info(service_account_info) self.client = storage.Client(credentials=credentials, project=credentials.project_id) - self.bucket = self.client.get_bucket(bucket) - self.bucket_name = bucket - self.prefix = prefix + + # Caveat: You'll need ``roles/storage.insightsCollectorService`` role for the accessor instead. + # This is because ``roles/storage.objectViewer`` does not have ``storage.buckets.get`` + # which is needed to call ``get_bucket()``. 
+ # + # See also: https://cloud.google.com/storage/docs/access-control/iam-roles + self.bucket = self.client.get_bucket(self.bucket_name) + self.bucket_name = self.bucket def open(self, path, mode='r', **kwargs): blob = self.bucket.blob(os.path.join(self.prefix, path)) @@ -40,7 +62,7 @@ def list(self, prefix, recursive=False, detail=False): yield blob.path def stat(self, path): - return ObjectStat(self.bucket.blob(path)) + return ObjectStat(self.bucket.get_blob(path)) def isdir(self, path): return False From 3043124c0ac699a802753237f7ebfc7d39c6d503 Mon Sep 17 00:00:00 2001 From: Kota Uenishi Date: Sat, 29 Jul 2023 18:07:34 +0900 Subject: [PATCH 6/9] Fix tests --- mypy.ini | 6 ++++++ pfio/v2/gcs.py | 40 +++++++++++++++++++++++++--------------- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/mypy.ini b/mypy.ini index 9771dec0..0e11ae0f 100644 --- a/mypy.ini +++ b/mypy.ini @@ -20,3 +20,9 @@ ignore_missing_imports = True [mypy-pyarrow.*] ignore_missing_imports = True + +[mypy-google.cloud.*] +ignore_missing_imports = True + +[mypy-google.oauth2.*] +ignore_missing_imports = True diff --git a/pfio/v2/gcs.py b/pfio/v2/gcs.py index 00665e1d..ec930bb5 100644 --- a/pfio/v2/gcs.py +++ b/pfio/v2/gcs.py @@ -1,8 +1,9 @@ import base64 import json +import os from google.cloud import storage -from google.cloud.storage.fileio import BlobReader, BlobWriter +from google.cloud.storage.fileio import BlobReader # , BlobWriter from google.oauth2 import service_account from .fs import FS, FileStat @@ -30,32 +31,40 @@ def _reset(self): else: with open(self.key_path) as kp: service_account_info = json.load(kp) - credentials = service_account.Credentials.from_service_account_info(service_account_info) + credentials = service_account.Credentials.\ + from_service_account_info(service_account_info) self.client = storage.Client(credentials=credentials, project=credentials.project_id) - # Caveat: You'll need ``roles/storage.insightsCollectorService`` role for the accessor instead. 
- # This is because ``roles/storage.objectViewer`` does not have ``storage.buckets.get`` - # which is needed to call ``get_bucket()``. + # Caveat: You'll need + # ``roles/storage.insightsCollectorService`` role for the + # accessor instead. This is because + # ``roles/storage.objectViewer`` does not have + # ``storage.buckets.get`` which is needed to call + # ``get_bucket()``. # - # See also: https://cloud.google.com/storage/docs/access-control/iam-roles + # See also: + # https://cloud.google.com/storage/docs/access-control/iam-roles self.bucket = self.client.get_bucket(self.bucket_name) self.bucket_name = self.bucket - + def open(self, path, mode='r', **kwargs): blob = self.bucket.blob(os.path.join(self.prefix, path)) if 'r' in mode: - return BlobReader(blob, chunk_size=1024*1024, text_mode=(not 'b' in mode)) + return BlobReader(blob, chunk_size=1024*1024, + text_mode=('b' not in mode)) elif 'w' in mode: - return BlobReader(blob, chunk_size=1024*1024, text_mode=(not 'b' in mode)) + return BlobReader(blob, chunk_size=1024*1024, + text_mode=('b' not in mode)) raise RuntimeError("Invalid mode") def list(self, prefix, recursive=False, detail=False): - # TODO: recursive - for blob in self.client.list_blobs(self.bucket_name, prefix=self.prefix): + # TODO: recursive + for blob in self.client.list_blobs(self.bucket_name, + prefix=self.prefix): if detail: yield ObjectStat(blob) else: @@ -77,12 +86,13 @@ def exists(self, path): return self.bucket.blob(path).exists() def rename(self, src, dst): - source_blob = self.bucket.blob(src) - destination_bucket = storage_client.bucket(destination_bucket_name) + # source_blob = self.bucket.blob(src) + # dest = self.client.bucket(dst) # Returns Blob destination - self.bucket.copy_blob(source_blob, self.bucket, dst) - self.bucket.delete_blob(blob_name) + # self.bucket.copy_blob(source_blob, self.bucket, dst) + # self.bucket.delete_blob(src) + pass def remove(self, path, recursive=False): self.bucket.delete_blob(path) From 
4bb9c8300dc57464542130920bb7af3046dd1c5c Mon Sep 17 00:00:00 2001 From: Kota Uenishi Date: Sat, 29 Jul 2023 18:13:59 +0900 Subject: [PATCH 7/9] Add doc --- docs/source/reference.rst | 6 ++++++ pfio/v2/gcs.py | 11 +++++++++++ 2 files changed, 17 insertions(+) diff --git a/docs/source/reference.rst b/docs/source/reference.rst index e44f17dc..4e4684e0 100644 --- a/docs/source/reference.rst +++ b/docs/source/reference.rst @@ -33,6 +33,12 @@ S3 (AWS S3) .. autoclass:: pfio.v2.S3 :members: +Google Cloud Storage +~~~~~~~~~~~~~~~~~~~~ + +.. autoclass:: pfio.v2.GoogleCloudStorage + :members: + Zip Archive ~~~~~~~~~~~ diff --git a/pfio/v2/gcs.py b/pfio/v2/gcs.py index ec930bb5..e6c41305 100644 --- a/pfio/v2/gcs.py +++ b/pfio/v2/gcs.py @@ -19,6 +19,17 @@ def __init__(self, blob): class GoogleCloudStorage(FS): + '''Google Cloud Storage wrapper + + ``key_path`` argument is a path to credential files of + IAM service account. The path to the file can be set to + the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` + instead. + + .. note:: This is an experimental implementation. 
+ + ''' + def __init__(self, bucket: str, prefix=None, key_path=None): self.bucket_name = bucket self.prefix = prefix From b242a0e2dc40358b99a3b6d129f6051b9c26a76e Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Mon, 30 Oct 2023 19:30:35 +0900 Subject: [PATCH 8/9] Working with metadata --- pfio/v2/gcs.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pfio/v2/gcs.py b/pfio/v2/gcs.py index e6c41305..e83c1e4f 100644 --- a/pfio/v2/gcs.py +++ b/pfio/v2/gcs.py @@ -11,11 +11,15 @@ class ObjectStat(FileStat): def __init__(self, blob): - self.path = blob.path + self.path = blob.name self.size = blob.size self.metadata = blob.metadata self.crc32c = blob.crc32c self.md5_hash = base64.b64decode(blob.md5_hash).hex() + self.filename = os.path.basename(blob.name) + + def isdir(self): + return self.size == 0 and self.path.endswith('/') class GoogleCloudStorage(FS): @@ -57,7 +61,7 @@ def _reset(self): # See also: # https://cloud.google.com/storage/docs/access-control/iam-roles self.bucket = self.client.get_bucket(self.bucket_name) - self.bucket_name = self.bucket + self.bucket_name = self.bucket_name def open(self, path, mode='r', **kwargs): blob = self.bucket.blob(os.path.join(self.prefix, path)) From ccd3cabee8f55a23314422e7e4c047e333847a7d Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Fri, 24 Nov 2023 10:34:02 +0900 Subject: [PATCH 9/9] Only recursive listing allowed for now --- pfio/v2/gcs.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pfio/v2/gcs.py b/pfio/v2/gcs.py index e83c1e4f..160ee620 100644 --- a/pfio/v2/gcs.py +++ b/pfio/v2/gcs.py @@ -61,6 +61,7 @@ def _reset(self): # See also: # https://cloud.google.com/storage/docs/access-control/iam-roles self.bucket = self.client.get_bucket(self.bucket_name) + assert self.bucket self.bucket_name = self.bucket_name def open(self, path, mode='r', **kwargs): @@ -76,10 +77,20 @@ def open(self, path, mode='r', **kwargs): raise RuntimeError("Invalid mode") - def 
list(self, prefix, recursive=False, detail=False): + def list(self, prefix, recursive=True, detail=False): # TODO: recursive - for blob in self.client.list_blobs(self.bucket_name, - prefix=self.prefix): + assert recursive, "gcs.list recursive=False no supported yet" + path = None + if prefix: + path = prefix + if self.prefix: + path = os.path.join(self.prefix, path) + + if path: + path = os.path.normpath(path) + + for blob in self.bucket.list_blobs(): + # prefix=path): if detail: yield ObjectStat(blob) else: