Skip to content

Commit

Permalink
reformat code
Browse files Browse the repository at this point in the history
  • Loading branch information
eliquinox committed Nov 3, 2024
1 parent 62d4949 commit 599bad8
Show file tree
Hide file tree
Showing 72 changed files with 1,112 additions and 1,260 deletions.
20 changes: 10 additions & 10 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Set up JDK 17
uses: actions/setup-java@v2
with:
java-version: '17'
distribution: 'zulu'
- name: Build with Gradle
uses: gradle/gradle-build-action@937999e9cc2425eddc7fd62d1053baf041147db7
with:
arguments: build
- uses: actions/checkout@v2
- name: Set up JDK 17
uses: actions/setup-java@v2
with:
java-version: '17'
distribution: 'zulu'
- name: Build with Gradle
uses: gradle/gradle-build-action@937999e9cc2425eddc7fd62d1053baf041147db7
with:
arguments: build
26 changes: 16 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[![Java CI with Gradle](https://github.com/eliquinox/aeronic/actions/workflows/gradle.yml/badge.svg)](https://github.com/eliquinox/aeronic/actions/workflows/gradle.yml)
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

# aeronic

Usage:
Expand All @@ -20,22 +21,21 @@ dependencies {

## Quickstart

Aeronic allows for flexible usage of [Aeron](https://github.com/real-logic/aeron) by way of proxy generation for
Aeronic allows for flexible usage of [Aeron](https://github.com/real-logic/aeron) by way of proxy generation for
subscriptions and publications. Use `@Aeronic` to make the compiler generate subscriber and publisher proxies:

```java

@Aeronic
public interface TradeEvents
{
public interface TradeEvents {
void onTrade(long price);
}
```

A subscriber, containing business logic can then be defined by implementing the interface:

```java
public class TradeEventsImpl implements TradeEvents
{
public class TradeEventsImpl implements TradeEvents {

private long lastPrice;

Expand All @@ -44,15 +44,15 @@ public class TradeEventsImpl implements TradeEvents
{
this.lastPrice = price;
}

public long getLastPrice()
{
return lastPrice;
}
}
```

`AeronicWizard` can then be used to create a publisher of type `TradeEvents` and bind a subscriber implemented above.
`AeronicWizard` can then be used to create a publisher of type `TradeEvents` and bind a subscriber implemented above.
The two will communicate via a given Aeron channel / stream ID:

```java
Expand All @@ -61,8 +61,14 @@ final AeronicWizard aeronic = new AeronicWizard(aeron);

final TradeEvents eventsPublisher = aeronic.createPublisher(TradeEvents.class, "aeron:ipc", 10);
final TradeEventsImpl subscriberImpl = new TradeEventsImpl();
aeronic.registerSubscriber(TradeEvents.class, subscriberImpl, "aeron:ipc", 10);
aeronic.

registerSubscriber(TradeEvents .class, subscriberImpl, "aeron:ipc",10);

publisher.

onTrade(123L);
subscriberImpl.

publisher.onTrade(123L);
subscriberImpl.getLastPrice(); // 123L
getLastPrice(); // 123L
```
85 changes: 55 additions & 30 deletions aeronic-processor/src/main/java/io/aeronic/AeronicImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
import java.util.function.LongConsumer;
Expand All @@ -43,7 +42,8 @@ public class AeronicImpl implements AutoCloseable, Aeronic {
private final LongConsumer offerFailureHandler;
private AgentRunner agentRunner;

private AeronicImpl(final AeronicImpl.Context ctx) {
private AeronicImpl(final AeronicImpl.Context ctx)
{
this.aeron = ctx.aeron;
this.aeronArchive = ctx.aeronArchive;
this.idleStrategy = ctx.idleStrategy;
Expand All @@ -52,7 +52,8 @@ private AeronicImpl(final AeronicImpl.Context ctx) {
this.offerFailureHandler = ctx.offerFailureHandler;
}

public static AeronicImpl launch(final Context ctx) {
public static AeronicImpl launch(final Context ctx)
{
final AeronicImpl aeronic = new AeronicImpl(ctx);
aeronic.start();
return aeronic;
Expand All @@ -67,46 +68,54 @@ public static class Context {
private LongConsumer offerFailureHandler = f -> {
};

public Context aeron(final Aeron aeron) {
public Context aeron(final Aeron aeron)
{
this.aeron = aeron;
return this;
}

public Context aeronArchive(final AeronArchive aeronArchive) {
public Context aeronArchive(final AeronArchive aeronArchive)
{
this.aeronArchive = aeronArchive;
return this;
}

public Context idleStrategy(final IdleStrategy idleStrategy) {
public Context idleStrategy(final IdleStrategy idleStrategy)
{
this.idleStrategy = idleStrategy;
return this;
}

public Context errorHandler(final ErrorHandler errorHandler) {
public Context errorHandler(final ErrorHandler errorHandler)
{
this.errorHandler = errorHandler;
return this;
}

public Context atomicCounter(final AtomicCounter atomicCounter) {
public Context atomicCounter(final AtomicCounter atomicCounter)
{
this.atomicCounter = atomicCounter;
return this;
}

public Context offerFailureHandler(final LongConsumer offerFailureHandler) {
public Context offerFailureHandler(final LongConsumer offerFailureHandler)
{
this.offerFailureHandler = offerFailureHandler;
return this;
}
}

@Override
public <T> T createPublisher(final Class<T> clazz, final String channel, final int streamId) {
public <T> T createPublisher(final Class<T> clazz, final String channel, final int streamId)
{
final Publication rawPublication = aeron.addPublication(channel, streamId);
final AeronicPublication publication = new SimplePublication(rawPublication, offerFailureHandler);
publications.add(publication);
return createPublisher(clazz, publication);
}

public <T> T createRecordedPublisher(final Class<T> clazz, final String channel, final int streamId) {
public <T> T createRecordedPublisher(final Class<T> clazz, final String channel, final int streamId)
{
final Publication rawPublication = aeron.addPublication(channel, streamId);
final AeronicPublication publication = new SimplePublication(rawPublication, offerFailureHandler);
publications.add(publication);
Expand All @@ -118,13 +127,15 @@ public <T> T createRecordedPublisher(final Class<T> clazz, final String channel,
return createPublisher(clazz, publication);
}

public void awaitRecordingCounterId(final CountersReader counters, final int sessionId) {
public void awaitRecordingCounterId(final CountersReader counters, final int sessionId)
{
while (NULL_VALUE == RecordingPos.findCounterIdBySession(counters, sessionId)) {
LockSupport.parkNanos(1000);
}
}

public <T> T createClusterIngressPublisher(final Class<T> clazz, final AeronCluster.Context aeronClusterCtx) {
public <T> T createClusterIngressPublisher(final Class<T> clazz, final AeronCluster.Context aeronClusterCtx)
{
final String publisherName = clazz.getName() + "__IngressPublisher";
final AeronClusterPublication publication = new AeronClusterPublication(publisherName, aeronClusterCtx);

Expand All @@ -133,17 +144,19 @@ public <T> T createClusterIngressPublisher(final Class<T> clazz, final AeronClus
return createPublisher(clazz, publication);
}

public <T> T createClusterIngressPublisher(final Class<T> clazz, final String ingressChannel) {
public <T> T createClusterIngressPublisher(final Class<T> clazz, final String ingressChannel)
{
return createClusterIngressPublisher(
clazz,
new AeronCluster.Context()
.errorHandler(Throwable::printStackTrace)
.ingressChannel(ingressChannel)
.aeronDirectoryName(aeron.context().aeronDirectoryName())
);
);
}

public static <T> ClientSessionPublication<T> createClusterEgressPublisher(final Class<T> clazz) {
public static <T> ClientSessionPublication<T> createClusterEgressPublisher(final Class<T> clazz)
{
final String publisherName = clazz.getName() + "__EgressPublisher";
final ClientSessionPublication<T> publication = new ClientSessionPublication<>(publisherName);
final T publisher = createPublisher(clazz, publication);
Expand All @@ -152,7 +165,8 @@ public static <T> ClientSessionPublication<T> createClusterEgressPublisher(final
}

@SuppressWarnings("unchecked")
public static <T> T createPublisher(final Class<T> clazz, final AeronicPublication publication) {
public static <T> T createPublisher(final Class<T> clazz, final AeronicPublication publication)
{
try {
return (T) Class.forName(clazz.getName() + "Publisher").getConstructor(AeronicPublication.class).newInstance(publication);
} catch (final Exception e) {
Expand All @@ -161,15 +175,17 @@ public static <T> T createPublisher(final Class<T> clazz, final AeronicPublicati
}

@Override
public <T> void registerSubscriber(final Class<T> clazz, final T subscriberImplementation, final String channel, final int streamId) {
public <T> void registerSubscriber(final Class<T> clazz, final T subscriberImplementation, final String channel, final int streamId)
{
final Subscription subscription = aeron.addSubscription(channel, streamId);
subscriptions.add(subscription);

final AbstractSubscriberInvoker<T> invoker = createSubscriberInvoker(clazz, subscriberImplementation);
dynamicCompositeAgent.tryAdd(new SubscriptionAgent<>(subscription, invoker));
}

public <T> void registerClusterEgressSubscriber(final Class<T> clazz, final T subscriberImplementation, final String ingressChannel) {
public <T> void registerClusterEgressSubscriber(final Class<T> clazz, final T subscriberImplementation, final String ingressChannel)
{
final String subscriberName = clazz.getName() + "__EgressSubscriber";
final AbstractSubscriberInvoker<T> invoker = createSubscriberInvoker(clazz, subscriberImplementation);

Expand All @@ -184,14 +200,15 @@ public <T> void registerClusterEgressSubscriber(final Class<T> clazz, final T su
dynamicCompositeAgent.tryAdd(new AeronClusterAgent(aeronCluster, subscriberName));
}

public <T> void registerClusterEgressSubscriber(final Class<T> clazz, final T subscriberImplementation, final AeronCluster.Context aeronClusterCtx) {
public <T> void registerClusterEgressSubscriber(final Class<T> clazz, final T subscriberImplementation, final AeronCluster.Context aeronClusterCtx)
{
final String subscriberName = clazz.getName() + "__EgressSubscriber";
final AbstractSubscriberInvoker<T> invoker = createSubscriberInvoker(clazz, subscriberImplementation);
final AeronCluster aeronCluster = AeronCluster.connect(
aeronClusterCtx
.credentialsSupplier(new AeronicCredentialsSupplier(subscriberName))
.egressListener((clusterSessionId, timestamp, buffer, offset, length, header) -> invoker.handle(buffer, offset))
);
);
dynamicCompositeAgent.tryAdd(new AeronClusterAgent(aeronCluster, subscriberName));
}

Expand All @@ -200,7 +217,8 @@ public <T> void registerPersistentSubscriber(
final T subscriberImplementation,
final String livePublicationAlias,
final int streamId
) {
)
{
final String subscriptionChannel = new ChannelUriStringBuilder()
.media(CommonContext.UDP_MEDIA)
.controlMode(CommonContext.MDC_CONTROL_MODE_MANUAL)
Expand Down Expand Up @@ -240,7 +258,8 @@ public <T> void registerPersistentSubscriber(
dynamicCompositeAgent.tryAdd(new ReplayMergeAgent<>(replayMerge, invoker));
}

private long fetchRecordingId(final String liveChannelAlias) {
private long fetchRecordingId(final String liveChannelAlias)
{
final MutableLong recordingIdRef = new MutableLong();

aeronArchive.listRecordings(
Expand All @@ -253,13 +272,14 @@ private long fetchRecordingId(final String liveChannelAlias) {
recordingIdRef.set(recordingId);
}
}
);
);

return recordingIdRef.get();
}

@SuppressWarnings("unchecked")
public static <T> AbstractSubscriberInvoker<T> createSubscriberInvoker(final Class<T> clazz, final T subscriberImplementation) {
public static <T> AbstractSubscriberInvoker<T> createSubscriberInvoker(final Class<T> clazz, final T subscriberImplementation)
{
try {
return (AbstractSubscriberInvoker<T>) Class.forName(clazz.getName() + "Invoker")
.getConstructor(clazz)
Expand All @@ -270,7 +290,8 @@ public static <T> AbstractSubscriberInvoker<T> createSubscriberInvoker(final Cla
}

@SuppressWarnings("unchecked")
public static <T> AbstractSubscriberInvoker<T> createSubscriberInvoker(final Class<T> clazz) {
public static <T> AbstractSubscriberInvoker<T> createSubscriberInvoker(final Class<T> clazz)
{
// allow for late binding of subscriber impl
try {
return (AbstractSubscriberInvoker<T>) Class.forName(clazz.getName() + "Invoker")
Expand All @@ -282,7 +303,8 @@ public static <T> AbstractSubscriberInvoker<T> createSubscriberInvoker(final Cla
}
}

private void start() {
private void start()
{
agentRunner = new AgentRunner(
idleStrategy,
errorHandler,
Expand All @@ -294,19 +316,22 @@ private void start() {
}

@Override
public void close() {
public void close()
{
if (agentRunner != null) {
agentRunner.close();
}
publications.forEach(AeronicPublication::close);
}

public boolean allConnected() {
public boolean allConnected()
{
return publications.stream().allMatch(AeronicPublication::isConnected)
&& subscriptions.stream().allMatch(Subscription::isConnected);
}

public void awaitUntilPubsAndSubsConnect() {
public void awaitUntilPubsAndSubsConnect()
{
await()
.timeout(Duration.ofSeconds(5))
.until(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
import io.aeronic.net.AeronicPublication;
import org.agrona.DirectBuffer;

public class AeronClusterPublication implements AeronicPublication
{
public class AeronClusterPublication implements AeronicPublication {

private final AeronCluster aeronCluster;

public AeronClusterPublication(final String publisherName, final AeronCluster.Context aeronClusterCtx)
{
this.aeronCluster = AeronCluster.connect(
aeronClusterCtx
.credentialsSupplier(new AeronicCredentialsSupplier(publisherName)));
aeronClusterCtx
.credentialsSupplier(new AeronicCredentialsSupplier(publisherName)));
}

@Override
Expand All @@ -31,16 +30,14 @@ public long offer(final DirectBuffer buffer)
@Override
public void close()
{
if (!aeronCluster.isClosed())
{
if (!aeronCluster.isClosed()) {
aeronCluster.close();
}
}

public int pollCluster()
{
if (isConnected())
{
if (isConnected()) {
return aeronCluster.pollEgress();
}
return 0;
Expand Down
Loading

0 comments on commit 599bad8

Please sign in to comment.