Skip to content

Commit

Permalink
upload: consolidate upload packages
Browse files Browse the repository at this point in the history
move the s3 and docker packages into the upload package, rename the
uploader to scheduler
  • Loading branch information
fho committed Nov 12, 2018
1 parent a9302c1 commit 8cc2573
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 68 deletions.
4 changes: 2 additions & 2 deletions buildoutput.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package baur

import (
"github.com/simplesurance/baur/digest"
"github.com/simplesurance/baur/upload"
"github.com/simplesurance/baur/upload/scheduler"
)

// BuildOutput is an interface for build artifacts
type BuildOutput interface {
Exists() bool
UploadJob() (upload.Job, error)
UploadJob() (scheduler.Job, error)
Name() string
String() string
LocalPath() string
Expand Down
2 changes: 1 addition & 1 deletion buildoutputbackends.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package baur

import "github.com/simplesurance/baur/docker"
import "github.com/simplesurance/baur/upload/docker"

// BuildOutputBackends contains a list of backends that are required to interact
// with artifacts
Expand Down
22 changes: 11 additions & 11 deletions command/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
"github.com/simplesurance/baur/build/seq"
"github.com/simplesurance/baur/digest"
"github.com/simplesurance/baur/digest/sha384"
"github.com/simplesurance/baur/docker"
"github.com/simplesurance/baur/log"
"github.com/simplesurance/baur/prettyprint"
"github.com/simplesurance/baur/s3"
"github.com/simplesurance/baur/storage"
"github.com/simplesurance/baur/term"
"github.com/simplesurance/baur/upload"
sequploader "github.com/simplesurance/baur/upload/seq"
"github.com/simplesurance/baur/upload/docker"
"github.com/simplesurance/baur/upload/s3"
"github.com/simplesurance/baur/upload/scheduler"
sequploader "github.com/simplesurance/baur/upload/scheduler/seq"
)

