Skip to content

Commit

Permalink
[Write-stall] Create chunk transfer timeout flag (#2755)
Browse files Browse the repository at this point in the history
* add hidden flag

* add chunk transfer timeout flag with default 10s value

* small fix

* remove redundant line

* fix validation

* remove redundant changes

* fix unit tests

* fix unit tests

* nit fix

* review comments

* review comments

* fix unit tests

* fix unit tests

* fix unit tests

* remove from write config and add it in gcs retries

* remove from write config and add it in gcs retries

* remove from write config and add it in gcs retries

* Add unit tests

* Add unit tests

* small fix

* review comment
  • Loading branch information
Tulsishah authored Dec 10, 2024
1 parent 9c10d53 commit 94a9357
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 5 deletions.
12 changes: 12 additions & 0 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ type GcsConnectionConfig struct {
}

type GcsRetriesConfig struct {
ChunkTransferTimeoutSecs int64 `yaml:"chunk-transfer-timeout-secs"`

MaxRetryAttempts int64 `yaml:"max-retry-attempts"`

MaxRetrySleep time.Duration `yaml:"max-retry-sleep"`
Expand Down Expand Up @@ -255,6 +257,12 @@ func BuildFlagSet(flagSet *pflag.FlagSet) error {

flagSet.StringP("cache-dir", "", "", "Enables file-caching. Specifies the directory to use for file-cache.")

flagSet.IntP("chunk-transfer-timeout-secs", "", 10, "We send larger file uploads in 16 MiB chunks. This flag controls the duration that the HTTP client will wait for a response after making a request to upload a chunk. The default value of 10s indicates that the client will wait 10 seconds for upload completion; otherwise, it cancels the request and retries for that chunk till chunkRetryDeadline(32s). 0 means no timeout.")

if err := flagSet.MarkHidden("chunk-transfer-timeout-secs"); err != nil {
return err
}

flagSet.StringP("client-protocol", "", "http1", "The protocol used for communicating with the GCS backend. Value can be 'http1' (HTTP/1.1), 'http2' (HTTP/2) or 'grpc'.")

flagSet.IntP("cloud-metrics-export-interval-secs", "", 0, "Specifies the interval at which the metrics are uploaded to cloud monitoring")
Expand Down Expand Up @@ -584,6 +592,10 @@ func BindFlags(v *viper.Viper, flagSet *pflag.FlagSet) error {
return err
}

if err := v.BindPFlag("gcs-retries.chunk-transfer-timeout-secs", flagSet.Lookup("chunk-transfer-timeout-secs")); err != nil {
return err
}

if err := v.BindPFlag("gcs-connection.client-protocol", flagSet.Lookup("client-protocol")); err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions cfg/params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,17 @@
usage: "File chunk size to read from GCS in one call. Need to specify the value in MB. ChunkSize less than 1MB is not supported"
default: "200"

- config-path: "gcs-retries.chunk-transfer-timeout-secs"
flag-name: "chunk-transfer-timeout-secs"
type: "int"
usage: >-
We send larger file uploads in 16 MiB chunks. This flag controls the duration
that the HTTP client will wait for a response after making a request to upload a chunk.
The default value of 10s indicates that the client will wait 10 seconds for upload completion;
otherwise, it cancels the request and retries for that chunk till chunkRetryDeadline(32s). 0 means no timeout.
default: "10"
hide-flag: true

- config-path: "gcs-retries.max-retry-attempts"
flag-name: "max-retry-attempts"
type: "int"
Expand Down
11 changes: 11 additions & 0 deletions cfg/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ func isValidMetricsConfig(m *MetricsConfig) error {
return nil
}

// isValidChunkTransferTimeoutForRetriesConfig validates the chunk transfer
// timeout, expressed in seconds, from the gcs-retries config.
//
// A value of 0 is accepted and means "no timeout". Negative values and values
// larger than maxSupportedTTLInSeconds are rejected with a non-nil error whose
// message states the accepted range.
func isValidChunkTransferTimeoutForRetriesConfig(chunkTransferTimeoutSecs int64) error {
	if chunkTransferTimeoutSecs < 0 || chunkTransferTimeoutSecs > maxSupportedTTLInSeconds {
		// Report both bounds: the previous message only mentioned the lower
		// bound, which was misleading for values rejected for being too large.
		return fmt.Errorf("invalid value of ChunkTransferTimeout: %d; should be between 0 (for infinite) and %d", chunkTransferTimeoutSecs, int64(maxSupportedTTLInSeconds))
	}
	return nil
}

// ValidateConfig returns a non-nil error if the config is invalid.
func ValidateConfig(v isSet, config *Config) error {
var err error
Expand Down Expand Up @@ -228,6 +235,10 @@ func ValidateConfig(v isSet, config *Config) error {
return fmt.Errorf("error parsing read-stall-gcs-retries config: %w", err)
}

if err = isValidChunkTransferTimeoutForRetriesConfig(config.GcsRetries.ChunkTransferTimeoutSecs); err != nil {
return fmt.Errorf("error parsing chunk-transfer-timeout-secs config: %w", err)
}

if err = isValidMetricsConfig(&config.Metrics); err != nil {
return fmt.Errorf("error parsing metrics config: %w", err)
}
Expand Down
28 changes: 28 additions & 0 deletions cfg/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,21 @@ func TestValidateConfigSuccessful(t *testing.T) {
FileSystem: FileSystemConfig{KernelListCacheTtlSecs: 30},
},
},
{
name: "valid_chunk_transfer_timeout_secs",
config: &Config{
Logging: LoggingConfig{LogRotate: validLogRotateConfig()},
FileCache: validFileCacheConfig(t),
GcsConnection: GcsConnectionConfig{
SequentialReadSizeMb: 10,
},
MetadataCache: MetadataCacheConfig{
ExperimentalMetadataPrefetchOnMount: "sync",
},
FileSystem: FileSystemConfig{KernelListCacheTtlSecs: 30},
GcsRetries: GcsRetriesConfig{ChunkTransferTimeoutSecs: 15},
},
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -326,6 +341,19 @@ func TestValidateConfig_ErrorScenarios(t *testing.T) {
},
},
},
{
name: "chunk_transfer_timeout_in_negative",
config: &Config{
Logging: LoggingConfig{LogRotate: validLogRotateConfig()},
FileCache: validFileCacheConfig(t),
MetadataCache: MetadataCacheConfig{
ExperimentalMetadataPrefetchOnMount: "sync",
},
GcsRetries: GcsRetriesConfig{
ChunkTransferTimeoutSecs: -5,
},
},
},
}

for _, tc := range testCases {
Expand Down
17 changes: 12 additions & 5 deletions cmd/config_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,8 +693,8 @@ func TestValidateConfigFile_MetadataCacheConfigSuccessful(t *testing.T) {
}
}

func TestValidateConfigFile_ReadStallConfigSuccessful(t *testing.T) {
testCases := []struct {
func TestValidateConfigFile_GCSRetries(t *testing.T) {
tests := []struct {
name string
configFile string
expectedConfig *cfg.Config
Expand All @@ -705,6 +705,10 @@ func TestValidateConfigFile_ReadStallConfigSuccessful(t *testing.T) {
configFile: "testdata/empty_file.yaml",
expectedConfig: &cfg.Config{
GcsRetries: cfg.GcsRetriesConfig{
ChunkTransferTimeoutSecs: 10,
MaxRetryAttempts: 0,
MaxRetrySleep: 30 * time.Second,
Multiplier: 2,
ReadStall: cfg.ReadStallGcsRetriesConfig{
Enable: false,
MinReqTimeout: 1500 * time.Millisecond,
Expand All @@ -721,6 +725,10 @@ func TestValidateConfigFile_ReadStallConfigSuccessful(t *testing.T) {
configFile: "testdata/valid_config.yaml",
expectedConfig: &cfg.Config{
GcsRetries: cfg.GcsRetriesConfig{
ChunkTransferTimeoutSecs: 20,
MaxRetryAttempts: 0,
MaxRetrySleep: 30 * time.Second,
Multiplier: 2,
ReadStall: cfg.ReadStallGcsRetriesConfig{
Enable: true,
MinReqTimeout: 10 * time.Second,
Expand All @@ -733,13 +741,12 @@ func TestValidateConfigFile_ReadStallConfigSuccessful(t *testing.T) {
},
},
}

for _, tc := range testCases {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
gotConfig, err := getConfigObjectWithConfigFile(t, tc.configFile)

if assert.NoError(t, err) {
assert.EqualValues(t, tc.expectedConfig.GcsRetries.ReadStall, gotConfig.GcsRetries.ReadStall)
assert.EqualValues(t, tc.expectedConfig.GcsRetries, gotConfig.GcsRetries)
}
})
}
Expand Down
46 changes: 46 additions & 0 deletions cmd/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,3 +1017,49 @@ func TestArgsParsing_MetadataCacheFlags(t *testing.T) {
})
}
}

// TestArgParsing_GCSRetries checks that gcs-retries related command-line flags
// are reflected in the GcsRetries section of the config handed to the mount
// callback by the root command.
func TestArgParsing_GCSRetries(t *testing.T) {
	tests := []struct {
		name           string
		args           []string
		expectedConfig *cfg.Config
	}{
		{
			name: "Test with non default chunkTransferTimeout",
			args: []string{"gcsfuse", "--chunk-transfer-timeout-secs=30", "abc", "pqr"},
			expectedConfig: &cfg.Config{
				GcsRetries: cfg.GcsRetriesConfig{
					// Only this field is set via a flag; the remaining values are
					// what the test expects when no other retry flag is passed.
					ChunkTransferTimeoutSecs: 30,
					MaxRetryAttempts:         0,
					MaxRetrySleep:            30 * time.Second,
					Multiplier:               2,
					ReadStall: cfg.ReadStallGcsRetriesConfig{
						Enable:              false,
						InitialReqTimeout:   20 * time.Second,
						MinReqTimeout:       1500 * time.Millisecond,
						MaxReqTimeout:       1200 * time.Second,
						ReqIncreaseRate:     15,
						ReqTargetPercentile: 0.99,
					},
				},
			},
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			var gotConfig *cfg.Config
			// The parameter is named c (not cfg) so it does not shadow the cfg package.
			cmd, err := newRootCmd(func(c *cfg.Config, _, _ string) error {
				gotConfig = c
				return nil
			})
			require.NoError(t, err)
			cmd.SetArgs(convertToPosixArgs(tc.args, cmd))

			err = cmd.Execute()

			if assert.NoError(t, err) {
				// Fail cleanly instead of panicking on a nil dereference if the
				// callback was never invoked.
				require.NotNil(t, gotConfig)
				assert.Equal(t, tc.expectedConfig.GcsRetries, gotConfig.GcsRetries)
			}
		})
	}
}
1 change: 1 addition & 0 deletions cmd/testdata/valid_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ gcs-connection:
max-idle-conns-per-host: 20
sequential-read-size-mb: 450
gcs-retries:
chunk-transfer-timeout-secs: 20
read-stall:
enable: true
min-req-timeout: 10s
Expand Down

0 comments on commit 94a9357

Please sign in to comment.