Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pyroscope: Add relabel component for modifying and filtering profiles #2574

Merged
merged 6 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Main (unreleased)

- Add the possibility to export span events as logs in `otelcol.connector.spanlogs`. (@steve-hb)

- Add `pyroscope.relabel` component to modify or filter profiles using Prometheus relabeling rules. (@marcsanmi)

### Enhancements

- (_Experimental_) Log instance label key in `database_observability.mysql` (@cristiangreco)
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/reference/compatibility/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ The following components, grouped by namespace, _export_ Pyroscope `ProfilesRece
<!-- START GENERATED SECTION: EXPORTERS OF Pyroscope `ProfilesReceiver` -->

{{< collapse title="pyroscope" >}}
- [pyroscope.relabel](../components/pyroscope/pyroscope.relabel)
- [pyroscope.write](../components/pyroscope/pyroscope.write)
{{< /collapse >}}

Expand All @@ -408,6 +409,7 @@ The following components, grouped by namespace, _consume_ Pyroscope `ProfilesRec
- [pyroscope.ebpf](../components/pyroscope/pyroscope.ebpf)
- [pyroscope.java](../components/pyroscope/pyroscope.java)
- [pyroscope.receive_http](../components/pyroscope/pyroscope.receive_http)
- [pyroscope.relabel](../components/pyroscope/pyroscope.relabel)
- [pyroscope.scrape](../components/pyroscope/pyroscope.scrape)
{{< /collapse >}}

Expand Down
137 changes: 137 additions & 0 deletions docs/sources/reference/components/pyroscope/pyroscope.relabel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/pyroscope/pyroscope.relabel/
aliases:
- ../pyroscope.relabel/ # /docs/alloy/latest/reference/components/pyroscope.relabel/
description: Learn about pyroscope.relabel
title: pyroscope.relabel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What stage are we releasing this as.. GA? Preview? Experimental?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the "Public Preview" span in cf27661, thanks.

---

<span class="badge docs-labels__stage docs-labels__item">Public preview</span>

# `pyroscope.relabel`

The `pyroscope.relabel` component rewrites the label set of each profile passed to its receiver by applying one or more relabeling rules and forwards the results to the list of receivers.

If no rules are defined or applicable to some profiles, then those profiles are forwarded as-is to each receiver passed in the component's arguments.
The profile is dropped if no labels remain after the relabeling rules are applied.

The most common use of `pyroscope.relabel` is to filter profiles or standardize the label set that is passed to one or more downstream receivers. The `rule` blocks are applied to the label set of each profile in order of their appearance in the configuration file.

## Usage

```alloy
pyroscope.relabel "LABEL" {
forward_to = <RECEIVER_LIST>

rule {
...
}

...
}
```

## Arguments

You can use the following arguments with `pyroscope.relabel`:

| Name | Type | Description | Default | Required |
| ---------------- | ---------------------------- | --------------------------------------------------------- | ------- | -------- |
| `forward_to` | `list(pyroscope.Appendable)` | List of receivers to forward profiles to after relabeling | | yes |
| `max_cache_size` | `number` | Maximum number of entries in the label cache | 10000 | no |

## Blocks

You can use the following blocks with `pyroscope.relabel`:

| Name | Description | Required |
| -------------- | ------------------------------------------------------ | -------- |
| [`rule`][rule] | Relabeling rules to apply to received profile entries. | no |

[rule]: #rule

### rule

{{< docs/shared lookup="reference/components/rule-block.md" source="alloy" version="<ALLOY_VERSION>" >}}

## Exported fields

The following fields are exported and can be referenced by other components:

| Name | Type | Description |
| ---------- | ------------------ | ------------------------------------------------ |
| `receiver` | `ProfilesReceiver` | A receiver that accepts profiles for relabeling. |
| `rules` | `[]relabel.Config` | The list of relabeling rules. |

## Component health

`pyroscope.relabel` is reported as unhealthy if it is given an invalid configuration.

## Debug metrics

* `pyroscope_relabel_cache_hits` (counter): Total number of cache hits.
* `pyroscope_relabel_cache_misses` (counter): Total number of cache misses.
* `pyroscope_relabel_cache_size` (gauge): Total size of relabel cache.
* `pyroscope_relabel_profiles_dropped` (counter): Total number of profiles dropped by relabeling rules.
* `pyroscope_relabel_profiles_processed` (counter): Total number of profiles processed.
* `pyroscope_relabel_profiles_written` (counter): Total number of profiles forwarded.

## Example

