Skip to content

Commit

Permalink
Merge pull request #939 from ellemouton/sql3Accounts3
Browse files Browse the repository at this point in the history
[sql-3] accounts: replace calls to UpdateAccount
  • Loading branch information
ellemouton authored Jan 21, 2025
2 parents e2c4da2 + a4506c9 commit 76c63db
Show file tree
Hide file tree
Showing 6 changed files with 509 additions and 110 deletions.
13 changes: 13 additions & 0 deletions accounts/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,17 @@ var (
ErrLabelAlreadyExists = errors.New(
"account label uniqueness constraint violation",
)

// ErrAlreadySucceeded is returned by the UpsertAccountPayment method
// if the WithErrAlreadySucceeded option is used and the payment has
// already succeeded.
ErrAlreadySucceeded = errors.New("payment has already succeeded")

// ErrPaymentNotAssociated indicate that the payment with the given hash
// has not yet been associated with the account in question. It is also
// returned when the WithErrIfUnknown option is used with
// UpsertAccountPayment if the payment is not yet known.
ErrPaymentNotAssociated = errors.New(
"payment not associated with account",
)
)
89 changes: 89 additions & 0 deletions accounts/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,23 @@ type Store interface {
IncreaseAccountBalance(ctx context.Context, id AccountID,
amount lnwire.MilliSatoshi) error

// UpsertAccountPayment updates or inserts a payment entry for the given
// account. Various functional options can be passed to modify the
// behavior of the method. The returned boolean is true if the payment
// was already known before the update. This is to be treated as a
// best-effort indication if an error is also returned since the method
// may error before the boolean can be set correctly.
UpsertAccountPayment(_ context.Context, id AccountID,
paymentHash lntypes.Hash, fullAmount lnwire.MilliSatoshi,
status lnrpc.Payment_PaymentStatus,
options ...UpsertPaymentOption) (bool, error)

// DeleteAccountPayment removes a payment entry from the account with
// the given ID. It will return the ErrPaymentNotAssociated error if the
// payment is not associated with the account.
DeleteAccountPayment(_ context.Context, id AccountID,
hash lntypes.Hash) error

// RemoveAccount finds an account by its ID and removes it from the¨
// store.
RemoveAccount(ctx context.Context, id AccountID) error
Expand Down Expand Up @@ -316,3 +333,75 @@ type RequestValuesStore interface {
// DeleteValues deletes any values stored for the given request ID.
DeleteValues(reqID uint64)
}

// UpsertPaymentOption is a functional option that can be passed to the
// UpsertAccountPayment method to modify its behavior.
type UpsertPaymentOption func(*upsertAcctPaymentOption)

// upsertAcctPaymentOption is a struct that holds optional parameters for the
// UpsertAccountPayment method.
type upsertAcctPaymentOption struct {
debitAccount bool
errIfAlreadyPending bool
usePendingAmount bool
errIfAlreadySucceeded bool
errIfUnknown bool
}

// newUpsertPaymentOption creates a new upsertAcctPaymentOption with default
// values.
func newUpsertPaymentOption() *upsertAcctPaymentOption {
return &upsertAcctPaymentOption{
debitAccount: false,
errIfAlreadyPending: false,
usePendingAmount: false,
errIfAlreadySucceeded: false,
errIfUnknown: false,
}
}

// WithDebitAccount is a functional option that can be passed to the
// UpsertAccountPayment method to indicate that the account balance should be
// debited by the full amount of the payment.
func WithDebitAccount() UpsertPaymentOption {
return func(o *upsertAcctPaymentOption) {
o.debitAccount = true
}
}

// WithErrIfAlreadyPending is a functional option that can be passed to the
// UpsertAccountPayment method to indicate that an error should be returned if
// the payment is already pending or succeeded.
func WithErrIfAlreadyPending() UpsertPaymentOption {
return func(o *upsertAcctPaymentOption) {
o.errIfAlreadyPending = true
}
}

// WithErrIfAlreadySucceeded is a functional option that can be passed to the
// UpsertAccountPayment method to indicate that the ErrAlreadySucceeded error
// should be returned if the payment is already in a succeeded state.
func WithErrIfAlreadySucceeded() UpsertPaymentOption {
return func(o *upsertAcctPaymentOption) {
o.errIfAlreadySucceeded = true
}
}

// WithPendingAmount is a functional option that can be passed to the
// UpsertAccountPayment method to indicate that if the payment already exists,
// then the known payment amount should be used instead of the new value passed
// to the method.
func WithPendingAmount() UpsertPaymentOption {
return func(o *upsertAcctPaymentOption) {
o.usePendingAmount = true
}
}

// WithErrIfUnknown is a functional option that can be passed to the
// UpsertAccountPayment method to indicate that the ErrPaymentNotAssociated
// error should be returned if the payment is not associated with the account.
func WithErrIfUnknown() UpsertPaymentOption {
return func(o *upsertAcctPaymentOption) {
o.errIfUnknown = true
}
}
165 changes: 63 additions & 102 deletions accounts/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,26 +467,7 @@ func (s *InterceptorService) PaymentErrored(ctx context.Context, id AccountID,
"has already started")
}

