diff --git a/go.mod b/go.mod index cea5c5a6a..b278b7760 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/vimeo/go-util v1.4.1 golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df golang.org/x/net v0.29.0 + golang.org/x/sync v0.8.0 google.golang.org/api v0.199.0 google.golang.org/grpc v1.67.0 google.golang.org/protobuf v1.34.2 @@ -95,7 +96,6 @@ require ( go.opentelemetry.io/otel/trace v1.29.0 // indirect golang.org/x/crypto v0.27.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect golang.org/x/time v0.6.0 // indirect diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index db4f29fde..5e61718e7 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -88,6 +88,7 @@ import ( "github.com/tus/tusd/v2/internal/uid" "github.com/tus/tusd/v2/pkg/handler" "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -469,8 +470,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re }() go partProducer.produce(producerCtx, optimalPartSize) - var wg sync.WaitGroup - var uploadErr error + var eg errgroup.Group for { // We acquire the semaphore before starting the goroutine to avoid @@ -497,10 +497,8 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re } upload.parts = append(upload.parts, part) - wg.Add(1) - go func(file io.ReadSeeker, part *s3Part, closePart func() error) { + eg.Go(func() error { defer upload.store.releaseUploadSemaphore() - defer wg.Done() t := time.Now() uploadPartInput := &s3.UploadPartInput{ @@ -509,39 +507,46 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re UploadId: aws.String(upload.multipartId), PartNumber: aws.Int32(part.number), } - etag, err := upload.putPartForUpload(ctx, uploadPartInput, file, part.size) + etag, err := upload.putPartForUpload(ctx, uploadPartInput, partfile, part.size) store.observeRequestDuration(t, metricUploadPart) - if err != nil { - uploadErr = err - } else { + if err == nil { part.etag = etag } - if cerr := closePart(); cerr != nil && uploadErr == nil { - uploadErr = cerr + + cerr := closePart() + if err != nil { + return err + } + if cerr != nil { + return cerr } - }(partfile, part, closePart) + return nil + }) } else { - wg.Add(1) - go func(file io.ReadSeeker, closePart func() error) { + eg.Go(func() error { defer upload.store.releaseUploadSemaphore() - defer wg.Done() - if err := store.putIncompletePartForUpload(ctx, upload.objectId, file); err != nil { - uploadErr = err + err := store.putIncompletePartForUpload(ctx, upload.objectId, partfile) + if err == nil { + upload.incompletePartSize = partsize } - if cerr := closePart(); cerr != nil && uploadErr == nil { - uploadErr = cerr + + cerr := closePart() + if err != nil { + return err + } + if cerr != nil { + return cerr } - upload.incompletePartSize = partsize - }(partfile, closePart) + return nil + }) } bytesUploaded += partsize nextPartNum += 1 } - wg.Wait() - + uploadErr := eg.Wait() if uploadErr != nil { return 0, uploadErr }