Skip to content

Commit

Permalink
reformat code
Browse files Browse the repository at this point in the history
  • Loading branch information
eliquinox committed Nov 3, 2024
1 parent 599bad8 commit f8cf0cd
Show file tree
Hide file tree
Showing 71 changed files with 272 additions and 185 deletions.
20 changes: 8 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ subscriptions and publications. Use `@Aeronic` to make the compiler generate sub
```java

@Aeronic
public interface TradeEvents {
public interface TradeEvents
{
void onTrade(long price);
}
```

A subscriber, containing business logic can then be defined by implementing the interface:

```java
public class TradeEventsImpl implements TradeEvents {
public class TradeEventsImpl implements TradeEvents
{

private long lastPrice;

Expand All @@ -57,18 +59,12 @@ The two will communicate via a given Aeron channel / stream ID:

```java
final Aeron aeron = Aeron.connect(aeronCtx);
final AeronicWizard aeronic = new AeronicWizard(aeron);
final AeronicImpl aeronic = new AeronicImpl(aeron);

final TradeEvents eventsPublisher = aeronic.createPublisher(TradeEvents.class, "aeron:ipc", 10);
final TradeEventsImpl subscriberImpl = new TradeEventsImpl();
aeronic.
aeronic.registerSubscriber(TradeEvents.class, subscriberImpl, "aeron:ipc", 10);

registerSubscriber(TradeEvents .class, subscriberImpl, "aeron:ipc",10);

publisher.

onTrade(123L);
subscriberImpl.

