Skip to content

Commit

Permalink
multi: split UpsertAssetProof and use mutex
Browse files Browse the repository at this point in the history
`UpsertAssetProof` gave issues with a postgresql backend. See: #951

By separating the subquery from the upsert, we can error out if we see
more than one asset ID (primary key from table assets) being returned
from the database. This should return a more meaningful error than the
original postgresql error.

By also surrounding the usage with mutex we prevent the deadlocks we saw
happening.
  • Loading branch information
gijswijs authored and guggero committed Jun 24, 2024
1 parent 4d44446 commit 651af43
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 80 deletions.
2 changes: 1 addition & 1 deletion itest/addrs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func runMultiSendTest(ctxt context.Context, t *harnessTest, alice,
AssertAddrEvent(t.t, alice, aliceAddr2, 1, statusDetected)

// Mine a block to make sure the events are marked as confirmed.
_ = MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)[0]
_ = MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)

// Eventually the events should be marked as confirmed.
AssertAddrEventByStatus(t.t, bob, statusConfirmed, 2)
Expand Down
8 changes: 6 additions & 2 deletions itest/assertions.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,11 @@ func AssertAddrEventByStatus(t *testing.T, client taprpc.TaprootAssetsClient,
},
)
require.NoError(t, err)
require.Len(t, resp.Events, numEvents)

if len(resp.Events) != numEvents {
return fmt.Errorf("got %d events, wanted %d",
len(resp.Events), numEvents)
}

