Skip to content

Commit

Permalink
Script engine support for Balance #983 (#992)
Browse files Browse the repository at this point in the history
* -Fix for #810, wrong message on soft trailing stop.

* -updated orko-parent snapshots

* -Changed the Xchange version from the 5.0.0-SNAPSHOT to 5.0.0

* Commented out Hidden Order integration tests

* -Potential fix for 'Hidden orders' test failure

* -Added Balance support to scripting Engine #983

* -Moved method to Events. (#983)

* -Added Open Order (#984) and Orderbook support to the script script engine

* -UserTrade Support to Script Engine

* -Split 'on' calls (onTick, onBalance, onOpenOrders, onOrderBook, onUserTrades) for clarity

* -Added DisposableSubscription class to clean up subscription cleanup returns.
- Cleaned up some warnings reported by IntelliJ

* - More clean up (missing final's, incorrect pattern usage)

* Review change

* Typo

Co-authored-by: Graham Crockford <[email protected]>
Co-authored-by: Graham Crockford <[email protected]>
  • Loading branch information
3 people authored Aug 29, 2020
1 parent 1ecfa12 commit d2b93e9
Showing 1 changed file with 130 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
*/
package com.gruelbox.orko.job.script;

import static com.gruelbox.orko.exchange.MarketDataType.BALANCE;
import static com.gruelbox.orko.exchange.MarketDataType.OPEN_ORDERS;
import static com.gruelbox.orko.exchange.MarketDataType.ORDERBOOK;
import static com.gruelbox.orko.exchange.MarketDataType.TICKER;
import static com.gruelbox.orko.exchange.MarketDataType.USER_TRADE;
import static com.gruelbox.orko.job.LimitOrderJob.Direction.BUY;
import static com.gruelbox.orko.jobrun.spi.Status.FAILURE_PERMANENT;
import static com.gruelbox.orko.jobrun.spi.Status.RUNNING;
Expand All @@ -28,10 +32,15 @@
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.gruelbox.orko.auth.Hasher;
import com.gruelbox.orko.db.Transactionally;
import com.gruelbox.orko.exchange.BalanceEvent;
import com.gruelbox.orko.exchange.ExchangeEventRegistry;
import com.gruelbox.orko.exchange.ExchangeEventRegistry.ExchangeEventSubscription;
import com.gruelbox.orko.exchange.MarketDataSubscription;
import com.gruelbox.orko.exchange.MarketDataType;
import com.gruelbox.orko.exchange.OpenOrdersEvent;
import com.gruelbox.orko.exchange.OrderBookEvent;
import com.gruelbox.orko.exchange.TickerEvent;
import com.gruelbox.orko.exchange.UserTradeEvent;
import com.gruelbox.orko.job.LimitOrderJob;
import com.gruelbox.orko.job.LimitOrderJob.Direction;
import com.gruelbox.orko.jobrun.JobSubmitter;
Expand Down Expand Up @@ -256,6 +265,34 @@ public Disposable setTick(JSObject callback, JSObject tickerSpec) {
callback.toString());
}

public Disposable setBalance(JSObject callback, JSObject tickerSpec) {
return onBalance(
event -> processEvent(() -> callback.call(null, event)),
convertTickerSpec(tickerSpec),
callback.toString());
}

public Disposable setOpenOrders(JSObject callback, JSObject tickerSpec) {
return onOpenOrders(
event -> processEvent(() -> callback.call(null, event)),
convertTickerSpec(tickerSpec),
callback.toString());
}

public Disposable setOrderBook(JSObject callback, JSObject tickerSpec) {
return onOrderBook(
event -> processEvent(() -> callback.call(null, event)),
convertTickerSpec(tickerSpec),
callback.toString());
}

public Disposable setUserTrades(JSObject callback, JSObject tickerSpec) {
return onUserTrades(
event -> processEvent(() -> callback.call(null, event)),
convertTickerSpec(tickerSpec),
callback.toString());
}

public Disposable setInterval(JSObject callback, Integer timeout) {
return onInterval(
() -> processEvent(() -> callback.call(null)), timeout, callback.toString());
Expand Down Expand Up @@ -311,33 +348,31 @@ public final class State {
new StateManager<>() {

@Override
public void set(String key, String value) {
HashMap<String, String> newState = new HashMap<>();
newState.putAll(job.state());
public final void set(String key, String value) {
HashMap<String, String> newState = new HashMap<>(job.state());
newState.put(key, value);
jobControl.replace(job.toBuilder().state(newState).build());
}

@Override
public String get(String key) {
public final String get(String key) {
return job.state().get(key);
}

@Override
public void remove(String key) {
HashMap<String, String> newState = new HashMap<>();
newState.putAll(job.state());
public final void remove(String key) {
HashMap<String, String> newState = new HashMap<>(job.state());
newState.remove(key);
jobControl.replace(job.toBuilder().state(newState).build());
}

@Override
public String toString() {
public final String toString() {
return job.state().toString();
}

@Override
public void increment(String key) {
public final void increment(String key) {
String value = get(key);
try {
long asLong = Long.parseLong(value);
Expand Down Expand Up @@ -386,13 +421,13 @@ public String toString() {
}

public interface StateManager<T> {
public T get(String key);
T get(String key);

public void set(String key, T value);
void set(String key, T value);

public void remove(String key);
void remove(String key);

public void increment(String key);
void increment(String key);
}

Disposable onInterval(Runnable runnable, long timeout, String description) {
Expand Down Expand Up @@ -428,32 +463,98 @@ private TickerSpec convertTickerSpec(JSObject tickerSpec) {
.build();
}

public Disposable onTick(
public final class DisposableSubscription implements Disposable {
private final ExchangeEventSubscription subscription;
private final Disposable disposable;
private final String description;

public DisposableSubscription(
ExchangeEventSubscription subscription, Disposable disposable, String description) {
this.subscription = subscription;
this.disposable = disposable;
this.description = description;
}

@Override
public boolean isDisposed() {
return disposable.isDisposed();
}

@Override
public void dispose() {
SafelyDispose.of(disposable);
SafelyClose.the(subscription);
}

@Override
public String toString() {
return description;
}
}

public final DisposableSubscription onTick(
io.reactivex.functions.Consumer<TickerEvent> handler,
TickerSpec tickerSpec,
String description) {

ExchangeEventSubscription subscription =
exchangeEventRegistry.subscribe(MarketDataSubscription.create(tickerSpec, TICKER));

Disposable disposable = subscription.getTickers().subscribe(handler);

return new Disposable() {
return new DisposableSubscription(subscription, disposable, description);
}

@Override
public boolean isDisposed() {
return disposable.isDisposed();
}
public final DisposableSubscription onBalance(
io.reactivex.functions.Consumer<BalanceEvent> handler,
TickerSpec tickerSpec,
String description) {

@Override
public void dispose() {
SafelyDispose.of(disposable);
SafelyClose.the(subscription);
}
ExchangeEventSubscription subscription =
exchangeEventRegistry.subscribe(MarketDataSubscription.create(tickerSpec, BALANCE));

@Override
public String toString() {
return description;
}
};
Disposable disposable = subscription.getBalances().subscribe(handler);

return new DisposableSubscription(subscription, disposable, description);
}

public final DisposableSubscription onOpenOrders(
io.reactivex.functions.Consumer<OpenOrdersEvent> handler,
TickerSpec tickerSpec,
String description) {

ExchangeEventSubscription subscription =
exchangeEventRegistry.subscribe(MarketDataSubscription.create(tickerSpec, OPEN_ORDERS));

Disposable disposable = subscription.getOrderSnapshots().subscribe(handler);

return new DisposableSubscription(subscription, disposable, description);
}

public final DisposableSubscription onOrderBook(
io.reactivex.functions.Consumer<OrderBookEvent> handler,
TickerSpec tickerSpec,
String description) {

ExchangeEventSubscription subscription =
exchangeEventRegistry.subscribe(MarketDataSubscription.create(tickerSpec, ORDERBOOK));

Disposable disposable = subscription.getOrderBooks().subscribe(handler);

return new DisposableSubscription(subscription, disposable, description);
}

public final DisposableSubscription onUserTrades(
io.reactivex.functions.Consumer<UserTradeEvent> handler,
TickerSpec tickerSpec,
String description) {

ExchangeEventSubscription subscription =
exchangeEventRegistry.subscribe(MarketDataSubscription.create(tickerSpec, USER_TRADE));

Disposable disposable = subscription.getUserTrades().subscribe(handler);

return new DisposableSubscription(subscription, disposable, description);
}

private void notifyAndLogError(String message) {
Expand Down

0 comments on commit d2b93e9

Please sign in to comment.