From 5248792ba0135ede818a3779bd8f48bc206e07c3 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 27 Sep 2022 19:58:44 -0700 Subject: [PATCH 1/6] tarofreighter: use WithCtxQuitNoTimeout instead of WithCtxQuit The WithCtxQuit has an implicit timeout of 30 seconds. We were using that to pass into the conf registration call, which ofc can take longer than 30 seconds on an actual chain. --- tarofreighter/chain_porter.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tarofreighter/chain_porter.go b/tarofreighter/chain_porter.go index 0132d81e3..3628049ef 100644 --- a/tarofreighter/chain_porter.go +++ b/tarofreighter/chain_porter.go @@ -102,7 +102,7 @@ func (p *ChainPorter) Start() error { // Before we re-launch the main goroutine, we'll make sure to // restart any other incomplete sends that may or may not have // had the transaction broadcaster. - ctx, cancel := p.WithCtxQuit() + ctx, cancel := p.WithCtxQuitNoTimeout() defer cancel() pendingParcels, err := p.cfg.ExportLog.PendingParcels(ctx) if err != nil { @@ -247,7 +247,7 @@ func (p *ChainPorter) taroPorter() { func (p *ChainPorter) waitForPkgConfirmation(pkg *OutboundParcelDelta) { defer p.Wg.Done() - ctx, cancel := p.WithCtxQuit() + ctx, cancel := p.WithCtxQuitNoTimeout() defer cancel() mkErr := func(format string, args ...interface{}) error { @@ -269,7 +269,7 @@ func (p *ChainPorter) waitForPkgConfirmation(pkg *OutboundParcelDelta) { return } - confCtx, confCancel := p.WithCtxQuit() + confCtx, confCancel := p.WithCtxQuitNoTimeout() confNtfn, errChan, err := p.cfg.ChainBridge.RegisterConfirmationsNtfn( confCtx, &txHash, pkg.AnchorTx.TxOut[0].PkScript, 1, currentHeight, true, @@ -512,7 +512,7 @@ func (p *ChainPorter) stateStep(currentPkg sendPackage) (*sendPackage, error) { // we'll perform coin selection to see if the send request is even // possible at all. case SendStateCommitmentSelect: - ctx, cancel := p.WithCtxQuit() + ctx, cancel := p.WithCtxQuitNoTimeout() defer cancel() // We need to find a commitment that has enough assets to @@ -565,7 +565,7 @@ func (p *ChainPorter) stateStep(currentPkg sendPackage) (*sendPackage, error) { // Now that we have our set of inputs selected, we'll validate them to // make sure that they're enough to satisfy our send request. case SendStateValidatedInput: - ctx, cancel := p.WithCtxQuit() + ctx, cancel := p.WithCtxQuitNoTimeout() defer cancel() // We'll validate the selected input and commitment. From this @@ -709,7 +709,7 @@ func (p *ChainPorter) stateStep(currentPkg sendPackage) (*sendPackage, error) { // our initial skeleton PSBT packet to send off to the wallet for // funding. case SendStatePsbtFund: - ctx, cancel := p.WithCtxQuit() + ctx, cancel := p.WithCtxQuitNoTimeout() defer cancel() // Construct our template PSBT to commits to the set of dummy @@ -784,7 +784,7 @@ func (p *ChainPorter) stateStep(currentPkg sendPackage) (*sendPackage, error) { return ¤tPkg, err } - ctx, cancel := p.WithCtxQuit() + ctx, cancel := p.WithCtxQuitNoTimeout() defer cancel() // With all the input and output information in the packet, we @@ -923,7 +923,7 @@ func (p *ChainPorter) stateStep(currentPkg sendPackage) (*sendPackage, error) { // In this terminal state, we'll broadcast the transaction to the // network, then launch a goroutine to notify us on confirmation. case SendStateBroadcast: - ctx, cancel := p.WithCtxQuit() + ctx, cancel := p.WithCtxQuitNoTimeout() defer cancel() // We'll need to extract the output public key from the tx out From 365a13cf5644c40bf29ad72d8d4420f6025e3042 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 27 Sep 2022 19:57:14 -0700 Subject: [PATCH 2/6] address: add work around for lnd coin type mismatch Tracking the issue on the ind side with: https://github.com/lightningnetwork/lnd/issues/6952 --- address/params.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/address/params.go b/address/params.go index 532efdc19..9df447472 100644 --- a/address/params.go +++ b/address/params.go @@ -128,6 +128,12 @@ func Net(hrp string) (*ChainParams, error) { return &SigNetTaro, nil case SimNetTaro.TaroHRP: + // For simnet, we'll need to slighlty modify the coin type as + // lnd only ever expects the testnet coin type (1) instead of + // the simnet coin type (115). + simNet := SimNetTaro + simNet.HDCoinType = TestNet3Taro.HDCoinType + return &SimNetTaro, nil default: @@ -151,7 +157,13 @@ func ParamsForChain(name string) ChainParams { return SigNetTaro case chaincfg.SimNetParams.Name: - return SimNetTaro + // For simnet, we'll need to slighlty modify the coin type as + // lnd only ever expects the testnet coin type (1) instead of + // the simnet coin type (115). + simNet := SimNetTaro + simNet.HDCoinType = TestNet3Taro.HDCoinType + + return simNet default: panic(fmt.Sprintf("unknown chain: %v", name)) From aacb60b5fab69829422c3e71d124259b31a96cdc Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 27 Sep 2022 20:57:16 -0700 Subject: [PATCH 3/6] multi: catch duplicate addr error from ImportTaprootOutput --- tarofreighter/chain_porter.go | 14 ++++++++++++-- tarogarden/caretaker.go | 13 ++++++++++++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/tarofreighter/chain_porter.go b/tarofreighter/chain_porter.go index 3628049ef..c3a7d3d3a 100644 --- a/tarofreighter/chain_porter.go +++ b/tarofreighter/chain_porter.go @@ -949,11 +949,21 @@ func (p *ChainPorter) stateStep(currentPkg sendPackage) (*sendPackage, error) { // it for spends and also takes account of the BTC we used in // the transfer. _, err = p.cfg.Wallet.ImportTaprootOutput(ctx, anchorOutputKey) - if err != nil { + switch { + case err == nil: + break + + // On restart, we'll get an error that the output has already + // been added to the wallet, so we'll catch this now and move + // along if so. + case strings.Contains(err.Error(), "already exists"): + break + + case err != nil: return nil, err } - log.Infof("Broadcasting new transfer tx, taro_anchor_output=%x", + log.Infof("Broadcasting new transfer tx, taro_anchor_output=%v", spew.Sdump(anchorOutput)) // With the public key imported, we can now broadcast to the diff --git a/tarogarden/caretaker.go b/tarogarden/caretaker.go index 1df14a636..48ce21e86 100644 --- a/tarogarden/caretaker.go +++ b/tarogarden/caretaker.go @@ -3,6 +3,7 @@ package tarogarden import ( "context" "fmt" + "strings" "sync" "time" @@ -534,7 +535,17 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error) ctx, cancel = b.WithCtxQuit() defer cancel() _, err = b.cfg.Wallet.ImportTaprootOutput(ctx, mintingOutputKey) - if err != nil { + switch { + case err == nil: + break + + // On restart, we'll get an error that the output has already + // been added to the wallet, so we'll catch this now and move + // along if so. + case strings.Contains(err.Error(), "already exists"): + break + + case err != nil: return 0, fmt.Errorf("unable to import key: %w", err) } From 37ea6ff1a3a79d4b2b54c4e11c3f6fc0c32605f3 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 27 Sep 2022 20:57:41 -0700 Subject: [PATCH 4/6] tarofreighter: fix restart case by using OutboundPkg.AnchorTx The transfer tx isn't populated yet, so this would panic. --- tarofreighter/chain_porter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tarofreighter/chain_porter.go b/tarofreighter/chain_porter.go index c3a7d3d3a..88d16d02c 100644 --- a/tarofreighter/chain_porter.go +++ b/tarofreighter/chain_porter.go @@ -969,7 +969,7 @@ func (p *ChainPorter) stateStep(currentPkg sendPackage) (*sendPackage, error) { // With the public key imported, we can now broadcast to the // network. err = p.cfg.ChainBridge.PublishTransaction( - ctx, currentPkg.TransferTx, + ctx, currentPkg.OutboundPkg.AnchorTx, ) if err != nil { return nil, err From a20fe7c4e9b27a9f744ac074b013ecd6e1e42a2c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 27 Sep 2022 20:58:04 -0700 Subject: [PATCH 5/6] tarodb: properly read witness data for asset deltas --- tarodb/assets_store.go | 13 ++++++++++++- tarodb/assets_store_test.go | 7 +++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/tarodb/assets_store.go b/tarodb/assets_store.go index 3c4597dab..15b3f354a 100644 --- a/tarodb/assets_store.go +++ b/tarodb/assets_store.go @@ -1562,6 +1562,17 @@ func (a *AssetStore) PendingParcels(ctx context.Context, var splitRootHash mssmt.NodeHash copy(splitRootHash[:], delta.SplitCommitmentRootHash) + var witnessData []asset.Witness + err = asset.WitnessDecoder( + bytes.NewReader(delta.SerializedWitnesses), + &witnessData, &[8]byte{}, + uint64(len(delta.SerializedWitnesses)), + ) + if err != nil { + return fmt.Errorf("unable to decode "+ + "witness: %v", err) + } + spendDeltas[i] = tarofreighter.AssetSpendDelta{ OldScriptKey: *oldScriptKey, NewAmt: uint64(delta.NewAmt), @@ -1584,7 +1595,7 @@ func (a *AssetStore) PendingParcels(ctx context.Context, splitRootHash, uint64(delta.SplitCommitmentRootValue.Int64), ), - WitnessData: nil, + WitnessData: witnessData, } } diff --git a/tarodb/assets_store_test.go b/tarodb/assets_store_test.go index 087273572..ce62b4edd 100644 --- a/tarodb/assets_store_test.go +++ b/tarodb/assets_store_test.go @@ -740,6 +740,12 @@ func TestAssetExportLog(t *testing.T) { senderBlob := bytes.Repeat([]byte{0x01}, 100) receiverBlob := bytes.Repeat([]byte{0x02}, 100) + newWitness := asset.Witness{ + PrevID: &asset.PrevID{}, + TxWitness: [][]byte{[]byte{0x01}, []byte{0x02}}, + SplitCommitment: nil, + } + // With the assets inserted, we'll now construct the struct we'll used // to commit a new spend on disk. anchorTxHash := newAnchorTx.TxHash() @@ -773,6 +779,7 @@ func TestAssetExportLog(t *testing.T) { SplitCommitmentRoot: mssmt.NewComputedNode( newRootHash, newRootValue, ), + WitnessData: []asset.Witness{newWitness}, }, }, SenderAssetProof: senderBlob, From 7cc14e730d4f9b54a6b2694d65ca4425cfedb944 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 21 Sep 2022 20:14:12 -0700 Subject: [PATCH 6/6] multi: add new proof.Courier interface for async proof distribution In this commit, we add a new `proof.Courier` interface that's optionally used as async proof distribution. The current default implementation uses a hashmail server to send the proofs back and forth between the sender and the receiver. In the future, this'll be replaced with streaming/long-polling connections to a desginated multi-verse. Fixes #95 --- cmd/tarocli/main.go | 2 +- go.mod | 1 + go.sum | 2 + itest/tarod_harness.go | 6 + log.go | 2 + proof/courier.go | 371 ++++++++++++++++++++++++++++++++++ proof/log.go | 26 +++ tarocfg/config.go | 7 + tarocfg/server.go | 19 ++ tarofreighter/chain_porter.go | 40 +++- tarogarden/custodian.go | 62 +++++- 11 files changed, 526 insertions(+), 12 deletions(-) create mode 100644 proof/courier.go create mode 100644 proof/log.go diff --git a/cmd/tarocli/main.go b/cmd/tarocli/main.go index dead1664d..b87efc108 100644 --- a/cmd/tarocli/main.go +++ b/cmd/tarocli/main.go @@ -236,7 +236,7 @@ func main() { cli.StringFlag{ Name: "rpcserver", Value: defaultRPCHostPort, - Usage: "The host:port of LN daemon.", + Usage: "The host:port of taro daemon.", }, cli.StringFlag{ Name: "tarodir", diff --git a/go.mod b/go.mod index f6a6f6114..21f99b484 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.5.0 github.com/jessevdk/go-flags v1.4.0 + github.com/lightninglabs/lightning-node-connect/hashmailrpc v1.0.2 github.com/lightninglabs/lndclient v0.16.0-1 github.com/lightninglabs/protobuf-hex-display v1.4.3-hex-display github.com/lightningnetwork/lnd v0.15.0-beta.rc6.0.20220825081330-cf9a9864cf25 diff --git a/go.sum b/go.sum index 7dc2f7b84..9b56fe3ef 100644 --- a/go.sum +++ b/go.sum @@ -610,6 +610,8 @@ github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg= github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI/f/O0Avg7t8sqkPo78HFzjmeYFl6DPnc= github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk= +github.com/lightninglabs/lightning-node-connect/hashmailrpc v1.0.2 h1:Er1miPZD2XZwcfE4xoS5AILqP1mj7kqnhbBSxW9BDxY= +github.com/lightninglabs/lightning-node-connect/hashmailrpc v1.0.2/go.mod h1:antQGRDRJiuyQF6l+k6NECCSImgCpwaZapATth2Chv4= github.com/lightninglabs/lndclient v0.16.0-1 h1:DRnCRjYrGrCmE9HTVcW9aruqeO/GkPg8QV+ClMFJMDE= github.com/lightninglabs/lndclient v0.16.0-1/go.mod h1:G4D4/rcEnbdl8I0cTUO7oaJgEGa8WrplMv2IuPy1xu0= github.com/lightninglabs/neutrino v0.14.2 h1:yrnZUCYMZ5ECtXhgDrzqPq2oX8awoAN2D/cgCewJcCo= diff --git a/itest/tarod_harness.go b/itest/tarod_harness.go index ff23fe2a7..b4c6869b6 100644 --- a/itest/tarod_harness.go +++ b/itest/tarod_harness.go @@ -93,6 +93,12 @@ func newTarodHarness(ht *harnessTest, cfg tarodConfig) (*tarodHarness, error) { return nil, err } + // We'll modify the config slightly here, since we don't need to use + // the hashmail system for integration tests. + // + // TODO(roasbeef): make local aperture instance in future + finalCfg.HashMailAddr = "" + return &tarodHarness{ cfg: &cfg, clientCfg: finalCfg, diff --git a/log.go b/log.go index 8c1d69c9e..1c0ebbb8a 100644 --- a/log.go +++ b/log.go @@ -2,6 +2,7 @@ package taro import ( "github.com/btcsuite/btclog" + "github.com/lightninglabs/taro/proof" "github.com/lightninglabs/taro/rpcperms" "github.com/lightninglabs/taro/tarofreighter" "github.com/lightninglabs/taro/tarogarden" @@ -90,6 +91,7 @@ func SetupLoggers(root *build.RotatingLogWriter, interceptor signal.Interceptor) AddSubLogger( root, tarofreighter.Subsystem, interceptor, tarofreighter.UseLogger, ) + AddSubLogger(root, proof.Subsystem, interceptor, proof.UseLogger) } // AddSubLogger is a helper method to conveniently create and register the diff --git a/proof/courier.go b/proof/courier.go new file mode 100644 index 000000000..8608191eb --- /dev/null +++ b/proof/courier.go @@ -0,0 +1,371 @@ +package proof + +import ( + "bytes" + "context" + "crypto/sha512" + "crypto/tls" + "fmt" + + "github.com/lightninglabs/lightning-node-connect/hashmailrpc" + "github.com/lightninglabs/taro/address" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" +) + +// Courier abstracts away from the final proof retrival/delivery process as +// part of the non-interactive send flow. A sender can use this given the +// abstracted Addr/source type to send a proof to the receiver. Conversely, a +// receiver can use this to fetch a proof from the sender. +// +// TODO(roasbeef): FileSystemCourier, RpcCourier +type Courier[Addr any] interface { + // DeliverProof attempts to delivery a proof to the receiver, using the + // information in the Addr type. + DeliverProof(context.Context, Addr, *AnnotatedProof) error + + // ReceiveProof attempts to obtain a proof as identified by the passed + // locator from the source encapsulated within the specified address. + ReceiveProof(context.Context, Addr, Locator) (*AnnotatedProof, error) +} + +// ProofMailbox represents an abstract store-and-forward maillbox that can be +// used to send/receive proofs. +type ProofMailbox interface { + // Init creates a mailbox given the specified stream ID. + Init(ctx context.Context, sid streamID) error + + // WriteProof writes the proof to the mailbox specified by the sid. + WriteProof(ctx context.Context, sid streamID, proof Blob) error + + // ReadProof reads a proof from the mailbox. This is a blocking method. + ReadProof(ctx context.Context, sid streamID) (Blob, error) + + // AckProof sends an ACK from the receiver to the sender that a proof + // has been recevied. + AckProof(ctx context.Context, sid streamID) error + + // RecvAck waits for the sender to receive the ack from the receiver. + RecvAck(ctx context.Context, sid streamID) error + + // CleanUp atempts to tear down the mailbox as specified by the passed + // sid. + CleanUp(ctx context.Context, sid streamID) error +} + +// HashMailBox is an implementation of the ProofMailbox interface backed by the +// hashmailrpc.HashMailClient. +type HashMailBox struct { + client hashmailrpc.HashMailClient +} + +// serverDialOpts returns the set of server options needed to connect to the +// server using a TLS connection. +func serverDialOpts() ([]grpc.DialOption, error) { + var opts []grpc.DialOption + + creds := credentials.NewTLS(&tls.Config{}) + opts = append(opts, grpc.WithTransportCredentials(creds)) + + return opts, nil +} + +// NewHashMailBox makes a new mailbox by dialing to the server specified by the +// address above. +func NewHashMailBox(serverAddr string) (*HashMailBox, error) { + dialOpts, err := serverDialOpts() + if err != nil { + return nil, err + } + + conn, err := grpc.Dial(serverAddr, dialOpts...) + if err != nil { + return nil, err + } + + client := hashmailrpc.NewHashMailClient(conn) + + return &HashMailBox{ + client: client, + }, nil +} + +// isErrAlreadyExists returns true if the passed error is the "already exists" +// error within the error wrapped error which is returned by the hash mail +// server when a stream we're attempting to create already exists. +func isErrAlreadyExists(err error) bool { + statusCode, ok := status.FromError(err) + if !ok { + return false + } + + return statusCode.Code() == codes.AlreadyExists +} + +// Init creates a mailbox given the specified stream ID. +func (h *HashMailBox) Init(ctx context.Context, sid streamID) error { + streamInit := &hashmailrpc.CipherBoxAuth{ + Desc: &hashmailrpc.CipherBoxDesc{ + StreamId: sid[:], + }, + Auth: &hashmailrpc.CipherBoxAuth_LndAuth{ + LndAuth: &hashmailrpc.LndAuth{}, + }, + } + + _, err := h.client.NewCipherBox(ctx, streamInit) + if err != nil && !isErrAlreadyExists(err) { + return err + } + + return nil +} + +// WriteProof writes the proof to the mailbox specified by the sid. +func (h *HashMailBox) WriteProof(ctx context.Context, sid streamID, + proof Blob) error { + + writeStream, err := h.client.SendStream(ctx) + if err != nil { + return fmt.Errorf("unable to create send stream: %w", err) + } + + err = writeStream.Send(&hashmailrpc.CipherBox{ + Desc: &hashmailrpc.CipherBoxDesc{ + StreamId: sid[:], + }, + Msg: proof[:], + }) + if err != nil { + return err + } + + return writeStream.CloseSend() +} + +// ReadProof reads a proof from the mailbox. This is a blocking method. +func (h *HashMailBox) ReadProof(ctx context.Context, + sid streamID) (Blob, error) { + + readStream, err := h.client.RecvStream(ctx, &hashmailrpc.CipherBoxDesc{ + StreamId: sid[:], + }) + if err != nil { + return nil, fmt.Errorf("unable to create read stream: %w", err) + } + + msg, err := readStream.Recv() + if err != nil { + return nil, err + } + + // TODO(roasbeef): modify ACK based on size of ting? + + return Blob(msg.Msg), nil +} + +// ackMsg is the string used to signal that the receiver has received the proof +// sent by the sender. +var ackMsg = []byte("ack") + +// AckProof sends an ACK from the receiver to the sender that a proof has been +// recevied. +func (h *HashMailBox) AckProof(ctx context.Context, sid streamID) error { + writeStream, err := h.client.SendStream(ctx) + if err != nil { + return fmt.Errorf("unable to create send stream: %w", err) + } + + err = writeStream.Send(&hashmailrpc.CipherBox{ + Desc: &hashmailrpc.CipherBoxDesc{ + StreamId: sid[:], + }, + Msg: ackMsg, + }) + if err != nil { + return err + } + + return writeStream.CloseSend() +} + +// RecvAck waits for the sender to receive the ack from the receiver. +func (h *HashMailBox) RecvAck(ctx context.Context, sid streamID) error { + readStream, err := h.client.RecvStream(ctx, &hashmailrpc.CipherBoxDesc{ + StreamId: sid[:], + }) + if err != nil { + return fmt.Errorf("unable to create read stream: %w", err) + } + + msg, err := readStream.Recv() + if err != nil { + return err + } + + if bytes.Equal(msg.Msg, ackMsg) { + return nil + } + + return fmt.Errorf("expected ack, got %x", msg.Msg) +} + +// CleanUp atempts to tear down the mailbox as specified by the passed sid. +func (h *HashMailBox) CleanUp(ctx context.Context, sid streamID) error { + streamAuth := &hashmailrpc.CipherBoxAuth{ + Desc: &hashmailrpc.CipherBoxDesc{ + StreamId: sid[:], + }, + Auth: &hashmailrpc.CipherBoxAuth_LndAuth{ + LndAuth: &hashmailrpc.LndAuth{}, + }, + } + + _, err := h.client.DelCipherBox(ctx, streamAuth) + return err +} + +// A compile-time assertion to ensure that the HashMailBox meets the +// ProofMailbox interface. +var _ ProofMailbox = (*HashMailBox)(nil) + +// streamID wraps the 64-byte stream ID the mailbox scheme uses. +type streamID [64]byte + +// deriveSenderStreamID derives the stream ID for the sender in the asset +// transfer. +func deriveSenderStreamID(addr address.Taro) streamID { + sid := sha512.Sum512(addr.ScriptKey.SerializeCompressed()) + + return sid +} + +// deriveReceiverStreamID derives the stream ID for the receiver in the asset +// transfer. +func deriveReceiverStreamID(addr address.Taro) streamID { + scriptKey := addr.ScriptKey.SerializeCompressed() + sid := sha512.Sum512(scriptKey) + sid[63] ^= 0x01 + + return sid +} + +// HashMailCourier is an implementation of the Courier interfaces that +type HashMailCourier struct { + mailbox ProofMailbox +} + +// NewHashMailCourier implements the Courier interface using the specified +// ProofMailbox. This instance of the Courier relies on the taro address itself +// as the parametrized address type. +func NewHashMailCourier(mailbox ProofMailbox) (*HashMailCourier, error) { + return &HashMailCourier{ + mailbox: mailbox, + }, nil +} + +// DeliverProof attempts to delivery a proof to the receiver, using the +// information in the Addr type. +// +// TODO(roasbeef): other delivery context as type param? +func (h *HashMailCourier) DeliverProof(ctx context.Context, addr address.Taro, + proof *AnnotatedProof) error { + + log.Infof("Attempting to deliver receiver proof for send of "+ + "asset_id=%x, amt=%v", addr.ID(), addr.Amount) + + // To deliver the proof to the receiver, we'll use our hashmail box to + // first create a new session that we'll use to send the proof over. + // We'll send on this stream, while the receiver receives on it. + // + // TODO(roasbeef): should do this as early in the process as possible. + senderStreamID := deriveSenderStreamID(addr) + log.Infof("Creating sender mailbox w/ sid=%x", senderStreamID) + if err := h.mailbox.Init(ctx, senderStreamID); err != nil { + return err + } + + // Now that the stream has been initialized, we'll write the proof over + // the stream. + // + // TODO(roasbeef): do ecies here + log.Infof("Sending receiver proof via sid=%x", senderStreamID) + err := h.mailbox.WriteProof(ctx, senderStreamID, proof.Blob) + if err != nil { + return err + } + + // With the proof delivered, we'll now wait to receive the ACK from the + // receiver. To do this, we'll use the receiver's stream ID to listen + // on the mailbox. + // + // TODO(roasbeef): ok that both sides might be on the same side here? + receiverStreamID := deriveReceiverStreamID(addr) + log.Infof("Creating receiver mailbox w/ sid=%x", receiverStreamID) + if err := h.mailbox.Init(ctx, receiverStreamID); err != nil { + return err + } + + // We'll wait to receive the ACK from the remote party over their + // stream. + log.Infof("Waiting for receiver proof via sid=%x", receiverStreamID) + if err := h.mailbox.RecvAck(ctx, receiverStreamID); err != nil { + return err + } + + log.Infof("Received ACK from receiver! Cleaning up mailboxes...") + + // Once we receive this ACK, we can clean up our mailbox and also the + // receiver's mailbox. + if err := h.mailbox.CleanUp(ctx, senderStreamID); err != nil { + return err + } + return h.mailbox.CleanUp(ctx, receiverStreamID) +} + +// ReceiveProof attempts to obtain a proof as identified by the passed locator +// from the source encapsulated within the specified address. +func (h *HashMailCourier) ReceiveProof(ctx context.Context, addr address.Taro, + loc Locator) (*AnnotatedProof, error) { + + senderStreamID := deriveSenderStreamID(addr) + if err := h.mailbox.Init(ctx, senderStreamID); err != nil { + return nil, err + } + + log.Infof("Attempting to receive proof via sid=%x", senderStreamID) + + // To receiver the proof from the sender, we'll derive the stream ID + // they'll use to send the proof, and then wait to receive it. + proof, err := h.mailbox.ReadProof(ctx, senderStreamID) + if err != nil { + return nil, err + } + + // Now that we've read the proof, we'll create our mailbox (which might + // already exist) to send an ACK back to the sender. + receiverStreamID := deriveReceiverStreamID(addr) + log.Infof("Sending ACK to sender via sid=%x", receiverStreamID) + if err := h.mailbox.Init(ctx, receiverStreamID); err != nil { + return nil, err + } + if err := h.mailbox.AckProof(ctx, receiverStreamID); err != nil { + return nil, err + } + + // Finally, we'll return the proof state back to the caller. + assetID := addr.ID() + return &AnnotatedProof{ + Locator: Locator{ + AssetID: &assetID, + ScriptKey: addr.ScriptKey, + }, + Blob: Blob(proof), + }, nil +} + +// A compile-time assertion to ensure the HashMailCourier meets the +// proof.Courier interface. +var _ Courier[address.Taro] = (*HashMailCourier)(nil) diff --git a/proof/log.go b/proof/log.go new file mode 100644 index 000000000..d5a4936de --- /dev/null +++ b/proof/log.go @@ -0,0 +1,26 @@ +package proof + +import ( + "github.com/btcsuite/btclog" +) + +// Subsystem defines the logging code for this subsystem. +const Subsystem = "PROF" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log = btclog.Disabled + +// DisableLog disables all library log output. Logging output is disabled +// by default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/tarocfg/config.go b/tarocfg/config.go index b39dc34da..efd533207 100644 --- a/tarocfg/config.go +++ b/tarocfg/config.go @@ -55,6 +55,10 @@ const ( // determine when a set of pending assets should be flushed into a new // batch. defaultBatchMintingInterval = time.Minute * 10 + + // defaultHashMailAddr is the default address we'll use to deliver + // optionally deliver proofs for asynchronous sends. + defaultHashMailAddr = "mailbox.terminal.lightning.today:443" ) var ( @@ -185,6 +189,8 @@ type Config struct { BatchMintingInterval time.Duration `long:"batch-minting-interval" description:"A duration (1m, 2h, etc) that governs how frequently pending assets are gather into a batch to be minted."` + HashMailAddr string `long:"hashmailaddr" description:"The full host:port that should be used to optionally deliver proofs files for asynchronous sends"` + ChainConf *ChainConfig RpcConf *RpcConfig @@ -235,6 +241,7 @@ func DefaultConfig() Config { }, LogWriter: build.NewRotatingLogWriter(), BatchMintingInterval: defaultBatchMintingInterval, + HashMailAddr: defaultHashMailAddr, } } diff --git a/tarocfg/server.go b/tarocfg/server.go index 1096f2a14..17588fd18 100644 --- a/tarocfg/server.go +++ b/tarocfg/server.go @@ -77,6 +77,7 @@ func CreateServerFromConfig(cfg *Config, cfgLogger btclog.Logger, addrBookDB, &taroChainParams, ) + cfgLogger.Infof("Attempting to establish connection to lnd...") lndConn, err := getLnd( cfg.ChainConf.Network, cfg.Lnd, shutdownInterceptor, ) @@ -89,6 +90,8 @@ func CreateServerFromConfig(cfg *Config, cfgLogger btclog.Logger, walletAnchor := taro.NewLndRpcWalletAnchor(lndServices) chainBridge := taro.NewLndRpcChainBridge(lndServices) + cfgLogger.Infof("lnd connection initialized") + addrBook := address.NewBook(address.BookConfig{ Store: tarodbAddrBook, StoreTimeout: tarodb.DefaultStoreTimeout, @@ -107,6 +110,20 @@ func CreateServerFromConfig(cfg *Config, cfgLogger btclog.Logger, assetStore, proofFileStore, ) + var hashMailCourier proof.Courier[address.Taro] + if cfg.HashMailAddr != "" { + hashMailBox, err := proof.NewHashMailBox(cfg.HashMailAddr) + if err != nil { + return nil, fmt.Errorf("unable to make "+ + "mailbox: %v", err) + } + hashMailCourier, err = proof.NewHashMailCourier(hashMailBox) + if err != nil { + return nil, fmt.Errorf("unable to make hashmail "+ + "courier: %v", err) + } + } + server, err := taro.NewServer(&taro.Config{ DebugLevel: cfg.DebugLevel, ChainParams: cfg.ActiveNetParams, @@ -132,6 +149,7 @@ func CreateServerFromConfig(cfg *Config, cfgLogger btclog.Logger, AddrBook: addrBook, ProofArchive: proofArchive, ErrChan: mainErrChan, + ProofCourier: hashMailCourier, }, ), AddrBook: addrBook, @@ -146,6 +164,7 @@ func CreateServerFromConfig(cfg *Config, cfgLogger btclog.Logger, KeyRing: keyRing, ChainParams: &taroChainParams, AssetProofs: proofFileStore, + ProofCourier: hashMailCourier, }), SignalInterceptor: shutdownInterceptor, LogWriter: cfg.LogWriter, diff --git a/tarofreighter/chain_porter.go b/tarofreighter/chain_porter.go index 88d16d02c..4152da7c5 100644 --- a/tarofreighter/chain_porter.go +++ b/tarofreighter/chain_porter.go @@ -58,6 +58,10 @@ type ChainPorterConfig struct { // TODO(roasbeef): replace with proof.Courier in the future/ AssetProofs proof.Archiver + // ProofCourier is used to optionally deliver the final proof to the + // user using an asynchronous transport mechanism. + ProofCourier proof.Courier[address.Taro] + // ErrChan is the main error channel the custodian will report back // critical errors to the main server. ErrChan chan<- error @@ -171,6 +175,8 @@ func (p *ChainPorter) RequestShipment(req *AssetParcel) (*PendingParcel, error) // TODO(roasbeef): consolidate w/ below? or adopt similar arch as ChainPlanter // - could move final conf into the state machien itself func (p *ChainPorter) resumePendingParcel(pkg *OutboundParcelDelta) { + defer p.Wg.Done() + log.Infof("Attempting to resume delivery to anchor_point=%v", pkg.NewAnchorPoint) @@ -382,13 +388,14 @@ func (p *ChainPorter) waitForPkgConfirmation(pkg *OutboundParcelDelta) { p.cfg.ErrChan <- mkErr("error encoding receiver proof: %v", err) return } - err = p.cfg.AssetProofs.ImportProofs(ctx, &proof.AnnotatedProof{ + receiverProof := &proof.AnnotatedProof{ Locator: proof.Locator{ AssetID: &pkg.AssetSpendDeltas[0].WitnessData[0].PrevID.ID, ScriptKey: *receiverProofSuffix.Asset.ScriptKey.PubKey, }, Blob: updatedReceiverProof.Bytes(), - }) + } + err = p.cfg.AssetProofs.ImportProofs(ctx, receiverProof) if err != nil { p.cfg.ErrChan <- mkErr("error importing proof: %v", err) return @@ -397,6 +404,35 @@ func (p *ChainPorter) waitForPkgConfirmation(pkg *OutboundParcelDelta) { log.Debugf("Updated proofs for sender and receiver (new_len=%d)", len(senderProof.Proofs)) + // If we have a proof courier instance active, then we'll launch a new + // goroutine to deliver the proof to the receiver. + // + // TODO(roasbeef): move earlier? + if p.cfg.ProofCourier != nil { + p.Wg.Add(1) + go func() { + defer p.Wg.Done() + + // TODO(roasbeef): should actually also serialize the + // addr of the remote party here + addr := address.Taro{ + Genesis: receiverProofSuffix.Asset.Genesis, + ScriptKey: receiverProof.ScriptKey, + Amount: receiverProofSuffix.Asset.Amount, + } + ctx, cancel := p.WithCtxQuitNoTimeout() + defer cancel() + err := p.cfg.ProofCourier.DeliverProof( + ctx, addr, receiverProof, + ) + if err != nil { + log.Errorf("unable to deliver proof: %v", err) + } + }() + } + + log.Infof("Marking parcel (txid=%v) as confirmed!", txHash) + // At this point we have the confirmation signal, so we can mark the // parcel delivery as completed in the database. err = p.cfg.ExportLog.ConfirmParcelDelivery(ctx, &AssetConfirmEvent{ diff --git a/tarogarden/custodian.go b/tarogarden/custodian.go index 2982460c8..5e9c5fed7 100644 --- a/tarogarden/custodian.go +++ b/tarogarden/custodian.go @@ -38,6 +38,10 @@ type CustodianConfig struct { // ProofArchive is the storage backend for proofs. ProofArchive *proof.MultiArchiver + // ProofCourier is used to optionally deliver the final proof to the + // user using an asynchronous transport mechanism. + ProofCourier proof.Courier[address.Taro] + // ErrChan is the main error channel the custodian will report back // critical errors to the main server. ErrChan chan<- error @@ -312,10 +316,48 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error { // This is a new output, let's find out if it's for an address // of ours. - err := c.mapToTaroAddr(walletTx, uint32(idx), op) + addr, err := c.mapToTaroAddr(walletTx, uint32(idx), op) if err != nil { return err } + + if c.cfg.ProofCourier == nil || addr == nil { + continue + } + + // Now that we've seen this output on chain, we'll launch a + // goroutine to use the ProofCourier to import the proof into + // our local DB. + c.Wg.Add(1) + go func() { + defer c.Wg.Done() + + ctx, cancel := c.WithCtxQuit() + defer cancel() + + assetID := addr.ID() + proof, err := c.cfg.ProofCourier.ReceiveProof( + ctx, *addr, proof.Locator{ + AssetID: &assetID, + ScriptKey: addr.ScriptKey, + }, + ) + if err != nil { + log.Errorf("unable to recv proof: %v", err) + return + } + + ctx, cancel = c.CtxBlocking() + defer cancel() + + err = c.cfg.ProofArchive.ImportProofs(ctx, proof) + if err != nil { + log.Errorf("unable to import proofs: %v", err) + return + } + + return + }() } return nil @@ -325,11 +367,11 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error { // matching address is found, an event is created for it. If an event already // exists, it is updated with the current transaction information. func (c *Custodian) mapToTaroAddr(walletTx *lndclient.Transaction, - outputIdx uint32, op wire.OutPoint) error { + outputIdx uint32, op wire.OutPoint) (*address.Taro, error) { taprootKey, err := proof.ExtractTaprootKey(walletTx.Tx, outputIdx) if err != nil { - return fmt.Errorf("error extracting taproot key: %w", err) + return nil, fmt.Errorf("error extracting taproot key: %w", err) } ctxt, cancel := c.WithCtxQuit() @@ -339,16 +381,16 @@ func (c *Custodian) mapToTaroAddr(walletTx *lndclient.Transaction, // There is no Taro address that expects an asset for the given on-chain // output. This probably wasn't a Taro transaction at all then. case errors.Is(err, address.ErrNoAddr): - return nil + return nil, nil case err != nil: - return fmt.Errorf("error querying addresses by taro key: %w", - err) + return nil, fmt.Errorf("error querying addresses by "+ + "taro key: %w", err) } addrStr, err := addr.EncodeAddress() if err != nil { - return fmt.Errorf("unable to encode address: %v", err) + return nil, fmt.Errorf("unable to encode address: %v", err) } // Make sure we have an event registered for the transaction, since it @@ -368,13 +410,13 @@ func (c *Custodian) mapToTaroAddr(walletTx *lndclient.Transaction, ) cancel() if err != nil { - return fmt.Errorf("error creating event: %w", err) + return nil, fmt.Errorf("error creating event: %w", err) } // Let's update our cache of ongoing events. c.events[op] = event - return nil + return addr.Taro, nil } // importAddrToWallet imports the given Taro address into the lnd-internal @@ -418,6 +460,8 @@ func (c *Custodian) checkProofAvailable(event *address.Event) error { ctxt, cancel := c.WithCtxQuit() defer cancel() + // TODO(roasbeef): use the courier here? + id := event.Addr.ID() blob, err := c.cfg.ProofArchive.FetchProof(ctxt, proof.Locator{ AssetID: &id,