Skip to content

Commit

Permalink
s3store: Prevent data race using errgroup package
Browse files Browse the repository at this point in the history
  • Loading branch information
Acconut committed Oct 4, 2024
1 parent 9b68f9f commit 94a2c11
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 24 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/vimeo/go-util v1.4.1
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
golang.org/x/net v0.29.0
golang.org/x/sync v0.8.0
google.golang.org/api v0.199.0
google.golang.org/grpc v1.67.0
google.golang.org/protobuf v1.34.2
Expand Down Expand Up @@ -95,7 +96,6 @@ require (
go.opentelemetry.io/otel/trace v1.29.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/time v0.6.0 // indirect
Expand Down
51 changes: 28 additions & 23 deletions pkg/s3store/s3store.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ import (
"github.com/tus/tusd/v2/internal/uid"
"github.com/tus/tusd/v2/pkg/handler"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
Expand Down Expand Up @@ -469,8 +470,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
}()
go partProducer.produce(producerCtx, optimalPartSize)

var wg sync.WaitGroup
var uploadErr error
var eg errgroup.Group

for {
// We acquire the semaphore before starting the goroutine to avoid
Expand All @@ -497,10 +497,8 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
}
upload.parts = append(upload.parts, part)

wg.Add(1)
go func(file io.ReadSeeker, part *s3Part, closePart func() error) {
eg.Go(func() error {
defer upload.store.releaseUploadSemaphore()
defer wg.Done()

t := time.Now()
uploadPartInput := &s3.UploadPartInput{
Expand All @@ -509,39 +507,46 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
UploadId: aws.String(upload.multipartId),
PartNumber: aws.Int32(part.number),
}
etag, err := upload.putPartForUpload(ctx, uploadPartInput, file, part.size)
etag, err := upload.putPartForUpload(ctx, uploadPartInput, partfile, part.size)
store.observeRequestDuration(t, metricUploadPart)
if err != nil {
uploadErr = err
} else {
if err == nil {
part.etag = etag
}
if cerr := closePart(); cerr != nil && uploadErr == nil {
uploadErr = cerr

cerr := closePart()
if err != nil {
return err
}
if cerr != nil {
return cerr
}
}(partfile, part, closePart)
return nil
})
} else {
wg.Add(1)
go func(file io.ReadSeeker, closePart func() error) {
eg.Go(func() error {
defer upload.store.releaseUploadSemaphore()
defer wg.Done()

if err := store.putIncompletePartForUpload(ctx, upload.objectId, file); err != nil {
uploadErr = err
err := store.putIncompletePartForUpload(ctx, upload.objectId, partfile)
if err == nil {
upload.incompletePartSize = partsize
}
if cerr := closePart(); cerr != nil && uploadErr == nil {
uploadErr = cerr

cerr := closePart()
if err != nil {
return err
}
if cerr != nil {
return cerr
}
upload.incompletePartSize = partsize
}(partfile, closePart)
return nil
})
}

bytesUploaded += partsize
nextPartNum += 1
}

wg.Wait()

uploadErr := eg.Wait()
if uploadErr != nil {
return 0, uploadErr
}
Expand Down

0 comments on commit 94a2c11

Please sign in to comment.