const (
Expand Down Expand Up @@ -131,7 +131,7 @@ func resultAddBuildResult(bud *buildUserData, r *build.Result) {

}

func resultAddUploadResult(appName string, ar baur.BuildOutput, r *upload.Result) {
func resultAddUploadResult(appName string, ar baur.BuildOutput, r *scheduler.Result) {
var arType storage.ArtifactType

resultLock.Lock()
Expand All @@ -142,9 +142,9 @@ func resultAddUploadResult(appName string, ar baur.BuildOutput, r *upload.Result
log.Fatalf("resultAddUploadResult: %q does not exist in build result map", appName)
}

if r.Job.Type() == upload.JobDocker {
if r.Job.Type() == scheduler.JobDocker {
arType = storage.DockerArtifact
} else if r.Job.Type() == upload.JobS3 {
} else if r.Job.Type() == scheduler.JobS3 {
arType = storage.FileArtifact
}

Expand Down Expand Up @@ -264,7 +264,7 @@ func createBuildJobs(apps []*baur.App) []*build.Job {
return buildJobs
}

func startBGUploader(outputCnt int, uploadChan chan *upload.Result) upload.Manager {
func startBGUploader(outputCnt int, uploadChan chan *scheduler.Result) scheduler.Manager {
var dockerUploader *docker.Client
s3Uploader, err := s3.NewClient(log.StdLogger)
if err != nil {
Expand Down Expand Up @@ -311,7 +311,7 @@ func appsToString(apps []*baur.App) string {
return res
}

func waitPrintUploadStatus(uploader upload.Manager, uploadChan chan *upload.Result, finished chan struct{}, outputCnt int) {
func waitPrintUploadStatus(uploader scheduler.Manager, uploadChan chan *scheduler.Result, finished chan struct{}, outputCnt int) {
var resultCnt int

for res := range uploadChan {
Expand Down Expand Up @@ -386,7 +386,7 @@ func outstandingBuilds(storage storage.Storer, apps []*baur.App) []*baur.App {
func buildRun(cmd *cobra.Command, args []string) {
var apps []*baur.App
var uploadWatchFin chan struct{}
var uploader upload.Manager
var uploader scheduler.Manager

repo := MustFindRepository()
startTs := time.Now()
Expand Down Expand Up @@ -420,7 +420,7 @@ func buildRun(cmd *cobra.Command, args []string) {
outputCnt := outputCount(apps)

if buildUpload {
uploadChan := make(chan *upload.Result, outputCnt)
uploadChan := make(chan *scheduler.Result, outputCnt)
uploader = startBGUploader(outputCnt, uploadChan)
uploadWatchFin = make(chan struct{}, 1)
go waitPrintUploadStatus(uploader, uploadChan, uploadWatchFin, outputCnt)
Expand Down
9 changes: 4 additions & 5 deletions dockerartifact.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package baur

import (
"context"
"fmt"

"github.com/pkg/errors"

"github.com/simplesurance/baur/digest"
"github.com/simplesurance/baur/fs"
"github.com/simplesurance/baur/upload"
"github.com/simplesurance/baur/upload/scheduler"
)

// DockerArtifact is a docker container artifact
Expand Down Expand Up @@ -38,13 +37,13 @@ func (d *DockerArtifact) ImageID() (string, error) {
}

// UploadJob returns a upload.DockerJob for the artifact
func (d *DockerArtifact) UploadJob() (upload.Job, error) {
func (d *DockerArtifact) UploadJob() (scheduler.Job, error) {
id, err := d.ImageID()
if err != nil {
return nil, err
}

return &upload.DockerJob{
return &scheduler.DockerJob{
ImageID: id,
Repository: d.Repository,
Tag: d.Tag,
Expand Down Expand Up @@ -93,7 +92,7 @@ func (d *DockerArtifact) Size(b *BuildOutputBackends) (int64, error) {
return -1, errors.Wrap(err, "reading imageID from file failed")
}

return b.DockerClt.Size(context.Background(), id)
return b.DockerClt.Size(id)
}

// Type returns "docker"
Expand Down
7 changes: 3 additions & 4 deletions docker/docker.go → upload/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package docker
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net/url"
Expand All @@ -29,7 +28,7 @@ var defLogFn = func(string, ...interface{}) { return }

// NewClientwAuth intializes a new docker client.
// The username and password is used to authenticate at the registry for an
// Upload() (= docker push) operation.
// Upload() (= docker push) operation.
// The following environment variables are respected:
// Use DOCKER_HOST to set the url to the docker server.
// Use DOCKER_API_VERSION to set the version of the API to reach, leave empty for latest.
Expand Down Expand Up @@ -174,7 +173,7 @@ func parseRepositoryURI(dest string) (server, repository, tag string, err error)

// Upload tags and uploads an image into a docker registry repository
// destURI format: [<server[:port]>/]<owner>/<repository>:<tag>
func (c *Client) Upload(ctx context.Context, image, destURI string) (string, error) {
func (c *Client) Upload(image, destURI string) (string, error) {
server, repository, tag, err := parseRepositoryURI(destURI)
if err != nil {
return "", err
Expand Down Expand Up @@ -217,7 +216,7 @@ func (c *Client) Upload(ctx context.Context, image, destURI string) (string, err
}

// Size returns the size of an image in Bytes
func (c *Client) Size(ctx context.Context, imageID string) (int64, error) {
func (c *Client) Size(imageID string) (int64, error) {
summaries, err := c.clt.ListImages(docker.ListImagesOptions{})
if err != nil {
return -1, errors.Wrap(err, "fetching imagelist failed")
Expand Down
File renamed without changes.
1 change: 0 additions & 1 deletion s3/s3.go → upload/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type Client struct {

// Logger defines the interface for an S3 logger
type Logger interface {
Debugf(format string, v ...interface{})
Debugln(v ...interface{})
DebugEnabled() bool
}
Expand Down
2 changes: 1 addition & 1 deletion upload/dockerjob.go → upload/scheduler/dockerjob.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package upload
package scheduler

import "fmt"

Expand Down
40 changes: 40 additions & 0 deletions upload/scheduler/filejob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package scheduler

import "fmt"

// FileCopyJob is an upload job for copying a local file to a remote
// destination (e.g. an S3 repository).
type FileCopyJob struct {
	UserData interface{} // opaque caller data carried through the scheduler
	Src      string      // local source path of the file
	Dst      string      // remote destination
}

// LocalPath returns the local path of the file that is uploaded.
func (j *FileCopyJob) LocalPath() string {
	return j.Src
}

// RemoteDest returns the destination path.
func (j *FileCopyJob) RemoteDest() string {
	return j.Dst
}

// Type returns JobFileCopy.
func (j *FileCopyJob) Type() JobType {
	return JobFileCopy
}

// GetUserData returns the UserData.
func (j *FileCopyJob) GetUserData() interface{} {
	return j.UserData
}

// SetUserData sets the UserData.
func (j *FileCopyJob) SetUserData(u interface{}) {
	j.UserData = u
}

// String returns the string representation "<Src> -> <Dst>".
func (j *FileCopyJob) String() string {
	return fmt.Sprintf("%s -> %s", j.Src, j.Dst)
}
2 changes: 1 addition & 1 deletion upload/job.go → upload/scheduler/job.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package upload
package scheduler

// JobType describes the type of a job
type JobType int
Expand Down
2 changes: 1 addition & 1 deletion upload/s3job.go → upload/scheduler/s3job.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package upload
package scheduler

import "fmt"

Expand Down
18 changes: 18 additions & 0 deletions upload/scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package scheduler

import "time"

// Manager is an interface for upload managers.
// Implementations queue Jobs via Add and process them between Start and
// Stop (see the seq implementation, where Add may be called after Start).
type Manager interface {
	Add(Job) // enqueue a job for upload
	Start()  // begin processing queued jobs
	Stop()   // terminate processing
}

// Result is the result of an upload attempt.
type Result struct {
	Err      error         // non-nil if the upload failed
	URL      string        // URL of the uploaded object, set on success
	Duration time.Duration // how long the upload took
	Job      Job           // the job this result belongs to
}
26 changes: 13 additions & 13 deletions upload/seq/manager.go → upload/scheduler/seq/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
package seq

import (
"context"
"fmt"
"sync"
"time"

"github.com/pkg/errors"

"github.com/simplesurance/baur/upload"
"github.com/simplesurance/baur/upload/scheduler"
)

// Logger defines the logger interface
Expand All @@ -20,30 +20,30 @@ type Logger interface {

// Uploader is a sequential uploader
type Uploader struct {
s3 upload.S3Uploader
docker upload.DockerUploader
s3 upload.Uploader
docker upload.Uploader
lock sync.Mutex
queue []upload.Job
queue []scheduler.Job
stopProcessing bool
statusChan chan<- *upload.Result
statusChan chan<- *scheduler.Result
logger Logger
}

// New initializes a sequential uploader
// Status chan must have a buffer count > 1 otherwise a deadlock occurs
func New(logger Logger, s3Uploader upload.S3Uploader, dockerUploader upload.DockerUploader, status chan<- *upload.Result) *Uploader {
func New(logger Logger, s3Uploader, dockerUploader upload.Uploader, status chan<- *scheduler.Result) *Uploader {
return &Uploader{
logger: logger,
s3: s3Uploader,
statusChan: status,
lock: sync.Mutex{},
queue: []upload.Job{},
queue: []scheduler.Job{},
docker: dockerUploader,
}
}

// Add adds a new upload job, can be called after Start()
func (u *Uploader) Add(job upload.Job) {
func (u *Uploader) Add(job scheduler.Job) {
u.lock.Lock()
defer u.lock.Unlock()

Expand All @@ -54,7 +54,7 @@ func (u *Uploader) Add(job upload.Job) {
// If the statusChan buffer is full, uploading will be blocked.
func (u *Uploader) Start() {
for {
var job upload.Job
var job scheduler.Job

u.lock.Lock()
if len(u.queue) > 0 {
Expand All @@ -69,21 +69,21 @@ func (u *Uploader) Start() {
startTs := time.Now()

u.logger.Debugf("uploading %s", job)
if job.Type() == upload.JobS3 {
if job.Type() == scheduler.JobS3 {
url, err = u.s3.Upload(job.LocalPath(), job.RemoteDest())
if err != nil {
err = errors.Wrap(err, "S3 upload failed")
}
} else if job.Type() == upload.JobDocker {
url, err = u.docker.Upload(context.Background(), job.LocalPath(), job.RemoteDest())
} else if job.Type() == scheduler.JobDocker {
url, err = u.docker.Upload(job.LocalPath(), job.RemoteDest())
if err != nil {
err = errors.Wrap(err, "Docker upload failed")
}
} else {
panic(fmt.Sprintf("invalid job %+v", job))
}

u.statusChan <- &upload.Result{
u.statusChan <- &scheduler.Result{
Err: err,
URL: url,
Duration: time.Since(startTs),
Expand Down
31 changes: 3 additions & 28 deletions upload/uploader.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,6 @@
package upload

import (
"context"
"time"
)

// S3Uploader is an interface for S3 uploader
type S3Uploader interface {
Upload(file, dest string) (string, error)
}

// DockerUploader is an interface for docker uploader
type DockerUploader interface {
Upload(ctx context.Context, image, dest string) (string, error)
}

// Manager is an interface for upload managers
type Manager interface {
Add(Job)
Start()
Stop()
}

// Result result of an upload attempt
type Result struct {
Err error
URL string
Duration time.Duration
Job Job
// Uploader is an interface for storing files in another place.
// Upload transfers from (a local path, or an image ID for the docker
// implementation) to the destination to, returning the URL of the
// uploaded object.
type Uploader interface {
	Upload(from, to string) (string, error)
}

0 comments on commit 8cc2573

Please sign in to comment.