Skip to content

Commit

Permalink
mv env var and flag handling into conf package
Browse files Browse the repository at this point in the history
  • Loading branch information
jhoblitt committed Nov 22, 2024
1 parent 06e0b70 commit 3ad74f9
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 139 deletions.
134 changes: 134 additions & 0 deletions conf/conf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package conf

import (
"flag"
"log"
"os"
"strconv"
"time"

k8sresource "k8s.io/apimachinery/pkg/api/resource"
)

type S3ndConf struct {
Host *string
Port *int
EndpointUrl *string
UploadMaxParallel *int64
UploadTimeout *time.Duration
QueueTimeout *time.Duration
UploadTries *int
UploadPartsize *k8sresource.Quantity
UploadBwlimit *k8sresource.Quantity
UploadWriteBufferSize *k8sresource.Quantity
}

// Parse the environment variables and flags. If a flag is not set, the
// environment variable is used. Errors are fatal.
func NewConf() S3ndConf {
conf := S3ndConf{}

// start flags
conf.Host = flag.String("host", os.Getenv("S3ND_HOST"), "S3 Daemon Host (S3ND_HOST)")

defaultPort, _ := strconv.Atoi(os.Getenv("S3ND_PORT"))
if defaultPort == 0 {
defaultPort = 15555
}
conf.Port = flag.Int("port", defaultPort, "S3 Daemon Port (S3ND_PORT)")

conf.EndpointUrl = flag.String("endpoint-url", os.Getenv("S3ND_ENDPOINT_URL"), "S3 Endpoint URL (S3ND_ENDPOINT_URL)")

var defaultUploadMaxParallel int64
defaultUploadMaxParallel, _ = strconv.ParseInt(os.Getenv("S3ND_UPLOAD_MAX_PARALLEL"), 10, 64)
if defaultUploadMaxParallel == 0 {
defaultUploadMaxParallel = 100
}
conf.UploadMaxParallel = flag.Int64("upload-max-parallel", defaultUploadMaxParallel, "Maximum number of parallel object uploads (S3ND_UPLOAD_MAX_PARALLEL)")

defaultUploadTimeout := os.Getenv("S3ND_UPLOAD_TIMEOUT")
if defaultUploadTimeout == "" {
defaultUploadTimeout = "10s"
}
uploadTimeout := flag.String("upload-timeout", defaultUploadTimeout, "Upload Timeout (S3ND_UPLOAD_TIMEOUT)")

defaultQueueTimeout := os.Getenv("S3ND_QUEUE_TIMEOUT")
if defaultQueueTimeout == "" {
defaultQueueTimeout = "10s"
}
queueTimeout := flag.String("queue-timeout", defaultQueueTimeout, "Queue Timeout waiting for transfer to start (S3ND_QUEUE_TIMEOUT)")

defaultUploadTries, _ := strconv.Atoi(os.Getenv("S3ND_UPLOAD_TRIES"))
if defaultUploadTries == 0 {
defaultUploadTries = 1
}
conf.UploadTries = flag.Int("upload-tries", defaultUploadTries, "Max number of upload tries (S3ND_UPLOAD_TRIES)")

defaultUploadPartsize := os.Getenv("S3ND_UPLOAD_PARTSIZE")
if defaultUploadPartsize == "" {
defaultUploadPartsize = "5Mi"
}
uploadPartsizeRaw := flag.String("upload-partsize", defaultUploadPartsize, "Upload Part Size (S3ND_UPLOAD_PARTSIZE)")

defaultUploadBwlimit := os.Getenv("S3ND_UPLOAD_BWLIMIT")
if defaultUploadBwlimit == "" {
defaultUploadBwlimit = "0"
}
uploadBwlimitRaw := flag.String("upload-bwlimit", defaultUploadBwlimit, "Upload bandwidth limit in bits per second (S3ND_UPLOAD_BWLIMIT)")

defaultUploadWriteBufferSize := os.Getenv("S3ND_UPLOAD_WRITE_BUFFER_SIZE")
if defaultUploadWriteBufferSize == "" {
defaultUploadWriteBufferSize = "64Ki"
}
uploadWriteBufferSizeRaw := flag.String("upload-write-buffer-size", defaultUploadWriteBufferSize, "Upload Write Buffer Size (S3ND_UPLOAD_WRITE_BUFFER_SIZE)")

flag.Parse()
// end flags

if *conf.EndpointUrl == "" {
log.Fatal("S3ND_ENDPOINT_URL is required")
}

uploadTimeoutDuration, err := time.ParseDuration(*uploadTimeout)
if err != nil {
log.Fatal("S3ND_UPLOAD_TIMEOUT is invalid")
}
conf.UploadTimeout = &uploadTimeoutDuration

queueTimeoutDuration, err := time.ParseDuration(*queueTimeout)
if err != nil {
log.Fatal("S3ND_QUEUE_TIMEOUT is invalid")
}
conf.QueueTimeout = &queueTimeoutDuration

uploadPartsize, err := k8sresource.ParseQuantity(*uploadPartsizeRaw)
if err != nil {
log.Fatal("S3ND_UPLOAD_PARTSIZE is invalid")
}
conf.UploadPartsize = &uploadPartsize

uploadBwlimit, err := k8sresource.ParseQuantity(*uploadBwlimitRaw)
if err != nil {
log.Fatal("S3ND_UPLOAD_BWLIMIT is invalid")
}
conf.UploadBwlimit = &uploadBwlimit

uploadWriteBufferSize, err := k8sresource.ParseQuantity(*uploadWriteBufferSizeRaw)
if err != nil {
log.Fatal("S3ND_UPLOAD_WRITE_BUFFER_SIZE is invalid")
}
conf.UploadWriteBufferSize = &uploadWriteBufferSize

log.Println("S3ND_HOST:", *conf.Host)
log.Println("S3ND_PORT:", *conf.Port)
log.Println("S3ND_ENDPOINT_URL:", *conf.EndpointUrl)
log.Println("S3ND_UPLOAD_MAX_PARALLEL:", *conf.UploadMaxParallel)
log.Println("S3ND_UPLOAD_TIMEOUT:", *conf.UploadTimeout)
log.Println("S3ND_QUEUE_TIMEOUT:", *conf.QueueTimeout)
log.Println("S3ND_UPLOAD_TRIES:", *conf.UploadTries)
log.Println("S3ND_UPLOAD_PARTSIZE:", conf.UploadPartsize.String())
log.Println("S3ND_UPLOAD_BWLIMIT:", conf.UploadBwlimit.String())
log.Println("S3ND_UPLOAD_WRITE_BUFFER_SIZE:", conf.UploadWriteBufferSize.String())

return conf
}
156 changes: 17 additions & 139 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"html"
"log"
Expand All @@ -12,10 +11,11 @@ import (
"net/url"
"os"
"path/filepath"
"strconv"
"syscall"
"time"

"github.com/lsst-dm/s3nd/conf"

"github.com/aws/aws-sdk-go-v2/aws"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/config"
Expand All @@ -24,24 +24,10 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/hyperledger/fabric/common/semaphore"
"golang.org/x/sys/unix"
k8sresource "k8s.io/apimachinery/pkg/api/resource"
)

