Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added writetests #14

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 44 additions & 42 deletions benchmark-script/write_operations/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"crypto/rand"
"flag"
"fmt"
"io"
"os"
"path"
"strconv"
Expand All @@ -15,6 +14,7 @@ import (
)

var (
fPrefix = flag.String("prefix", "", "prefix for each file.")
fDir = flag.String("dir", "", "Directory file to be opened.")
fNumOfThreads = flag.Int("threads", 1, "Number of threads to read parallel")

Expand All @@ -29,47 +29,59 @@ var (
fFileSize = flag.Int("file-size", 1, "in KB")

fNumOfWrite = flag.Int("write-count", 1, "number of write iteration")
fOutputDir = flag.String("output-dir", "", "Directory to dump the output")
)

func openFile(index int) (err error) {
fileName := path.Join(*fDir, "file_"+strconv.Itoa(index))
fileHandle, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|syscall.O_DIRECT, 0644)
var gResult *Result

func init() {
gResult = &Result{}
}

func openFile(fileSuffix string) (fileHandle *os.File, err error) {
fileName := path.Join(*fDir, *fPrefix+"_file_"+fileSuffix)
fileHandle, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR|os.O_TRUNC|syscall.O_DIRECT, 0644)
if err != nil {
err = fmt.Errorf("while opening file: %w", err)
return
}
fileHandles[index] = fileHandle

return
}

