Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing CI #1926

Draft
wants to merge 40 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
1aa1c19
Fixing CI
faderskd Nov 19, 2024
5283c54
Trying to fix CI
faderskd Nov 19, 2024
e5b7204
Trying to fix CI
faderskd Nov 19, 2024
3f73a93
Fixing CI
faderskd Nov 19, 2024
91bb668
Trying to fix CI
faderskd Nov 19, 2024
a21593a
Trying to fix CI
faderskd Nov 20, 2024
f109085
Fixing CI
faderskd Nov 20, 2024
d1b70e3
Trying to fix CI
faderskd Nov 20, 2024
dae316c
Trying to fix CI
faderskd Nov 20, 2024
f0a4136
Fixing CI
faderskd Nov 20, 2024
2d43c28
Fixing CI
faderskd Nov 20, 2024
f83edcc
Trying to clear Hierarchical cache
faderskd Nov 20, 2024
ce44817
Fix CI
faderskd Nov 20, 2024
2bdf6fa
Trying to fix CI
faderskd Nov 20, 2024
2b38487
Trying to fix CI
faderskd Nov 20, 2024
4d0180a
Fixing CI
faderskd Nov 20, 2024
dccb057
Fixing CI
faderskd Nov 20, 2024
ad3fd5a
Fixing CI
faderskd Nov 20, 2024
872cbd4
Fixing CI
faderskd Nov 20, 2024
680129b
Renaming for easier code comprehension
faderskd Nov 21, 2024
55b782c
Zookeeper debug
faderskd Nov 21, 2024
9dcbd59
Java format
faderskd Nov 21, 2024
398c864
Fixing CI
faderskd Nov 21, 2024
46e7cfa
Thread logging
faderskd Nov 21, 2024
ed7b18c
Fixing CI
faderskd Nov 21, 2024
41bcb7e
Zookeeper restarting
faderskd Nov 22, 2024
ff378bf
Set timeout for management healthcheck
faderskd Nov 22, 2024
7c0b51a
More reiliability to staring zookeeper
faderskd Nov 22, 2024
f983490
More debug
faderskd Nov 22, 2024
cc0102d
More debug info
faderskd Nov 22, 2024
4e882dd
More debug
faderskd Nov 22, 2024
5097cd5
Stop restarting zookeeper
faderskd Nov 22, 2024
eea6df1
More logging
faderskd Nov 22, 2024
520793f
Restarting all containers
faderskd Nov 22, 2024
aa1a3f1
Upgrade curator
faderskd Nov 22, 2024
da7e24d
Add zookeeper container debug
faderskd Nov 25, 2024
e535e36
Zookeeper debug
faderskd Nov 25, 2024
4efaccd
Revert zookeeper debug
faderskd Nov 25, 2024
3c3f354
Clearing management data
faderskd Nov 25, 2024
92704a4
Debug Zookeerper
faderskd Nov 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -37,8 +37,8 @@ jobs:
tasks: [
# Add/remove task in Allure Report job also
{alias: "unitTests", name: "check"},
{alias: "integrationTests", name: "integrationTest"},
{alias: "slowIntegrationTests", name: "slowIntegrationTest"},
{alias: "integrationTests", name: "integrationTest -i"},
{alias: "slowIntegrationTests", name: "slowIntegrationTest -i"},
{alias: "benchmark", name: "jmh -Pjmh.iterations=1 -Pjmh.timeOnIteration=5s -Pjmh.warmupIterations=0"}
]
fail-fast: false
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ allprojects {
jackson : '2.17.0',
jersey : '3.1.6',
jetty : '12.0.8',
curator : '5.4.0',
curator : '5.7.1',
dropwizard_metrics: '4.2.25',
micrometer_metrics: '1.12.5',
wiremock : '3.9.0',
Original file line number Diff line number Diff line change
@@ -8,38 +8,42 @@
import org.apache.curator.framework.CuratorFramework;
import pl.allegro.tech.hermes.common.cache.queue.LinkedHashSetBlockingQueue;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.infrastructure.zookeeper.cache.ModelAwareZookeeperNotifyingCache;
import pl.allegro.tech.hermes.infrastructure.zookeeper.cache.PathDepthAwareZookeeperCallbackRegistrar;

public class ModelAwareZookeeperNotifyingCacheFactory {
public class ZookeeperCallbackRegistrarFactory {

private final CuratorFramework curator;

private final MetricsFacade metricsFacade;

private final ZookeeperParameters zookeeperParameters;

public ModelAwareZookeeperNotifyingCacheFactory(
private final String module;

public ZookeeperCallbackRegistrarFactory(
CuratorFramework curator,
MetricsFacade metricaFacade,
ZookeeperParameters zookeeperParameters) {
MetricsFacade metricsFacade,
ZookeeperParameters zookeeperParameters,
String module) {
this.curator = curator;
this.metricsFacade = metricaFacade;
this.metricsFacade = metricsFacade;
this.zookeeperParameters = zookeeperParameters;
this.module = module;
}

public ModelAwareZookeeperNotifyingCache provide() {
public PathDepthAwareZookeeperCallbackRegistrar provide() {
String rootPath = zookeeperParameters.getRoot();
ExecutorService executor =
createExecutor(rootPath, zookeeperParameters.getProcessingThreadPoolSize());
ModelAwareZookeeperNotifyingCache cache =
new ModelAwareZookeeperNotifyingCache(curator, executor, rootPath);
PathDepthAwareZookeeperCallbackRegistrar callbackRegistrar =
new PathDepthAwareZookeeperCallbackRegistrar(curator, executor, rootPath, module);
try {
cache.start();
callbackRegistrar.start();
} catch (Exception e) {
throw new IllegalStateException(
"Unable to start Zookeeper cache for root path " + rootPath, e);
"Unable to start Zookeeper callbackRegistrar for root path " + rootPath, e);
}
return cache;
return callbackRegistrar;
}

private ExecutorService createExecutor(String rootPath, int processingThreadPoolSize) {
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus;
import pl.allegro.tech.hermes.domain.notifications.InternalCallbackRegistrar;
import pl.allegro.tech.hermes.schema.CachedCompiledSchemaRepository;
import pl.allegro.tech.hermes.schema.CachedSchemaVersionsRepository;
import pl.allegro.tech.hermes.schema.CompiledSchemaRepository;
@@ -15,13 +15,13 @@ public class SchemaVersionsRepositoryFactory {

private final RawSchemaClient rawSchemaClient;
private final SchemaVersionRepositoryParameters schemaVersionsRepositoryParameters;
private final InternalNotificationsBus notificationsBus;
private final InternalCallbackRegistrar notificationsBus;
private final CompiledSchemaRepository<?> compiledSchemaRepository;

public SchemaVersionsRepositoryFactory(
RawSchemaClient rawSchemaClient,
SchemaVersionRepositoryParameters schemaVersionsRepositoryParameters,
InternalNotificationsBus notificationsBus,
InternalCallbackRegistrar notificationsBus,
CompiledSchemaRepository<?> compiledSchemaRepository) {
this.rawSchemaClient = rawSchemaClient;
this.schemaVersionsRepositoryParameters = schemaVersionsRepositoryParameters;
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package pl.allegro.tech.hermes.domain.notifications;

/** All callbacks must be nonblocking. */
public interface InternalNotificationsBus {
public interface InternalCallbackRegistrar {

void registerSubscriptionCallback(SubscriptionCallback callback);

Original file line number Diff line number Diff line change
@@ -32,19 +32,23 @@ public class HierarchicalCache {

private HierarchicalCacheLevel rootCache;

private String module;

public HierarchicalCache(
CuratorFramework curatorFramework,
ExecutorService executorService,
String basePath,
int maxDepth,
List<String> levelPrefixes,
boolean removeNodesWithNoData) {
boolean removeNodesWithNoData,
String module) {
this.curatorFramework = curatorFramework;
this.executorService = executorService;
this.basePath = basePath;
this.removeNodesWithNoData = removeNodesWithNoData;
this.levelPrefixes.addAll(levelPrefixes);
this.maxDepth = maxDepth;
this.module = module;

for (int i = 0; i < maxDepth; ++i) {
levelCallbacks.add(new CacheListeners());
@@ -65,7 +69,7 @@ public void registerCallback(int depth, Consumer<PathChildrenCacheEvent> callbac
}

private HierarchicalCacheLevel createLevelCache(int depth, String path) {
BiFunction<Integer, String, HierarchicalCacheLevel> function =
BiFunction<Integer, String, HierarchicalCacheLevel> nextLevelFactory =
depth + 1 < maxDepth ? this::createLevelCache : null;
HierarchicalCacheLevel levelCache =
new HierarchicalCacheLevel(
@@ -74,8 +78,9 @@ private HierarchicalCacheLevel createLevelCache(int depth, String path) {
path(depth, path),
depth,
levelCallbacks.get(depth),
Optional.ofNullable(function),
removeNodesWithNoData);
Optional.ofNullable(nextLevelFactory),
removeNodesWithNoData,
module);
try {
logger.debug("Starting hierarchical cache level for path {} and depth {}", path, depth);
levelCache.start();
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ class HierarchicalCacheLevel extends PathChildrenCache implements PathChildrenCa
private final boolean removeNodesWithNoData;

private final Map<String, HierarchicalCacheLevel> subcacheMap = new HashMap<>();
private final String module;

HierarchicalCacheLevel(
CuratorFramework curatorClient,
@@ -43,14 +44,16 @@ class HierarchicalCacheLevel extends PathChildrenCache implements PathChildrenCa
int depth,
CacheListeners eventConsumer,
Optional<BiFunction<Integer, String, HierarchicalCacheLevel>> nextLevelFactory,
boolean removeNodesWithNoData) {
boolean removeNodesWithNoData,
String module) {
super(curatorClient, path, true, false, executorService);
this.curatorClient = curatorClient;
this.currentDepth = depth;
this.consumer = eventConsumer;
this.nextLevelFactory = nextLevelFactory;
this.removeNodesWithNoData = removeNodesWithNoData;
getListenable().addListener(this);
this.module = module;
}

@Override
@@ -61,7 +64,7 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th

String path = event.getData().getPath();
String cacheName = cacheNameFromPath(path);
logger.debug("Got {} event for path {}", event.getType(), path);
logger.debug("{}: Got {} event for path {}", module, event.getType(), path);

switch (event.getType()) {
case CHILD_ADDED:
Original file line number Diff line number Diff line change
@@ -10,10 +10,10 @@
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

public class ModelAwareZookeeperNotifyingCache {
public class PathDepthAwareZookeeperCallbackRegistrar {

private static final Logger logger =
LoggerFactory.getLogger(ModelAwareZookeeperNotifyingCache.class);
LoggerFactory.getLogger(PathDepthAwareZookeeperCallbackRegistrar.class);

private static final int GROUP_LEVEL = 0;

@@ -24,15 +24,15 @@ public class ModelAwareZookeeperNotifyingCache {
private final HierarchicalCache cache;
private final ExecutorService executor;

public ModelAwareZookeeperNotifyingCache(
CuratorFramework curator, ExecutorService executor, String rootPath) {
public PathDepthAwareZookeeperCallbackRegistrar(
CuratorFramework curator, ExecutorService executor, String rootPath, String module) {
List<String> levelPrefixes =
Arrays.asList(
ZookeeperPaths.GROUPS_PATH,
ZookeeperPaths.TOPICS_PATH,
ZookeeperPaths.SUBSCRIPTIONS_PATH);
this.executor = executor;
this.cache = new HierarchicalCache(curator, executor, rootPath, 3, levelPrefixes, true);
this.cache = new HierarchicalCache(curator, executor, rootPath, 3, levelPrefixes, true, module);
}

public void start() throws Exception {
Original file line number Diff line number Diff line change
@@ -13,29 +13,29 @@
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.domain.notifications.AdminCallback;
import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus;
import pl.allegro.tech.hermes.domain.notifications.InternalCallbackRegistrar;
import pl.allegro.tech.hermes.domain.notifications.SubscriptionCallback;
import pl.allegro.tech.hermes.domain.notifications.TopicCallback;
import pl.allegro.tech.hermes.infrastructure.zookeeper.cache.ModelAwareZookeeperNotifyingCache;
import pl.allegro.tech.hermes.infrastructure.zookeeper.cache.PathDepthAwareZookeeperCallbackRegistrar;

public class ZookeeperInternalNotificationBus implements InternalNotificationsBus {
public class ZookeeperCallbackRegistrar implements InternalCallbackRegistrar {

private static final Logger logger =
LoggerFactory.getLogger(ZookeeperInternalNotificationBus.class);
private static final Logger logger = LoggerFactory.getLogger(ZookeeperCallbackRegistrar.class);

private final ObjectMapper objectMapper;

private final ModelAwareZookeeperNotifyingCache modelNotifyingCache;
private final PathDepthAwareZookeeperCallbackRegistrar pathDepthAwareCallbackRegistrar;

public ZookeeperInternalNotificationBus(
ObjectMapper objectMapper, ModelAwareZookeeperNotifyingCache modelNotifyingCache) {
public ZookeeperCallbackRegistrar(
ObjectMapper objectMapper,
PathDepthAwareZookeeperCallbackRegistrar pathDepthAwareCallbackRegistrar) {
this.objectMapper = objectMapper;
this.modelNotifyingCache = modelNotifyingCache;
this.pathDepthAwareCallbackRegistrar = pathDepthAwareCallbackRegistrar;
}

@Override
public void registerSubscriptionCallback(SubscriptionCallback callback) {
modelNotifyingCache.registerSubscriptionCallback(
pathDepthAwareCallbackRegistrar.registerSubscriptionCallback(
(e) -> {
switch (e.getType()) {
case CHILD_ADDED:
@@ -58,7 +58,7 @@ public void registerSubscriptionCallback(SubscriptionCallback callback) {

@Override
public void registerTopicCallback(TopicCallback callback) {
modelNotifyingCache.registerTopicCallback(
pathDepthAwareCallbackRegistrar.registerTopicCallback(
(e) -> {
switch (e.getType()) {
case CHILD_ADDED:
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ class HierarchicalCacheTest extends IntegrationTest {
'/hierarchicalCacheTest',
3,
['groups', 'topics', 'subscriptions'],
true
true, "test"
)

private Set calledCallbacks = [] as Set
@@ -127,7 +127,7 @@ class HierarchicalCacheTest extends IntegrationTest {
'/hierarchicalCacheTest/workload',
2,
[],
removeEmptyNodes)
removeEmptyNodes, "test")

cache.registerCallback(0, loggingCallback)
cache.registerCallback(1, loggingCallback)
Original file line number Diff line number Diff line change
@@ -24,9 +24,9 @@
import pl.allegro.tech.hermes.common.di.factories.CuratorClientFactory;
import pl.allegro.tech.hermes.common.di.factories.HermesCuratorClientFactory;
import pl.allegro.tech.hermes.common.di.factories.MicrometerRegistryParameters;
import pl.allegro.tech.hermes.common.di.factories.ModelAwareZookeeperNotifyingCacheFactory;
import pl.allegro.tech.hermes.common.di.factories.ObjectMapperFactory;
import pl.allegro.tech.hermes.common.di.factories.PrometheusMeterRegistryFactory;
import pl.allegro.tech.hermes.common.di.factories.ZookeeperCallbackRegistrarFactory;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.NamespaceKafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator;
@@ -52,7 +52,7 @@
import pl.allegro.tech.hermes.domain.filtering.header.HeaderSubscriptionMessageFilterCompiler;
import pl.allegro.tech.hermes.domain.filtering.json.JsonPathSubscriptionMessageFilterCompiler;
import pl.allegro.tech.hermes.domain.group.GroupRepository;
import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus;
import pl.allegro.tech.hermes.domain.notifications.InternalCallbackRegistrar;
import pl.allegro.tech.hermes.domain.oauth.OAuthProviderRepository;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;
@@ -68,9 +68,9 @@
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperSubscriptionRepository;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperTopicRepository;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperWorkloadConstraintsRepository;
import pl.allegro.tech.hermes.infrastructure.zookeeper.cache.ModelAwareZookeeperNotifyingCache;
import pl.allegro.tech.hermes.infrastructure.zookeeper.cache.PathDepthAwareZookeeperCallbackRegistrar;
import pl.allegro.tech.hermes.infrastructure.zookeeper.counter.SharedCounter;
import pl.allegro.tech.hermes.infrastructure.zookeeper.notifications.ZookeeperInternalNotificationBus;
import pl.allegro.tech.hermes.infrastructure.zookeeper.notifications.ZookeeperCallbackRegistrar;
import pl.allegro.tech.hermes.metrics.PathsCompiler;
import pl.allegro.tech.hermes.schema.SchemaRepository;

@@ -145,20 +145,21 @@ public CuratorClientFactory curatorClientFactory(
}

@Bean
public InternalNotificationsBus zookeeperInternalNotificationBus(
ObjectMapper objectMapper, ModelAwareZookeeperNotifyingCache modelNotifyingCache) {
return new ZookeeperInternalNotificationBus(objectMapper, modelNotifyingCache);
public InternalCallbackRegistrar zookeeperInternalNotificationBus(
ObjectMapper objectMapper, PathDepthAwareZookeeperCallbackRegistrar modelNotifyingCache) {
return new ZookeeperCallbackRegistrar(objectMapper, modelNotifyingCache);
}

@Bean(destroyMethod = "stop")
public ModelAwareZookeeperNotifyingCache modelAwareZookeeperNotifyingCache(
public PathDepthAwareZookeeperCallbackRegistrar modelAwareZookeeperNotifyingCache(
CuratorFramework curator,
MetricsFacade metricsFacade,
ZookeeperClustersProperties zookeeperClustersProperties,
DatacenterNameProvider datacenterNameProvider) {
ZookeeperProperties zookeeperProperties =
zookeeperClustersProperties.toZookeeperProperties(datacenterNameProvider);
return new ModelAwareZookeeperNotifyingCacheFactory(curator, metricsFacade, zookeeperProperties)
return new ZookeeperCallbackRegistrarFactory(
curator, metricsFacade, zookeeperProperties, "consumers")
.provide();
}

Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@
import pl.allegro.tech.hermes.common.schema.RawSchemaClientFactory;
import pl.allegro.tech.hermes.common.schema.SchemaRepositoryFactory;
import pl.allegro.tech.hermes.common.schema.SchemaVersionsRepositoryFactory;
import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus;
import pl.allegro.tech.hermes.domain.notifications.InternalCallbackRegistrar;
import pl.allegro.tech.hermes.schema.CompiledSchemaRepository;
import pl.allegro.tech.hermes.schema.RawSchemaClient;
import pl.allegro.tech.hermes.schema.SchemaRepository;
@@ -91,7 +91,7 @@ public Client schemaRepositoryClient(ObjectMapper mapper, SchemaProperties schem
public SchemaVersionsRepository schemaVersionsRepositoryFactory(
RawSchemaClient rawSchemaClient,
SchemaProperties schemaProperties,
InternalNotificationsBus notificationsBus,
InternalCallbackRegistrar notificationsBus,
CompiledSchemaRepository<?> compiledSchemaRepository) {
return new SchemaVersionsRepositoryFactory(
rawSchemaClient,
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
import pl.allegro.tech.hermes.consumers.subscription.id.SubscriptionIds;
import pl.allegro.tech.hermes.consumers.subscription.id.ZookeeperSubscriptionIdProvider;
import pl.allegro.tech.hermes.domain.group.GroupRepository;
import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus;
import pl.allegro.tech.hermes.domain.notifications.InternalCallbackRegistrar;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
@@ -29,13 +29,13 @@ public SubscriptionIdProvider subscriptionIdProvider(

@Bean
public SubscriptionIds subscriptionIds(
InternalNotificationsBus internalNotificationsBus,
InternalCallbackRegistrar internalCallbackRegistrar,
SubscriptionsCache subscriptionsCache,
SubscriptionIdProvider subscriptionIdProvider,
CommonConsumerProperties commonConsumerProperties) {
NotificationAwareSubscriptionIdsCache cache =
new NotificationAwareSubscriptionIdsCache(
internalNotificationsBus,
internalCallbackRegistrar,
subscriptionsCache,
subscriptionIdProvider,
commonConsumerProperties.getSubscriptionIdsCacheRemovedExpireAfterAccess().toSeconds(),
@@ -46,7 +46,7 @@ public SubscriptionIds subscriptionIds(

@Bean
public SubscriptionsCache subscriptionsCache(
InternalNotificationsBus notificationsBus,
InternalCallbackRegistrar notificationsBus,
GroupRepository groupRepository,
TopicRepository topicRepository,
SubscriptionRepository subscriptionRepository) {
Loading