diff --git a/itest/send_test.go b/itest/send_test.go index 33460c680..5ea91cec1 100644 --- a/itest/send_test.go +++ b/itest/send_test.go @@ -135,6 +135,176 @@ func testBasicSendUnidirectional(t *harnessTest) { wg.Wait() } +// testRestartReceiver restart receiver node and check asset balance. +func testRestartReceiverCheckBalance(t *harnessTest) { + var ( + ctxb = context.Background() + wg sync.WaitGroup + ) + + const ( + numUnits = 10 + numSends = 1 + ) + + // Subscribe to receive assent send events from primary tapd node. + eventNtfns, err := t.tapd.SubscribeSendAssetEventNtfns( + ctxb, &taprpc.SubscribeSendAssetEventNtfnsRequest{}, + ) + require.NoError(t.t, err) + + // Test to ensure that we execute the transaction broadcast state. + // This test is executed in a goroutine to ensure that we can receive + // the event notification from the tapd node as the rest of the test + // proceeds. + wg.Add(1) + go func() { + defer wg.Done() + + broadcastState := tapfreighter.SendStateBroadcast.String() + targetEventSelector := func(event *taprpc.SendAssetEvent) bool { + switch eventTyped := event.Event.(type) { + case *taprpc.SendAssetEvent_ExecuteSendStateEvent: + ev := eventTyped.ExecuteSendStateEvent + + // Log send state execution. + timestamp := time.UnixMicro(ev.Timestamp) + t.Logf("Executing send state (%v): %v", + timestamp.Format(time.RFC3339Nano), + ev.SendState) + + return ev.SendState == broadcastState + } + + return false + } + + timeout := 2 * defaultProofTransferReceiverAckTimeout + ctx, cancel := context.WithTimeout(ctxb, timeout) + defer cancel() + assertAssetSendNtfsEvent( + t, ctx, eventNtfns, targetEventSelector, numSends, + ) + }() + + // First, we'll make a normal assets with enough units to allow us to + // send it around a few times. + rpcAssets := MintAssetsConfirmBatch( + t.t, t.lndHarness.Miner.Client, t.tapd, + []*mintrpc.MintAssetRequest{issuableAssets[0]}, + ) + + genInfo := rpcAssets[0].AssetGenesis + + // Now that we have the asset created, we'll make a new node that'll + // serve as the node which'll receive the assets. The existing tapd + // node will be used to synchronize universe state. + recvTapd := setupTapdHarness( + t.t, t, t.lndHarness.Bob, t.universeServer, + func(params *tapdHarnessParams) { + params.startupSyncNode = t.tapd + params.startupSyncNumAssets = len(rpcAssets) + }, + ) + defer func() { + require.NoError(t.t, recvTapd.stop(!*noDelete)) + }() + + // Next, we'll attempt to complete two transfers with distinct + // addresses from our main node to Bob. + currentUnits := issuableAssets[0].Asset.Amount + + // Issue a single address which will be reused for each send. + bobAddr, err := recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{ + AssetId: genInfo.AssetId, + Amt: numUnits, + AssetVersion: rpcAssets[0].Version, + }) + require.NoError(t.t, err) + + for i := 0; i < numSends; i++ { + t.t.Logf("Performing send procedure: %d", i) + + // Deduct what we sent from the expected current number of + // units. + currentUnits -= numUnits + + AssertAddrCreated(t.t, recvTapd, rpcAssets[0], bobAddr) + + sendResp := sendAssetsToAddr(t, t.tapd, bobAddr) + + ConfirmAndAssertOutboundTransfer( + t.t, t.lndHarness.Miner.Client, t.tapd, sendResp, + genInfo.AssetId, + []uint64{currentUnits, numUnits}, i, i+1, + ) + AssertNonInteractiveRecvComplete(t.t, recvTapd, i+1) + } + + // Close event stream. + err = eventNtfns.CloseSend() + require.NoError(t.t, err) + + wg.Wait() + + assertRecvBalance := func() { + // Get asset balance by group from the receiver node. + respGroup, err := recvTapd.ListBalances( + ctxb, &taprpc.ListBalancesRequest{ + GroupBy: &taprpc.ListBalancesRequest_GroupKey{ + GroupKey: true, + }, + }, + ) + require.NoError(t.t, err) + + // We expect to see a single asset group balance. The receiver + // node received one asset only. + require.Len(t.t, respGroup.AssetGroupBalances, 1) + + var assetGroupBalance *taprpc.AssetGroupBalance + + for _, value := range respGroup.AssetGroupBalances { + assetGroupBalance = value + break + } + + require.Equal(t.t, int(10), int(assetGroupBalance.Balance)) + + // Get asset balance by asset ID from the receiver node. + respAsset, err := recvTapd.ListBalances( + ctxb, &taprpc.ListBalancesRequest{ + GroupBy: &taprpc.ListBalancesRequest_AssetId{ + AssetId: true, + }, + }, + ) + require.NoError(t.t, err) + + // We expect to see a single asset group balance. The receiver + // node received one asset only. + require.Len(t.t, respAsset.AssetBalances, 1) + + var assetBalance *taprpc.AssetBalance + + for _, value := range respAsset.AssetBalances { + assetBalance = value + break + } + + require.Equal(t.t, assetBalance.Balance, uint64(10)) + } + + // Initial balance check. + assertRecvBalance() + + // Restart the receiver node and then check the balance again. + require.NoError(t.t, recvTapd.stop(false)) + require.NoError(t.t, recvTapd.start(false)) + + assertRecvBalance() +} + // testResumePendingPackageSend tests that we can properly resume a pending // package send after a restart. func testResumePendingPackageSend(t *harnessTest) { diff --git a/itest/test_list_on_test.go b/itest/test_list_on_test.go index f02b0fe4e..841db91a1 100644 --- a/itest/test_list_on_test.go +++ b/itest/test_list_on_test.go @@ -52,6 +52,11 @@ var testCases = []*testCase{ test: testBasicSendUnidirectional, proofCourierType: proof.UniverseRpcCourierType, }, + { + name: "restart receiver check balance", + test: testRestartReceiverCheckBalance, + proofCourierType: proof.UniverseRpcCourierType, + }, { name: "resume pending package send", test: testResumePendingPackageSend, diff --git a/tapgarden/custodian.go b/tapgarden/custodian.go index 817b2f1f0..ab15a3ffd 100644 --- a/tapgarden/custodian.go +++ b/tapgarden/custodian.go @@ -414,16 +414,16 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error { courier.SetSubscribers(c.statusEventsSubs) c.statusEventsSubsMtx.Unlock() - // Sleep to give the sender an opportunity to transfer - // the proof to the proof courier service. - // Without this delay our first attempt at retrieving - // the proof will very likely fail. We should expect - // retrieval success before this delay. - select { - case <-time.After(defaultProofRetrievalDelay): - case <-ctx.Done(): - return - } + //// Sleep to give the sender an opportunity to transfer + //// the proof to the proof courier service. + //// Without this delay our first attempt at retrieving + //// the proof will very likely fail. We should expect + //// retrieval success before this delay. + //select { + //case <-time.After(defaultProofRetrievalDelay): + //case <-ctx.Done(): + // return + //} // Attempt to receive proof via proof courier service. loc := proof.Locator{