Skip to content

Commit

Permalink
chore: optimize sync pieces error report (#2925)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma authored Dec 14, 2023
1 parent 1303d77 commit 839e476
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 6 deletions.
5 changes: 2 additions & 3 deletions client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (pt *peerTaskConductor) register() error {
regSpan.End()

if err != nil {
if err == context.DeadlineExceeded {
if errors.Is(err, context.DeadlineExceeded) {
pt.Errorf("scheduler did not response in %s", pt.SchedulerOption.ScheduleTimeout.Duration)
}
pt.Errorf("step 1: peer %s register failed: %s", pt.request.PeerId, err)
Expand Down Expand Up @@ -800,8 +800,7 @@ func (pt *peerTaskConductor) confirmReceivePeerPacketError(err error) (cont bool
failedReason string
)
// extract DfError for grpc status
err = dferrors.ConvertGRPCErrorToDfError(err)
de, ok := err.(*dferrors.DfError)
de, ok := dferrors.IsGRPCDfError(err)
if ok {
switch de.Code {
case commonv1.Code_SchedNeedBackSource:
Expand Down
6 changes: 6 additions & 0 deletions client/daemon/peer/peertask_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,13 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
URLMeta: urlMeta,
PeerID: ts.peerID,
})

assert.True(ok, "reuse stream task")
assert.NotNil(rc, "reuse stream task")
if rc == nil {
return
}

defer func() {
assert.Nil(rc.Close())
}()
Expand Down
15 changes: 12 additions & 3 deletions client/daemon/peer/peertask_piecetask_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"

"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/net/ip"
Expand Down Expand Up @@ -406,7 +407,6 @@ func (s *pieceTaskSynchronizer) receive() {
s.Errorf("synchronizer receives with error: %s", err)
s.error.Store(&pieceTaskSynchronizerError{err})
s.reportError(err)
s.Errorf("synchronizer receives with error: %s", err)
}
}

Expand All @@ -430,7 +430,16 @@ func (s *pieceTaskSynchronizer) acquire(request *commonv1.PieceTaskRequest) erro

func (s *pieceTaskSynchronizer) reportError(err error) {
s.span.RecordError(err)
sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, s.dstPeer, commonv1.Code_ClientPieceRequestFail))
errCode := commonv1.Code_ClientPieceRequestFail

// extract DfError for grpc status
de, ok := dferrors.IsGRPCDfError(err)
if ok {
errCode = de.Code
s.Errorf("report error with convert code from grpc error, code: %d, message: %s", de.Code, de.Message)
}

sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, s.dstPeer, errCode))
if sendError != nil {
s.Errorf("sync piece info failed and send piece result with error: %s", sendError)
go s.peerTaskConductor.cancel(commonv1.Code_SchedError, sendError.Error())
Expand All @@ -440,7 +449,7 @@ func (s *pieceTaskSynchronizer) reportError(err error) {
}

func (s *pieceTaskSynchronizer) canceled(err error) bool {
if err == context.Canceled {
if errors.Is(err, context.Canceled) {
s.Debugf("context canceled, dst peer: %s", s.dstPeer.PeerId)
return true
}
Expand Down
20 changes: 20 additions & 0 deletions internal/dferrors/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,26 @@ func ConvertGRPCErrorToDfError(err error) error {
return err
}

func IsGRPCDfError(err error) (*DfError, bool) {
for _, d := range status.Convert(err).Details() {
switch internal := d.(type) {
case *commonv1.GrpcDfError:
return &DfError{
Code: internal.Code,
Message: internal.Message,
}, true
}
}

var de *DfError
ok := errors.As(err, &de)
if ok {
return de, true
}

return nil, false
}

// ConvertDfErrorToGRPCError converts DfError to grpc error, if it is.
func ConvertDfErrorToGRPCError(err error) error {
if v, ok := err.(*DfError); ok {
Expand Down

0 comments on commit 839e476

Please sign in to comment.