account, err := s.store.Account(ctx, id)
if err != nil {
return err
}

// Check that this payment is actually associated with this account.
_, ok = account.Payments[hash]
if !ok {
return fmt.Errorf("payment with hash %s is not associated "+
"with this account", hash)
}

// Delete the payment and update the persisted account.
delete(account.Payments, hash)

if err := s.store.UpdateAccount(ctx, account); err != nil {
return fmt.Errorf("error updating account: %w", err)
}

return nil
return s.store.DeleteAccountPayment(ctx, id, hash)
}

// AssociatePayment associates a payment (hash) with the given account,
Expand All @@ -498,44 +479,24 @@ func (s *InterceptorService) AssociatePayment(ctx context.Context, id AccountID,
s.Lock()
defer s.Unlock()

account, err := s.store.Account(ctx, id)
if err != nil {
return err
}

// Check if this payment is associated with the account already.
_, ok := account.Payments[paymentHash]
if ok {
// We do not allow another payment to the same hash if the
// payment is already in-flight or succeeded. This mitigates a
// user being able to launch a second RPC-erring payment with
// the same hash that would remove the payment from being
// tracked. Note that this prevents launching multipart
// payments, but allows retrying a payment if it has failed.
if account.Payments[paymentHash].Status !=
lnrpc.Payment_FAILED {

return fmt.Errorf("payment with hash %s is already in "+
"flight or succeeded (status %v)", paymentHash,
account.Payments[paymentHash].Status)
}

// Otherwise, we fall through to correctly update the payment
// amount, in case we have a zero-amount invoice that is
// retried.
}

// Associate the payment with the account and store it.
account.Payments[paymentHash] = &PaymentEntry{
Status: lnrpc.Payment_UNKNOWN,
FullAmount: fullAmt,
}

if err := s.store.UpdateAccount(ctx, account); err != nil {
return fmt.Errorf("error updating account: %w", err)
}
// We add the WithErrIfAlreadyPending option to ensure that if the
// payment is already associated with the account, then we return
// an error if the payment is already in-flight or succeeded. This
// prevents a user from being able to launch a second RPC-erring payment
// with the same hash that would remove the payment from being tracked.
//
// NOTE: this prevents launching multipart payments, but allows
// retrying a payment if it has failed.
//
// If the payment is already associated with the account but not in
// flight, we update the payment amount in case we have a zero-amount
// invoice that is retried.
_, err := s.store.UpsertAccountPayment(
ctx, id, paymentHash, fullAmt, lnrpc.Payment_UNKNOWN,
WithErrIfAlreadyPending(),
)

return nil
return err
}

// invoiceUpdate credits the account an invoice was registered with, in case the
Expand Down Expand Up @@ -629,34 +590,30 @@ func (s *InterceptorService) TrackPayment(ctx context.Context, id AccountID,
return nil
}

// Similarly, if we've already processed the payment in the past, there
// is a reference in the account with the given state.
account, err := s.store.Account(ctx, id)
if err != nil {
return fmt.Errorf("error fetching account: %w", err)
}

