Skip to content

Commit

Permalink
feat: Allow directory copying during a dedup process
Browse files Browse the repository at this point in the history
  • Loading branch information
mirkobrombin committed Jul 8, 2024
1 parent 66c0393 commit 1d9e1e9
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 16 deletions.
15 changes: 11 additions & 4 deletions cmd/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
func NewCpCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "cp [source] [dest] [storage]",
Short: "Copy a file and deduplicate it in storage",
Short: "Copy a file or directory and deduplicate it in storage",
Args: cobra.ExactArgs(3),
Run: cpCommand,
}

cmd.Flags().BoolP("with-metadata", "m", false, "Include file metadata in hash calculation")
cmd.Flags().BoolP("verbose", "v", false, "Verbose output")
cmd.Flags().BoolP("append", "a", false, "Append directory contents to destination (same as dedup -d)")

return cmd
}
Expand All @@ -28,6 +29,7 @@ func cpCommand(cmd *cobra.Command, args []string) {
source, dest, storagePath := args[0], args[1], args[2]
withMetadata, _ := cmd.Flags().GetBool("with-metadata")
verbose, _ := cmd.Flags().GetBool("verbose")
appendFlag, _ := cmd.Flags().GetBool("append")

// Create storage
storageOpts := storage.StorageOptions{
Expand All @@ -42,12 +44,17 @@ func cpCommand(cmd *cobra.Command, args []string) {
// Create hash generator
h := hash.NewSHA256Generator()

// Create processor
processor := processor.NewCpProcessor(source, dest, s, h)
// Create processor based on the append flag
var proc processor.Processor
if appendFlag {
proc = processor.NewDedupProcessor(source, dest, s, h, 10)
} else {
proc = processor.NewCpProcessor(source, dest, s, h)
}

// Run the processor
log.Printf("Copying %s to %s..", source, dest)
d := dabadee.NewDaBaDee(processor, verbose)
d := dabadee.NewDaBaDee(proc, verbose)
if err := d.Run(); err != nil {
log.Fatalf("Error during copy and link: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func NewDedupCommand() *cobra.Command {
cmd.Flags().BoolP("with-metadata", "m", false, "Include file metadata in hash calculation")
cmd.Flags().BoolP("verbose", "v", false, "Verbose output")
cmd.Flags().String("manifest-output", "", "Output manifest file to the given path")
cmd.Flags().String("dest", "", "Destination directory for copying deduplicated files")

return cmd
}
Expand All @@ -33,6 +34,7 @@ func dedupCommand(cmd *cobra.Command, args []string) {
withMetadata, _ := cmd.Flags().GetBool("with-metadata")
verbose, _ := cmd.Flags().GetBool("verbose")
outputManifest, _ := cmd.Flags().GetString("manifest-output")
destDir, _ := cmd.Flags().GetString("dest")
workers, err := strconv.Atoi(workersStr)
if err != nil {
log.Fatalf("Invalid number of workers: %v", err)
Expand All @@ -52,7 +54,7 @@ func dedupCommand(cmd *cobra.Command, args []string) {
h := hash.NewSHA256Generator()

// Create processor
processor := processor.NewDedupProcessor(source, s, h, workers)
processor := processor.NewDedupProcessor(source, destDir, s, h, workers)

// Run the processor
log.Printf("Deduplicating %s..", source)
Expand Down
45 changes: 34 additions & 11 deletions pkg/processor/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type DedupProcessor struct {
// Source is the path of the directory to deduplicate
Source string

// DestDir is the path of the directory to copy deduplicated files to
DestDir string

// Storage is the storage interface to use
Storage *storage.Storage

Expand All @@ -40,19 +43,20 @@ type DedupProcessor struct {
}

// NewDedupProcessor creates a new DedupProcessor
func NewDedupProcessor(source string, storage *storage.Storage, hashGen hash.Generator, workers int) *DedupProcessor {
func NewDedupProcessor(source, destDir string, storage *storage.Storage, hashGen hash.Generator, workers int) *DedupProcessor {
return &DedupProcessor{
Source: source,
DestDir: destDir,
Storage: storage,
HashGen: hashGen,
Workers: workers,
FileMap: make(map[string]string),
}
}

// startProcessing marks the given hash as processing and returns a channel to
// dedupStartProcessing marks the given hash as processing and returns a channel to
// wait on if the hash is already being processed
func startProcessing(hash string) (alreadyProcessing bool, waitChan chan struct{}) {
func dedupStartProcessing(hash string) (alreadyProcessing bool, waitChan chan struct{}) {
globalLock.Lock()
defer globalLock.Unlock()

Expand All @@ -72,9 +76,9 @@ func startProcessing(hash string) (alreadyProcessing bool, waitChan chan struct{
return false, nil
}

// finishProcessing marks the given hash as no longer processing and closes the
// dedupFinishProcessing marks the given hash as no longer processing and closes the
// channel to signal that the processing has finished
func finishProcessing(hash string) {
func dedupFinishProcessing(hash string) {
globalLock.Lock()
defer globalLock.Unlock()

Expand Down Expand Up @@ -142,7 +146,7 @@ func (p *DedupProcessor) processFile(path string) (err error) {
}

// Check if the file is already being processed
alreadyProcessing, waitChan := startProcessing(finalHash)
alreadyProcessing, waitChan := dedupStartProcessing(finalHash)
if alreadyProcessing {
<-waitChan // Wait for the processing to finish
}
Expand All @@ -151,22 +155,22 @@ func (p *DedupProcessor) processFile(path string) (err error) {
dedupPath := filepath.Join(p.Storage.Opts.Root, finalHash)
exists, err := p.Storage.FileExists(dedupPath)
if err != nil {
finishProcessing(finalHash)
dedupFinishProcessing(finalHash)
return fmt.Errorf("checking file existence in storage: %w", err)
}

if !exists {
// If the file does not exist in storage, move it there
err = p.Storage.MoveFileToStorage(path, finalHash)
if err != nil {
finishProcessing(finalHash)
dedupFinishProcessing(finalHash)
return fmt.Errorf("moving file to storage: %w", err)
}
} else {
// If the file already exists in storage, remove the source file
err = os.Remove(path)
if err != nil {
finishProcessing(finalHash)
dedupFinishProcessing(finalHash)
return fmt.Errorf("removing source file: %w", err)
}
}
Expand All @@ -176,14 +180,33 @@ func (p *DedupProcessor) processFile(path string) (err error) {
p.FileMap[path] = finalHash
p.mapMutex.Unlock()

// Create a link at the original location
if _, err := os.Lstat(path); os.IsNotExist(err) {
err = os.Link(dedupPath, path)
if err != nil {
finishProcessing(finalHash)
dedupFinishProcessing(finalHash)
return fmt.Errorf("creating link to deduplicated file: %w", err)
}
}

finishProcessing(finalHash)
// Create a link at the destination if DestDir is set
if p.DestDir != "" {
relativePath, err := filepath.Rel(p.Source, path)
if err != nil {
dedupFinishProcessing(finalHash)
return fmt.Errorf("getting relative path: %w", err)
}

destPath := filepath.Join(p.DestDir, relativePath)
if _, err := os.Lstat(destPath); os.IsNotExist(err) {
err = os.Link(dedupPath, destPath)
if err != nil {
dedupFinishProcessing(finalHash)
return fmt.Errorf("creating link to deduplicated file in destination: %w", err)
}
}
}

dedupFinishProcessing(finalHash)
return nil
}

0 comments on commit 1d9e1e9

Please sign in to comment.