Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: RLQS Prototype #11456

Draft
wants to merge 52 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
7cb13d9
RlqsFilter WIP
sergiitk Feb 7, 2024
75d2dde
Basic GrpcService type
sergiitk Feb 7, 2024
5120691
Basic GrpcService
sergiitk Feb 7, 2024
5ad1b0c
Basic interceptor
sergiitk Feb 7, 2024
b567cc2
Notes from the sync with Eric
sergiitk Feb 12, 2024
ab4c4ab
post-rebase fix
sergiitk Mar 19, 2024
326db46
RlqsClientPool, RlqsClient, working on shutdown
sergiitk Mar 26, 2024
b5b5d5f
another note
sergiitk Mar 26, 2024
b74e523
categorize todos
sergiitk Mar 26, 2024
2ac01c1
Basic RlqsBucketSettings and Matcher parsing
sergiitk Mar 26, 2024
9e1834d
Minimal CelMatcher
sergiitk Mar 27, 2024
50e6c07
basic cel-java integration/test
sergiitk Mar 27, 2024
1a0416f
Implement GrpcCelEnvironment and MetadataHelper
sergiitk Sep 4, 2024
d76b06f
Use dev.cel:runtime in the prod code
sergiitk Sep 6, 2024
919a21e
Add RlqsClientPool/RlqsClient/RlqsApiClient classes
sergiitk Sep 16, 2024
8481d34
Draft bucket processing logic
sergiitk Sep 24, 2024
a6aae0a
Filter chain lifecycle bookmarks - filter provider refactoring TBD
sergiitk Sep 24, 2024
e1fe8a8
Draft reports and timers
sergiitk Sep 25, 2024
bbcf6d5
RlqsClient -> RlqsEngine
sergiitk Sep 25, 2024
0726f7a
Remove periodic cleanup logic from RlqsClientPool
sergiitk Sep 25, 2024
163ea57
RlqsClientPool -> RlqsCache
sergiitk Sep 25, 2024
7e48ade
RlqsApiClient -> RlqsClient
sergiitk Sep 25, 2024
d616bc8
More class drafting
sergiitk Sep 25, 2024
227af0a
Create proper RateLimitResult
sergiitk Sep 26, 2024
6f8f5fc
Draft Bucket: Usage Reports, RateLimitStrategy, TTLs
sergiitk Sep 26, 2024
0eb7217
Improve method names
sergiitk Sep 27, 2024
97cff3f
RateLimitResult -> RlqsRateLimitResult
sergiitk Sep 27, 2024
0484dbc
More API improvements
sergiitk Sep 27, 2024
fec4420
getOrCreate pattern for bucket cache and timers
sergiitk Sep 27, 2024
2c19f72
RlqsClient doesn't know about the bucket cache anymore; uses callbacks
sergiitk Sep 28, 2024
394691e
Minor renames
sergiitk Sep 28, 2024
c688115
improved getOrCreateRlqsEngine
sergiitk Sep 28, 2024
cb900a2
Handle special case
sergiitk Sep 28, 2024
9095953
Dynamic bucket id builder processing initial logic.
sergiitk Oct 3, 2024
86ad5f8
hash config to a long
sergiitk Oct 4, 2024
6a2cf80
Remove outdated note
sergiitk Oct 8, 2024
d3662f9
add Filter.isEnabled()
sergiitk Oct 15, 2024
f4cd6bf
add GRPC_EXPERIMENTAL_RLQS_DRY_RUN
sergiitk Oct 15, 2024
7d7f746
XdsTestServer: add --xds_server_mode
sergiitk Oct 17, 2024
678b6ad
convert logid to local
sergiitk Oct 17, 2024
e66fb53
PSM e2e: works!
sergiitk Oct 17, 2024
b546c04
RlqsEngine -> RlqsFilterState
sergiitk Oct 17, 2024
a7c95e9
Add CEL types to the message printer
sergiitk Oct 21, 2024
a227f06
Add CEL macro verifications
sergiitk Oct 24, 2024
8546c10
Add CelMatcher.fromEnvoyProto - just to import dev.cel.expr
sergiitk Oct 25, 2024
f913e88
LongAdder note
sergiitk Oct 31, 2024
b0a1b5a
CEL variable resolver
sergiitk Oct 28, 2024
956fc56
update cel to 0.9.1-proto3
sergiitk Jan 17, 2025
1b1c91e
Use new cel options
sergiitk Jan 17, 2025
8023e93
add simple tests for the new options
sergiitk Jan 18, 2025
75d9edc
add maxRegexProgramSize check too!
sergiitk Jan 18, 2025
5d94ef0
minor rename and error check
sergiitk Jan 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ commons-math3 = "org.apache.commons:commons-math3:3.6.1"
conscrypt = "org.conscrypt:conscrypt-openjdk-uber:2.5.2"
cronet-api = "org.chromium.net:cronet-api:119.6045.31"
cronet-embedded = "org.chromium.net:cronet-embedded:119.6045.31"
dev-cel-compiler = "dev.cel:compiler:0.9.1-proto3"
dev-cel-protobuf = "dev.cel:protobuf:0.9.1-proto3"
dev-cel-runtime = "dev.cel:runtime:0.9.1-proto3"
# error-prone 2.31.0+ blocked on https://github.com/grpc/grpc-java/issues/10152
# It breaks Bazel (ArrayIndexOutOfBoundsException in turbine) and Dexing ("D8:
# java.lang.NullPointerException"). We can trivially upgrade the Bazel CI to
# 6.3.0+ (https://github.com/bazelbuild/bazel/issues/18743).
errorprone-annotations = "com.google.errorprone:error_prone_annotations:2.30.0"
errorprone-annotations = "com.google.errorprone:error_prone_annotations:2.36.0"
# error-prone 2.32.0+ require Java 17+
errorprone-core = "com.google.errorprone:error_prone_core:2.31.0"
google-api-protos = "com.google.api.grpc:proto-google-common-protos:2.48.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.gcp.csm.observability.CsmObservability;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
Expand Down Expand Up @@ -84,6 +85,7 @@ public final class XdsTestServer {
private int port = 8080;
private int maintenancePort = 8080;
private boolean secureMode = false;
private boolean xdsServerMode = false;
private boolean enableCsmObservability;
private String serverId = "java_server";
private HealthStatusManager health;
Expand Down Expand Up @@ -144,7 +146,10 @@ void parseArgs(String[] args) {
maintenancePort = Integer.valueOf(value);
} else if ("secure_mode".equals(key)) {
secureMode = Boolean.parseBoolean(value);
} else if ("enable_csm_observability".equals(key)) {
} else if ("xds_server_mode".equals(key)) {
xdsServerMode = Boolean.parseBoolean(value);
}
else if ("enable_csm_observability".equals(key)) {
enableCsmObservability = Boolean.valueOf(value);
} else if ("server_id".equals(key)) {
serverId = value;
Expand All @@ -165,6 +170,9 @@ void parseArgs(String[] args) {
+ maintenancePort);
usage = true;
}
if (secureMode) {
xdsServerMode = true;
}

if (usage) {
XdsTestServer s = new XdsTestServer();
Expand All @@ -181,6 +189,9 @@ void parseArgs(String[] args) {
+ " port and maintenance_port should be different for secure mode."
+ "\n Default: "
+ s.secureMode
+ "\n --xds_server_mode=BOOLEAN Start in xDS Server mode."
+ "\n Default: "
+ s.xdsServerMode
+ "\n --enable_csm_observability=BOOL Enable CSM observability reporting. Default: "
+ s.enableCsmObservability
+ "\n --server_id=STRING server ID for response."
Expand Down Expand Up @@ -215,66 +226,72 @@ void start() throws Exception {
throw new RuntimeException(e);
}
health = new HealthStatusManager();
ServerServiceDefinition testServiceInterceptor = ServerInterceptors.intercept(
new TestServiceImpl(serverId, host),
new TestInfoInterceptor(host));
ServerCredentials insecureServerCreds = InsecureServerCredentials.create();

@SuppressWarnings("deprecation")
BindableService oldReflectionService = ProtoReflectionService.newInstance();
if (secureMode) {
if (addressType != Util.AddressType.IPV4_IPV6) {
throw new IllegalArgumentException("Secure mode only supports IPV4_IPV6 address type");
}
maintenanceServer =
Grpc.newServerBuilderForPort(maintenancePort, InsecureServerCredentials.create())
Grpc.newServerBuilderForPort(maintenancePort, insecureServerCreds)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(oldReflectionService)
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
maintenanceServer.start();
server =
XdsServerBuilder.forPort(
port, XdsServerCredentials.create(InsecureServerCredentials.create()))
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
server = XdsServerBuilder.forPort(port, XdsServerCredentials.create(insecureServerCreds))
.addService(testServiceInterceptor)
.build();
server.start();
} else {
ServerBuilder<?> serverBuilder;
ServerCredentials insecureServerCreds = InsecureServerCredentials.create();
switch (addressType) {
case IPV4_IPV6:
serverBuilder = Grpc.newServerBuilderForPort(port, insecureServerCreds);
break;
case IPV4:
SocketAddress v4Address = Util.getV4Address(port);
InetSocketAddress localV4Address = new InetSocketAddress("127.0.0.1", port);
serverBuilder = NettyServerBuilder.forAddress(
localV4Address, insecureServerCreds);
if (v4Address != null && !v4Address.equals(localV4Address) ) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
InetSocketAddress localV6Address = new InetSocketAddress("::1", port);
serverBuilder = NettyServerBuilder.forAddress(localV6Address, insecureServerCreds);
for (SocketAddress address : v6Addresses) {
if (!address.equals(localV6Address)) {
((NettyServerBuilder) serverBuilder).addListenAddress(address);
}
health.setStatus("", ServingStatus.SERVING);
return;
}

ServerBuilder<?> serverBuilder;
switch (addressType) {
case IPV4_IPV6:
serverBuilder = Grpc.newServerBuilderForPort(port, insecureServerCreds);
break;
case IPV4:
SocketAddress v4Address = Util.getV4Address(port);
InetSocketAddress localV4Address = new InetSocketAddress("127.0.0.1", port);
serverBuilder = NettyServerBuilder.forAddress(
localV4Address, insecureServerCreds);
if (v4Address != null && !v4Address.equals(localV4Address) ) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
InetSocketAddress localV6Address = new InetSocketAddress("::1", port);
serverBuilder = NettyServerBuilder.forAddress(localV6Address, insecureServerCreds);
for (SocketAddress address : v6Addresses) {
if (!address.equals(localV6Address)) {
((NettyServerBuilder) serverBuilder).addListenAddress(address);
}
break;
default:
throw new AssertionError("Unknown address type: " + addressType);
}
break;
default:
throw new AssertionError("Unknown address type: " + addressType);
}

if (xdsServerMode) {
if (addressType != Util.AddressType.IPV4_IPV6) {
throw new IllegalArgumentException("xDS Server mode only supports IPV4_IPV6 address type");
}

logger.info("Starting server on port " + port + " with address type " + addressType);

server =
serverBuilder
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
.addService(testServiceInterceptor)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(oldReflectionService)
Expand All @@ -283,7 +300,23 @@ void start() throws Exception {
.build();
server.start();
maintenanceServer = null;
health.setStatus("", ServingStatus.SERVING);
return;
}

logger.info("Starting server on port " + port + " with address type " + addressType);

server =
serverBuilder
.addService(testServiceInterceptor)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(oldReflectionService)
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
server.start();
maintenanceServer = null;
health.setStatus("", ServingStatus.SERVING);
}

Expand Down
5 changes: 4 additions & 1 deletion xds/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ dependencies {
project(':grpc-services'),
project(':grpc-auth'),
project(path: ':grpc-alts', configuration: 'shadow'),
libraries.dev.cel.runtime,
libraries.dev.cel.protobuf,
libraries.guava,
libraries.gson,
libraries.re2j,
Expand All @@ -70,7 +72,8 @@ dependencies {
compileOnly libraries.netty.transport.epoll

testImplementation project(':grpc-testing'),
project(':grpc-testing-proto')
project(':grpc-testing-proto'),
libraries.dev.cel.compiler
testImplementation (libraries.netty.transport.epoll) {
artifact {
classifier = "linux-x86_64"
Expand Down
19 changes: 19 additions & 0 deletions xds/src/main/java/io/grpc/xds/Filter.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
*/
String[] typeUrls();

default boolean isEnabled() {
return true;
}

/**
* Parses the top-level filter config from raw proto message. The message may be either a {@link
* com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
Expand All @@ -50,6 +54,12 @@
*/
ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage);

default void shutdown() {
// Implement as needed.
// TODO(sergiitk): [DESIGN] important to cover and discuss in the design.
// TODO(sergiitk): [QUESTION] should it be in ServerInterceptorBuilder?
}

Check warning on line 61 in xds/src/main/java/io/grpc/xds/Filter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/Filter.java#L61

Added line #L61 was not covered by tests

/** Represents an opaque data structure holding configuration for a filter. */
interface FilterConfig {
String typeUrl();
Expand All @@ -68,6 +78,15 @@
@Nullable
ServerInterceptor buildServerInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig);

@Nullable
default ServerInterceptor buildServerInterceptor(
FilterConfig config,
@Nullable FilterConfig overrideConfig,
ScheduledExecutorService scheduler) {
return buildServerInterceptor(config, overrideConfig);
}

}

/** Filter config with instance name. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
drainGraceTime = drainGraceNanosObj;
drainGraceTimeUnit = TimeUnit.NANOSECONDS;
}
// TODO(sergiitk): [design] drains connections on LDS update.
FilterChainSelectorManager.Closer closer = new FilterChainSelectorManager.Closer(
new GracefullyShutdownChannelRunnable(ctx.channel(), drainGraceTime, drainGraceTimeUnit));
FilterChainSelector selector = filterChainSelectorManager.register(closer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void updateSelector(FilterChainSelector newSelector) {
closers = new TreeSet<Closer>(closers.comparator());
selector = newSelector;
}
// TODO(sergiitk): [design] calls the closer of FilterChainMatchingNegotiatorServerFactory
for (Closer closer : oldClosers) {
closer.closer.run();
}
Expand Down
6 changes: 5 additions & 1 deletion xds/src/main/java/io/grpc/xds/FilterRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ static synchronized FilterRegistry getDefaultRegistry() {
instance = newRegistry().register(
FaultFilter.INSTANCE,
RouterFilter.INSTANCE,
RbacFilter.INSTANCE);
RbacFilter.INSTANCE,
RlqsFilter.INSTANCE);
}
return instance;
}
Expand All @@ -50,6 +51,9 @@ static FilterRegistry newRegistry() {
@VisibleForTesting
FilterRegistry register(Filter... filters) {
for (Filter filter : filters) {
if (!filter.isEnabled()) {
continue;
}
for (String typeUrl : filter.typeUrls()) {
supportedFilters.put(typeUrl, filter);
}
Expand Down
9 changes: 9 additions & 0 deletions xds/src/main/java/io/grpc/xds/MessagePrinter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.grpc.xds;

import com.github.xds.type.matcher.v3.CelMatcher;
import com.github.xds.type.matcher.v3.HttpAttributesCelMatchInput;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
Expand All @@ -28,6 +30,8 @@
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride;
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBAC;
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBACPerRoute;
import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router;
Expand Down Expand Up @@ -58,6 +62,11 @@ private static JsonFormat.Printer newPrinter() {
.add(RBAC.getDescriptor())
.add(RBACPerRoute.getDescriptor())
.add(Router.getDescriptor())
// RLQS
.add(RateLimitQuotaFilterConfig.getDescriptor())
.add(RateLimitQuotaOverride.getDescriptor())
.add(HttpAttributesCelMatchInput.getDescriptor())
.add(CelMatcher.getDescriptor())
// UpstreamTlsContext and DownstreamTlsContext in v3 are not transitively imported
// by top-level resource types.
.add(UpstreamTlsContext.getDescriptor())
Expand Down
Loading
Loading