Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(extension): add blocking memorylimiter extension #172

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
527 changes: 527 additions & 0 deletions collector/config/configgrpc/configgrpc.go

Large diffs are not rendered by default.

1,304 changes: 1,304 additions & 0 deletions collector/config/configgrpc/configgrpc_test.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions collector/config/configgrpc/testdata/ca.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDNjCCAh4CCQC0I5IQT7eziDANBgkqhkiG9w0BAQsFADBdMQswCQYDVQQGEwJB
VTESMBAGA1UECAwJQXVzdHJhbGlhMQ8wDQYDVQQHDAZTeWRuZXkxEjAQBgNVBAoM
CU15T3JnTmFtZTEVMBMGA1UEAwwMTXlDb21tb25OYW1lMB4XDTIyMDgwMzA0MTky
MVoXDTMyMDczMTA0MTkyMVowXTELMAkGA1UEBhMCQVUxEjAQBgNVBAgMCUF1c3Ry
YWxpYTEPMA0GA1UEBwwGU3lkbmV5MRIwEAYDVQQKDAlNeU9yZ05hbWUxFTATBgNV
BAMMDE15Q29tbW9uTmFtZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
AMhGP0dy3zvkdx9zI+/XVjPOWlER0OUp7Sgzidc3nLOk42+bH4ofIVNtOFVqlNKi
O1bImu238VdBhd6R5IZZ1ZdIMcCeDgSJYu2X9wA3m4PKz8IdXo5ly2OHghhmCvqG
WxgqDj5wPXiczQwuf1EcDMtRWbXJ6Z/XH1U68R/kRdNLkiZ2LwtjoQpis5XYckLL
CrdF+AL6GeDIe0Mh9QGs26Vux+2kvaOGNUWRPE6Wt4GkqyKqmzYfR9HbflJ4xHT2
I+jE1lg+jMBeom7z8Z90RE4GGcHjO+Vens/88r5EAjTnFj1Kb5gL2deSHY1m/++R
Z/kRyg+zQJyw4fAzlAA4+VkCAwEAATANBgkqhkiG9w0BAQsFAAOCAQEAM3gRdTKX
eGwGYVmmKqA2vTxeigQYLHml7OSopcWj2wJfxfp49HXPRuvgpQn9iubxO3Zmhd83
2X1E+T0A8oy5CfxgpAhHb3lY0jm3TjKXm6m+dSODwL3uND8tX+SqR8sRTFxPvPuo
pmvhdTZoRI3EzIiHLTgCuSU25JNP/vrVoKk0JvCkDYTU/WcVfj0v95DTMoWR4JGz
mtBwrgD0EM2XRw5ZMc7sMPli1gqmCbCQUrDZ+rPB78WDCBILBd8Cz75qYTUp98BY
akJyBckdJHAdyEQYDKa9HpmpexOO7IhSXCTEN1DEBgpZgEi/lBDRG/b0OzenUUgt
LUABtWt3pNQ9HA==
-----END CERTIFICATE-----
20 changes: 20 additions & 0 deletions collector/config/configgrpc/testdata/client.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDVTCCAj2gAwIBAgIJANt5fkUlfxyiMA0GCSqGSIb3DQEBCwUAMF0xCzAJBgNV
BAYTAkFVMRIwEAYDVQQIDAlBdXN0cmFsaWExDzANBgNVBAcMBlN5ZG5leTESMBAG
A1UECgwJTXlPcmdOYW1lMRUwEwYDVQQDDAxNeUNvbW1vbk5hbWUwHhcNMjIwODAz
MDQxOTIxWhcNMzIwNzMxMDQxOTIxWjBdMQswCQYDVQQGEwJBVTESMBAGA1UECAwJ
QXVzdHJhbGlhMQ8wDQYDVQQHDAZTeWRuZXkxEjAQBgNVBAoMCU15T3JnTmFtZTEV
MBMGA1UEAwwMTXlDb21tb25OYW1lMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
CgKCAQEAm/gURxkdWTDS0TyL2j920SfOtOZIo7DjubWLbZtNLrNCZNBsV+8c/ko/
wleWmUJQRHeiZkNFs8TK6d8Grks6ta9oNO4CiCCO1kz4QidA827cL5+WaKWEVn8Y
Z8aiEMjDOnpYnb/ycsXpERN/P22jHpFD3DKSwLXoXQvasbSJsZro+AIaPAurFB7W
rMagCptwzGQDzryqVKEmXo+eN4XRxsoE8yroHsGbQ8GCZ+neftgV3Jhi1qcXZ//A
3ApY5lg06n1A03fYBlXE5L9tYKpIRNl2kq45mJ8DX6Tdp4Z1Y15+keIIyQpx4LRf
rtdbMQNJhBFOwpAajTmaKXxeICFRHQIDAQABoxgwFjAUBgNVHREEDTALgglsb2Nh
bGhvc3QwDQYJKoZIhvcNAQELBQADggEBAKWrbMxms658R/wYwLxzWPrZVKFswOJX
TpSkXGkyRnrhhZi3I8EhLZhlpZ9k8dplcvseVAUdX9hJu0BaDWBiW/VlPVUkWpWR
QZzrssAKhmSYMgl3OiayU30vL9bxYsAX9KeOJfnJ4kWoBpnguToED7wrC1lbzrVK
Vj1AiI3hBdKUdPNO0hyb8yfxbP3MOottMkk89DIebtOhqj2KEU7sKrhW9a5P5D7d
0A+0kf/IunUZ4IYFfha6qy0gRMyayfm9ttrPAY6q3faqtWR7nY87/T/7wHr1LQ1/
Q622p7v3j3y75lGN50kFnSd77ykag/8avEKxOTFoGOQc5VCRYJnJwb4=
-----END CERTIFICATE-----
27 changes: 27 additions & 0 deletions collector/config/configgrpc/testdata/client.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAm/gURxkdWTDS0TyL2j920SfOtOZIo7DjubWLbZtNLrNCZNBs
V+8c/ko/wleWmUJQRHeiZkNFs8TK6d8Grks6ta9oNO4CiCCO1kz4QidA827cL5+W
aKWEVn8YZ8aiEMjDOnpYnb/ycsXpERN/P22jHpFD3DKSwLXoXQvasbSJsZro+AIa
PAurFB7WrMagCptwzGQDzryqVKEmXo+eN4XRxsoE8yroHsGbQ8GCZ+neftgV3Jhi
1qcXZ//A3ApY5lg06n1A03fYBlXE5L9tYKpIRNl2kq45mJ8DX6Tdp4Z1Y15+keII
yQpx4LRfrtdbMQNJhBFOwpAajTmaKXxeICFRHQIDAQABAoIBAQCWxrT7omi/vzYd
9dUQ8Acx3LS0JmaUb71F2x3loJt1iO+nO+FxBIPXw/ltK3U3xWaJOcnx6Biq15R9
kBAKUEl6OA6aFHi4FhlfS9s3QHFGo6YSF8m0ckXDxGvYbqpfZWVt07Z1EYkUsQRF
cL6zl454T1/1r6I0z+XIhVwuLGRsHt2+GCwSrLMnF9aTUJvPFy5G7YlxmL5q1BFu
F70AK9FLZcYqa5nP1F1HbIQB/zsQ8admpKIy5tjaZiLgctXv2GTzzXDEwEnaJMrq
SPr1dGDhdGs5iYRMOMT5Pp9dIG2+ZSSMHFAn4IRoB/cPJbNEUkgwQOPmDYETqSg5
tSjfIUw1AoGBAMjE6PlT1/orlHW4QvKmV73YtKPVfi1Coo/F8G45qFaHDkc6bI9W
ySrnvqWcPs++xOZMoGtLuESw/LEluFo8vMX8aQYrVSz4Pb7AvuYbBRE0EVVui7YB
3B1O0c7QTabmfQYeATYD7qSShLccSpUE+FQa6NdrJOxddJLM0Z9K73q7AoGBAMbg
I2+NYB6XME7tyStOS4pkA4y7brG/M8BCaY34nfOJT4Qh6pqZRnDJ4ReopGoXEqWg
hwFRsBNhsji4GGejRBPnYcfJTSuMXSPromgoH1tR0OQbMJB0pCTavbI9j+endlv4
/P+KV3ZMYOLhL/gaaTG+o6Wh2ehnE/8/rmqGpoIHAoGAHXVG+c5jkkFytxMiP5hI
p4J0ftWEff+Y+p+Ad6veF1QZtDnOU/nX6oO2ZXZXgQPswB3eK+AgWXPen994/USM
LkCq6EzTYpXJ+YMuf3TXeX66TF68ASiks2gtQLsvqZ2IGq2sX9CT43HcJ0Hvb44b
IbwRDgqakFPmFuQWndjQ6qECgYA9bOlFATOY/zWKi2NBHvOyEOYPx6yO9fF0Bo83
rHyMxfJra1Zc3c6l85S0jAAMTIgT5BsOyz5JHjm/zwyqpgDW7PaEkKZnNvllqNgG
t63HtOOCMOu1EnHIeE9zCBS0hkLGcYcjHoWZIkoiiU8ZoH6xQKKm+/CkGYJRqkei
22f+bQKBgGHq2/ZzgxfblD3blKWp8mh5Kw7c/2VwJRvLEMlgzrnRgF7QNhEcH3Jm
aD/pqzAkqHnLVVQ5ogMKrrLl11jQp4kX74+Ps2Yul7UgzXFYy020mQSpJF/FMjrl
PEqwfCiOT2nLyE30x9VClUOGXy1CxH52Yn/g81ENq3jKTptwh+fI
-----END RSA PRIVATE KEY-----
20 changes: 20 additions & 0 deletions collector/config/configgrpc/testdata/server.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDVTCCAj2gAwIBAgIJANt5fkUlfxyhMA0GCSqGSIb3DQEBCwUAMF0xCzAJBgNV
BAYTAkFVMRIwEAYDVQQIDAlBdXN0cmFsaWExDzANBgNVBAcMBlN5ZG5leTESMBAG
A1UECgwJTXlPcmdOYW1lMRUwEwYDVQQDDAxNeUNvbW1vbk5hbWUwHhcNMjIwODAz
MDQxOTIxWhcNMzIwNzMxMDQxOTIxWjBdMQswCQYDVQQGEwJBVTESMBAGA1UECAwJ
QXVzdHJhbGlhMQ8wDQYDVQQHDAZTeWRuZXkxEjAQBgNVBAoMCU15T3JnTmFtZTEV
MBMGA1UEAwwMTXlDb21tb25OYW1lMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
CgKCAQEAv1Pm2elKl/IpJlX5NqQjRTlA1rHws8F7v1IuaXB2qfk1MsDCt37OvlbR
4ARrY6zdUIrEQ/wrhQsZ2M5/yaj0rfeCgd/SDUKMAqDvXQXBY2AaLubTAIEMs4rF
R5Zq/pcBNz1zu8kRvRgvVuOpTCPR1kRvKFWAp689lXZhUU/BrQQXhdA993xVMRM9
u4fZuJLxNGGR/EhqTec4Z65jAZiUfO7ID94PtaxTrzR/Kjr2CiceR5hwdY40Bcme
D3IAd0J6nN1zIihe+Nqg/ImOG7YS+efQIEWJ8eHOoCK5knFBXRy6WwxeCyAPXyIb
DTrqTy67eTDYc0XZ24F/5Q3GSvfMDwIDAQABoxgwFjAUBgNVHREEDTALgglsb2Nh
bGhvc3QwDQYJKoZIhvcNAQELBQADggEBAKeFHP5rQasRS/XGbPkobfbFyTdGnLay
0Vr6+Rs5+4siKlAIhuUP9A/De61CEkFj8NFi2bmXYv8q3qP/z0lrjw7btrvD7Qc7
lth73k3U2sUVZoqbYQZz0GHCWfZm8yXjP63SKI+81LHbS40ArO0R44BLc9TbbRiR
/LwO/x2+cxs28KdsEkU6jQ6Ly5jyoxw1ysoIeRfIk+FnQD4w29TyGgtX/G15/NN0
ytByIZ8wdbUciunQc3nPXoPc41N+hyi2GZaXMuJ4VlsNmgY+wPmp4y3pl4l0bgCb
1FR8Vvtsi8jLH8J15oAMWdmHQKcoJDE49llx+bQGpNekp6mlfX1DIPI=
-----END CERTIFICATE-----
27 changes: 27 additions & 0 deletions collector/config/configgrpc/testdata/server.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAv1Pm2elKl/IpJlX5NqQjRTlA1rHws8F7v1IuaXB2qfk1MsDC
t37OvlbR4ARrY6zdUIrEQ/wrhQsZ2M5/yaj0rfeCgd/SDUKMAqDvXQXBY2AaLubT
AIEMs4rFR5Zq/pcBNz1zu8kRvRgvVuOpTCPR1kRvKFWAp689lXZhUU/BrQQXhdA9
93xVMRM9u4fZuJLxNGGR/EhqTec4Z65jAZiUfO7ID94PtaxTrzR/Kjr2CiceR5hw
dY40BcmeD3IAd0J6nN1zIihe+Nqg/ImOG7YS+efQIEWJ8eHOoCK5knFBXRy6Wwxe
CyAPXyIbDTrqTy67eTDYc0XZ24F/5Q3GSvfMDwIDAQABAoIBAQC/BuxlAhKiJvyC
9DABKFy2zvU35y3mq/X8Dfec+tbf2pwM8nz3bLrLPDAMNR1rxbqqogJXxr1E9tJ1
r6fTFshFsewx8+DrsFfOgBS9kfOGXvuFfJ2L0U13LcTPNxXY37gtCUQ2aAk3/Z+2
Z1QvW0w1XNqHMOdlhQg95JZB8xnyvXs1niLT/I9d7KbPBmOWkB5Jp7+JaebmWqNS
alxnNqYnhXcrNSAbuR4bgz0l4I+Jprms26C6sakmgCfeMjfWbd2k3tp06vKXmT6q
qKa0855axP9wuSbKbscTDW5RFYTYnu/CSYJ4nZtzSS8a559iG3m61EgPOoVTnTX6
0t0I+kwRAoGBAN403NO4FfHG8k2bFpbATQkmC9UwjMbl5RIEL0fFhNVsuM5jTwHc
0wlFm9tMN8xqg66OFCimC/mUNPWX8nrb/MwrAw6/50rbyqOBFnmFKIfVf4ftpLzt
BLhEg7a/FPgdDgldQD+C6XbMyBA1AF3nbpTnbnj5WVQTl672s9teegSzAoGBANxs
1y6Nfh2DyyU16p61376AAP3WfHvuBgJAC0xGCqoTrbyzl4/r06BTMl51PbWJLDjm
FryTtgM7a8XO1jwfWJno71dnT7Bsy+wYnmJ5+9XHwgO9oZfSFUk+ELrEImI/4NZX
dJLkc0SuCG/wa3Wa76+sFNlzAzBBs83RE2j432E1AoGAF+x5GhJnynAxBkn8VJ6/
rIx8GafwgDmgQCBTNtb9Rj0+aHoot3qe/hCQhzvdhhSxuMlzQi0efPCIAyko4jFt
Nk4rNhtTO6wOVSxAzzSW+Ij0Ah6D7hNWvsAhrjtEdrIqILf5gt0FZdUGdTg/odyY
+08vhbbS90pkumG1W5kAaiECgYEAqjk3eBD26u4jjIn1tTk5H9GUcnMYUVCAvW4e
C3ovtCZcTlTW3+M73B1D0aRy0mWrjAlMV7cuoZJa6TiRQ37lmn5Dj1kONm3ekWZ1
shEIBZEtaFwila88lwJiQwlCkGNKS9zf/qyDw+8uPtwI8JqFLUIUG9VxCexDYddr
SO6g+10CgYEAwUs2BRJb6Od+8XtH32+8DDOnpfWJARY0CwogN2k+D1dbAB8Wkda1
BMADasAcjDFRX6xvyyDlqxcDIoCI1JvpS82I/PTNHeT8pEr5Caln7OHD/BtnPwmI
YR0bvKkoN0jdQdjifpMVXEbJS1VfFLdQYQ8iQMwZfkFmzIkYpvqWVtw=
-----END RSA PRIVATE KEY-----
34 changes: 34 additions & 0 deletions collector/config/configgrpc/wrappedstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright The OpenTelemetry Authors
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package configgrpc // import "go.opentelemetry.io/collector/config/configgrpc"

