Skip to content

Commit

Permalink
Replace akka with pekko
Browse files Browse the repository at this point in the history
  • Loading branch information
hmiguim committed Jul 4, 2024
1 parent 5f1b878 commit 32e6d72
Show file tree
Hide file tree
Showing 18 changed files with 122 additions and 124 deletions.
46 changes: 23 additions & 23 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
<jackson.version>2.17.1</jackson.version>
<spring.version>6.1.6</spring.version>
<solr.version>9.6.0</solr.version>
<akka.version>2.8.5</akka.version>
<pekko.version>1.0.2</pekko.version>
<httpcomponents.version>4.5.14</httpcomponents.version>
<commons_ip2.version>2.6.1</commons_ip2.version>
<metrics.version>3.2.6</metrics.version>
Expand Down Expand Up @@ -855,41 +855,41 @@
</dependency>
<!-- REST API related dependencies - end -->

<!-- Akka related dependencies - start -->
<!-- Pekko related dependencies - start -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>${akka.version}</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor_2.13</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.12</artifactId>
<version>${akka.version}</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-cluster_2.13</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.12</artifactId>
<version>${akka.version}</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-cluster-tools_2.13</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.12</artifactId>
<version>${akka.version}</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-testkit_2.13</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence_2.12</artifactId>
<version>${akka.version}</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-persistence_2.13</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.12</artifactId>
<version>${akka.version}</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-slf4j_2.13</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-distributed-data_2.12</artifactId>
<version>${akka.version}</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-distributed-data_2.13</artifactId>
<version>${pekko.version}</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
Expand Down
4 changes: 2 additions & 2 deletions roda-core/roda-core-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@
<artifactId>hamcrest-all</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.12</artifactId>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-testkit_2.13</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Expand Down
24 changes: 12 additions & 12 deletions roda-core/roda-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -198,28 +198,28 @@

<!-- Akka related dependencies - start -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor_2.13</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.12</artifactId>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-cluster_2.13</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.12</artifactId>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-cluster-tools_2.13</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence_2.12</artifactId>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-persistence_2.13</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.12</artifactId>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-slf4j_2.13</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-distributed-data_2.12</artifactId>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-distributed-data_2.13</artifactId>
</dependency>
<dependency>
<groupId>com.github.jknack</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import com.codahale.metrics.MetricRegistry;

import akka.actor.UntypedAbstractActor;
import org.apache.pekko.actor.UntypedAbstractActor;