```alloy
pyroscope.receive_http "default" {
forward_to = [pyroscope.relabel.filter_profiles.receiver]

http {
listen_address = "0.0.0.0"
listen_port = 9999
}
}

pyroscope.relabel "filter_profiles" {
forward_to = [pyroscope.write.staging.receiver]

// This creates a consistent hash value (0 or 1) for each unique combination of labels
// Using multiple source labels provides better sampling distribution across your profiles
rule {
source_labels = ["env"]
target_label = "__tmp_hash"
action = "hashmod"
modulus = 2
}

// This effectively samples ~50% of profile series
// The same combination of source label values will always hash to the same number,
// ensuring consistent sampling
rule {
source_labels = ["__tmp_hash"]
action = "drop"
regex = "^1$"
}
}

pyroscope.write "staging" {
endpoint {
url = "http://pyroscope-staging:4040"
}
}
```

<!-- START GENERATED COMPATIBLE COMPONENTS -->

## Compatible components

`pyroscope.relabel` can accept arguments from the following components:

- Components that export [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-exporters)

`pyroscope.relabel` has exports that can be consumed by the following components:

- Components that consume [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-consumers)

{{< admonition type="note" >}}
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
Refer to the linked documentation for more details.
{{< /admonition >}}

<!-- END GENERATED COMPATIBLE COMPONENTS -->
1 change: 1 addition & 0 deletions internal/component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ import (
_ "github.com/grafana/alloy/internal/component/pyroscope/ebpf" // Import pyroscope.ebpf
_ "github.com/grafana/alloy/internal/component/pyroscope/java" // Import pyroscope.java
_ "github.com/grafana/alloy/internal/component/pyroscope/receive_http" // Import pyroscope.receive_http
_ "github.com/grafana/alloy/internal/component/pyroscope/relabel" // Import pyroscope.relabel
_ "github.com/grafana/alloy/internal/component/pyroscope/scrape" // Import pyroscope.scrape
_ "github.com/grafana/alloy/internal/component/pyroscope/write" // Import pyroscope.write
_ "github.com/grafana/alloy/internal/component/remote/http" // Import remote.http
Expand Down
1 change: 1 addition & 0 deletions internal/component/pyroscope/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type IncomingProfile struct {
Body io.ReadCloser
Headers http.Header
URL *url.URL
Labels labels.Labels
}

var _ Appendable = (*Fanout)(nil)
Expand Down
22 changes: 22 additions & 0 deletions internal/component/pyroscope/receive_http/receive_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/api/model/labelset"
)

const (
Expand Down Expand Up @@ -218,6 +219,26 @@ func (c *Component) getAppendables() []pyroscope.Appendable {
func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
appendables := c.getAppendables()

// Parse labels early
var lbls labels.Labels
if nameParam := r.URL.Query().Get("name"); nameParam != "" {
ls, err := labelset.Parse(nameParam)
if err != nil {
level.Warn(c.opts.Logger).Log(
"msg", "Failed to parse labels from name parameter",
"name", nameParam,
"err", err,
)
// Continue with empty labels instead of returning an error
} else {
var labelPairs []labels.Label
for k, v := range ls.Labels() {
labelPairs = append(labelPairs, labels.Label{Name: k, Value: v})
}
lbls = labels.New(labelPairs...)
}
}

// Create a pipe for each appendable
pipeWriters := make([]io.Writer, len(appendables))
pipeReaders := make([]io.Reader, len(appendables))
Expand Down Expand Up @@ -251,6 +272,7 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
Body: io.NopCloser(pipeReaders[i]),
Headers: r.Header.Clone(),
URL: r.URL,
Labels: lbls,
}

err := appendable.Appender().AppendIngest(ctx, profile)
Expand Down
62 changes: 55 additions & 7 deletions internal/component/pyroscope/receive_http/receive_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/api/model/labelset"
)

