Skip to content

Commit

Permalink
close upload failure channel in case of error during finalize
Browse files Browse the repository at this point in the history
  • Loading branch information
ashmeenkaur committed Jan 30, 2025
1 parent 45a8f47 commit bad7405
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
14 changes: 12 additions & 2 deletions internal/bufferedwrites/upload_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ func (uh *UploadHandler) uploader() {
_, err := io.Copy(uh.writer, currBlock.Reader())
if err != nil {
logger.Errorf("buffered write upload failed for object %s: error in io.Copy: %v", uh.objectName, err)
// Close the channel to signal upload failure.
close(uh.signalUploadFailure)
uh.closeUploadFailureChannel()
}
}
// Put back the uploaded block on the freeBlocksChannel for re-use.
Expand All @@ -151,6 +150,7 @@ func (uh *UploadHandler) Finalize() (*gcs.MinObject, error) {

obj, err := uh.bucket.FinalizeUpload(context.Background(), uh.writer)
if err != nil {
uh.closeUploadFailureChannel()
return nil, fmt.Errorf("FinalizeUpload failed for object %s: %w", uh.objectName, err)
}
return obj, nil
Expand Down Expand Up @@ -192,3 +192,13 @@ func (uh *UploadHandler) Destroy() {
}
}
}

// Closes the channel if not already closed to signal upload failure.
func (uh *UploadHandler) closeUploadFailureChannel() {
select {
case <-uh.signalUploadFailure:
break
default:
close(uh.signalUploadFailure)
}
}
33 changes: 33 additions & 0 deletions internal/bufferedwrites/upload_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (t *UploadHandlerTest) TestFinalizeWhenFinalizeUploadFails() {
assert.Nil(t.T(), obj)
assert.ErrorContains(t.T(), err, "taco")
assert.ErrorContains(t.T(), err, "FinalizeUpload failed for object")
assertUploadFailureSignal(t.T(), t.uh)
}

func (t *UploadHandlerTest) TestUploadSingleBlockThrowsErrorInCopy() {
Expand Down Expand Up @@ -389,3 +390,35 @@ func (t *UploadHandlerTest) createBlocks(count int) []block.Block {

return blocks
}

func (t *UploadHandlerTest) TestUploadHandler_closeUploadFailureChannel() {
testCases := []struct {
name string
initialState chan error
}{
{
name: "Channel_initially_open",
initialState: make(chan error),
},
{
name: "Channel_already_closed",
initialState: func() chan error { ch := make(chan error); close(ch); return ch }(),
},
}

for _, tc := range testCases {
t.Run(tc.name, func() {
uh := &UploadHandler{
signalUploadFailure: tc.initialState,
}

uh.closeUploadFailureChannel()

select {
case <-uh.signalUploadFailure:
default:
t.T().Error("expected channel to be closed but it was not")
}
})
}
}

0 comments on commit bad7405

Please sign in to comment.