Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable dynamic creation of storage clients based on bucket type and specified protocol #2922

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions internal/storage/fake_storage_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/googlecloudplatform/gcsfuse/v2/cfg"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
)

const TestBucketName string = "gcsfuse-default-bucket"
Expand Down Expand Up @@ -63,10 +64,11 @@ func (f *fakeStorage) CreateStorageHandle() (sh StorageHandle) {
f.mockClient = new(MockStorageControlClient)
}
sh = &storageClient{
httpClient: f.fakeStorageServer.Client(),
grpcClient: f.fakeStorageServer.Client(),
storageControlClient: f.mockClient,
clientProtocol: f.protocol,
httpClient: f.fakeStorageServer.Client(),
grpcClient: f.fakeStorageServer.Client(),
grpcClientWithBidiConfig: f.fakeStorageServer.Client(),
storageControlClient: f.mockClient,
clientConfig: storageutil.StorageClientConfig{ClientProtocol: f.protocol},
}
return
}
Expand Down
150 changes: 86 additions & 64 deletions internal/storage/storage_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ type StorageHandle interface {
}

type storageClient struct {
httpClient *storage.Client
grpcClient *storage.Client
clientProtocol cfg.Protocol
storageControlClient StorageControlClient
directPathDetector *gRPCDirectPathDetector
httpClient *storage.Client
grpcClient *storage.Client
grpcClientWithBidiConfig *storage.Client
clientConfig storageutil.StorageClientConfig
storageControlClient StorageControlClient
directPathDetector *gRPCDirectPathDetector
}

type gRPCDirectPathDetector struct {
Expand All @@ -78,7 +79,7 @@ func (pd *gRPCDirectPathDetector) isDirectPathPossible(ctx context.Context, buck
}

// Return clientOpts for both gRPC client and control client.
func createClientOptionForGRPCClient(clientConfig *storageutil.StorageClientConfig) (clientOpts []option.ClientOption, err error) {
func createClientOptionForGRPCClient(clientConfig *storageutil.StorageClientConfig, enableBidiConfig bool) (clientOpts []option.ClientOption, err error) {
// Add Custom endpoint option.
if clientConfig.CustomEndpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(storageutil.StripScheme(clientConfig.CustomEndpoint)))
Expand All @@ -99,6 +100,9 @@ func createClientOptionForGRPCClient(clientConfig *storageutil.StorageClientConf
clientOpts = append(clientOpts, option.WithTokenSource(tokenSrc))
}

if enableBidiConfig {
clientOpts = append(clientOpts, experimental.WithGRPCBidiReads())
}
clientOpts = append(clientOpts, option.WithGRPCConnectionPool(clientConfig.GrpcConnPoolSize))
clientOpts = append(clientOpts, option.WithUserAgent(clientConfig.UserAgent))
// Turning off the go-sdk metrics exporter to prevent any problems.
Expand All @@ -107,22 +111,52 @@ func createClientOptionForGRPCClient(clientConfig *storageutil.StorageClientConf
return
}

func setRetryConfig(sc *storage.Client, clientConfig *storageutil.StorageClientConfig) {
if sc == nil || clientConfig == nil {
logger.Fatal("setRetryConfig: Empty storage client or clientConfig")
return
}

// ShouldRetry function checks if an operation should be retried based on the
// response of operation (error.Code).
// RetryAlways causes all operations to be checked for retries using
// ShouldRetry function.
// Without RetryAlways, only those operations are checked for retries which
// are idempotent.
// https://github.com/googleapis/google-cloud-go/blob/main/storage/storage.go#L1953
retryOpts := []storage.RetryOption{storage.WithBackoff(gax.Backoff{
Max: clientConfig.MaxRetrySleep,
Multiplier: clientConfig.RetryMultiplier,
}),
storage.WithPolicy(storage.RetryAlways),
storage.WithErrorFunc(storageutil.ShouldRetry)}

sc.SetRetry(retryOpts...)

// The default MaxRetryAttempts value is 0 indicates no limit.
if clientConfig.MaxRetryAttempts != 0 {
sc.SetRetry(storage.WithMaxAttempts(clientConfig.MaxRetryAttempts))
}
}

// Followed https://pkg.go.dev/cloud.google.com/go/storage#hdr-Experimental_gRPC_API to create the gRPC client.
func createGRPCClientHandle(ctx context.Context, clientConfig *storageutil.StorageClientConfig) (sc *storage.Client, err error) {
func createGRPCClientHandle(ctx context.Context, clientConfig *storageutil.StorageClientConfig, enableBidiConfig bool) (sc *storage.Client, err error) {

if err := os.Setenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS", "true"); err != nil {
logger.Fatal("error setting direct path env var: %v", err)
}

var clientOpts []option.ClientOption
clientOpts, err = createClientOptionForGRPCClient(clientConfig)
clientOpts, err = createClientOptionForGRPCClient(clientConfig, enableBidiConfig)
if err != nil {
return nil, fmt.Errorf("error in getting clientOpts for gRPC client: %w", err)
}

sc, err = storage.NewGRPCClient(ctx, clientOpts...)
if err != nil {
err = fmt.Errorf("NewGRPCClient: %w", err)
} else {
setRetryConfig(sc, clientConfig)
}

// Unset the environment variable, since it's used only while creation of grpc client.
Expand Down Expand Up @@ -183,7 +217,13 @@ func createHTTPClientHandle(ctx context.Context, clientConfig *storageutil.Stora
TargetPercentile: clientConfig.ReadStallRetryConfig.ReqTargetPercentile,
}))
}
return storage.NewClient(ctx, clientOpts...)
sc, err = storage.NewClient(ctx, clientOpts...)
if err != nil {
err = fmt.Errorf("go http storage client creation failed: %w", err)
return
}
setRetryConfig(sc, clientConfig)
return
}

