Skip to content

Commit

Permalink
fix expiration issue
Browse files Browse the repository at this point in the history
  • Loading branch information
pvormste committed Nov 1, 2024
1 parent 2c6c2bb commit d37cf5c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 97 deletions.
9 changes: 4 additions & 5 deletions ee/middleware/streams/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@ func (n *NoopStreamAnalyticsFactory) CreateResponseWriter(w http.ResponseWriter,
}

type StreamAnalyticsRecorder interface {
CreateRecord(r *http.Request) *analytics.AnalyticsRecord
RecordHit(record *analytics.AnalyticsRecord, statusCode int) error
PrepareRecord(r *http.Request)
RecordHit(statusCode int, latency analytics.Latency) error
}

type NoopStreamAnalyticsRecorder struct{}

func (n *NoopStreamAnalyticsRecorder) CreateRecord(r *http.Request) *analytics.AnalyticsRecord {
return &analytics.AnalyticsRecord{}
func (n *NoopStreamAnalyticsRecorder) PrepareRecord(r *http.Request) {

Check failure on line 36 in ee/middleware/streams/analytics.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unused-parameter: parameter 'r' seems to be unused, consider removing or renaming it to match ^_ (revive)
}

func (n *NoopStreamAnalyticsRecorder) RecordHit(record *analytics.AnalyticsRecord, statusCode int) error {
func (n *NoopStreamAnalyticsRecorder) RecordHit(statusCode int, latency analytics.Latency) error {

Check failure on line 39 in ee/middleware/streams/analytics.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unused-parameter: parameter 'statusCode' seems to be unused, consider removing or renaming it to match ^_ (revive)
return nil
}
122 changes: 42 additions & 80 deletions gateway/analytics_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ package gateway

import (
"bufio"
"io"
"net"
"net/http"
"strconv"
"strings"
"time"

Expand All @@ -14,8 +16,6 @@ import (
"github.com/TykTechnologies/tyk-pump/analytics"

"github.com/TykTechnologies/tyk/ee/middleware/streams"
"github.com/TykTechnologies/tyk/header"
"github.com/TykTechnologies/tyk/request"
)

type DefaultStreamAnalyticsFactory struct {
Expand All @@ -42,7 +42,7 @@ func (d *DefaultStreamAnalyticsFactory) CreateRecorder(r *http.Request) streams.
return NewWebSocketStreamAnalyticsRecorder(d.Gw, d.Spec, detailed)
}

return NewDefaultStreamAnalyticsRecorder(d.Gw, d.Spec, detailed)
return NewDefaultStreamAnalyticsRecorder(d.Gw, d.Spec)
}

func (d *DefaultStreamAnalyticsFactory) CreateResponseWriter(w http.ResponseWriter, r *http.Request, streamID string, recorder streams.StreamAnalyticsRecorder) http.ResponseWriter {
Expand All @@ -52,81 +52,41 @@ func (d *DefaultStreamAnalyticsFactory) CreateResponseWriter(w http.ResponseWrit
type DefaultStreamAnalyticsRecorder struct {
Gw *Gateway
Spec *APISpec
Detailed bool
reqCopy *http.Request
respCopy *http.Response
}

func NewDefaultStreamAnalyticsRecorder(gw *Gateway, spec *APISpec, detailed bool) *DefaultStreamAnalyticsRecorder {
func NewDefaultStreamAnalyticsRecorder(gw *Gateway, spec *APISpec) *DefaultStreamAnalyticsRecorder {
return &DefaultStreamAnalyticsRecorder{
Gw: gw,
Spec: spec,
Detailed: detailed,
Gw: gw,
Spec: spec,
}
}

func (s *DefaultStreamAnalyticsRecorder) CreateRecord(r *http.Request) *analytics.AnalyticsRecord {
// Preparation for analytics record
alias := ""
oauthClientID := ""
session := ctxGetSession(r)
tags := make([]string, 0, estimateTagsCapacity(session, s.Spec))

if session != nil {
oauthClientID = session.OauthClientID
alias = session.Alias
tags = append(tags, getSessionTags(session)...)
}

if len(s.Spec.TagHeaders) > 0 {
tags = tagHeaders(r, s.Spec.TagHeaders, tags)
func (s *DefaultStreamAnalyticsRecorder) PrepareRecord(r *http.Request) {
s.reqCopy = r.Clone(r.Context())
s.respCopy = &http.Response{
StatusCode: 200,
Header: make(http.Header),
}

if len(s.Spec.Tags) > 0 {
tags = append(tags, s.Spec.Tags...)
}

trackEP := false
trackedPath := r.URL.Path
s.respCopy.Header.Set("Content-Length", strconv.FormatInt(0, 10))
s.respCopy.Body = io.NopCloser(strings.NewReader(""))
s.respCopy.ContentLength = 0
}

if p := ctxGetTrackedPath(r); p != "" {
trackEP = true
trackedPath = p
}
func (s *DefaultStreamAnalyticsRecorder) RecordHit(statusCode int, latency analytics.Latency) error {
s.respCopy.StatusCode = statusCode

// Create record for started stream
t := time.Now()
return &analytics.AnalyticsRecord{
Method: r.Method,
Host: r.URL.Host,
Path: trackedPath,
RawPath: r.URL.Path,
ContentLength: r.ContentLength,
UserAgent: r.Header.Get(header.UserAgent),
Day: t.Day(),
Month: t.Month(),
Year: t.Year(),
Hour: t.Hour(),
ResponseCode: http.StatusSwitchingProtocols,
APIKey: ctxGetAuthToken(r),
TimeStamp: t,
APIVersion: s.Spec.getVersionFromRequest(r),
APIName: s.Spec.Name,
APIID: s.Spec.APIID,
OrgID: s.Spec.OrgID,
OauthID: oauthClientID,
RequestTime: 0,
Latency: analytics.Latency{},
IPAddress: request.RealIP(r),
Geo: analytics.GeoData{},
Network: analytics.NetworkStats{},
Tags: tags,
Alias: alias,
TrackPath: trackEP,
ExpireAt: t,
handler := SuccessHandler{
&BaseMiddleware{
Spec: s.Spec,
Gw: s.Gw,
},
}
}

func (s *DefaultStreamAnalyticsRecorder) RecordHit(record *analytics.AnalyticsRecord, statusCode int) error {
return streamRecordHit(s.Gw, record, statusCode)
handler.RecordHit(s.reqCopy, latency, statusCode, s.respCopy, false)
return nil
}

type WebSocketStreamAnalyticsRecorder struct {
Expand All @@ -141,16 +101,16 @@ func NewWebSocketStreamAnalyticsRecorder(gw *Gateway, spec *APISpec, detailed bo
Gw: gw,
Spec: spec,
Detailed: detailed,
simpleStreamAnalyticsRecorder: NewDefaultStreamAnalyticsRecorder(gw, spec, detailed),
simpleStreamAnalyticsRecorder: NewDefaultStreamAnalyticsRecorder(gw, spec),
}
}

func (d *WebSocketStreamAnalyticsRecorder) CreateRecord(r *http.Request) *analytics.AnalyticsRecord {
return d.simpleStreamAnalyticsRecorder.CreateRecord(r)
func (d *WebSocketStreamAnalyticsRecorder) PrepareRecord(r *http.Request) {
d.simpleStreamAnalyticsRecorder.PrepareRecord(r)
}

func (d *WebSocketStreamAnalyticsRecorder) RecordHit(record *analytics.AnalyticsRecord, statusCode int) error {
return d.simpleStreamAnalyticsRecorder.RecordHit(record, statusCode)
func (d *WebSocketStreamAnalyticsRecorder) RecordHit(statusCode int, latency analytics.Latency) error {
return d.simpleStreamAnalyticsRecorder.RecordHit(statusCode, latency)
}

type StreamAnalyticsResponseWriter struct {
Expand Down Expand Up @@ -182,13 +142,20 @@ func (s *StreamAnalyticsResponseWriter) Header() http.Header {
}

func (s *StreamAnalyticsResponseWriter) Write(bytes []byte) (int, error) {
now := time.Now()
n, err := s.w.Write(bytes)
if err != nil {
return n, err
}

record := s.recorder.CreateRecord(s.r)
recorderErr := s.recorder.RecordHit(record, s.writtenStatusCode)
totalMillisecond := DurationToMillisecond(time.Since(now))
latency := analytics.Latency{
Total: int64(totalMillisecond),
Upstream: int64(totalMillisecond),
}

s.recorder.PrepareRecord(s.r)
recorderErr := s.recorder.RecordHit(s.writtenStatusCode, latency)
if recorderErr != nil {
s.logger.Errorf("Failed to record analytics for stream on path '%s %s', %v", s.r.Method, s.r.URL.Path, recorderErr)
}
Expand All @@ -206,8 +173,8 @@ func (s *StreamAnalyticsResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, e
return nil, nil, streams.ErrResponseWriterNotHijackable
}

record := s.recorder.CreateRecord(s.r)
recorderErr := s.recorder.RecordHit(record, http.StatusSwitchingProtocols)
s.recorder.PrepareRecord(s.r)
recorderErr := s.recorder.RecordHit(http.StatusSwitchingProtocols, analytics.Latency{})
if recorderErr != nil {
s.logger.Errorf("Failed to record analytics for connection upgrade on path 'UPGRADE %s', %v", s.r.URL.Path, recorderErr)
}
Expand All @@ -221,11 +188,6 @@ func (s *StreamAnalyticsResponseWriter) Flush() {
}
}

func streamRecordHit(gw *Gateway, record *analytics.AnalyticsRecord, statusCode int) error {
record.ResponseCode = statusCode
return gw.Analytics.RecordHit(record)
}

func isWebsocketUpgrade(r *http.Request) bool {
return strings.ToLower(r.Header.Get("Connection")) == "upgrade" && strings.ToLower(r.Header.Get("Upgrade")) == "websocket"
}
24 changes: 12 additions & 12 deletions gateway/analytics_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ func TestDefaultStreamAnalyticsFactory_CreateRecorder(t *testing.T) {
factory := NewStreamAnalyticsFactory(nil, nil, spec)
recorder := factory.CreateRecorder(req)

defaultRecorder, ok := recorder.(*DefaultStreamAnalyticsRecorder)
_, ok := recorder.(*DefaultStreamAnalyticsRecorder)
assert.True(t, ok)
assert.Equal(t, tc.expectedDetailedRecording, defaultRecorder.Detailed)
})
}
})
Expand Down Expand Up @@ -150,17 +149,18 @@ func TestDefaultStreamAnalyticsFactory_CreateRecorder(t *testing.T) {
})
}

func TestDefaultStreamAnalyticsRecorder_CreateRecord(t *testing.T) {
t.Run("should create non-detailed record", func(t *testing.T) {
func TestDefaultStreamAnalyticsRecorder_PrepareRecord(t *testing.T) {
t.Run("should prepare non-detailed record", func(t *testing.T) {
req, err := http.NewRequest(http.MethodPost, "http://localhost:8080/path", nil)
require.NoError(t, err)

recorder := NewDefaultStreamAnalyticsRecorder(nil, &APISpec{APIDefinition: &apidef.APIDefinition{}}, false)
record := recorder.CreateRecord(req)
recorder := NewDefaultStreamAnalyticsRecorder(nil, &APISpec{APIDefinition: &apidef.APIDefinition{}})
recorder.PrepareRecord(req)

assert.Equal(t, "localhost:8080", record.Host)
assert.Equal(t, "/path", record.Path)
assert.Equal(t, http.MethodPost, record.Method)
assert.NotNil(t, recorder.respCopy)
assert.NotNil(t, recorder.reqCopy)
assert.Equal(t, "/path", recorder.reqCopy.URL.Path)
assert.Equal(t, http.MethodPost, recorder.reqCopy.Method)
})
}

Expand Down Expand Up @@ -278,16 +278,16 @@ type testStreamAnalyticsRecorder struct {
actualRecord *analytics.AnalyticsRecord
}

func (t *testStreamAnalyticsRecorder) CreateRecord(r *http.Request) *analytics.AnalyticsRecord {
func (t *testStreamAnalyticsRecorder) PrepareRecord(r *http.Request) {
t.actualRecord = &analytics.AnalyticsRecord{
Method: r.Method,
Host: r.Host,
Path: r.URL.Path,
}
return t.actualRecord
return
}

func (t *testStreamAnalyticsRecorder) RecordHit(record *analytics.AnalyticsRecord, statusCode int) error {
func (t *testStreamAnalyticsRecorder) RecordHit(statusCode int, latency analytics.Latency) error {
t.actualRecord.ResponseCode = statusCode
return nil
}
Expand Down

0 comments on commit d37cf5c

Please sign in to comment.