getLastPrice(); // 123L
publisher.onTrade(123L);
subscriberImpl.getLastPrice(); // 123L
```
3 changes: 2 additions & 1 deletion aeronic-processor/src/main/java/io/aeronic/Aeronic.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.aeronic;

public interface Aeronic {
public interface Aeronic
{
<T> T createPublisher(Class<T> clazz, String channel, int streamId);

<T> void registerSubscriber(Class<T> clazz, T subscriberImplementation, String channel, int streamId);
Expand Down
167 changes: 84 additions & 83 deletions aeronic-processor/src/main/java/io/aeronic/AeronicImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import static io.aeron.Aeron.NULL_VALUE;
import static org.awaitility.Awaitility.await;

public class AeronicImpl implements AutoCloseable, Aeronic {
public class AeronicImpl implements AutoCloseable, Aeronic
{
private final Aeron aeron;
private final AeronArchive aeronArchive;
private final IdleStrategy idleStrategy;
Expand Down Expand Up @@ -59,49 +60,48 @@ public static AeronicImpl launch(final Context ctx)
return aeronic;
}

public static class Context {
private Aeron aeron;
private AeronArchive aeronArchive;
private IdleStrategy idleStrategy = NoOpIdleStrategy.INSTANCE;
private ErrorHandler errorHandler = Throwable::printStackTrace;
private AtomicCounter atomicCounter;
private LongConsumer offerFailureHandler = f -> {
};

public Context aeron(final Aeron aeron)
{
this.aeron = aeron;
return this;
}

public Context aeronArchive(final AeronArchive aeronArchive)
{
this.aeronArchive = aeronArchive;
return this;
}
public static <T> ClientSessionPublication<T> createClusterEgressPublisher(final Class<T> clazz)
{
final String publisherName = clazz.getName() + "__EgressPublisher";
final ClientSessionPublication<T> publication = new ClientSessionPublication<>(publisherName);
final T publisher = createPublisher(clazz, publication);
publication.bindPublisher(publisher);
return publication;
}

public Context idleStrategy(final IdleStrategy idleStrategy)
{
this.idleStrategy = idleStrategy;
return this;
@SuppressWarnings("unchecked")
public static <T> T createPublisher(final Class<T> clazz, final AeronicPublication publication)
{
try {
return (T) Class.forName(clazz.getName() + "Publisher").getConstructor(AeronicPublication.class).newInstance(publication);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

public Context errorHandler(final ErrorHandler errorHandler)
{
this.errorHandler = errorHandler;
return this;
@SuppressWarnings("unchecked")
public static <T> AbstractSubscriberInvoker<T> createSubscriberInvoker(final Class<T> clazz, final T subscriberImplementation)
{
try {
return (AbstractSubscriberInvoker<T>) Class.forName(clazz.getName() + "Invoker")
.getConstructor(clazz)
.newInstance(subscriberImplementation);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

public Context atomicCounter(final AtomicCounter atomicCounter)
{
this.atomicCounter = atomicCounter;
return this;
}
@SuppressWarnings("unchecked")
public static <T> AbstractSubscriberInvoker<T> createSubscriberInvoker(final Class<T> clazz)
{
// allow for late binding of subscriber impl
try {
return (AbstractSubscriberInvoker<T>) Class.forName(clazz.getName() + "Invoker")
.getConstructor()
.newInstance();

public Context offerFailureHandler(final LongConsumer offerFailureHandler)
{
this.offerFailureHandler = offerFailureHandler;
return this;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -155,25 +155,6 @@ public <T> T createClusterIngressPublisher(final Class<T> clazz, final String in
);
}

public static <T> ClientSessionPublication<T> createClusterEgressPublisher(final Class<T> clazz)
{
final String publisherName = clazz.getName() + "__EgressPublisher";
final ClientSessionPublication<T> publication = new ClientSessionPublication<>(publisherName);
final T publisher = createPublisher(clazz, publication);
publication.bindPublisher(publisher);
return publication;
}

@SuppressWarnings("unchecked")
public static <T> T createPublisher(final Class<T> clazz, final AeronicPublication publication)
{
try {
return (T) Class.forName(clazz.getName() + "Publisher").getConstructor(AeronicPublication.class).newInstance(publication);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

@Override
public <T> void registerSubscriber(final Class<T> clazz, final T subscriberImplementation, final String channel, final int streamId)
{
Expand Down Expand Up @@ -277,32 +258,6 @@ private long fetchRecordingId(final String liveChannelAlias)
return recordingIdRef.get();
}

@SuppressWarnings("unchecked")
public static <T> AbstractSubscriberInvoker<T> createSubscriberInvoker(final Class<T> clazz, final T subscriberImplementation)
{
try {
return (AbstractSubscriberInvoker<T>) Class.forName(clazz.getName() + "Invoker")
.getConstructor(clazz)
.newInstance(subscriberImplementation);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

@SuppressWarnings("unchecked")
public static <T> AbstractSubscriberInvoker<T> createSubscriberInvoker(final Class<T> clazz)
{
// allow for late binding of subscriber impl
try {
return (AbstractSubscriberInvoker<T>) Class.forName(clazz.getName() + "Invoker")
.getConstructor()
.newInstance();

} catch (final Exception e) {
throw new RuntimeException(e);
}
}

private void start()
{
agentRunner = new AgentRunner(
Expand Down Expand Up @@ -340,4 +295,50 @@ public void awaitUntilPubsAndSubsConnect()
return allConnected;
});
}

public static class Context
{
private Aeron aeron;
private AeronArchive aeronArchive;
private IdleStrategy idleStrategy = NoOpIdleStrategy.INSTANCE;
private ErrorHandler errorHandler = Throwable::printStackTrace;
private AtomicCounter atomicCounter;
private LongConsumer offerFailureHandler = f -> {};

public Context aeron(final Aeron aeron)
{
this.aeron = aeron;
return this;
}

public Context aeronArchive(final AeronArchive aeronArchive)
{
this.aeronArchive = aeronArchive;
return this;
}

public Context idleStrategy(final IdleStrategy idleStrategy)
{
this.idleStrategy = idleStrategy;
return this;
}

public Context errorHandler(final ErrorHandler errorHandler)
{
this.errorHandler = errorHandler;
return this;
}

public Context atomicCounter(final AtomicCounter atomicCounter)
{
this.atomicCounter = atomicCounter;
return this;
}

public Context offerFailureHandler(final LongConsumer offerFailureHandler)
{
this.offerFailureHandler = offerFailureHandler;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import io.aeronic.net.AeronicPublication;
import org.agrona.DirectBuffer;

public class AeronClusterPublication implements AeronicPublication {
public class AeronClusterPublication implements AeronicPublication
{

private final AeronCluster aeronCluster;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import org.agrona.concurrent.Agent;

public class AeronClusterPublicationAgent implements Agent {
public class AeronClusterPublicationAgent implements Agent
{
private final AeronClusterPublication publication;
private final String publisherName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AeronicClusteredServiceContainer implements ClusteredService {
public class AeronicClusteredServiceContainer implements ClusteredService
{
private final ClusteredService clusteredService;
private final AeronicClusteredServiceRegistry registry;
private final AtomicReference<Cluster> clusterRef;
Expand Down Expand Up @@ -135,7 +136,8 @@ public Cluster.Role getRole()
return role;
}

public static class Configuration {
public static class Configuration
{
private ClusteredService clusteredService;
private final AeronicClusteredServiceRegistry registry = new AeronicClusteredServiceRegistry();
private final AtomicReference<Cluster> clusterRef = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
import java.util.Map;
import java.util.function.Supplier;

public class AeronicClusteredServiceRegistry {
public class AeronicClusteredServiceRegistry
{
private final Map<Long, AbstractSubscriberInvoker<?>> invokersBySessionId = new Long2ObjectHashMap<>();
private final Map<String, AbstractSubscriberInvoker<?>> invokerByName = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import io.aeron.security.CredentialsSupplier;
import org.agrona.collections.ArrayUtil;

public class AeronicCredentialsSupplier implements CredentialsSupplier {
public class AeronicCredentialsSupplier implements CredentialsSupplier
{
private final String name;

public AeronicCredentialsSupplier(final String name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import java.util.IdentityHashMap;
import java.util.Set;

public class ClientSessionPublication<T> implements AeronicPublication {
public class ClientSessionPublication<T> implements AeronicPublication
{
private final String publisherName;
private final Set<ClientSession> clientSessions = Collections.newSetFromMap(new IdentityHashMap<>());
private T publisher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import java.util.function.IntFunction;
import java.util.function.Supplier;

public class BufferDecoder {
public class BufferDecoder
{
private DirectBuffer buffer;
private int currentOffset = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import java.util.Collection;
import java.util.List;

public class BufferEncoder {
public class BufferEncoder
{
private final MutableDirectBuffer buffer;
private int currentOffset = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DecodedBy {
public @interface DecodedBy
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


@FunctionalInterface
public interface Decoder<T> {
public interface Decoder<T>
{
T decode(BufferDecoder bufferDecoder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


@FunctionalInterface
public interface Encodable {
public interface Encodable
{
void encode(BufferEncoder bufferEncoder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


@FunctionalInterface
public interface Encoder<T> {
public interface Encoder<T>
{
void encode(BufferEncoder bufferEncoder, T toEncode);
}
3 changes: 2 additions & 1 deletion aeronic-processor/src/main/java/io/aeronic/gen/Aeronic.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.SOURCE)
public @interface Aeronic {
public @interface Aeronic
{
}
Loading

0 comments on commit f8cf0cd

Please sign in to comment.