Skip to content

Commit

Permalink
Merge pull request #211 from bcc-code/feat/limit-rclone
Browse files Browse the repository at this point in the history
Limit and prioritize Rclone transfers
  • Loading branch information
KillerX authored Apr 3, 2024
2 parents 326f831 + 07c1d77 commit 241116a
Show file tree
Hide file tree
Showing 23 changed files with 261 additions and 30 deletions.
5 changes: 5 additions & 0 deletions activities/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ func (ua UtilActivities) RcloneWaitForJob(ctx context.Context, jobID int) (bool,
}
return true, nil
}

time.Sleep(time.Second * 10)
}
}

// RcloneCopyDirInput is the activity input for RcloneCopyDir: an rclone
// source and destination (passed through to the rclone service as-is) and
// the transfer's queue priority.
// NOTE(review): the exact remote-string format expected in Source and
// Destination is not visible here — confirm against callers.
type RcloneCopyDirInput struct {
Source string
Destination string
Priority rclone.Priority
}

func (ua UtilActivities) RcloneCopyDir(ctx context.Context, input RcloneCopyDirInput) (int, error) {
Expand All @@ -53,6 +55,7 @@ func (ua UtilActivities) RcloneCopyDir(ctx context.Context, input RcloneCopyDirI
// RcloneFileInput is the shared activity input for RcloneMoveFile and
// RcloneCopyFile: the source and destination paths of a single file plus
// the transfer's queue priority.
type RcloneFileInput struct {
Source paths.Path
Destination paths.Path
Priority rclone.Priority
}

func (ua UtilActivities) RcloneMoveFile(ctx context.Context, input RcloneFileInput) (int, error) {
Expand All @@ -65,6 +68,7 @@ func (ua UtilActivities) RcloneMoveFile(ctx context.Context, input RcloneFileInp
res, err := rclone.MoveFile(
srcFs, srcRemote,
dstFs, dstRemote,
input.Priority,
)
if err != nil {
return 0, err
Expand All @@ -83,6 +87,7 @@ func (ua UtilActivities) RcloneCopyFile(ctx context.Context, input RcloneFileInp
res, err := rclone.CopyFile(
srcFs, srcRemote,
dstFs, dstRemote,
input.Priority,
)
if err != nil {
return 0, err
Expand Down
3 changes: 3 additions & 0 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"github.com/bcc-code/bcc-media-flows/services/rclone"
"log"
"os"
"reflect"
Expand Down Expand Up @@ -129,6 +130,8 @@ func main() {
MaxConcurrentActivityExecutionSize: activityCount, // Doesn't make sense to have more than one activity running at a time
}

go rclone.StartFileTransferQueue()

registerWorker(c, environment.GetQueue(), workerOptions)
}

Expand Down
27 changes: 26 additions & 1 deletion services/rclone/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,36 @@ type JobStatus struct {
Error string `json:"error"`
Finished bool `json:"finished"`
Group string `json:"group"`
Id int `json:"id"`
ID int `json:"id"`
Output Output `json:"output"`
StartTime time.Time `json:"startTime"`
Success bool `json:"success"`
}

// Output holds the per-job transfer statistics embedded in JobStatus
// (bytes moved, speed, error counters, server-side operation counters,
// elapsed/transfer times).
// NOTE(review): field semantics mirror rclone's stats object — units and
// exact meanings should be confirmed against the rclone rc documentation.
type Output struct {
Bytes int64 `json:"bytes"`
Checks int `json:"checks"`
DeletedDirs int `json:"deletedDirs"`
Deletes int `json:"deletes"`
ElapsedTime float64 `json:"elapsedTime"`
Errors int `json:"errors"`
Eta int `json:"eta"`
FatalError bool `json:"fatalError"`
LastError string `json:"lastError"`
Renames int `json:"renames"`
RetryError bool `json:"retryError"`
ServerSideCopies int `json:"serverSideCopies"`
ServerSideCopyBytes int `json:"serverSideCopyBytes"`
ServerSideMoveBytes int `json:"serverSideMoveBytes"`
ServerSideMoves int `json:"serverSideMoves"`
Speed float64 `json:"speed"`
TotalBytes int64 `json:"totalBytes"`
TotalChecks int `json:"totalChecks"`
TotalTransfers int `json:"totalTransfers"`
TransferTime float64 `json:"transferTime"`
Transfers int `json:"transfers"`
}

// JobResponse is the reply to an asynchronous rclone rc call (see MoveFile
// and CopyFile, which return it); JobID identifies the background job.
type JobResponse struct {
JobID int `json:"jobid"`
}
Expand Down
85 changes: 85 additions & 0 deletions services/rclone/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package rclone

import (
"sync"
"time"

"github.com/ansel1/merry/v2"
)

const maxConcurrentTransfers = 5

var queueLock = sync.Mutex{}
var transferQueue = map[Priority][]chan bool{}

// init pre-registers an empty waiting list for every known priority so the
// dispatcher can iterate transferQueue in priority order without having to
// care about missing map keys.
func init() {
	for _, p := range Priorities.Members() {
		transferQueue[p] = []chan bool{}
	}
}

// waitForTransferSlot blocks until the dispatcher (checkFileTransferQueue)
// grants this caller a transfer slot, or until timeout elapses, in which
// case a wrapped errTimeout is returned.
//
// The channel is deliberately unbuffered: the dispatcher uses a
// non-blocking send, so the send can only succeed while this goroutine is
// actually parked in the select below.
func waitForTransferSlot(priority Priority, timeout time.Duration) error {
	ch := make(chan bool)

	queueLock.Lock()
	transferQueue[priority] = append(transferQueue[priority], ch)
	queueLock.Unlock()

	select {
	case <-ch:
		return nil
	case <-time.After(timeout):
		// Remove our channel from the queue so the dispatcher is not left
		// with a ghost entry that nobody is listening on anymore.
		queueLock.Lock()
		queue := transferQueue[priority]
		for i, c := range queue {
			if c == ch {
				transferQueue[priority] = append(queue[:i], queue[i+1:]...)
				break
			}
		}
		queueLock.Unlock()
		return merry.Wrap(errTimeout)
	}
}

// StartFileTransferQueue runs the transfer-slot dispatcher: every five
// seconds it performs one queue pass via checkFileTransferQueue. It never
// returns and is intended to be launched in its own goroutine.
// NOTE(review): there is no shutdown mechanism — the loop runs for the
// lifetime of the process.
func StartFileTransferQueue() {
	const pollInterval = 5 * time.Second
	for {
		checkFileTransferQueue()
		time.Sleep(pollInterval)
	}
}

// checkFileTransferQueue performs a single dispatch pass: it reads how many
// transfers rclone is currently running and, while below
// maxConcurrentTransfers, releases queued waiters in priority order
// (Priorities is ordered highest-first).
func checkFileTransferQueue() {
	stats, err := GetRcloneStatus()
	if err != nil {
		// Without the stats we cannot know the current load; skip this pass
		// instead of dereferencing a nil result, and retry on the next tick.
		return
	}

	count := len(stats.Transferring)
	if count >= maxConcurrentTransfers {
		return
	}

	queueLock.Lock()
	defer queueLock.Unlock()

	for _, priority := range Priorities.Members() {
		waiting := transferQueue[priority]
		var remaining []chan bool

		for i, ch := range waiting {
			if count >= maxConcurrentTransfers {
				// Out of slots: keep every waiter we have not signalled yet.
				remaining = append(remaining, waiting[i:]...)
				break
			}

			// Non-blocking send on an unbuffered channel: it succeeds only
			// while the waiter is parked in waitForTransferSlot's select.
			// If it fails, the waiter has (most likely) timed out, so the
			// entry is dropped rather than retried forever.
			select {
			case ch <- true:
				count++
			default:
			}
		}

		transferQueue[priority] = remaining
	}
}
60 changes: 60 additions & 0 deletions services/rclone/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package rclone

import (
"net/http"
)

// CoreStats models the response body of rclone's rc "core/stats" endpoint
// (fetched by GetRcloneStatus): global transfer totals plus the list of
// files currently being transferred.
type CoreStats struct {
Bytes int64 `json:"bytes"`
Checks int `json:"checks"`
DeletedDirs int `json:"deletedDirs"`
Deletes int `json:"deletes"`
ElapsedTime float64 `json:"elapsedTime"`
Errors int `json:"errors"`
Eta int `json:"eta"`
FatalError bool `json:"fatalError"`
LastError string `json:"lastError"`
Renames int `json:"renames"`
RetryError bool `json:"retryError"`
ServerSideCopies int `json:"serverSideCopies"`
ServerSideCopyBytes int `json:"serverSideCopyBytes"`
ServerSideMoveBytes int `json:"serverSideMoveBytes"`
ServerSideMoves int `json:"serverSideMoves"`
Speed float64 `json:"speed"`
TotalBytes int64 `json:"totalBytes"`
TotalChecks int `json:"totalChecks"`
TotalTransfers int `json:"totalTransfers"`
TransferTime float64 `json:"transferTime"`
// Transferring is normalised to a non-nil slice by GetRcloneStatus.
Transferring []Transferring `json:"transferring"`
Transfers int `json:"transfers"`
}

// Transferring describes one in-flight file transfer as reported in
// CoreStats.Transferring: the file's name, source/destination filesystems,
// progress and speed figures.
type Transferring struct {
Bytes int64 `json:"bytes"`
DstFs string `json:"dstFs"`
Eta int `json:"eta"`
Group string `json:"group"`
Name string `json:"name"`
Percentage int `json:"percentage"`
Size int64 `json:"size"`
Speed float64 `json:"speed"`
SpeedAvg float64 `json:"speedAvg"`
SrcFs string `json:"srcFs"`
}

// GetRcloneStatus returns rclone's aggregate transfer statistics from the
// rc "core/stats" endpoint. The Transferring field is normalised to an
// empty (non-nil) slice so callers can range over it without a nil check.
func GetRcloneStatus() (*CoreStats, error) {
	req, reqErr := http.NewRequest(http.MethodPost, baseUrl+"/core/stats", nil)
	if reqErr != nil {
		return nil, reqErr
	}

	stats, callErr := doRequest[CoreStats](req)
	if callErr != nil {
		return nil, callErr
	}

	if stats.Transferring == nil {
		stats.Transferring = make([]Transferring, 0)
	}

	return stats, nil
}
33 changes: 31 additions & 2 deletions services/rclone/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,30 @@ package rclone
import (
"bytes"
"encoding/json"
"github.com/ansel1/merry/v2"
"github.com/orsinium-labs/enum"
"net/http"
"time"
)

// baseUrl is the address of the rclone remote-control (rc) API that all
// requests in this package are sent to.
const baseUrl = "http://rclone.lan.bcc.media"

var (
// errTimeout is returned (wrapped) by waitForTransferSlot when no transfer
// slot became free within the caller's timeout.
errTimeout = merry.Sentinel("timeout waiting for transfer slot")
)

// Priority classifies a transfer for the queue; the dispatcher releases
// waiters of higher priorities first (see Priorities below).
type Priority enum.Member[string]

var (
PriorityLow = Priority{Value: "low"}
PriorityNormal = Priority{Value: "normal"}
PriorityHigh = Priority{Value: "high"}

// Priorities determines the order of priority
// The leftmost item is the highest priority
Priorities = enum.New(PriorityHigh, PriorityNormal, PriorityLow)
)

type copyRequest struct {
Async bool `json:"_async"`
Source string `json:"srcFs"`
Expand Down Expand Up @@ -40,7 +59,7 @@ type fileRequest struct {
DestinationPath string `json:"dstRemote"`
}

func MoveFile(sourceRemote, sourcePath, destinationRemote, destinationPath string) (*JobResponse, error) {
func MoveFile(sourceRemote, sourcePath, destinationRemote, destinationPath string, priority Priority) (*JobResponse, error) {
body, err := json.Marshal(fileRequest{
Async: true,
SourceRemote: sourceRemote,
Expand All @@ -52,6 +71,11 @@ func MoveFile(sourceRemote, sourcePath, destinationRemote, destinationPath strin
return nil, err
}

err = waitForTransferSlot(priority, time.Hour)
if err != nil {
return nil, err
}

req, err := http.NewRequest("POST", baseUrl+"/operations/movefile", bytes.NewReader(body))
if err != nil {
return nil, err
Expand All @@ -60,7 +84,7 @@ func MoveFile(sourceRemote, sourcePath, destinationRemote, destinationPath strin
return doRequest[JobResponse](req)
}

func CopyFile(sourceRemote, sourcePath, destinationRemote, destinationPath string) (*JobResponse, error) {
func CopyFile(sourceRemote, sourcePath, destinationRemote, destinationPath string, priority Priority) (*JobResponse, error) {
body, err := json.Marshal(fileRequest{
Async: true,
SourceRemote: sourceRemote,
Expand All @@ -72,6 +96,11 @@ func CopyFile(sourceRemote, sourcePath, destinationRemote, destinationPath strin
return nil, err
}

err = waitForTransferSlot(priority, time.Hour)
if err != nil {
return nil, err
}

req, err := http.NewRequest("POST", baseUrl+"/operations/copyfile", bytes.NewReader(body))
if err != nil {
return nil, err
Expand Down
18 changes: 11 additions & 7 deletions utils/workflows/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package wfutils
import (
"encoding/xml"
"fmt"
"github.com/bcc-code/bcc-media-flows/services/rclone"
"go.temporal.io/sdk/temporal"
"path/filepath"
"strings"
Expand All @@ -29,11 +30,11 @@ func StandardizeFileName(ctx workflow.Context, file paths.Path) (paths.Path, err
return result.Path, err
}

func MoveFile(ctx workflow.Context, source, destination paths.Path) error {
func MoveFile(ctx workflow.Context, source, destination paths.Path, priority rclone.Priority) error {
external := source.OnExternalDrive() || destination.OnExternalDrive()

if external {
return RcloneMoveFile(ctx, source, destination)
return RcloneMoveFile(ctx, source, destination, priority)
} else {
return Execute(ctx, activities.Util.MoveFile, activities.MoveFileInput{
Source: source,
Expand All @@ -49,9 +50,9 @@ func CopyFile(ctx workflow.Context, source, destination paths.Path) error {
}).Get(ctx, nil)
}

func MoveToFolder(ctx workflow.Context, file, folder paths.Path) (paths.Path, error) {
// MoveToFolder moves file into folder, keeping the file's base name, and
// returns the resulting path. The move is delegated to MoveFile, which
// chooses between rclone and a plain filesystem move depending on whether
// an external drive is involved; priority controls the transfer's place in
// the rclone queue.
func MoveToFolder(ctx workflow.Context, file, folder paths.Path, priority rclone.Priority) (paths.Path, error) {
	destination := folder.Append(file.Base())
	if err := MoveFile(ctx, file, destination, priority); err != nil {
		return destination, err
	}
	return destination, nil
}

Expand Down Expand Up @@ -201,10 +202,11 @@ func RcloneWaitForFileGone(ctx workflow.Context, file paths.Path, retries int) e
return nil
}

func RcloneCopyFile(ctx workflow.Context, source, destination paths.Path) error {
func RcloneCopyFile(ctx workflow.Context, source, destination paths.Path, priority rclone.Priority) error {
jobID, err := Execute(ctx, activities.Util.RcloneCopyFile, activities.RcloneFileInput{
Source: source,
Destination: destination,
Priority: priority,
}).Result(ctx)
if err != nil {
return err
Expand All @@ -219,10 +221,11 @@ func RcloneCopyFile(ctx workflow.Context, source, destination paths.Path) error
return nil
}

func RcloneMoveFile(ctx workflow.Context, source, destination paths.Path) error {
func RcloneMoveFile(ctx workflow.Context, source, destination paths.Path, priority rclone.Priority) error {
jobID, err := Execute(ctx, activities.Util.RcloneMoveFile, activities.RcloneFileInput{
Source: source,
Destination: destination,
Priority: priority,
}).Result(ctx)
if err != nil {
return err
Expand All @@ -237,10 +240,11 @@ func RcloneMoveFile(ctx workflow.Context, source, destination paths.Path) error
return nil
}

func RcloneCopyDir(ctx workflow.Context, source, destination string) error {
func RcloneCopyDir(ctx workflow.Context, source, destination string, priority rclone.Priority) error {
jobID, err := Execute(ctx, activities.Util.RcloneCopyDir, activities.RcloneCopyDirInput{
Source: source,
Destination: destination,
Priority: priority,
}).Result(ctx)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 241116a

Please sign in to comment.