Skip to content

Commit

Permalink
Use an interface to cleanly switch between different implementations …
Browse files Browse the repository at this point in the history
…of metrics (#2699)

* Use an interface to cleanly switch between different implementations of
metrics

* Instead of using package method calls, call methods on metricHandle
  which is more object-oriented and enhances testability.
* Use the Strategy design pattern to switch between OC, OTel
  and no-op implementations of metrics.
* Currently only OpenCensus implementation is complete. OTel support will be
  implemented as a follow-up.
* Extracting metrics-related code into one place gives a better
  separation of concerns.
  • Loading branch information
kislaykishore authored Dec 4, 2024
1 parent febf169 commit 380cff3
Show file tree
Hide file tree
Showing 41 changed files with 568 additions and 437 deletions.
5 changes: 5 additions & 0 deletions cfg/config_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ func ListCacheTTLSecsToDuration(secs int64) time.Duration {

return time.Duration(secs * int64(time.Second))
}

// IsMetricsEnabled reports whether metrics are enabled, i.e. whether either
// the cloud-metrics export interval or the Prometheus port in c is set to a
// positive value.
//
// A nil config is treated as "metrics disabled" rather than causing a
// nil-pointer panic.
func IsMetricsEnabled(c *MetricsConfig) bool {
	if c == nil {
		return false
	}
	return c.CloudMetricsExportIntervalSecs > 0 || c.PrometheusPort > 0
}
22 changes: 22 additions & 0 deletions cfg/config_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,25 @@ func TestIsTracingEnabled(t *testing.T) {
})
}
}

// TestIsMetricsEnabled checks that IsMetricsEnabled reports true when either
// the cloud-metrics export interval or the Prometheus port is set, and false
// when neither is.
func TestIsMetricsEnabled(t *testing.T) {
	t.Parallel()

	type scenario struct {
		testName string
		m        *MetricsConfig
		enabled  bool
	}
	scenarios := []scenario{
		{
			testName: "cloud_metrics_export_interval_set",
			m:        &MetricsConfig{CloudMetricsExportIntervalSecs: 100},
			enabled:  true,
		},
		{
			testName: "prom_port_set",
			m:        &MetricsConfig{PrometheusPort: 10000},
			enabled:  true,
		},
		{
			testName: "none_set",
			m:        &MetricsConfig{CloudMetricsExportIntervalSecs: 0, PrometheusPort: 0},
			enabled:  false,
		},
	}

	for i := range scenarios {
		// Copy the case so each parallel subtest sees its own value.
		sc := scenarios[i]
		t.Run(sc.testName, func(t *testing.T) {
			t.Parallel()

			got := IsMetricsEnabled(sc.m)
			assert.Equal(t, sc.enabled, got)
		})
	}
}
21 changes: 14 additions & 7 deletions cmd/legacy_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func createStorageHandle(newConfig *cfg.Config, userAgent string) (storageHandle
////////////////////////////////////////////////////////////////////////

// Mount the file system according to arguments in the supplied context.
func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config) (mfs *fuse.MountedFileSystem, err error) {
func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config, metricHandle common.MetricHandle) (mfs *fuse.MountedFileSystem, err error) {
// Enable invariant checking if requested.
if newConfig.Debug.ExitOnInvariantViolation {
locker.EnableInvariantsCheck()
Expand Down Expand Up @@ -176,7 +176,8 @@ func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config)
bucketName,
mountPoint,
newConfig,
storageHandle)
storageHandle,
metricHandle)

