Skip to content

Commit

Permalink
use []byte for profile body and copy profiles in fanout AppendIngest
Browse files Browse the repository at this point in the history
  • Loading branch information
marcsanmi committed Jan 31, 2025
1 parent a0e3479 commit 3670c7f
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 51 deletions.
29 changes: 26 additions & 3 deletions internal/component/pyroscope/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pyroscope

import (
"context"
"io"
"net/http"
"net/url"
"sync"
Expand Down Expand Up @@ -34,7 +33,7 @@ type RawSample struct {
}

type IncomingProfile struct {
Body io.ReadCloser
RawBody []byte
Headers http.Header
URL *url.URL
Labels labels.Labels
Expand Down Expand Up @@ -131,7 +130,15 @@ func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) e
}()
var multiErr error
for _, x := range a.children {
err := x.AppendIngest(ctx, profile)
// Create a copy for each child
profileCopy := &IncomingProfile{
RawBody: profile.RawBody, // []byte is immutable, safe to share
Headers: profile.Headers.Clone(),
URL: profile.URL, // URL is immutable once created
Labels: profile.Labels.Copy(),
}

err := x.AppendIngest(ctx, profileCopy)
if err != nil {
multiErr = multierror.Append(multiErr, err)
}
Expand All @@ -153,3 +160,19 @@ func (f AppendableFunc) AppendIngest(_ context.Context, _ *IncomingProfile) erro
// This is a no-op implementation
return nil
}

// For testing AppendIngest operations
type AppendableIngestFunc func(ctx context.Context, profile *IncomingProfile) error

func (f AppendableIngestFunc) Appender() Appender {
return f
}

func (f AppendableIngestFunc) AppendIngest(ctx context.Context, p *IncomingProfile) error {
return f(ctx, p)
}

func (f AppendableIngestFunc) Append(_ context.Context, _ labels.Labels, _ []*RawSample) error {
// This is a no-op implementation
return nil
}
49 changes: 49 additions & 0 deletions internal/component/pyroscope/appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,52 @@ func Test_FanOut(t *testing.T) {
require.Error(t, f.Appender().Append(context.Background(), lbls, []*RawSample{}))
require.Equal(t, int32(2), totalAppend.Load())
}

