From 651af43c2a39cce1b819ef4e2ae3587db1b1a617 Mon Sep 17 00:00:00 2001 From: Gijs van Dam Date: Tue, 18 Jun 2024 17:00:10 +0200 Subject: [PATCH] multi: split `UpsertAssetProof` and use `mutex` `UpsertAssetProof` gave issues with a postgresql backend. See: #951 By separating the subquery from the upsert, we can error out if we see more than one asset ID (primary key from table assets) being returned from the database. This should return a more meaningful error than the original postgresql error. By also surrounding the usage with mutex we prevent the deadlocks we saw happening. --- itest/addrs_test.go | 2 +- itest/assertions.go | 8 +++- tapdb/asset_minting.go | 46 +++++++++++++++------ tapdb/assets_store.go | 58 +++++++++++++++++--------- tapdb/sqlc/assets.sql.go | 76 +++++++++++++++++++---------------- tapdb/sqlc/querier.go | 2 +- tapdb/sqlc/queries/assets.sql | 15 ++----- 7 files changed, 127 insertions(+), 80 deletions(-) diff --git a/itest/addrs_test.go b/itest/addrs_test.go index 4822af145..7a96fc6f6 100644 --- a/itest/addrs_test.go +++ b/itest/addrs_test.go @@ -603,7 +603,7 @@ func runMultiSendTest(ctxt context.Context, t *harnessTest, alice, AssertAddrEvent(t.t, alice, aliceAddr2, 1, statusDetected) // Mine a block to make sure the events are marked as confirmed. - _ = MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)[0] + _ = MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1) // Eventually the events should be marked as confirmed. AssertAddrEventByStatus(t.t, bob, statusConfirmed, 2) diff --git a/itest/assertions.go b/itest/assertions.go index e9edda130..b737dead7 100644 --- a/itest/assertions.go +++ b/itest/assertions.go @@ -818,7 +818,11 @@ func AssertAddrEventByStatus(t *testing.T, client taprpc.TaprootAssetsClient, }, ) require.NoError(t, err) - require.Len(t, resp.Events, numEvents) + + if len(resp.Events) != numEvents { + return fmt.Errorf("got %d events, wanted %d", + len(resp.Events), numEvents) + } for _, event := range resp.Events { if event.Status != filterStatus { @@ -1410,7 +1414,7 @@ func AssertNumAssets(t *testing.T, ctx context.Context, require.NoError(t, err) // Ensure that the number of assets returned is correct. - require.Equal(t, numAssets, len(resp.Assets)) + require.Len(t, resp.Assets, numAssets) return resp } diff --git a/tapdb/asset_minting.go b/tapdb/asset_minting.go index f7a230f39..98a87da93 100644 --- a/tapdb/asset_minting.go +++ b/tapdb/asset_minting.go @@ -113,13 +113,14 @@ type ( // MintingBatchInit is used to create a new minting batch. MintingBatchInit = sqlc.NewMintingBatchParams - // ProofUpdate is used to update a proof file on disk. - ProofUpdate = sqlc.UpsertAssetProofParams - // ProofUpdateByID is used to update a proof file on disk by asset // database ID. ProofUpdateByID = sqlc.UpsertAssetProofByIDParams + // FetchAssetID is used to fetch the primary key ID of an asset, by + // outpoint and tweaked script key. + FetchAssetID = sqlc.FetchAssetIDParams + // NewScriptKey wraps the params needed to insert a new script key on // disk. NewScriptKey = sqlc.UpsertScriptKeyParams @@ -222,12 +223,14 @@ type PendingAssetStore interface { FetchAssetsForBatch(ctx context.Context, rawKey []byte) ([]AssetSprout, error) - // UpsertAssetProof inserts a new or updates an existing asset proof on - // disk. - // - // TODO(roasbeef): move somewhere else?? - UpsertAssetProof(ctx context.Context, - arg sqlc.UpsertAssetProofParams) error + // FetchAssetID fetches the `asset_id` (primary key) from the assets + // table for a given asset identified by `Outpoint` and + // `TweakedScriptKey`. + FetchAssetID(ctx context.Context, arg FetchAssetID) ([]int64, error) + + // UpsertAssetProofByID inserts a new or updates an existing asset + // proof on disk. + UpsertAssetProofByID(ctx context.Context, arg ProofUpdateByID) error // FetchAssetMetaForAsset fetches the asset meta for a given asset. FetchAssetMetaForAsset(ctx context.Context, @@ -1644,9 +1647,28 @@ func (a *AssetMintingStore) MarkBatchConfirmed(ctx context.Context, // As a final act, we'll now insert the proof files for each of // the assets that were fully confirmed with this block. for scriptKey, proofBlob := range mintingProofs { - err := q.UpsertAssetProof(ctx, ProofUpdate{ - TweakedScriptKey: scriptKey.CopyBytes(), - ProofFile: proofBlob, + // We need to fetch the table primary key `asset_id` + // first, as we need it to update the proof. + dbAssetIds, err := q.FetchAssetID(ctx, FetchAssetID{ + TweakedScriptKey: scriptKey[:], + }) + if err != nil { + return err + } + + // We should not have more than one `asset_id`. + if len(dbAssetIds) > 1 { + return fmt.Errorf("expected 1 asset id, found"+ + " %d with asset ids %v", + len(dbAssetIds), dbAssetIds) + } + + // Upload proof by the dbAssetId, which is the _primary + // key_ of the asset in table assets, not the BIPS + // concept of `asset_id`. + err = q.UpsertAssetProofByID(ctx, ProofUpdateByID{ + AssetID: dbAssetIds[0], + ProofFile: proofBlob, }) if err != nil { return fmt.Errorf("unable to insert proof "+ diff --git a/tapdb/assets_store.go b/tapdb/assets_store.go index ccc23ee03..51f0f0ba7 100644 --- a/tapdb/assets_store.go +++ b/tapdb/assets_store.go @@ -214,9 +214,10 @@ type ActiveAssetsStore interface { UpsertManagedUTXO(ctx context.Context, arg RawManagedUTXO) (int64, error) - // UpsertAssetProof inserts a new or updates an existing asset proof on - // disk. - UpsertAssetProof(ctx context.Context, arg ProofUpdate) error + // FetchAssetID fetches the `asset_id` (primary key) from the assets + // table for a given asset identified by `Outpoint` and + // `TweakedScriptKey`. + FetchAssetID(ctx context.Context, arg FetchAssetID) ([]int64, error) // UpsertAssetProofByID inserts a new or updates an existing asset // proof on disk. @@ -1566,13 +1567,11 @@ func (a *AssetStore) importAssetFromProof(ctx context.Context, return fmt.Errorf("unable to insert asset witness: %w", err) } - // As a final step, we'll insert the proof file we used to generate all - // the above information. - scriptKeyBytes := newAsset.ScriptKey.PubKey.SerializeCompressed() - return db.UpsertAssetProof(ctx, ProofUpdate{ - TweakedScriptKey: scriptKeyBytes, - Outpoint: anchorPoint, - ProofFile: proof.Blob, + // Upload proof by the dbAssetId, which is the _primary key_ of the + // asset in table assets, not the BIPS concept of `asset_id`. + return db.UpsertAssetProofByID(ctx, ProofUpdateByID{ + AssetID: assetIDs[0], + ProofFile: proof.Blob, }) } @@ -1615,10 +1614,30 @@ func (a *AssetStore) upsertAssetProof(ctx context.Context, // As a final step, we'll insert the proof file we used to generate all // the above information. scriptKeyBytes := proof.Asset.ScriptKey.PubKey.SerializeCompressed() - return db.UpsertAssetProof(ctx, ProofUpdate{ + + // We need to fetch the table primary key `asset_id` first, as we need + // it to update the proof. We could do this in one query, this gave + // issues with a postgresql backend. See: + // https://github.com/lightninglabs/taproot-assets/issues/951 + dbAssetIds, err := db.FetchAssetID(ctx, FetchAssetID{ TweakedScriptKey: scriptKeyBytes, Outpoint: outpointBytes, - ProofFile: proof.Blob, + }) + if err != nil { + return err + } + + // We should not have more than one `asset_id`. + if len(dbAssetIds) > 1 { + return fmt.Errorf("expected 1 asset id, found %d with asset"+ + " ids %v", len(dbAssetIds), dbAssetIds) + } + + // Upload proof by the dbAssetId, which is the _primary key_ of the + // asset in table assets, not the BIPS concept of `asset_id`. + return db.UpsertAssetProofByID(ctx, ProofUpdateByID{ + AssetID: dbAssetIds[0], + ProofFile: proof.Blob, }) } @@ -1633,6 +1652,7 @@ func (a *AssetStore) ImportProofs(ctx context.Context, _ proof.HeaderVerifier, proofs ...*proof.AnnotatedProof) error { var writeTxOpts AssetStoreTxOptions + err := a.db.ExecTx(ctx, &writeTxOpts, func(q ActiveAssetsStore) error { for _, p := range proofs { if replace { @@ -2674,6 +2694,7 @@ func (a *AssetStore) ConfirmParcelDelivery(ctx context.Context, writeTxOpts AssetStoreTxOptions localProofKeys []asset.SerializedKey ) + err := a.db.ExecTx(ctx, &writeTxOpts, func(q ActiveAssetsStore) error { // First, we'll fetch the asset transfer based on its outpoint // bytes, so we can apply the delta it describes. @@ -2823,13 +2844,14 @@ func (a *AssetStore) ConfirmParcelDelivery(ctx context.Context, } localProofKeys = append(localProofKeys, scriptKey) - // Now we can update the asset proof for the sender for - // this given delta. - err = q.UpsertAssetProof(ctx, ProofUpdate{ - TweakedScriptKey: out.ScriptKeyBytes, - Outpoint: out.AnchorOutpoint, - ProofFile: receiverProof.Blob, + // Upload proof by the dbAssetId, which is the _primary + // key_ of the asset in table assets, not the BIPS + // concept of `asset_id`. + err = q.UpsertAssetProofByID(ctx, ProofUpdateByID{ + AssetID: newAssetID, + ProofFile: receiverProof.Blob, }) + if err != nil { return err } diff --git a/tapdb/sqlc/assets.sql.go b/tapdb/sqlc/assets.sql.go index 88cc429c1..1ebda3434 100644 --- a/tapdb/sqlc/assets.sql.go +++ b/tapdb/sqlc/assets.sql.go @@ -516,6 +516,48 @@ func (q *Queries) DeleteUTXOLease(ctx context.Context, outpoint []byte) error { return err } +const fetchAssetID = `-- name: FetchAssetID :many +SELECT asset_id + FROM assets + JOIN script_keys + ON assets.script_key_id = script_keys.script_key_id + JOIN managed_utxos utxos + ON assets.anchor_utxo_id = utxos.utxo_id + WHERE + (script_keys.tweaked_script_key = $1 + OR $1 IS NULL) + AND (utxos.outpoint = $2 + OR $2 IS NULL) +` + +type FetchAssetIDParams struct { + TweakedScriptKey []byte + Outpoint []byte +} + +func (q *Queries) FetchAssetID(ctx context.Context, arg FetchAssetIDParams) ([]int64, error) { + rows, err := q.db.QueryContext(ctx, fetchAssetID, arg.TweakedScriptKey, arg.Outpoint) + if err != nil { + return nil, err + } + defer rows.Close() + var items []int64 + for rows.Next() { + var asset_id int64 + if err := rows.Scan(&asset_id); err != nil { + return nil, err + } + items = append(items, asset_id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const fetchAssetMeta = `-- name: FetchAssetMeta :one SELECT meta_data_hash, meta_data_blob, meta_data_type FROM assets_meta @@ -2509,40 +2551,6 @@ func (q *Queries) UpsertAssetMeta(ctx context.Context, arg UpsertAssetMetaParams return meta_id, err } -const upsertAssetProof = `-- name: UpsertAssetProof :exec -WITH target_asset(asset_id) AS ( - SELECT asset_id - FROM assets - JOIN script_keys - ON assets.script_key_id = script_keys.script_key_id - JOIN managed_utxos utxos - ON assets.anchor_utxo_id = utxos.utxo_id - WHERE - (script_keys.tweaked_script_key = $2 - OR $2 IS NULL) - AND (utxos.outpoint = $3 - OR $3 IS NULL) -) -INSERT INTO asset_proofs ( - asset_id, proof_file -) VALUES ( - (SELECT asset_id FROM target_asset), $1 -) ON CONFLICT (asset_id) - -- This is not a NOP, we always overwrite the proof with the new one. - DO UPDATE SET proof_file = EXCLUDED.proof_file -` - -type UpsertAssetProofParams struct { - ProofFile []byte - TweakedScriptKey []byte - Outpoint []byte -} - -func (q *Queries) UpsertAssetProof(ctx context.Context, arg UpsertAssetProofParams) error { - _, err := q.db.ExecContext(ctx, upsertAssetProof, arg.ProofFile, arg.TweakedScriptKey, arg.Outpoint) - return err -} - const upsertAssetProofByID = `-- name: UpsertAssetProofByID :exec INSERT INTO asset_proofs ( asset_id, proof_file diff --git a/tapdb/sqlc/querier.go b/tapdb/sqlc/querier.go index 340d49061..107c06cf6 100644 --- a/tapdb/sqlc/querier.go +++ b/tapdb/sqlc/querier.go @@ -44,6 +44,7 @@ type Querier interface { FetchAddrEventByAddrKeyAndOutpoint(ctx context.Context, arg FetchAddrEventByAddrKeyAndOutpointParams) (FetchAddrEventByAddrKeyAndOutpointRow, error) FetchAddrs(ctx context.Context, arg FetchAddrsParams) ([]FetchAddrsRow, error) FetchAllNodes(ctx context.Context) ([]MssmtNode, error) + FetchAssetID(ctx context.Context, arg FetchAssetIDParams) ([]int64, error) FetchAssetMeta(ctx context.Context, metaID int64) (FetchAssetMetaRow, error) FetchAssetMetaByHash(ctx context.Context, metaDataHash []byte) (FetchAssetMetaByHashRow, error) FetchAssetMetaForAsset(ctx context.Context, assetID []byte) (FetchAssetMetaForAssetRow, error) @@ -157,7 +158,6 @@ type Querier interface { UpsertAssetGroupKey(ctx context.Context, arg UpsertAssetGroupKeyParams) (int64, error) UpsertAssetGroupWitness(ctx context.Context, arg UpsertAssetGroupWitnessParams) (int64, error) UpsertAssetMeta(ctx context.Context, arg UpsertAssetMetaParams) (int64, error) - UpsertAssetProof(ctx context.Context, arg UpsertAssetProofParams) error UpsertAssetProofByID(ctx context.Context, arg UpsertAssetProofByIDParams) error UpsertAssetWitness(ctx context.Context, arg UpsertAssetWitnessParams) error UpsertChainTx(ctx context.Context, arg UpsertChainTxParams) (int64, error) diff --git a/tapdb/sqlc/queries/assets.sql b/tapdb/sqlc/queries/assets.sql index 333d64f38..5c5fb6d3c 100644 --- a/tapdb/sqlc/queries/assets.sql +++ b/tapdb/sqlc/queries/assets.sql @@ -669,9 +669,8 @@ UPDATE chain_txns SET block_height = $2, block_hash = $3, tx_index = $4 WHERE txn_id in (SELECT txn_id FROM target_txn); --- name: UpsertAssetProof :exec -WITH target_asset(asset_id) AS ( - SELECT asset_id +-- name: FetchAssetID :many +SELECT asset_id FROM assets JOIN script_keys ON assets.script_key_id = script_keys.script_key_id @@ -681,15 +680,7 @@ WITH target_asset(asset_id) AS ( (script_keys.tweaked_script_key = sqlc.narg('tweaked_script_key') OR sqlc.narg('tweaked_script_key') IS NULL) AND (utxos.outpoint = sqlc.narg('outpoint') - OR sqlc.narg('outpoint') IS NULL) -) -INSERT INTO asset_proofs ( - asset_id, proof_file -) VALUES ( - (SELECT asset_id FROM target_asset), @proof_file -) ON CONFLICT (asset_id) - -- This is not a NOP, we always overwrite the proof with the new one. - DO UPDATE SET proof_file = EXCLUDED.proof_file; + OR sqlc.narg('outpoint') IS NULL); -- name: UpsertAssetProofByID :exec INSERT INTO asset_proofs (