// TestForwardsProfilesIngest verifies the behavior of the
Expand All @@ -44,13 +45,14 @@ func TestForwardsProfilesIngest(t *testing.T) {
appendableErrors []error
expectedStatus int
expectedForwards int
expectedLabels map[string]string
}{
{
name: "Small profile",
profileSize: 1024, // 1KB
method: "POST",
path: "/ingest",
queryParams: "name=test_app_1&from=1234567890&until=1234567900",
queryParams: "name=test_app&from=1234567890&until=1234567900",
headers: map[string]string{"Content-Type": "application/octet-stream"},
appendableErrors: []error{nil, nil},
expectedStatus: http.StatusOK,
Expand All @@ -61,7 +63,7 @@ func TestForwardsProfilesIngest(t *testing.T) {
profileSize: 1024 * 1024, // 1MB
method: "POST",
path: "/ingest",
queryParams: "name=test_app_2&from=1234567891&until=1234567901&custom=param1",
queryParams: "name=test_app&from=1234567891&until=1234567901&custom=param1",
headers: map[string]string{"X-Scope-OrgID": "1234"},
appendableErrors: []error{nil},
expectedStatus: http.StatusOK,
Expand All @@ -72,7 +74,7 @@ func TestForwardsProfilesIngest(t *testing.T) {
profileSize: 1024,
method: "GET",
path: "/ingest",
queryParams: "name=test_app_3&from=1234567892&until=1234567902",
queryParams: "name=test_app&from=1234567892&until=1234567902",
headers: map[string]string{},
appendableErrors: []error{nil, nil},
expectedStatus: http.StatusMethodNotAllowed,
Expand All @@ -94,7 +96,7 @@ func TestForwardsProfilesIngest(t *testing.T) {
profileSize: 1024,
method: "POST",
path: "/invalid",
queryParams: "name=test_app_4&from=1234567893&until=1234567903",
queryParams: "name=test_app&from=1234567893&until=1234567903",
headers: map[string]string{"Content-Type": "application/octet-stream"},
appendableErrors: []error{nil, nil},
expectedStatus: http.StatusNotFound,
Expand All @@ -105,7 +107,7 @@ func TestForwardsProfilesIngest(t *testing.T) {
profileSize: 2048,
method: "POST",
path: "/ingest",
queryParams: "name=test_app_5&from=1234567894&until=1234567904&scenario=all_fail",
queryParams: "name=test_app&from=1234567894&until=1234567904&scenario=all_fail",
headers: map[string]string{"Content-Type": "application/octet-stream", "X-Test": "fail-all"},
appendableErrors: []error{fmt.Errorf("error1"), fmt.Errorf("error2")},
expectedStatus: http.StatusInternalServerError,
Expand All @@ -116,12 +118,42 @@ func TestForwardsProfilesIngest(t *testing.T) {
profileSize: 4096,
method: "POST",
path: "/ingest",
queryParams: "name=test_app_6&from=1234567895&until=1234567905&scenario=partial_failure",
queryParams: "name=test_app&from=1234567895&until=1234567905&scenario=partial_failure",
headers: map[string]string{"X-Custom-ID": "test-6"},
appendableErrors: []error{fmt.Errorf("error"), nil},
expectedStatus: http.StatusInternalServerError,
expectedForwards: 2,
},
{
name: "Valid labels are parsed and forwarded",
profileSize: 1024,
method: "POST",
path: "/ingest",
queryParams: "name=test.app{env=prod,region=us-east}",
headers: map[string]string{"Content-Type": "application/octet-stream"},
appendableErrors: []error{nil, nil},
expectedStatus: http.StatusOK,
expectedForwards: 2,
expectedLabels: map[string]string{
"__name__": "test.app",
"env": "prod",
"region": "us-east",
},
},
{
name: "Invalid labels still forward profile",
profileSize: 1024,
method: "POST",
path: "/ingest",
queryParams: "name=test.app{invalid-label-syntax}",
headers: map[string]string{"Content-Type": "application/octet-stream"},
appendableErrors: []error{nil, nil},
expectedStatus: http.StatusOK,
expectedForwards: 2,
expectedLabels: map[string]string{
"__name__": "test.app", // Only __name__ is preserved
},
},
}

for _, tt := range tests {
Expand All @@ -136,7 +168,7 @@ func TestForwardsProfilesIngest(t *testing.T) {
require.Equal(t, tt.expectedForwards, forwardedCount, "Unexpected number of forwards")

if tt.expectedForwards > 0 {
verifyForwardedProfiles(t, appendables, testProfile, tt.headers, tt.queryParams)
verifyForwardedProfiles(t, appendables, testProfile, tt.headers, tt.queryParams, tt.expectedLabels)
}
})
}
Expand Down Expand Up @@ -296,11 +328,26 @@ func verifyForwardedProfiles(
expectedProfile []byte,
expectedHeaders map[string]string,
expectedQueryParams string,
expectedLabels map[string]string,
) {
for i, app := range appendables {
testApp, ok := app.(*testAppender)
require.True(t, ok, "Appendable is not a testAppender")

// Verify labels if name parameter exists and is valid
if nameParam := testApp.lastProfile.URL.Query().Get("name"); nameParam != "" {
ls, err := labelset.Parse(nameParam)
if err == nil {
require.Equal(t, ls.Labels(), testApp.lastProfile.Labels.Map(),
"Labels mismatch for appendable %d", i)
}
}

if expectedLabels != nil {
require.Equal(t, expectedLabels, testApp.lastProfile.Labels.Map(),
"Labels mismatch for appendable %d", i)
}

if testApp.lastProfile != nil {
// Verify profile body
body, err := io.ReadAll(testApp.lastProfile.Body)
Expand Down Expand Up @@ -446,6 +493,7 @@ func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.Incomi
Body: io.NopCloser(&buf),
Headers: profile.Headers,
URL: profile.URL,
Labels: profile.Labels,
}
a.lastProfile = newProfile

Expand Down
Loading
Loading