Skip to content

Commit

Permalink
feat(kfluxdp-49): address pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
flacatus committed Nov 14, 2024
1 parent 64226a4 commit d6403df
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 37 deletions.
9 changes: 6 additions & 3 deletions cmd/oci/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ Examples:

// If neither 'repo' nor 'repos' is provided, show command-specific help
if opts.repo == "" && len(opts.repos) == 0 {
cmd.Help()
if err := cmd.Help(); err != nil {
return fmt.Errorf("failed to display help: %w", err)
}
return fmt.Errorf("either --repo or --repos must be specified")
}

Expand All @@ -94,7 +96,7 @@ Examples:
}

// Create the cache directory if it doesn't exist
if err := os.MkdirAll(opts.ociCache, os.ModePerm); err != nil {
if err := os.MkdirAll(opts.ociCache, 0o750); err != nil {
return fmt.Errorf("could not create cache directory: %v", err)
}

Expand All @@ -120,7 +122,7 @@ Examples:
}

// Call ProcessTag to get details of the tag (implement as needed)
if err := ociController.ProcessTag(repo, tag, time.Now().Format(time.RFC1123), time.Duration(0)); err != nil {
if err := ociController.ProcessTag(repo, tag, time.Now().Format(time.RFC1123)); err != nil {
return fmt.Errorf("failed to fetch tag: %v", err)
}
}
Expand Down Expand Up @@ -150,6 +152,7 @@ Examples:
if err != nil {
return fmt.Errorf("invalid time format for --since: %v", err)
}
fmt.Println(duration)
errors := ociController.ProcessRepositories([]string{repo}, duration)
allErrors = append(allErrors, errors...)
}
Expand Down
39 changes: 24 additions & 15 deletions pkg/oci/blob_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *Controller) extractTarGz(gzipStream io.Reader, dest string) error {
if err != nil {
return fmt.Errorf("failed to read tar header: %w", err)
}

// #nosec G305
destPath := filepath.Join(dest, header.Name)
if err := c.handleTarEntry(header, tarReader, destPath); err != nil {
return err
Expand All @@ -51,7 +51,7 @@ func (c *Controller) extractTarGz(gzipStream io.Reader, dest string) error {
func (c *Controller) handleTarEntry(header *tar.Header, tarReader *tar.Reader, destPath string) error {
switch header.Typeflag {
case tar.TypeDir:
return os.MkdirAll(destPath, 0755)
return os.MkdirAll(destPath, 0o750)
case tar.TypeReg:
if _, err := os.Stat(destPath); err == nil {
return nil
Expand All @@ -65,20 +65,21 @@ func (c *Controller) handleTarEntry(header *tar.Header, tarReader *tar.Reader, d
// Creates a file from the tar reader.
// It writes the contents of the tar entry to a newly created file.
func (c *Controller) createFileFromTar(tarReader *tar.Reader, destPath string) error {
outFile, err := os.Create(destPath)
cleanDestPath := filepath.Clean(destPath)

outFile, err := os.Create(cleanDestPath)
if err != nil {
return fmt.Errorf("failed to create file %s: %w", destPath, err)
return fmt.Errorf("failed to create file %s: %w", cleanDestPath, err)
}
defer outFile.Close()

// Copy contents from the tar reader to the file
if _, err := io.Copy(outFile, tarReader); err != nil {
return fmt.Errorf("failed to write file %s: %w", destPath, err)
return fmt.Errorf("failed to write file %s: %w", cleanDestPath, err)
}
return nil
}

// Handles the extraction of individual blobs.
// HandleBlob handles the extraction of individual blobs.
// It manages concurrency with WaitGroup and semaphore for blob processing.
func (c *Controller) HandleBlob(blobPath, outputDir string, wg *sync.WaitGroup, errors chan<- error, sem chan struct{}) {
defer wg.Done()
Expand All @@ -94,23 +95,29 @@ func (c *Controller) HandleBlob(blobPath, outputDir string, wg *sync.WaitGroup,
// Processes the blob file for extraction.
// It checks for file existence, size, and identifies if it's a tar.gz blob.
func (c *Controller) processBlob(blobPath, outputDir string) error {
fileInfo, err := os.Stat(blobPath)
// Normalize the path to prevent directory traversal
cleanBlobPath := filepath.Clean(blobPath)

// Check file existence and size
fileInfo, err := os.Stat(cleanBlobPath)
if err != nil {
return fmt.Errorf("failed to stat blob %s: %w", blobPath, err)
return fmt.Errorf("failed to stat blob %s: %w", cleanBlobPath, err)
}

if fileInfo.Size() == 0 {
return fmt.Errorf("blob %s is empty, skipping", blobPath)
return fmt.Errorf("blob %s is empty, skipping", cleanBlobPath)
}

file, err := os.Open(blobPath)
// Open the file safely
file, err := os.Open(cleanBlobPath)
if err != nil {
return fmt.Errorf("failed to open blob %s: %w", blobPath, err)
return fmt.Errorf("failed to open blob %s: %w", cleanBlobPath, err)
}
defer file.Close()

if isTarGzBlob(blobPath, file) {
return c.extractBlob(blobPath, file, outputDir)
// Check if the blob is a tar.gz and process it if so
if isTarGzBlob(cleanBlobPath, file) {
return c.extractBlob(cleanBlobPath, file, outputDir)
}

return nil
Expand All @@ -123,7 +130,9 @@ func isTarGzBlob(blobPath string, file *os.File) bool {
if _, err := file.Read(buf); err != nil {
return false
}
file.Seek(0, 0)
if _, err := file.Seek(0, 0); err != nil {
return false
}
return strings.HasSuffix(blobPath, ".tar.gz") || (len(buf) == 2 && buf[0] == 0x1F && buf[1] == 0x8B)
}

Expand Down
21 changes: 14 additions & 7 deletions pkg/oci/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ type Controller struct {
}

// NewController initializes a new Controller instance with the specified output and OCI store path.
func NewController(outputDir string, OCIStorePath string) (*Controller, error) {
store, err := oci.New(OCIStorePath)
func NewController(outputDir string, ociStorePath string) (*Controller, error) {
store, err := oci.New(ociStorePath)
if err != nil {
return nil, fmt.Errorf("failed to initialize OCI store at path %s: %w", OCIStorePath, err)
return nil, fmt.Errorf("failed to initialize OCI store at path %s: %w", ociStorePath, err)
}

return &Controller{
OutputDir: outputDir,
BlobDir: OCIStorePath + "/blobs/sha256/",
OCIStorePath: OCIStorePath,
BlobDir: ociStorePath + "/blobs/sha256/",
OCIStorePath: ociStorePath,
Store: store,
}, nil
}
Expand Down Expand Up @@ -117,8 +117,15 @@ func (c *Controller) processRepository(repo string, since time.Duration) error {

// Process each tag within the repository.
for _, tagInfo := range tags {
if err := c.ProcessTag(repo, tagInfo.Name, tagInfo.LastModified, since); err != nil {
return fmt.Errorf("failed to process tag %s in repository %s: %w", tagInfo.Name, repo, err)
parsedDate, err := time.Parse(time.RFC1123, tagInfo.LastModified)
if err != nil {
return fmt.Errorf("failed to parse creation date %s: %w", tagInfo.LastModified, err)
}

if time.Since(parsedDate) < since {
if err := c.ProcessTag(repo, tagInfo.Name, tagInfo.LastModified); err != nil {
return fmt.Errorf("failed to process tag %s in repository %s: %w", tagInfo.Name, repo, err)
}
}
}

Expand Down
17 changes: 14 additions & 3 deletions pkg/oci/repository_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/url"
)

// Constants for OCI API configuration
Expand Down Expand Up @@ -61,10 +62,20 @@ func (c *Controller) buildTagsURL(repo string, page int) string {

// sendTagsRequest sends a GET request to the provided URL and decodes the response into a TagResponse struct.
// It returns an error if the request fails or if the response cannot be decoded.
func (c *Controller) sendTagsRequest(url string) (*TagResponse, error) {
resp, err := http.Get(url)
func (c *Controller) sendTagsRequest(urlStr string) (*TagResponse, error) {
parsedURL, err := url.Parse(urlStr) // Use url.Parse instead of urlStr.Parse
if err != nil {
return nil, fmt.Errorf("failed to fetch tags from URL %s: %w", url, err)
return nil, fmt.Errorf("invalid URL %s: %w", urlStr, err)
}

if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
return nil, fmt.Errorf("unsupported URL scheme %s in URL %s", parsedURL.Scheme, urlStr)
}

// Perform the HTTP GET request
resp, err := http.Get(parsedURL.String())
if err != nil {
return nil, fmt.Errorf("failed to fetch tags from URL %s: %w", urlStr, err)
}
defer resp.Body.Close()

Expand Down
15 changes: 6 additions & 9 deletions pkg/oci/tag_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@ const (
blobTimeout = 2 * time.Minute
)

// Processes individual tags from a given repository
func (c *Controller) ProcessTag(repo, tag, creationDate string, since time.Duration) error {

if err := c.validateCreationDate(creationDate, since); err != nil {
return err
}

// ProcessTag processes individual tags from a given repository
func (c *Controller) ProcessTag(repo, tag, creationDate string) error {
ctx, cancel := context.WithTimeout(context.Background(), blobTimeout)
defer cancel()

Expand All @@ -42,7 +37,7 @@ func (c *Controller) ProcessTag(repo, tag, creationDate string, since time.Durat
}

outputDir := c.createOutputDirectory(repo, creationDate, tag)
if err := os.MkdirAll(outputDir, 0755); err != nil {
if err := os.MkdirAll(outputDir, 0o750); err != nil {
return fmt.Errorf("failed to create output directory %s: %w", outputDir, err)
}

Expand All @@ -55,8 +50,10 @@ func (c *Controller) validateCreationDate(creationDate string, since time.Durati
if err != nil {
return fmt.Errorf("failed to parse creation date %s: %w", creationDate, err)
}

fmt.Println(time.Since(parsedDate))
fmt.Println(since)
if time.Since(parsedDate) > since {
fmt.Println("aaa")
return nil
}

Expand Down

0 comments on commit d6403df

Please sign in to comment.