Skip to content

Commit

Permalink
Fix broker code (allegro#1738)
Browse files Browse the repository at this point in the history
  • Loading branch information
SkySurferOne committed Jan 19, 2024
1 parent 873c3aa commit 4b38552
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@

@Configuration
@EnableConfigurationProperties({
SslContextProperties.class,
HttpClientsMonitoringProperties.class,
SenderAsyncTimeoutProperties.class,
BatchProperties.class
SslContextProperties.class,
HttpClientsMonitoringProperties.class,
SenderAsyncTimeoutProperties.class,
BatchProperties.class
})
public class ConsumerSenderConfiguration {

Expand All @@ -78,16 +78,16 @@ public Http1ClientProperties http1SerialClientProperties() {

@Bean(name = "http1-serial-client")
public HttpClient http1SerialClient(
HttpClientsFactory httpClientsFactory,
@Named("http1-serial-client-parameters") Http1ClientParameters http1ClientParameters,
MetricsFacade metricsFacade
HttpClientsFactory httpClientsFactory,
@Named("http1-serial-client-parameters") Http1ClientParameters http1ClientParameters,
MetricsFacade metricsFacade
) {
HttpClient client = httpClientsFactory.createClientForHttp1("jetty-http1-serial-client", http1ClientParameters);
if (http1ClientParameters.isRequestProcessingMonitoringEnabled()) {
var metrics = metricsFacade.consumerSender();
enrichWithMetrics(
client, metrics.http1SerialClientRequestQueueWaitingTimer(),
metrics.http1SerialClientRequestProcessingTimer()
client, metrics.http1SerialClientRequestQueueWaitingTimer(),
metrics.http1SerialClientRequestProcessingTimer()
);
}
return client;
Expand All @@ -101,9 +101,9 @@ public Http2ClientProperties http2SerialClientProperties() {

@Bean
public Http2ClientHolder http2ClientHolder(
HttpClientsFactory httpClientsFactory,
@Named("http2-serial-client-parameters") Http2ClientProperties http2ClientProperties,
MetricsFacade metricsFacade
HttpClientsFactory httpClientsFactory,
@Named("http2-serial-client-parameters") Http2ClientProperties http2ClientProperties,
MetricsFacade metricsFacade
) {
if (!http2ClientProperties.isEnabled()) {
return new Http2ClientHolder(null);
Expand All @@ -112,8 +112,8 @@ public Http2ClientHolder http2ClientHolder(
if (http2ClientProperties.isRequestProcessingMonitoringEnabled()) {
var metrics = metricsFacade.consumerSender();
enrichWithMetrics(
client, metrics.http2SerialClientRequestQueueWaitingTimer(),
metrics.http2SerialClientRequestProcessingTimer()
client, metrics.http2SerialClientRequestQueueWaitingTimer(),
metrics.http2SerialClientRequestProcessingTimer()
);
}
return new Http2ClientHolder(client);
Expand All @@ -128,15 +128,15 @@ public Http1ClientProperties http1BatchClientProperties() {

@Bean(name = "http1-batch-client")
public HttpClient http1BatchClient(
HttpClientsFactory httpClientsFactory,
@Named("http1-batch-client-parameters") Http1ClientParameters http1ClientParameters,
MetricsFacade metricsFacade) {
HttpClientsFactory httpClientsFactory,
@Named("http1-batch-client-parameters") Http1ClientParameters http1ClientParameters,
MetricsFacade metricsFacade) {
HttpClient client = httpClientsFactory.createClientForHttp1("jetty-http1-batch-client", http1ClientParameters);
if (http1ClientParameters.isRequestProcessingMonitoringEnabled()) {
var metrics = metricsFacade.consumerSender();
enrichWithMetrics(
client, metrics.http1BatchClientRequestQueueWaitingTimer(),
metrics.http1BatchClientRequestProcessingTimer()
client, metrics.http1BatchClientRequestQueueWaitingTimer(),
metrics.http1BatchClientRequestProcessingTimer()
);
}
return client;
Expand All @@ -159,8 +159,8 @@ public MessageBatchSenderFactory httpMessageBatchSenderFactory(SendingResultHand
BatchHttpRequestFactory batchHttpRequestFactory
) {
return new HttpMessageBatchSenderFactory(
resultHandlers,
batchHttpRequestFactory);
resultHandlers,
batchHttpRequestFactory);
}

@Bean(initMethod = "start")
Expand All @@ -170,12 +170,12 @@ public HttpClientsWorkloadReporter httpClientsWorkloadReporter(MetricsFacade met
Http2ClientHolder http2ClientHolder,
HttpClientsMonitoringProperties monitoringProperties) {
return new HttpClientsWorkloadReporter(
metrics,
http1SerialClient,
http1BatchClient,
http2ClientHolder,
monitoringProperties.isRequestQueueMonitoringEnabled(),
monitoringProperties.isConnectionPoolMonitoringEnabled());
metrics,
http1SerialClient,
http1BatchClient,
http2ClientHolder,
monitoringProperties.isRequestQueueMonitoringEnabled(),
monitoringProperties.isConnectionPoolMonitoringEnabled());
}

@Bean(destroyMethod = "closeProviders")
Expand All @@ -193,15 +193,15 @@ public ProtocolMessageSenderProvider jettyHttpMessageSenderProvider(@Named("http
SendingResultHandlers sendingResultHandlers,
HttpRequestFactoryProvider requestFactoryProvider) {
return new JettyHttpMessageSenderProvider(
httpClient,
http2ClientHolder,
endpointAddressResolver,
metadataAppender,
authorizationProviderFactory,
httpHeadersProviderFactory,
sendingResultHandlers,
requestFactoryProvider,
ImmutableSet.of("http", "https")
httpClient,
http2ClientHolder,
endpointAddressResolver,
metadataAppender,
authorizationProviderFactory,
httpHeadersProviderFactory,
sendingResultHandlers,
requestFactoryProvider,
ImmutableSet.of("http", "https")
);
}

Expand Down Expand Up @@ -255,21 +255,21 @@ public MetadataAppender<Message> jmsMetadataAppender() {

@Bean(name = "defaultPubSubMessageSenderProvider")
public ProtocolMessageSenderProvider pubSubMessageSenderProvider(
GooglePubSubSenderTargetResolver targetResolver,
CredentialsProvider credentialsProvider,
ExecutorProvider executorProvider,
RetrySettings retrySettings,
BatchingSettings batchingSettings,
GooglePubSubMessageTransformerCreator googlePubSubMessageTransformerCreator,
TransportChannelProvider transportChannelProvider) {
GooglePubSubSenderTargetResolver targetResolver,
CredentialsProvider credentialsProvider,
ExecutorProvider executorProvider,
RetrySettings retrySettings,
BatchingSettings batchingSettings,
GooglePubSubMessageTransformerCreator googlePubSubMessageTransformerCreator,
TransportChannelProvider transportChannelProvider) {
return new GooglePubSubMessageSenderProvider(
targetResolver,
credentialsProvider,
executorProvider,
retrySettings,
batchingSettings,
transportChannelProvider,
googlePubSubMessageTransformerCreator
targetResolver,
credentialsProvider,
executorProvider,
retrySettings,
batchingSettings,
transportChannelProvider,
googlePubSubMessageTransformerCreator
);
}

Expand All @@ -294,18 +294,18 @@ public EndpointAddressResolver interpolatingEndpointAddressResolver(UriInterpola
public FutureAsyncTimeout futureAsyncTimeoutFactory(InstrumentedExecutorServiceFactory executorFactory,
SenderAsyncTimeoutProperties senderAsyncTimeoutProperties) {
ScheduledExecutorService timeoutExecutorService = executorFactory.getScheduledExecutorService(
"async-timeout",
senderAsyncTimeoutProperties.getThreadPoolSize(),
senderAsyncTimeoutProperties.isThreadPoolMonitoringEnabled()
"async-timeout",
senderAsyncTimeoutProperties.getThreadPoolSize(),
senderAsyncTimeoutProperties.isThreadPoolMonitoringEnabled()
);
return new FutureAsyncTimeout(timeoutExecutorService);
}

private static void enrichWithMetrics(
HttpClient client, HermesTimer requestQueueWaitingTimer, HermesTimer requestProcessingTimer
HttpClient client, HermesTimer requestQueueWaitingTimer, HermesTimer requestProcessingTimer
) {
client.getRequestListeners().add(
new JettyHttpClientMetrics(requestQueueWaitingTimer, requestProcessingTimer)
client.getRequestListeners().addListener(
new JettyHttpClientMetrics(requestQueueWaitingTimer, requestProcessingTimer)
);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.http;

import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.Request;
import pl.allegro.tech.hermes.metrics.HermesTimer;

public class JettyHttpClientMetrics implements Request.Listener {
Expand Down

0 comments on commit 4b38552

Please sign in to comment.