if err != nil {
err = fmt.Errorf("mountWithStorageHandle: %w", err)
Expand Down Expand Up @@ -373,10 +374,16 @@ func Mount(newConfig *cfg.Config, bucketName, mountPoint string) (err error) {

ctx := context.Background()
var metricExporterShutdownFn common.ShutdownFn
if newConfig.Metrics.EnableOtel {
metricExporterShutdownFn = monitor.SetupOTelMetricExporters(ctx, newConfig)
} else {
metricExporterShutdownFn = monitor.SetupOpenCensusExporters(newConfig)
metricHandle := common.NewNoopMetrics()
if cfg.IsMetricsEnabled(&newConfig.Metrics) {
if newConfig.Metrics.EnableOtel {
metricExporterShutdownFn = monitor.SetupOTelMetricExporters(ctx, newConfig)
} else {
metricExporterShutdownFn = monitor.SetupOpenCensusExporters(newConfig)
if metricHandle, err = common.NewOCMetrics(); err != nil {
metricHandle = common.NewNoopMetrics()
}
}
}
shutdownTracingFn := monitor.SetupTracing(ctx, newConfig)
shutdownFn := common.JoinShutdownFunc(metricExporterShutdownFn, shutdownTracingFn)
Expand All @@ -385,7 +392,7 @@ func Mount(newConfig *cfg.Config, bucketName, mountPoint string) (err error) {
// daemonize gives us and telling it about the outcome.
var mfs *fuse.MountedFileSystem
{
mfs, err = mountWithArgs(bucketName, mountPoint, newConfig)
mfs, err = mountWithArgs(bucketName, mountPoint, newConfig, metricHandle)

// This utility is to absorb the error
// returned by daemonize.SignalOutcome calls by simply
Expand Down
7 changes: 5 additions & 2 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/googlecloudplatform/gcsfuse/v2/cfg"
"github.com/googlecloudplatform/gcsfuse/v2/common"
"github.com/googlecloudplatform/gcsfuse/v2/internal/mount"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage"
"golang.org/x/net/context"
Expand All @@ -40,7 +41,8 @@ func mountWithStorageHandle(
bucketName string,
mountPoint string,
newConfig *cfg.Config,
storageHandle storage.StorageHandle) (mfs *fuse.MountedFileSystem, err error) {
storageHandle storage.StorageHandle,
metricHandle common.MetricHandle) (mfs *fuse.MountedFileSystem, err error) {
// Sanity check: make sure the temporary directory exists and is writable
// currently. This gives a better user experience than harder to debug EIO
// errors when reading files in the future.
Expand Down Expand Up @@ -91,7 +93,7 @@ be interacting with the file system.`)
OpRateLimitHz: newConfig.GcsConnection.LimitOpsPerSec,
StatCacheMaxSizeMB: uint64(newConfig.MetadataCache.StatCacheMaxSizeMb),
StatCacheTTL: time.Duration(newConfig.MetadataCache.TtlSecs) * time.Second,
EnableMonitoring: newConfig.Metrics.StackdriverExportInterval > 0 || newConfig.Metrics.PrometheusPort != 0,
EnableMonitoring: cfg.IsMetricsEnabled(&newConfig.Metrics),
AppendThreshold: 1 << 21, // 2 MiB, a total guess.
TmpObjectPrefix: ".gcsfuse_tmp/",
}
Expand All @@ -115,6 +117,7 @@ be interacting with the file system.`)
SequentialReadSizeMb: int32(newConfig.GcsConnection.SequentialReadSizeMb),
EnableNonexistentTypeCache: newConfig.MetadataCache.EnableNonexistentTypeCache,
NewConfig: newConfig,
MetricHandle: metricHandle,
}

logger.Infof("Creating a new server...\n")
Expand Down
39 changes: 39 additions & 0 deletions common/noop_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import "context"

// NewNoopMetrics returns a MetricHandle backed by noopMetrics, whose methods
// all have empty bodies. It is the handle to use when metrics are disabled.
func NewNoopMetrics() MetricHandle {
	return &noopMetrics{}
}

// noopMetrics is the MetricHandle implementation returned by NewNoopMetrics.
// Every method has an empty body, so all measurements are discarded.
type noopMetrics struct{}

// GCS metrics.

func (*noopMetrics) GCSReadBytesCount(context.Context, int64, []MetricAttr)     {}
func (*noopMetrics) GCSReaderCount(context.Context, int64, []MetricAttr)        {}
func (*noopMetrics) GCSRequestCount(context.Context, int64, []MetricAttr)       {}
func (*noopMetrics) GCSRequestLatency(context.Context, float64, []MetricAttr)   {}
func (*noopMetrics) GCSReadCount(context.Context, int64, []MetricAttr)          {}
func (*noopMetrics) GCSDownloadBytesCount(context.Context, int64, []MetricAttr) {}

// File system op metrics.

func (*noopMetrics) OpsCount(context.Context, int64, []MetricAttr)      {}
func (*noopMetrics) OpsLatency(context.Context, float64, []MetricAttr)  {}
func (*noopMetrics) OpsErrorCount(context.Context, int64, []MetricAttr) {}

// File cache metrics.

func (*noopMetrics) FileCacheReadCount(context.Context, int64, []MetricAttr)      {}
func (*noopMetrics) FileCacheReadBytesCount(context.Context, int64, []MetricAttr) {}
func (*noopMetrics) FileCacheReadLatency(context.Context, float64, []MetricAttr)  {}
234 changes: 234 additions & 0 deletions common/oc_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"context"
"fmt"

"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

// Attribute keys for the metrics recorded in this file. Most of them are also
// used as the tag keys of the views registered in NewOCMetrics.
const (
// IOMethod annotates the event that opens or closes a connection or file.
IOMethod = "io_method"

// GCSMethod annotates the method called in the GCS client library.
GCSMethod = "gcs_method"

// FSOp annotates the file system op processed.
FSOp = "fs_op"

// FSErrCategory reduces the cardinality of FSError by grouping errors together.
FSErrCategory = "fs_error_category"

// ReadType annotates the read operation with its type: Sequential or Random.
ReadType = "read_type"

// CacheHit annotates the read operation from file cache with true or false.
CacheHit = "cache_hit"
)

// ocMetrics is the OpenCensus-backed metric recorder returned by NewOCMetrics.
// It holds one OpenCensus measure per metric; each exported method records a
// measurement against the corresponding field.
type ocMetrics struct {
// GCS measures
gcsReadBytesCount *stats.Int64Measure
gcsReaderCount *stats.Int64Measure
gcsRequestCount *stats.Int64Measure
gcsRequestLatency *stats.Float64Measure
gcsReadCount *stats.Int64Measure
gcsDownloadBytesCount *stats.Int64Measure

// Ops measures
opsCount *stats.Int64Measure
opsErrorCount *stats.Int64Measure
opsLatency *stats.Float64Measure

// File cache measures
fileCacheReadCount *stats.Int64Measure
fileCacheReadBytesCount *stats.Int64Measure
fileCacheReadLatency *stats.Float64Measure
}

// attrsToTags converts metric attributes into OpenCensus tag mutators, one
// Upsert per attribute, preserving the order of attrs.
//
// The key is built with tag.MustNewKey, so an attribute whose Key is not a
// valid OpenCensus tag key makes this function panic.
func attrsToTags(attrs []MetricAttr) []tag.Mutator {
	tags := make([]tag.Mutator, len(attrs))
	for i := range attrs {
		key := tag.MustNewKey(attrs[i].Key)
		tags[i] = tag.Upsert(key, attrs[i].Value)
	}
	return tags
}
// GCSReadBytesCount records inc against the gcs/read_bytes_count measure,
// tagged with attrs. Recording failures are logged, not returned.
func (o *ocMetrics) GCSReadBytesCount(ctx context.Context, inc int64, attrs []MetricAttr) {
recordOCMetric(ctx, o.gcsReadBytesCount, inc, attrs, "GCS read bytes count")
}

// GCSReaderCount records inc against the gcs/reader_count measure, tagged
// with attrs. Recording failures are logged, not returned.
func (o *ocMetrics) GCSReaderCount(ctx context.Context, inc int64, attrs []MetricAttr) {
recordOCMetric(ctx, o.gcsReaderCount, inc, attrs, "GCS reader count")
}

// GCSRequestCount records inc against the gcs/request_count measure, tagged
// with attrs. Recording failures are logged, not returned.
func (o *ocMetrics) GCSRequestCount(ctx context.Context, inc int64, attrs []MetricAttr) {
recordOCMetric(ctx, o.gcsRequestCount, inc, attrs, "GCS request count")
}

// GCSRequestLatency records value against the gcs/request_latency measure
// (declared in milliseconds), tagged with attrs. Recording failures are
// logged, not returned.
func (o *ocMetrics) GCSRequestLatency(ctx context.Context, value float64, attrs []MetricAttr) {
recordOCLatencyMetric(ctx, o.gcsRequestLatency, value, attrs, "GCS request latency")
}
// GCSReadCount records inc against the gcs/read_count measure, tagged with
// attrs. Recording failures are logged, not returned.
func (o *ocMetrics) GCSReadCount(ctx context.Context, inc int64, attrs []MetricAttr) {
recordOCMetric(ctx, o.gcsReadCount, inc, attrs, "GCS read count")
}
// GCSDownloadBytesCount records inc against the gcs/download_bytes_count
// measure, tagged with attrs. Recording failures are logged, not returned.
func (o *ocMetrics) GCSDownloadBytesCount(ctx context.Context, inc int64, attrs []MetricAttr) {
recordOCMetric(ctx, o.gcsDownloadBytesCount, inc, attrs, "GCS download bytes count")
}

// OpsCount records inc against the fs/ops_count measure, tagged with attrs.
// Recording failures are logged, not returned.
func (o *ocMetrics) OpsCount(ctx context.Context, inc int64, attrs []MetricAttr) {
recordOCMetric(ctx, o.opsCount, inc, attrs, "file system op count")
}
// OpsLatency records value against the fs/ops_latency measure (declared with
// unit "us"), tagged with attrs. Recording failures are logged, not returned.
func (o *ocMetrics) OpsLatency(ctx context.Context, value float64, attrs []MetricAttr) {
recordOCLatencyMetric(ctx, o.opsLatency, value, attrs, "file system op latency")
}
// OpsErrorCount records inc against the fs/ops_error_count measure, tagged
// with attrs. Recording failures are logged, not returned.
func (o *ocMetrics) OpsErrorCount(ctx context.Context, inc int64, attrs []MetricAttr) {
recordOCMetric(ctx, o.opsErrorCount, inc, attrs, "file system op error count")
}

// FileCacheReadCount records inc against the file_cache/read_count measure,
// tagged with attrs. Recording failures are logged, not returned.
func (o *ocMetrics) FileCacheReadCount(ctx context.Context, inc int64, attrs []MetricAttr) {
recordOCMetric(ctx, o.fileCacheReadCount, inc, attrs, "file cache read count")
}
// FileCacheReadBytesCount records inc against the file_cache/read_bytes_count
// measure, tagged with attrs. Recording failures are logged, not returned.
func (o *ocMetrics) FileCacheReadBytesCount(ctx context.Context, inc int64, attrs []MetricAttr) {
recordOCMetric(ctx, o.fileCacheReadBytesCount, inc, attrs, "file cache read bytes count")
}
// FileCacheReadLatency records value against the file_cache/read_latency
// measure (declared with unit "us"), tagged with attrs. Recording failures
// are logged, not returned.
func (o *ocMetrics) FileCacheReadLatency(ctx context.Context, value float64, attrs []MetricAttr) {
recordOCLatencyMetric(ctx, o.fileCacheReadLatency, value, attrs, "file cache read latency")
}

// recordOCMetric records the integer measurement inc for measure m, tagging
// it with attrs. If OpenCensus rejects the record, the failure is logged with
// metricStr as the human-readable metric name; nothing is returned to the
// caller.
func recordOCMetric(ctx context.Context, m *stats.Int64Measure, inc int64, attrs []MetricAttr, metricStr string) {
	err := stats.RecordWithTags(ctx, attrsToTags(attrs), m.M(inc))
	if err == nil {
		return
	}
	logger.Errorf("Cannot record %s: %v: %v", metricStr, attrs, err)
}

// recordOCLatencyMetric is the floating-point counterpart of recordOCMetric:
// it records the measurement inc for measure m, tagging it with attrs. If
// OpenCensus rejects the record, the failure is logged with metricStr as the
// human-readable metric name; nothing is returned to the caller.
func recordOCLatencyMetric(ctx context.Context, m *stats.Float64Measure, inc float64, attrs []MetricAttr, metricStr string) {
	err := stats.RecordWithTags(ctx, attrsToTags(attrs), m.M(inc))
	if err == nil {
		return
	}
	logger.Errorf("Cannot record %s: %v: %v", metricStr, attrs, err)
}

// NewOCMetrics creates the OpenCensus measures for the GCS, file system op and
// file cache metrics, registers one view per measure, and returns a
// MetricHandle that records into those measures.
//
// If view registration fails, a nil handle and a wrapped error are returned.
func NewOCMetrics() (MetricHandle, error) {
	// GCS measures.
	gcsReadBytesCount := stats.Int64("gcs/read_bytes_count", "The number of bytes read from GCS objects.", stats.UnitBytes)
	gcsReaderCount := stats.Int64("gcs/reader_count", "The number of GCS object readers opened or closed.", stats.UnitDimensionless)
	gcsRequestCount := stats.Int64("gcs/request_count", "The number of GCS requests processed.", stats.UnitDimensionless)
	gcsRequestLatency := stats.Float64("gcs/request_latency", "The latency of a GCS request.", stats.UnitMilliseconds)
	gcsReadCount := stats.Int64("gcs/read_count", "Specifies the number of gcs reads made along with type - Sequential/Random", stats.UnitDimensionless)
	gcsDownloadBytesCount := stats.Int64("gcs/download_bytes_count", "The cumulative number of bytes downloaded from GCS along with type - Sequential/Random", stats.UnitBytes)

	// File system op measures.
	opsCount := stats.Int64("fs/ops_count", "The number of ops processed by the file system.", stats.UnitDimensionless)
	opsLatency := stats.Float64("fs/ops_latency", "The latency of a file system operation.", "us")
	opsErrorCount := stats.Int64("fs/ops_error_count", "The number of errors generated by file system operation.", stats.UnitDimensionless)

	// File cache measures.
	fileCacheReadCount := stats.Int64("file_cache/read_count", "Specifies the number of read requests made via file cache along with type - Sequential/Random and cache hit - true/false", stats.UnitDimensionless)
	fileCacheReadBytesCount := stats.Int64("file_cache/read_bytes_count", "The cumulative number of bytes read from file cache along with read type - Sequential/Random", stats.UnitBytes)
	fileCacheReadLatency := stats.Float64("file_cache/read_latency", "Latency of read from file cache along with cache hit - true/false", "us")

	// OpenCensus views (aggregated measures). Every measure above needs a
	// registered view; measurements recorded against a measure without one
	// are not aggregated or exported.
	if err := view.Register(
		&view.View{
			Name:        "gcs/read_bytes_count",
			Measure:     gcsReadBytesCount,
			Description: "The cumulative number of bytes read from GCS objects.",
			Aggregation: view.Sum(),
		},
		&view.View{
			Name:        "gcs/reader_count",
			Measure:     gcsReaderCount,
			Description: "The cumulative number of GCS object readers opened or closed.",
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{tag.MustNewKey(IOMethod)},
		},
		&view.View{
			Name:        "gcs/request_count",
			Measure:     gcsRequestCount,
			Description: "The cumulative number of GCS requests processed.",
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{tag.MustNewKey(GCSMethod)},
		},
		&view.View{
			Name:        "gcs/request_latencies",
			Measure:     gcsRequestLatency,
			Description: "The cumulative distribution of the GCS request latencies.",
			Aggregation: ochttp.DefaultLatencyDistribution,
			TagKeys:     []tag.Key{tag.MustNewKey(GCSMethod)},
		},
		&view.View{
			Name:        "gcs/read_count",
			Measure:     gcsReadCount,
			Description: "Specifies the number of gcs reads made along with type - Sequential/Random",
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{tag.MustNewKey(ReadType)},
		},
		&view.View{
			Name:        "gcs/download_bytes_count",
			Measure:     gcsDownloadBytesCount,
			Description: "The cumulative number of bytes downloaded from GCS along with type - Sequential/Random",
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{tag.MustNewKey(ReadType)},
		},
		&view.View{
			Name:        "fs/ops_count",
			Measure:     opsCount,
			Description: "The cumulative number of ops processed by the file system.",
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{tag.MustNewKey(FSOp)},
		},
		&view.View{
			Name:        "fs/ops_error_count",
			Measure:     opsErrorCount,
			Description: "The cumulative number of errors generated by file system operations",
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{tag.MustNewKey(FSOp), tag.MustNewKey(FSErrCategory)},
		},
		&view.View{
			Name:        "fs/ops_latency",
			Measure:     opsLatency,
			Description: "The cumulative distribution of file system operation latencies",
			Aggregation: ochttp.DefaultLatencyDistribution,
			TagKeys:     []tag.Key{tag.MustNewKey(FSOp)},
		},
		// File cache views: without these the file cache measures recorded
		// through the returned handle would have no registered view.
		&view.View{
			Name:        "file_cache/read_count",
			Measure:     fileCacheReadCount,
			Description: "Specifies the number of read requests made via file cache along with type - Sequential/Random and cache hit - true/false",
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{tag.MustNewKey(ReadType), tag.MustNewKey(CacheHit)},
		},
		&view.View{
			Name:        "file_cache/read_bytes_count",
			Measure:     fileCacheReadBytesCount,
			Description: "The cumulative number of bytes read from file cache along with read type - Sequential/Random",
			Aggregation: view.Sum(),
			TagKeys:     []tag.Key{tag.MustNewKey(ReadType)},
		},
		&view.View{
			Name:        "file_cache/read_latencies",
			Measure:     fileCacheReadLatency,
			Description: "The cumulative distribution of the file cache read latencies along with cache hit - true/false",
			Aggregation: ochttp.DefaultLatencyDistribution,
			TagKeys:     []tag.Key{tag.MustNewKey(CacheHit)},
		},
	); err != nil {
		return nil, fmt.Errorf("failed to register OpenCensus metrics for GCS client library: %w", err)
	}

	return &ocMetrics{
		gcsReadBytesCount:     gcsReadBytesCount,
		gcsReaderCount:        gcsReaderCount,
		gcsRequestCount:       gcsRequestCount,
		gcsRequestLatency:     gcsRequestLatency,
		gcsReadCount:          gcsReadCount,
		gcsDownloadBytesCount: gcsDownloadBytesCount,

		opsCount:      opsCount,
		opsErrorCount: opsErrorCount,
		opsLatency:    opsLatency,

		fileCacheReadCount:      fileCacheReadCount,
		fileCacheReadBytesCount: fileCacheReadBytesCount,
		fileCacheReadLatency:    fileCacheReadLatency,
	}, nil
}
Loading

0 comments on commit 380cff3

Please sign in to comment.