type S3ndConf struct {
host *string
port *int
endpointUrl *string
uploadMaxParallel *int64
uploadTimeout *time.Duration
queueTimeout *time.Duration
uploadTries *int
uploadPartsize *k8sresource.Quantity
uploadBwlimit *k8sresource.Quantity
uploadWriteBufferSize *k8sresource.Quantity
}

type S3ndHandler struct {
Conf *S3ndConf
Conf *conf.S3ndConf
AwsConfig *aws.Config
S3Client *s3.Client
Uploader *manager.Uploader
Expand All @@ -64,10 +50,10 @@ func (h *S3ndHandler) UploadFileMultipart(ctx context.Context, task *S3ndUploadT
}
defer file.Close()

maxAttempts := *h.Conf.uploadTries
maxAttempts := *h.Conf.UploadTries
var attempt int
for attempt = 1; attempt <= maxAttempts; attempt++ {
uploadCtx, cancel := context.WithTimeout(ctx, *h.Conf.uploadTimeout)
uploadCtx, cancel := context.WithTimeout(ctx, *h.Conf.UploadTimeout)
defer cancel()
_, err = h.Uploader.Upload(uploadCtx, &s3.PutObjectInput{
Bucket: aws.String(*task.bucket),
Expand Down Expand Up @@ -150,7 +136,7 @@ func (h *S3ndHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Printf("queuing %v:%v | source %v\n", *task.bucket, *task.key, *task.file)

// limit the number of parallel uploads
semaCtx, cancel := context.WithTimeout(r.Context(), *h.Conf.queueTimeout)
semaCtx, cancel := context.WithTimeout(r.Context(), *h.Conf.QueueTimeout)
defer cancel()
if err := h.ParallelUploads.Acquire(semaCtx); err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
Expand All @@ -169,130 +155,22 @@ func (h *S3ndHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Successful put %q\n", html.EscapeString(task.uri.String()))
}

func getConf() S3ndConf {
conf := S3ndConf{}

// start flags
conf.host = flag.String("host", os.Getenv("S3ND_HOST"), "S3 Daemon Host (S3ND_HOST)")

defaultPort, _ := strconv.Atoi(os.Getenv("S3ND_PORT"))
if defaultPort == 0 {
defaultPort = 15555
}
conf.port = flag.Int("port", defaultPort, "S3 Daemon Port (S3ND_PORT)")

conf.endpointUrl = flag.String("endpoint-url", os.Getenv("S3ND_ENDPOINT_URL"), "S3 Endpoint URL (S3ND_ENDPOINT_URL)")

var defaultUploadMaxParallel int64
defaultUploadMaxParallel, _ = strconv.ParseInt(os.Getenv("S3ND_UPLOAD_MAX_PARALLEL"), 10, 64)
if defaultUploadMaxParallel == 0 {
defaultUploadMaxParallel = 100
}
conf.uploadMaxParallel = flag.Int64("upload-max-parallel", defaultUploadMaxParallel, "Maximum number of parallel object uploads (S3ND_UPLOAD_MAX_PARALLEL)")

defaultUploadTimeout := os.Getenv("S3ND_UPLOAD_TIMEOUT")
if defaultUploadTimeout == "" {
defaultUploadTimeout = "10s"
}
uploadTimeout := flag.String("upload-timeout", defaultUploadTimeout, "Upload Timeout (S3ND_UPLOAD_TIMEOUT)")

defaultQueueTimeout := os.Getenv("S3ND_QUEUE_TIMEOUT")
if defaultQueueTimeout == "" {
defaultQueueTimeout = "10s"
}
queueTimeout := flag.String("queue-timeout", defaultQueueTimeout, "Queue Timeout waiting for transfer to start (S3ND_QUEUE_TIMEOUT)")

defaultUploadTries, _ := strconv.Atoi(os.Getenv("S3ND_UPLOAD_TRIES"))
if defaultUploadTries == 0 {
defaultUploadTries = 1
}
conf.uploadTries = flag.Int("upload-tries", defaultUploadTries, "Max number of upload tries (S3ND_UPLOAD_TRIES)")

defaultUploadPartsize := os.Getenv("S3ND_UPLOAD_PARTSIZE")
if defaultUploadPartsize == "" {
defaultUploadPartsize = "5Mi"
}
uploadPartsizeRaw := flag.String("upload-partsize", defaultUploadPartsize, "Upload Part Size (S3ND_UPLOAD_PARTSIZE)")

defaultUploadBwlimit := os.Getenv("S3ND_UPLOAD_BWLIMIT")
if defaultUploadBwlimit == "" {
defaultUploadBwlimit = "0"
}
uploadBwlimitRaw := flag.String("upload-bwlimit", defaultUploadBwlimit, "Upload bandwidth limit in bits per second (S3ND_UPLOAD_BWLIMIT)")

defaultUploadWriteBufferSize := os.Getenv("S3ND_UPLOAD_WRITE_BUFFER_SIZE")
if defaultUploadWriteBufferSize == "" {
defaultUploadWriteBufferSize = "64Ki"
}
uploadWriteBufferSizeRaw := flag.String("upload-write-buffer-size", defaultUploadWriteBufferSize, "Upload Write Buffer Size (S3ND_UPLOAD_WRITE_BUFFER_SIZE)")

flag.Parse()
// end flags

if *conf.endpointUrl == "" {
log.Fatal("S3ND_ENDPOINT_URL is required")
}

uploadTimeoutDuration, err := time.ParseDuration(*uploadTimeout)
if err != nil {
log.Fatal("S3ND_UPLOAD_TIMEOUT is invalid")
}
conf.uploadTimeout = &uploadTimeoutDuration

queueTimeoutDuration, err := time.ParseDuration(*queueTimeout)
if err != nil {
log.Fatal("S3ND_QUEUE_TIMEOUT is invalid")
}
conf.queueTimeout = &queueTimeoutDuration

uploadPartsize, err := k8sresource.ParseQuantity(*uploadPartsizeRaw)
if err != nil {
log.Fatal("S3ND_UPLOAD_PARTSIZE is invalid")
}
conf.uploadPartsize = &uploadPartsize

uploadBwlimit, err := k8sresource.ParseQuantity(*uploadBwlimitRaw)
if err != nil {
log.Fatal("S3ND_UPLOAD_BWLIMIT is invalid")
}
conf.uploadBwlimit = &uploadBwlimit

uploadWriteBufferSize, err := k8sresource.ParseQuantity(*uploadWriteBufferSizeRaw)
if err != nil {
log.Fatal("S3ND_UPLOAD_WRITE_BUFFER_SIZE is invalid")
}
conf.uploadWriteBufferSize = &uploadWriteBufferSize

log.Println("S3ND_HOST:", *conf.host)
log.Println("S3ND_PORT:", *conf.port)
log.Println("S3ND_ENDPOINT_URL:", *conf.endpointUrl)
log.Println("S3ND_UPLOAD_MAX_PARALLEL:", *conf.uploadMaxParallel)
log.Println("S3ND_UPLOAD_TIMEOUT:", *conf.uploadTimeout)
log.Println("S3ND_QUEUE_TIMEOUT:", *conf.queueTimeout)
log.Println("S3ND_UPLOAD_TRIES:", *conf.uploadTries)
log.Println("S3ND_UPLOAD_PARTSIZE:", conf.uploadPartsize.String())
log.Println("S3ND_UPLOAD_BWLIMIT:", conf.uploadBwlimit.String())
log.Println("S3ND_UPLOAD_WRITE_BUFFER_SIZE:", conf.uploadWriteBufferSize.String())

return conf
}

func NewHandler(conf *S3ndConf) *S3ndHandler {
func NewHandler(conf *conf.S3ndConf) *S3ndHandler {
handler := &S3ndHandler{
Conf: conf,
}

maxConns := int(*conf.uploadMaxParallel * 5) // allow for multipart upload creation
maxConns := int(*conf.UploadMaxParallel * 5) // allow for multipart upload creation

var httpClient *awshttp.BuildableClient

if conf.uploadBwlimit.Value() != 0 {
if conf.UploadBwlimit.Value() != 0 {
dialer := &net.Dialer{
Control: func(network, address string, conn syscall.RawConn) error {
// https://pkg.go.dev/syscall#RawConn
var operr error
if err := conn.Control(func(fd uintptr) {
operr = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_MAX_PACING_RATE, int(conf.uploadBwlimit.Value()/8))
operr = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_MAX_PACING_RATE, int(conf.UploadBwlimit.Value()/8))
}); err != nil {
return err
}
Expand All @@ -306,7 +184,7 @@ func NewHandler(conf *S3ndConf) *S3ndHandler {
t.MaxIdleConns = maxConns
t.MaxConnsPerHost = maxConns
t.MaxIdleConnsPerHost = maxConns
t.WriteBufferSize = int(conf.uploadWriteBufferSize.Value())
t.WriteBufferSize = int(conf.UploadWriteBufferSize.Value())
// disable http/2 to prevent muxing over a single tcp connection
t.ForceAttemptHTTP2 = false
t.TLSClientConfig.NextProtos = []string{"http/1.1"}
Expand All @@ -319,7 +197,7 @@ func NewHandler(conf *S3ndConf) *S3ndHandler {
t.MaxIdleConns = maxConns
t.MaxConnsPerHost = maxConns
t.MaxIdleConnsPerHost = maxConns
t.WriteBufferSize = int(conf.uploadWriteBufferSize.Value())
t.WriteBufferSize = int(conf.UploadWriteBufferSize.Value())
// disable http/2 to prevent muxing over a single tcp connection
t.ForceAttemptHTTP2 = false
t.TLSClientConfig.NextProtos = []string{"http/1.1"}
Expand All @@ -328,7 +206,7 @@ func NewHandler(conf *S3ndConf) *S3ndHandler {

awsCfg, err := config.LoadDefaultConfig(
context.TODO(),
config.WithBaseEndpoint(*conf.endpointUrl),
config.WithBaseEndpoint(*conf.EndpointUrl),
config.WithHTTPClient(httpClient),
)
if err != nil {
Expand All @@ -345,22 +223,22 @@ func NewHandler(conf *S3ndConf) *S3ndHandler {
handler.Uploader = manager.NewUploader(handler.S3Client, func(u *manager.Uploader) {
u.Concurrency = 1000
u.MaxUploadParts = 1000
u.PartSize = conf.uploadPartsize.Value()
u.PartSize = conf.UploadPartsize.Value()
})

sema := semaphore.New(int(*conf.uploadMaxParallel))
sema := semaphore.New(int(*conf.UploadMaxParallel))
handler.ParallelUploads = &sema

return handler
}

func main() {
conf := getConf()
conf := conf.NewConf()

handler := NewHandler(&conf)
http.Handle("/", handler)

addr := fmt.Sprintf("%s:%d", *conf.host, *conf.port)
addr := fmt.Sprintf("%s:%d", *conf.Host, *conf.Port)
log.Println("Listening on", addr)

err := http.ListenAndServe(addr, nil)
Expand Down

0 comments on commit 3ad74f9

Please sign in to comment.