From d3d4aa7e9fa70eb773d658861d1899dd356be74a Mon Sep 17 00:00:00 2001 From: David Fuelling Date: Wed, 1 Apr 2020 16:00:58 -0600 Subject: [PATCH] Fixes #430 (#443) * Fixes #430 * Supersedes PR #440 (now closed). * Fixes #430: Clarify and enhance the InterledgerErrorCodes that the SimpleStreamSender will fail-fast on and stop the stream. These include any F or R-family error, except F08 and F99. Any T-family error will NOT abort the sendMoney operation. * Fixes Invalid Denomination: If a receiver doesn't send back `ConnectionAssetDetails` frames to the sender during preflight, then the denomination will be absent. In this case, the FixedReceiverAmountPaymentTracker doesn't handle this condition properly. * Enhance unit & integration tests to cover these scenarios. * Add Javadoc in various places and cleanup formatting Signed-off-by: David Fuelling --- .../examples/SendMoneyExample.java | 2 - .../link/http/IlpOverHttpLinkFactory.java | 4 - .../sender/AimdCongestionController.java | 2 +- .../FixedReceiverAmountPaymentTracker.java | 17 +- .../FixedSenderAmountPaymentTracker.java | 11 +- .../stream/sender/SimpleStreamSender.java | 444 +++++++++++------- ...FixedReceiverAmountPaymentTrackerTest.java | 19 +- .../FixedSenderAmountPaymentTrackerTest.java | 20 +- .../HalfsiesExchangeRateCalculator.java | 22 +- .../sender/SendMoneyAggregatorTest.java | 229 ++++++--- .../stream/sender/SimpleStreamSenderIT.java | 129 +++-- .../sender/SimpleStreamSenderTests.java | 33 +- .../interledger/stream/PaymentTracker.java | 35 +- .../stream/ReceiverAmountPaymentTracker.java | 9 + .../interledger/stream/SendMoneyRequest.java | 24 +- .../stream/SenderAmountPaymentTracker.java | 20 + .../calculators/ExchangeRateCalculator.java | 56 ++- .../NoOpExchangeRateCalculator.java | 19 +- .../stream/PaymentTrackerTest.java | 103 ++++ .../ExchangeRateCalculatorTest.java | 35 +- .../NoOpExchangeRateCalculatorTest.java | 16 +- stream-parent/stream-receiver/pom.xml | 4 - .../receiver/InvalidReceiverProblem.java | 26 - .../receiver/StatelessStreamReceiver.java | 11 +- .../stream/receiver/SenderReceiverTest.java | 95 +++- .../testutils/AlwaysEmptyStreamReceiver.java | 173 +++++++ .../testutils/SimulatedIlpv4Network.java | 36 +- 27 files changed, 1129 insertions(+), 465 deletions(-) create mode 100644 stream-parent/stream-core/src/test/java/org/interledger/stream/PaymentTrackerTest.java delete mode 100644 stream-parent/stream-receiver/src/main/java/org/interledger/stream/receiver/InvalidReceiverProblem.java create mode 100644 stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/testutils/AlwaysEmptyStreamReceiver.java diff --git a/examples-parent/send-money/src/main/java/org/interledger/examples/SendMoneyExample.java b/examples-parent/send-money/src/main/java/org/interledger/examples/SendMoneyExample.java index 89c9cf04..5f3e2679 100644 --- a/examples-parent/send-money/src/main/java/org/interledger/examples/SendMoneyExample.java +++ b/examples-parent/send-money/src/main/java/org/interledger/examples/SendMoneyExample.java @@ -16,7 +16,6 @@ import org.interledger.stream.Denominations; import org.interledger.stream.SendMoneyRequest; import org.interledger.stream.SendMoneyResult; -import org.interledger.stream.SenderAmountMode; import org.interledger.stream.sender.FixedSenderAmountPaymentTracker; import org.interledger.stream.sender.SimpleStreamSender; @@ -72,7 +71,6 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc SendMoneyResult result = simpleStreamSender.sendMoney( SendMoneyRequest.builder() .sourceAddress(SENDER_ADDRESS) - .senderAmountMode(SenderAmountMode.SENDER_AMOUNT) .amount(UnsignedLong.valueOf(100000)) .denomination(Denominations.XRP) .destinationAddress(connectionDetails.destinationAddress()) diff --git a/link-parent/link-ilp-over-http/src/main/java/org/interledger/link/http/IlpOverHttpLinkFactory.java b/link-parent/link-ilp-over-http/src/main/java/org/interledger/link/http/IlpOverHttpLinkFactory.java index fdeed241..4dfb43f4 100644 --- a/link-parent/link-ilp-over-http/src/main/java/org/interledger/link/http/IlpOverHttpLinkFactory.java +++ b/link-parent/link-ilp-over-http/src/main/java/org/interledger/link/http/IlpOverHttpLinkFactory.java @@ -90,10 +90,6 @@ public Link constructLink( decryptor.decrypt(outgoingLinkSettings.simpleAuthSettings().get().authToken().getBytes()) )); } else { - // TODO: For now, we assume the bytes are a String that conform to the Crypt CLI. However, this should be made - // type-safe and more generic if possible. E.g., CryptoCLI formate vs Protobuf. Or, standardize on a single - // type-safe format? - // NOTE: This supplier will always create a copy of the decrypted bytes so that the consumer of each call can // safely wipe the bytes from memory without affecting other callers. final SharedSecretBytesSupplier sharedSecretSupplier = () -> decryptor diff --git a/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/AimdCongestionController.java b/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/AimdCongestionController.java index 80d8dae0..d7907c66 100644 --- a/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/AimdCongestionController.java +++ b/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/AimdCongestionController.java @@ -187,7 +187,7 @@ public void reject(final UnsignedLong prepareAmount, final InterledgerRejectPack default: { // No special treatment for unhandled errors, but warn just in case we start to see a lot of them. // Actual packet data is logged by the StreamSender, so no need to log packet details here. - logger.warn("For Congestion control purposes, ignoring unhandled packet rejection ({}: {}).", + logger.debug("For Congestion control purposes, ignoring unhandled packet rejection ({}: {}).", rejectPacket.getCode().getCode(), rejectPacket.getCode().getName() ); } diff --git a/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/FixedReceiverAmountPaymentTracker.java b/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/FixedReceiverAmountPaymentTracker.java index f1c39cf3..06a71687 100644 --- a/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/FixedReceiverAmountPaymentTracker.java +++ b/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/FixedReceiverAmountPaymentTracker.java @@ -13,7 +13,6 @@ import com.google.common.primitives.UnsignedLong; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; /** @@ -90,9 +89,7 @@ public UnsignedLong getDeliveredAmountInReceiverUnits() { @Override public PrepareAmounts getSendPacketAmounts( - final UnsignedLong congestionLimit, - final Denomination senderDenomination, - final Optional receiverDenomination + final UnsignedLong congestionLimit, final Denomination senderDenomination, final Denomination receiverDenomination ) { Objects.requireNonNull(congestionLimit); Objects.requireNonNull(senderDenomination); @@ -101,12 +98,14 @@ public PrepareAmounts getSendPacketAmounts( if (congestionLimit.equals(UnsignedLong.ZERO) || amountLeftToDeliver.get().equals(UnsignedLong.ZERO)) { return PrepareAmounts.builder().amountToSend(UnsignedLong.ZERO).minimumAmountToAccept(UnsignedLong.ZERO).build(); } - UnsignedLong amountToSendInSenderUnits = - rateCalculator.calculateAmountToSend(amountLeftToDeliver.get(), senderDenomination, receiverDenomination.get()); - final UnsignedLong packetAmountToSend = StreamUtils.max(StreamUtils.min(amountToSendInSenderUnits, congestionLimit), - UnsignedLong.ONE); - UnsignedLong minAmountToAcceptInReceiverUnits = + + final UnsignedLong amountToSendInSenderUnits = rateCalculator + .calculateAmountToSend(amountLeftToDeliver.get(), senderDenomination, receiverDenomination); + final UnsignedLong packetAmountToSend = StreamUtils + .max(StreamUtils.min(amountToSendInSenderUnits, congestionLimit), UnsignedLong.ONE); + final UnsignedLong minAmountToAcceptInReceiverUnits = rateCalculator.calculateMinAmountToAccept(packetAmountToSend, senderDenomination, receiverDenomination); + return PrepareAmounts.builder() .minimumAmountToAccept(minAmountToAcceptInReceiverUnits) .amountToSend(packetAmountToSend) diff --git a/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/FixedSenderAmountPaymentTracker.java b/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/FixedSenderAmountPaymentTracker.java index e1c9fe67..f3d1ca48 100644 --- a/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/FixedSenderAmountPaymentTracker.java +++ b/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/FixedSenderAmountPaymentTracker.java @@ -13,7 +13,6 @@ import com.google.common.primitives.UnsignedLong; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; /** @@ -82,19 +81,13 @@ public UnsignedLong getDeliveredAmountInReceiverUnits() { } @Override - public PrepareAmounts getSendPacketAmounts( - final UnsignedLong congestionLimit, - final Denomination senderDenomination, - final Optional receiverDenomination - ) { + public PrepareAmounts getSendPacketAmounts(UnsignedLong congestionLimit, Denomination senderDenomination) { Objects.requireNonNull(congestionLimit); Objects.requireNonNull(senderDenomination); - Objects.requireNonNull(receiverDenomination); final UnsignedLong packetAmountToSend = StreamUtils.min(amountLeftToSend.get(), congestionLimit); return PrepareAmounts.builder() - .minimumAmountToAccept( - rateCalculator.calculateMinAmountToAccept(packetAmountToSend, senderDenomination, receiverDenomination)) + .minimumAmountToAccept(rateCalculator.calculateMinAmountToAccept(packetAmountToSend, senderDenomination)) .amountToSend(packetAmountToSend) .build(); } diff --git a/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/SimpleStreamSender.java b/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/SimpleStreamSender.java index c1481814..2996f8ca 100644 --- a/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/SimpleStreamSender.java +++ b/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/SimpleStreamSender.java @@ -1,8 +1,7 @@ package org.interledger.stream.sender; -import static org.interledger.core.InterledgerErrorCode.F00_BAD_REQUEST; -import static org.interledger.core.InterledgerErrorCode.F08_AMOUNT_TOO_LARGE_CODE; -import static org.interledger.core.InterledgerErrorCode.T04_INSUFFICIENT_LIQUIDITY_CODE; +import static org.interledger.core.InterledgerErrorCode.F08_AMOUNT_TOO_LARGE; +import static org.interledger.core.InterledgerErrorCode.F99_APPLICATION_ERROR; import static org.interledger.stream.StreamUtils.generatedFulfillableFulfillment; import org.interledger.codecs.stream.StreamCodecContextFactory; @@ -38,6 +37,7 @@ import org.interledger.stream.frames.StreamMoneyFrame; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -53,6 +53,8 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -64,6 +66,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Supplier; + import javax.annotation.concurrent.ThreadSafe; /** @@ -84,10 +87,10 @@ public class SimpleStreamSender implements StreamSender { private final Link link; + private final Duration sendPacketSleepDuration; private final StreamEncryptionService streamEncryptionService; - private final ExecutorService executorService; private final StreamConnectionManager streamConnectionManager; - private final Optional sendPacketSleep; + private final ExecutorService executorService; /** * Required-args Constructor. @@ -95,59 +98,34 @@ public class SimpleStreamSender implements StreamSender { * @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer. */ public SimpleStreamSender(final Link link) { - this(new JavaxStreamEncryptionService(), link); - } - - /** - * Required-args Constructor. - * - * @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer. - * @param sendPacketSleep amount of time for a thread to sleep before sending more packets - */ - public SimpleStreamSender(final Link link, final Optional sendPacketSleep) { - this(new JavaxStreamEncryptionService(), link, sendPacketSleep); + this(link, Duration.ofMillis(10L)); } /** * Required-args Constructor. * - * @param streamEncryptionService An instance of {@link StreamEncryptionService} used to encrypt and decrypted - * end-to-end STREAM packet data (i.e., packets that should only be visible between - * sender and receiver). * @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer. + * @param sendPacketSleepDuration A {@link Duration} representing the amount of time for the soldierOn thread to sleep + * before attempting more processing. */ - public SimpleStreamSender(final StreamEncryptionService streamEncryptionService, final Link link) { - this(streamEncryptionService, link, newDefaultExecutor(), Optional.empty()); + public SimpleStreamSender(final Link link, final Duration sendPacketSleepDuration) { + this(link, sendPacketSleepDuration, new JavaxStreamEncryptionService()); } /** * Required-args Constructor. * - * @param streamEncryptionService An instance of {@link StreamEncryptionService} used to encrypt and decrypted - * end-to-end STREAM packet data (i.e., packets that should only be visible between - * sender and receiver). * @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer. - * @param sendPacketSleep amount of time for a thread to sleep before sending more packets - */ - public SimpleStreamSender(final StreamEncryptionService streamEncryptionService, - final Link link, - final Optional sendPacketSleep) { - this(streamEncryptionService, link, newDefaultExecutor(), sendPacketSleep); - } - - /** - * Required-args Constructor. - * + * @param sendPacketSleepDuration A {@link Duration} representing the amount of time for the soldierOn thread to sleep + * before attempting more processing. * @param streamEncryptionService An instance of {@link StreamEncryptionService} used to encrypt and decrypted * end-to-end STREAM packet data (i.e., packets that should only be visible between * sender and receiver). - * @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer. - * @param executorService executorService to run the payments */ public SimpleStreamSender( - final StreamEncryptionService streamEncryptionService, final Link link, ExecutorService executorService + final Link link, final Duration sendPacketSleepDuration, final StreamEncryptionService streamEncryptionService ) { - this(streamEncryptionService, link, executorService, new StreamConnectionManager(), Optional.empty()); + this(link, sendPacketSleepDuration, streamEncryptionService, new StreamConnectionManager()); } /** @@ -157,43 +135,43 @@ public SimpleStreamSender( * end-to-end STREAM packet data (i.e., packets that should only be visible between * sender and receiver). * @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer. - * @param executorService executorService to run the payments - * @param sendPacketSleep amount of time for a thread to sleep before sending more packets + * @param sendPacketSleepDuration A {@link Duration} representing the amount of time for the soldierOn thread to sleep + * before attempting more processing. */ public SimpleStreamSender( - final StreamEncryptionService streamEncryptionService, - final Link link, -final ExecutorService executorService, - final Optional sendPacketSleep + final Link link, final Duration sendPacketSleepDuration, final StreamEncryptionService streamEncryptionService, + final StreamConnectionManager streamConnectionManager ) { - this(streamEncryptionService, link, executorService, new StreamConnectionManager(), sendPacketSleep); + this(link, sendPacketSleepDuration, streamEncryptionService, streamConnectionManager, newDefaultExecutor()); } /** * Required-args Constructor. - * @param streamEncryptionService A {@link StreamEncryptionService} used to encrypt and decrypted end-to-end STREAM + * + * @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer. + * @param sendPacketSleepDuration A {@link Duration} representing the amount of time for the soldierOn thread to sleep + * before attempting more processing. + * @param streamEncryptionService A {@link StreamEncryptionService} used to encrypt and decrypted end-to-end STREAM * packet data (i.e., packets that should only be visible between sender and * receiver). - * @param link A {@link Link} that is used to send ILPv4 packets to an immediate peer. - * @param executorService A {@link ExecutorService} to run the payments. * @param streamConnectionManager A {@link StreamConnectionManager} that manages connections for all senders and - * @param sendPacketSleep amount of time for a thread to sleep before sending more packets + * @param executorService A {@link ExecutorService} to run the payments. */ public SimpleStreamSender( - final StreamEncryptionService streamEncryptionService, - final Link link, - final ExecutorService executorService, - final StreamConnectionManager streamConnectionManager, - final Optional sendPacketSleep + final Link link, + final Duration sendPacketSleepDuration, + final StreamEncryptionService streamEncryptionService, + final StreamConnectionManager streamConnectionManager, + final ExecutorService executorService ) { - this.streamEncryptionService = Objects.requireNonNull(streamEncryptionService); this.link = Objects.requireNonNull(link); + this.sendPacketSleepDuration = Objects.requireNonNull(sendPacketSleepDuration); + this.streamEncryptionService = Objects.requireNonNull(streamEncryptionService); + this.streamConnectionManager = Objects.requireNonNull(streamConnectionManager); // Note that pools with similar properties but different details (for example, timeout parameters) may be // created using {@link ThreadPoolExecutor} constructors. this.executorService = Objects.requireNonNull(executorService); - this.streamConnectionManager = Objects.requireNonNull(streamConnectionManager); - this.sendPacketSleep = Objects.requireNonNull(sendPacketSleep); } private static ExecutorService newDefaultExecutor() { @@ -209,18 +187,18 @@ public CompletableFuture sendMoney(final SendMoneyRequest reque Objects.requireNonNull(request); final StreamConnection streamConnection = this.streamConnectionManager.openConnection( - StreamConnectionId.from(request.destinationAddress(), request.sharedSecret()) + StreamConnectionId.from(request.destinationAddress(), request.sharedSecret()) ); return new SendMoneyAggregator( - this.executorService, - streamConnection, - StreamCodecContextFactory.oer(), - this.link, - new AimdCongestionController(), - this.streamEncryptionService, - request, - this.sendPacketSleep + this.executorService, + streamConnection, + StreamCodecContextFactory.oer(), + this.link, + new AimdCongestionController(), + this.streamEncryptionService, + this.sendPacketSleepDuration, + request ).send(); } @@ -273,33 +251,32 @@ default int totalPackets() { */ static class SendMoneyAggregator { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + // These error codes, despite beging FINAL, should be treated as "recoverable", meaning the Stream sendMoney + // operation should not immediately fail if one of these is encountered. + private static final Set NON_TERMINAL_ERROR_CODES = ImmutableSet.of( + F08_AMOUNT_TOO_LARGE, F99_APPLICATION_ERROR + ); + private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final ExecutorService executorService; private final StreamConnection streamConnection; private final CodecContext streamCodecContext; private final StreamEncryptionService streamEncryptionService; private final CongestionController congestionController; private final Link link; - private final SharedSecret sharedSecret; private final Optional timeout; - private final InterledgerAddress senderAddress; private final Denomination senderDenomination; private final InterledgerAddress destinationAddress; - private final AtomicBoolean shouldSendSourceAddress; private final AtomicInteger numFulfilledPackets; private final AtomicInteger numRejectedPackets; - private final PaymentTracker paymentTracker; - - private Optional receiverDenomination; - private final AtomicBoolean unrecoverableErrorEncountered; - - private long sendPacketSleep; + private final SendMoneyRequest sendMoneyRequest; + private Optional receiverDenomination; + private Duration sendPacketSleepDuration; /** * Required-args Constructor. @@ -312,7 +289,7 @@ static class SendMoneyAggregator { * @param congestionController A {@link CongestionController} that supports back-pressure for money streams. * @param streamEncryptionService A {@link StreamEncryptionService} that allows for Stream packet encryption and * decryption. - * @param request all relevant details about the money to send + * @param sendMoneyRequest A {@link SendMoneyRequest} that contains all relevant details about the money to */ SendMoneyAggregator( final ExecutorService executorService, @@ -321,8 +298,8 @@ static class SendMoneyAggregator { final Link link, final CongestionController congestionController, final StreamEncryptionService streamEncryptionService, - final SendMoneyRequest request, - final Optional sendPacketSleep + final Duration sendPacketSleepDuration, + final SendMoneyRequest sendMoneyRequest ) { this.executorService = Objects.requireNonNull(executorService); this.streamConnection = Objects.requireNonNull(streamConnection); @@ -332,26 +309,21 @@ static class SendMoneyAggregator { this.streamEncryptionService = Objects.requireNonNull(streamEncryptionService); this.congestionController = Objects.requireNonNull(congestionController); this.shouldSendSourceAddress = new AtomicBoolean(true); + this.unrecoverableErrorEncountered = new AtomicBoolean(false); - this.sharedSecret = request.sharedSecret(); - this.senderAddress = request.sourceAddress(); - this.destinationAddress = request.destinationAddress(); + this.sharedSecret = sendMoneyRequest.sharedSecret(); + this.senderAddress = sendMoneyRequest.sourceAddress(); + this.destinationAddress = sendMoneyRequest.destinationAddress(); + this.sendPacketSleepDuration = Objects.requireNonNull(sendPacketSleepDuration); this.numFulfilledPackets = new AtomicInteger(0); this.numRejectedPackets = new AtomicInteger(0); - this.timeout = request.timeout(); - - this.senderDenomination = request.denomination(); - - this.paymentTracker = request.paymentTracker(); - + this.sendMoneyRequest = Objects.requireNonNull(sendMoneyRequest); + this.timeout = sendMoneyRequest.timeout(); + this.senderDenomination = sendMoneyRequest.denomination(); + this.paymentTracker = sendMoneyRequest.paymentTracker(); this.receiverDenomination = Optional.empty(); - - this.unrecoverableErrorEncountered = new AtomicBoolean(false); - - Objects.requireNonNull(sendPacketSleep); - this.sendPacketSleep = sendPacketSleep.orElse(UnsignedLong.valueOf(10)).longValue(); } /** @@ -366,20 +338,22 @@ CompletableFuture send() { Instant startPreflight = DateUtils.now(); try { receiverDenomination = preflightCheck(); - } catch (StreamConnectionClosedException e) { - return CompletableFuture.completedFuture(SendMoneyResult.builder() - .sendMoneyDuration(Duration.between(startPreflight, DateUtils.now())) - .numRejectPackets(1) - .numFulfilledPackets(0) - .amountDelivered(UnsignedLong.ZERO) - .amountSent(UnsignedLong.ZERO) - .originalAmount(paymentTracker.getOriginalAmount()) - .amountLeftToSend(paymentTracker.getOriginalAmountLeft()) - .successfulPayment(paymentTracker.successful()) - .build()); + if (paymentTracker.requiresReceiverDenomination() && !receiverDenomination.isPresent()) { + // The PaymentTrack requires a receiver denomination, but the receiver didn't send one. Thus, we must abort. + return CompletableFuture.completedFuture(this.constructSendMoneyResultForInvalidPreflight(startPreflight)); + } } catch (Exception e) { - logger.warn("Preflight check failed", e); + if (paymentTracker.requiresReceiverDenomination()) { + logger.error("Preflight check failed. sendMoneyRequest={}", sendMoneyRequest, e); + return CompletableFuture.completedFuture(this.constructSendMoneyResultForInvalidPreflight(startPreflight)); + } else { + logger.warn( + "Preflight check failed, but was not crucial for this sendMoney operation. sendMoneyRequest={}", + sendMoneyRequest, e + ); + } } + // A separate executor is needed for overall call to sendMoneyPacketized otherwise a livelock can occur. // Using a shared executor could cause sendMoneyPacketized to internally get blocked from submitting tasks // because the shared executor is already blocked waiting on the results of the call here to sendMoneyPacketized @@ -408,14 +382,48 @@ CompletableFuture send() { logger.error("SendMoney Stream failed: " + error.getMessage(), error); } if (!$.successfulPayment()) { - logger.error("Failed to send full amount"); + logger.error("Failed to send full amount. sendMoneyRequestId={}", sendMoneyRequest.requestId()); } }); } /** + * Helper method to construct a {@link SendMoneyResult} that can be used when preflight checks are not successful. + * + * @param startPreflight An {@link Instant} representing the moment in time that preflight was started. + * + * @return A {@link SendMoneyResult}. + */ + private SendMoneyResult constructSendMoneyResultForInvalidPreflight(final Instant startPreflight) { + Objects.requireNonNull(startPreflight); + return SendMoneyResult.builder() + .sendMoneyDuration(Duration.between(startPreflight, DateUtils.now())) + .numRejectPackets(0) + .numFulfilledPackets(0) + .amountDelivered(UnsignedLong.ZERO) + .amountSent(UnsignedLong.ZERO) + .originalAmount(paymentTracker.getOriginalAmount()) + .amountLeftToSend(paymentTracker.getOriginalAmountLeft()) + .successfulPayment(paymentTracker.successful()) + .build(); + } + + /** + *

