feat: add prefetch to trigger seed peer downloads entire task (#2929)
Signed-off-by: Gaius <[email protected]>
gaius-qi authored Dec 7, 2023
1 parent 1d75ddf commit 76530ae
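This change bumps d7y.io/api/v2 from v2.0.59 to v2.0.60 (which presumably ships the new Prefetch field on PeerTaskRequest) and adds a prefetch path to the scheduler's v1 service: when a registering peer sets Prefetch, RegisterPeerTask asynchronously triggers a seed peer download of the entire resource, with the range and digest stripped, alongside the ranged download the peer asked for. Several handlers also switch from the package-level logger to per-request scoped loggers, and a new unit test covers the prefetchTask helper.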
Showing 5 changed files with 243 additions and 75 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
 go 1.21
 
 require (
-    d7y.io/api/v2 v2.0.59
+    d7y.io/api/v2 v2.0.60
     github.com/MysteriousPotato/go-lockable v1.0.0
     github.com/RichardKnop/machinery v1.10.6
     github.com/Showmax/go-fqdn v1.0.0
4 changes: 2 additions & 2 deletions go.sum
@@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
 cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
-d7y.io/api/v2 v2.0.59 h1:Q+OF7kYCk5vOwubs6tKEC7HYTrkwVEOTy/6LG9rc8NQ=
-d7y.io/api/v2 v2.0.59/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
+d7y.io/api/v2 v2.0.60 h1:er07NeKpjnBOB8JzkddjtGWNRdRkhavO1Qn+0meajVw=
+d7y.io/api/v2 v2.0.60/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
104 changes: 81 additions & 23 deletions scheduler/service/service_v1.go
@@ -25,6 +25,7 @@ import (
     "strings"
     "time"
 
+    "github.com/go-http-utils/headers"
     "go.opentelemetry.io/otel/trace"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
@@ -92,13 +93,23 @@ func NewV1(
 
 // RegisterPeerTask registers peer and triggers seed peer download task.
 func (v *V1) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest) (*schedulerv1.RegisterResult, error) {
-    logger.WithPeer(req.PeerHost.GetId(), req.GetTaskId(), req.GetPeerId()).Infof("register peer task request: %#v", req)
+    log := logger.WithPeer(req.PeerHost.GetId(), req.GetTaskId(), req.GetPeerId())
+    log.Infof("register peer task request: %#v", req)
 
     // Store resource.
     task := v.storeTask(ctx, req, commonv2.TaskType_DFDAEMON)
     host := v.storeHost(ctx, req.GetPeerHost())
     peer := v.storePeer(ctx, req.GetPeerId(), req.UrlMeta.GetPriority(), req.UrlMeta.GetRange(), task, host)
 
+    // Prefetch the entire task.
+    if req.GetPrefetch() {
+        go func() {
+            if _, err := v.prefetchTask(ctx, req); err != nil {
+                peer.Log.Errorf("prefetch task failed: %s", err.Error())
+            }
+        }()
+    }
+
     // Trigger the first download of the task.
     if err := v.triggerTask(ctx, req, task, host, peer, v.dynconfig); err != nil {
         peer.Log.Error(err)
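To make the new flag concrete, here is a minimal client-side sketch of a ranged registration that opts in to prefetching. Field names match the diff above; the URL and range values are hypothetical, and the import paths are assumed from the d7y.io/api/v2 module layout rather than shown in this commit.

package example

import (
    commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
    schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
)

// buildPrefetchRequest sketches a ranged registration with Prefetch set:
// the peer itself downloads only the requested bytes, while the scheduler
// warms the entire task on a seed peer in the background.
func buildPrefetchRequest() *schedulerv1.PeerTaskRequest {
    return &schedulerv1.PeerTaskRequest{
        Url: "https://example.com/blobs/foo", // hypothetical URL
        UrlMeta: &commonv1.UrlMeta{
            // Assumed range format: UrlMeta.Range carries the byte range,
            // and the HTTP Range header is mirrored in UrlMeta.Header.
            Range:  "0-1023",
            Header: map[string]string{"Range": "bytes=0-1023"},
        },
        Prefetch: true, // triggers the asynchronous prefetchTask call above
    }
}

Because the goroutine in RegisterPeerTask is fire-and-forget, a failed prefetch only logs through peer.Log and never delays or fails the registration itself.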
@@ -281,12 +292,13 @@ func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultSer
 
 // ReportPeerResult handles peer result reported by dfdaemon.
 func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) error {
-    logger.WithTaskAndPeerID(req.GetTaskId(), req.GetPeerId()).Infof("report peer result request: %#v", req)
+    log := logger.WithTaskAndPeerID(req.GetTaskId(), req.GetPeerId())
+    log.Infof("report peer result request: %#v", req)
 
     peer, loaded := v.resource.PeerManager().Load(req.GetPeerId())
     if !loaded {
         msg := fmt.Sprintf("peer %s not found", req.GetPeerId())
-        logger.Error(msg)
+        log.Error(msg)
         return dferrors.New(commonv1.Code_SchedPeerNotFound, msg)
     }
 
@@ -420,12 +432,13 @@ func (v *V1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequ
 
 // StatTask checks the current state of the task.
 func (v *V1) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest) (*schedulerv1.Task, error) {
-    logger.WithTaskID(req.GetTaskId()).Infof("stat task request: %#v", req)
+    log := logger.WithTaskID(req.GetTaskId())
+    log.Infof("stat task request: %#v", req)
 
     task, loaded := v.resource.TaskManager().Load(req.GetTaskId())
     if !loaded {
         msg := fmt.Sprintf("task %s not found", req.GetTaskId())
-        logger.Info(msg)
+        log.Info(msg)
         return nil, dferrors.New(commonv1.Code_PeerTaskNotFound, msg)
     }
 
@@ -442,12 +455,13 @@ func (v *V1) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest) (*s
 
 // LeaveTask releases peer in scheduler.
 func (v *V1) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget) error {
-    logger.WithTaskAndPeerID(req.GetTaskId(), req.GetPeerId()).Infof("leave task request: %#v", req)
+    log := logger.WithTaskAndPeerID(req.GetTaskId(), req.GetPeerId())
+    log.Infof("leave task request: %#v", req)
 
     peer, loaded := v.resource.PeerManager().Load(req.GetPeerId())
     if !loaded {
         msg := fmt.Sprintf("peer %s not found", req.GetPeerId())
-        logger.Error(msg)
+        log.Error(msg)
         return dferrors.New(commonv1.Code_SchedPeerNotFound, msg)
     }
 
@@ -646,12 +660,13 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ
 
 // LeaveHost releases host in scheduler.
 func (v *V1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) error {
-    logger.WithHostID(req.GetId()).Infof("leave host request: %#v", req)
+    log := logger.WithHostID(req.GetId())
+    log.Infof("leave host request: %#v", req)
 
     host, loaded := v.resource.HostManager().Load(req.GetId())
     if !loaded {
         msg := fmt.Sprintf("host %s not found", req.GetId())
-        logger.Error(msg)
+        log.Error(msg)
         return dferrors.New(commonv1.Code_BadRequest, msg)
     }
 
@@ -661,7 +676,7 @@ func (v *V1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) e
     // Delete host from network topology.
     if v.networkTopology != nil {
         if err := v.networkTopology.DeleteHost(host.ID); err != nil {
-            logger.Errorf("delete network topology host error: %s", err.Error())
+            log.Errorf("delete network topology host error: %s", err.Error())
             return err
         }
     }
@@ -686,15 +701,15 @@ func (v *V1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error {
             return err
         }
 
-        logger := logger.WithHost(req.Host.GetId(), req.Host.GetHostname(), req.Host.GetIp())
+        log := logger.WithHost(req.Host.GetId(), req.Host.GetHostname(), req.Host.GetIp())
         switch syncProbesRequest := req.GetRequest().(type) {
         case *schedulerv1.SyncProbesRequest_ProbeStartedRequest:
             // Find probed hosts in network topology. Based on the source host information,
             // the most candidate hosts will be evaluated.
-            logger.Info("receive SyncProbesRequest_ProbeStartedRequest")
+            log.Info("receive SyncProbesRequest_ProbeStartedRequest")
             hosts, err := v.networkTopology.FindProbedHosts(req.Host.GetId())
             if err != nil {
-                logger.Error(err)
+                log.Error(err)
                 return status.Error(codes.FailedPrecondition, err.Error())
             }
 
@@ -711,26 +726,26 @@ func (v *V1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error {
                 })
             }
 
-            logger.Infof("probe started: %#v", probedHosts)
+            log.Infof("probe started: %#v", probedHosts)
             if err := stream.Send(&schedulerv1.SyncProbesResponse{
                 Hosts: probedHosts,
             }); err != nil {
-                logger.Error(err)
+                log.Error(err)
                 return err
             }
         case *schedulerv1.SyncProbesRequest_ProbeFinishedRequest:
             // Store probes in network topology. First create the association between
             // source host and destination host, and then store the value of probe.
-            logger.Info("receive SyncProbesRequest_ProbeFinishedRequest")
+            log.Info("receive SyncProbesRequest_ProbeFinishedRequest")
             for _, probe := range syncProbesRequest.ProbeFinishedRequest.Probes {
                 probedHost, loaded := v.resource.HostManager().Load(probe.Host.Id)
                 if !loaded {
-                    logger.Errorf("host %s not found", probe.Host.Id)
+                    log.Errorf("host %s not found", probe.Host.Id)
                     continue
                 }
 
                 if err := v.networkTopology.Store(req.Host.GetId(), probedHost.ID); err != nil {
-                    logger.Errorf("store failed: %s", err.Error())
+                    log.Errorf("store failed: %s", err.Error())
                     continue
                 }
 

Expand All @@ -739,29 +754,72 @@ func (v *V1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error {
RTT: probe.Rtt.AsDuration(),
CreatedAt: probe.CreatedAt.AsTime(),
}); err != nil {
logger.Errorf("enqueue failed: %s", err.Error())
log.Errorf("enqueue failed: %s", err.Error())
continue
}

logger.Infof("probe finished: %#v", probe)
log.Infof("probe finished: %#v", probe)
}
case *schedulerv1.SyncProbesRequest_ProbeFailedRequest:
// Log failed probes.
logger.Info("receive SyncProbesRequest_ProbeFailedRequest")
log.Info("receive SyncProbesRequest_ProbeFailedRequest")
var failedProbedHostIDs []string
for _, failedProbe := range syncProbesRequest.ProbeFailedRequest.Probes {
failedProbedHostIDs = append(failedProbedHostIDs, failedProbe.Host.Id)
}

logger.Warnf("probe failed: %#v", failedProbedHostIDs)
log.Warnf("probe failed: %#v", failedProbedHostIDs)
default:
msg := fmt.Sprintf("receive unknow request: %#v", syncProbesRequest)
logger.Error(msg)
log.Error(msg)
return status.Error(codes.FailedPrecondition, msg)
}
}
}

// prefetchTask prefetches the task with seed peer.
func (v *V1) prefetchTask(ctx context.Context, rawReq *schedulerv1.PeerTaskRequest) (*resource.Task, error) {
// If seed peer is disabled, then return error.
if !v.config.SeedPeer.Enable {
return nil, errors.New("seed peer is disabled")
}

log := logger.WithTaskAndPeerID(rawReq.GetTaskId(), rawReq.GetPeerId())

// Construct prefetch task request.
req := &schedulerv1.PeerTaskRequest{
Url: rawReq.GetUrl(),
UrlMeta: &commonv1.UrlMeta{
// Remove digest.
Digest: "",
Tag: rawReq.GetUrlMeta().GetTag(),
// Remove range.
Range: "",
Filter: rawReq.GetUrlMeta().GetFilter(),
Header: rawReq.GetUrlMeta().GetHeader(),
Application: rawReq.GetUrlMeta().GetApplication(),
Priority: rawReq.GetUrlMeta().GetPriority(),
},
Prefetch: rawReq.GetPrefetch(),
IsMigrating: rawReq.GetIsMigrating(),
}

// Delete range header.
delete(req.UrlMeta.Header, headers.Range)

// Generate entire task id.
req.TaskId = idgen.TaskIDV1(req.GetUrl(), req.GetUrlMeta())

// Start trigger seed peer task.
log.Infof("prefetch task: %s", req.TaskId)

// Store resource.
task := v.storeTask(ctx, req, commonv2.TaskType_DFDAEMON)

v.triggerSeedPeerTask(ctx, nil, task)
return task, nil
}

// triggerTask triggers the first download of the task.
func (v *V1) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, task *resource.Task, host *resource.Host, peer *resource.Peer, dynconfig config.DynconfigInterface) error {
// If task has available peer, peer does not need to be triggered.
Expand Down
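The recomputed TaskId is what separates the prefetch from the ranged download: TaskIDV1 hashes the URL together with the URL metadata, so clearing Range and Digest (and deleting the Range header) produces the ID of the entire resource rather than of one slice of it. A minimal sketch of that relationship, assuming idgen lives at d7y.io/dragonfly/v2/pkg/idgen and using hypothetical values:

package main

import (
    "fmt"

    commonv1 "d7y.io/api/v2/pkg/apis/common/v1"

    "d7y.io/dragonfly/v2/pkg/idgen"
)

func main() {
    url := "https://example.com/blobs/foo" // hypothetical URL

    // Ranged request: the range participates in the v1 task ID.
    ranged := &commonv1.UrlMeta{Tag: "d7y", Range: "0-1023"}

    // Prefetch request: prefetchTask clears Range (and Digest), so the
    // resulting ID identifies the whole resource.
    entire := &commonv1.UrlMeta{Tag: "d7y"}

    // Different IDs, so the seed peer download runs as its own task
    // instead of colliding with the ranged peer's task.
    fmt.Println(idgen.TaskIDV1(url, ranged) == idgen.TaskIDV1(url, entire)) // false
}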
109 changes: 108 additions & 1 deletion scheduler/service/service_v1_test.go
@@ -19,6 +19,7 @@ package service
 import (
     "context"
     "errors"
+    "fmt"
     "io"
     "net"
     "net/http"
@@ -200,7 +201,7 @@
     mockTaskTag         = "d7y"
     mockTaskApplication = "foo"
     mockTaskFilters     = []string{"bar"}
-    mockTaskHeader      = map[string]string{"content-length": "100"}
+    mockTaskHeader      = map[string]string{"Content-Length": "100", "Range": "bytes=0-99"}
     mockTaskPieceLength int32 = 2048
     mockResourceConfig        = &config.ResourceConfig{
         Task: config.TaskConfig{
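The mock header now uses the canonical Content-Length key and gains a Range entry, giving the new prefetch test below something to assert against: prefetchTask is expected to delete the Range header from the stored task.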
@@ -3069,6 +3070,112 @@ func TestServiceV1_SyncProbes(t *testing.T) {
     }
 }
 
+func TestServiceV1_prefetchTask(t *testing.T) {
+    fmt.Println("TestServiceV1_prefetchTask")
+    tests := []struct {
+        name   string
+        config *config.Config
+        req    *schedulerv1.PeerTaskRequest
+        mock   func(task *resource.Task, peer *resource.Peer, taskManager resource.TaskManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder)
+        expect func(t *testing.T, task *resource.Task, err error)
+    }{
+        {
+            name: "prefetch task with seed peer",
+            config: &config.Config{
+                Scheduler: mockSchedulerConfig,
+                SeedPeer:  mockSeedPeerConfig,
+            },
+            req: &schedulerv1.PeerTaskRequest{
+                Url: mockTaskURL,
+                UrlMeta: &commonv1.UrlMeta{
+                    Digest:      mockTaskDigest.String(),
+                    Tag:         mockTaskTag,
+                    Range:       mockURLMetaRange,
+                    Filter:      strings.Join(mockTaskFilters, idgen.URLFilterSeparator),
+                    Header:      mockTaskHeader,
+                    Application: mockTaskApplication,
+                    Priority:    commonv1.Priority_LEVEL0,
+                },
+                PeerId:      mockPeerID,
+                PeerHost:    mockPeerHost,
+                Prefetch:    true,
+                IsMigrating: false,
+                TaskId:      mockTaskID,
+            },
+            mock: func(task *resource.Task, peer *resource.Peer, taskManager resource.TaskManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) {
+                task.FSM.SetState(resource.TaskStateRunning)
+                peer.FSM.SetState(resource.PeerStateRunning)
+                gomock.InOrder(
+                    mr.TaskManager().Return(taskManager).Times(1),
+                    mt.Load(gomock.Eq("7aecbd0437cf6b429dc623686d36208135b3d2d1831a90b644458964297943a4")).Return(task, true).Times(1),
+                    mr.SeedPeer().Return(seedPeer).Times(1),
+                    mc.TriggerTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(peer, &schedulerv1.PeerResult{}, nil).Times(1),
+                )
+            },
+            expect: func(t *testing.T, task *resource.Task, err error) {
+                assert := assert.New(t)
+                assert.True(task.FSM.Is(resource.TaskStateSucceeded))
+                assert.Equal(task.Header, map[string]string{"Content-Length": "100"})
+            },
+        },
+        {
+            name: "prefetch task without seed peer",
+            config: &config.Config{
+                Scheduler: mockSchedulerConfig,
+            },
+            req: &schedulerv1.PeerTaskRequest{
+                Url: mockTaskURL,
+                UrlMeta: &commonv1.UrlMeta{
+                    Digest:      mockTaskDigest.String(),
+                    Tag:         mockTaskTag,
+                    Range:       mockURLMetaRange,
+                    Filter:      strings.Join(mockTaskFilters, idgen.URLFilterSeparator),
+                    Header:      mockTaskHeader,
+                    Application: mockTaskApplication,
+                    Priority:    commonv1.Priority_LEVEL0,
+                },
+                PeerId:      mockPeerID,
+                PeerHost:    mockPeerHost,
+                Prefetch:    true,
+                IsMigrating: false,
+                TaskId:      mockTaskID,
+            },
+            mock: func(task *resource.Task, peer *resource.Peer, taskManager resource.TaskManager, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockSeedPeerMockRecorder) {
+                task.FSM.SetState(resource.TaskStateRunning)
+                peer.FSM.SetState(resource.PeerStateRunning)
+            },
+            expect: func(t *testing.T, task *resource.Task, err error) {
+                assert := assert.New(t)
+                assert.EqualError(err, "seed peer is disabled")
+            },
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+            ctl := gomock.NewController(t)
+            defer ctl.Finish()
+            scheduling := mocks.NewMockScheduling(ctl)
+            res := resource.NewMockResource(ctl)
+            dynconfig := configmocks.NewMockDynconfigInterface(ctl)
+            storage := storagemocks.NewMockStorage(ctl)
+            networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
+            seedPeer := resource.NewMockSeedPeer(ctl)
+            mockHost := resource.NewHost(
+                mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
+                mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
+            task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
+            peer := resource.NewPeer(mockPeerID, mockResourceConfig, task, mockHost)
+            svc := NewV1(tc.config, res, scheduling, dynconfig, storage, networkTopology)
+            taskManager := resource.NewMockTaskManager(ctl)
+
+            tc.mock(task, peer, taskManager, seedPeer, res.EXPECT(), taskManager.EXPECT(), seedPeer.EXPECT())
+            task, err := svc.prefetchTask(context.Background(), tc.req)
+            tc.expect(t, task, err)
+        })
+    }
+}
+
 func TestServiceV1_triggerTask(t *testing.T) {
     tests := []struct {
         name string