Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Google Cloud Storage #286

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/source/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ S3 (AWS S3)
.. autoclass:: pfio.v2.S3
:members:

Google Cloud Storage
~~~~~~~~~~~~~~~~~~~~

.. autoclass:: pfio.v2.GoogleCloudStorage
:members:

Zip Archive
~~~~~~~~~~~

Expand Down
6 changes: 6 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ ignore_missing_imports = True

[mypy-pyarrow.*]
ignore_missing_imports = True

[mypy-google.cloud.*]
ignore_missing_imports = True

[mypy-google.oauth2.*]
ignore_missing_imports = True
1 change: 1 addition & 0 deletions pfio/v2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

'''
from .fs import from_url, lazify, open_url # NOQA
from .gcs import GoogleCloudStorage # NOQA
from .hdfs import Hdfs, HdfsFileStat # NOQA
from .local import Local, LocalFileStat # NOQA
from .pathlib import Path # NOQA
Expand Down
3 changes: 3 additions & 0 deletions pfio/v2/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@ def _from_scheme(scheme, dirname, kwargs, bucket=None):
elif scheme == 's3':
from .s3 import S3
fs = S3(bucket=bucket, prefix=dirname, **kwargs)
elif scheme == 'gs':
from .gcs import GoogleCloudStorage
fs = GoogleCloudStorage(bucket=bucket, prefix=dirname, **kwargs)
else:
raise RuntimeError("bug: scheme '{}' is not supported".format(scheme))

Expand Down
124 changes: 124 additions & 0 deletions pfio/v2/gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import base64
import json
import os

from google.cloud import storage
from google.cloud.storage.fileio import BlobReader # , BlobWriter
from google.oauth2 import service_account

from .fs import FS, FileStat


class ObjectStat(FileStat):
    '''Detailed information on a single GCS blob.

    Wraps the metadata of a ``google.cloud.storage.Blob`` in the
    common ``FileStat`` interface.
    '''

    def __init__(self, blob):
        name = blob.name
        self.path = name
        self.filename = os.path.basename(name)
        self.size = blob.size
        self.metadata = blob.metadata
        self.crc32c = blob.crc32c
        # GCS reports the MD5 as base64; expose it as a hex digest.
        self.md5_hash = base64.b64decode(blob.md5_hash).hex()

    def isdir(self):
        # GCS has no real directories; an empty object whose key ends
        # with '/' is the conventional directory placeholder.
        return self.path.endswith('/') and self.size == 0


class GoogleCloudStorage(FS):
    '''Google Cloud Storage wrapper

    Arguments:
        bucket (str): Name of the bucket to wrap.
        prefix (str): Optional key prefix.  All paths handed to the
            methods of this object are interpreted relative to it.
        key_path (str): Path to a JSON credential file of an IAM
            service account.  When omitted, the path can instead be
            set in the environment variable
            ``GOOGLE_APPLICATION_CREDENTIALS`` (picked up by the
            client library's default-credentials lookup).

    .. note:: This is an experimental implementation.

    '''

    def __init__(self, bucket: str, prefix=None, key_path=None):
        self.bucket_name = bucket
        self.prefix = prefix
        self.key_path = key_path
        self._reset()

    def _reset(self):
        if self.key_path is None:
            # Fall back to application-default credentials
            # (incl. GOOGLE_APPLICATION_CREDENTIALS).
            self.client = storage.Client()
        else:
            with open(self.key_path) as kp:
                service_account_info = json.load(kp)
            credentials = service_account.Credentials.\
                from_service_account_info(service_account_info)
            self.client = storage.Client(credentials=credentials,
                                         project=credentials.project_id)

        # Caveat: You'll need
        # ``roles/storage.insightsCollectorService`` role for the
        # accessor instead. This is because
        # ``roles/storage.objectViewer`` does not have
        # ``storage.buckets.get`` which is needed to call
        # ``get_bucket()``.
        #
        # See also:
        # https://cloud.google.com/storage/docs/access-control/iam-roles
        self.bucket = self.client.get_bucket(self.bucket_name)
        assert self.bucket

    def _canonical(self, path):
        # Apply the filesystem prefix to an object key.  Guarding on
        # truthiness also avoids the TypeError that a bare
        # ``os.path.join(None, path)`` would raise when no prefix is set.
        if self.prefix:
            return os.path.join(self.prefix, path)
        return path

    def open(self, path, mode='r', **kwargs):
        '''Open an object as a file-like object.

        Supported modes are ``'r'``, ``'rb'``, ``'w'`` and ``'wb'``;
        data is streamed in 1 MiB chunks.
        '''
        blob = self.bucket.blob(self._canonical(path))

        if 'r' in mode or 'w' in mode:
            # Delegate to Blob.open(), which returns a BlobReader for
            # read modes and a BlobWriter for write modes (and wraps
            # them for text modes).  The original returned a
            # BlobReader even for 'w', making write mode unusable.
            return blob.open(mode, chunk_size=1024 * 1024)

        raise RuntimeError("Invalid mode")

    def list(self, prefix, recursive=True, detail=False):
        '''Yield the object keys (or ObjectStat when ``detail``) under
        ``prefix``, relative to the bucket root.
        '''
        # TODO: support recursive=False via delimiter-based listing.
        assert recursive, "gcs.list recursive=False not supported yet"

        path = prefix if prefix else ''
        if self.prefix:
            path = os.path.join(self.prefix, path)
        if path:
            path = os.path.normpath(path)

        # Pass the computed prefix so listing is filtered server-side;
        # the original computed ``path`` but listed the whole bucket.
        for blob in self.bucket.list_blobs(prefix=(path or None)):
            if detail:
                yield ObjectStat(blob)
            else:
                # ``blob.name`` is the object key; the original yielded
                # ``blob.path``, the URL path (``/b/<bucket>/o/<key>``).
                yield blob.name

    def stat(self, path):
        '''Return an :class:`ObjectStat` for the object at ``path``.

        Raises:
            FileNotFoundError: if the object does not exist.
        '''
        blob = self.bucket.get_blob(self._canonical(path))
        if blob is None:
            # get_blob() returns None for missing objects; the original
            # crashed with AttributeError inside ObjectStat instead.
            raise FileNotFoundError(
                "No such GCS object: '{}'".format(path))
        return ObjectStat(blob)

    def isdir(self, path):
        # GCS namespace is flat; directories do not exist as such.
        return False

    def mkdir(self, path):
        # No-op: GCS has no directories to create.
        pass

    def makedirs(self, path):
        # No-op: GCS has no directories to create.
        pass

    def exists(self, path):
        return self.bucket.blob(self._canonical(path)).exists()

    def rename(self, src, dst):
        '''Rename an object via server-side copy followed by delete.

        GCS has no native rename; this is the documented emulation.
        Not atomic: a failure between the copy and the delete leaves
        both objects in place.
        '''
        src_key = self._canonical(src)
        dst_key = self._canonical(dst)
        self.bucket.copy_blob(self.bucket.blob(src_key),
                              self.bucket, dst_key)
        self.bucket.delete_blob(src_key)

    def remove(self, path, recursive=False):
        # TODO: recursive=True (prefix-wide deletion) is not supported.
        self.bucket.delete_blob(self._canonical(path))
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',

'Topic :: System :: Filesystems',
],
Expand All @@ -57,7 +58,7 @@
'hdfs': ['pyarrow>=6.0.0'],
},
# When updating install requires, docs/requirements.txt should be updated too
install_requires=['boto3', 'deprecation', 'urllib3'],
install_requires=['boto3', 'deprecation', 'urllib3', 'google-cloud-storage'],
include_package_data=True,
zip_safe=False,

Expand Down