func Test_FanOut_AppendIngest(t *testing.T) {
totalAppend := atomic.NewInt32(0)
profile := &IncomingProfile{
RawBody: []byte("test"),
Labels: labels.Labels{{Name: "foo", Value: "bar"}},
}

f := NewFanout([]Appendable{
AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error {
require.Equal(t, profile.RawBody, p.RawBody)
require.Equal(t, profile.Labels, p.Labels)
totalAppend.Inc()
return nil
}),
AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error {
require.Equal(t, profile.RawBody, p.RawBody)
require.Equal(t, profile.Labels, p.Labels)
totalAppend.Inc()
return nil
}),
AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error {
require.Equal(t, profile.RawBody, p.RawBody)
require.Equal(t, profile.Labels, p.Labels)
totalAppend.Inc()
return errors.New("foo")
}),
}, "foo", prometheus.NewRegistry())
totalAppend.Store(0)
require.Error(t, f.Appender().AppendIngest(context.Background(), profile))
require.Equal(t, int32(3), totalAppend.Load())
f.UpdateChildren([]Appendable{
AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error {
require.Equal(t, profile.RawBody, p.RawBody)
require.Equal(t, profile.Labels, p.Labels)
totalAppend.Inc()
return nil
}),
AppendableIngestFunc(func(_ context.Context, p *IncomingProfile) error {
require.Equal(t, profile.RawBody, p.RawBody)
require.Equal(t, profile.Labels, p.Labels)
totalAppend.Inc()
return errors.New("bar")
}),
})
totalAppend.Store(0)
require.Error(t, f.Appender().AppendIngest(context.Background(), profile))
require.Equal(t, int32(2), totalAppend.Load())
}
2 changes: 1 addition & 1 deletion internal/component/pyroscope/receive_http/receive_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
for i, appendable := range appendables {
g.Go(func() error {
profile := &pyroscope.IncomingProfile{
Body: io.NopCloser(bytes.NewReader(buf.Bytes())),
RawBody: buf.Bytes(),
Headers: r.Header.Clone(),
URL: r.URL,
Labels: lbls,
Expand Down
16 changes: 2 additions & 14 deletions internal/component/pyroscope/receive_http/receive_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"crypto/rand"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"testing"
Expand Down Expand Up @@ -350,9 +349,7 @@ func verifyForwardedProfiles(

if testApp.lastProfile != nil {
// Verify profile body
body, err := io.ReadAll(testApp.lastProfile.Body)
require.NoError(t, err, "Failed to read profile body for appendable %d", i)
require.Equal(t, expectedProfile, body, "Profile mismatch for appendable %d", i)
require.Equal(t, expectedProfile, testApp.lastProfile.RawBody, "Profile mismatch for appendable %d", i)

// Verify headers
for key, value := range expectedHeaders {
Expand Down Expand Up @@ -486,22 +483,13 @@ func (a *testAppender) Append(_ context.Context, lbls labels.Labels, samples []*
}

func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.IncomingProfile) error {
var buf bytes.Buffer
tee := io.TeeReader(profile.Body, &buf)

newProfile := &pyroscope.IncomingProfile{
Body: io.NopCloser(&buf),
RawBody: profile.RawBody,
Headers: profile.Headers,
URL: profile.URL,
Labels: profile.Labels,
}
a.lastProfile = newProfile

_, err := io.Copy(io.Discard, tee)
if err != nil {
return err
}

return a.appendErr
}

Expand Down
30 changes: 9 additions & 21 deletions internal/component/pyroscope/relabel/relabel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/grafana/alloy/internal/component"
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/pyroscope"
"github.com/grafana/alloy/internal/runtime/componenttest"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/pyroscope/api/model/labelset"
"github.com/grafana/regexp"
Expand Down Expand Up @@ -156,31 +154,21 @@ func TestRelabeling(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
app := NewTestAppender()

tc, err := componenttest.NewControllerFromID(util.TestLogger(t), "pyroscope.relabel")
c, err := New(component.Options{
Logger: util.TestLogger(t),
Registerer: prometheus.NewRegistry(),
OnStateChange: func(e component.Exports) {},
}, Arguments{
ForwardTo: []pyroscope.Appendable{app},
RelabelConfigs: tt.rules,
MaxCacheSize: 10,
})
require.NoError(t, err)

go func() {
err = tc.Run(componenttest.TestContext(t), Arguments{
ForwardTo: []pyroscope.Appendable{app},
RelabelConfigs: tt.rules,
MaxCacheSize: 10,
})
require.NoError(t, err)
}()

// Wait for the component to be ready
require.NoError(t, tc.WaitExports(time.Second))
time.Sleep(100 * time.Millisecond) // Give a little extra time for initialization

profile := &pyroscope.IncomingProfile{
Labels: tt.inputLabels,
}

// Get the actual component
comp, err := tc.GetComponent()
require.NoError(t, err)
c := comp.(*Component)

err = c.AppendIngest(context.Background(), profile)

profiles := app.Profiles()
Expand Down
11 changes: 1 addition & 10 deletions internal/component/pyroscope/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,15 +359,6 @@ func (e *PyroscopeWriteError) Error() string {

// AppendIngest implements the pyroscope.Appender interface.
func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error {
// Read the entire body into memory
// This matches how Append() handles profile data (as RawProfile),
// but means the entire profile will be held in memory
var buf bytes.Buffer
if _, err := io.Copy(&buf, profile.Body); err != nil {
return fmt.Errorf("reading profile body: %w", err)
}
bodyBytes := buf.Bytes()

g, ctx := errgroup.WithContext(ctx)

// Send to each endpoint concurrently
Expand Down Expand Up @@ -397,7 +388,7 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
}
u.RawQuery = query.Encode()

req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(bodyBytes))
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewReader(profile.RawBody))
if err != nil {
return fmt.Errorf("create request: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions internal/component/pyroscope/write/write_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package write

import (
"bytes"
"context"
"errors"
"io"
Expand Down Expand Up @@ -344,7 +343,7 @@ func Test_Write_AppendIngest(t *testing.T) {
require.NotNil(t, export.Receiver, "Receiver is nil")

incomingProfile := &pyroscope.IncomingProfile{
Body: io.NopCloser(bytes.NewReader(testData)),
RawBody: testData,
Headers: http.Header{
"X-Test-Header": []string{"profile-value"}, // This should be overridden by endpoint
"X-Profile-Header": []string{"profile-value1", "profile-value2"}, // This should be preserved
Expand Down

0 comments on commit 3670c7f

Please sign in to comment.