public abstract class AkkaBaseActor extends UntypedAbstractActor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.roda.core.RodaCoreFactory;
Expand All @@ -31,7 +32,7 @@ public static Config getAkkaConfiguration(String configFilename) {

try (InputStream originStream = RodaCoreFactory
.getConfigurationFileAsStream(RodaConstants.CORE_ORCHESTRATOR_FOLDER + "/" + configFilename)) {
String configAsString = IOUtils.toString(originStream, RodaConstants.DEFAULT_ENCODING);
String configAsString = IOUtils.toString(originStream, StandardCharsets.UTF_8);
akkaConfig = ConfigFactory.parseString(configAsString);
} catch (IOException e) {
LOGGER.error("Could not load Akka configuration '{}'", configFilename, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.DeadLetter;
import akka.actor.UntypedAbstractActor;
import org.apache.pekko.actor.DeadLetter;
import org.apache.pekko.actor.UntypedAbstractActor;

public class DeadLetterActor extends UntypedAbstractActor {
private static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterActor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Map;
import java.util.Optional;

import org.apache.pekko.actor.ActorRef;
import org.roda.core.data.v2.IsRODAObject;
import org.roda.core.data.v2.LiteOptionalWithCause;
import org.roda.core.data.v2.SerializableOptional;
Expand All @@ -29,8 +30,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.ActorRef;

public class Messages {
private static final Logger LOGGER = LoggerFactory.getLogger(Messages.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,20 @@
package org.roda.core.events.akka;

import java.io.IOException;
import java.io.Serial;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.StringUtils;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Address;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Terminated;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.dispatch.OnComplete;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
Expand All @@ -34,18 +42,12 @@

import com.typesafe.config.Config;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.cluster.Cluster;
import akka.dispatch.OnComplete;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

public class AkkaEventsHandlerAndNotifier extends AbstractEventsHandler implements EventsNotifier {
@Serial
private static final long serialVersionUID = 919188071375009042L;
private static final Logger LOGGER = LoggerFactory.getLogger(AkkaEventsHandlerAndNotifier.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.roda.core.events.akka;

import java.io.Serial;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand All @@ -15,36 +16,31 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.ddata.DistributedData;
import org.apache.pekko.cluster.ddata.GSet;
import org.apache.pekko.cluster.ddata.GSetKey;
import org.apache.pekko.cluster.ddata.Key;
import org.apache.pekko.cluster.ddata.ORMap;
import org.apache.pekko.cluster.ddata.Replicator;
import org.apache.pekko.cluster.ddata.Replicator.Changed;
import org.apache.pekko.cluster.ddata.Replicator.Update;
import org.roda.core.RodaCoreFactory;
import org.roda.core.common.akka.Messages.EventGroupCreated;
import org.roda.core.common.akka.Messages.EventGroupDeleted;
import org.roda.core.common.akka.Messages.EventGroupUpdated;
import org.roda.core.common.akka.Messages.EventUserCreated;
import org.roda.core.common.akka.Messages.EventUserDeleted;
import org.roda.core.common.akka.Messages.EventUserUpdated;
import org.roda.core.data.common.SecureString;
import org.roda.core.data.v2.user.Group;
import org.roda.core.data.v2.user.User;
import org.roda.core.events.EventsHandler;
import org.roda.core.data.common.SecureString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.cluster.Cluster;
import akka.cluster.ddata.DistributedData;
import akka.cluster.ddata.GSet;
import akka.cluster.ddata.GSetKey;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.Replicator.Changed;
import akka.cluster.ddata.Replicator.Subscribe;
import akka.cluster.ddata.Replicator.Update;
import akka.cluster.ddata.Replicator.UpdateFailure;
import akka.cluster.ddata.Replicator.UpdateSuccess;
import akka.cluster.ddata.Replicator.WriteAll;
import akka.cluster.ddata.Replicator.WriteConsistency;
import akka.cluster.ddata.Replicator.WriteMajority;
import scala.Option;
import scala.concurrent.duration.Duration;

Expand All @@ -65,7 +61,7 @@ public class AkkaEventsHandlerAndNotifierActor extends AbstractActor {
private final Key<GSet<ObjectKey>> objectKeysKey = GSetKey.create("objectKeys");
private Set<ObjectKey> objectKeys = new HashSet<>();

private final WriteConsistency writeConsistency;
private final Replicator.WriteConsistency writeConsistency;

public AkkaEventsHandlerAndNotifierActor(final EventsHandler eventsHandler, final String writeConsistency,
final int writeConsistencyTimeoutInSeconds) {
Expand All @@ -74,19 +70,19 @@ public AkkaEventsHandlerAndNotifierActor(final EventsHandler eventsHandler, fina
this.writeConsistency = instantiateWriteConsistency(writeConsistency, writeConsistencyTimeoutInSeconds);
}

private WriteConsistency instantiateWriteConsistency(String writeConsistency,
private Replicator.WriteConsistency instantiateWriteConsistency(String writeConsistency,
final int writeConsistencyTimeoutInSeconds) {
if ("WriteAll".equalsIgnoreCase(writeConsistency)) {
return new WriteAll(Duration.create(writeConsistencyTimeoutInSeconds, TimeUnit.SECONDS));
return new Replicator.WriteAll(Duration.create(writeConsistencyTimeoutInSeconds, TimeUnit.SECONDS));
} else {
return new WriteMajority(Duration.create(writeConsistencyTimeoutInSeconds, TimeUnit.SECONDS));
return new Replicator.WriteMajority(Duration.create(writeConsistencyTimeoutInSeconds, TimeUnit.SECONDS));
}

}

@Override
public void preStart() {
Subscribe<GSet<ObjectKey>> subscribe = new Subscribe<>(objectKeysKey, getSelf());
Replicator.Subscribe<GSet<ObjectKey>> subscribe = new Replicator.Subscribe<>(objectKeysKey, getSelf());
replicator.tell(subscribe, ActorRef.noSender());
}

Expand All @@ -96,47 +92,48 @@ public Receive createReceive() {
.match(EventUserUpdated.class, e -> handleUserUpdated(e)).match(EventUserDeleted.class, e -> handleUserDeleted(e))
.match(EventGroupCreated.class, e -> handleGroupCreated(e))
.match(EventGroupUpdated.class, e -> handleGroupUpdated(e))
.match(EventGroupDeleted.class, e -> handleGroupDeleted(e)).match(Changed.class, c -> handleChanged(c))
.match(UpdateSuccess.class, e -> handleUpdateSuccess(e)).match(UpdateFailure.class, e -> handleUpdateFailure(e))
.match(EventGroupDeleted.class, e -> handleGroupDeleted(e)).match(Replicator.Changed.class, c -> handleChanged(c))
.match(Replicator.UpdateSuccess.class, e -> handleUpdateSuccess(e))
.match(Replicator.UpdateFailure.class, e -> handleUpdateFailure(e))
.matchAny(msg -> {
LOGGER.warn("Received unknown message '{}'", msg);
}).build();
}

private void handleUpdateSuccess(UpdateSuccess e) {
private void handleUpdateSuccess(Replicator.UpdateSuccess e) {
// LOGGER.info("handleUpdateSuccess '{}'", e);
// FIXME 20180925 hsilva: do nothing???
}

private void handleUpdateFailure(UpdateFailure e) {
private void handleUpdateFailure(Replicator.UpdateFailure e) {
// LOGGER.info("handleUpdateFailure '{}'", e);
// FIXME 20180925 hsilva: what to do???
}

private void handleChanged(Changed<?> e) {
private void handleChanged(Replicator.Changed<?> e) {
if (e.key().equals(objectKeysKey)) {
handleObjectKeysChanged((Changed<GSet<ObjectKey>>) e);
handleObjectKeysChanged((Replicator.Changed<GSet<ObjectKey>>) e);
} else if (e.key() instanceof ObjectKey) {
handleObjectChanged((Changed<ORMap<String, CRDTWrapper>>) e);
}
}

private void handleObjectKeysChanged(Changed<GSet<ObjectKey>> e) {
private void handleObjectKeysChanged(Replicator.Changed<GSet<ObjectKey>> e) {
Set<ObjectKey> newKeys = e.dataValue().getElements();
Set<ObjectKey> keysToSubscribe = new HashSet<>(newKeys);
keysToSubscribe.removeAll(objectKeys);
keysToSubscribe.forEach(keyToSubscribe -> {
// subscribe to get notifications of when objects with this name are
// added or removed
replicator.tell(new Subscribe<>(keyToSubscribe, self()), self());
replicator.tell(new Replicator.Subscribe<>(keyToSubscribe, self()), self());
});
objectKeys = newKeys;

// 20180925 hsilva: to improve GC
keysToSubscribe = null;
}

private void handleObjectChanged(Changed<ORMap<String, CRDTWrapper>> e) {
private void handleObjectChanged(Replicator.Changed<ORMap<String, CRDTWrapper>> e) {
String objectId = e.key().id().replaceFirst(CACHE_PREFIX, "");
Option<CRDTWrapper> option = e.dataValue().get(objectId);
if (option.isDefined()) {
Expand Down Expand Up @@ -248,6 +245,7 @@ private ObjectKey dataKey(String entryKey) {
}

public static class ObjectKey extends Key<ORMap<String, CRDTWrapper>> {
@Serial
private static final long serialVersionUID = 4859356839497209682L;

public ObjectKey(String eventKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

import akka.serialization.SerializerWithStringManifest;
import org.apache.pekko.serialization.SerializerWithStringManifest;

public class CRDTSerializer extends SerializerWithStringManifest {

Expand Down
Loading

0 comments on commit 32e6d72

Please sign in to comment.