Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding metrics support in warp test #30

Merged
merged 2 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions benchmark-script/read_operation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"strconv"
"syscall"
"time"
"context"
"go.opencensus.io/stats"

"golang.org/x/sync/errgroup"
)
Expand All @@ -26,7 +28,7 @@

eG errgroup.Group

OneKB = 1024

Check failure on line 31 in benchmark-script/read_operation/main.go

View workflow job for this annotation

GitHub Actions / audit

exported var OneKB should have comment or be unexported

fNumberOfRead = flag.Int("read-count", 1, "number of read iteration")

Expand All @@ -53,7 +55,7 @@
}

// Expect file is already opened, otherwise throws error
func readAlreadyOpenedFile(index int) (err error) {
func readAlreadyOpenedFile(ctx context.Context, index int) (err error) {
b := make([]byte, *fBlockSizeKB*1024)
for i := 0; i < *fNumberOfRead; i++ {
readStart := time.Now()
Expand All @@ -74,6 +76,7 @@
}

readLatency := time.Since(readStart)
stats.Record(ctx, readLatencyStat.M(float64(readLatency.Milliseconds())))

throughput := float64(*fFileSizeMB) / readLatency.Seconds()
gResult.Append(readLatency.Seconds(), throughput)
Expand All @@ -98,7 +101,7 @@
return pattern
}

func randReadAlreadyOpenedFile(index int) (err error) {
func randReadAlreadyOpenedFile(ctx context.Context, index int) (err error) {
pattern := getRandReadPattern()
b := make([]byte, *fBlockSizeKB*1024)
for i := 0; i < *fNumberOfRead; i++ {
Expand All @@ -118,7 +121,7 @@
readLatency := time.Since(readStart)
throughput := float64((*fBlockSizeKB) / 1024) / readLatency.Seconds()
gResult.Append(readLatency.Seconds(), throughput)

stats.Record(ctx, readLatencyStat.M(float64(readLatency.Milliseconds())))
}

if err != nil {
Expand All @@ -128,7 +131,7 @@
return
}

func runReadFileOperations() (err error) {
func runReadFileOperations(ctx context.Context) (err error) {
if *fDir == "" {
err = fmt.Errorf("you must set --dir flag")
return
Expand Down Expand Up @@ -158,9 +161,9 @@
index := i
eG.Go(func() error {
if *fReadType == "randread" {
err = randReadAlreadyOpenedFile(index)
err = randReadAlreadyOpenedFile(ctx, index)
} else {
err = readAlreadyOpenedFile(index)
err = readAlreadyOpenedFile(ctx, index)
}
if err != nil {
err = fmt.Errorf("while reading file: %w", err)
Expand Down Expand Up @@ -188,20 +191,33 @@
}

func main() {
ctx := context.Background()

flag.Parse()
fmt.Println("\n******* Passed flags: *******")
flag.VisitAll(func(f *flag.Flag) {
fmt.Printf("Flag: %s, Value: %v\n", f.Name, f.Value)
})

err := runReadFileOperations()
// Enable stack-driver exporter.
registerLatencyView()

err := enableSDExporter()
if err != nil {
fmt.Printf("while enabling stackdriver exporter: %v", err)
os.Exit(1)
}
defer closeSDExporter()

err = runReadFileOperations(ctx)
if err != nil {
fmt.Printf("while performing read: %v", err)
os.Exit(1)
}
if *fOutputDir == "" {
*fOutputDir, _ = os.Getwd()
}

csvFileName := "metrics_" + *fReadType + ".csv"
gResult.DumpMetricsCSV(path.Join(*fOutputDir, csvFileName))
gResult.PrintStats()
Expand Down
67 changes: 67 additions & 0 deletions benchmark-script/read_operation/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"fmt"
"log"
"time"

"contrib.go.opencensus.io/exporter/stackdriver"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
// The restaurant rating in number of stars.
readLatencyStat = stats.Float64("readLatency", "Complete read latency", stats.UnitMilliseconds)
)

var sdExporter *stackdriver.Exporter

func registerLatencyView() {
v := &view.View{
Name: "princer_warp_read_latency",
Measure: readLatencyStat,
Description: "Complete read latency for a given file system operation",
TagKeys: []tag.Key{tag.MustNewKey("warp_read_latency")},
Aggregation: ochttp.DefaultLatencyDistribution,
}

if err := view.Register(v); err != nil {
log.Fatalf("Failed to register the readLatency view: %v", err)
}
}

func enableSDExporter() (err error) {
sdExporter, err := stackdriver.NewExporter(stackdriver.Options{
// ProjectID <change this value>
ProjectID: "gcs-tess",
// MetricPrefix helps uniquely identify your metrics. <change this value>
MetricPrefix: "custom.googleapis.com/warp-test/",
// ReportingInterval sets the frequency of reporting metrics
// to the Cloud Monitoring backend.
ReportingInterval: 5 * time.Second,
})

if err != nil {
err = fmt.Errorf("while creating stackdriver exporter: %w", err)
return
}

if err = sdExporter.StartMetricsExporter(); err != nil {
return fmt.Errorf("start stackdriver exporter: %w", err)
}

fmt.Println("Stack driver agent started successfully!!")
return nil
}

func closeSDExporter() {
if sdExporter != nil {
sdExporter.StopMetricsExporter()
sdExporter.Flush()
}

sdExporter = nil
}
Loading