// If the account already stored a terminal state, we also don't need to
// track the payment again.
entry, ok := account.Payments[hash]
if ok && successState(entry.Status) {
return nil
// track the payment again. So we add the WithErrIfAlreadySucceeded
// option to ensure that we return an error if the payment has already
// succeeded. We can then match on the ErrAlreadySucceeded error and
// exit early if it is returned.
opts := []UpsertPaymentOption{
WithErrIfAlreadySucceeded(),
}

// There is a case where the passed in fullAmt is zero but the pending
// amount is not. In that case, we should not overwrite the pending
// amount.
if fullAmt == 0 {
fullAmt = entry.FullAmount
}

account.Payments[hash] = &PaymentEntry{
Status: lnrpc.Payment_UNKNOWN,
FullAmount: fullAmt,
opts = append(opts, WithPendingAmount())
}

if err := s.store.UpdateAccount(ctx, account); err != nil {
if !ok {
known, err := s.store.UpsertAccountPayment(
ctx, id, hash, fullAmt, lnrpc.Payment_UNKNOWN, opts...,
)
if err != nil {
if errors.Is(err, ErrAlreadySucceeded) {
return nil
}
if !known {
// In the rare case that the payment isn't associated
// with an account yet, and we fail to update the
// account we will not be tracking the payment, even if
Expand Down Expand Up @@ -832,23 +789,14 @@ func (s *InterceptorService) paymentUpdate(ctx context.Context,

// The payment went through! We now need to debit the full amount from
// the account.
account, err := s.store.Account(ctx, pendingPayment.accountID)
if err != nil {
err = s.disableAndErrorfUnsafe("error fetching account: %w",
err)

return terminalState, err
}

fullAmount := status.Value + status.Fee

// Update the account and store it in the database.
account.CurrentBalance -= int64(fullAmount)
account.Payments[hash] = &PaymentEntry{
Status: lnrpc.Payment_SUCCEEDED,
FullAmount: fullAmount,
}
if err := s.store.UpdateAccount(ctx, account); err != nil {
// Update the persisted account.
_, err := s.store.UpsertAccountPayment(
ctx, pendingPayment.accountID, hash, fullAmount,
lnrpc.Payment_SUCCEEDED, WithDebitAccount(),
)
if err != nil {
err = s.disableAndErrorfUnsafe("error updating account: %w",
err)

Expand Down Expand Up @@ -892,23 +840,36 @@ func (s *InterceptorService) removePayment(ctx context.Context,
return nil
}

account, err := s.store.Account(ctx, pendingPayment.accountID)
if err != nil {
return err
_, err := s.store.UpsertAccountPayment(
ctx, pendingPayment.accountID, hash, 0, status,
// We don't want the payment to be inserted if it isn't already
// known. So we pass in this option to ensure that the call
// exits early if the payment is unknown.
WithErrIfUnknown(),
// Otherwise, we just want to update the status of the payment
// and use the existing pending amount.
WithPendingAmount(),
)
if err != nil && !errors.Is(err, ErrPaymentNotAssociated) {
return fmt.Errorf("error updating account: %w", err)
}

pendingPayment.cancel()
delete(s.pendingPayments, hash)

// Have we associated the payment with the account already?
_, ok = account.Payments[hash]
if !ok {
return nil
}
return nil
}

// If we did, let's set the status correctly in the DB now.
account.Payments[hash].Status = status
return s.store.UpdateAccount(ctx, account)
// hasPayment returns true if the payment is currently being tracked by the
// service.
//
// NOTE: this is currently used only for tests.
func (s *InterceptorService) hasPayment(hash lntypes.Hash) bool {
s.RLock()
defer s.RUnlock()

_, ok := s.pendingPayments[hash]
return ok
}

// successState returns true if a payment was completed successfully.
Expand Down
19 changes: 11 additions & 8 deletions accounts/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,14 @@ func TestAccountService(t *testing.T) {
// Assert that the invoice subscription succeeded.
require.Contains(t, s.invoiceToAccount, testHash)

// But setting up the payment tracking should have failed.
// But setting up the payment tracking should have
// failed.
require.False(t, s.IsRunning())

// Finally let's assert that we didn't successfully add the
// payment to pending payment, and that lnd isn't awaiting
// the payment request.
require.NotContains(t, s.pendingPayments, testHash)
// Finally let's assert that we didn't successfully add
// the payment to pending payment, and that lnd isn't
// awaiting the payment request.
require.False(t, s.hasPayment(testHash))
r.assertNoPaymentRequest(t)
},
}, {
Expand Down Expand Up @@ -426,7 +427,9 @@ func TestAccountService(t *testing.T) {
// This will cause an error send an update over
// the payment channel, which should disable the
// service.
s.pendingPayments = make(map[lntypes.Hash]*trackedPayment)
s.pendingPayments = make(
map[lntypes.Hash]*trackedPayment,
)

// Send an invalid payment over the payment chan
// which should error and disable the service
Expand Down Expand Up @@ -568,7 +571,7 @@ func TestAccountService(t *testing.T) {
return p.Status == lnrpc.Payment_FAILED
})

require.NotContains(t, s.pendingPayments, testHash2)
require.False(t, s.hasPayment(testHash2))

// Finally, if an unknown payment turns out to be
// a non-initiated payment, we should stop the tracking
Expand Down Expand Up @@ -616,7 +619,7 @@ func TestAccountService(t *testing.T) {

// Ensure that the payment was removed from the pending
// payments.
require.NotContains(t, s.pendingPayments, testHash3)
require.False(t, s.hasPayment(testHash3))
},
}, {
name: "keep track of invoice indexes",
Expand Down
Loading

0 comments on commit 76c63db

Please sign in to comment.