import (
"context"

"google.golang.org/grpc"
)

// this functionality was originally copied from grpc-ecosystem/go-grpc-middleware project

// wrappedServerStream is a thin wrapper around grpc.ServerStream that allows modifying context.
type wrappedServerStream struct {
grpc.ServerStream
// wrappedContext is the wrapper's own Context. You can assign it.
wrappedCtx context.Context
}

// Context returns the wrapper's wrappedContext, overwriting the nested grpc.ServerStream.Context()
func (w *wrappedServerStream) Context() context.Context {
return w.wrappedCtx
}

// wrapServerStream returns a ServerStream with the new context.
func wrapServerStream(wrappedCtx context.Context, stream grpc.ServerStream) *wrappedServerStream {
if existing, ok := stream.(*wrappedServerStream); ok {
existing.wrappedCtx = wrappedCtx
return existing
}
return &wrappedServerStream{ServerStream: stream, wrappedCtx: wrappedCtx}
}
43 changes: 43 additions & 0 deletions collector/config/configgrpc/wrappedstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright The OpenTelemetry Authors
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package configgrpc // import "go.opentelemetry.io/collector/internal/middleware"

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

type ctxKey struct{}

var oneCtxKey = ctxKey{}
var otherCtxKey = ctxKey{}

func TestWrapServerStream(t *testing.T) {
ctx := context.WithValue(context.TODO(), oneCtxKey, 1)
fake := &fakeServerStream{ctx: ctx}
assert.NotNil(t, fake.Context().Value(oneCtxKey), "values from fake must propagate to wrapper")
wrapped := wrapServerStream(context.WithValue(fake.Context(), otherCtxKey, 2), fake)
assert.NotNil(t, wrapped.Context().Value(oneCtxKey), "values from wrapper must be set")
assert.NotNil(t, wrapped.Context().Value(otherCtxKey), "values from wrapper must be set")
}