func (sh *storageClient) lookupBucketType(bucketName string) (*gcs.BucketType, error) {
Expand Down Expand Up @@ -220,39 +260,19 @@ func (sh *storageClient) getStorageLayout(bucketName string) (*controlpb.Storage
return stoargeLayout, err
}

// NewStorageHandle returns the handle of http or grpc Go storage client based on the
// provided StorageClientConfig.ClientProtocol.
// Please check out the StorageClientConfig to know about the parameters used in
// http and gRPC client.
// NewStorageHandle creates control client and stores client config to allow dynamic
// creation of http or grpc client.
func NewStorageHandle(ctx context.Context, clientConfig storageutil.StorageClientConfig) (sh StorageHandle, err error) {
var hc, gc *storage.Client
// The default protocol for the Go Storage control client's folders API is gRPC.
// gcsfuse will initially mirror this behavior due to the client's lack of HTTP support.
var controlClient *control.StorageControlClient
var clientOpts []option.ClientOption
var directPathDetector *gRPCDirectPathDetector

gc, err = createGRPCClientHandle(ctx, &clientConfig)
if err == nil {
clientOpts, err = createClientOptionForGRPCClient(&clientConfig)
directPathDetector = &gRPCDirectPathDetector{clientOptions: clientOpts}
}
if err != nil {
err = fmt.Errorf("go grpc storage client creation failed: %w", err)
return
}

hc, err = createHTTPClientHandle(ctx, &clientConfig)
if err != nil {
err = fmt.Errorf("go http storage client creation failed: %w", err)
return
}

// TODO: We will implement an additional check for the HTTP control client protocol once the Go SDK supports HTTP.
// TODO: Custom endpoints do not currently support gRPC. Remove this additional check once TPC(custom-endpoint) supports gRPC.
// Create storageControlClient irrespective of whether hns needs to be enabled or not.
// Because we will use storageControlClient to check layout of given bucket.
clientOpts, err = createClientOptionForGRPCClient(&clientConfig)
clientOpts, err = createClientOptionForGRPCClient(&clientConfig, false)
if err != nil {
return nil, fmt.Errorf("error in getting clientOpts for gRPC client: %w", err)
}
Expand All @@ -261,37 +281,38 @@ func NewStorageHandle(ctx context.Context, clientConfig storageutil.StorageClien
return nil, fmt.Errorf("could not create StorageControl Client: %w", err)
}

// ShouldRetry function checks if an operation should be retried based on the
// response of operation (error.Code).
// RetryAlways causes all operations to be checked for retries using
// ShouldRetry function.
// Without RetryAlways, only those operations are checked for retries which
// are idempotent.
// https://github.com/googleapis/google-cloud-go/blob/main/storage/storage.go#L1953
retryOpts := []storage.RetryOption{storage.WithBackoff(gax.Backoff{
Max: clientConfig.MaxRetrySleep,
Multiplier: clientConfig.RetryMultiplier,
}),
storage.WithPolicy(storage.RetryAlways),
storage.WithErrorFunc(storageutil.ShouldRetry)}
sh = &storageClient{
storageControlClient: controlClient,
clientConfig: clientConfig,
directPathDetector: &gRPCDirectPathDetector{clientOptions: clientOpts},
}
return
}

hc.SetRetry(retryOpts...)
gc.SetRetry(retryOpts...)
func (sh *storageClient) getClient(ctx context.Context, isbucketZonal bool) (*storage.Client, error) {
var err error
if isbucketZonal {
if sh.grpcClientWithBidiConfig == nil {
sh.grpcClientWithBidiConfig, err = createGRPCClientHandle(ctx, &sh.clientConfig, true)
abhishek10004 marked this conversation as resolved.
Show resolved Hide resolved
}
return sh.grpcClientWithBidiConfig, err
}

// The default MaxRetryAttempts value is 0 indicates no limit.
if clientConfig.MaxRetryAttempts != 0 {
hc.SetRetry(storage.WithMaxAttempts(clientConfig.MaxRetryAttempts))
gc.SetRetry(storage.WithMaxAttempts(clientConfig.MaxRetryAttempts))
if sh.clientConfig.ClientProtocol == cfg.GRPC {
if sh.grpcClient == nil {
sh.grpcClient, err = createGRPCClientHandle(ctx, &sh.clientConfig, false)
}
return sh.grpcClient, err
}

sh = &storageClient{
httpClient: hc,
grpcClient: gc,
storageControlClient: controlClient,
clientProtocol: clientConfig.ClientProtocol,
directPathDetector: directPathDetector,
if sh.clientConfig.ClientProtocol == cfg.HTTP1 || sh.clientConfig.ClientProtocol == cfg.HTTP2 {
if sh.httpClient == nil {
sh.httpClient, err = createHTTPClientHandle(ctx, &sh.clientConfig)
}
return sh.httpClient, err
}
return

return nil, fmt.Errorf("invalid client-protocol requested: %s", sh.clientConfig.ClientProtocol)
}

func (sh *storageClient) BucketHandle(ctx context.Context, bucketName string, billingProject string) (bh *bucketHandle, err error) {
Expand All @@ -301,18 +322,19 @@ func (sh *storageClient) BucketHandle(ctx context.Context, bucketName string, bi
return nil, fmt.Errorf("storageLayout call failed: %s", err)
}

if bucketType.Zonal || sh.clientProtocol == cfg.GRPC {
client = sh.grpcClient
client, err = sh.getClient(ctx, bucketType.Zonal)
if err != nil {
return nil, err
}

if bucketType.Zonal || sh.clientConfig.ClientProtocol == cfg.GRPC {
if sh.directPathDetector != nil {
if err := sh.directPathDetector.isDirectPathPossible(ctx, bucketName); err != nil {
logger.Warnf("Direct path connectivity unavailable for %s, reason: %v", bucketName, err)
}
}
} else if sh.clientProtocol == cfg.HTTP1 || sh.clientProtocol == cfg.HTTP2 {
client = sh.httpClient
} else {
return nil, fmt.Errorf("invalid client-protocol requested: %s", sh.clientProtocol)
}

storageBucketHandle := client.Bucket(bucketName)

if billingProject != "" {
Expand Down
14 changes: 12 additions & 2 deletions internal/storage/storage_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,17 @@ func (testSuite *StorageHandleTest) TestCreateGRPCClientHandle() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.ClientProtocol = cfg.GRPC

storageClient, err := createGRPCClientHandle(testSuite.ctx, &sc)
storageClient, err := createGRPCClientHandle(testSuite.ctx, &sc, false)

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
}

func (testSuite *StorageHandleTest) TestCreateGRPCClientHandleWithBidiConfig() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.ClientProtocol = cfg.GRPC

storageClient, err := createGRPCClientHandle(testSuite.ctx, &sc, true)

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
Expand Down Expand Up @@ -556,7 +566,7 @@ func (testSuite *StorageHandleTest) TestNewStorageHandleWithCustomEndpointAndEna
func (testSuite *StorageHandleTest) TestCreateClientOptionForGRPCClient() {
sc := storageutil.GetDefaultStorageClientConfig()

clientOption, err := createClientOptionForGRPCClient(&sc)
clientOption, err := createClientOptionForGRPCClient(&sc, false)
abhishek10004 marked this conversation as resolved.
Show resolved Hide resolved

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), clientOption)
Expand Down
Loading