Send a zero-value Prepare packet to "pre-flight" the Connection before actual value is transferred.

+ * + *

This operation is used to initialize a new Stream connection in order to fulfill any prerequisites necessary + * before sending real value. For example, it is necessary to obtain the receiver's "Connection Asset Details" + * before a sender can send value, in order to manage slippage for the sender.

+ * + *

Likewise, it may be desirable to perform other preflight checks in the future, such as checking an exchange + * rate or some other type of check.

+ * * TODO: See https://github.com/hyperledger/quilt/issues/308 to determine when the Stream and/or Connection should * be closed. + * + * @return A {@link Denomination} that contains the asset information for the receiver. + * + * @throws StreamConnectionClosedException if the denomination could not be loaded and the Stream should be closed. */ @VisibleForTesting Optional preflightCheck() throws StreamConnectionClosedException { @@ -425,7 +433,7 @@ Optional preflightCheck() throws StreamConnectionClosedException { sequence = this.streamConnection.nextSequence(); } catch (StreamConnectionClosedException e) { // The Connection is closed, so we can't send anything more on it. - logger.warn( + logger.error( "Unable to send more packets on a closed StreamConnection. streamConnection={} error={}", streamConnection, e ); @@ -434,7 +442,7 @@ Optional preflightCheck() throws StreamConnectionClosedException { final List frames = Lists.newArrayList( StreamMoneyFrame.builder() - // This aggregator supports only a simple stream-id, which is one. + // This aggregator supports only a single stream-id, which is one. .streamId(UnsignedLong.ONE) .shares(UnsignedLong.ONE) .build(), @@ -466,37 +474,34 @@ Optional preflightCheck() throws StreamConnectionClosedException { .data(streamPacketData) .build(); - InterledgerResponsePacket responsePacket = sendPacketAndCheckForFailure(preparePacket); - - final Function> readDetails = (p) -> { - final StreamPacket packet = this.fromEncrypted(sharedSecret, p.getData()); - return packet.frames().stream() - .filter(f -> f.streamFrameType() == StreamFrameType.ConnectionAssetDetails) - .findFirst() - .map(f -> (ConnectionAssetDetailsFrame) f) - .map(f -> Denomination.builder().from(f.sourceDenomination()).build()); + // The function that parses out the STREAM packets... + final Function> readDetailsFromStream = (responsePacket) -> { + final StreamPacket packet = this.fromEncrypted(sharedSecret, responsePacket.getData()); + if (packet != null) { + return packet.frames().stream() + .filter(f -> f.streamFrameType() == StreamFrameType.ConnectionAssetDetails) + .findFirst() + .map(f -> (ConnectionAssetDetailsFrame) f) + .map(f -> Denomination.builder().from(f.sourceDenomination()).build()); + } else { + return Optional.empty(); + } }; - return responsePacket.map(readDetails::apply, readDetails::apply); + return link.sendPacket(preparePacket) + .handleAndReturn( + fulfillPacket -> { + }, // Do nothing on fulfill + this::checkForAndTriggerUnrecoverableError // check for unrecoverable error. + ) + // We typically expect this Prepare operation to reject, but regardless of whether the response is a fulfill or a + // reject, try to read the Receiver's Connection Asset Details and return them. + .map(readDetailsFromStream::apply, readDetailsFromStream::apply); } /** - * Send the packet but check to see if an error in the HTTP 4XX range was encountered so that we know if we - * should stop retrying - * @param preparePacket - * @return the returned response packet + * Helper method to send money in a packetized operation. */ - @VisibleForTesting - protected InterledgerResponsePacket sendPacketAndCheckForFailure(InterledgerPreparePacket preparePacket) { - InterledgerResponsePacket response = link.sendPacket(preparePacket); - response.handle((fulfill) -> {}, (reject) -> { - if (reject.getCode().equals(F00_BAD_REQUEST)) { - unrecoverableErrorEncountered.set(true); - } - }); - return response; - } - private void sendMoneyPacketized() { final AtomicBoolean timeoutReached = new AtomicBoolean(false); @@ -515,9 +520,14 @@ private void sendMoneyPacketized() { while (soldierOn(timeoutReached.get(), tryingToSendTooMuch)) { // Determine the amount to send - PrepareAmounts amounts = paymentTracker.getSendPacketAmounts( - congestionController.getMaxAmount(), senderDenomination, receiverDenomination - ); + final PrepareAmounts amounts = receiverDenomination + .map(receiverDenomination -> paymentTracker.getSendPacketAmounts( + congestionController.getMaxAmount(), senderDenomination, receiverDenomination + )) + .orElseGet(() -> paymentTracker.getSendPacketAmounts( + congestionController.getMaxAmount(), senderDenomination + )); + UnsignedLong amountToSend = amounts.getAmountToSend(); UnsignedLong receiverMinimum = amounts.getMinimumAmountToAccept(); @@ -525,7 +535,7 @@ private void sendMoneyPacketized() { try { // Don't send any more, but wait a bit for outstanding requests to complete so we don't cycle needlessly in // a while loop that doesn't do anything useful. - Thread.sleep(sendPacketSleep); + Thread.sleep(sendPacketSleepDuration.toMillis()); } catch (InterruptedException e) { throw new StreamSenderException(e.getMessage(), e); } @@ -606,6 +616,24 @@ private void sendMoneyPacketized() { timeoutMonitor.shutdownNow(); } + /** + * Schedules a {@link Callable} with {@link this#executorService} to actually send an {@link + * InterledgerPreparePacket} on an existing Stream connection. + * + * @param timeoutReached An {@link AtomicBoolean} that can be set to indicate whether a timeout has been + * reached. + * @param preparePacketSupplier A {@link Supplier} of the {@link InterledgerPreparePacket} that will be used by this + * method. This is a supplier in order to facilitate late-bound construction of the + * packet in order to ensure the when a packet is constructed, its expiry is + * initialized very close to when the packet will actually be sent on a link. Without + * this supplier, packets were getting created and then handing around in the executor. + * Under extreme-load conditions, these packets would expire before ever getting sent + * out over the wire. + * @param streamPacket A decoded {@link StreamPacket} containing all STREAM information that is inside of + * the Prepare packet passed into this method. This is provided for debugging and minor + * optimization improvements inside of nested methods called by this method. + * @param prepareAmounts A {@link PrepareAmounts} for augmenting the sending of the prepare packet. + */ @VisibleForTesting void schedule( final AtomicBoolean timeoutReached, @@ -623,11 +651,12 @@ void schedule( InterledgerPreparePacket preparePacket = preparePacketSupplier.get(); if (!timeoutReached.get()) { try { - InterledgerResponsePacket responsePacket = sendPacketAndCheckForFailure(preparePacket); - responsePacket.handle( + link.sendPacket(preparePacket).handle( fulfillPacket -> handleFulfill(preparePacket, streamPacket, fulfillPacket, prepareAmounts), - rejectPacket -> handleReject(preparePacket, streamPacket, rejectPacket, prepareAmounts, - numRejectedPackets, congestionController) + rejectPacket -> handleReject( + preparePacket, streamPacket, rejectPacket, + prepareAmounts, numRejectedPackets, congestionController + ) ); } catch (Exception e) { logger.error("Link send failed. preparePacket={}", preparePacket, e); @@ -639,11 +668,10 @@ void schedule( .build()); paymentTracker.rollback(prepareAmounts, false); } - } - else { + } else { logger.info("timeout reached, not sending packet"); congestionController.reject(preparePacket.getAmount(), InterledgerRejectPacket.builder() - .code(InterledgerErrorCode.F99_APPLICATION_ERROR) + .code(InterledgerErrorCode.R00_TRANSFER_TIMED_OUT) .message(String.format("Timeout reached before packet could be sent", preparePacket)) .build()); } @@ -654,8 +682,9 @@ void schedule( congestionController.reject(preparePacketSupplier.get().getAmount(), InterledgerRejectPacket.builder() .code(InterledgerErrorCode.F00_BAD_REQUEST) .message( - String.format("Unable to schedule sendMoney task. preparePacket=%s error=%s", preparePacketSupplier.get(), - e.getMessage()) + String + .format("Unable to schedule sendMoney task. preparePacket=%s error=%s", preparePacketSupplier.get(), + e.getMessage()) ) .build()); throw e; @@ -669,12 +698,14 @@ boolean soldierOn(final boolean timeoutReached, final boolean tryingToSendTooMuc // the connection is not closed // and you haven't delivered the full amount // and you haven't timed out - // and you're not trying to send to much + // and you're not trying to send too much // and we haven't hit an unrecoverable error return this.congestionController.hasInFlight() - || ( - !streamConnection.isClosed() && paymentTracker.moreToSend() && !timeoutReached && !tryingToSendTooMuch && - !unrecoverableErrorEncountered.get()); + || (!streamConnection.isClosed() + && paymentTracker.moreToSend() + && !timeoutReached + && !tryingToSendTooMuch + && !unrecoverableErrorEncountered.get()); } /** @@ -769,6 +800,9 @@ void handleFulfill( * @param originalStreamPacket The {@link StreamPacket} that was inside of {@code originalPreparePacket}. * @param rejectPacket The {@link InterledgerRejectPacket} received from a peer directly connected via a * {@link Link}. + * @param numRejectedPackets An {@link AtomicInteger} that holds the total number of packets rejected thus far on + * this sendMoney operation. + * @param congestionController The {@link CongestionController} used for this sendMoney operation. */ @VisibleForTesting void handleReject( @@ -804,28 +838,88 @@ void handleReject( rejectPacket ); - switch (rejectPacket.getCode().getCode()) { - - case T04_INSUFFICIENT_LIQUIDITY_CODE: - case F08_AMOUNT_TOO_LARGE_CODE: { - // Handled by the congestion controller - break; + this.checkForAndTriggerUnrecoverableError(rejectPacket); + + //////////// + // Log the rejection + //////////// + + if (ErrorFamily.FINAL.equals(rejectPacket.getCode().getErrorFamily())) { + // Most Final errors trigger immediate stoppage of send operations, except for F08 and F99. + if (NON_TERMINAL_ERROR_CODES.contains(rejectPacket.getCode())) { + // error was a tolerable F rejection (currently F08 or F99) + logger.debug( + "Encountered an expected ILPv4 FINAL error. Retrying... " + + "originalPreparePacket={} originalStreamPacket={} rejectPacket={}", + originalPreparePacket, originalStreamPacket, rejectPacket); + } else { + logger.error( + "Encountered an unexpected ILPv4 FINAL error. Aborting this sendMoney. " + + "originalPreparePacket={} originalStreamPacket={} rejectPacket={}", + originalPreparePacket, originalStreamPacket, rejectPacket + ); } - default: { - if (rejectPacket.getCode().getErrorFamily() == ErrorFamily.TEMPORARY) { - logger.warn( - "Temporary ILPv4 transport outage. Retrying... originalPreparePacket={} originalStreamPacket={} " - + "rejectPacket={}", - originalPreparePacket, originalStreamPacket, rejectPacket); + } else if (ErrorFamily.RELATIVE.equals(rejectPacket.getCode().getErrorFamily())) { + // All Relative errors trigger immediate stoppage of send operations. + logger.warn( + "Relative ILPv4 transport outage. originalPreparePacket={} originalStreamPacket={} rejectPacket={}", + originalPreparePacket, originalStreamPacket, rejectPacket + ); + } else { // if (ErrorFamily.TEMPORARY.equals(rejectPacket.getCode().getErrorFamily())) { + logger.warn( + "Temporary ILPv4 transport outage. originalPreparePacket={} originalStreamPacket={} rejectPacket={}", + originalPreparePacket, originalStreamPacket, rejectPacket + ); + } + } - } else { - logger.error( - "Encountered Final ILPv4 error. Retrying, but this sendMoney will likely hang until timeout." - + " originalPreparePacket={} originalStreamPacket={} rejectPacket={}", - originalPreparePacket, originalStreamPacket, rejectPacket); - } - break; + /** + *