func TestDoubleWrapping(t *testing.T) {
fake := &fakeServerStream{ctx: context.Background()}
wrapped := wrapServerStream(fake.Context(), fake)
assert.Same(t, wrapped, wrapServerStream(wrapped.Context(), wrapped)) // should be noop
assert.Equal(t, fake, wrapped.ServerStream)
}

type fakeServerStream struct {
grpc.ServerStream
ctx context.Context
}

func (f *fakeServerStream) Context() context.Context {
return f.ctx
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package blockingmemorylimiterextension // import "github.com/open-telemetry/otel-arrow/collector/blockingmemorylimiterextension"

import (
"context"
"fmt"
"time"

"golang.org/x/sync/semaphore"
"go.uber.org/zap"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component"
)

type blockingMLExtension struct {
limitBytes int64
sem *semaphore.Weighted
logger *zap.Logger
timeout time.Duration
}

// This interface is meant to access the size of a
// ExportTraceServiceRequest, ExportMetricsServiceRequest, ExportLogsServicesRequest
type telemetryServiceRequest = interface {
Size() int
}


// newMemoryLimiter returns a new memorylimiter extension.
func newBlockingMLExtension(cfg *Config, logger *zap.Logger) (*blockingMLExtension, error) {
limitBytes := int64(cfg.MemoryLimitMiB) << 20
return &blockingMLExtension{
limitBytes: limitBytes,
sem: semaphore.NewWeighted(limitBytes),
timeout: cfg.Timeout,
logger: logger,
}, nil
}

func (bml *blockingMLExtension) Start(ctx context.Context, host component.Host) error {
return nil
}

func (bml *blockingMLExtension) Shutdown(ctx context.Context) error {
return nil
}

func (bml *blockingMLExtension) MustRefuse() bool {
return false
}

func (bml *blockingMLExtension) UnaryInterceptorGenerator() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
a := req.(telemetryServiceRequest)
requestSize := int64(a.Size())

semCtx, cancel := context.WithTimeout(context.Background(), bml.timeout)
defer cancel()

err = bml.sem.Acquire(semCtx, requestSize)
if err != nil {
return nil, fmt.Errorf("not enough memory available to process request, %w", err)
}

resp, err = handler(ctx, req)
bml.sem.Release(requestSize)

return resp, err
}
}
Loading
Loading