Skip to content

Commit

Permalink
Merge pull request #133753 from cockroachdb/blathers/backport-staging…
Browse files Browse the repository at this point in the history
…-v23.2.14-132953

staging-v23.2.14: cloud: add WithClientName option
  • Loading branch information
celiala authored Oct 30, 2024
2 parents 499621c + c5581d8 commit de12bb8
Show file tree
Hide file tree
Showing 15 changed files with 102 additions and 48 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func runBackupProcessor(
progCh: progCh,
settings: &flowCtx.Cfg.Settings.SV,
}
storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest)
storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest, cloud.WithClientName("backup"))
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,8 @@ func makeCloudStorageSink(

// We make the external storage with a nil IOAccountingInterceptor since we
// record usage metrics via s.metrics.
if s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil)); err != nil {
s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil), cloud.WithClientName("cdc"))
if err != nil {
return nil, err
}
if mb != nil && s.es != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ func TestCloudStorageSink(t *testing.T) {
clientFactory := blobs.TestBlobServiceClient(settings.ExternalIODir)
externalStorageFromURI := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage,
error) {
var options cloud.ExternalStorageOptions
for _, opt := range opts {
opt(&options)
}
require.Equal(t, options.ClientName, "cdc")

return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings,
clientFactory,
user,
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_test(
name = "cloud_test",
srcs = [
"cloud_io_test.go",
"options_test.go",
"uris_test.go",
],
args = ["-test.timeout=295s"],
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/amazon/aws_kms.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func MakeAWSKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS, e
// situation is.
region = "default-region"
}
client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS")
client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS", "")
if err != nil {
return nil, err
}
Expand Down
55 changes: 27 additions & 28 deletions pkg/cloud/amazon/s3_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ var NightlyEnvVarKMSParams = map[string]string{
}

type s3Storage struct {
bucket *string
conf *cloudpb.ExternalStorage_S3
ioConf base.ExternalIODirConfig
settings *cluster.Settings
prefix string
metrics *cloud.Metrics
bucket *string
conf *cloudpb.ExternalStorage_S3
ioConf base.ExternalIODirConfig
settings *cluster.Settings
prefix string
metrics *cloud.Metrics
storageOptions cloud.ExternalStorageOptions

opts s3ClientConfig
cached *s3Client
Expand Down Expand Up @@ -474,7 +475,7 @@ func MakeS3Storage(
// other callers from making clients in the meantime, not just to avoid making
// duplicate clients in a race but also because making clients concurrently
// can fail if the AWS metadata server hits its rate limit.
client, _, err := newClient(ctx, args.MetricsRecorder, s.opts, s.settings)
client, _, err := s.newClient(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -504,11 +505,9 @@ var awsVerboseLogging = aws.LogLevel(aws.LogDebugWithRequestRetries | aws.LogDeb
// config's region is empty, used the passed bucket to determine a region and
// configures the client with it as well as returning it (so the caller can
// remember it for future calls).
func newClient(
ctx context.Context, metrics *cloud.Metrics, conf s3ClientConfig, settings *cluster.Settings,
) (s3Client, string, error) {
func (s *s3Storage) newClient(ctx context.Context) (s3Client, string, error) {
// Open a span if client creation will do IO/RPCs to find creds/bucket region.
if conf.region == "" || conf.auth == cloud.AuthParamImplicit {
if s.opts.region == "" || s.opts.auth == cloud.AuthParamImplicit {
var sp *tracing.Span
ctx, sp = tracing.ChildSpan(ctx, "s3.newClient")
defer sp.Finish()
Expand All @@ -517,22 +516,22 @@ func newClient(
opts := session.Options{}

{
httpClient, err := cloud.MakeHTTPClient(settings, metrics, "aws", conf.bucket)
httpClient, err := cloud.MakeHTTPClient(s.settings, s.metrics, "aws", s.opts.bucket, s.storageOptions.ClientName)
if err != nil {
return s3Client{}, "", err
}
opts.Config.HTTPClient = httpClient
}

if conf.endpoint != "" {
opts.Config.Endpoint = aws.String(conf.endpoint)
if s.opts.endpoint != "" {
opts.Config.Endpoint = aws.String(s.opts.endpoint)
opts.Config.S3ForcePathStyle = aws.Bool(true)

if conf.region == "" {
conf.region = "default-region"
if s.opts.region == "" {
s.opts.region = "default-region"
}

client, err := cloud.MakeHTTPClient(settings, metrics, "aws", conf.bucket)
client, err := cloud.MakeHTTPClient(s.settings, s.metrics, "aws", s.opts.bucket, s.storageOptions.ClientName)
if err != nil {
return s3Client{}, "", err
}
Expand All @@ -545,7 +544,7 @@ func newClient(
opts.Config.CredentialsChainVerboseErrors = aws.Bool(true)

opts.Config.Logger = newLogAdapter(ctx)
if conf.verbose {
if s.opts.verbose {
opts.Config.LogLevel = awsVerboseLogging
}

Expand All @@ -559,13 +558,13 @@ func newClient(
var sess *session.Session
var err error

switch conf.auth {
switch s.opts.auth {
case "", cloud.AuthParamSpecified:
sess, err = session.NewSessionWithOptions(opts)
if err != nil {
return s3Client{}, "", errors.Wrap(err, "new aws session")
}
sess.Config.Credentials = credentials.NewStaticCredentials(conf.accessKey, conf.secret, conf.tempToken)
sess.Config.Credentials = credentials.NewStaticCredentials(s.opts.accessKey, s.opts.secret, s.opts.tempToken)
case cloud.AuthParamImplicit:
opts.SharedConfigState = session.SharedConfigEnable
sess, err = session.NewSessionWithOptions(opts)
Expand All @@ -574,8 +573,8 @@ func newClient(
}
}

if conf.assumeRoleProvider.roleARN != "" {
for _, delegateProvider := range conf.delegateRoleProviders {
if s.opts.assumeRoleProvider.roleARN != "" {
for _, delegateProvider := range s.opts.delegateRoleProviders {
intermediateCreds := stscreds.NewCredentials(sess, delegateProvider.roleARN, withExternalID(delegateProvider.externalID))
opts.Config.Credentials = intermediateCreds

Expand All @@ -585,18 +584,18 @@ func newClient(
}
}

creds := stscreds.NewCredentials(sess, conf.assumeRoleProvider.roleARN, withExternalID(conf.assumeRoleProvider.externalID))
creds := stscreds.NewCredentials(sess, s.opts.assumeRoleProvider.roleARN, withExternalID(s.opts.assumeRoleProvider.externalID))
opts.Config.Credentials = creds
sess, err = session.NewSessionWithOptions(opts)
if err != nil {
return s3Client{}, "", errors.Wrap(err, "session with assume role credentials")
}
}

region := conf.region
region := s.opts.region
if region == "" {
if err := cloud.DelayedRetry(ctx, "s3manager.GetBucketRegion", s3ErrDelay, func() error {
region, err = s3manager.GetBucketRegion(ctx, sess, conf.bucket, "us-east-1")
region, err = s3manager.GetBucketRegion(ctx, sess, s.opts.bucket, "us-east-1")
return err
}); err != nil {
return s3Client{}, "", errors.Wrap(err, "could not find s3 bucket's region")
Expand All @@ -606,7 +605,7 @@ func newClient(

c := s3.New(sess)
u := s3manager.NewUploader(sess, func(uploader *s3manager.Uploader) {
uploader.PartSize = cloud.WriteChunkSize.Get(&settings.SV)
uploader.PartSize = cloud.WriteChunkSize.Get(&s.settings.SV)
})
return s3Client{client: c, uploader: u}, region, nil
}
Expand All @@ -615,7 +614,7 @@ func (s *s3Storage) getClient(ctx context.Context) (*s3.S3, error) {
if s.cached != nil {
return s.cached.client, nil
}
client, region, err := newClient(ctx, s.metrics, s.opts, s.settings)
client, region, err := s.newClient(ctx)
if err != nil {
return nil, err
}
Expand All @@ -629,7 +628,7 @@ func (s *s3Storage) getUploader(ctx context.Context) (*s3manager.Uploader, error
if s.cached != nil {
return s.cached.uploader, nil
}
client, region, err := newClient(ctx, s.metrics, s.opts, s.settings)
client, region, err := s.newClient(ctx)
if err != nil {
return nil, err
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/cloud/amazon/s3_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,11 +598,15 @@ func TestNewClientErrorsOnBucketRegion(t *testing.T) {

testSettings := cluster.MakeTestingClusterSettings()
ctx := context.Background()
cfg := s3ClientConfig{
bucket: "bucket-does-not-exist-v1i3m",
auth: cloud.AuthParamImplicit,
s3 := s3Storage{
opts: s3ClientConfig{
bucket: "bucket-does-not-exist-v1i3m",
auth: cloud.AuthParamImplicit,
},
metrics: cloud.NilMetrics,
settings: testSettings,
}
_, _, err = newClient(ctx, cloud.NilMetrics, cfg, testSettings)
_, _, err = s3.newClient(ctx)
require.Regexp(t, "could not find s3 bucket's region", err)
}

Expand Down
8 changes: 2 additions & 6 deletions pkg/cloud/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func makeAzureStorage(
return nil, errors.Wrap(err, "azure: account name is not valid")
}

t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container)
options := args.ExternalStorageOptions()
t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container, options.ClientName)
if err != nil {
return nil, errors.Wrap(err, "azure: unable to create transport")
}
Expand Down Expand Up @@ -260,11 +261,6 @@ func makeAzureStorage(
"implicit credentials disallowed for azure due to --external-io-disable-implicit-credentials flag")
}

options := cloud.ExternalStorageOptions{}
for _, o := range args.Options {
o(&options)
}

defaultCredentialsOptions := &DefaultAzureCredentialWithFileOptions{}
if knobs := options.AzureStorageTestingKnobs; knobs != nil {
defaultCredentialsOptions.testingKnobs = knobs.(*TestingKnobs)
Expand Down
8 changes: 4 additions & 4 deletions pkg/cloud/cloud_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ var httpMetrics = settings.RegisterBoolSetting(
// MakeHTTPClient makes an http client configured with the common settings used
// for interacting with cloud storage (timeouts, retries, CA certs, etc).
func MakeHTTPClient(
settings *cluster.Settings, metrics *Metrics, cloud, bucket string,
settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string,
) (*http.Client, error) {
t, err := MakeTransport(settings, metrics, cloud, bucket)
t, err := MakeTransport(settings, metrics, cloud, bucket, client)
if err != nil {
return nil, err
}
Expand All @@ -104,7 +104,7 @@ func MakeHTTPClientForTransport(t http.RoundTripper) (*http.Client, error) {
// used for interacting with cloud storage (timeouts, retries, CA certs, etc).
// Prefer MakeHTTPClient where possible.
func MakeTransport(
settings *cluster.Settings, metrics *Metrics, cloud, bucket string,
settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string,
) (*http.Transport, error) {
var tlsConf *tls.Config
if pem := httpCustomCA.Get(&settings.SV); pem != "" {
Expand All @@ -123,7 +123,7 @@ func MakeTransport(
// Add our custom CA.
t.TLSClientConfig = tlsConf
if metrics != nil {
t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket)
t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket, client)
}
return t, nil
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/cloud/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,22 @@ type ExternalStorageContext struct {
MetricsRecorder *Metrics
}

// ExternalStorageOptions rolls up the Options into a struct.
func (e *ExternalStorageContext) ExternalStorageOptions() ExternalStorageOptions {
var options ExternalStorageOptions
for _, option := range e.Options {
option(&options)
}
return options
}

// ExternalStorageOptions holds dependencies and values that can be
// overridden by callers of an ExternalStorageFactory via a passed
// ExternalStorageOption.
type ExternalStorageOptions struct {
ioAccountingInterceptor ReadWriterInterceptor
AzureStorageTestingKnobs base.ModuleTestingKnobs
ClientName string
}

// ExternalStorageConstructor is a function registered to create instances
Expand Down
3 changes: 2 additions & 1 deletion pkg/cloud/gcp/gcs_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ func makeGCSStorage(
opts = append(opts, assumeOpt)
}

baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket)
clientName := args.ExternalStorageOptions().ClientName
baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket, clientName)
if err != nil {
return nil, errors.Wrap(err, "failed to create http transport")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cloud/httpsink/http_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func MakeHTTPStorage(
return nil, errors.Errorf("HTTP storage requested but prefix path not provided")
}

client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base)
clientName := args.ExternalStorageOptions().ClientName
client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base, clientName)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func MakeMetrics(cidrLookup *cidr.Lookup) *Metrics {
ConnsOpened: metric.NewCounter(connsOpened),
ConnsReused: metric.NewCounter(connsReused),
TLSHandhakes: metric.NewCounter(tlsHandhakes),
NetMetrics: cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket"),
NetMetrics: cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket", "client"),
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/cloud/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,10 @@ func WithAzureStorageTestingKnobs(knobs base.ModuleTestingKnobs) ExternalStorage
opts.AzureStorageTestingKnobs = knobs
}
}

// WithClientName sets the "client" label on network metrics.
func WithClientName(name string) ExternalStorageOption {
return func(opts *ExternalStorageOptions) {
opts.ClientName = name
}
}
28 changes: 28 additions & 0 deletions pkg/cloud/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package cloud

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestClientName(t *testing.T) {
options := func(options ...ExternalStorageOption) ExternalStorageOptions {
context := ExternalStorageContext{
Options: options,
}
return context.ExternalStorageOptions()
}
require.Empty(t, options().ClientName)
require.Equal(t, options(WithClientName("this-is-the-name")).ClientName, "this-is-the-name")
}

0 comments on commit de12bb8

Please sign in to comment.