A helper method to centralize all logic around when to trigger an "unrecoverable error" that will stop this + * sender from schedule further packets.

+ * + *

This method functions according to the following rules:

+ * + *

Interledger Reject packets with the following attributes will immediately trigger an UNRECOVERABLE_ERROR + * condition, stopping the sendMoney operation:

+ *
    + *
  1. Any F-family rejections (except F08 and F99, which are used for congestion control and rate-probing).
  2. + *
  3. Any R-family rejections.
  4. + *
+ * + *

Interledger Reject packets with the following attributes MAY trigger an UNRECOVERABLE_ERROR condition after + * some threshold (this logic is not yet implemented, and may never be):

+ *
    + *
  1. T00 Errors (generally should be accepted up to some threshold)
  2. + *
  3. T01 Errors (generally should be accepted up to some threshold)
  4. + *
  5. F99 Errors (generally should be accepted up to some threshold; used for FX problems where prepare amount is + * less than min amount the receiver should receive).
  6. + *
+ * + *

Interledger Reject packets with the following attributes will NEVER trigger an UNRECOVERABLE_ERROR condition:

+ *
    + *
  1. T02-T99 Errors (these may resolve with time)
  2. + *
  3. F08 Errors (used for congestion control)
  4. + *
+ * + * @param rejectPacket An {@link InterledgerRejectPacket} that was received from an outgoing {@link Link} in + * response to a prepare packet. + */ + @VisibleForTesting + void checkForAndTriggerUnrecoverableError(final InterledgerRejectPacket rejectPacket) { + Objects.requireNonNull(rejectPacket); + + if (ErrorFamily.FINAL.equals(rejectPacket.getCode().getErrorFamily())) { + // Most Final errors trigger immediate stoppage of send operations, except for F08 and F99. + if (NON_TERMINAL_ERROR_CODES.contains(rejectPacket.getCode())) { + // error was a tolerable F rejection (currently F08 or F99) + } else { + unrecoverableErrorEncountered.set(true); } + } else if (ErrorFamily.RELATIVE.equals(rejectPacket.getCode().getErrorFamily())) { + // All Relative errors trigger immediate stoppage of send operations. + unrecoverableErrorEncountered.set(true); + } else { // if (ErrorFamily.TEMPORARY.equals(rejectPacket.getCode().getErrorFamily())) { + // do nothing. } } @@ -834,6 +928,16 @@ protected boolean isUnrecoverableErrorEncountered() { return this.unrecoverableErrorEncountered.get(); } + @VisibleForTesting + void setUnrecoverableErrorEncountered(boolean unrecoverableErrorEncountered) { + this.unrecoverableErrorEncountered.set(unrecoverableErrorEncountered); + } + + // TODO: FIXME per https://github.com/hyperledger/quilt/issues/308. Until this is completed, parallel STREAM send() + // operations are not thread-safe. Consider a new instance of everything on each sendMoney? In other words, each + // SendMoney requires its own connection information, + // so as long as two senders don't use the same Connection details, everything should be thread-safe. + ///** // * Close the current STREAM connection by sending a {@link ConnectionCloseFrame} to the receiver. // * diff --git a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/FixedReceiverAmountPaymentTrackerTest.java b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/FixedReceiverAmountPaymentTrackerTest.java index 871e1cd6..a4693476 100644 --- a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/FixedReceiverAmountPaymentTrackerTest.java +++ b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/FixedReceiverAmountPaymentTrackerTest.java @@ -8,8 +8,9 @@ import com.google.common.primitives.UnsignedLong; import org.junit.Test; -import java.util.Optional; - +/** + * Unit tests for {@link FixedReceiverAmountPaymentTracker}. + */ public class FixedReceiverAmountPaymentTrackerTest { @Test @@ -23,14 +24,12 @@ public void checkAllInteractions() { assertThat(tracker.getOriginalAmountLeft()).isEqualTo(UnsignedLong.valueOf(10)); assertThat(tracker.moreToSend()).isTrue(); - PrepareAmounts amounts = tracker.getSendPacketAmounts(UnsignedLong.ZERO, Denominations.XRP, - Optional.of(Denominations.XRP)); + PrepareAmounts amounts = tracker.getSendPacketAmounts(UnsignedLong.ZERO, Denominations.XRP, Denominations.XRP); assertThat(amounts.getAmountToSend()).isEqualTo(UnsignedLong.ZERO); assertThat(amounts.getMinimumAmountToAccept()).isEqualTo(UnsignedLong.ZERO); - amounts = tracker.getSendPacketAmounts(UnsignedLong.valueOf(6), Denominations.XRP, - Optional.of(Denominations.XRP)); + amounts = tracker.getSendPacketAmounts(UnsignedLong.valueOf(6), Denominations.XRP, Denominations.XRP); assertThat(amounts.getAmountToSend()).isEqualTo(UnsignedLong.valueOf(6)); assertThat(amounts.getMinimumAmountToAccept()).isEqualTo(UnsignedLong.valueOf(3)); @@ -80,11 +79,15 @@ public void checkAllInteractions() { assertThat(tracker.getOriginalAmountLeft()).isEqualTo(UnsignedLong.ZERO); assertThat(tracker.moreToSend()).isFalse(); - amounts = tracker.getSendPacketAmounts(UnsignedLong.valueOf(6), Denominations.XRP, - Optional.of(Denominations.XRP)); + amounts = tracker.getSendPacketAmounts(UnsignedLong.valueOf(6), Denominations.XRP, Denominations.XRP); assertThat(amounts.getAmountToSend()).isEqualTo(UnsignedLong.ZERO); assertThat(amounts.getMinimumAmountToAccept()).isEqualTo(UnsignedLong.ZERO); } + @Test + public void testMode() { + FixedReceiverAmountPaymentTracker tracker = new FixedReceiverAmountPaymentTracker(UnsignedLong.ZERO); + assertThat(tracker.requiresReceiverDenomination()).isTrue(); + } } diff --git a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/FixedSenderAmountPaymentTrackerTest.java b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/FixedSenderAmountPaymentTrackerTest.java index c723c4e8..b4c215ef 100644 --- a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/FixedSenderAmountPaymentTrackerTest.java +++ b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/FixedSenderAmountPaymentTrackerTest.java @@ -8,8 +8,9 @@ import com.google.common.primitives.UnsignedLong; import org.junit.Test; -import java.util.Optional; - +/** + * Unit tests for {@link FixedSenderAmountPaymentTracker}. + */ public class FixedSenderAmountPaymentTrackerTest { @Test @@ -23,14 +24,12 @@ public void checkAllInteractions() { assertThat(tracker.getOriginalAmountLeft()).isEqualTo(UnsignedLong.valueOf(12)); assertThat(tracker.moreToSend()).isTrue(); - PrepareAmounts amounts = tracker.getSendPacketAmounts(UnsignedLong.ZERO, Denominations.XRP, - Optional.of(Denominations.XRP)); + PrepareAmounts amounts = tracker.getSendPacketAmounts(UnsignedLong.ZERO, Denominations.XRP); assertThat(amounts.getAmountToSend()).isEqualTo(UnsignedLong.ZERO); assertThat(amounts.getMinimumAmountToAccept()).isEqualTo(UnsignedLong.ZERO); - amounts = tracker.getSendPacketAmounts(UnsignedLong.valueOf(6), Denominations.XRP, - Optional.of(Denominations.XRP)); + amounts = tracker.getSendPacketAmounts(UnsignedLong.valueOf(6), Denominations.XRP); assertThat(amounts.getAmountToSend()).isEqualTo(UnsignedLong.valueOf(6)); assertThat(amounts.getMinimumAmountToAccept()).isEqualTo(UnsignedLong.valueOf(3)); @@ -82,8 +81,7 @@ public void authFailsWhenAmountLeftToSendLessThanPrepare() { FixedSenderAmountPaymentTracker tracker = new FixedSenderAmountPaymentTracker(UnsignedLong.valueOf(12L), new HalfsiesExchangeRateCalculator()); - PrepareAmounts amounts = tracker.getSendPacketAmounts(UnsignedLong.valueOf(12), Denominations.XRP, - Optional.of(Denominations.XRP)); + PrepareAmounts amounts = tracker.getSendPacketAmounts(UnsignedLong.valueOf(12), Denominations.XRP); tracker.auth(amounts); tracker.commit(amounts, UnsignedLong.valueOf(12)); assertThat(tracker.getOriginalAmountLeft()).isEqualTo(UnsignedLong.ZERO); @@ -91,4 +89,10 @@ public void authFailsWhenAmountLeftToSendLessThanPrepare() { tracker.setSentAmount(UnsignedLong.ZERO); assertThat(tracker.auth(amounts)).isFalse(); } + + @Test + public void testMode() { + FixedSenderAmountPaymentTracker tracker = new FixedSenderAmountPaymentTracker(UnsignedLong.ZERO); + assertThat(tracker.requiresReceiverDenomination()).isFalse(); + } } diff --git a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/HalfsiesExchangeRateCalculator.java b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/HalfsiesExchangeRateCalculator.java index 0a622480..548fba8f 100644 --- a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/HalfsiesExchangeRateCalculator.java +++ b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/HalfsiesExchangeRateCalculator.java @@ -6,7 +6,7 @@ import com.google.common.primitives.UnsignedLong; -import java.util.Optional; +import java.util.Objects; /** * An implementation of {@link ExchangeRateCalculator} that always assumes an exchange rate from sender:receiver @@ -15,15 +15,23 @@ public class HalfsiesExchangeRateCalculator implements ExchangeRateCalculator { @Override - public UnsignedLong calculateAmountToSend(UnsignedLong amountToReceive, Denomination sendDenomination, - Denomination receiveDenomination) throws NoExchangeRateException { - return amountToReceive.times(UnsignedLong.valueOf(2)); + public UnsignedLong calculateAmountToSend( + final UnsignedLong amountToSend, + final Denomination amountToSendDenomination, + final Denomination receiverDenomination + ) throws NoExchangeRateException { + Objects.requireNonNull(amountToSend); + Objects.requireNonNull(amountToSendDenomination); + Objects.requireNonNull(receiverDenomination); + return amountToSend.times(UnsignedLong.valueOf(2)); } @Override - public UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, Denomination sendDenomination, - Optional expectedReceivedDenomination) - throws NoExchangeRateException { + public UnsignedLong calculateMinAmountToAccept( + final UnsignedLong sendAmount, final Denomination sendAmountDenomination + ) { + Objects.requireNonNull(sendAmount); + Objects.requireNonNull(sendAmountDenomination); return sendAmount.dividedBy(UnsignedLong.valueOf(2)); } } diff --git a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SendMoneyAggregatorTest.java b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SendMoneyAggregatorTest.java index 40a2ffa4..097f2a33 100644 --- a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SendMoneyAggregatorTest.java +++ b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SendMoneyAggregatorTest.java @@ -1,7 +1,29 @@ package org.interledger.stream.sender; import static org.assertj.core.api.Assertions.assertThat; +import static org.interledger.core.InterledgerErrorCode.F00_BAD_REQUEST; +import static org.interledger.core.InterledgerErrorCode.F01_INVALID_PACKET; +import static org.interledger.core.InterledgerErrorCode.F02_UNREACHABLE; +import static org.interledger.core.InterledgerErrorCode.F03_INVALID_AMOUNT; +import static org.interledger.core.InterledgerErrorCode.F04_INSUFFICIENT_DST_AMOUNT; +import static org.interledger.core.InterledgerErrorCode.F05_WRONG_CONDITION; +import static org.interledger.core.InterledgerErrorCode.F06_UNEXPECTED_PAYMENT; +import static org.interledger.core.InterledgerErrorCode.F07_CANNOT_RECEIVE; +import static org.interledger.core.InterledgerErrorCode.F08_AMOUNT_TOO_LARGE; +import static org.interledger.core.InterledgerErrorCode.F99_APPLICATION_ERROR; +import static org.interledger.core.InterledgerErrorCode.R00_TRANSFER_TIMED_OUT; +import static org.interledger.core.InterledgerErrorCode.R01_INSUFFICIENT_SOURCE_AMOUNT; +import static org.interledger.core.InterledgerErrorCode.R02_INSUFFICIENT_TIMEOUT; +import static org.interledger.core.InterledgerErrorCode.R99_APPLICATION_ERROR; +import static org.interledger.core.InterledgerErrorCode.T00_INTERNAL_ERROR; +import static org.interledger.core.InterledgerErrorCode.T01_PEER_UNREACHABLE; +import static org.interledger.core.InterledgerErrorCode.T02_PEER_BUSY; +import static org.interledger.core.InterledgerErrorCode.T03_CONNECTOR_BUSY; +import static org.interledger.core.InterledgerErrorCode.T04_INSUFFICIENT_LIQUIDITY; +import static org.interledger.core.InterledgerErrorCode.T05_RATE_LIMITED; +import static org.interledger.core.InterledgerErrorCode.T99_APPLICATION_ERROR; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -54,6 +76,7 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -101,7 +124,7 @@ public void setUp() { when(congestionControllerMock.getMaxAmount()).thenReturn(UnsignedLong.ONE); when(streamEncryptionServiceMock.encrypt(any(), any())).thenReturn(new byte[32]); when(streamEncryptionServiceMock.decrypt(any(), any())).thenReturn(new byte[32]); - when(linkMock.sendPacket(any())).thenReturn(mock(InterledgerRejectPacket.class)); + when(linkMock.sendPacket(any())).thenReturn(sampleRejectPacket(F99_APPLICATION_ERROR)); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); this.paymentTracker = new FixedSenderAmountPaymentTracker(originalAmountToSend, new NoOpExchangeRateCalculator()); SendMoneyRequest request = SendMoneyRequest.builder() @@ -115,7 +138,8 @@ public void setUp() { .build(); this.sendMoneyAggregator = new SendMoneyAggregator( executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock, - streamEncryptionServiceMock, request, Optional.empty()); + streamEncryptionServiceMock, Duration.ofMillis(10L), request + ); defaultPrepareAmounts = PrepareAmounts.from(samplePreparePacket(), sampleStreamPacket()); } @@ -225,13 +249,14 @@ public void failureToSchedulePutsMoneyBack() { ExecutorService executor = mock(ExecutorService.class); this.sendMoneyAggregator = new SendMoneyAggregator( executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock, - streamEncryptionServiceMock, request, Optional.empty()); + streamEncryptionServiceMock, Duration.ofSeconds(10L), request + ); when(executor.submit(any(Runnable.class))).thenThrow(new RejectedExecutionException()); InterledgerPreparePacket prepare = samplePreparePacket(); InterledgerRejectPacket expectedReject = InterledgerRejectPacket.builder() - .code(InterledgerErrorCode.F00_BAD_REQUEST) + .code(F00_BAD_REQUEST) .message( String.format("Unable to schedule sendMoney task. preparePacket=%s error=%s", prepare, "java.util.concurrent.RejectedExecutionException") @@ -263,7 +288,7 @@ public void preflightCheckFindsNoDenomination() throws Exception { @Test public void preflightCheckRejects() throws Exception { when(streamConnectionMock.nextSequence()).thenReturn(UnsignedLong.ONE); - when(linkMock.sendPacket(any())).thenReturn(sampleRejectPacket(InterledgerErrorCode.T00_INTERNAL_ERROR)); + when(linkMock.sendPacket(any())).thenReturn(sampleRejectPacket(T00_INTERNAL_ERROR)); StreamPacket streamPacket = StreamPacket.builder().from(sampleStreamPacket()) .addFrames(StreamMoneyFrame.builder() .shares(UnsignedLong.ONE) @@ -313,8 +338,8 @@ public void soldierOn() { allSoldierOnsTrue(); // flip flag on unrecoverable error - when(linkMock.sendPacket(any())).thenReturn(sampleRejectPacket(InterledgerErrorCode.F00_BAD_REQUEST)); - sendMoneyAggregator.sendPacketAndCheckForFailure(samplePreparePacket()); + sendMoneyAggregator.setUnrecoverableErrorEncountered(true); + setSoldierOnBooleans(false, false, false); allSoldierOnsFalse(); setSoldierOnBooleans(false, false, true); @@ -342,7 +367,7 @@ public void breakLoopToPreventOversend() throws Exception { .amountToSend(UnsignedLong.valueOf(10L)) .minimumAmountToAccept(UnsignedLong.ZERO) .build(); - when(tracker.getSendPacketAmounts(any(), any(), any())).thenReturn(prepare); + when(tracker.getSendPacketAmounts(any(), any())).thenReturn(prepare); when(tracker.auth(any())).thenReturn(false); when(tracker.getOriginalAmountMode()).thenReturn(SenderAmountMode.SENDER_AMOUNT); when(tracker.moreToSend()).thenReturn(true); @@ -363,10 +388,10 @@ public void breakLoopToPreventOversend() throws Exception { .build(); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); - this.sendMoneyAggregator = new SendMoneyAggregator( executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock, - streamEncryptionServiceMock, request, Optional.empty()); + streamEncryptionServiceMock, Duration.ofMillis(10L), request + ); setSoldierOnBooleans(false, false, true); when(streamConnectionMock.nextSequence()).thenReturn(UnsignedLong.ONE); @@ -394,7 +419,7 @@ public void fromEncrypted() throws Exception { public void handleRejectHatesNullPrepare() { expectedException.expect(NullPointerException.class); sendMoneyAggregator.handleReject(null, sampleStreamPacket(), - sampleRejectPacket(InterledgerErrorCode.T00_INTERNAL_ERROR), defaultPrepareAmounts, new AtomicInteger(), + sampleRejectPacket(T00_INTERNAL_ERROR), defaultPrepareAmounts, new AtomicInteger(), congestionControllerMock); } @@ -402,7 +427,7 @@ public void handleRejectHatesNullPrepare() { public void handleRejectHatesNullStreamPacket() { expectedException.expect(NullPointerException.class); sendMoneyAggregator.handleReject(samplePreparePacket(), null, - sampleRejectPacket(InterledgerErrorCode.T00_INTERNAL_ERROR), defaultPrepareAmounts, new AtomicInteger(), + sampleRejectPacket(T00_INTERNAL_ERROR), defaultPrepareAmounts, new AtomicInteger(), congestionControllerMock); } @@ -417,7 +442,7 @@ public void handleRejectHatesNullReject() { public void handleRejectHatesNullNumReject() { expectedException.expect(NullPointerException.class); sendMoneyAggregator - .handleReject(null, sampleStreamPacket(), sampleRejectPacket(InterledgerErrorCode.T00_INTERNAL_ERROR), + .handleReject(null, sampleStreamPacket(), sampleRejectPacket(T00_INTERNAL_ERROR), null, null, congestionControllerMock); } @@ -425,7 +450,7 @@ public void handleRejectHatesNullNumReject() { public void handleRejectHatesNullCongestionController() { expectedException.expect(NullPointerException.class); sendMoneyAggregator.handleReject(null, sampleStreamPacket(), - sampleRejectPacket(InterledgerErrorCode.T00_INTERNAL_ERROR), defaultPrepareAmounts, new AtomicInteger(), + sampleRejectPacket(T00_INTERNAL_ERROR), defaultPrepareAmounts, new AtomicInteger(), null); } @@ -433,7 +458,7 @@ public void handleRejectHatesNullCongestionController() { public void handleRejectHatesNullPrepareAmounts() { expectedException.expect(NullPointerException.class); sendMoneyAggregator.handleReject(null, sampleStreamPacket(), - sampleRejectPacket(InterledgerErrorCode.T00_INTERNAL_ERROR), null, new AtomicInteger(), + sampleRejectPacket(T00_INTERNAL_ERROR), null, new AtomicInteger(), congestionControllerMock); } @@ -442,7 +467,7 @@ public void handleReject() { UnsignedLong originalAmountToSend = paymentTracker.getOriginalAmountLeft(); AtomicInteger numReject = new AtomicInteger(0); InterledgerPreparePacket prepare = samplePreparePacket(); - InterledgerRejectPacket reject = sampleRejectPacket(InterledgerErrorCode.T00_INTERNAL_ERROR); + InterledgerRejectPacket reject = sampleRejectPacket(T00_INTERNAL_ERROR); sendMoneyAggregator.handleReject(prepare, sampleStreamPacket(), reject, defaultPrepareAmounts, numReject, congestionControllerMock); assertThat(numReject.get()).isEqualTo(1); @@ -451,23 +476,112 @@ public void handleReject() { } @Test - public void sendPacketAndCheckForFailureMarksUnrecoverableForF00() { - when(linkMock.sendPacket(any())).thenReturn(sampleRejectPacket(InterledgerErrorCode.F00_BAD_REQUEST)); - assertThat(sendMoneyAggregator.isUnrecoverableErrorEncountered()).isFalse(); - sendMoneyAggregator.sendPacketAndCheckForFailure(samplePreparePacket()); - assertThat(sendMoneyAggregator.isUnrecoverableErrorEncountered()).isTrue(); + public void handleRejectWithVariousErrorCodes() { + // F Errors + handleRejectTestHelper(F00_BAD_REQUEST, true); + handleRejectTestHelper(InterledgerErrorCode.F01_INVALID_PACKET, true); + handleRejectTestHelper(InterledgerErrorCode.F02_UNREACHABLE, true); + handleRejectTestHelper(InterledgerErrorCode.F03_INVALID_AMOUNT, true); + handleRejectTestHelper(InterledgerErrorCode.F04_INSUFFICIENT_DST_AMOUNT, true); + handleRejectTestHelper(InterledgerErrorCode.F05_WRONG_CONDITION, true); + handleRejectTestHelper(InterledgerErrorCode.F06_UNEXPECTED_PAYMENT, true); + handleRejectTestHelper(InterledgerErrorCode.F07_CANNOT_RECEIVE, true); + handleRejectTestHelper(InterledgerErrorCode.F08_AMOUNT_TOO_LARGE, false); + handleRejectTestHelper(InterledgerErrorCode.F99_APPLICATION_ERROR, false); + + // T Errors + handleRejectTestHelper(T00_INTERNAL_ERROR, false); + handleRejectTestHelper(InterledgerErrorCode.T01_PEER_UNREACHABLE, false); + handleRejectTestHelper(InterledgerErrorCode.T02_PEER_BUSY, false); + handleRejectTestHelper(InterledgerErrorCode.T03_CONNECTOR_BUSY, false); + handleRejectTestHelper(InterledgerErrorCode.T04_INSUFFICIENT_LIQUIDITY, false); + handleRejectTestHelper(InterledgerErrorCode.T05_RATE_LIMITED, false); + handleRejectTestHelper(InterledgerErrorCode.T99_APPLICATION_ERROR, false); + + // R Errors + handleRejectTestHelper(InterledgerErrorCode.R00_TRANSFER_TIMED_OUT, true); + handleRejectTestHelper(InterledgerErrorCode.R01_INSUFFICIENT_SOURCE_AMOUNT, true); + handleRejectTestHelper(InterledgerErrorCode.R02_INSUFFICIENT_TIMEOUT, true); + handleRejectTestHelper(InterledgerErrorCode.R99_APPLICATION_ERROR, true); + } + + private void handleRejectTestHelper( + final InterledgerErrorCode errorCodeToTest, final boolean expectedUnrecoverableState + ) { + Objects.requireNonNull(errorCodeToTest); + + /////// + // Test Initialization + sendMoneyAggregator.setUnrecoverableErrorEncountered(false); + UnsignedLong originalAmountToSend = paymentTracker.getOriginalAmountLeft(); + AtomicInteger numReject = new AtomicInteger(0); + InterledgerPreparePacket prepare = samplePreparePacket(); + InterledgerRejectPacket reject = sampleRejectPacket(errorCodeToTest); + + /////// + // Do test + sendMoneyAggregator.handleReject( + prepare, sampleStreamPacket(), reject, defaultPrepareAmounts, numReject, congestionControllerMock + ); + + /////// + // Assertions + assertThat(numReject.get()).isEqualTo(1); + assertThat(paymentTracker.getOriginalAmountLeft()).isEqualTo(originalAmountToSend.plus(prepare.getAmount())); + verify(congestionControllerMock, times(1)).reject(UnsignedLong.ONE, reject); + assertThat(sendMoneyAggregator.isUnrecoverableErrorEncountered()).isEqualTo(expectedUnrecoverableState); + } + + @Test + public void testCheckForAndTriggerUnrecoverableError() { + // F Errors + testCheckForAndTriggerUnrecoverableErrorHelper(F00_BAD_REQUEST, true); + testCheckForAndTriggerUnrecoverableErrorHelper(F01_INVALID_PACKET, true); + testCheckForAndTriggerUnrecoverableErrorHelper(F02_UNREACHABLE, true); + testCheckForAndTriggerUnrecoverableErrorHelper(F03_INVALID_AMOUNT, true); + testCheckForAndTriggerUnrecoverableErrorHelper(F04_INSUFFICIENT_DST_AMOUNT, true); + testCheckForAndTriggerUnrecoverableErrorHelper(F05_WRONG_CONDITION, true); + testCheckForAndTriggerUnrecoverableErrorHelper(F06_UNEXPECTED_PAYMENT, true); + testCheckForAndTriggerUnrecoverableErrorHelper(F07_CANNOT_RECEIVE, true); + testCheckForAndTriggerUnrecoverableErrorHelper(F08_AMOUNT_TOO_LARGE, false); + testCheckForAndTriggerUnrecoverableErrorHelper(F99_APPLICATION_ERROR, false); + + // T Errors + testCheckForAndTriggerUnrecoverableErrorHelper(T00_INTERNAL_ERROR, false); + testCheckForAndTriggerUnrecoverableErrorHelper(T01_PEER_UNREACHABLE, false); + testCheckForAndTriggerUnrecoverableErrorHelper(T02_PEER_BUSY, false); + testCheckForAndTriggerUnrecoverableErrorHelper(T03_CONNECTOR_BUSY, false); + testCheckForAndTriggerUnrecoverableErrorHelper(T04_INSUFFICIENT_LIQUIDITY, false); + testCheckForAndTriggerUnrecoverableErrorHelper(T05_RATE_LIMITED, false); + testCheckForAndTriggerUnrecoverableErrorHelper(T99_APPLICATION_ERROR, false); + + // R Errors + testCheckForAndTriggerUnrecoverableErrorHelper(R00_TRANSFER_TIMED_OUT, true); + testCheckForAndTriggerUnrecoverableErrorHelper(R01_INSUFFICIENT_SOURCE_AMOUNT, true); + testCheckForAndTriggerUnrecoverableErrorHelper(R02_INSUFFICIENT_TIMEOUT, true); + testCheckForAndTriggerUnrecoverableErrorHelper(R99_APPLICATION_ERROR, true); + } + + private void testCheckForAndTriggerUnrecoverableErrorHelper( + final InterledgerErrorCode errorCodeToTest, final boolean expectedUnrecoverableState + ) { + Objects.requireNonNull(errorCodeToTest); + InterledgerRejectPacket reject = sampleRejectPacket(errorCodeToTest); + sendMoneyAggregator.setUnrecoverableErrorEncountered(false); + sendMoneyAggregator.checkForAndTriggerUnrecoverableError(reject); + assertThat(sendMoneyAggregator.isUnrecoverableErrorEncountered()).isEqualTo(expectedUnrecoverableState); } @Test public void preflightCheckFlagsAsUnrecoverable() throws Exception { when(streamConnectionMock.nextSequence()).thenReturn(UnsignedLong.ONE); - when(linkMock.sendPacket(any())).thenReturn(sampleRejectPacket(InterledgerErrorCode.F00_BAD_REQUEST)); + when(linkMock.sendPacket(any())).thenReturn(sampleRejectPacket(F00_BAD_REQUEST)); StreamPacket streamPacket = StreamPacket.builder().from(sampleStreamPacket()) - .addFrames(StreamMoneyFrame.builder() - .shares(UnsignedLong.ONE) - .streamId(UnsignedLong.ONE) - .build()) - .build(); + .addFrames(StreamMoneyFrame.builder() + .shares(UnsignedLong.ONE) + .streamId(UnsignedLong.ONE) + .build()) + .build(); when(streamCodecContextMock.read(any(), any())).thenReturn(streamPacket); assertThat(sendMoneyAggregator.isUnrecoverableErrorEncountered()).isFalse(); sendMoneyAggregator.preflightCheck(); @@ -477,21 +591,23 @@ public void preflightCheckFlagsAsUnrecoverable() throws Exception { @Test public void stopSendingWhenUnrecoverableErrorEncountered() throws Exception { SendMoneyRequest request = SendMoneyRequest.builder() - .sharedSecret(sharedSecret) - .sourceAddress(sourceAddress) - .senderAmountMode(SenderAmountMode.SENDER_AMOUNT) - .destinationAddress(destinationAddress) - .amount(originalAmountToSend) - .timeout(Optional.of(Duration.ofSeconds(60))) - .denomination(Denominations.XRP) - .paymentTracker(new FixedSenderAmountPaymentTracker(UnsignedLong.valueOf(10l), new NoOpExchangeRateCalculator())) - .build(); + .sharedSecret(sharedSecret) + .sourceAddress(sourceAddress) + .destinationAddress(destinationAddress) + .amount(originalAmountToSend) + .timeout(Optional.of(Duration.ofSeconds(60))) + .denomination(Denominations.XRP) + .paymentTracker( + new FixedSenderAmountPaymentTracker(UnsignedLong.valueOf(10l), new NoOpExchangeRateCalculator()) + ) + .build(); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); this.sendMoneyAggregator = new SendMoneyAggregator( - executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock, - streamEncryptionServiceMock, request, Optional.empty()); + executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock, + streamEncryptionServiceMock, Duration.ofMillis(10L), request + ); when(congestionControllerMock.hasInFlight()).thenAnswer(new Answer() { @@ -515,26 +631,26 @@ public InterledgerResponsePacket answer(InvocationOnMock invocationOnMock) throw if (invocations.incrementAndGet() <= 2) { return sampleFulfillPacket(); } - return sampleRejectPacket(InterledgerErrorCode.F00_BAD_REQUEST); + return sampleRejectPacket(F00_BAD_REQUEST); } }); StreamPacket streamPacket = StreamPacket.builder() - .prepareAmount(UnsignedLong.ONE) - .sequence(UnsignedLong.ZERO) - .interledgerPacketType(InterledgerPacketType.FULFILL) - .addFrames(StreamMoneyFrame.builder() - .shares(UnsignedLong.ONE) - .streamId(UnsignedLong.ONE) - .build()) - .build(); + .prepareAmount(UnsignedLong.ONE) + .sequence(UnsignedLong.ZERO) + .interledgerPacketType(InterledgerPacketType.FULFILL) + .addFrames(StreamMoneyFrame.builder() + .shares(UnsignedLong.ONE) + .streamId(UnsignedLong.ONE) + .build()) + .build(); when(streamCodecContextMock.read(any(), any())).thenReturn(streamPacket); assertThat(sendMoneyAggregator.isUnrecoverableErrorEncountered()).isFalse(); SendMoneyResult result = sendMoneyAggregator.send().get(); assertThat(sendMoneyAggregator.isUnrecoverableErrorEncountered()).isTrue(); assertThat(result) - .extracting("amountDelivered", "amountSent", "numFulfilledPackets", - "successfulPayment") - .containsExactly(UnsignedLong.ONE, UnsignedLong.ONE, 1, false); + .extracting("amountDelivered", "amountSent", "numFulfilledPackets", + "successfulPayment") + .containsExactly(UnsignedLong.ONE, UnsignedLong.ONE, 1, false); // depending on execution order, we may halt sooner than trying all 10 potential times assertThat(result.numRejectPackets()).isLessThanOrEqualTo(9); assertThat(result.amountLeftToSend()).isLessThanOrEqualTo(UnsignedLong.valueOf(9)); @@ -543,31 +659,32 @@ public InterledgerResponsePacket answer(InvocationOnMock invocationOnMock) throw } /** - * Test that if there is a delay between when a packet is schedule and when the executor sends the packet, - * that the prepare packet expiresAt is calculated just before sending (not before scheduling). + * Test that if there is a delay between when a packet is schedule and when the executor sends the packet, that the + * prepare packet expiresAt is calculated just before sending (not before scheduling). */ @Test public void packetExpiryIsComputedJustBeforeSending() throws InterruptedException { SendMoneyRequest request = SendMoneyRequest.builder() .sharedSecret(sharedSecret) .sourceAddress(sourceAddress) - .senderAmountMode(SenderAmountMode.SENDER_AMOUNT) .destinationAddress(destinationAddress) .amount(originalAmountToSend) .timeout(Optional.of(Duration.ofSeconds(60))) .denomination(Denominations.XRP) - .paymentTracker(new FixedSenderAmountPaymentTracker(UnsignedLong.valueOf(10l), new NoOpExchangeRateCalculator())) + .paymentTracker( + new FixedSenderAmountPaymentTracker(UnsignedLong.valueOf(10l), new NoOpExchangeRateCalculator())) .build(); // use a SleepyExecutorService with a non-trivial sleep so that we can verify that expiresAt is calculated // AFTER the scheduler woke up to process the packet. long scheduleDelayMillis = 100; ExecutorService executor = new SleepyExecutorService(Executors.newFixedThreadPool(1), scheduleDelayMillis); - Instant minExpectedExpiresAt = DateUtils.now().plusMillis(scheduleDelayMillis); + Instant minExpectedExpiresAt = DateUtils.now().plusMillis(scheduleDelayMillis); this.sendMoneyAggregator = new SendMoneyAggregator( executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock, - streamEncryptionServiceMock, request, Optional.empty()); + streamEncryptionServiceMock, Duration.ofMillis(10L), request + ); sendMoneyAggregator.schedule(new AtomicBoolean(false), () -> samplePreparePacket(), sampleStreamPacket(), diff --git a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SimpleStreamSenderIT.java b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SimpleStreamSenderIT.java index 25fe1dd2..afc5edb1 100644 --- a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SimpleStreamSenderIT.java +++ b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SimpleStreamSenderIT.java @@ -52,7 +52,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Optional; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -158,9 +158,7 @@ public void setUp() throws IOException { public void sendMoneySinglePacket() { final UnsignedLong paymentAmount = UnsignedLong.valueOf(1000); - StreamSender streamSender = new SimpleStreamSender( - new JavaxStreamEncryptionService(), link - ); + StreamSender streamSender = new SimpleStreamSender(link); final StreamConnectionDetails connectionDetails = getStreamConnectionDetails(1000000); @@ -191,9 +189,7 @@ public void sendMoneySinglePacket() { public void sendSmallPayment() { final UnsignedLong paymentAmount = UnsignedLong.valueOf(100); - StreamSender streamSender = new SimpleStreamSender( - new JavaxStreamEncryptionService(), link - ); + StreamSender streamSender = new SimpleStreamSender(link); final StreamConnectionDetails connectionDetails = getStreamConnectionDetails("sendSmallPayment"); @@ -227,9 +223,7 @@ public void sendMoneyOnSameConnectionInParallel() { final int numExecutions = 10; final UnsignedLong paymentAmount = UnsignedLong.valueOf(100000L); - StreamSender streamSender = new SimpleStreamSender( - new JavaxStreamEncryptionService(), link - ); + StreamSender streamSender = new SimpleStreamSender(link); final StreamConnectionDetails connectionDetails = getStreamConnectionDetails(1000000); @@ -291,9 +285,7 @@ public void sendMoneyOnSameConnectionInParallel() { public void sendMoneyMultiPacket() { final UnsignedLong paymentAmount = UnsignedLong.valueOf(100000); - StreamSender streamSender = new SimpleStreamSender( - new JavaxStreamEncryptionService(), link - ); + StreamSender streamSender = new SimpleStreamSender(link); final StreamConnectionDetails connectionDetails = getStreamConnectionDetails(1000001); @@ -318,37 +310,37 @@ public void sendMoneyMultiPacket() { /** * Two calls to {@link SimpleStreamSender#sendMoney(SendMoneyRequest)}} that involves multiple packets in parallel. - * First call is to a {@link SimpleStreamSender} with the default sleep time (100ms) - * Second call is to a {@link SimpleStreamSender} with a shorter sleep time (5ms) + * First call is to a {@link SimpleStreamSender} with the default sleep time (100ms) Second call is to a {@link + * SimpleStreamSender} with a shorter sleep time (5ms) */ @Test public void sendMoneyMultiPacketDifferentSleepTimes() { - final SendMoneyResult heavySleeperResult = sendMoneyWithConfiguredSleep(Optional.of(UnsignedLong.valueOf(100)), 1000001); - final SendMoneyResult lightSleeperResult = sendMoneyWithConfiguredSleep(Optional.of(UnsignedLong.valueOf(5)), 1000002); + final SendMoneyResult heavySleeperResult = sendMoneyWithConfiguredSleep(Duration.ofMillis(100), 1000001); + final SendMoneyResult lightSleeperResult = sendMoneyWithConfiguredSleep(Duration.ofMillis(5), 1000002); - logger.info("Heavy sleeper took {} to send {} packets.", heavySleeperResult.sendMoneyDuration(), heavySleeperResult.totalPackets()); - logger.info("Light sleeper took {} to send {} packets.", lightSleeperResult.sendMoneyDuration(), lightSleeperResult.totalPackets()); + logger.info("Heavy sleeper took {} to send {} packets.", heavySleeperResult.sendMoneyDuration(), + heavySleeperResult.totalPackets()); + logger.info("Light sleeper took {} to send {} packets.", lightSleeperResult.sendMoneyDuration(), + lightSleeperResult.totalPackets()); assertThat(heavySleeperResult.sendMoneyDuration()).isGreaterThan(lightSleeperResult.sendMoneyDuration()); } - private SendMoneyResult sendMoneyWithConfiguredSleep(Optional sleepTime, int streamConnectionId) { + private SendMoneyResult sendMoneyWithConfiguredSleep(Duration sleepTimeDuration, int streamConnectionId) { final UnsignedLong paymentAmount = UnsignedLong.valueOf(100000); - StreamSender heavySleeperSender = new SimpleStreamSender( - new JavaxStreamEncryptionService(), link, sleepTime - ); + StreamSender heavySleeperSender = new SimpleStreamSender(link, sleepTimeDuration); final StreamConnectionDetails connectionDetails = getStreamConnectionDetails(streamConnectionId); final SendMoneyResult heavySleeperResult = heavySleeperSender.sendMoney( - SendMoneyRequest.builder() - .sourceAddress(SENDER_ADDRESS) - .amount(paymentAmount) - .denomination(Denominations.XRP) - .destinationAddress(connectionDetails.destinationAddress()) - .sharedSecret(connectionDetails.sharedSecret()) - .paymentTracker(new FixedSenderAmountPaymentTracker(paymentAmount, new NoOpExchangeRateCalculator())) - .build() + SendMoneyRequest.builder() + .sourceAddress(SENDER_ADDRESS) + .amount(paymentAmount) + .denomination(Denominations.XRP) + .destinationAddress(connectionDetails.destinationAddress()) + .sharedSecret(connectionDetails.sharedSecret()) + .paymentTracker(new FixedSenderAmountPaymentTracker(paymentAmount, new NoOpExchangeRateCalculator())) + .build() ).join(); assertThat(heavySleeperResult.amountDelivered()).isEqualTo(paymentAmount); @@ -356,7 +348,7 @@ private SendMoneyResult sendMoneyWithConfiguredSleep(Optional slee assertThat(heavySleeperResult.numFulfilledPackets()).isCloseTo(8, Offset.offset(1)); assertThat(heavySleeperResult.numRejectPackets()).isEqualTo(0); - logger.info("Payment Sent via sender with sleep = {} : {}", sleepTime.orElse(UnsignedLong.valueOf(100)), heavySleeperResult); + logger.info("Payment Sent via sender with sleep = {} : {}", sleepTimeDuration.toMillis(), heavySleeperResult); return heavySleeperResult; } @@ -415,7 +407,8 @@ public void sendMoneyHonorsTimeout() { // using a sleepy executor here to make sure race condition is handled properly where timeout is reached // after submitting a sendPacketized task to the executor but before the task is executed StreamSender streamSender = new SimpleStreamSender( - new JavaxStreamEncryptionService(), link, new SleepyExecutorService(Executors.newFixedThreadPool(5), 5) + link, Duration.ofMillis(10L), new JavaxStreamEncryptionService(), new StreamConnectionManager(), + new SleepyExecutorService(Executors.newFixedThreadPool(5), 5) ); String username = "sendMoneyHonorsTimeout"; @@ -438,7 +431,8 @@ public void sendMoneyHonorsTimeout() { .denomination(Denominations.XRP) .destinationAddress(connectionDetails.destinationAddress()) .sharedSecret(connectionDetails.sharedSecret()) - .paymentTracker(new FixedSenderAmountPaymentTracker(paymentAmount, new NoOpExchangeRateCalculator())) + .paymentTracker( + new FixedSenderAmountPaymentTracker(paymentAmount, new NoOpExchangeRateCalculator())) .timeout(Duration.ofMillis(10 + i * 10)) .build() ).get(); @@ -458,9 +452,7 @@ public void sendMoneyHonorsTimeout() { public void sendFailsIfNoExchangeRate() throws Throwable { final UnsignedLong paymentAmount = UnsignedLong.valueOf(1000); - StreamSender streamSender = new SimpleStreamSender( - new JavaxStreamEncryptionService(), link - ); + StreamSender streamSender = new SimpleStreamSender(link); String username = "sendFailsIfNoExchangeRate"; InterledgerAddress address = HOST_ADDRESS.with(username); @@ -575,9 +567,7 @@ public void sendMoneyWithWrongLinkPassword() throws IOException { .build()); final UnsignedLong paymentAmount = UnsignedLong.valueOf(1000); - final StreamSender streamSender = new SimpleStreamSender( - new JavaxStreamEncryptionService(), link - ); + final StreamSender streamSender = new SimpleStreamSender(link); final StreamConnectionDetails connectionDetails = getStreamConnectionDetails(1000000); streamSender.sendMoney( @@ -593,6 +583,39 @@ public void sendMoneyWithWrongLinkPassword() throws IOException { ).whenComplete(($, error) -> assertThat(error).isNotNull()); } + /** + * This test sends a payment in packets that are destined for a receiver that doesn't exist. This will cause an + * F02_UNREACHABLE reject packet to be returned during the preflightCheck, which should cause the soldierOn loop to + * short circuit and not try to send any packets. + */ + @Test + public void sendMoneyFailsWithFinalErrorAndShortCircuits() { + final UnsignedLong paymentAmount = UnsignedLong.valueOf(1000000); + + StreamSender streamSender = new SimpleStreamSender(link); + + final StreamConnectionDetails connectionDetails = getStreamConnectionDetails(1000001); + + final SendMoneyResult sendMoneyResult = streamSender.sendMoney( + SendMoneyRequest.builder() + .sourceAddress(SENDER_ADDRESS) + .amount(paymentAmount) + .denomination(Denominations.XRP) + .destinationAddress(InterledgerAddress.of("test.foo.bar.patrick")) + .sharedSecret(connectionDetails.sharedSecret()) + .paymentTracker(new FixedSenderAmountPaymentTracker(paymentAmount, new NoOpExchangeRateCalculator())) + .build() + ).join(); + + logger.info("SendMoneyResult: " + sendMoneyResult); + // preflightCheck should trip the circuit and cause streamSender to not even try to send any packets. + assertThat(sendMoneyResult.totalPackets()).isEqualTo(0); + } + + ////////////////// + // Private Helpers + ////////////////// + private StreamConnectionDetails getStreamConnectionDetails(int id) { return getStreamConnectionDetails("accountTest" + id); } @@ -620,7 +643,7 @@ private SendMoneyResult sendMoney(UnsignedLong paymentAmount, int taskId, Thread final StreamConnectionDetails connectionDetails = getStreamConnectionDetails(taskId); StreamSender streamSender = new SimpleStreamSender( - new JavaxStreamEncryptionService(), link, executor + link, Duration.ofMillis(10L), new JavaxStreamEncryptionService(), new StreamConnectionManager(), executor ); final SendMoneyResult sendMoneyResult = streamSender.sendMoney( @@ -701,14 +724,17 @@ private OkHttpClient constructOkHttpClient() { static class CrankyExchangeRateCalculator implements ExchangeRateCalculator { @Override - public UnsignedLong calculateAmountToSend(UnsignedLong amountToReceive, Denomination sendDenomination, - Denomination receiveDenomination) { + public UnsignedLong calculateAmountToSend(UnsignedLong amountToSend, Denomination amountToSendDenomination, + Denomination receiverDenomination) { throw new NoExchangeRateException("no exchanges allowed"); } @Override - public UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, Denomination sendDenomination, - Optional expectedReceivedDenomination) throws NoExchangeRateException { + public UnsignedLong calculateMinAmountToAccept( + final UnsignedLong sendAmount, final Denomination sendAmountDenomination + ) { + Objects.requireNonNull(sendAmount); + Objects.requireNonNull(sendAmountDenomination); throw new NoExchangeRateException("no exchanges allowed"); } } @@ -716,14 +742,17 @@ public UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, Denomina static class GreedyExchangeRateCalculator implements ExchangeRateCalculator { @Override - public UnsignedLong calculateAmountToSend(UnsignedLong amountToReceive, Denomination sendDenomination, - Denomination receiveDenomination) { - return amountToReceive.plus(UnsignedLong.ONE); + public UnsignedLong calculateAmountToSend(UnsignedLong amountToSend, Denomination amountToSendDenomination, + Denomination receiverDenomination) { + return amountToSend.plus(UnsignedLong.ONE); } @Override - public UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, Denomination sendDenomination, - Optional expectedReceivedDenomination) throws NoExchangeRateException { + public UnsignedLong calculateMinAmountToAccept( + final UnsignedLong sendAmount, final Denomination sendAmountDenomination + ) { + Objects.requireNonNull(sendAmount); + Objects.requireNonNull(sendAmountDenomination); return sendAmount.plus(UnsignedLong.ONE); } } diff --git a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SimpleStreamSenderTests.java b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SimpleStreamSenderTests.java index 40aa3eed..b907ccc6 100644 --- a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SimpleStreamSenderTests.java +++ b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SimpleStreamSenderTests.java @@ -14,11 +14,10 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import java.util.Optional; -import java.util.concurrent.ExecutorService; +import java.time.Duration; /** - * Unit tests for {@link SimpleStreamSenderTests}. + * Unit tests for {@link SimpleStreamSender}. */ public class SimpleStreamSenderTests { @@ -36,43 +35,39 @@ public class SimpleStreamSenderTests { @Before public void setUp() { MockitoAnnotations.initMocks(this); - simpleStreamSender = new SimpleStreamSender(streamEncryptionServiceMock, linkMock); - } - - @Test - public void constructWithNullEncryptionService() { - expectedException.expect(NullPointerException.class); - new SimpleStreamSender(null, linkMock); + simpleStreamSender = new SimpleStreamSender(linkMock, Duration.ofMillis(10L), streamEncryptionServiceMock); } @Test public void constructWithNullLink() { expectedException.expect(NullPointerException.class); - new SimpleStreamSender(streamEncryptionServiceMock, null); + new SimpleStreamSender(null); } @Test - public void constructThreeArgWithNullEncryptionService() { + public void constructWithNullDurationo() { expectedException.expect(NullPointerException.class); - new SimpleStreamSender(null, linkMock, mock(ExecutorService.class)); + new SimpleStreamSender(linkMock, null); } @Test - public void constructThreeArgWithNullLink() { + public void constructWithNullEncryptionService() { expectedException.expect(NullPointerException.class); - new SimpleStreamSender(streamEncryptionServiceMock, null, mock(ExecutorService.class)); + new SimpleStreamSender(linkMock, Duration.ofMillis(10L), null); } @Test - public void constructThreeArgWithNullExecutor() { + public void constructWithNullConnectionManager() { expectedException.expect(NullPointerException.class); - new SimpleStreamSender(streamEncryptionServiceMock, linkMock, (ExecutorService) null); + new SimpleStreamSender(linkMock, Duration.ofMillis(10L), streamEncryptionServiceMock, null); } @Test - public void constructThreeArgWithNullSleep() { + public void constructThreeArgWithNullExecutor() { expectedException.expect(NullPointerException.class); - new SimpleStreamSender(streamEncryptionServiceMock, linkMock, (Optional) null); + new SimpleStreamSender( + linkMock, Duration.ofMillis(10L), streamEncryptionServiceMock, mock(StreamConnectionManager.class), null + ); } @Test diff --git a/stream-parent/stream-core/src/main/java/org/interledger/stream/PaymentTracker.java b/stream-parent/stream-core/src/main/java/org/interledger/stream/PaymentTracker.java index aeb0def5..b1166a25 100644 --- a/stream-parent/stream-core/src/main/java/org/interledger/stream/PaymentTracker.java +++ b/stream-parent/stream-core/src/main/java/org/interledger/stream/PaymentTracker.java @@ -2,8 +2,6 @@ import com.google.common.primitives.UnsignedLong; -import java.util.Optional; - /** * Defines how to track a payment while considering the amount sent vs amount received, allowing room for * path-exchange-rate fluctuations and implementation-defined rules relating to whether or not to continue a payment. @@ -50,6 +48,19 @@ public interface PaymentTracker { */ UnsignedLong getDeliveredAmountInReceiverUnits(); + /** + * Computes the values that should be used to Prepare a Stream packet, based upon the supplied inputs. + * + * @param congestionLimit An {@link UnsignedLong} representing the number of units that the congestion controller + * has indicated should be sent. + * @param senderDenomination A {@link Denomination} representing the asset information for the sender, in order to + * compute path-exchange-rates. + * + * @return A {@link PrepareAmounts} object that contains the correct information to use in the next ILPv4 and Stream + * packets as part of a packetized payment flow in STREAM. + */ + PrepareAmounts getSendPacketAmounts(UnsignedLong congestionLimit, Denomination senderDenomination); + /** * Computes the values that should be used to Prepare a Stream packet, based upon the supplied inputs. * @@ -64,7 +75,7 @@ public interface PaymentTracker { * packets as part of a packetized payment flow in STREAM. */ PrepareAmounts getSendPacketAmounts( - UnsignedLong congestionLimit, Denomination senderDenomination, Optional receiverDenomination + UnsignedLong congestionLimit, Denomination senderDenomination, Denomination receiverDenomination ); /** @@ -91,8 +102,8 @@ PrepareAmounts getSendPacketAmounts( /** * Finalize and commit any tracking values for a packetized payment using the values in {@code prepareAmounts}. * - * @param prepareAmounts A {@link PrepareAmounts} that contains discrete ILPv4 and Stream packet amounts for an - * individual Prepare request. + * @param prepareAmounts A {@link PrepareAmounts} that contains discrete ILPv4 and Stream packet amounts for an + * individual Prepare request. * @param deliveredAmount A {@link UnsignedLong} that contains the amount delivered which needs to be committed. */ void commit(PrepareAmounts prepareAmounts, UnsignedLong deliveredAmount); @@ -112,4 +123,18 @@ PrepareAmounts getSendPacketAmounts( default boolean successful() { return !moreToSend(); } + + /** + *

Determines if this payment track requires a receiver denomination. This is generally used to determine whether + * or not whether {@link #getSendPacketAmounts(UnsignedLong, Denomination)} or {@link + * #getSendPacketAmounts(UnsignedLong, Denomination, Denomination)} or should be called.

+ * + *

By default, this method returns {@code true} for {@link SenderAmountMode#RECEIVER_AMOUNT}.

+ * + * @return {@code true} if {@link #getOriginalAmountMode()} equals {@link SenderAmountMode#RECEIVER_AMOUNT}; {@code + * false} otherwise. + */ + default boolean requiresReceiverDenomination() { + return getOriginalAmountMode() == SenderAmountMode.RECEIVER_AMOUNT; + } } diff --git a/stream-parent/stream-core/src/main/java/org/interledger/stream/ReceiverAmountPaymentTracker.java b/stream-parent/stream-core/src/main/java/org/interledger/stream/ReceiverAmountPaymentTracker.java index 966af7b1..5572927e 100644 --- a/stream-parent/stream-core/src/main/java/org/interledger/stream/ReceiverAmountPaymentTracker.java +++ b/stream-parent/stream-core/src/main/java/org/interledger/stream/ReceiverAmountPaymentTracker.java @@ -1,5 +1,7 @@ package org.interledger.stream; +import com.google.common.primitives.UnsignedLong; + /** * An extension of {@link PaymentTracker} that defines the {@link #getOriginalAmount()} to be in the receiver's units. */ @@ -10,4 +12,11 @@ default SenderAmountMode getOriginalAmountMode() { return SenderAmountMode.RECEIVER_AMOUNT; } + @Override + default PrepareAmounts getSendPacketAmounts(UnsignedLong congestionLimit, Denomination senderDenomination) { + throw new RuntimeException( + "Implementations of ReceiverAmountPaymentTracker require a Denomination. Call the getSendPacketAmounts() that" + + " requires a receiverDenomination instead." + ); + } } diff --git a/stream-parent/stream-core/src/main/java/org/interledger/stream/SendMoneyRequest.java b/stream-parent/stream-core/src/main/java/org/interledger/stream/SendMoneyRequest.java index 7890be19..4906fcc2 100644 --- a/stream-parent/stream-core/src/main/java/org/interledger/stream/SendMoneyRequest.java +++ b/stream-parent/stream-core/src/main/java/org/interledger/stream/SendMoneyRequest.java @@ -5,11 +5,11 @@ import org.interledger.core.SharedSecret; import com.google.common.primitives.UnsignedLong; +import org.immutables.value.Value.Default; import java.time.Duration; import java.util.Optional; - -import javax.annotation.Nullable; +import java.util.UUID; @Immutable public interface SendMoneyRequest { @@ -18,6 +18,16 @@ static SendMoneyRequestBuilder builder() { return new SendMoneyRequestBuilder(); } + /** + * A unique identifier for this Send Money request. + * + * @return A {@link UUID} that uniquely identifies this request. + */ + @Default + default UUID requestId() { + return UUID.randomUUID(); + } + /** * The shared secret, shared between only the sender and receiver, required by IL-RFC-29 to encrypt stream frames so * that only the sender and receiver can decrypt them. @@ -48,16 +58,6 @@ static SendMoneyRequestBuilder builder() { */ UnsignedLong amount(); - /** - * @return A {@link SenderAmountMode} that indicates the meaning of {@link #amount()}. - * - * @deprecated ascertained via the type of payment tracker used for sending. Returns the {@link SenderAmountMode} for - * this payment tracker. - */ - @Deprecated - @Nullable - SenderAmountMode getSenderAmountMode(); - /** * The {@link Denomination} of the amount in this request. * diff --git a/stream-parent/stream-core/src/main/java/org/interledger/stream/SenderAmountPaymentTracker.java b/stream-parent/stream-core/src/main/java/org/interledger/stream/SenderAmountPaymentTracker.java index d12f0358..956f9d88 100644 --- a/stream-parent/stream-core/src/main/java/org/interledger/stream/SenderAmountPaymentTracker.java +++ b/stream-parent/stream-core/src/main/java/org/interledger/stream/SenderAmountPaymentTracker.java @@ -1,5 +1,7 @@ package org.interledger.stream; +import com.google.common.primitives.UnsignedLong; + /** * An extension of {@link PaymentTracker} that defines the {@link #getOriginalAmount()} to be in the sender's units. */ @@ -10,4 +12,22 @@ default SenderAmountMode getOriginalAmountMode() { return SenderAmountMode.SENDER_AMOUNT; } + /** + * ReceiverDenomination is generally not required in a Payment Tracking in SENDER_AMOUNT mode. + * + * @param congestionLimit An {@link UnsignedLong} representing the number of units that the congestion controller + * has indicated should be sent. + * @param senderDenomination A {@link Denomination} representing the asset information for the sender, in order to + * compute path-exchange-rates. + * @param receiverDenomination A {@link Denomination} representing the asset information for the receiver, in order to + * compute path-exchange-rates. + * + * @return + */ + @Override + default PrepareAmounts getSendPacketAmounts( + UnsignedLong congestionLimit, Denomination senderDenomination, Denomination receiverDenomination + ) { + return getSendPacketAmounts(congestionLimit, senderDenomination); + } } diff --git a/stream-parent/stream-core/src/main/java/org/interledger/stream/calculators/ExchangeRateCalculator.java b/stream-parent/stream-core/src/main/java/org/interledger/stream/calculators/ExchangeRateCalculator.java index 8998a4ae..ffe500ff 100644 --- a/stream-parent/stream-core/src/main/java/org/interledger/stream/calculators/ExchangeRateCalculator.java +++ b/stream-parent/stream-core/src/main/java/org/interledger/stream/calculators/ExchangeRateCalculator.java @@ -5,7 +5,7 @@ import com.google.common.primitives.UnsignedLong; import java.math.BigDecimal; -import java.util.Optional; +import java.util.Objects; /** *

Defines how a STREAM sender or receiver can calculate the amount to send on a per-packet basis, in addition to @@ -23,37 +23,46 @@ public interface ExchangeRateCalculator { * Calculates the amount to send for the ILP packet based on a desired amount to be received by the receiver (in * receiver's units). * - * @param amountToReceive amount to be received (in receiver's units) - * @param sendDenomination sender's denomination - * @param receiveDenomination receiver's denomination + * @param amountToReceive An {@link UnsignedLong} containing the amount to be received (in receiver's units) + * @param sendDenomination A {@link Denomination} representing the sender's denomination. + * @param receiverDenomination A {@link Denomination} representing the receiver's denomination. * - * @return amount to send in the ILP packet (in sender's denomination) + * @return An {@link UnsignedLong} containing amount to send in the ILP packet (in sender's denomination). * * @throws NoExchangeRateException if exchange rate could not be calculated */ - UnsignedLong calculateAmountToSend(UnsignedLong amountToReceive, - Denomination sendDenomination, - Denomination receiveDenomination) - throws NoExchangeRateException; + UnsignedLong calculateAmountToSend( + UnsignedLong amountToReceive, Denomination sendDenomination, Denomination receiverDenomination + ) throws NoExchangeRateException; /** - * Calculate the minimum amount a receiver should accept based on the exchange rate applied to the sendAmount. - * - * @param sendAmount amount to send. - * @param sendDenomination asset code and scale of the amount to send. - * @param expectedReceivedDenomination asset code and scale to apply to the amount the receiver should receive when - * computing the total for an exchange rate. Optional if denomination of receiver - * is unknown. ExchangeRateCalculator implementations should throw a {@link - * NoExchangeRateException} + * Calculate the minimum amount a receiver should accept based on the exchange rate applied to the {@code + * sendAmount}. * - * @return the minimum amount the receiver should accept. + * @param sendAmount An {@link UnsignedLong} representing the amount to send. + * @param sendAmountDenomination asset code and scale of the amount to send. * - * @throws NoExchangeRateException if exchange rate could not be calculated + * @return An {@link UnsignedLong} containing the minimum amount the receiver should accept, in sender's units. */ - UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, Denomination sendDenomination, - Optional expectedReceivedDenomination) + UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, Denomination sendAmountDenomination) throws NoExchangeRateException; + /** + * Calculate the minimum amount a receiver should accept based on the exchange rate applied to the {@code + * sendAmount}. + * + * @param sendAmount An {@link UnsignedLong} containing the amount to send. + * @param sendAmountDenomination A {@link Denomination} for {@code amountToSend}. + * @param receiverDenomination A {@link Denomination} for the receiver. + * + * @return An {@link UnsignedLong} containing the minimum amount the receiver should accept, in sender's units. + */ + default UnsignedLong calculateMinAmountToAccept( + UnsignedLong sendAmount, Denomination sendAmountDenomination, Denomination receiverDenomination + ) throws NoExchangeRateException { + return calculateMinAmountToAccept(sendAmount, sendAmountDenomination); + } + /** * Convenience method to compute the BigDecimal representation of an amount for a given asset scale. * @@ -62,7 +71,10 @@ UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, Denomination se * * @return the scaled decimal amount */ - default BigDecimal scaled(UnsignedLong amount, Denomination denomination) { + default BigDecimal scaled(final UnsignedLong amount, final Denomination denomination) { + Objects.requireNonNull(amount); + Objects.requireNonNull(denomination); + BigDecimal scale = BigDecimal.ONE.scaleByPowerOfTen(denomination.assetScale()); return new BigDecimal(amount.bigIntegerValue()).divide(scale); } diff --git a/stream-parent/stream-core/src/main/java/org/interledger/stream/calculators/NoOpExchangeRateCalculator.java b/stream-parent/stream-core/src/main/java/org/interledger/stream/calculators/NoOpExchangeRateCalculator.java index 432aa8b8..8b0c0455 100644 --- a/stream-parent/stream-core/src/main/java/org/interledger/stream/calculators/NoOpExchangeRateCalculator.java +++ b/stream-parent/stream-core/src/main/java/org/interledger/stream/calculators/NoOpExchangeRateCalculator.java @@ -4,25 +4,26 @@ import com.google.common.primitives.UnsignedLong; -import java.util.Optional; +import java.util.Objects; /** * Exchange rate calculator that ignores denominations and applies no rate calculations. */ public class NoOpExchangeRateCalculator implements ExchangeRateCalculator { - @Override - public UnsignedLong calculateAmountToSend(UnsignedLong amountToReceive, - Denomination sendDenomination, - Denomination receiveDenomination) { - return amountToReceive; + public UnsignedLong calculateAmountToSend(UnsignedLong amountToSend, + Denomination amountToSendDenomination, + Denomination receiverDenomination) { + return amountToSend; } @Override - public UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, Denomination sendDenomination, - Optional expectedReceivedDenomination) { + public UnsignedLong calculateMinAmountToAccept( + final UnsignedLong sendAmount, final Denomination sendAmountDenomination + ) { + Objects.requireNonNull(sendAmount); + Objects.requireNonNull(sendAmountDenomination); return UnsignedLong.ZERO; } - } diff --git a/stream-parent/stream-core/src/test/java/org/interledger/stream/PaymentTrackerTest.java b/stream-parent/stream-core/src/test/java/org/interledger/stream/PaymentTrackerTest.java new file mode 100644 index 00000000..07406656 --- /dev/null +++ b/stream-parent/stream-core/src/test/java/org/interledger/stream/PaymentTrackerTest.java @@ -0,0 +1,103 @@ +package org.interledger.stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.interledger.stream.SenderAmountMode.RECEIVER_AMOUNT; +import static org.interledger.stream.SenderAmountMode.SENDER_AMOUNT; + +import com.google.common.primitives.UnsignedLong; +import org.junit.Test; + +/** + * Unit tests for default methods in {@link PaymentTracker}. + */ +public class PaymentTrackerTest { + + @Test + public void testGetOriginalAmountMode() { + PaymentTracker paymentTracker = constructPaymentTracker(SENDER_AMOUNT); + assertThat(paymentTracker.getOriginalAmountMode()).isEqualTo(SENDER_AMOUNT); + assertThat(paymentTracker.requiresReceiverDenomination()).isFalse(); + + paymentTracker = constructPaymentTracker(RECEIVER_AMOUNT); + assertThat(paymentTracker.getOriginalAmountMode()).isEqualTo(RECEIVER_AMOUNT); + assertThat(paymentTracker.requiresReceiverDenomination()).isTrue(); + } + + @Test + public void testIsSuccessful() { + PaymentTracker paymentTracker = constructPaymentTracker(SENDER_AMOUNT, false); + assertThat(paymentTracker.successful()).isTrue(); + + paymentTracker = constructPaymentTracker(SENDER_AMOUNT, true); + assertThat(paymentTracker.successful()).isFalse(); + } + + ////////////////// + // Private Helpers + ////////////////// + + private PaymentTracker constructPaymentTracker(final SenderAmountMode originalAmountMode) { + return constructPaymentTracker(originalAmountMode, false); + } + + private PaymentTracker constructPaymentTracker(final SenderAmountMode originalAmountMode, + final boolean moreToSend) { + return new PaymentTracker() { + @Override + public UnsignedLong getOriginalAmount() { + return UnsignedLong.ZERO; + } + + @Override + public SenderAmountMode getOriginalAmountMode() { + return originalAmountMode; + } + + @Override + public UnsignedLong getOriginalAmountLeft() { + return UnsignedLong.ZERO; + } + + @Override + public UnsignedLong getDeliveredAmountInSenderUnits() { + return UnsignedLong.ZERO; + } + + @Override + public UnsignedLong getDeliveredAmountInReceiverUnits() { + return UnsignedLong.ZERO; + } + + @Override + public PrepareAmounts getSendPacketAmounts(UnsignedLong congestionLimit, Denomination senderDenomination) { + return null; + } + + @Override + public PrepareAmounts getSendPacketAmounts(UnsignedLong congestionLimit, Denomination senderDenomination, + Denomination receiverDenomination) { + return null; + } + + @Override + public boolean auth(PrepareAmounts prepareAmounts) { + return false; + } + + @Override + public void rollback(PrepareAmounts prepareAmounts, boolean packetRejected) { + + } + + @Override + public void commit(PrepareAmounts prepareAmounts, UnsignedLong deliveredAmount) { + + } + + @Override + public boolean moreToSend() { + return moreToSend; + } + }; + } +} diff --git a/stream-parent/stream-core/src/test/java/org/interledger/stream/calculators/ExchangeRateCalculatorTest.java b/stream-parent/stream-core/src/test/java/org/interledger/stream/calculators/ExchangeRateCalculatorTest.java index 3eb48d32..e9dffd9d 100644 --- a/stream-parent/stream-core/src/test/java/org/interledger/stream/calculators/ExchangeRateCalculatorTest.java +++ b/stream-parent/stream-core/src/test/java/org/interledger/stream/calculators/ExchangeRateCalculatorTest.java @@ -1,7 +1,6 @@ package org.interledger.stream.calculators; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.spy; import org.interledger.stream.Denomination; import org.interledger.stream.Denominations; @@ -10,27 +9,49 @@ import org.junit.Test; import java.math.BigDecimal; -import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +/** + * Unit tests for {@link ExchangeRateCalculator}. + */ public class ExchangeRateCalculatorTest { @Test public void scaled() { final ExchangeRateCalculator calculator = new ExchangeRateCalculator() { @Override - public UnsignedLong calculateAmountToSend(UnsignedLong amountToReceive, Denomination sendDenomination, - Denomination receiveDenomination) throws NoExchangeRateException { + public UnsignedLong calculateAmountToSend(UnsignedLong amountToSend, Denomination amountToSendDenomination, + Denomination receiverDenomination) throws NoExchangeRateException { return null; } @Override - public UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, - Denomination sendDenomination, - Optional expectedReceivedDenomination) + public UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, Denomination sendAmountDenomination) throws NoExchangeRateException { return null; } }; assertThat(calculator.scaled(UnsignedLong.ONE, Denominations.US_CENTS)).isEqualTo(BigDecimal.valueOf(.01)); } + + @Test + public void testCalculateMinAmountToAcceptDefault() { + final AtomicBoolean called = new AtomicBoolean(false); + final ExchangeRateCalculator calculator = new ExchangeRateCalculator() { + @Override + public UnsignedLong calculateAmountToSend(UnsignedLong amountToSend, Denomination amountToSendDenomination, + Denomination receiverDenomination) throws NoExchangeRateException { + return UnsignedLong.ZERO; + } + + @Override + public UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, Denomination sendAmountDenomination) + throws NoExchangeRateException { + called.set(true); + return UnsignedLong.ZERO; + } + }; + calculator.calculateMinAmountToAccept(UnsignedLong.ZERO, Denominations.US_CENTS); + assertThat(called.get()).isTrue(); + } } diff --git a/stream-parent/stream-core/src/test/java/org/interledger/stream/calculators/NoOpExchangeRateCalculatorTest.java b/stream-parent/stream-core/src/test/java/org/interledger/stream/calculators/NoOpExchangeRateCalculatorTest.java index 81a82f1b..24e1f71c 100644 --- a/stream-parent/stream-core/src/test/java/org/interledger/stream/calculators/NoOpExchangeRateCalculatorTest.java +++ b/stream-parent/stream-core/src/test/java/org/interledger/stream/calculators/NoOpExchangeRateCalculatorTest.java @@ -7,14 +7,22 @@ import com.google.common.primitives.UnsignedLong; import org.junit.Test; -import java.util.Optional; - +/** + * Unit tests for {@link NoOpExchangeRateCalculator}. + */ public class NoOpExchangeRateCalculatorTest { @Test - public void calculate() { + public void calculateMinAmountToAcceptInReceiverAmountMode() { + ExchangeRateCalculator calc = new NoOpExchangeRateCalculator(); + assertThat(calc.calculateMinAmountToAccept(UnsignedLong.ONE, Denominations.USD, Denominations.EUR)) + .isEqualTo(UnsignedLong.ZERO); + } + + @Test + public void calculateMinAmountToAcceptInSenderAmountMode() { ExchangeRateCalculator calc = new NoOpExchangeRateCalculator(); - assertThat(calc.calculateMinAmountToAccept(UnsignedLong.ONE, Denominations.USD, Optional.of(Denominations.EUR))) + assertThat(calc.calculateMinAmountToAccept(UnsignedLong.ONE, Denominations.USD)) .isEqualTo(UnsignedLong.ZERO); } } diff --git a/stream-parent/stream-receiver/pom.xml b/stream-parent/stream-receiver/pom.xml index 977b437b..89f2bb74 100644 --- a/stream-parent/stream-receiver/pom.xml +++ b/stream-parent/stream-receiver/pom.xml @@ -42,10 +42,6 @@ stream-client test - - org.zalando - problem - com.fasterxml.jackson.datatype jackson-datatype-guava diff --git a/stream-parent/stream-receiver/src/main/java/org/interledger/stream/receiver/InvalidReceiverProblem.java b/stream-parent/stream-receiver/src/main/java/org/interledger/stream/receiver/InvalidReceiverProblem.java deleted file mode 100644 index c6a1b064..00000000 --- a/stream-parent/stream-receiver/src/main/java/org/interledger/stream/receiver/InvalidReceiverProblem.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.interledger.stream.receiver; - -import org.zalando.problem.AbstractThrowableProblem; -import org.zalando.problem.Status; - -import java.net.URI; - -/** - * An extension of {@link AbstractThrowableProblem} that is emitted when an SPSP request is made for an account/address - * that does not exist. - */ -public class InvalidReceiverProblem extends AbstractThrowableProblem { - - // TODO: Move to ILP-link-core? - @Deprecated - public static final String TYPE_PREFIX = "https://errors.interledger.org"; - - public InvalidReceiverProblem() { - super( - URI.create(TYPE_PREFIX + "/invalid-receiver-id"), - "Invalid receiver ID", - Status.NOT_FOUND - ); - } - -} diff --git a/stream-parent/stream-receiver/src/main/java/org/interledger/stream/receiver/StatelessStreamReceiver.java b/stream-parent/stream-receiver/src/main/java/org/interledger/stream/receiver/StatelessStreamReceiver.java index 0fd1dbee..10da2f2e 100644 --- a/stream-parent/stream-receiver/src/main/java/org/interledger/stream/receiver/StatelessStreamReceiver.java +++ b/stream-parent/stream-receiver/src/main/java/org/interledger/stream/receiver/StatelessStreamReceiver.java @@ -93,10 +93,10 @@ public InterledgerResponsePacket receiveMoney( try { if (preparePacket.getData().length == 0) { return InterledgerRejectPacket.builder() - .code(InterledgerErrorCode.F06_UNEXPECTED_PAYMENT) - .message("No STREAM packet bytes available to decrypt") - .triggeredBy(receiverAddress) - .build(); + .code(InterledgerErrorCode.F06_UNEXPECTED_PAYMENT) + .message("No STREAM packet bytes available to decrypt") + .triggeredBy(receiverAddress) + .build(); } // Try to parse the STREAM data from the payload. final byte[] streamPacketBytes = streamEncryptionService.decrypt(streamSharedSecret, preparePacket.getData()); @@ -125,6 +125,9 @@ public InterledgerResponsePacket receiveMoney( .totalReceived(UnsignedLong.ZERO) .receiveMax(UnsignedLong.MAX_VALUE) .build())); + + // Add ConnectionNewAddress, but only if the sender sent one. This allows the sender to ask multiple times, if + // desired. streamPacket.frames().stream() .filter(streamFrame -> streamFrame.streamFrameType() == StreamFrameType.ConnectionNewAddress) .findFirst() diff --git a/stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/SenderReceiverTest.java b/stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/SenderReceiverTest.java index 757b5b67..89e64e98 100644 --- a/stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/SenderReceiverTest.java +++ b/stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/SenderReceiverTest.java @@ -11,9 +11,11 @@ import org.interledger.stream.SendMoneyRequest; import org.interledger.stream.SendMoneyResult; import org.interledger.stream.calculators.ExchangeRateCalculator; +import org.interledger.stream.calculators.NoExchangeRateException; import org.interledger.stream.calculators.NoOpExchangeRateCalculator; import org.interledger.stream.crypto.JavaxStreamEncryptionService; import org.interledger.stream.crypto.StreamEncryptionService; +import org.interledger.stream.receiver.testutils.AlwaysEmptyStreamReceiver; import org.interledger.stream.receiver.testutils.SimulatedIlpv4Network; import org.interledger.stream.receiver.testutils.SimulatedPathConditions; import org.interledger.stream.sender.FixedReceiverAmountPaymentTracker; @@ -36,7 +38,6 @@ import java.time.Duration; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -75,16 +76,24 @@ private static StreamNode initNode( final byte[] serverSecret = BaseEncoding.base16().decode(SHARED_SECRET_HEX); final StreamEncryptionService streamEncryptionService = new JavaxStreamEncryptionService(); - SimpleStreamSender streamSender = new SimpleStreamSender( - streamEncryptionService, link + return initNode(link, senderAddress, receiverAddress, + new StatelessStreamReceiver( + () -> serverSecret, + new SpspStreamConnectionGenerator(), + streamEncryptionService, + StreamCodecContextFactory.oer() + ) ); + } - StatelessStreamReceiver streamReceiver = new StatelessStreamReceiver( - () -> serverSecret, - new SpspStreamConnectionGenerator(), - streamEncryptionService, - StreamCodecContextFactory.oer() - ); + private static StreamNode initNode( + Link link, + InterledgerAddress senderAddress, + InterledgerAddress receiverAddress, + StreamReceiver streamReceiver + ) { + final byte[] serverSecret = BaseEncoding.base16().decode(SHARED_SECRET_HEX); + SimpleStreamSender streamSender = new SimpleStreamSender(link); return StreamNode.builder() .serverSecret(serverSecret) @@ -97,6 +106,7 @@ private static StreamNode initNode( .build(); } + private static List awaitResults(List> futures) { CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); return futures.stream().map(CompletableFuture::join).collect(Collectors.toList()); @@ -264,16 +274,49 @@ public void testSendFromRightToLeft() { logger.info("Payment Sent: {}", sendMoneyResult); } + /** + * Validates that the send operation does not complete if the preflight check fails. + */ @Test public void testSendCannotDetermineReceiverDenomination() { final UnsignedLong paymentAmount = UnsignedLong.valueOf(1000); - final SendMoneyResult sendMoneyResult = sendMoney(rightStreamNode, leftStreamNode, paymentAmount); + final byte[] serverSecret = BaseEncoding.base16().decode(SHARED_SECRET_HEX); + final StreamEncryptionService streamEncryptionService = new JavaxStreamEncryptionService(); + final StreamReceiver brokenStreamReceiver = new AlwaysEmptyStreamReceiver( + () -> serverSecret, + new SpspStreamConnectionGenerator(), + streamEncryptionService, + StreamCodecContextFactory.oer() + ); + this.rightStreamNode = initNode(simulatedIlpNetwork.getRightToLeftLink(), RIGHT_SENDER_ADDRESS, + RIGHT_RECEIVER_ADDRESS, brokenStreamReceiver); + this.simulatedIlpNetwork.getRightToLeftLink().unregisterLinkHandler(); + this.simulatedIlpNetwork.getRightToLeftLink().registerLinkHandler(incomingPreparePacket -> + brokenStreamReceiver.receiveMoney(incomingPreparePacket, RIGHT_RECEIVER_ADDRESS, rightStreamNode.denomination()) + ); - assertThat(sendMoneyResult.amountDelivered()).isEqualTo(paymentAmount); + final SimpleStreamSender sender = new SimpleStreamSender(leftStreamNode.link()); + final StreamConnectionDetails connectionDetails = leftStreamNode.getNewStreamConnectionDetails(); + + final SendMoneyResult sendMoneyResult = sender.sendMoney( + SendMoneyRequest.builder() + .sourceAddress(leftStreamNode.senderAddress()) + .amount(paymentAmount) + .denomination(leftStreamNode.denomination()) + .destinationAddress(connectionDetails.destinationAddress()) + .sharedSecret(connectionDetails.sharedSecret()) + .paymentTracker(new FixedReceiverAmountPaymentTracker(paymentAmount, new NoOpExchangeRateCalculator())) + .timeout(Duration.ofMillis(10000)) + .build()) + .join(); + + assertThat(sendMoneyResult.amountDelivered()).isEqualTo(UnsignedLong.ZERO); assertThat(sendMoneyResult.originalAmount()).isEqualTo(paymentAmount); - assertThat(sendMoneyResult.numFulfilledPackets()).isEqualTo(1); + assertThat(sendMoneyResult.numFulfilledPackets()).isEqualTo(0); assertThat(sendMoneyResult.numRejectPackets()).isEqualTo(0); + assertThat(sendMoneyResult.successfulPayment()).isFalse(); + assertThat(sendMoneyResult.amountLeftToSend()).isEqualTo(UnsignedLong.valueOf(1000L)); logger.info("Payment Sent: {}", sendMoneyResult); } @@ -578,19 +621,29 @@ static class FixedRateExchangeCalculator implements ExchangeRateCalculator { } @Override - public UnsignedLong calculateAmountToSend(UnsignedLong amountToReceive, - Denomination sendDenomination, - Denomination receiveDenomination) { - return UnsignedLong.valueOf(new BigDecimal(amountToReceive.bigIntegerValue()) - .divide(senderUnitsPerReceiverUnits, RoundingMode.FLOOR).toBigInteger()); // FIXME how to round + public UnsignedLong calculateAmountToSend(UnsignedLong amountToSend, + Denomination amountToSendDenomination, + Denomination receiverDenomination) { + return UnsignedLong.valueOf(new BigDecimal(amountToSend.bigIntegerValue()) + .divide(senderUnitsPerReceiverUnits, RoundingMode.FLOOR).toBigInteger()); // toBigInteger rounds to Floor } @Override - public UnsignedLong calculateMinAmountToAccept(UnsignedLong sendAmount, - Denomination sendDenomination, - Optional expectedReceivedDenomination) { + public UnsignedLong calculateMinAmountToAccept( + final UnsignedLong sendAmount, final Denomination sendAmountDenomination + ) { + Objects.requireNonNull(sendAmount); + Objects.requireNonNull(sendAmountDenomination); return UnsignedLong.valueOf(new BigDecimal(sendAmount.bigIntegerValue()) - .multiply(senderUnitsPerReceiverUnits).toBigInteger()); // FIXME how to round + .multiply(senderUnitsPerReceiverUnits).toBigInteger()); // toBigInteger rounds to Floor + } + + @Override + public UnsignedLong calculateMinAmountToAccept( + final UnsignedLong sendAmount, final Denomination sendAmountDenomination, + final Denomination receiverDenomination + ) throws NoExchangeRateException { + return this.calculateMinAmountToAccept(sendAmount, sendAmountDenomination); } } diff --git a/stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/testutils/AlwaysEmptyStreamReceiver.java b/stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/testutils/AlwaysEmptyStreamReceiver.java new file mode 100644 index 00000000..a551628f --- /dev/null +++ b/stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/testutils/AlwaysEmptyStreamReceiver.java @@ -0,0 +1,173 @@ +package org.interledger.stream.receiver.testutils; + +import static org.interledger.stream.FluentCompareTo.is; + +import org.interledger.core.InterledgerAddress; +import org.interledger.core.InterledgerErrorCode; +import org.interledger.core.InterledgerFulfillPacket; +import org.interledger.core.InterledgerFulfillment; +import org.interledger.core.InterledgerPacketType; +import org.interledger.core.InterledgerPreparePacket; +import org.interledger.core.InterledgerRejectPacket; +import org.interledger.core.InterledgerResponsePacket; +import org.interledger.core.SharedSecret; +import org.interledger.encoding.asn.framework.CodecContext; +import org.interledger.spsp.StreamConnectionDetails; +import org.interledger.stream.Denomination; +import org.interledger.stream.StreamException; +import org.interledger.stream.StreamPacket; +import org.interledger.stream.StreamUtils; +import org.interledger.stream.crypto.StreamEncryptionService; +import org.interledger.stream.frames.StreamFrame; +import org.interledger.stream.receiver.ServerSecretSupplier; +import org.interledger.stream.receiver.StreamConnectionGenerator; +import org.interledger.stream.receiver.StreamReceiver; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Objects; + +/** + *

An implementation of {@link StreamReceiver} that sends valid STREAM frames that are _always_ empty, for testing + * purposes only.

+ */ +public class AlwaysEmptyStreamReceiver implements StreamReceiver { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final ServerSecretSupplier serverSecretSupplier; + private final StreamConnectionGenerator streamConnectionGenerator; + private final StreamEncryptionService streamEncryptionService; + private final CodecContext streamCodecContext; + + public AlwaysEmptyStreamReceiver( + final ServerSecretSupplier serverSecretSupplier, final StreamConnectionGenerator streamConnectionGenerator, + final StreamEncryptionService streamEncryptionService, final CodecContext streamCodecContext + ) { + this.serverSecretSupplier = Objects.requireNonNull(serverSecretSupplier, "serverSecretSupplier must not be null"); + this.streamConnectionGenerator = Objects + .requireNonNull(streamConnectionGenerator, "connectionGenerator must not be null"); + this.streamEncryptionService = Objects + .requireNonNull(streamEncryptionService, "streamEncryptionService must not be null"); + this.streamCodecContext = Objects.requireNonNull(streamCodecContext, "streamCodecContext must not be null"); + } + + @Override + public StreamConnectionDetails setupStream(final InterledgerAddress receiverAddress) { + Objects.requireNonNull(receiverAddress); + return streamConnectionGenerator.generateConnectionDetails(serverSecretSupplier, receiverAddress); + } + + @Override + public InterledgerResponsePacket receiveMoney( + final InterledgerPreparePacket preparePacket, final InterledgerAddress receiverAddress, + final Denomination denomination + ) { + Objects.requireNonNull(preparePacket); + Objects.requireNonNull(receiverAddress); + + // Will throw if there's an error... + final SharedSecret spspSharedSecret = this.streamConnectionGenerator + .deriveSecretFromAddress(serverSecretSupplier, preparePacket.getDestination()); + final SharedSecret streamSharedSecret = SharedSecret.of(spspSharedSecret.key()); + + final StreamPacket streamPacket; + try { + // Try to parse the STREAM data from the payload. + final byte[] streamPacketBytes = streamEncryptionService.decrypt(streamSharedSecret, preparePacket.getData()); + streamPacket = streamCodecContext.read(StreamPacket.class, new ByteArrayInputStream(streamPacketBytes)); + } catch (Exception e) { + logger.error( + "Unable to decrypt packet. preparePacket={} receiverAddress={} error={}", + preparePacket, receiverAddress, e + ); + return InterledgerRejectPacket.builder() + .code(InterledgerErrorCode.F06_UNEXPECTED_PAYMENT) + .message("Could not decrypt data") + .triggeredBy(receiverAddress) + .build(); + } + + final Builder responseFrames = ImmutableList.builder(); + + // Generate fulfillment using the shared secret that was pre-negotiated with the sender. + final InterledgerFulfillment fulfillment = StreamUtils + .generatedFulfillableFulfillment(streamSharedSecret, preparePacket.getData()); + + final boolean isFulfillable = fulfillment.getCondition().equals(preparePacket.getExecutionCondition()); + + // Return Fulfill or Reject Packet + if (isFulfillable && is(preparePacket.getAmount()).greaterThanEqualTo(streamPacket.prepareAmount())) { + final StreamPacket returnableStreamPacketResponse = StreamPacket.builder() + .sequence(streamPacket.sequence()) + .interledgerPacketType(InterledgerPacketType.FULFILL) + .prepareAmount(preparePacket.getAmount()) + .frames(responseFrames.build()) + .build(); + + logger.debug( + "Fulfilling prepare packet. preparePacket={} fulfillment={} returnableStreamPacketResponse={}", + preparePacket, fulfillment, returnableStreamPacketResponse + ); + + try { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + streamCodecContext.write(returnableStreamPacketResponse, baos); + final byte[] returnableStreamPacketBytes = baos.toByteArray(); + final byte[] encryptedReturnableStreamPacketBytes = streamEncryptionService + .encrypt(streamSharedSecret, returnableStreamPacketBytes); + + return InterledgerFulfillPacket.builder() + .fulfillment(fulfillment) + .data(encryptedReturnableStreamPacketBytes) + .build(); + + } catch (IOException e) { + throw new StreamException(e.getMessage(), e); + } + } else { + final StreamPacket returnableStreamPacketResponse = StreamPacket.builder() + .sequence(streamPacket.sequence()) + .interledgerPacketType(InterledgerPacketType.REJECT) + .prepareAmount(preparePacket.getAmount()) + .frames(responseFrames.build()) + .build(); + + if (isFulfillable) { + logger.debug("Packet is unfulfillable. preparePacket={}", preparePacket); + } else if (is(preparePacket.getAmount()).lessThan(streamPacket.prepareAmount())) { + logger.debug( + "Received only: {} when we should have received at least: {}", + preparePacket.getAmount(), streamPacket.prepareAmount() + ); + } + + logger.debug( + "Rejecting Prepare and including encrypted stream packet. preparePacket={} returnableStreamPacketResponse={}", + preparePacket, returnableStreamPacketResponse + ); + + try { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + streamCodecContext.write(returnableStreamPacketResponse, baos); + final byte[] returnableStreamPacketBytes = baos.toByteArray(); + final byte[] encryptedReturnableStreamPacketBytes = streamEncryptionService + .encrypt(streamSharedSecret, returnableStreamPacketBytes); + + return InterledgerRejectPacket.builder() + .code(InterledgerErrorCode.F99_APPLICATION_ERROR) + .message("STREAM packet not fulfillable (prepare amount < stream packet amount)") + .triggeredBy(receiverAddress) + .data(encryptedReturnableStreamPacketBytes) + .build(); + } catch (IOException e) { + throw new StreamException(e.getMessage(), e); + } + } + } +} diff --git a/stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/testutils/SimulatedIlpv4Network.java b/stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/testutils/SimulatedIlpv4Network.java index d5b7be4b..316b1a65 100644 --- a/stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/testutils/SimulatedIlpv4Network.java +++ b/stream-parent/stream-receiver/src/test/java/org/interledger/stream/receiver/testutils/SimulatedIlpv4Network.java @@ -28,16 +28,17 @@ * as STREAM sender/receiver interactions.

* *

Ordinarily, an ILPv4 node would utilize a {@link Link} to connect to an Interledger peer - * Connector/Router. This peer might be connected to an arbitrary number of intermediate nodes, forming a graph of - * potential connections between the `sender` and `receiver`. In these cases, it can be difficult to isolate various - * network conditions for testing purposes, such as path-exchange rate, path latency, path distance, and more.

+ * Connector. This peer might be connected to an arbitrary number of intermediate nodes, forming a graph of potential + * connections between the `sender` and `receiver`. In these cases, it can be difficult to isolate various network + * conditions for testing purposes, such as path-exchange rate, path latency, path distance, and more.

* *

In order to isolate and control these variables for testing purposes, this class can be used to allow, for - * example, a STREAM sender and receiver to interact with each other without actually engaging an actual - * Connector/Router or Link with an actual underlying network transport.

+ * example, a STREAM sender and receiver to interact with each other without actually engaging an actual Connector + * network or Link with an actual underlying network transport.

* *

This class works by defining a `left` and a `right` link where each Link's internal methods are connected to - * the other Link's method to simulate network connectivity.

+ * the other Link's method to simulate network connectivity, as detailed in the following diagram:

+ * *
  * ┌────────────┬────────────┐               ┌────────────┬────────────┐
  * │            │  SendData  │──────────────▷│   OnData   │            │
@@ -59,6 +60,25 @@ public class SimulatedIlpv4Network {
 
   private Random random = new SecureRandom();
 
+  /**
+   * Required-args Constructor.
+   *
+   * @param leftToRightLink A {@link SimulatedPathConditions} that governs the simulated path from the left {@link Link}
+   *                        to the right {@link Link}.
+   * @param rightToLeftLink A {@link SimulatedPathConditions} that governs the simulated path from right to left.
+   */
+  public SimulatedIlpv4Network(
+      final Link leftToRightLink, final Link rightToLeftLink
+  ) {
+    this.leftToRightNetworkConditions = SimulatedPathConditions.builder().build();
+    this.rightToLeftNetworkConditions = SimulatedPathConditions.builder().build();
+
+    this.leftToRightLink = Objects.requireNonNull(leftToRightLink);
+    leftToRightLink.setLinkId(LinkId.of("left"));
+    this.rightToLeftLink = Objects.requireNonNull(rightToLeftLink);
+    rightToLeftLink.setLinkId(LinkId.of("right"));
+  }
+
   /**
    * Required-args Constructor.
    *
@@ -121,7 +141,7 @@ public InterledgerResponsePacket sendPacketToRight(final InterledgerPreparePacke
           this.leftToRightNetworkConditions.currentExchangeRateSupplier().get()
       );
 
-      return this.leftToRightLink.getLinkHandler()
+      return this.rightToLeftLink.getLinkHandler()
           .map(linkHandler -> linkHandler.handleIncomingPacket(adjustedPreparePacket))
           .orElseThrow(() -> new RuntimeException("No LinkHandler registered for leftToRightLink"));
     }
@@ -151,7 +171,7 @@ public InterledgerResponsePacket sendPacketToLeft(final InterledgerPreparePacket
           this.rightToLeftNetworkConditions.currentExchangeRateSupplier().get()
       );
 
-      return this.rightToLeftLink.getLinkHandler()
+      return this.leftToRightLink.getLinkHandler()
           .map(linkHandler -> linkHandler.handleIncomingPacket(adjustedPreparePacket))
           .orElseThrow(() -> new RuntimeException("No LinkHandler registered for leftToRightLink"));
     }