From 3ad74f97a36c6bad6e1291ae2fca18692b40f9a2 Mon Sep 17 00:00:00 2001 From: Joshua Hoblitt Date: Fri, 22 Nov 2024 15:57:14 -0700 Subject: [PATCH] mv env var and flag handling into conf package --- conf/conf.go | 134 +++++++++++++++++++++++++++++++++++++++++++ main.go | 156 ++++++--------------------------------------------- 2 files changed, 151 insertions(+), 139 deletions(-) create mode 100644 conf/conf.go diff --git a/conf/conf.go b/conf/conf.go new file mode 100644 index 0000000..113c988 --- /dev/null +++ b/conf/conf.go @@ -0,0 +1,134 @@ +package conf + +import ( + "flag" + "log" + "os" + "strconv" + "time" + + k8sresource "k8s.io/apimachinery/pkg/api/resource" +) + +type S3ndConf struct { + Host *string + Port *int + EndpointUrl *string + UploadMaxParallel *int64 + UploadTimeout *time.Duration + QueueTimeout *time.Duration + UploadTries *int + UploadPartsize *k8sresource.Quantity + UploadBwlimit *k8sresource.Quantity + UploadWriteBufferSize *k8sresource.Quantity +} + +// Parse the environment variables and flags. If a flag is not set, the +// environment variable is used. Errors are fatal. +func NewConf() S3ndConf { + conf := S3ndConf{} + + // start flags + conf.Host = flag.String("host", os.Getenv("S3ND_HOST"), "S3 Daemon Host (S3ND_HOST)") + + defaultPort, _ := strconv.Atoi(os.Getenv("S3ND_PORT")) + if defaultPort == 0 { + defaultPort = 15555 + } + conf.Port = flag.Int("port", defaultPort, "S3 Daemon Port (S3ND_PORT)") + + conf.EndpointUrl = flag.String("endpoint-url", os.Getenv("S3ND_ENDPOINT_URL"), "S3 Endpoint URL (S3ND_ENDPOINT_URL)") + + var defaultUploadMaxParallel int64 + defaultUploadMaxParallel, _ = strconv.ParseInt(os.Getenv("S3ND_UPLOAD_MAX_PARALLEL"), 10, 64) + if defaultUploadMaxParallel == 0 { + defaultUploadMaxParallel = 100 + } + conf.UploadMaxParallel = flag.Int64("upload-max-parallel", defaultUploadMaxParallel, "Maximum number of parallel object uploads (S3ND_UPLOAD_MAX_PARALLEL)") + + defaultUploadTimeout := os.Getenv("S3ND_UPLOAD_TIMEOUT") + if defaultUploadTimeout == "" { + defaultUploadTimeout = "10s" + } + uploadTimeout := flag.String("upload-timeout", defaultUploadTimeout, "Upload Timeout (S3ND_UPLOAD_TIMEOUT)") + + defaultQueueTimeout := os.Getenv("S3ND_QUEUE_TIMEOUT") + if defaultQueueTimeout == "" { + defaultQueueTimeout = "10s" + } + queueTimeout := flag.String("queue-timeout", defaultQueueTimeout, "Queue Timeout waiting for transfer to start (S3ND_QUEUE_TIMEOUT)") + + defaultUploadTries, _ := strconv.Atoi(os.Getenv("S3ND_UPLOAD_TRIES")) + if defaultUploadTries == 0 { + defaultUploadTries = 1 + } + conf.UploadTries = flag.Int("upload-tries", defaultUploadTries, "Max number of upload tries (S3ND_UPLOAD_TRIES)") + + defaultUploadPartsize := os.Getenv("S3ND_UPLOAD_PARTSIZE") + if defaultUploadPartsize == "" { + defaultUploadPartsize = "5Mi" + } + uploadPartsizeRaw := flag.String("upload-partsize", defaultUploadPartsize, "Upload Part Size (S3ND_UPLOAD_PARTSIZE)") + + defaultUploadBwlimit := os.Getenv("S3ND_UPLOAD_BWLIMIT") + if defaultUploadBwlimit == "" { + defaultUploadBwlimit = "0" + } + uploadBwlimitRaw := flag.String("upload-bwlimit", defaultUploadBwlimit, "Upload bandwidth limit in bits per second (S3ND_UPLOAD_BWLIMIT)") + + defaultUploadWriteBufferSize := os.Getenv("S3ND_UPLOAD_WRITE_BUFFER_SIZE") + if defaultUploadWriteBufferSize == "" { + defaultUploadWriteBufferSize = "64Ki" + } + uploadWriteBufferSizeRaw := flag.String("upload-write-buffer-size", defaultUploadWriteBufferSize, "Upload Write Buffer Size (S3ND_UPLOAD_WRITE_BUFFER_SIZE)") + + flag.Parse() + // end flags + + if *conf.EndpointUrl == "" { + log.Fatal("S3ND_ENDPOINT_URL is required") + } + + uploadTimeoutDuration, err := time.ParseDuration(*uploadTimeout) + if err != nil { + log.Fatal("S3ND_UPLOAD_TIMEOUT is invalid") + } + conf.UploadTimeout = &uploadTimeoutDuration + + queueTimeoutDuration, err := time.ParseDuration(*queueTimeout) + if err != nil { + log.Fatal("S3ND_QUEUE_TIMEOUT is invalid") + } + conf.QueueTimeout = &queueTimeoutDuration + + uploadPartsize, err := k8sresource.ParseQuantity(*uploadPartsizeRaw) + if err != nil { + log.Fatal("S3ND_UPLOAD_PARTSIZE is invalid") + } + conf.UploadPartsize = &uploadPartsize + + uploadBwlimit, err := k8sresource.ParseQuantity(*uploadBwlimitRaw) + if err != nil { + log.Fatal("S3ND_UPLOAD_BWLIMIT is invalid") + } + conf.UploadBwlimit = &uploadBwlimit + + uploadWriteBufferSize, err := k8sresource.ParseQuantity(*uploadWriteBufferSizeRaw) + if err != nil { + log.Fatal("S3ND_UPLOAD_WRITE_BUFFER_SIZE is invalid") + } + conf.UploadWriteBufferSize = &uploadWriteBufferSize + + log.Println("S3ND_HOST:", *conf.Host) + log.Println("S3ND_PORT:", *conf.Port) + log.Println("S3ND_ENDPOINT_URL:", *conf.EndpointUrl) + log.Println("S3ND_UPLOAD_MAX_PARALLEL:", *conf.UploadMaxParallel) + log.Println("S3ND_UPLOAD_TIMEOUT:", *conf.UploadTimeout) + log.Println("S3ND_QUEUE_TIMEOUT:", *conf.QueueTimeout) + log.Println("S3ND_UPLOAD_TRIES:", *conf.UploadTries) + log.Println("S3ND_UPLOAD_PARTSIZE:", conf.UploadPartsize.String()) + log.Println("S3ND_UPLOAD_BWLIMIT:", conf.UploadBwlimit.String()) + log.Println("S3ND_UPLOAD_WRITE_BUFFER_SIZE:", conf.UploadWriteBufferSize.String()) + + return conf +} diff --git a/main.go b/main.go index 1b9dee7..2e4eec9 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,6 @@ package main import ( "context" "errors" - "flag" "fmt" "html" "log" @@ -12,10 +11,11 @@ import ( "net/url" "os" "path/filepath" - "strconv" "syscall" "time" + "github.com/lsst-dm/s3nd/conf" + "github.com/aws/aws-sdk-go-v2/aws" awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" "github.com/aws/aws-sdk-go-v2/config" @@ -24,24 +24,10 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/hyperledger/fabric/common/semaphore" "golang.org/x/sys/unix" - k8sresource "k8s.io/apimachinery/pkg/api/resource" ) -type S3ndConf struct { - host *string - port *int - endpointUrl *string - uploadMaxParallel *int64 - uploadTimeout *time.Duration - queueTimeout *time.Duration - uploadTries *int - uploadPartsize *k8sresource.Quantity - uploadBwlimit *k8sresource.Quantity - uploadWriteBufferSize *k8sresource.Quantity -} - type S3ndHandler struct { - Conf *S3ndConf + Conf *conf.S3ndConf AwsConfig *aws.Config S3Client *s3.Client Uploader *manager.Uploader @@ -64,10 +50,10 @@ func (h *S3ndHandler) UploadFileMultipart(ctx context.Context, task *S3ndUploadT } defer file.Close() - maxAttempts := *h.Conf.uploadTries + maxAttempts := *h.Conf.UploadTries var attempt int for attempt = 1; attempt <= maxAttempts; attempt++ { - uploadCtx, cancel := context.WithTimeout(ctx, *h.Conf.uploadTimeout) + uploadCtx, cancel := context.WithTimeout(ctx, *h.Conf.UploadTimeout) defer cancel() _, err = h.Uploader.Upload(uploadCtx, &s3.PutObjectInput{ Bucket: aws.String(*task.bucket), @@ -150,7 +136,7 @@ func (h *S3ndHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Printf("queuing %v:%v | source %v\n", *task.bucket, *task.key, *task.file) // limit the number of parallel uploads - semaCtx, cancel := context.WithTimeout(r.Context(), *h.Conf.queueTimeout) + semaCtx, cancel := context.WithTimeout(r.Context(), *h.Conf.QueueTimeout) defer cancel() if err := h.ParallelUploads.Acquire(semaCtx); err != nil { w.WriteHeader(http.StatusServiceUnavailable) @@ -169,130 +155,22 @@ func (h *S3ndHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Successful put %q\n", html.EscapeString(task.uri.String())) } -func getConf() S3ndConf { - conf := S3ndConf{} - - // start flags - conf.host = flag.String("host", os.Getenv("S3ND_HOST"), "S3 Daemon Host (S3ND_HOST)") - - defaultPort, _ := strconv.Atoi(os.Getenv("S3ND_PORT")) - if defaultPort == 0 { - defaultPort = 15555 - } - conf.port = flag.Int("port", defaultPort, "S3 Daemon Port (S3ND_PORT)") - - conf.endpointUrl = flag.String("endpoint-url", os.Getenv("S3ND_ENDPOINT_URL"), "S3 Endpoint URL (S3ND_ENDPOINT_URL)") - - var defaultUploadMaxParallel int64 - defaultUploadMaxParallel, _ = strconv.ParseInt(os.Getenv("S3ND_UPLOAD_MAX_PARALLEL"), 10, 64) - if defaultUploadMaxParallel == 0 { - defaultUploadMaxParallel = 100 - } - conf.uploadMaxParallel = flag.Int64("upload-max-parallel", defaultUploadMaxParallel, "Maximum number of parallel object uploads (S3ND_UPLOAD_MAX_PARALLEL)") - - defaultUploadTimeout := os.Getenv("S3ND_UPLOAD_TIMEOUT") - if defaultUploadTimeout == "" { - defaultUploadTimeout = "10s" - } - uploadTimeout := flag.String("upload-timeout", defaultUploadTimeout, "Upload Timeout (S3ND_UPLOAD_TIMEOUT)") - - defaultQueueTimeout := os.Getenv("S3ND_QUEUE_TIMEOUT") - if defaultQueueTimeout == "" { - defaultQueueTimeout = "10s" - } - queueTimeout := flag.String("queue-timeout", defaultQueueTimeout, "Queue Timeout waiting for transfer to start (S3ND_QUEUE_TIMEOUT)") - - defaultUploadTries, _ := strconv.Atoi(os.Getenv("S3ND_UPLOAD_TRIES")) - if defaultUploadTries == 0 { - defaultUploadTries = 1 - } - conf.uploadTries = flag.Int("upload-tries", defaultUploadTries, "Max number of upload tries (S3ND_UPLOAD_TRIES)") - - defaultUploadPartsize := os.Getenv("S3ND_UPLOAD_PARTSIZE") - if defaultUploadPartsize == "" { - defaultUploadPartsize = "5Mi" - } - uploadPartsizeRaw := flag.String("upload-partsize", defaultUploadPartsize, "Upload Part Size (S3ND_UPLOAD_PARTSIZE)") - - defaultUploadBwlimit := os.Getenv("S3ND_UPLOAD_BWLIMIT") - if defaultUploadBwlimit == "" { - defaultUploadBwlimit = "0" - } - uploadBwlimitRaw := flag.String("upload-bwlimit", defaultUploadBwlimit, "Upload bandwidth limit in bits per second (S3ND_UPLOAD_BWLIMIT)") - - defaultUploadWriteBufferSize := os.Getenv("S3ND_UPLOAD_WRITE_BUFFER_SIZE") - if defaultUploadWriteBufferSize == "" { - defaultUploadWriteBufferSize = "64Ki" - } - uploadWriteBufferSizeRaw := flag.String("upload-write-buffer-size", defaultUploadWriteBufferSize, "Upload Write Buffer Size (S3ND_UPLOAD_WRITE_BUFFER_SIZE)") - - flag.Parse() - // end flags - - if *conf.endpointUrl == "" { - log.Fatal("S3ND_ENDPOINT_URL is required") - } - - uploadTimeoutDuration, err := time.ParseDuration(*uploadTimeout) - if err != nil { - log.Fatal("S3ND_UPLOAD_TIMEOUT is invalid") - } - conf.uploadTimeout = &uploadTimeoutDuration - - queueTimeoutDuration, err := time.ParseDuration(*queueTimeout) - if err != nil { - log.Fatal("S3ND_QUEUE_TIMEOUT is invalid") - } - conf.queueTimeout = &queueTimeoutDuration - - uploadPartsize, err := k8sresource.ParseQuantity(*uploadPartsizeRaw) - if err != nil { - log.Fatal("S3ND_UPLOAD_PARTSIZE is invalid") - } - conf.uploadPartsize = &uploadPartsize - - uploadBwlimit, err := k8sresource.ParseQuantity(*uploadBwlimitRaw) - if err != nil { - log.Fatal("S3ND_UPLOAD_BWLIMIT is invalid") - } - conf.uploadBwlimit = &uploadBwlimit - - uploadWriteBufferSize, err := k8sresource.ParseQuantity(*uploadWriteBufferSizeRaw) - if err != nil { - log.Fatal("S3ND_UPLOAD_WRITE_BUFFER_SIZE is invalid") - } - conf.uploadWriteBufferSize = &uploadWriteBufferSize - - log.Println("S3ND_HOST:", *conf.host) - log.Println("S3ND_PORT:", *conf.port) - log.Println("S3ND_ENDPOINT_URL:", *conf.endpointUrl) - log.Println("S3ND_UPLOAD_MAX_PARALLEL:", *conf.uploadMaxParallel) - log.Println("S3ND_UPLOAD_TIMEOUT:", *conf.uploadTimeout) - log.Println("S3ND_QUEUE_TIMEOUT:", *conf.queueTimeout) - log.Println("S3ND_UPLOAD_TRIES:", *conf.uploadTries) - log.Println("S3ND_UPLOAD_PARTSIZE:", conf.uploadPartsize.String()) - log.Println("S3ND_UPLOAD_BWLIMIT:", conf.uploadBwlimit.String()) - log.Println("S3ND_UPLOAD_WRITE_BUFFER_SIZE:", conf.uploadWriteBufferSize.String()) - - return conf -} - -func NewHandler(conf *S3ndConf) *S3ndHandler { +func NewHandler(conf *conf.S3ndConf) *S3ndHandler { handler := &S3ndHandler{ Conf: conf, } - maxConns := int(*conf.uploadMaxParallel * 5) // allow for multipart upload creation + maxConns := int(*conf.UploadMaxParallel * 5) // allow for multipart upload creation var httpClient *awshttp.BuildableClient - if conf.uploadBwlimit.Value() != 0 { + if conf.UploadBwlimit.Value() != 0 { dialer := &net.Dialer{ Control: func(network, address string, conn syscall.RawConn) error { // https://pkg.go.dev/syscall#RawConn var operr error if err := conn.Control(func(fd uintptr) { - operr = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_MAX_PACING_RATE, int(conf.uploadBwlimit.Value()/8)) + operr = syscall.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_MAX_PACING_RATE, int(conf.UploadBwlimit.Value()/8)) }); err != nil { return err } @@ -306,7 +184,7 @@ func NewHandler(conf *S3ndConf) *S3ndHandler { t.MaxIdleConns = maxConns t.MaxConnsPerHost = maxConns t.MaxIdleConnsPerHost = maxConns - t.WriteBufferSize = int(conf.uploadWriteBufferSize.Value()) + t.WriteBufferSize = int(conf.UploadWriteBufferSize.Value()) // disable http/2 to prevent muxing over a single tcp connection t.ForceAttemptHTTP2 = false t.TLSClientConfig.NextProtos = []string{"http/1.1"} @@ -319,7 +197,7 @@ func NewHandler(conf *S3ndConf) *S3ndHandler { t.MaxIdleConns = maxConns t.MaxConnsPerHost = maxConns t.MaxIdleConnsPerHost = maxConns - t.WriteBufferSize = int(conf.uploadWriteBufferSize.Value()) + t.WriteBufferSize = int(conf.UploadWriteBufferSize.Value()) // disable http/2 to prevent muxing over a single tcp connection t.ForceAttemptHTTP2 = false t.TLSClientConfig.NextProtos = []string{"http/1.1"} @@ -328,7 +206,7 @@ func NewHandler(conf *S3ndConf) *S3ndHandler { awsCfg, err := config.LoadDefaultConfig( context.TODO(), - config.WithBaseEndpoint(*conf.endpointUrl), + config.WithBaseEndpoint(*conf.EndpointUrl), config.WithHTTPClient(httpClient), ) if err != nil { @@ -345,22 +223,22 @@ func NewHandler(conf *S3ndConf) *S3ndHandler { handler.Uploader = manager.NewUploader(handler.S3Client, func(u *manager.Uploader) { u.Concurrency = 1000 u.MaxUploadParts = 1000 - u.PartSize = conf.uploadPartsize.Value() + u.PartSize = conf.UploadPartsize.Value() }) - sema := semaphore.New(int(*conf.uploadMaxParallel)) + sema := semaphore.New(int(*conf.UploadMaxParallel)) handler.ParallelUploads = &sema return handler } func main() { - conf := getConf() + conf := conf.NewConf() handler := NewHandler(&conf) http.Handle("/", handler) - addr := fmt.Sprintf("%s:%d", *conf.host, *conf.port) + addr := fmt.Sprintf("%s:%d", *conf.Host, *conf.Port) log.Println("Listening on", addr) err := http.ListenAndServe(addr, nil)