diff --git a/internal/storage/fake_storage_util.go b/internal/storage/fake_storage_util.go index 349272cdcc..9a5622d530 100644 --- a/internal/storage/fake_storage_util.go +++ b/internal/storage/fake_storage_util.go @@ -18,6 +18,7 @@ import ( "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/googlecloudplatform/gcsfuse/v2/cfg" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" + "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil" ) const TestBucketName string = "gcsfuse-default-bucket" @@ -63,10 +64,11 @@ func (f *fakeStorage) CreateStorageHandle() (sh StorageHandle) { f.mockClient = new(MockStorageControlClient) } sh = &storageClient{ - httpClient: f.fakeStorageServer.Client(), - grpcClient: f.fakeStorageServer.Client(), - storageControlClient: f.mockClient, - clientProtocol: f.protocol, + httpClient: f.fakeStorageServer.Client(), + grpcClient: f.fakeStorageServer.Client(), + grpcClientWithBidiConfig: f.fakeStorageServer.Client(), + storageControlClient: f.mockClient, + clientConfig: storageutil.StorageClientConfig{ClientProtocol: f.protocol}, } return } diff --git a/internal/storage/storage_handle.go b/internal/storage/storage_handle.go index 1be60ec9cd..c05ca3a5f1 100644 --- a/internal/storage/storage_handle.go +++ b/internal/storage/storage_handle.go @@ -59,11 +59,12 @@ type StorageHandle interface { } type storageClient struct { - httpClient *storage.Client - grpcClient *storage.Client - clientProtocol cfg.Protocol - storageControlClient StorageControlClient - directPathDetector *gRPCDirectPathDetector + httpClient *storage.Client + grpcClient *storage.Client + grpcClientWithBidiConfig *storage.Client + clientConfig storageutil.StorageClientConfig + storageControlClient StorageControlClient + directPathDetector *gRPCDirectPathDetector } type gRPCDirectPathDetector struct { @@ -78,7 +79,7 @@ func (pd *gRPCDirectPathDetector) isDirectPathPossible(ctx context.Context, buck } // Return clientOpts for both gRPC client and control client. -func createClientOptionForGRPCClient(clientConfig *storageutil.StorageClientConfig) (clientOpts []option.ClientOption, err error) { +func createClientOptionForGRPCClient(clientConfig *storageutil.StorageClientConfig, enableBidiConfig bool) (clientOpts []option.ClientOption, err error) { // Add Custom endpoint option. if clientConfig.CustomEndpoint != "" { clientOpts = append(clientOpts, option.WithEndpoint(storageutil.StripScheme(clientConfig.CustomEndpoint))) @@ -99,6 +100,9 @@ func createClientOptionForGRPCClient(clientConfig *storageutil.StorageClientConf clientOpts = append(clientOpts, option.WithTokenSource(tokenSrc)) } + if enableBidiConfig { + clientOpts = append(clientOpts, experimental.WithGRPCBidiReads()) + } clientOpts = append(clientOpts, option.WithGRPCConnectionPool(clientConfig.GrpcConnPoolSize)) clientOpts = append(clientOpts, option.WithUserAgent(clientConfig.UserAgent)) // Turning off the go-sdk metrics exporter to prevent any problems. @@ -107,15 +111,43 @@ func createClientOptionForGRPCClient(clientConfig *storageutil.StorageClientConf return } +func setRetryConfig(sc *storage.Client, clientConfig *storageutil.StorageClientConfig) { + if sc == nil || clientConfig == nil { + logger.Fatal("setRetryConfig: Empty storage client or clientConfig") + return + } + + // ShouldRetry function checks if an operation should be retried based on the + // response of operation (error.Code). + // RetryAlways causes all operations to be checked for retries using + // ShouldRetry function. + // Without RetryAlways, only those operations are checked for retries which + // are idempotent. + // https://github.com/googleapis/google-cloud-go/blob/main/storage/storage.go#L1953 + retryOpts := []storage.RetryOption{storage.WithBackoff(gax.Backoff{ + Max: clientConfig.MaxRetrySleep, + Multiplier: clientConfig.RetryMultiplier, + }), + storage.WithPolicy(storage.RetryAlways), + storage.WithErrorFunc(storageutil.ShouldRetry)} + + sc.SetRetry(retryOpts...) + + // The default MaxRetryAttempts value is 0 indicates no limit. + if clientConfig.MaxRetryAttempts != 0 { + sc.SetRetry(storage.WithMaxAttempts(clientConfig.MaxRetryAttempts)) + } +} + // Followed https://pkg.go.dev/cloud.google.com/go/storage#hdr-Experimental_gRPC_API to create the gRPC client. -func createGRPCClientHandle(ctx context.Context, clientConfig *storageutil.StorageClientConfig) (sc *storage.Client, err error) { +func createGRPCClientHandle(ctx context.Context, clientConfig *storageutil.StorageClientConfig, enableBidiConfig bool) (sc *storage.Client, err error) { if err := os.Setenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS", "true"); err != nil { logger.Fatal("error setting direct path env var: %v", err) } var clientOpts []option.ClientOption - clientOpts, err = createClientOptionForGRPCClient(clientConfig) + clientOpts, err = createClientOptionForGRPCClient(clientConfig, enableBidiConfig) if err != nil { return nil, fmt.Errorf("error in getting clientOpts for gRPC client: %w", err) } @@ -123,6 +155,8 @@ func createGRPCClientHandle(ctx context.Context, clientConfig *storageutil.Stora sc, err = storage.NewGRPCClient(ctx, clientOpts...) if err != nil { err = fmt.Errorf("NewGRPCClient: %w", err) + } else { + setRetryConfig(sc, clientConfig) } // Unset the environment variable, since it's used only while creation of grpc client. @@ -183,7 +217,13 @@ func createHTTPClientHandle(ctx context.Context, clientConfig *storageutil.Stora TargetPercentile: clientConfig.ReadStallRetryConfig.ReqTargetPercentile, })) } - return storage.NewClient(ctx, clientOpts...) + sc, err = storage.NewClient(ctx, clientOpts...) + if err != nil { + err = fmt.Errorf("go http storage client creation failed: %w", err) + return + } + setRetryConfig(sc, clientConfig) + return } func (sh *storageClient) lookupBucketType(bucketName string) (*gcs.BucketType, error) { @@ -220,39 +260,19 @@ func (sh *storageClient) getStorageLayout(bucketName string) (*controlpb.Storage return stoargeLayout, err } -// NewStorageHandle returns the handle of http or grpc Go storage client based on the -// provided StorageClientConfig.ClientProtocol. -// Please check out the StorageClientConfig to know about the parameters used in -// http and gRPC client. +// NewStorageHandle creates control client and stores client config to allow dynamic +// creation of http or grpc client. func NewStorageHandle(ctx context.Context, clientConfig storageutil.StorageClientConfig) (sh StorageHandle, err error) { - var hc, gc *storage.Client // The default protocol for the Go Storage control client's folders API is gRPC. // gcsfuse will initially mirror this behavior due to the client's lack of HTTP support. var controlClient *control.StorageControlClient var clientOpts []option.ClientOption - var directPathDetector *gRPCDirectPathDetector - - gc, err = createGRPCClientHandle(ctx, &clientConfig) - if err == nil { - clientOpts, err = createClientOptionForGRPCClient(&clientConfig) - directPathDetector = &gRPCDirectPathDetector{clientOptions: clientOpts} - } - if err != nil { - err = fmt.Errorf("go grpc storage client creation failed: %w", err) - return - } - - hc, err = createHTTPClientHandle(ctx, &clientConfig) - if err != nil { - err = fmt.Errorf("go http storage client creation failed: %w", err) - return - } // TODO: We will implement an additional check for the HTTP control client protocol once the Go SDK supports HTTP. // TODO: Custom endpoints do not currently support gRPC. Remove this additional check once TPC(custom-endpoint) supports gRPC. // Create storageControlClient irrespective of whether hns needs to be enabled or not. // Because we will use storageControlClient to check layout of given bucket. - clientOpts, err = createClientOptionForGRPCClient(&clientConfig) + clientOpts, err = createClientOptionForGRPCClient(&clientConfig, false) if err != nil { return nil, fmt.Errorf("error in getting clientOpts for gRPC client: %w", err) } @@ -261,37 +281,38 @@ func NewStorageHandle(ctx context.Context, clientConfig storageutil.StorageClien return nil, fmt.Errorf("could not create StorageControl Client: %w", err) } - // ShouldRetry function checks if an operation should be retried based on the - // response of operation (error.Code). - // RetryAlways causes all operations to be checked for retries using - // ShouldRetry function. - // Without RetryAlways, only those operations are checked for retries which - // are idempotent. - // https://github.com/googleapis/google-cloud-go/blob/main/storage/storage.go#L1953 - retryOpts := []storage.RetryOption{storage.WithBackoff(gax.Backoff{ - Max: clientConfig.MaxRetrySleep, - Multiplier: clientConfig.RetryMultiplier, - }), - storage.WithPolicy(storage.RetryAlways), - storage.WithErrorFunc(storageutil.ShouldRetry)} + sh = &storageClient{ + storageControlClient: controlClient, + clientConfig: clientConfig, + directPathDetector: &gRPCDirectPathDetector{clientOptions: clientOpts}, + } + return +} - hc.SetRetry(retryOpts...) - gc.SetRetry(retryOpts...) +func (sh *storageClient) getClient(ctx context.Context, isbucketZonal bool) (*storage.Client, error) { + var err error + if isbucketZonal { + if sh.grpcClientWithBidiConfig == nil { + sh.grpcClientWithBidiConfig, err = createGRPCClientHandle(ctx, &sh.clientConfig, true) + } + return sh.grpcClientWithBidiConfig, err + } - // The default MaxRetryAttempts value is 0 indicates no limit. - if clientConfig.MaxRetryAttempts != 0 { - hc.SetRetry(storage.WithMaxAttempts(clientConfig.MaxRetryAttempts)) - gc.SetRetry(storage.WithMaxAttempts(clientConfig.MaxRetryAttempts)) + if sh.clientConfig.ClientProtocol == cfg.GRPC { + if sh.grpcClient == nil { + sh.grpcClient, err = createGRPCClientHandle(ctx, &sh.clientConfig, false) + } + return sh.grpcClient, err } - sh = &storageClient{ - httpClient: hc, - grpcClient: gc, - storageControlClient: controlClient, - clientProtocol: clientConfig.ClientProtocol, - directPathDetector: directPathDetector, + if sh.clientConfig.ClientProtocol == cfg.HTTP1 || sh.clientConfig.ClientProtocol == cfg.HTTP2 { + if sh.httpClient == nil { + sh.httpClient, err = createHTTPClientHandle(ctx, &sh.clientConfig) + } + return sh.httpClient, err } - return + + return nil, fmt.Errorf("invalid client-protocol requested: %s", sh.clientConfig.ClientProtocol) } func (sh *storageClient) BucketHandle(ctx context.Context, bucketName string, billingProject string) (bh *bucketHandle, err error) { @@ -301,18 +322,19 @@ func (sh *storageClient) BucketHandle(ctx context.Context, bucketName string, bi return nil, fmt.Errorf("storageLayout call failed: %s", err) } - if bucketType.Zonal || sh.clientProtocol == cfg.GRPC { - client = sh.grpcClient + client, err = sh.getClient(ctx, bucketType.Zonal) + if err != nil { + return nil, err + } + + if bucketType.Zonal || sh.clientConfig.ClientProtocol == cfg.GRPC { if sh.directPathDetector != nil { if err := sh.directPathDetector.isDirectPathPossible(ctx, bucketName); err != nil { logger.Warnf("Direct path connectivity unavailable for %s, reason: %v", bucketName, err) } } - } else if sh.clientProtocol == cfg.HTTP1 || sh.clientProtocol == cfg.HTTP2 { - client = sh.httpClient - } else { - return nil, fmt.Errorf("invalid client-protocol requested: %s", sh.clientProtocol) } + storageBucketHandle := client.Bucket(bucketName) if billingProject != "" { diff --git a/internal/storage/storage_handle_test.go b/internal/storage/storage_handle_test.go index 796bf5c509..849d7ef872 100644 --- a/internal/storage/storage_handle_test.go +++ b/internal/storage/storage_handle_test.go @@ -283,7 +283,17 @@ func (testSuite *StorageHandleTest) TestCreateGRPCClientHandle() { sc := storageutil.GetDefaultStorageClientConfig() sc.ClientProtocol = cfg.GRPC - storageClient, err := createGRPCClientHandle(testSuite.ctx, &sc) + storageClient, err := createGRPCClientHandle(testSuite.ctx, &sc, false) + + assert.Nil(testSuite.T(), err) + assert.NotNil(testSuite.T(), storageClient) +} + +func (testSuite *StorageHandleTest) TestCreateGRPCClientHandleWithBidiConfig() { + sc := storageutil.GetDefaultStorageClientConfig() + sc.ClientProtocol = cfg.GRPC + + storageClient, err := createGRPCClientHandle(testSuite.ctx, &sc, true) assert.Nil(testSuite.T(), err) assert.NotNil(testSuite.T(), storageClient) @@ -556,7 +566,7 @@ func (testSuite *StorageHandleTest) TestNewStorageHandleWithCustomEndpointAndEna func (testSuite *StorageHandleTest) TestCreateClientOptionForGRPCClient() { sc := storageutil.GetDefaultStorageClientConfig() - clientOption, err := createClientOptionForGRPCClient(&sc) + clientOption, err := createClientOptionForGRPCClient(&sc, false) assert.Nil(testSuite.T(), err) assert.NotNil(testSuite.T(), clientOption)