// Expect file is already opened, otherwise throws error
func overWriteAlreadyOpenedFile(index int) (err error) {
for cnt := 0; cnt < *fNumOfWrite; cnt++ {
for i := 0; i < (*fFileSize / *fBlockSize); i++ {
b := make([]byte, *fBlockSize*OneKB)
b := make([]byte, *fBlockSize*OneKB)
_, err = rand.Read(b)

startByte := int64(i * (*fBlockSize * OneKB))
for cnt := 0; cnt < *fNumOfWrite; cnt++ {
writeStart := time.Now()
fileName := strconv.Itoa(*fFileSize) + "_" + strconv.Itoa(index) + "__" + strconv.Itoa(cnt)
fh, err := openFile(fileName)
if err != nil {
return fmt.Errorf("Error while creating the file %v", err)
}

_, err = rand.Read(b)
for i := 0; i < (*fFileSize / *fBlockSize); i++ {
if err != nil {
return fmt.Errorf("while generating random bytest: %v", err)
}

_, err = fileHandles[index].Seek(startByte, io.SeekStart)
if err != nil {
return fmt.Errorf("while changing the seek position")
}

_, err = fileHandles[index].Write(b)
_, err = fh.Write(b)
if err != nil {
return fmt.Errorf("while overwriting the file: %v", err)
}
}

err = fileHandles[index].Sync()
if err != nil {
return fmt.Errorf("while syncing the file: %v", err)
}
err = fh.Close()
if err != nil {
return fmt.Errorf("while closing the file: %v", err)
}

writeLatency := time.Since(writeStart)

throughput := float64(*fFileSize) / writeLatency.Seconds()
gResult.Append(writeLatency.Seconds(), throughput)
}

return
Expand All @@ -86,16 +98,6 @@ func runReadFileOperations() (err error) {
return
}

fileHandles = make([]*os.File, *fNumOfThreads)

for i := 0; i < *fNumOfThreads; i++ {
err = openFile(i)
if err != nil {
err = fmt.Errorf("while opening file: %w", err)
return err
}
}

for i := 0; i < *fNumOfThreads; i++ {
index := i
eG.Go(func() error {
Expand All @@ -110,30 +112,30 @@ func runReadFileOperations() (err error) {

err = eG.Wait()

if err == nil {
fmt.Println("write benchmark completed successfully!")
fmt.Println("Waiting for 3 minutes")

time.Sleep(3 * time.Minute)
}

for i := 0; i < *fNumOfThreads; i++ {
if err = fileHandles[i].Close(); err != nil {
err = fmt.Errorf("while closing the fileHandle: %w", err)
return
}
if err != nil {
return err
}

return
}

func main() {
fmt.Println(os.Stderr, "Started execution")
flag.Parse()
fmt.Println("\n******* Passed flags: *******")
flag.VisitAll(func(f *flag.Flag) {
fmt.Printf("Flag: %s, Value: %v\n", f.Name, f.Value)
})

err := runReadFileOperations()
if err != nil {
fmt.Println(os.Stderr, err)
os.Exit(1)
}

if *fOutputDir == "" {
*fOutputDir, _ = os.Getwd()
}
gResult.DumpMetricsCSV(path.Join(*fOutputDir, "metrics.csv"))
gResult.PrintStats()
}
158 changes: 158 additions & 0 deletions benchmark-script/write_operations/read_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package main

import (
"encoding/csv"
"encoding/json"
"fmt"
"os"
"sort"
"sync"
"time"
)

type Metric struct {
latency float64
throughput float64
unixTime int64
}

type Result struct {
metrics []Metric // List to store metrics
mutex sync.Mutex
}

func (r *Result) Append(latency float64, throughput float64) {
r.mutex.Lock()
defer r.mutex.Unlock()

newMetric := Metric{
latency: latency,
throughput: throughput,
unixTime: time.Now().Unix(),
}
r.metrics = append(r.metrics, newMetric)
}

func (r *Result) GetMetrics() []Metric {
r.mutex.Lock()
defer r.mutex.Unlock()

metricsCopy := make([]Metric, len(r.metrics))
copy(metricsCopy, r.metrics)
return metricsCopy
}

func (r *Result) DumpMetricsJson(filePath string) error {
r.mutex.Lock()
defer r.mutex.Unlock()
fmt.Print(r.metrics)

// Marshal the metrics int64o JSON format
jsonData, err := json.Marshal(r.metrics)
if err != nil {
return err
}

// Write the JSON data to the file
err = os.WriteFile(filePath, jsonData, 0644)
if err != nil {
return err
}

return nil
}

func (r *Result) DumpMetricsCSV(filePath string) error {
r.mutex.Lock()
defer r.mutex.Unlock()

// Create the CSV file
file, err := os.Create(filePath)
if err != nil {
return err
}
defer file.Close()

// Create a CSV writer
writer := csv.NewWriter(file)
defer writer.Flush()

// Write the CSV header
err = writer.Write([]string{"Timestamp", "ReadLatency(s)", "Throughput(MiB/s)"})
if err != nil {
return err
}

for _, metric := range r.metrics {
err = writer.Write([]string{
fmt.Sprintf("%d", metric.unixTime),
fmt.Sprintf("%.3f", metric.latency),
fmt.Sprintf("%.3f", metric.throughput),
})
if err != nil {
return err
}
}

return nil
}

func (r *Result) PrintStats() {
r.mutex.Lock()
defer r.mutex.Unlock()

if len(r.metrics) == 0 {
fmt.Println("No metrics collected yet.")
return
}

// Calculate averages
var totalLatency, totalThroughput float64
for _, metric := range r.metrics {
totalLatency += metric.latency
totalThroughput += metric.throughput
}
avgLatency := totalLatency / float64(len(r.metrics))
avgThroughput := totalThroughput / float64(len(r.metrics))

// Calculate percentiles (e.g., 50th, 90th, 95th, 99th)
latencyValues := make([]float64, len(r.metrics))
throughputValues := make([]float64, len(r.metrics))
for i, metric := range r.metrics {
latencyValues[i] = metric.latency
throughputValues[i] = metric.throughput
}

sort.Float64s(latencyValues)
sort.Float64s(throughputValues)

fmt.Println("\n******* Metrics Summary: WriteLatency (s) *******")
fmt.Printf("Average Latency: %.2f\n", avgLatency)
fmt.Printf("p0: %.2f\n", percentileFloat64(latencyValues, 0))
fmt.Printf("p50: %.2f\n", percentileFloat64(latencyValues, 50))
fmt.Printf("p90: %.2f\n", percentileFloat64(latencyValues, 90))
fmt.Printf("p95: %.2f\n", percentileFloat64(latencyValues, 95))
fmt.Printf("p99: %.2f\n", percentileFloat64(latencyValues, 99))
fmt.Printf("p100: %.2f\n", percentileFloat64(latencyValues, 100))

fmt.Println("\n******* Metrics Summary: Throughput (MiB/s): *******")
fmt.Printf("Average Throughput: %.2f\n", avgThroughput)
fmt.Printf("p0: %.2f\n", percentileFloat64(throughputValues, 0))
fmt.Printf("p50: %.2f\n", percentileFloat64(throughputValues, 50))
fmt.Printf("p90: %.2f\n", percentileFloat64(throughputValues, 90))
fmt.Printf("p95: %.2f\n", percentileFloat64(throughputValues, 95))
fmt.Printf("p99: %.2f\n", percentileFloat64(throughputValues, 99))
fmt.Printf("p100: %.2f\n", percentileFloat64(throughputValues, 100))
}

func percentileFloat64(values []float64, p int) float64 {
if p < 0 || p > 100 {
panic("Percentile must be between 1 and 100")
}

index := int((float32(p) / float32(100)) * float32(len(values)))
if index == len(values) {
index--
}
return values[index]
}