Skip to content

Commit

Permalink
Feature/code spec (dragonflyoss#336)
Browse files Browse the repository at this point in the history
* Modify the code according to https://github.com/uber-go/guide/blob/master/style.md

Signed-off-by: santong <[email protected]>
  • Loading branch information
244372610 authored Jun 21, 2021
1 parent dcd3569 commit 2101a4b
Show file tree
Hide file tree
Showing 70 changed files with 301 additions and 352 deletions.
6 changes: 3 additions & 3 deletions cdnsystem/daemon/cdn/cache_data_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (mm *cacheDataManager) writeFileMetaDataByTask(task *types.SeedTask) (*stor
}

if err := mm.storage.WriteFileMetaData(task.TaskID, metaData); err != nil {
return nil, errors.Wrapf(err, "failed to write file metadata to storage")
return nil, errors.Wrapf(err, "write task %s metadata file", task.TaskID)
}

return metaData, nil
Expand Down Expand Up @@ -170,7 +170,7 @@ func (mm *cacheDataManager) readPieceMetaRecords(taskID string) ([]*storage.Piec
func (mm *cacheDataManager) getPieceMd5Sign(taskID string) (string, []*storage.PieceMetaRecord, error) {
pieceMetaRecords, err := mm.storage.ReadPieceMetaRecords(taskID)
if err != nil {
return "", nil, errors.Wrapf(err, "failed to read piece meta file")
return "", nil, errors.Wrapf(err, "read piece meta file")
}
var pieceMd5 []string
sort.Slice(pieceMetaRecords, func(i, j int) bool {
Expand All @@ -185,7 +185,7 @@ func (mm *cacheDataManager) getPieceMd5Sign(taskID string) (string, []*storage.P
func (mm *cacheDataManager) readFileMetaData(taskID string) (*storage.FileMetaData, error) {
fileMeta, err := mm.storage.ReadFileMetaData(taskID)
if err != nil {
return nil, errors.Wrapf(err, "failed to read file metadata from storage")
return nil, errors.Wrapf(err, "read file metadata of task %s from storage", taskID)
}
return fileMeta, nil
}
Expand Down
14 changes: 7 additions & 7 deletions cdnsystem/daemon/cdn/cache_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (cd *cacheDetector) detectCache(task *types.SeedTask, fileMd5 hash.Hash) (*
func (cd *cacheDetector) doDetect(task *types.SeedTask, fileMd5 hash.Hash) (result *cacheResult, err error) {
fileMetaData, err := cd.cacheDataManager.readFileMetaData(task.TaskID)
if err != nil {
return nil, errors.Wrapf(err, "failed to read file meta data")
return nil, errors.Wrapf(err, "read file meta data of task %s", task.TaskID)
}
if err := checkSameFile(task, fileMetaData); err != nil {
return nil, errors.Wrapf(err, "task does not match meta information of task file")
Expand Down Expand Up @@ -114,7 +114,7 @@ func (cd *cacheDetector) doDetect(task *types.SeedTask, fileMd5 hash.Hash) (resu
defer rangeCancel()
supportRange, err := source.IsSupportRange(ctx, task.URL, task.Header)
if err != nil {
return nil, errors.Wrapf(err, "failed to check if url(%s) supports range request", task.URL)
return nil, errors.Wrapf(err, "check if url(%s) supports range request", task.URL)
}
if !supportRange {
return nil, errors.Wrapf(cdnerrors.ErrResourceNotSupportRangeRequest, "url:%s", task.URL)
Expand All @@ -129,15 +129,15 @@ func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetaData *storag
}
pieceMetaRecords, err := cd.cacheDataManager.readAndCheckPieceMetaRecords(taskID, fileMetaData.PieceMd5Sign)
if err != nil {
return nil, errors.Wrapf(err, "failed to check piece meta integrity")
return nil, errors.Wrapf(err, "check piece meta integrity")
}
if fileMetaData.TotalPieceCount > 0 && len(pieceMetaRecords) != int(fileMetaData.TotalPieceCount) {
return nil, errors.Wrapf(cdnerrors.ErrPieceCountNotEqual, "piece file piece count(%d), "+
"meta file piece count(%d)", len(pieceMetaRecords), fileMetaData.TotalPieceCount)
}
storageInfo, err := cd.cacheDataManager.statDownloadFile(taskID)
if err != nil {
return nil, errors.Wrapf(err, "failed to get cdn file length")
return nil, errors.Wrapf(err, "get cdn file length")
}
// check file data integrity by file size
if fileMetaData.CdnFileLength != storageInfo.Size {
Expand All @@ -155,12 +155,12 @@ func (cd *cacheDetector) parseByReadMetaFile(taskID string, fileMetaData *storag
func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMetaData, fileMd5 hash.Hash) (*cacheResult, error) {
reader, err := cd.cacheDataManager.readDownloadFile(taskID)
if err != nil {
return nil, errors.Wrapf(err, "failed to read data file")
return nil, errors.Wrapf(err, "read data file")
}
defer reader.Close()
tempRecords, err := cd.cacheDataManager.readPieceMetaRecords(taskID)
if err != nil {
return nil, errors.Wrapf(err, "parseByReadFile:failed to read piece meta file")
return nil, errors.Wrapf(err, "read piece meta file")
}

// sort piece meta records by pieceNum
Expand All @@ -169,7 +169,7 @@ func (cd *cacheDetector) parseByReadFile(taskID string, metaData *storage.FileMe
})

var breakPoint uint64 = 0
pieceMetaRecords := make([]*storage.PieceMetaRecord, 0, 0)
pieceMetaRecords := make([]*storage.PieceMetaRecord, 0, len(tempRecords))
for index := range tempRecords {
if int32(index) != tempRecords[index].PieceNum {
break
Expand Down
2 changes: 1 addition & 1 deletion cdnsystem/daemon/cdn/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (re *reporter) reportCache(taskID string, detectResult *cacheResult) error
if detectResult != nil && detectResult.pieceMetaRecords != nil {
for _, record := range detectResult.pieceMetaRecords {
if err := re.reportPieceMetaRecord(taskID, record, CacheReport); err != nil {
return errors.Wrapf(err, "failed to publish pieceMetaRecord:%v, seedPiece:%v", record,
return errors.Wrapf(err, "publish pieceMetaRecord:%v, seedPiece:%v", record,
convertPieceMeta2SeedPiece(record))
}
}
Expand Down
10 changes: 5 additions & 5 deletions cdnsystem/daemon/cdn/storage/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ func (s *diskStorageMgr) ReadPieceMetaRecords(taskID string) ([]*storage.PieceMe
return nil, err
}
pieceMetaRecords := strings.Split(strings.TrimSpace(string(readBytes)), "\n")
var result = make([]*storage.PieceMetaRecord, 0)
var result = make([]*storage.PieceMetaRecord, 0, len(pieceMetaRecords))
for _, pieceStr := range pieceMetaRecords {
record, err := storage.ParsePieceMetaRecord(pieceStr)
if err != nil {
return nil, errors.Wrapf(err, "failed to get piece meta record: %v", pieceStr)
return nil, errors.Wrapf(err, "get piece meta record: %v", pieceStr)
}
result = append(result, record)
}
Expand Down Expand Up @@ -160,20 +160,20 @@ func (s *diskStorageMgr) WriteDownloadFile(taskID string, offset int64, len int6
func (s *diskStorageMgr) ReadFileMetaData(taskID string) (*storage.FileMetaData, error) {
bytes, err := s.diskDriver.GetBytes(storage.GetTaskMetaDataRaw(taskID))
if err != nil {
return nil, errors.Wrapf(err, "failed to get metadata bytes")
return nil, errors.Wrapf(err, "get metadata bytes")
}

metaData := &storage.FileMetaData{}
if err := json.Unmarshal(bytes, metaData); err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal metadata bytes")
return nil, errors.Wrapf(err, "unmarshal metadata bytes")
}
return metaData, nil
}

func (s *diskStorageMgr) WriteFileMetaData(taskID string, metaData *storage.FileMetaData) error {
data, err := json.Marshal(metaData)
if err != nil {
return errors.Wrapf(err, "failed to marshal metadata")
return errors.Wrapf(err, "marshal metadata")
}
return s.diskDriver.PutBytes(storage.GetTaskMetaDataRaw(taskID), data)
}
Expand Down
10 changes: 5 additions & 5 deletions cdnsystem/daemon/cdn/storage/hybrid/hybrid.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,11 @@ func (h *hybridStorageMgr) ReadPieceMetaRecords(taskID string) ([]*storage.Piece
return nil, err
}
pieceMetaRecords := strings.Split(strings.TrimSpace(string(readBytes)), "\n")
var result = make([]*storage.PieceMetaRecord, 0)
var result = make([]*storage.PieceMetaRecord, 0, len(pieceMetaRecords))
for _, pieceStr := range pieceMetaRecords {
record, err := storage.ParsePieceMetaRecord(pieceStr)
if err != nil {
return nil, errors.Wrapf(err, "failed to get piece meta record:%v", pieceStr)
return nil, errors.Wrapf(err, "get piece meta record:%v", pieceStr)
}
result = append(result, record)
}
Expand All @@ -238,12 +238,12 @@ func (h *hybridStorageMgr) ReadPieceMetaRecords(taskID string) ([]*storage.Piece
func (h *hybridStorageMgr) ReadFileMetaData(taskID string) (*storage.FileMetaData, error) {
readBytes, err := h.diskDriver.GetBytes(storage.GetTaskMetaDataRaw(taskID))
if err != nil {
return nil, errors.Wrapf(err, "failed to get metadata bytes")
return nil, errors.Wrapf(err, "get metadata bytes")
}

metaData := &storage.FileMetaData{}
if err := json.Unmarshal(readBytes, metaData); err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal metadata bytes")
return nil, errors.Wrapf(err, "unmarshal metadata bytes")
}
return metaData, nil
}
Expand All @@ -255,7 +255,7 @@ func (h *hybridStorageMgr) AppendPieceMetaData(taskID string, record *storage.Pi
func (h *hybridStorageMgr) WriteFileMetaData(taskID string, metaData *storage.FileMetaData) error {
data, err := json.Marshal(metaData)
if err != nil {
return errors.Wrapf(err, "failed to marshal metadata")
return errors.Wrapf(err, "marshal metadata")
}
return h.diskDriver.PutBytes(storage.GetTaskMetaDataRaw(taskID), data)
}
Expand Down
4 changes: 2 additions & 2 deletions cdnsystem/daemon/cdn/storage/storage_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package storage

import (
"fmt"
"os"
"strings"
"time"
Expand All @@ -28,7 +29,6 @@ import (
"d7y.io/dragonfly/v2/pkg/util/timeutils"
"github.com/emirpasic/gods/maps/treemap"
godsutils "github.com/emirpasic/gods/utils"
"github.com/pkg/errors"
)

type Cleaner struct {
Expand Down Expand Up @@ -57,7 +57,7 @@ func (cleaner *Cleaner) GC(storagePattern string, force bool) ([]string, error)
}
freeSpace, _ = cleaner.driver.GetAvailSpace()
} else {
return nil, errors.Wrapf(err, "failed to get avail space")
return nil, fmt.Errorf("get available space: %v", err)
}
}
fullGC := force
Expand Down
6 changes: 3 additions & 3 deletions cdnsystem/daemon/cdn/storage/storage_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,11 @@ func Register(name string, builder ManagerBuilder) error {
Result: cfg,
})
if err != nil {
return nil, fmt.Errorf("failed to create decoder: %v", err)
return nil, fmt.Errorf("create decoder: %v", err)
}
err = decoder.Decode(conf)
if err != nil {
return nil, fmt.Errorf("failed to parse config: %v", err)
return nil, fmt.Errorf("parse config: %v", err)
}
return newManagerPlugin(name, builder, cfg)
}
Expand All @@ -251,7 +251,7 @@ func newManagerPlugin(name string, builder ManagerBuilder, cfg *Config) (plugins

instant, err := builder(cfg)
if err != nil {
return nil, fmt.Errorf("failed to init storage manager %s: %v", name, err)
return nil, fmt.Errorf("init storage manager %s: %v", name, err)
}

return &managerPlugin{
Expand Down
17 changes: 9 additions & 8 deletions cdnsystem/daemon/progress/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package progress
import (
"container/list"
"context"
"fmt"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -85,11 +86,11 @@ func (pm *Manager) WatchSeedProgress(ctx context.Context, taskID string) (<-chan
defer pm.mu.UnLock(taskID, true)
chanList, err := pm.seedSubscribers.GetAsList(taskID)
if err != nil {
return nil, errors.Wrap(err, "failed to get seed subscribers")
return nil, fmt.Errorf("get seed subscribers: %v", err)
}
pieceMetaDataRecords, err := pm.getPieceMetaRecordsByTaskID(taskID)
if err != nil {
return nil, errors.Wrap(err, "failed to get piece meta records by taskID")
return nil, fmt.Errorf("get piece meta records by taskID: %v", err)
}
ch := make(chan *types.SeedPiece, pm.buffer)
ele := chanList.PushBack(ch)
Expand All @@ -116,11 +117,11 @@ func (pm *Manager) PublishPiece(taskID string, record *types.SeedPiece) error {
defer pm.mu.UnLock(taskID, false)
err := pm.setPieceMetaRecord(taskID, record)
if err != nil {
return errors.Wrap(err, "failed to set piece meta record")
return fmt.Errorf("set piece meta record: %v", err)
}
chanList, err := pm.seedSubscribers.GetAsList(taskID)
if err != nil {
return errors.Wrap(err, "failed to get seed subscribers")
return fmt.Errorf("get seed subscribers: %v", err)
}
var wg sync.WaitGroup
for e := chanList.Front(); e != nil; e = e.Next() {
Expand All @@ -145,7 +146,7 @@ func (pm *Manager) PublishTask(ctx context.Context, taskID string, task *types.S
defer pm.mu.UnLock(taskID, false)
chanList, err := pm.seedSubscribers.GetAsList(taskID)
if err != nil {
return errors.Wrap(err, "failed to get seed subscribers")
return fmt.Errorf("get seed subscribers: %v", err)
}
// unwatch
for e := chanList.Front(); e != nil; e = e.Next() {
Expand All @@ -165,7 +166,7 @@ func (pm *Manager) Clear(taskID string) error {
defer pm.mu.UnLock(taskID, false)
chanList, err := pm.seedSubscribers.GetAsList(taskID)
if err != nil && errors.Cause(err) != dferrors.ErrDataNotFound {
return errors.Wrap(err, "failed to get seed subscribers")
return errors.Wrap(err, "get seed subscribers")
}
if chanList != nil {
for e := chanList.Front(); e != nil; e = e.Next() {
Expand All @@ -181,11 +182,11 @@ func (pm *Manager) Clear(taskID string) error {
}
err = pm.seedSubscribers.Remove(taskID)
if err != nil && dferrors.ErrDataNotFound != errors.Cause(err) {
return errors.Wrap(err, "failed to clear seed subscribes")
return errors.Wrap(err, "clear seed subscribes")
}
err = pm.taskPieceMetaRecords.Remove(taskID)
if err != nil && dferrors.ErrDataNotFound != errors.Cause(err) {
return errors.Wrap(err, "failed to clear piece meta records")
return errors.Wrap(err, "clear piece meta records")
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions cdnsystem/daemon/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskRegisterRequest)

// trigger CDN
if err := tm.triggerCdnSyncAction(ctx, task); err != nil {
return nil, errors.Wrapf(err, "failed to trigger cdn")
return nil, errors.Wrapf(err, "trigger cdn")
}
logger.WithTaskID(task.TaskID).Infof("successfully trigger cdn sync action")
// watch seed progress
Expand Down Expand Up @@ -111,7 +111,7 @@ func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.SeedTas
CdnStatus: types.TaskInfoCdnStatusRunning,
})
if err != nil {
return errors.Wrapf(err, "failed to update task")
return errors.Wrapf(err, "update task")
}
// triggerCDN goroutine
go func() {
Expand Down
7 changes: 4 additions & 3 deletions cdnsystem/daemon/task/manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package task

import (
"context"
"fmt"
"time"

"d7y.io/dragonfly/v2/cdnsystem/config"
Expand Down Expand Up @@ -47,8 +48,8 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis
if key, err := tm.taskURLUnReachableStore.Get(taskID); err == nil {
if unReachableStartTime, ok := key.(time.Time); ok &&
time.Since(unReachableStartTime) < tm.cfg.FailAccessInterval {
return nil, errors.Wrapf(cdnerrors.ErrURLNotReachable,
"task hit unReachable cache and interval less than %d, url: %s", tm.cfg.FailAccessInterval, request.URL)
return nil, cdnerrors.ErrURLNotReachable{URL: request.URL, Cause: fmt.Errorf("task hit unReachable cache and interval less than %d, url: %s",
tm.cfg.FailAccessInterval, request.URL)}
}
tm.taskURLUnReachableStore.Delete(taskID)
logger.Debugf("delete taskID:%s from url unReachable store", taskID)
Expand All @@ -67,7 +68,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis
if v, err := tm.taskStore.Get(taskID); err == nil {
existTask := v.(*types.SeedTask)
if !isSameTask(existTask, newTask) {
return nil, errors.Wrapf(cdnerrors.ErrTaskIDDuplicate, "newTask:%+v, existTask:%+v", newTask, existTask)
return nil, cdnerrors.ErrTaskIDDuplicate{TaskID: taskID, Cause: fmt.Errorf("newTask:%+v, existTask:%+v", newTask, existTask)}
}
task = existTask
logger.Debugf("get exist task for taskID:%s", taskID)
Expand Down
36 changes: 28 additions & 8 deletions cdnsystem/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,35 @@
package errors

import (
"fmt"

"github.com/pkg/errors"
)

// ErrURLNotReachable represents the url is a not reachable.
type ErrURLNotReachable struct {
URL string
Cause error
}

func (e ErrURLNotReachable) Error() string {
return fmt.Sprintf("url %s not reachable: %v", e.URL, e.Cause)
}

// ErrTaskIDDuplicate represents the task id is in conflict.
type ErrTaskIDDuplicate struct {
TaskID string
Cause error
}

func (e ErrTaskIDDuplicate) Error() string {
return fmt.Sprintf("taskId %s conflict: %v", e.TaskID, e.Cause)
}

var (
// ErrSystemError represents the error is a system error.
ErrSystemError = errors.New("system error")

// ErrURLNotReachable represents the url is a not reachable.
ErrURLNotReachable = errors.New("url not reachable")

// ErrTaskIDDuplicate represents the task id is in conflict.
ErrTaskIDDuplicate = errors.New("taskId conflict")

// ErrPieceCountNotEqual represents the number of pieces downloaded does not match the amount of meta information
ErrPieceCountNotEqual = errors.New("inconsistent number of pieces")

Expand Down Expand Up @@ -71,12 +87,16 @@ func IsSystemError(err error) bool {

// IsURLNotReachable checks the error is a url not reachable or not.
func IsURLNotReachable(err error) bool {
return errors.Cause(err) == ErrURLNotReachable
err = errors.Cause(err)
_, ok := err.(ErrURLNotReachable)
return ok
}

// IsTaskIDDuplicate checks the error is a TaskIDDuplicate error or not.
func IsTaskIDDuplicate(err error) bool {
return errors.Cause(err) == ErrTaskIDDuplicate
err = errors.Cause(err)
_, ok := err.(ErrTaskIDDuplicate)
return ok
}

func IsPieceCountNotEqual(err error) bool {
Expand Down
Loading

0 comments on commit 2101a4b

Please sign in to comment.