Skip to content

Commit

Permalink
implement evaluator by machine learning
Browse files Browse the repository at this point in the history
Signed-off-by: XZ <[email protected]>
  • Loading branch information
fcgxz2003 committed Dec 11, 2023
1 parent 8e839e4 commit 0eb10fd
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 2 deletions.
5 changes: 3 additions & 2 deletions scheduler/scheduling/evaluator/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ func New(algorithm string, pluginDir string) Evaluator {
if plugin, err := LoadPlugin(pluginDir); err == nil {
return plugin
}
// TODO Implement MLAlgorithm.
case MLAlgorithm, DefaultAlgorithm:
case DefaultAlgorithm:
return NewEvaluatorBase()
case MLAlgorithm:
return NewEvaluatorML()
}

return NewEvaluatorBase()
Expand Down
125 changes: 125 additions & 0 deletions scheduler/scheduling/evaluator/evaluator_ml.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package evaluator

import (
"math/big"
"net"
"sort"

Check failure on line 23 in scheduler/scheduling/evaluator/evaluator_ml.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gci`-ed with --skip-generated -s standard,default,prefix(d7y.io/api),prefix(d7y.io/dragonfly/v2) (gci)
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/scheduler/resource"
"github.com/montanaflynn/stats"

Check failure on line 26 in scheduler/scheduling/evaluator/evaluator_ml.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gci`-ed with --skip-generated -s standard,default,prefix(d7y.io/api),prefix(d7y.io/dragonfly/v2) (gci)
)

type evaluatorML struct {
}

func NewEvaluatorML() Evaluator {
return &evaluatorML{}
}

// EvaluateParents sort parents by evaluating multiple feature scores.
func (em *evaluatorML) EvaluateParents(parents []*resource.Peer, child *resource.Peer, totalPieceCount int32) []*resource.Peer {
sort.Slice(
parents,
func(i, j int) bool {
return evaluateML(parents[i], child, totalPieceCount) > evaluateML(parents[j], child, totalPieceCount)
},
)

return parents
}

// The larger the value, the higher the priority.
func evaluateML(parent *resource.Peer, child *resource.Peer, totalPieceCount int32) float64 {
parentLocation := parent.Host.Network.Location
parentIDC := parent.Host.Network.IDC
childLocation := child.Host.Network.Location
childIDC := child.Host.Network.IDC

// score, err := inference()
// if err == nil {
// logger.Info("using machine learning algorithm")
// return score
// }

logger.Info("using evaluate base algorithm")
return finishedPieceWeight*calculatePieceScore(parent, child, totalPieceCount) +
parentHostUploadSuccessWeight*calculateParentHostUploadSuccessScore(parent) +
freeUploadWeight*calculateFreeUploadScore(parent.Host) +
hostTypeWeight*calculateHostTypeScore(parent) +
idcAffinityWeight*calculateIDCAffinityScore(parentIDC, childIDC) +
locationAffinityWeight*calculateMultiElementAffinityScore(parentLocation, childLocation)
}

func (em *evaluatorML) IsBadNode(peer *resource.Peer) bool {
if peer.FSM.Is(resource.PeerStateFailed) || peer.FSM.Is(resource.PeerStateLeave) || peer.FSM.Is(resource.PeerStatePending) ||
peer.FSM.Is(resource.PeerStateReceivedTiny) || peer.FSM.Is(resource.PeerStateReceivedSmall) ||
peer.FSM.Is(resource.PeerStateReceivedNormal) || peer.FSM.Is(resource.PeerStateReceivedEmpty) {
peer.Log.Debugf("peer is bad node because peer status is %s", peer.FSM.Current())
return true
}

// Determine whether to bad node based on piece download costs.
costs := stats.LoadRawData(peer.PieceCosts())
len := len(costs)
// Peer has not finished downloading enough piece.
if len < minAvailableCostLen {
logger.Debugf("peer %s has not finished downloading enough piece, it can't be bad node", peer.ID)
return false
}

lastCost := costs[len-1]
mean, _ := stats.Mean(costs[:len-1]) // nolint: errcheck

// Download costs does not meet the normal distribution,
// if the last cost is twenty times more than mean, it is bad node.
if len < normalDistributionLen {
isBadNode := big.NewFloat(lastCost).Cmp(big.NewFloat(mean*20)) > 0
logger.Debugf("peer %s mean is %.2f and it is bad node: %t", peer.ID, mean, isBadNode)
return isBadNode
}

// Download costs satisfies the normal distribution,
// last cost falling outside of three-sigma effect need to be adjusted parent,
// refer to https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule.
stdev, _ := stats.StandardDeviation(costs[:len-1]) // nolint: errcheck
isBadNode := big.NewFloat(lastCost).Cmp(big.NewFloat(mean+3*stdev)) > 0
logger.Debugf("peer %s meet the normal distribution, costs mean is %.2f and standard deviation is %.2f, peer is bad node: %t",
peer.ID, mean, stdev, isBadNode)
return isBadNode
}

// parseIP parses an ip address to a feature vector.
func parseIP(ip string) ([]int64, error) {

Check failure on line 109 in scheduler/scheduling/evaluator/evaluator_ml.go

View workflow job for this annotation

GitHub Actions / Lint

`parseIP` is unused (deadcode)
var features = make([]int64, 32)
data := net.ParseIP(ip)
prase := data.To4()
if prase != nil {
for i := 0; i < net.IPv4len; i++ {
d := prase[i]
for j := 0; j < 8; j++ {
features[i*8+j] = int64(d & 0x1)
d = d >> 1
}

}
}

return features, nil
}
45 changes: 45 additions & 0 deletions scheduler/scheduling/evaluator/evaluator_ml_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package evaluator

import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
)

func TestEvaluatorML_NewEvaluatorML(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, e any)
}{
{
name: "new evaluator commonv1",
expect: func(t *testing.T, e any) {
assert := assert.New(t)
assert.Equal(reflect.TypeOf(e).Elem().Name(), "evaluatorML")
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, NewEvaluatorML())
})
}
}

0 comments on commit 0eb10fd

Please sign in to comment.