diff --git a/.gitignore b/.gitignore index b899367..527c3d6 100644 --- a/.gitignore +++ b/.gitignore @@ -11,8 +11,6 @@ src/vendor +src/scytale/output # config --src/scytale/scytale.json -+src/scytale/scytale.json +keys/*.private src/scytale/scytale @@ -20,7 +18,6 @@ src/scytale/debug src/scytale/output # config -src/scytale/scytale.json keys/*.private # Vim diff --git a/src/glide.lock b/src/glide.lock index 0bbd22a..af22eac 100644 --- a/src/glide.lock +++ b/src/glide.lock @@ -1,121 +1,100 @@ -hash: 11fd9e9e4e4d3feb84c1b74871fa41edc6a10ae2ad4455be111229ceccd283e1 -updated: 2017-03-24T15:48:29.120584135-07:00 +hash: 195116f8a1a9144f387c4ce5a6c723747feb0d680edf805029cf8bc52f12cd5d +updated: 2017-09-18T15:31:40.263684908-07:00 imports: - name: github.com/c9s/goprocinfo version: 19cb9f127a9c8d2034cf59ccb683cdb94b9deb6c subpackages: - linux - name: github.com/Comcast/webpa-common - version: 43a164e42882fc7acf3a1d12267d0d5d3a047dab + version: 05094818a42ba3682b43c60c29a237d8e6995341 subpackages: - concurrent - - handler - health + - httperror - logging - - secure + - middleware - server - - device - - fact - - hash - - secure/key - - logging/golog + - tracing + - tracing/tracinghttp - wrp - - resource - - types -- name: github.com/davecgh/go-spew - version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9 - subpackages: - - spew + - wrp/wrpendpoint + - wrp/wrphttp - name: github.com/fsnotify/fsnotify - version: ff7bc41d4007f67e5456703c34342df4e0113f64 + version: 4da3e2cfbabc9f751898f250b49f2439785783a1 +- name: github.com/go-kit/kit + version: a9ca6725cbbea455e61c6bc8a1ed28e81eb3493b + subpackages: + - endpoint + - log + - log/level + - transport/http +- name: github.com/go-logfmt/logfmt + version: 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 +- name: github.com/go-stack/stack + version: 817915b46b97fd7bb80e8ab6b69f01a53ac3eebf - name: github.com/gorilla/context version: 08b5f424b9271eedf6f9f0ce86cb9396ed337a42 - name: github.com/gorilla/mux - version: 599cba5e7b6137d46ddf58fb1765f5d928e69604 -- name: github.com/gorilla/websocket - version: 3ab3a8b8831546bd18fd182c20687ca853b2bb13 + version: 392c28fe23e1c45ddba891b0320b3b5df220beea - name: github.com/hashicorp/hcl - version: 630949a3c5fa3c613328e1b8256052cbc2327c9b + version: 8f6b1344a92ff8877cf24a5de9177bf7d0a2a187 subpackages: - hcl/ast - hcl/parser - - hcl/token - - json/parser - hcl/scanner - hcl/strconv + - hcl/token + - json/parser - json/scanner - json/token -- name: github.com/ian-kent/go-log - version: 5731446c36ab9f716106ce0731f484c50fdf1ad1 - subpackages: - - appenders - - layout - - levels - - logger -- name: github.com/jtacoma/uritemplates - version: 307ae868f90f4ee1b73ebe4596e0394237dacce8 -- name: github.com/justinas/alice - version: 1051eaf52fcafdd87ead59d28b065f1fcb8274ec +- name: github.com/kr/logfmt + version: b84e30acd515aadc4b783ad4ff83aff3299bdfe0 - name: github.com/magiconair/properties - version: 51463bfca2576e06c62a8504b5c0f06d61312647 + version: 8d7837e64d3c1ee4e54a880c5a920ab4316fc90a - name: github.com/mitchellh/mapstructure - version: 53818660ed4955e899c0bcafa97299a388bd7c8e -- name: github.com/pelletier/go-buffruneio - version: c37440a7cf42ac63b919c752ca73a85067e05992 + version: d0303fe809921458f417bcf828397a65db30a7e4 - name: github.com/pelletier/go-toml - version: f6e7596e8daafd44dc9e5c208dc73035020c8481 -- name: github.com/philhofer/fwd - version: 98c11a7a6ec829d672b03833c3d69a7fae1ca972 -- name: github.com/pmezard/go-difflib - version: d8ed2627bdf02c080bf22230dbb337003b7aba2d - subpackages: - - difflib -- name: github.com/SermoDigital/jose - version: f6df55f235c24f236d11dbcf665249a59ac2021f - subpackages: - - crypto - - jws - - jwt + version: 9c1b4e331f1e3d98e72600677699fbe212cd6d16 - name: github.com/spf13/afero - version: 9be650865eab0c12963d8753212f4f9c66cdcf12 + version: ee1bd8ee15a1306d1f9201acc41ef39cd9f99a1b subpackages: - mem - name: github.com/spf13/cast - version: ce135a4ebeee6cfe9a26c93ee0d37825f26113c7 + version: acbeb36b902d72a7a4c18e8f3241075e7ab763e4 - name: github.com/spf13/jwalterweatherman - version: fa7ca7e836cf3a8bb4ebf799f472c12d7e903d66 + version: 12bd96e66386c1960ab0f74ced1362f66f552f7b - name: github.com/spf13/pflag - version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7 + version: e57e3eeb33f795204c1ca35f56c44f83227c6e66 - name: github.com/spf13/viper - version: 5ed0fc31f7f453625df314d8e66b9791e8d13003 -- name: github.com/stretchr/objx - version: cbeaeb16a013161a98496fad62933b1d21786672 -- name: github.com/stretchr/testify - version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0 - subpackages: - - mock - - assert -- name: github.com/t-k/fluent-logger-golang - version: 0f8ec08f2057a61574b6943e75045fffbeae894e - subpackages: - - fluent -- name: github.com/tinylib/msgp - version: 38a6f61a768dc24552dad94611923841b53acc4f - subpackages: - - msgp + version: 25b30aa063fc18e48662b86996252eabdcf2f0c7 - name: github.com/ugorji/go version: d23841a297e5489e787e72fceffabf9d2994b52a subpackages: - codec - name: golang.org/x/sys - version: 8fd966b47dbdd4faa03de0d06e3d733baeb9a1a9 + version: 7ddbeae9ae08c6a06a59597f0c9edbc5ff2444ce subpackages: - unix - name: golang.org/x/text - version: fc7fa097411d30e6708badff276c4c164425590c + version: bd91bbf73e9a4a801adbfb97133c992678533126 subpackages: - transform - unicode/norm +- name: gopkg.in/natefinch/lumberjack.v2 + version: a96e63847dc3c67d17befa69c303767e2f84e54f - name: gopkg.in/yaml.v2 - version: a3f3340b5840cee44f372bddb5880fcbc419b46a -testImports: [] + version: eb3733d160e74a9c7e442f435eb3bea458e1d19f +testImports: +- name: github.com/davecgh/go-spew + version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9 + subpackages: + - spew +- name: github.com/pmezard/go-difflib + version: d8ed2627bdf02c080bf22230dbb337003b7aba2d + subpackages: + - difflib +- name: github.com/stretchr/testify + version: 890a5c3458b43e6104ff5da8dfa139d013d77544 + subpackages: + - assert + - require diff --git a/src/glide.yaml b/src/glide.yaml index 4fe83d3..75b6dc9 100644 --- a/src/glide.yaml +++ b/src/glide.yaml @@ -1,14 +1,4 @@ package: . import: - package: github.com/Comcast/webpa-common - subpackages: - - concurrent - - handler - - health - - logging - - secure - - server -- package: github.com/gorilla/mux -- package: github.com/justinas/alice -- package: github.com/spf13/pflag -- package: github.com/spf13/viper + version: 05094818a42ba3682b43c60c29a237d8e6995341 diff --git a/src/scytale/http.go b/src/scytale/http.go deleted file mode 100644 index a4ac354..0000000 --- a/src/scytale/http.go +++ /dev/null @@ -1,97 +0,0 @@ -package main - -import ( - "github.com/Comcast/webpa-common/logging" - "io/ioutil" - "net/http" - "time" -) - -type Send func(inFunc func(workerID int)) error - -// Below is the struct that will implement our ServeHTTP method -type ServerHandler struct { - logging.Logger - scytaleHandler RequestHandler - scytaleHealth HealthTracker - doJob Send -} - -func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { - defer request.Body.Close() - - sh.Info("Receiving incoming post...") - - timeStamps := ScytaleTimestamps{ - TimeReceived: time.Now(), - } - - myPayload, err := ioutil.ReadAll(request.Body) - if err != nil { - statusMsg := "Unable to retrieve the request body: " + err.Error() + ".\n" - response.WriteHeader(http.StatusBadRequest) - response.Write([]byte(statusMsg)) - return - } - - var contentType string - if value, ok := request.Header["Content-Type"]; ok { - if len(value) == 1 { - contentType = value[0] - switch contentType { - case "application/json": - case "application/wrp": - default: - response.WriteHeader(http.StatusBadRequest) - response.Write([]byte("Only Content-Type values of \"application/json\" or \"application/wrp\" are supported.\n")) - } - } else { - response.WriteHeader(http.StatusBadRequest) - response.Write([]byte("Content-Type cannot have more than one specification.\n")) - } - } else { - response.WriteHeader(http.StatusBadRequest) - response.Write([]byte("Content-Type must be set in the header.\n")) - } - - if contentType == "" { - return - } - - targetURL := request.URL.String() - - scytaleRequest := ScytaleRequest{ - Payload: myPayload, - ContentType: contentType, - TargetURL: targetURL, - Timestamps: timeStamps, - } - - scytaleRequest.Timestamps.TimeAccepted = time.Now() - - err = sh.doJob(func(workerID int) { sh.scytaleHandler.HandleRequest(workerID, scytaleRequest) }) - if err != nil { - // return a 408 - response.WriteHeader(http.StatusRequestTimeout) - response.Write([]byte("Unable to handle request at this time.\n")) - } else { - // return a 202 - response.WriteHeader(http.StatusAccepted) - response.Write([]byte("Request placed on to queue.\n")) - sh.scytaleHealth.IncrementBucket(len(myPayload)) - } -} - -type ProfileHandler struct { - logging.Logger -} - -// TODO: temporarily adding this to check and see if we're getting what we expect -func (ph *ProfileHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { - defer request.Body.Close() - - ph.Info("Receiving request for server stats...") - - response.WriteHeader(http.StatusOK) - response.Write([]byte("Placeholder.\n")) -} diff --git a/src/scytale/primaryHandler.go b/src/scytale/primaryHandler.go new file mode 100644 index 0000000..b5bf503 --- /dev/null +++ b/src/scytale/primaryHandler.go @@ -0,0 +1,72 @@ +package main + +import ( + "fmt" + "net/http" + + "github.com/Comcast/webpa-common/wrp" + "github.com/Comcast/webpa-common/wrp/wrphttp" + "github.com/go-kit/kit/log" + gokithttp "github.com/go-kit/kit/transport/http" + "github.com/gorilla/mux" + "github.com/spf13/viper" +) + +const ( + baseURI = "/api" + version = "v2" +) + +func NewPrimaryHandler(logger log.Logger, v *viper.Viper) (http.Handler, error) { + fanoutOptions := new(wrphttp.FanoutOptions) + if err := v.UnmarshalKey("fanout", fanoutOptions); err != nil { + return nil, err + } + + fanoutOptions.Logger = logger + fanoutEndpoint, err := wrphttp.NewFanoutEndpoint(fanoutOptions) + if err != nil { + return nil, err + } + + var ( + router = mux.NewRouter() + subrouter = router.Path(fmt.Sprintf("%s/%s/device", baseURI, version)).Methods("POST", "PUT").Subrouter() + timeLayout = "" + ) + + subrouter.Headers(wrphttp.MessageTypeHeader, "").Handler( + gokithttp.NewServer( + fanoutEndpoint, + wrphttp.ServerDecodeRequestHeaders(logger), + wrphttp.ServerEncodeResponseHeaders(timeLayout), + gokithttp.ServerErrorEncoder( + wrphttp.ServerErrorEncoder(timeLayout), + ), + ), + ) + + subrouter.Headers("Content-Type", wrp.JSON.ContentType()).Handler( + gokithttp.NewServer( + fanoutEndpoint, + wrphttp.ServerDecodeRequestBody(logger, fanoutOptions.NewDecoderPool(wrp.JSON)), + wrphttp.ServerEncodeResponseBody(timeLayout, fanoutOptions.NewEncoderPool(wrp.JSON)), + gokithttp.ServerErrorEncoder( + wrphttp.ServerErrorEncoder(timeLayout), + ), + ), + ) + + subrouter.Headers("Content-Type", wrp.Msgpack.ContentType()).Handler( + gokithttp.NewServer( + fanoutEndpoint, + wrphttp.ServerDecodeRequestBody(logger, fanoutOptions.NewDecoderPool(wrp.Msgpack)), + wrphttp.ServerEncodeResponseBody(timeLayout, fanoutOptions.NewEncoderPool(wrp.Msgpack)), + gokithttp.ServerErrorEncoder( + wrphttp.ServerErrorEncoder(timeLayout), + ), + ), + ) + + return router, nil +} diff --git a/src/scytale/scytale.go b/src/scytale/scytale.go index e082a6d..51267db 100644 --- a/src/scytale/scytale.go +++ b/src/scytale/scytale.go @@ -2,25 +2,29 @@ package main import ( "fmt" + _ "net/http/pprof" + "os" + "github.com/Comcast/webpa-common/concurrent" - "github.com/Comcast/webpa-common/handler" - "github.com/Comcast/webpa-common/secure" + "github.com/Comcast/webpa-common/logging" "github.com/Comcast/webpa-common/server" - "github.com/gorilla/mux" - "github.com/justinas/alice" + "github.com/go-kit/kit/log/level" "github.com/spf13/pflag" "github.com/spf13/viper" - "os" - "os/signal" ) const ( applicationName = "scytale" + release = "Developer" ) // scytale is the driver function for Scytale. It performs everything main() would do, // except for obtaining the command-line arguments (which are passed to it). func scytale(arguments []string) int { + // + // Initialize the server environment: command-line flags, Viper, logging, and the WebPA instance + // + var ( f = pflag.NewFlagSet(applicationName, pflag.ContinueOnError) v = viper.New() @@ -33,70 +37,27 @@ func scytale(arguments []string) int { return 1 } - logger.Info("Using configuration file: %s", v.ConfigFileUsed()) + logger.Log(level.Key(), level.InfoValue(), "configurationFile", v.ConfigFileUsed()) - scytaleConfig := new(ScytaleConfig) - err = v.Unmarshal(scytaleConfig) + primaryHandler, err := NewPrimaryHandler(logger, v) if err != nil { - return 1 + logger.Log(level.Key(), level.ErrorValue(), logging.ErrorKey(), err, logging.MessageKey(), "unable to create primary handler") + return 2 } - workerPool := WorkerPoolFactory{ - NumWorkers: scytaleConfig.NumWorkerThreads, - QueueSize: scytaleConfig.JobQueueSize, - }.New() - - serverWrapper := &ServerHandler{ - Logger: logger, - scytaleHandler: &ScytaleHandler{ - Logger: logger, - }, - doJob: workerPool.Send, - } - - profileWrapper := &ProfileHandler{ - Logger: logger, - } - - validator := secure.Validators{ - secure.ExactMatchValidator(scytaleConfig.AuthHeader), - } - - authHandler := handler.AuthorizationHandler{ - HeaderName: "Authorization", - ForbiddenStatusCode: 403, - Validator: validator, - Logger: logger, - } - - scytaleHandler := alice.New(authHandler.Decorate) - - mux := mux.NewRouter() - mux.Handle("/api/v1/run", scytaleHandler.Then(serverWrapper)) - mux.Handle("/api/v1/profile", scytaleHandler.Then(profileWrapper)) - - scytaleHealth := &ScytaleHealth{} - var runnable concurrent.Runnable - - scytaleHealth.Monitor, runnable = webPA.Prepare(logger, mux) - serverWrapper.scytaleHealth = scytaleHealth - - waitGroup, shutdown, err := concurrent.Execute(runnable) - if err != nil { - fmt.Fprintf(os.Stderr, "Unable to start device manager: %s\n", err) - return 1 - } - - logger.Info("Scytale is up and running!") - var ( - signals = make(chan os.Signal, 1) + _, runnable = webPA.Prepare(logger, nil, primaryHandler) + signals = make(chan os.Signal, 1) ) - signal.Notify(signals) - <-signals - close(shutdown) - waitGroup.Wait() + // + // Execute the runnable, which runs all the servers, and wait for a signal + // + + if err := concurrent.Await(runnable, signals); err != nil { + fmt.Fprintf(os.Stderr, "Error when starting %s: %s", applicationName, err) + return 4 + } return 0 } diff --git a/src/scytale/scytale.json b/src/scytale/scytale.json new file mode 100644 index 0000000..0449ca1 --- /dev/null +++ b/src/scytale/scytale.json @@ -0,0 +1,25 @@ +{ + "primary": { + "address": ":6000" + }, + + "health": { + "address": ":6001" + }, + + "pprof": { + "address": ":6002" + }, + + "fanout": { + "method": "POST", + "endpoints": ["http://localhost:7000/api/v2/device/send"] + }, + + + "log" : { + "file" : "stdout", + "level" : "DEBUG", + "json": true + } +} diff --git a/src/scytale/scytale_test.go b/src/scytale/scytale_test.go deleted file mode 100644 index 1b70c49..0000000 --- a/src/scytale/scytale_test.go +++ /dev/null @@ -1,201 +0,0 @@ -package main - -import ( - "errors" - "github.com/Comcast/webpa-common/health" - "github.com/Comcast/webpa-common/logging" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "math" - "net/http" - "net/http/httptest" - "os" - "strings" - "sync" - "testing" -) - -// Begin mock declarations - -// mockHandler only needs to mock the `HandleRequest` method -type mockHandler struct { - mock.Mock -} - -func (m *mockHandler) HandleRequest(workerID int, inRequest ScytaleRequest) { - m.Called(workerID, inRequest) -} - -// mockHealthTracker needs to mock things from both the `HealthTracker` -// interface as well as the `health.Monitor` interface -type mockHealthTracker struct { - mock.Mock -} - -func (m *mockHealthTracker) SendEvent(healthFunc health.HealthFunc) { - m.Called(healthFunc) -} - -func (m *mockHealthTracker) ServeHTTP(response http.ResponseWriter, request *http.Request) { - m.Called(response, request) -} - -func (m *mockHealthTracker) IncrementBucket(inSize int) { - m.Called(inSize) -} - -// Begin test functions - -func TestMain(m *testing.M) { - os.Exit(m.Run()) -} - -func TestWorkerPool(t *testing.T) { - assert := assert.New(t) - - workerPool := WorkerPoolFactory{ - NumWorkers: 1, - QueueSize: 1, - }.New() - - t.Run("TestWorkerPoolSend", func(t *testing.T) { - testWG := new(sync.WaitGroup) - testWG.Add(1) - - require.NotNil(t, workerPool) - err := workerPool.Send(func(workerID int) { - testWG.Done() - }) - - testWG.Wait() - assert.Nil(err) - }) - - workerPool = WorkerPoolFactory{ - NumWorkers: 0, - QueueSize: 0, - }.New() - - t.Run("TestWorkerPoolFullQueue", func(t *testing.T) { - require.NotNil(t, workerPool) - err := workerPool.Send(func(workerID int) { - assert.Fail("This should not execute because our worker queue is full and we have no workers.") - }) - - assert.NotNil(err) - }) -} - -func TestScytaleHealth(t *testing.T) { - assert := assert.New(t) - - testData := []struct { - inSize int - expectedStat health.Stat - }{ - {inSize: -1, expectedStat: PayloadsOverZero}, - {inSize: 0, expectedStat: PayloadsOverZero}, - {inSize: 99, expectedStat: PayloadsOverZero}, - {inSize: 999, expectedStat: PayloadsOverHundred}, - {inSize: 9999, expectedStat: PayloadsOverThousand}, - {inSize: 10001, expectedStat: PayloadsOverTenThousand}, - {inSize: math.MaxInt32, expectedStat: PayloadsOverTenThousand}, - } - - t.Run("TestIncrementBucket", func(t *testing.T) { - for _, data := range testData { - fakeMonitor := new(mockHealthTracker) - fakeMonitor.On("SendEvent", mock.AnythingOfType("health.HealthFunc")).Run( - func(args mock.Arguments) { - healthFunc := args.Get(0).(health.HealthFunc) - stats := make(health.Stats) - - healthFunc(stats) - assert.Equal(1, stats[data.expectedStat]) - }).Once() - - scytaleHealth := &ScytaleHealth{fakeMonitor} - - scytaleHealth.IncrementBucket(data.inSize) - fakeMonitor.AssertExpectations(t) - } - }) -} - -func TestServeHandler(t *testing.T) { - assert := assert.New(t) - - logger := logging.DefaultLogger() - fakeHandler := new(mockHandler) - fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("ScytaleRequest")).Return().Once() - - fakeHealth := new(mockHealthTracker) - fakeHealth.On("IncrementBucket", mock.AnythingOfType("int")).Return().Once() - - requestSuccessful := func(func(workerID int)) error { - fakeHandler.HandleRequest(0, ScytaleRequest{}) - return nil - } - - serverWrapper := &ServerHandler{ - Logger: logger, - scytaleHandler: fakeHandler, - scytaleHealth: fakeHealth, - doJob: requestSuccessful, - } - - req := httptest.NewRequest("POST", "localhost:8080", strings.NewReader("Test payload.")) - - t.Run("TestServeHTTPHappyPath", func(t *testing.T) { - req.Header.Set("Content-Type", "application/json") - - w := httptest.NewRecorder() - serverWrapper.ServeHTTP(w, req) - resp := w.Result() - - assert.Equal(202, resp.StatusCode) - fakeHandler.AssertExpectations(t) - fakeHealth.AssertExpectations(t) - }) - - t.Run("TestServeHTTPTooManyHeaders", func(t *testing.T) { - req.Header.Add("Content-Type", "too/many/headers") - - w := httptest.NewRecorder() - serverWrapper.ServeHTTP(w, req) - resp := w.Result() - - assert.Equal(400, resp.StatusCode) - fakeHandler.AssertExpectations(t) - fakeHealth.AssertExpectations(t) - }) - - t.Run("TestServeHTTPWrongHeader", func(t *testing.T) { - req.Header.Del("Content-Type") - - w := httptest.NewRecorder() - serverWrapper.ServeHTTP(w, req) - resp := w.Result() - - assert.Equal(400, resp.StatusCode) - fakeHandler.AssertExpectations(t) - fakeHealth.AssertExpectations(t) - }) - - t.Run("TestServeHTTPFullQueue", func(t *testing.T) { - req.Header.Set("Content-Type", "application/json") - - w := httptest.NewRecorder() - requestTimeout := func(func(workerID int)) error { - return errors.New("Intentional error.") - } - serverWrapper.doJob = requestTimeout - serverWrapper.ServeHTTP(w, req) - resp := w.Result() - - assert.Equal(408, resp.StatusCode) - fakeHandler.AssertExpectations(t) - fakeHealth.AssertExpectations(t) - }) -} diff --git a/src/scytale/scytale_type.go b/src/scytale/scytale_type.go deleted file mode 100644 index ca90b07..0000000 --- a/src/scytale/scytale_type.go +++ /dev/null @@ -1,124 +0,0 @@ -package main - -import ( - "errors" - "github.com/Comcast/webpa-common/health" - "github.com/Comcast/webpa-common/logging" - "time" -) - -const ( - // Stuff we're looking at health-wise - // TODO: figure out how to add per IP buckets - PayloadsOverZero health.Stat = "PayloadsOverZero" - PayloadsOverHundred health.Stat = "PayloadsOverHundred" - PayloadsOverThousand health.Stat = "PayloadsOverThousand" - PayloadsOverTenThousand health.Stat = "PayloadsOverTenThousand" -) - -// Below is the struct we're using to contain the data from a provided config file -// TODO: Try to figure out how to make bucket ranges configurable -type ScytaleConfig struct { - AuthHeader string - NumWorkerThreads int - JobQueueSize int - TotalIncomingPayloadSizeBuckets []int - PerSourceIncomingPayloadSizeBuckets []int -} - -// Below is the struct we're using to create a request to scytale -type ScytaleRequest struct { - Payload []byte - ContentType string - TargetURL string - Timestamps ScytaleTimestamps -} - -type ScytaleTimestamps struct { - TimeReceived time.Time - TimeAccepted time.Time - TimeProcessingStart time.Time - TimeProcessingEnd time.Time -} - -type RequestHandler interface { - HandleRequest(workerID int, inRequest ScytaleRequest) -} - -type ScytaleHandler struct { - logging.Logger -} - -func (ch *ScytaleHandler) HandleRequest(workerID int, inRequest ScytaleRequest) { - inRequest.Timestamps.TimeProcessingStart = time.Now() - - ch.Info("Worker #%d received a request, payload:\t%s", workerID, string(inRequest.Payload)) - ch.Info("Worker #%d received a request, type:\t\t%s", workerID, inRequest.ContentType) - ch.Info("Worker #%d received a request, url:\t\t%s", workerID, inRequest.TargetURL) - - inRequest.Timestamps.TimeProcessingEnd = time.Now() - - ch.Info("Worker #%d printing message time stats:\t%v", workerID, inRequest.Timestamps) -} - -type HealthTracker interface { - SendEvent(health.HealthFunc) - IncrementBucket(inSize int) -} - -// Below is the struct and implementation of how we're tracking health stuff -type ScytaleHealth struct { - health.Monitor -} - -func (ch *ScytaleHealth) IncrementBucket(inSize int) { - if inSize < 101 { - ch.SendEvent(health.Inc(PayloadsOverZero, 1)) - } else if inSize < 1001 { - ch.SendEvent(health.Inc(PayloadsOverHundred, 1)) - } else if inSize < 10001 { - ch.SendEvent(health.Inc(PayloadsOverThousand, 1)) - } else { - ch.SendEvent(health.Inc(PayloadsOverTenThousand, 1)) - } -} - -// Below is the struct and implementation of our worker pool factory -type WorkerPoolFactory struct { - NumWorkers int - QueueSize int -} - -func (wpf WorkerPoolFactory) New() (wp *WorkerPool) { - jobs := make(chan func(workerID int), wpf.QueueSize) - - for i := 0; i < wpf.NumWorkers; i++ { - go func(id int) { - for f := range jobs { - f(id) - } - }(i) - } - - wp = &WorkerPool{ - jobs: jobs, - } - - return -} - -// Below is the struct and implementation of our worker pool -// It utilizes a non-blocking channel, so we throw away any requests that exceed -// the channel's limit (indicated by its buffer size) -type WorkerPool struct { - jobs chan func(workerID int) -} - -func (wp *WorkerPool) Send(inFunc func(workerID int)) error { - select { - case wp.jobs <- inFunc: - return nil - default: - return errors.New("Worker pool channel full.") - } -}