diff --git a/common/src/main/java/org/dash/wallet/common/transactions/TransactionUtils.kt b/common/src/main/java/org/dash/wallet/common/transactions/TransactionUtils.kt index 475a916780..b18a55708e 100644 --- a/common/src/main/java/org/dash/wallet/common/transactions/TransactionUtils.kt +++ b/common/src/main/java/org/dash/wallet/common/transactions/TransactionUtils.kt @@ -17,10 +17,17 @@ package org.dash.wallet.common.transactions +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.sample import org.bitcoinj.core.Address +import org.bitcoinj.core.Sha256Hash import org.bitcoinj.core.Transaction import org.bitcoinj.core.TransactionBag import org.bitcoinj.script.ScriptException +import java.util.concurrent.ConcurrentHashMap object TransactionUtils { fun getWalletAddressOfReceived(tx: Transaction, bag: TransactionBag): Address? { @@ -118,4 +125,22 @@ object TransactionUtils { } return result } +} + +fun Flow.batchAndFilterUpdates(timeInterval: Long = 500): Flow> { + val latestTransactions = ConcurrentHashMap() + + return this + .onEach { transaction -> + // Update the latest transaction for the hash + latestTransactions[transaction.txId] = transaction + } + .sample(timeInterval) // Emit events every 500ms + .map { + // Collect the latest transactions + latestTransactions.values.toList().also { + latestTransactions.clear() // Clear after collecting + } + } + .filter { it.isNotEmpty() } // Only emit non-empty lists } \ No newline at end of file diff --git a/wallet/src/de/schildbach/wallet/transactions/WalletObserver.kt b/wallet/src/de/schildbach/wallet/transactions/WalletObserver.kt index 5e5a23131a..ff19b339af 100644 --- a/wallet/src/de/schildbach/wallet/transactions/WalletObserver.kt +++ b/wallet/src/de/schildbach/wallet/transactions/WalletObserver.kt @@ -26,7 +26,9 @@ import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.catch import org.bitcoinj.core.Context +import org.bitcoinj.core.Sha256Hash import org.bitcoinj.core.Transaction +import org.bitcoinj.core.TransactionConfidence import org.bitcoinj.core.listeners.TransactionConfidenceEventListener import org.bitcoinj.utils.Threading import org.bitcoinj.wallet.Wallet @@ -36,6 +38,9 @@ import org.bitcoinj.wallet.listeners.WalletCoinsSentEventListener import org.bitcoinj.wallet.listeners.WalletResetEventListener import org.dash.wallet.common.transactions.filters.TransactionFilter import org.slf4j.LoggerFactory +import java.util.Date +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit class WalletObserver(private val wallet: Wallet) { companion object { @@ -65,12 +70,14 @@ class WalletObserver(private val wallet: Wallet) { } } + /** observe new transactions (sent and received) and optionally transaction confidence changes for the past hour. */ fun observeTransactions( observeTxConfidence: Boolean, vararg filters: TransactionFilter ): Flow = callbackFlow { log.info("observing transactions start {}", this@WalletObserver) try { + val transactions = ConcurrentHashMap() Threading.USER_THREAD.execute { try { Context.propagate(wallet.context) @@ -83,10 +90,18 @@ class WalletObserver(private val wallet: Wallet) { } } + var transactionConfidenceListener: TransactionConfidence.Listener? = null + val coinsSentListener = WalletCoinsSentEventListener { _, tx: Transaction?, _, _ -> try { + val oneHourAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1) if (tx != null && (filters.isEmpty() || filters.any { it.matches(tx) })) { // log.info("observing transaction sent: {} [=====] {}", tx.txId, this@WalletObserver) + if (tx.updateTime.time > oneHourAgo && observeTxConfidence) { + transactions[tx.txId] = tx + tx.confidence.addEventListener(Threading.USER_THREAD, transactionConfidenceListener) + // log.info("observing transaction: start listening to {}", tx.txId) + } trySend(tx).onFailure { log.error("Failed to send transaction sent event", it) } @@ -101,6 +116,12 @@ class WalletObserver(private val wallet: Wallet) { try { if (tx != null && (filters.isEmpty() || filters.any { it.matches(tx) })) { // log.info("observing transaction received: {} [=====] {}", tx.txId, this@WalletObserver) + val oneHourAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1) + if (tx.updateTime.time > oneHourAgo && observeTxConfidence) { + transactions[tx.txId] = tx + tx.confidence.addEventListener(Threading.USER_THREAD, transactionConfidenceListener) + // log.info("observing transaction: start listening to {}", tx.txId) + } trySend(tx).onFailure { log.error("Failed to send transaction received event", it) } @@ -111,25 +132,32 @@ class WalletObserver(private val wallet: Wallet) { } } - var transactionConfidenceChangedListener: TransactionConfidenceEventListener? = null - if (observeTxConfidence) { - transactionConfidenceChangedListener = TransactionConfidenceEventListener { _, tx: Transaction? -> + transactionConfidenceListener = TransactionConfidence.Listener { transactionConfidence, changeReason -> try { + val tx = transactions[transactionConfidence.transactionHash] if (tx != null && (filters.isEmpty() || filters.any { it.matches(tx) })) { // log.info("observing transaction conf {} [=====] {}", tx.txId, this@WalletObserver) - if (tx.getConfidence(wallet.context).depthInBlocks < 7) { - trySend(tx).onFailure { - log.error("Failed to send transaction confidence event", it) - } + trySend(tx).onFailure { + log.error("Failed to send transaction confidence event", it) } } + val shouldStopListening = when (changeReason) { + TransactionConfidence.Listener.ChangeReason.CHAIN_LOCKED -> transactionConfidence.isChainLocked + TransactionConfidence.Listener.ChangeReason.IX_TYPE -> transactionConfidence.isTransactionLocked + TransactionConfidence.Listener.ChangeReason.DEPTH -> transactionConfidence.depthInBlocks >= 6 + else -> false + } + if (shouldStopListening) { + // log.info("observing transaction: stop listening to {}", transactionConfidence.transactionHash) + transactionConfidence.removeEventListener(transactionConfidenceListener) + transactions.remove(transactionConfidence.transactionHash) + } } catch (e: Exception) { log.error("Error in transactionConfidenceChangedListener", e) close(e) } } - wallet.addTransactionConfidenceEventListener(Threading.USER_THREAD, transactionConfidenceChangedListener) } wallet.addCoinsSentEventListener(Threading.USER_THREAD, coinsSentListener) @@ -140,7 +168,11 @@ class WalletObserver(private val wallet: Wallet) { wallet.removeCoinsSentEventListener(coinsSentListener) wallet.removeCoinsReceivedEventListener(coinsReceivedListener) if (observeTxConfidence) { - wallet.removeTransactionConfidenceEventListener(transactionConfidenceChangedListener) + transactions.forEach { (_, tx) -> + tx.confidence.removeEventListener(transactionConfidenceListener) + // log.info("observing transaction: stop listening to {}", tx.txId) + } + transactions.clear() } } } catch (e: Exception) { diff --git a/wallet/src/de/schildbach/wallet/ui/main/MainViewModel.kt b/wallet/src/de/schildbach/wallet/ui/main/MainViewModel.kt index 3808272a68..6d38cfe900 100644 --- a/wallet/src/de/schildbach/wallet/ui/main/MainViewModel.kt +++ b/wallet/src/de/schildbach/wallet/ui/main/MainViewModel.kt @@ -104,8 +104,8 @@ import org.dash.wallet.common.services.analytics.AnalyticsTimer import org.dash.wallet.common.transactions.TransactionUtils.isEntirelySelf import org.dash.wallet.common.transactions.TransactionWrapper import org.dash.wallet.common.transactions.TransactionWrapperComparator +import org.dash.wallet.common.transactions.batchAndFilterUpdates import org.dash.wallet.common.util.toBigDecimal -import org.dash.wallet.common.util.window import org.dash.wallet.integrations.crowdnode.api.CrowdNodeApi import org.dash.wallet.integrations.crowdnode.transactions.FullCrowdNodeSignUpTxSetFactory import org.slf4j.LoggerFactory @@ -160,7 +160,6 @@ class MainViewModel @Inject constructor( val fiatFormat: MonetaryFormat = Constants.LOCAL_FORMAT.minDecimals(0).optionalDecimals(0, 2) private val _transactions = MutableLiveData>() - private val _modifyTransactionRow = MutableStateFlow>(Pair(false, null)) val transactions: LiveData> get() = _transactions private val _transactionsDirection = MutableStateFlow(TxFilterType.ALL) @@ -299,7 +298,7 @@ class MainViewModel @Inject constructor( val filter = TxDirectionFilter(direction, walletData.wallet!!) refreshTransactions(filter, metadata) walletData.observeTransactions(true, filter) - .window(500) // batch every 500 ms + .batchAndFilterUpdates(500) // batch every 500 ms .onEach { if (!refreshTxWatch.isRunning) { refreshTxWatch.start() } val timeElapsed = refreshTxWatch.elapsed(TimeUnit.MILLISECONDS)