diff --git a/test/src/unit-vfs.cc b/test/src/unit-vfs.cc index 70087a28032..d95b055573b 100644 --- a/test/src/unit-vfs.cc +++ b/test/src/unit-vfs.cc @@ -822,7 +822,6 @@ TEST_CASE( "[gcs][credentials][impersonation]") { ThreadPool thread_pool(2); Config cfg = set_config_params(true); - GCS gcs; std::string impersonate_service_account, target_service_account; std::vector delegates; @@ -847,9 +846,7 @@ TEST_CASE( require_tiledb_ok(cfg.set( "vfs.gcs.impersonate_service_account", impersonate_service_account)); - - require_tiledb_ok(gcs.init(cfg, &thread_pool)); - + GCS gcs(&thread_pool, cfg); auto credentials = gcs.make_credentials({}); // We are using an internal class only for inspection purposes. @@ -869,16 +866,13 @@ TEST_CASE( "[gcs][credentials][service-account]") { ThreadPool thread_pool(2); Config cfg = set_config_params(true); - GCS gcs; // The content of the credentials does not matter; it does not get parsed // until it is used in an API request, which we are not doing. std::string service_account_key = "{\"foo\": \"bar\"}"; require_tiledb_ok( cfg.set("vfs.gcs.service_account_key", service_account_key)); - - require_tiledb_ok(gcs.init(cfg, &thread_pool)); - + GCS gcs(&thread_pool, cfg); auto credentials = gcs.make_credentials({}); // We are using an internal class only for inspection purposes. @@ -895,7 +889,6 @@ TEST_CASE( "[gcs][credentials][service-account-and-impersonation]") { ThreadPool thread_pool(2); Config cfg = set_config_params(true); - GCS gcs; // The content of the credentials does not matter; it does not get parsed // until it is used in an API request, which we are not doing. 
std::string service_account_key = "{\"foo\": \"bar\"}"; @@ -905,9 +898,7 @@ TEST_CASE( cfg.set("vfs.gcs.service_account_key", service_account_key)); require_tiledb_ok(cfg.set( "vfs.gcs.impersonate_service_account", impersonate_service_account)); - - require_tiledb_ok(gcs.init(cfg, &thread_pool)); - + GCS gcs(&thread_pool, cfg); auto credentials = gcs.make_credentials({}); // We are using an internal class only for inspection purposes. @@ -933,17 +924,13 @@ TEST_CASE( "[gcs][credentials][external-account]") { ThreadPool thread_pool(2); Config cfg = set_config_params(true); - GCS gcs; // The content of the credentials does not matter; it does not get parsed // until it is used in an API request, which we are not doing. std::string workload_identity_configuration = "{\"foo\": \"bar\"}"; - require_tiledb_ok(cfg.set( "vfs.gcs.workload_identity_configuration", workload_identity_configuration)); - - require_tiledb_ok(gcs.init(cfg, &thread_pool)); - + GCS gcs(&thread_pool, cfg); auto credentials = gcs.make_credentials({}); // We are using an internal class only for inspection purposes. diff --git a/tiledb/api/c_api/config/config_api_external.h b/tiledb/api/c_api/config/config_api_external.h index 7ef312d085c..ed4b7f69c58 100644 --- a/tiledb/api/c_api/config/config_api_external.h +++ b/tiledb/api/c_api/config/config_api_external.h @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2023 TileDB, Inc. + * @copyright Copyright (c) 2023-2024 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -455,6 +455,9 @@ TILEDB_EXPORT void tiledb_config_free(tiledb_config_t** config) TILEDB_NOEXCEPT; * The maximum permissible delay between Azure netwwork request retry * attempts, in milliseconds. * **Default**: 60000 + * - `vfs.gcs.endpoint`
+ * The GCS endpoint.
+ * **Default**: "" * - `vfs.gcs.project_id`
* Set the GCS project ID to create new buckets to. Not required unless you * are going to use the VFS to create buckets.
diff --git a/tiledb/common/exception/status.h b/tiledb/common/exception/status.h index 239e06450fe..a4934905fed 100644 --- a/tiledb/common/exception/status.h +++ b/tiledb/common/exception/status.h @@ -310,10 +310,6 @@ inline Status Status_UtilsError(const std::string& msg) { inline Status Status_S3Error(const std::string& msg) { return {"[TileDB::S3] Error", msg}; } -/** Return a FS_GCS error class Status with a given message **/ -inline Status Status_GCSError(const std::string& msg) { - return {"[TileDB::GCS] Error", msg}; -} /** Return a FS_HDFS error class Status with a given message **/ inline Status Status_HDFSError(const std::string& msg) { return {"[TileDB::HDFS] Error", msg}; diff --git a/tiledb/sm/cpp_api/config.h b/tiledb/sm/cpp_api/config.h index 3d8bbc9afff..4860429d3d4 100644 --- a/tiledb/sm/cpp_api/config.h +++ b/tiledb/sm/cpp_api/config.h @@ -1,13 +1,11 @@ /** * @file config.h * - * @author Ravi Gaddipati - * * @section LICENSE * * The MIT License * - * @copyright Copyright (c) 2017-2023 TileDB, Inc. + * @copyright Copyright (c) 2017-2024 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -627,6 +625,9 @@ class Config { * The maximum permissible delay between Azure netwwork request retry * attempts, in milliseconds. * **Default**: 60000 + * - `vfs.gcs.endpoint`
+ * The GCS endpoint.
+ * **Default**: "" * - `vfs.gcs.project_id`
* Set the GCS project ID to create new buckets to. Not required unless you * are going to use the VFS to create buckets.
diff --git a/tiledb/sm/filesystem/gcs.cc b/tiledb/sm/filesystem/gcs.cc index 2f002144c2b..02fccb4bed3 100644 --- a/tiledb/sm/filesystem/gcs.cc +++ b/tiledb/sm/filesystem/gcs.cc @@ -65,13 +65,43 @@ namespace tiledb::sm { /* CONSTRUCTORS & DESTRUCTORS */ /* ********************************* */ -GCS::GCS() - : state_(State::UNINITIALIZED) - , write_cache_max_size_(0) - , max_parallel_ops_(1) - , multi_part_part_size_(0) - , use_multi_part_upload_(true) - , request_timeout_ms_(0) { +GCSParameters::GCSParameters(const Config& config) + : endpoint_(config.get("vfs.gcs.endpoint", Config::must_find)) + , project_id_( + config.get("vfs.gcs.project_id", Config::must_find)) + , service_account_key_(config.get( + "vfs.gcs.service_account_key", Config::must_find)) + , workload_identity_configuration_(config.get( + "vfs.gcs.workload_identity_configuration", Config::must_find)) + , impersonate_service_account_(config.get( + "vfs.gcs.impersonate_service_account", Config::must_find)) + , multi_part_size_( + config.get("vfs.gcs.multi_part_size", Config::must_find)) + , max_parallel_ops_( + config.get("vfs.gcs.max_parallel_ops", Config::must_find)) + , use_multi_part_upload_( + config.get("vfs.gcs.use_multi_part_upload", Config::must_find)) + , request_timeout_ms_( + config.get("vfs.gcs.request_timeout_ms", Config::must_find)) + , max_direct_upload_size_(config.get( + "vfs.gcs.max_direct_upload_size", Config::must_find)) { + if (endpoint_.empty() && getenv("TILEDB_TEST_GCS_ENDPOINT")) { + endpoint_ = getenv("TILEDB_TEST_GCS_ENDPOINT"); + } +} + +GCS::GCS(ThreadPool* thread_pool, const Config& config) + : gcs_params_(GCSParameters(config)) + , state_(State::UNINITIALIZED) + , ssl_cfg_(SSLConfig(config)) + , thread_pool_(thread_pool) + , write_cache_max_size_(0) { + assert(thread_pool); + write_cache_max_size_ = + gcs_params_.use_multi_part_upload_ ? 
+ gcs_params_.max_parallel_ops_ * gcs_params_.multi_part_size_ : + gcs_params_.max_direct_upload_size_; + state_ = State::INITIALIZED; } GCS::~GCS() { @@ -81,59 +111,6 @@ GCS::~GCS() { /* API */ /* ********************************* */ -Status GCS::init(const Config& config, ThreadPool* const thread_pool) { - if (thread_pool == nullptr) { - return LOG_STATUS( - Status_GCSError("Can't initialize with null thread pool.")); - } - - ssl_cfg_ = SSLConfig(config); - - assert(state_ == State::UNINITIALIZED); - - thread_pool_ = thread_pool; - - bool found; - endpoint_ = config.get("vfs.gcs.endpoint", &found); - assert(found); - if (endpoint_.empty() && getenv("TILEDB_TEST_GCS_ENDPOINT")) { - endpoint_ = getenv("TILEDB_TEST_GCS_ENDPOINT"); - } - project_id_ = config.get("vfs.gcs.project_id", &found); - assert(found); - service_account_key_ = config.get("vfs.gcs.service_account_key", &found); - assert(found); - workload_identity_configuration_ = - config.get("vfs.gcs.workload_identity_configuration", &found); - assert(found); - impersonate_service_account_ = - config.get("vfs.gcs.impersonate_service_account", &found); - assert(found); - RETURN_NOT_OK(config.get( - "vfs.gcs.max_parallel_ops", &max_parallel_ops_, &found)); - assert(found); - RETURN_NOT_OK(config.get( - "vfs.gcs.use_multi_part_upload", &use_multi_part_upload_, &found)); - assert(found); - RETURN_NOT_OK(config.get( - "vfs.gcs.multi_part_size", &multi_part_part_size_, &found)); - assert(found); - RETURN_NOT_OK(config.get( - "vfs.gcs.request_timeout_ms", &request_timeout_ms_, &found)); - assert(found); - uint64_t max_direct_upload_size; - RETURN_NOT_OK(config.get( - "vfs.gcs.max_direct_upload_size", &max_direct_upload_size, &found)); - assert(found); - - write_cache_max_size_ = use_multi_part_upload_ ? - max_parallel_ops_ * multi_part_part_size_ : - max_direct_upload_size; - - state_ = State::INITIALIZED; - return Status::Ok(); -} - /** * Builds a chain of service account impersonation credentials. 
* @@ -191,23 +168,26 @@ static shared_ptr apply_impersonation( std::shared_ptr GCS::make_credentials( const google::cloud::Options& options) const { shared_ptr creds = nullptr; - if (!service_account_key_.empty()) { - if (!workload_identity_configuration_.empty()) { + if (!gcs_params_.service_account_key_.empty()) { + if (!gcs_params_.workload_identity_configuration_.empty()) { LOG_WARN( "Both GCS service account key and workload identity configuration " "were specified; picking the former"); } creds = google::cloud::MakeServiceAccountCredentials( - service_account_key_, options); - } else if (!workload_identity_configuration_.empty()) { + gcs_params_.service_account_key_, options); + } else if (!gcs_params_.workload_identity_configuration_.empty()) { creds = google::cloud::MakeExternalAccountCredentials( - workload_identity_configuration_, options); - } else if (!endpoint_.empty() || getenv("CLOUD_STORAGE_EMULATOR_ENDPOINT")) { + gcs_params_.workload_identity_configuration_, options); + } else if ( + !gcs_params_.endpoint_.empty() || + getenv("CLOUD_STORAGE_EMULATOR_ENDPOINT")) { creds = google::cloud::MakeInsecureCredentials(); } else { creds = google::cloud::MakeGoogleDefaultCredentials(options); } - return apply_impersonation(creds, impersonate_service_account_, options); + return apply_impersonation( + creds, gcs_params_.impersonate_service_account_, options); } Status GCS::init_client() const { @@ -244,17 +224,18 @@ Status GCS::init_client() const { auto client_options = ca_options; client_options.set( make_credentials(ca_options)); - if (!endpoint_.empty()) { - client_options.set(endpoint_); + if (!gcs_params_.endpoint_.empty()) { + client_options.set( + gcs_params_.endpoint_); } client_options.set( make_shared( - HERE(), std::chrono::milliseconds(request_timeout_ms_))); + HERE(), + std::chrono::milliseconds(gcs_params_.request_timeout_ms_))); client_ = tdb_unique_ptr( tdb_new(google::cloud::storage::Client, client_options)); } catch (const std::exception& 
e) { - return LOG_STATUS( - Status_GCSError("Failed to initialize GCS: " + std::string(e.what()))); + throw GCSException("Failed to initialize GCS: " + std::string(e.what())); } return Status::Ok(); @@ -264,8 +245,7 @@ Status GCS::create_bucket(const URI& uri) const { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -273,12 +253,14 @@ Status GCS::create_bucket(const URI& uri) const { google::cloud::StatusOr bucket_metadata = client_->CreateBucketForProject( - bucket_name, project_id_, google::cloud::storage::BucketMetadata()); + bucket_name, + gcs_params_.project_id_, + google::cloud::storage::BucketMetadata()); if (!bucket_metadata.ok()) { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Create bucket failed on: " + uri.to_string() + " (" + - bucket_metadata.status().message() + ")"))); + bucket_metadata.status().message() + ")"); } return wait_for_bucket_to_propagate(bucket_name); @@ -288,8 +270,7 @@ Status GCS::empty_bucket(const URI& uri) const { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } return remove_dir(uri); @@ -300,8 +281,7 @@ Status GCS::is_empty_bucket(const URI& uri, bool* is_empty) const { assert(is_empty); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -313,9 +293,9 @@ Status GCS::is_empty_bucket(const URI& uri, bool* is_empty) const { for (const google::cloud::StatusOr& object_metadata : objects_reader) { if (!object_metadata) { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( 
"List bucket objects failed on: " + uri.to_string() + " (" + - object_metadata.status().message() + ")"))); + object_metadata.status().message() + ")"); } *is_empty = false; @@ -331,8 +311,7 @@ Status GCS::is_bucket(const URI& uri, bool* const is_bucket) const { assert(is_bucket); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -354,9 +333,9 @@ Status GCS::is_bucket( *is_bucket = false; return Status::Ok(); } else { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Get bucket failed on: " + bucket_name + " (" + status.message() + - ")"))); + ")"); } } @@ -369,8 +348,7 @@ Status GCS::is_dir(const URI& uri, bool* const exists) const { assert(exists); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::vector paths; @@ -383,8 +361,7 @@ Status GCS::remove_bucket(const URI& uri) const { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } // Empty bucket @@ -395,9 +372,9 @@ Status GCS::remove_bucket(const URI& uri) const { const google::cloud::Status status = client_->DeleteBucket(bucket_name); if (!status.ok()) { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Delete bucket failed on: " + uri.to_string() + " (" + - status.message() + ")"))); + status.message() + ")"); } return wait_for_bucket_to_be_deleted(bucket_name); @@ -407,8 +384,7 @@ Status GCS::remove_object(const URI& uri) const { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI 
is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -418,9 +394,9 @@ Status GCS::remove_object(const URI& uri) const { const google::cloud::Status status = client_->DeleteObject(bucket_name, object_path); if (!status.ok()) { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Delete object failed on: " + uri.to_string() + " (" + - status.message() + ")"))); + status.message() + ")"); } return wait_for_object_to_be_deleted(bucket_name, object_path); @@ -430,8 +406,7 @@ Status GCS::remove_dir(const URI& uri) const { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::vector paths; @@ -633,13 +608,11 @@ Status GCS::copy_object(const URI& old_uri, const URI& new_uri) { RETURN_NOT_OK(init_client()); if (!old_uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + old_uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + old_uri.to_string()); } if (!new_uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + new_uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + new_uri.to_string()); } std::string old_bucket_name; @@ -657,9 +630,9 @@ Status GCS::copy_object(const URI& old_uri, const URI& new_uri) { if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Copy object failed on: " + old_uri.to_string() + " (" + - status.message() + ")"))); + status.message() + ")"); } return wait_for_object_to_propagate(new_bucket_name, new_object_path); @@ -681,8 +654,8 @@ Status GCS::wait_for_object_to_propagate( std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); } - return LOG_STATUS(Status_GCSError( - std::string("Timed out waiting on object to propogate: " + 
object_path))); + throw GCSException( + "Timed out waiting on object to propagate: " + object_path); } Status GCS::wait_for_object_to_be_deleted( @@ -701,8 +674,8 @@ std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); } - return LOG_STATUS(Status_GCSError(std::string( - "Timed out waiting on object to be deleted: " + object_path))); + throw GCSException( + "Timed out waiting on object to be deleted: " + object_path); } Status GCS::wait_for_bucket_to_propagate(const std::string& bucket_name) const { @@ -719,8 +692,8 @@ Status GCS::wait_for_bucket_to_propagate(const std::string& bucket_name) const { std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); } - return LOG_STATUS(Status_GCSError( - std::string("Timed out waiting on bucket to propogate: " + bucket_name))); + throw GCSException( + "Timed out waiting on bucket to propagate: " + bucket_name); } Status GCS::wait_for_bucket_to_be_deleted( @@ -739,8 +712,8 @@ std::chrono::milliseconds(constants::gcs_attempt_sleep_ms)); } - return LOG_STATUS(Status_GCSError(std::string( - "Timed out waiting on bucket to be deleted: " + bucket_name))); + throw GCSException( + "Timed out waiting on bucket to be deleted: " + bucket_name); } Status GCS::move_dir(const URI& old_uri, const URI& new_uri) { @@ -760,8 +733,7 @@ Status GCS::touch(const URI& uri) const { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -774,9 +746,9 @@ if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Touch object failed on: " + uri.to_string() + " (" + status.message() + - ")"))); + ")"); } return Status::Ok(); 
@@ -787,8 +759,7 @@ Status GCS::is_object(const URI& uri, bool* const is_object) const { assert(is_object); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -815,9 +786,9 @@ Status GCS::is_object( *is_object = false; return Status::Ok(); } else { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Get object failed on: " + object_path + " (" + status.message() + - ")"))); + ")"); } } @@ -829,8 +800,7 @@ Status GCS::is_object( Status GCS::write( const URI& uri, const void* const buffer, const uint64_t length) { if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } Buffer* const write_cache_buffer = get_write_cache_buffer(uri.to_string()); @@ -839,13 +809,13 @@ Status GCS::write( RETURN_NOT_OK( fill_write_cache(write_cache_buffer, buffer, length, &nbytes_filled)); - if (!use_multi_part_upload_) { + if (!gcs_params_.use_multi_part_upload_) { if (nbytes_filled != length) { std::stringstream errmsg; errmsg << "Cannot write more than " << write_cache_max_size_ << " bytes without multi-part uploads. 
This limit can be " "configured with the 'vfs.gcs.max_direct_upload_size' option."; - return LOG_STATUS(Status_GCSError(errmsg.str())); + throw GCSException(errmsg.str()); } else { return Status::Ok(); } @@ -887,8 +857,7 @@ Status GCS::object_size(const URI& uri, uint64_t* const nbytes) const { assert(nbytes); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -901,9 +870,9 @@ Status GCS::object_size(const URI& uri, uint64_t* const nbytes) const { if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Get object size failed on: " + object_path + " (" + status.message() + - ")"))); + ")"); } *nbytes = object_metadata->size(); @@ -959,21 +928,21 @@ Status GCS::write_parts( const uint64_t length, const bool last_part) { if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not an GCS URI: " + uri.to_string()))); + throw GCSException("URI is not an GCS URI: " + uri.to_string()); } - // Ensure that each thread is responsible for exactly multi_part_part_size_ + // Ensure that each thread is responsible for exactly multi_part_size_ // bytes (except if this is the last part, in which case the final // thread should write less). Cap the number of parallel operations at the // configured max number. Length must be evenly divisible by - // multi_part_part_size_ unless this is the last part. - uint64_t num_ops = last_part ? - utils::math::ceil(length, multi_part_part_size_) : - (length / multi_part_part_size_); - num_ops = std::min(std::max(num_ops, uint64_t(1)), max_parallel_ops_); - - if (!last_part && length % multi_part_part_size_ != 0) { + // multi_part_size_ unless this is the last part. + uint64_t num_ops = + last_part ? 
utils::math::ceil(length, gcs_params_.multi_part_size_) : + (length / gcs_params_.multi_part_size_); + num_ops = + std::min(std::max(num_ops, uint64_t(1)), gcs_params_.max_parallel_ops_); + + if (!last_part && length % gcs_params_.multi_part_size_ != 0) { return LOG_STATUS( Status_S3Error("Length not evenly divisible by part size")); } @@ -1045,9 +1014,9 @@ Status GCS::write_parts( std::vector tasks; tasks.reserve(num_ops); for (uint64_t i = 0; i < num_ops; i++) { - const uint64_t begin = i * multi_part_part_size_; + const uint64_t begin = i * gcs_params_.multi_part_size_; const uint64_t end = - std::min((i + 1) * multi_part_part_size_ - 1, length - 1); + std::min((i + 1) * gcs_params_.multi_part_size_ - 1, length - 1); const char* const thread_buffer = reinterpret_cast(buffer) + begin; const uint64_t thread_buffer_len = end - begin + 1; @@ -1088,9 +1057,9 @@ Status GCS::upload_part( if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Upload part failed on: " + object_part_path + " (" + status.message() + - ")"))); + ")"); } return Status::Ok(); @@ -1100,11 +1069,10 @@ Status GCS::flush_object(const URI& uri) { RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not a GCS URI: " + uri.to_string()))); + throw GCSException("URI is not a GCS URI: " + uri.to_string()); } - if (!use_multi_part_upload_) { + if (!gcs_params_.use_multi_part_upload_) { return flush_object_direct(uri); } @@ -1187,9 +1155,9 @@ Status GCS::flush_object(const URI& uri) { if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Compse object failed on: " + uri.to_string() + " (" + - status.message() + ")"))); + status.message() + ")"); } return wait_for_object_to_propagate(bucket_name, object_path); @@ -1218,8 +1186,8 @@ 
Status GCS::delete_part( const google::cloud::Status status = client_->DeleteObject(bucket_name, part_path); if (!status.ok()) { - return Status_GCSError(std::string( - "Delete part failed on: " + part_path + " (" + status.message() + ")")); + throw GCSException( + "Delete part failed on: " + part_path + " (" + status.message() + ")"); } return Status::Ok(); @@ -1268,9 +1236,9 @@ Status GCS::flush_object_direct(const URI& uri) { if (!object_metadata.ok()) { const google::cloud::Status status = object_metadata.status(); - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Write object failed on: " + uri.to_string() + " (" + status.message() + - ")"))); + ")"); } return wait_for_object_to_propagate(bucket_name, object_path); @@ -1286,8 +1254,7 @@ Status GCS::read( RETURN_NOT_OK(init_client()); if (!uri.is_gcs()) { - return LOG_STATUS(Status_GCSError( - std::string("URI is not an GCS URI: " + uri.to_string()))); + throw GCSException("URI is not an GCS URI: " + uri.to_string()); } std::string bucket_name; @@ -1301,9 +1268,9 @@ Status GCS::read( offset, offset + length + read_ahead_length)); if (!stream.status().ok()) { - return LOG_STATUS(Status_GCSError(std::string( + throw GCSException( "Read object failed on: " + uri.to_string() + " (" + - stream.status().message() + ")"))); + stream.status().message() + ")"); } stream.read(static_cast(buffer), length + read_ahead_length); @@ -1312,8 +1279,7 @@ Status GCS::read( stream.Close(); if (*length_returned < length) { - return LOG_STATUS(Status_GCSError( - std::string("Read operation read unexpected number of bytes."))); + throw GCSException("Read operation read unexpected number of bytes."); } return Status::Ok(); diff --git a/tiledb/sm/filesystem/gcs.h b/tiledb/sm/filesystem/gcs.h index 838b9fee6aa..783ef7cc2cd 100644 --- a/tiledb/sm/filesystem/gcs.h +++ b/tiledb/sm/filesystem/gcs.h @@ -62,13 +62,11 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage } // namespace google::cloud -namespace 
tiledb { - -namespace common::filesystem { +namespace tiledb::common::filesystem { class directory_entry; -} +} // namespace tiledb::common::filesystem -namespace sm { +namespace tiledb::sm { /** Class for GCS status exceptions. */ class GCSException : public StatusException { @@ -78,31 +76,104 @@ class GCSException : public StatusException { } }; +/** + * The GCS-specific configuration parameters. + * + * @note The member variables' default declarations have not yet been moved + * from the Config declaration into this struct. + */ +struct GCSParameters { + GCSParameters() = delete; + + GCSParameters(const Config& config); + + ~GCSParameters() = default; + + /** The GCS endpoint. */ + std::string endpoint_; + + /** The project ID to create new buckets on using the VFS. */ + std::string project_id_; + + /** + * The GCS service account credentials JSON string. + * + * Set the JSON string with GCS service account key. Takes precedence + * over `vfs.gcs.workload_identity_configuration` if both are specified. If + * neither is specified, Application Default Credentials will be used. + * + * @note Experimental + */ + std::string service_account_key_; + + /** + * The GCS external account credentials JSON string. + * + * Set the JSON string with Workload Identity Federation configuration. + * `vfs.gcs.service_account_key` takes precedence over this if both are + * specified. If neither is specified, Application Default Credentials will + * be used. + * + * @note Experimental + */ + std::string workload_identity_configuration_; + + /** + * A comma-separated list with the GCS service accounts to impersonate. + * + * Set the GCS service account to impersonate. A chain of impersonated + * accounts can be formed by specifying many service accounts, separated by a + * comma. + * + * @note Experimental + */ + std::string impersonate_service_account_; + + /** + * The part size (in bytes) used in multi part writes. 
+ * + * @note `vfs.gcs.multi_part_size` * `vfs.gcs.max_parallel_ops` bytes will + * be buffered before issuing part uploads in parallel. + */ + uint64_t multi_part_size_; + + /** The maximum number of parallel operations issued. */ + uint64_t max_parallel_ops_; + + /** Whether or not to use chunked part uploads. */ + bool use_multi_part_upload_; + + /** The maximum amount of time to retry network requests. */ + uint64_t request_timeout_ms_; + + /** + * The maximum size in bytes of a direct upload to GCS. + * Ignored if `vfs.gcs.use_multi_part_upload` is set to true. + */ + uint64_t max_direct_upload_size_; +}; + class GCS { public: /* ********************************* */ /* CONSTRUCTORS & DESTRUCTORS */ /* ********************************* */ - /** Constructor. */ - GCS(); + /** + * Constructor. + * + * @param thread_pool The parent VFS thread pool. + * @param config Configuration parameters. + */ + GCS(ThreadPool* thread_pool, const Config& config); - /** Destructor. */ + /** Destructor. Must be explicitly defined. */ ~GCS(); /* ********************************* */ /* API */ /* ********************************* */ - /** - * Initializes and connects a GCS client. - * - * @param config Configuration parameters. - * @param thread_pool The parent VFS thread pool. - * @return Status - */ - Status init(const Config& config, ThreadPool* thread_pool); - /** * Creates a bucket. * @@ -449,6 +520,9 @@ class GCS { /* PRIVATE ATTRIBUTES */ /* ********************************* */ + /** The GCS configuration parameters. */ + GCSParameters gcs_params_; + /** * A libcurl initializer instance. This should remain * the first member variable to ensure that libcurl is @@ -471,21 +545,6 @@ class GCS { /** The VFS thread pool. */ ThreadPool* thread_pool_; - // The GCS endpoint. - std::string endpoint_; - - // The GCS project id. - std::string project_id_; - - // The GCS service account credentials JSON string. 
- std::string service_account_key_; - - // The GCS external account credentials JSON string. - std::string workload_identity_configuration_; - - // A comma-separated list with the GCS service accounts to impersonate. - std::string impersonate_service_account_; - // The GCS REST client. mutable tdb_unique_ptr client_; @@ -498,18 +557,6 @@ class GCS { /** The maximum size of each value-element in 'write_cache_map_'. */ uint64_t write_cache_max_size_; - /** The maximum number of parallel requests. */ - uint64_t max_parallel_ops_; - - /** The target part size in a part list upload */ - uint64_t multi_part_part_size_; - - /** Whether or not to use part list upload. */ - bool use_multi_part_upload_; - - /** The timeout for network requests. */ - uint64_t request_timeout_ms_; - /** Maps a object URI to its part list upload state. */ std::unordered_map multi_part_upload_states_; @@ -763,8 +810,7 @@ class GCS { Status flush_object_direct(const URI& uri); }; -} // namespace sm -} // namespace tiledb +} // namespace tiledb::sm #endif // HAVE_GCS #endif // TILEDB_GCS_H diff --git a/tiledb/sm/filesystem/vfs.cc b/tiledb/sm/filesystem/vfs.cc index d122d83ee3e..5081c2b8699 100644 --- a/tiledb/sm/filesystem/vfs.cc +++ b/tiledb/sm/filesystem/vfs.cc @@ -65,6 +65,7 @@ VFS::VFS( const Config& config) : VFSBase(parent_stats) , Azure_within_VFS(io_tp, config) + , GCS_within_VFS(io_tp, config) , S3_within_VFS(stats_, io_tp, config) , config_(config) , logger_(logger) @@ -98,10 +99,6 @@ VFS::VFS( #ifdef HAVE_GCS supported_fs_.insert(Filesystem::GCS); - st = gcs_.init(config_, io_tp_); - if (!st.ok()) { - throw VFSException("Failed to initialize GCS backend."); - } #endif #ifdef _WIN32 @@ -269,7 +266,7 @@ Status VFS::touch(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.touch(uri); + return gcs().touch(uri); #else throw BuiltWithout("GCS"); #endif @@ -305,7 +302,7 @@ Status VFS::create_bucket(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return 
gcs_.create_bucket(uri); + return gcs().create_bucket(uri); #else throw BuiltWithout("GCS"); #endif @@ -332,7 +329,7 @@ Status VFS::remove_bucket(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.remove_bucket(uri); + return gcs().remove_bucket(uri); #else throw BuiltWithout("GCS"); #endif @@ -360,7 +357,7 @@ Status VFS::empty_bucket(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.empty_bucket(uri); + return gcs().empty_bucket(uri); #else throw BuiltWithout("GCS"); #endif @@ -389,7 +386,7 @@ Status VFS::is_empty_bucket( } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.is_empty_bucket(uri, is_empty); + return gcs().is_empty_bucket(uri, is_empty); #else throw BuiltWithout("GCS"); #endif @@ -425,7 +422,7 @@ Status VFS::remove_dir(const URI& uri) const { #endif } else if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.remove_dir(uri); + return gcs().remove_dir(uri); #else throw BuiltWithout("GCS"); #endif @@ -494,7 +491,7 @@ Status VFS::remove_file(const URI& uri) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.remove_object(uri); + return gcs().remove_object(uri); #else throw BuiltWithout("GCS"); #endif @@ -586,7 +583,11 @@ Status VFS::file_size(const URI& uri, uint64_t* size) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.object_size(uri, size); + try { + return gcs().object_size(uri, size); + } catch (std::exception& e) { + return Status_Error(e.what()); + } #else throw BuiltWithout("GCS"); #endif @@ -634,7 +635,7 @@ Status VFS::is_dir(const URI& uri, bool* is_dir) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.is_dir(uri, is_dir); + return gcs().is_dir(uri, is_dir); #else *is_dir = false; throw BuiltWithout("GCS"); @@ -686,7 +687,7 @@ Status VFS::is_file(const URI& uri, bool* is_file) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.is_object(uri, is_file); + return gcs().is_object(uri, is_file); #else *is_file = false; throw BuiltWithout("GCS"); @@ -721,7 
+722,7 @@ Status VFS::is_bucket(const URI& uri, bool* is_bucket) const { } if (uri.is_gcs()) { #ifdef HAVE_GCS - RETURN_NOT_OK(gcs_.is_bucket(uri, is_bucket)); + RETURN_NOT_OK(gcs().is_bucket(uri, is_bucket)); return Status::Ok(); #else *is_bucket = false; @@ -777,7 +778,7 @@ std::vector VFS::ls_with_sizes(const URI& parent) const { #endif } else if (parent.is_gcs()) { #ifdef HAVE_GCS - entries = gcs_.ls_with_sizes(parent); + entries = gcs().ls_with_sizes(parent); #else throw BuiltWithout("GCS"); #endif @@ -864,7 +865,7 @@ Status VFS::move_file(const URI& old_uri, const URI& new_uri) { if (old_uri.is_gcs()) { if (new_uri.is_gcs()) #ifdef HAVE_GCS - return gcs_.move_object(old_uri, new_uri); + return gcs().move_object(old_uri, new_uri); #else throw BuiltWithout("GCS"); #endif @@ -938,7 +939,7 @@ Status VFS::move_dir(const URI& old_uri, const URI& new_uri) { if (old_uri.is_gcs()) { if (new_uri.is_gcs()) #ifdef HAVE_GCS - return gcs_.move_dir(old_uri, new_uri); + return gcs().move_dir(old_uri, new_uri); #else throw BuiltWithout("GCS"); #endif @@ -1228,7 +1229,7 @@ Status VFS::read_impl( #ifdef HAVE_GCS const auto read_fn = std::bind( &GCS::read, - &gcs_, + &gcs(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, @@ -1454,7 +1455,7 @@ Status VFS::close_file(const URI& uri) { } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.flush_object(uri); + return gcs().flush_object(uri); #else throw BuiltWithout("GCS"); #endif @@ -1519,7 +1520,7 @@ Status VFS::write( } if (uri.is_gcs()) { #ifdef HAVE_GCS - return gcs_.write(uri, buffer, buffer_size); + return gcs().write(uri, buffer, buffer_size); #else throw BuiltWithout("GCS"); #endif diff --git a/tiledb/sm/filesystem/vfs.h b/tiledb/sm/filesystem/vfs.h index 68b6e059d47..67bb3cf86d5 100644 --- a/tiledb/sm/filesystem/vfs.h +++ b/tiledb/sm/filesystem/vfs.h @@ -266,6 +266,37 @@ class Azure_within_VFS { }; #endif +/** The GCS filesystem. 
 */ +#ifdef HAVE_GCS +class GCS_within_VFS { + /** Private member variable */ + GCS gcs_; + + protected: + template <typename... Args> + GCS_within_VFS(Args&&... args) + : gcs_(std::forward<Args>(args)...) { + } + + /** Protected accessor for the GCS object. */ + inline GCS& gcs() { + return gcs_; + } + + /** Protected accessor for the const GCS object. */ + inline const GCS& gcs() const { + return gcs_; + } +}; +#else +class GCS_within_VFS { + protected: + template <typename... Args> + GCS_within_VFS(Args&&...) { + } // empty constructor +}; +#endif + /** The S3 filesystem. */ #ifdef HAVE_S3 class S3_within_VFS { @@ -301,7 +332,10 @@ * This class implements a virtual filesystem that directs filesystem-related * function execution to the appropriate backend based on the input URI. */ -class VFS : private VFSBase, protected Azure_within_VFS, S3_within_VFS { +class VFS : private VFSBase, + protected Azure_within_VFS, + GCS_within_VFS, + S3_within_VFS { public: /* ********************************* */ /* TYPE DEFINITIONS */ @@ -597,7 +631,7 @@ class VFS : private VFSBase, protected Azure_within_VFS, S3_within_VFS { #endif } else if (parent.is_gcs()) { #ifdef HAVE_GCS - results = gcs_.ls_filtered(parent, f, d, recursive); + results = gcs().ls_filtered(parent, f, d, recursive); #else throw filesystem::VFSException("TileDB was built without GCS support"); #endif @@ -970,10 +1004,6 @@ class VFS : private VFSBase, protected Azure_within_VFS, S3_within_VFS { /* PRIVATE ATTRIBUTES */ /* ********************************* */ -#ifdef HAVE_GCS - GCS gcs_; -#endif - #ifdef _WIN32 Win win_; #else