Skip to content

Commit

Permalink
Merge pull request #30 from raj-prince/wt_metrics_support
Browse files Browse the repository at this point in the history
Adding metrics support in warp test
  • Loading branch information
raj-prince authored Nov 19, 2024
2 parents eab8804 + 8cc24fc commit d57e5fe
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 7 deletions.
30 changes: 23 additions & 7 deletions benchmark-script/read_operation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strconv"
"syscall"
"time"
"context"
"go.opencensus.io/stats"

"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -53,7 +55,7 @@ func openFile(index int) (err error) {
}

// Expect file is already opened, otherwise throws error
func readAlreadyOpenedFile(index int) (err error) {
func readAlreadyOpenedFile(ctx context.Context, index int) (err error) {
b := make([]byte, *fBlockSizeKB*1024)
for i := 0; i < *fNumberOfRead; i++ {
readStart := time.Now()
Expand All @@ -74,6 +76,7 @@ func readAlreadyOpenedFile(index int) (err error) {
}

readLatency := time.Since(readStart)
stats.Record(ctx, readLatencyStat.M(float64(readLatency.Milliseconds())))

throughput := float64(*fFileSizeMB) / readLatency.Seconds()
gResult.Append(readLatency.Seconds(), throughput)
Expand All @@ -98,7 +101,7 @@ func getRandReadPattern() []int64 {
return pattern
}

func randReadAlreadyOpenedFile(index int) (err error) {
func randReadAlreadyOpenedFile(ctx context.Context, index int) (err error) {
pattern := getRandReadPattern()
b := make([]byte, *fBlockSizeKB*1024)
for i := 0; i < *fNumberOfRead; i++ {
Expand All @@ -118,7 +121,7 @@ func randReadAlreadyOpenedFile(index int) (err error) {
readLatency := time.Since(readStart)
throughput := float64((*fBlockSizeKB) / 1024) / readLatency.Seconds()
gResult.Append(readLatency.Seconds(), throughput)

stats.Record(ctx, readLatencyStat.M(float64(readLatency.Milliseconds())))
}

if err != nil {
Expand All @@ -128,7 +131,7 @@ func randReadAlreadyOpenedFile(index int) (err error) {
return
}

func runReadFileOperations() (err error) {
func runReadFileOperations(ctx context.Context) (err error) {
if *fDir == "" {
err = fmt.Errorf("you must set --dir flag")
return
Expand Down Expand Up @@ -158,9 +161,9 @@ func runReadFileOperations() (err error) {
index := i
eG.Go(func() error {
if *fReadType == "randread" {
err = randReadAlreadyOpenedFile(index)
err = randReadAlreadyOpenedFile(ctx, index)
} else {
err = readAlreadyOpenedFile(index)
err = readAlreadyOpenedFile(ctx, index)
}
if err != nil {
err = fmt.Errorf("while reading file: %w", err)
Expand Down Expand Up @@ -188,20 +191,33 @@ func runReadFileOperations() (err error) {
}

func main() {
ctx := context.Background()

flag.Parse()
fmt.Println("\n******* Passed flags: *******")
flag.VisitAll(func(f *flag.Flag) {
fmt.Printf("Flag: %s, Value: %v\n", f.Name, f.Value)
})

err := runReadFileOperations()
// Enable stack-driver exporter.
registerLatencyView()

err := enableSDExporter()
if err != nil {
fmt.Printf("while enabling stackdriver exporter: %v", err)
os.Exit(1)
}
defer closeSDExporter()

err = runReadFileOperations(ctx)
if err != nil {
fmt.Printf("while performing read: %v", err)
os.Exit(1)
}
if *fOutputDir == "" {
*fOutputDir, _ = os.Getwd()
}

csvFileName := "metrics_" + *fReadType + ".csv"
gResult.DumpMetricsCSV(path.Join(*fOutputDir, csvFileName))
gResult.PrintStats()
Expand Down
67 changes: 67 additions & 0 deletions benchmark-script/read_operation/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"fmt"
"log"
"time"

"contrib.go.opencensus.io/exporter/stackdriver"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
// The restaurant rating in number of stars.
readLatencyStat = stats.Float64("readLatency", "Complete read latency", stats.UnitMilliseconds)
)

var sdExporter *stackdriver.Exporter

func registerLatencyView() {
v := &view.View{
Name: "princer_warp_read_latency",
Measure: readLatencyStat,
Description: "Complete read latency for a given file system operation",
TagKeys: []tag.Key{tag.MustNewKey("warp_read_latency")},
Aggregation: ochttp.DefaultLatencyDistribution,
}

if err := view.Register(v); err != nil {
log.Fatalf("Failed to register the readLatency view: %v", err)
}
}

func enableSDExporter() (err error) {
sdExporter, err := stackdriver.NewExporter(stackdriver.Options{
// ProjectID <change this value>
ProjectID: "gcs-tess",
// MetricPrefix helps uniquely identify your metrics. <change this value>
MetricPrefix: "custom.googleapis.com/warp-test/",
// ReportingInterval sets the frequency of reporting metrics
// to the Cloud Monitoring backend.
ReportingInterval: 5 * time.Second,
})

if err != nil {
err = fmt.Errorf("while creating stackdriver exporter: %w", err)
return
}

if err = sdExporter.StartMetricsExporter(); err != nil {
return fmt.Errorf("start stackdriver exporter: %w", err)
}

fmt.Println("Stack driver agent started successfully!!")
return nil
}

func closeSDExporter() {
if sdExporter != nil {
sdExporter.StopMetricsExporter()
sdExporter.Flush()
}

sdExporter = nil
}

0 comments on commit d57e5fe

Please sign in to comment.