for _, event := range resp.Events {
if event.Status != filterStatus {
Expand Down Expand Up @@ -1410,7 +1414,7 @@ func AssertNumAssets(t *testing.T, ctx context.Context,
require.NoError(t, err)

// Ensure that the number of assets returned is correct.
require.Equal(t, numAssets, len(resp.Assets))
require.Len(t, resp.Assets, numAssets)

return resp
}
Expand Down
46 changes: 34 additions & 12 deletions tapdb/asset_minting.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,14 @@ type (
// MintingBatchInit is used to create a new minting batch.
MintingBatchInit = sqlc.NewMintingBatchParams

// ProofUpdate is used to update a proof file on disk.
ProofUpdate = sqlc.UpsertAssetProofParams

// ProofUpdateByID is used to update a proof file on disk by asset
// database ID.
ProofUpdateByID = sqlc.UpsertAssetProofByIDParams

// FetchAssetID is used to fetch the primary key ID of an asset, by
// outpoint and tweaked script key.
FetchAssetID = sqlc.FetchAssetIDParams

// NewScriptKey wraps the params needed to insert a new script key on
// disk.
NewScriptKey = sqlc.UpsertScriptKeyParams
Expand Down Expand Up @@ -222,12 +223,14 @@ type PendingAssetStore interface {
FetchAssetsForBatch(ctx context.Context, rawKey []byte) ([]AssetSprout,
error)

// UpsertAssetProof inserts a new or updates an existing asset proof on
// disk.
//
// TODO(roasbeef): move somewhere else??
UpsertAssetProof(ctx context.Context,
arg sqlc.UpsertAssetProofParams) error
// FetchAssetID fetches the `asset_id` (primary key) from the assets
// table for a given asset identified by `Outpoint` and
// `TweakedScriptKey`.
FetchAssetID(ctx context.Context, arg FetchAssetID) ([]int64, error)

// UpsertAssetProofByID inserts a new or updates an existing asset
// proof on disk.
UpsertAssetProofByID(ctx context.Context, arg ProofUpdateByID) error

// FetchAssetMetaForAsset fetches the asset meta for a given asset.
FetchAssetMetaForAsset(ctx context.Context,
Expand Down Expand Up @@ -1644,9 +1647,28 @@ func (a *AssetMintingStore) MarkBatchConfirmed(ctx context.Context,
// As a final act, we'll now insert the proof files for each of
// the assets that were fully confirmed with this block.
for scriptKey, proofBlob := range mintingProofs {
err := q.UpsertAssetProof(ctx, ProofUpdate{
TweakedScriptKey: scriptKey.CopyBytes(),
ProofFile: proofBlob,
// We need to fetch the table primary key `asset_id`
// first, as we need it to update the proof.
dbAssetIds, err := q.FetchAssetID(ctx, FetchAssetID{
TweakedScriptKey: scriptKey[:],
})
if err != nil {
return err
}

// We should not have more than one `asset_id`.
if len(dbAssetIds) > 1 {
return fmt.Errorf("expected 1 asset id, found"+
" %d with asset ids %v",
len(dbAssetIds), dbAssetIds)
}

// Upload proof by the dbAssetId, which is the _primary
// key_ of the asset in table assets, not the BIPS
// concept of `asset_id`.
err = q.UpsertAssetProofByID(ctx, ProofUpdateByID{
AssetID: dbAssetIds[0],
ProofFile: proofBlob,
})
if err != nil {
return fmt.Errorf("unable to insert proof "+
Expand Down
58 changes: 40 additions & 18 deletions tapdb/assets_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,10 @@ type ActiveAssetsStore interface {
UpsertManagedUTXO(ctx context.Context, arg RawManagedUTXO) (int64,
error)

// UpsertAssetProof inserts a new or updates an existing asset proof on
// disk.
UpsertAssetProof(ctx context.Context, arg ProofUpdate) error
// FetchAssetID fetches the `asset_id` (primary key) from the assets
// table for a given asset identified by `Outpoint` and
// `TweakedScriptKey`.
FetchAssetID(ctx context.Context, arg FetchAssetID) ([]int64, error)

// UpsertAssetProofByID inserts a new or updates an existing asset
// proof on disk.
Expand Down Expand Up @@ -1566,13 +1567,11 @@ func (a *AssetStore) importAssetFromProof(ctx context.Context,
return fmt.Errorf("unable to insert asset witness: %w", err)
}

// As a final step, we'll insert the proof file we used to generate all
// the above information.
scriptKeyBytes := newAsset.ScriptKey.PubKey.SerializeCompressed()
return db.UpsertAssetProof(ctx, ProofUpdate{
TweakedScriptKey: scriptKeyBytes,
Outpoint: anchorPoint,
ProofFile: proof.Blob,
// Upload proof by the dbAssetId, which is the _primary key_ of the
// asset in table assets, not the BIPS concept of `asset_id`.
return db.UpsertAssetProofByID(ctx, ProofUpdateByID{
AssetID: assetIDs[0],
ProofFile: proof.Blob,
})
}

Expand Down Expand Up @@ -1615,10 +1614,30 @@ func (a *AssetStore) upsertAssetProof(ctx context.Context,
// As a final step, we'll insert the proof file we used to generate all
// the above information.
scriptKeyBytes := proof.Asset.ScriptKey.PubKey.SerializeCompressed()
return db.UpsertAssetProof(ctx, ProofUpdate{

// We need to fetch the table primary key `asset_id` first, as we need
// it to update the proof. We could do this in one query, this gave
// issues with a postgresql backend. See:
// https://github.com/lightninglabs/taproot-assets/issues/951
dbAssetIds, err := db.FetchAssetID(ctx, FetchAssetID{
TweakedScriptKey: scriptKeyBytes,
Outpoint: outpointBytes,
ProofFile: proof.Blob,
})
if err != nil {
return err
}

// We should not have more than one `asset_id`.
if len(dbAssetIds) > 1 {
return fmt.Errorf("expected 1 asset id, found %d with asset"+
" ids %v", len(dbAssetIds), dbAssetIds)
}

// Upload proof by the dbAssetId, which is the _primary key_ of the
// asset in table assets, not the BIPS concept of `asset_id`.
return db.UpsertAssetProofByID(ctx, ProofUpdateByID{
AssetID: dbAssetIds[0],
ProofFile: proof.Blob,
})
}

Expand All @@ -1633,6 +1652,7 @@ func (a *AssetStore) ImportProofs(ctx context.Context, _ proof.HeaderVerifier,
proofs ...*proof.AnnotatedProof) error {

var writeTxOpts AssetStoreTxOptions

err := a.db.ExecTx(ctx, &writeTxOpts, func(q ActiveAssetsStore) error {
for _, p := range proofs {
if replace {
Expand Down Expand Up @@ -2674,6 +2694,7 @@ func (a *AssetStore) ConfirmParcelDelivery(ctx context.Context,
writeTxOpts AssetStoreTxOptions
localProofKeys []asset.SerializedKey
)

err := a.db.ExecTx(ctx, &writeTxOpts, func(q ActiveAssetsStore) error {
// First, we'll fetch the asset transfer based on its outpoint
// bytes, so we can apply the delta it describes.
Expand Down Expand Up @@ -2823,13 +2844,14 @@ func (a *AssetStore) ConfirmParcelDelivery(ctx context.Context,
}
localProofKeys = append(localProofKeys, scriptKey)

// Now we can update the asset proof for the sender for
// this given delta.
err = q.UpsertAssetProof(ctx, ProofUpdate{
TweakedScriptKey: out.ScriptKeyBytes,
Outpoint: out.AnchorOutpoint,
ProofFile: receiverProof.Blob,
// Upload proof by the dbAssetId, which is the _primary
// key_ of the asset in table assets, not the BIPS
// concept of `asset_id`.
err = q.UpsertAssetProofByID(ctx, ProofUpdateByID{
AssetID: newAssetID,
ProofFile: receiverProof.Blob,
})

if err != nil {
return err
}
Expand Down
76 changes: 42 additions & 34 deletions tapdb/sqlc/assets.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tapdb/sqlc/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 3 additions & 12 deletions tapdb/sqlc/queries/assets.sql
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,8 @@ UPDATE chain_txns
SET block_height = $2, block_hash = $3, tx_index = $4
WHERE txn_id in (SELECT txn_id FROM target_txn);

-- name: UpsertAssetProof :exec
WITH target_asset(asset_id) AS (
SELECT asset_id
-- name: FetchAssetID :many
SELECT asset_id
FROM assets
JOIN script_keys
ON assets.script_key_id = script_keys.script_key_id
Expand All @@ -681,15 +680,7 @@ WITH target_asset(asset_id) AS (
(script_keys.tweaked_script_key = sqlc.narg('tweaked_script_key')
OR sqlc.narg('tweaked_script_key') IS NULL)
AND (utxos.outpoint = sqlc.narg('outpoint')
OR sqlc.narg('outpoint') IS NULL)
)
INSERT INTO asset_proofs (
asset_id, proof_file
) VALUES (
(SELECT asset_id FROM target_asset), @proof_file
) ON CONFLICT (asset_id)
-- This is not a NOP, we always overwrite the proof with the new one.
DO UPDATE SET proof_file = EXCLUDED.proof_file;
OR sqlc.narg('outpoint') IS NULL);

-- name: UpsertAssetProofByID :exec
INSERT INTO asset_proofs (
Expand Down

0 comments on commit 651af43

Please sign in to comment.