Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rfq-relayer): relayer supports active quoting #3198

Merged
merged 50 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
d93e20f
Feat: add active rfq subscription on quoter
dwasse Sep 26, 2024
481f043
Feat: relayer subscribes to active quotes upon starting
dwasse Sep 26, 2024
fdbc865
[goreleaser]
dwasse Sep 26, 2024
77c51e8
Feat: specify ws url in relayer
dwasse Sep 26, 2024
d10d56d
[goreleaser]
dwasse Sep 26, 2024
d2b1701
Merge branch 'feat/active-rfq-api' into feat/active-rfq-relayer
dwasse Sep 27, 2024
f06a64b
[goreleaser]
dwasse Sep 27, 2024
f6300a1
Fix: build
dwasse Sep 27, 2024
2646149
[goreleaser]
dwasse Sep 27, 2024
ea61286
Merge branch 'feat/active-rfq-api' into feat/active-rfq-relayer
dwasse Sep 27, 2024
dcd264a
Feat: relayer tracing
dwasse Sep 27, 2024
02bf53c
[goreleaser]
dwasse Sep 27, 2024
a83253f
Feat: use supports_active_quoting instead of ws url
dwasse Sep 27, 2024
6febf7b
[goreleaser]
dwasse Sep 27, 2024
3ce1bd3
WIP: add logs
dwasse Sep 27, 2024
460f5ac
[goreleaser]
dwasse Sep 27, 2024
9ec49cb
WIP: more logs
dwasse Sep 27, 2024
92b49ec
[goreleaser]
dwasse Sep 30, 2024
e85ff62
More logs
dwasse Sep 30, 2024
f4ed5b5
[goreleaser]
dwasse Sep 30, 2024
0c0b562
More logs
dwasse Sep 30, 2024
1742fe3
[goreleaser]
dwasse Sep 30, 2024
0d7a7c4
More logs
dwasse Sep 30, 2024
4828dfc
[goreleaser]
dwasse Sep 30, 2024
ab9513d
Close conn when encountering write err
dwasse Sep 30, 2024
a2a2079
[goreleaser]
dwasse Sep 30, 2024
69ed171
More logs
dwasse Sep 30, 2024
fbccdf8
[goreleaser]
dwasse Sep 30, 2024
4b098f9
More logs
dwasse Sep 30, 2024
135f1ac
[goreleaser]
dwasse Sep 30, 2024
b4969e9
More logs
dwasse Sep 30, 2024
bf3adaa
[goreleaser]
dwasse Sep 30, 2024
82ed6bb
More logs
dwasse Sep 30, 2024
896d52d
[goreleaser]
dwasse Sep 30, 2024
0018269
Logs with ts
dwasse Sep 30, 2024
c3f0eb3
[goreleaser]
dwasse Sep 30, 2024
50b969e
More tracing
dwasse Sep 30, 2024
64389c4
[goreleaser]
dwasse Sep 30, 2024
0c07fe4
Fix: send to reqChan
dwasse Sep 30, 2024
719361a
[goreleaser]
dwasse Sep 30, 2024
26c9174
Check for zero pong time
dwasse Sep 30, 2024
ccd24b3
Fix: make close_at and closed_quote_id optional
dwasse Sep 30, 2024
9688fa7
[goreleaser]
dwasse Sep 30, 2024
b98ca8c
Feat: remove extra fields from responses
dwasse Sep 30, 2024
739472d
[goreleaser]
dwasse Sep 30, 2024
7d7f6df
Fix: skip passive quote
dwasse Sep 30, 2024
6a5590f
[goreleaser]
dwasse Sep 30, 2024
56afeff
Cleanup: remove logs
dwasse Sep 30, 2024
bdb7539
Fix: use correct span
dwasse Sep 30, 2024
ab15e5d
Cleanup: remove logs
dwasse Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions services/rfq/api/db/api_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ type ActiveQuoteRequest struct {
ExpirationWindow time.Duration `gorm:"column:expiration_window"`
CreatedAt time.Time `gorm:"column:created_at"`
Status ActiveQuoteRequestStatus `gorm:"column:status"`
ClosedAt time.Time `gorm:"column:fulfilled_at"`
ClosedQuoteID string `gorm:"column:fulfilled_quote_id"`
ClosedAt *time.Time `gorm:"column:closed_at"`
ClosedQuoteID *string `gorm:"column:closed_quote_id"`
}

// FromUserRequest converts a model.PutRFQRequest to an ActiveQuoteRequest.
Expand Down
4 changes: 2 additions & 2 deletions services/rfq/api/db/sql/base/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func (s *Store) UpdateActiveQuoteRequestStatus(ctx context.Context, requestID st
if quoteID == nil {
return fmt.Errorf("quote id is required for fulfilled status")
}
updates["fulfilled_quote_id"] = quoteID
updates["fulfilled_at"] = time.Now().UTC()
updates["closed_quote_id"] = quoteID
updates["closed_at"] = time.Now().UTC()
}
result := s.db.WithContext(ctx).
Model(&db.ActiveQuoteRequest{}).
Expand Down
12 changes: 6 additions & 6 deletions services/rfq/api/model/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ type ActiveRFQMessage struct {

// PutUserQuoteResponse represents a response to a user quote request.
type PutUserQuoteResponse struct {
Success bool `json:"success"`
Reason string `json:"reason"`
UserAddress string `json:"user_address"`
QuoteType string `json:"quote_type"`
Data QuoteData `json:"data"`
Success bool `json:"success"`
Reason string `json:"reason,omitempty"`
QuoteType string `json:"quote_type,omitempty"`
DestAmount string `json:"dest_amount,omitempty"`
RelayerAddress string `json:"relayer_address,omitempty"`
}

// WsRFQResponse represents a response to a quote request.
type WsRFQResponse struct {
RequestID string `json:"request_id"`
QuoteID string `json:"quote_id"`
QuoteID string `json:"quote_id,omitempty"`
DestAmount string `json:"dest_amount"`
UpdatedAt time.Time `json:"updated_at"`
}
Expand Down
14 changes: 12 additions & 2 deletions services/rfq/api/rest/rfq.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ func (r *QuoterAPIServer) handleActiveRFQ(ctx context.Context, request *model.Pu
responses := r.collectRelayerResponses(ctx, request, requestID)
var quoteID string
var isUpdated bool
for _, resp := range responses {
for relayerAddr, resp := range responses {
quote, isUpdated = getBestQuote(quote, getRelayerQuoteData(request, resp))
if isUpdated {
quoteID = resp.QuoteID
}
quote.RelayerAddress = &relayerAddr
}
err = r.recordActiveQuote(ctx, quote, requestID, quoteID)
if err != nil {
Expand Down Expand Up @@ -90,13 +91,14 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *
wg.Add(1)
go func(client WsClient) {
var respStatus db.ActiveQuoteResponseStatus
var err error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix variable shadowing of err to ensure correct error handling

The use of := in resp, err := client.ReceiveQuoteResponse(collectionCtx, requestID) shadows the err variable declared earlier at line 93. This means that the err used in the deferred function may remain nil, and any error returned by ReceiveQuoteResponse won't be properly logged or passed to metrics.EndSpanWithErr. To fix this, replace := with = to assign to the existing err variable.

Apply this diff to fix the issue:

-            resp, err := client.ReceiveQuoteResponse(collectionCtx, requestID)
+            resp, err = client.ReceiveQuoteResponse(collectionCtx, requestID)

Committable suggestion was skipped due to low confidence.

_, clientSpan := r.handler.Tracer().Start(collectionCtx, "collectRelayerResponses", trace.WithAttributes(
attribute.String("relayer_address", relayerAddr),
attribute.String("request_id", requestID),
))
defer func() {
clientSpan.SetAttributes(attribute.String("status", respStatus.String()))
metrics.EndSpan(clientSpan)
metrics.EndSpanWithErr(clientSpan, err)
}()

defer wg.Done()
Expand All @@ -105,6 +107,11 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *
logger.Errorf("Error receiving quote response: %v", err)
return
}
clientSpan.AddEvent("received quote response", trace.WithAttributes(
attribute.String("relayer_address", relayerAddr),
attribute.String("request_id", requestID),
attribute.String("dest_amount", resp.DestAmount),
))

// validate the response
respStatus = getQuoteResponseStatus(expireCtx, resp)
Expand Down Expand Up @@ -247,6 +254,9 @@ func (r *QuoterAPIServer) handlePassiveRFQ(ctx context.Context, request *model.P
)

rawDestAmountInt, _ := rawDestAmount.Int(nil)
if rawDestAmountInt.Cmp(quote.FixedFee.BigInt()) < 0 {
continue
}
destAmount := new(big.Int).Sub(rawDestAmountInt, quote.FixedFee.BigInt()).String()
//nolint:gosec
quoteData := &model.QuoteData{
Expand Down
18 changes: 6 additions & 12 deletions services/rfq/api/rest/rfq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (c *ServerSuite) TestActiveRFQSingleRelayer() {
}

// Prepare the relayer quote response
originAmount := userRequestAmount.String()
destAmount := new(big.Int).Sub(userRequestAmount, big.NewInt(1000)).String()
quoteResp := &model.WsRFQResponse{
DestAmount: destAmount,
Expand All @@ -119,8 +118,7 @@ func (c *ServerSuite) TestActiveRFQSingleRelayer() {
// Assert the response
c.Assert().True(userQuoteResp.Success)
c.Assert().Equal("active", userQuoteResp.QuoteType)
c.Assert().Equal(destAmount, *userQuoteResp.Data.DestAmount)
c.Assert().Equal(originAmount, userQuoteResp.Data.OriginAmount)
c.Assert().Equal(destAmount, userQuoteResp.DestAmount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring repeated assertions into a helper function

The assertion c.Assert().Equal(destAmount, userQuoteResp.DestAmount) is used in multiple test cases. To improve code maintainability and reduce duplication, consider extracting this and similar assertions into a helper function.


// Verify ActiveQuoteRequest insertion
activeQuoteRequests, err := c.database.GetActiveQuoteRequests(c.GetTestContext())
Expand Down Expand Up @@ -206,7 +204,6 @@ func (c *ServerSuite) TestActiveRFQMultipleRelayers() {
}

// Prepare the relayer quote responses
originAmount := userRequestAmount.String()
destAmount := "999000"
quoteResp := model.WsRFQResponse{
DestAmount: destAmount,
Expand Down Expand Up @@ -234,8 +231,7 @@ func (c *ServerSuite) TestActiveRFQMultipleRelayers() {
// Assert the response
c.Assert().True(userQuoteResp.Success)
c.Assert().Equal("active", userQuoteResp.QuoteType)
c.Assert().Equal(destAmount, *userQuoteResp.Data.DestAmount)
c.Assert().Equal(originAmount, userQuoteResp.Data.OriginAmount)
c.Assert().Equal(destAmount, userQuoteResp.DestAmount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Reduce code duplication by using a helper function

This assertion duplicates logic from other tests:

c.Assert().Equal(destAmount, userQuoteResp.DestAmount)

Refactoring repeated code into a helper function can enhance readability and maintainability.


// Verify ActiveQuoteRequest insertion
activeQuoteRequests, err := c.database.GetActiveQuoteRequests(c.GetTestContext())
Expand Down Expand Up @@ -309,9 +305,8 @@ func (c *ServerSuite) TestActiveRFQFallbackToPassive() {
// Assert the response
c.Assert().True(userQuoteResp.Success)
c.Assert().Equal("passive", userQuoteResp.QuoteType)
c.Assert().Equal("998000", *userQuoteResp.Data.DestAmount) // destAmount is quote destAmount minus fixed fee
c.Assert().Equal(userRequestAmount.String(), userQuoteResp.Data.OriginAmount)
c.Assert().Equal(c.relayerWallets[0].Address().Hex(), *userQuoteResp.Data.RelayerAddress)
c.Assert().Equal("998000", userQuoteResp.DestAmount) // destAmount is quote destAmount minus fixed fee
c.Assert().Equal(c.relayerWallets[0].Address().Hex(), userQuoteResp.RelayerAddress)
Comment on lines +308 to +309
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Compute expected DestAmount dynamically instead of hardcoding

In the assertions:

c.Assert().Equal("998000", userQuoteResp.DestAmount) // destAmount is quote destAmount minus fixed fee
c.Assert().Equal(c.relayerWallets[0].Address().Hex(), userQuoteResp.RelayerAddress)

The expected DestAmount is hardcoded as "998000". To improve test flexibility and reduce potential errors if input values change, consider calculating the expected DestAmount using the test variables.

Apply this change:

+ expectedDestAmount := new(big.Int).Sub(passiveQuotes[0].DestAmount.BigInt(), passiveQuotes[0].FixedFee.BigInt())
  // Assert the response
  c.Assert().True(userQuoteResp.Success)
  c.Assert().Equal("passive", userQuoteResp.QuoteType)
- c.Assert().Equal("998000", userQuoteResp.DestAmount) // destAmount is quote destAmount minus fixed fee
+ c.Assert().Equal(expectedDestAmount.String(), userQuoteResp.DestAmount)
  c.Assert().Equal(c.relayerWallets[0].Address().Hex(), userQuoteResp.RelayerAddress)

}

func (c *ServerSuite) TestActiveRFQPassiveBestQuote() {
Expand Down Expand Up @@ -389,7 +384,6 @@ func (c *ServerSuite) TestActiveRFQPassiveBestQuote() {
// Assert the response
c.Assert().True(userQuoteResp.Success)
c.Assert().Equal("passive", userQuoteResp.QuoteType)
c.Assert().Equal("998900", *userQuoteResp.Data.DestAmount) // destAmount is quote destAmount minus fixed fee
c.Assert().Equal(userRequestAmount.String(), userQuoteResp.Data.OriginAmount)
c.Assert().Equal(c.relayerWallets[0].Address().Hex(), *userQuoteResp.Data.RelayerAddress)
c.Assert().Equal("998900", userQuoteResp.DestAmount) // destAmount is quote destAmount minus fixed fee
c.Assert().Equal(c.relayerWallets[0].Address().Hex(), userQuoteResp.RelayerAddress)
Comment on lines +387 to +388
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Calculate expected DestAmount instead of using hardcoded value

The assertion uses a hardcoded value:

c.Assert().Equal("998900", userQuoteResp.DestAmount) // destAmount is quote destAmount minus fixed fee

To enhance maintainability, compute the expected DestAmount dynamically based on test inputs.

Apply this change:

+ expectedDestAmount := new(big.Int).Sub(passiveQuotes[0].DestAmount.BigInt(), passiveQuotes[0].FixedFee.BigInt())
  // Assert the response
  c.Assert().True(userQuoteResp.Success)
  c.Assert().Equal("passive", userQuoteResp.QuoteType)
- c.Assert().Equal("998900", userQuoteResp.DestAmount) // destAmount is quote destAmount minus fixed fee
+ c.Assert().Equal(expectedDestAmount.String(), userQuoteResp.DestAmount)
  c.Assert().Equal(c.relayerWallets[0].Address().Hex(), userQuoteResp.RelayerAddress)

}
18 changes: 15 additions & 3 deletions services/rfq/api/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,16 +533,23 @@ func (r *QuoterAPIServer) PutRFQRequest(c *gin.Context) {
var activeQuote *model.QuoteData
if isActiveRFQ {
activeQuote = r.handleActiveRFQ(ctx, &req, requestID)
if activeQuote != nil && activeQuote.DestAmount != nil {
span.SetAttributes(attribute.String("active_quote_dest_amount", *activeQuote.DestAmount))
}
}
passiveQuote, err := r.handlePassiveRFQ(ctx, &req)
if err != nil {
logger.Error("Error handling passive RFQ", "error", err)
}
if passiveQuote != nil && passiveQuote.DestAmount != nil {
span.SetAttributes(attribute.String("passive_quote_dest_amount", *passiveQuote.DestAmount))
}
quote, _ := getBestQuote(activeQuote, passiveQuote)

// construct the response
var resp model.PutUserQuoteResponse
if quote == nil {
span.AddEvent("no quotes found")
resp = model.PutUserQuoteResponse{
Success: false,
Reason: "no quotes found",
Expand All @@ -552,10 +559,15 @@ func (r *QuoterAPIServer) PutRFQRequest(c *gin.Context) {
if activeQuote == nil {
quoteType = quoteTypePassive
}
span.SetAttributes(
attribute.String("quote_type", quoteType),
attribute.String("quote_dest_amount", *quote.DestAmount),
)
resp = model.PutUserQuoteResponse{
Success: true,
Data: *quote,
QuoteType: quoteType,
Success: true,
QuoteType: quoteType,
DestAmount: *quote.DestAmount,
RelayerAddress: *quote.RelayerAddress,
}
}
c.JSON(http.StatusOK, resp)
Expand Down
3 changes: 0 additions & 3 deletions services/rfq/api/rest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type ServerSuite struct {
handler metrics.Handler
QuoterAPIServer *rest.QuoterAPIServer
port uint16
wsPort uint16
originChainID int
destChainID int
}
Expand Down Expand Up @@ -75,8 +74,6 @@ func (c *ServerSuite) SetupTest() {
c.True(ok)
port, err := freeport.GetFreePort()
c.port = uint16(port)
wsPort, err := freeport.GetFreePort()
c.wsPort = uint16(wsPort)
c.Require().NoError(err)

testConfig := config.Config{
Expand Down
17 changes: 10 additions & 7 deletions services/rfq/api/rest/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ const (

// Run runs the WebSocket client.
func (c *wsClient) Run(ctx context.Context) (err error) {
c.lastPong = time.Now()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
messageChan := make(chan []byte)
pingTicker := time.NewTicker(PingPeriod)
defer pingTicker.Stop()
Expand All @@ -122,11 +123,12 @@ func (c *wsClient) Run(ctx context.Context) (err error) {
err = c.handleRelayerMessage(ctx, msg)
if err != nil {
logger.Error("Error handling relayer message: %s", err)
return fmt.Errorf("error handling relayer message: %w", err)
}
case <-pingTicker.C:
err = c.trySendPing(c.lastPong)
if err != nil {
logger.Error("Error sending ping message: %s", err)
logger.Error("Error sending ping message: %w", err)
}
}
}
Expand Down Expand Up @@ -169,6 +171,8 @@ func (c *wsClient) sendRelayerRequest(ctx context.Context, req *model.WsRFQReque
return nil
}

// handleRelayerMessage handles messages from the relayer.
// An error returned will result in the websocket connection being closed.
func (c *wsClient) handleRelayerMessage(ctx context.Context, msg []byte) (err error) {
var rfqMsg model.ActiveRFQMessage
err = json.Unmarshal(msg, &rfqMsg)
Expand All @@ -191,13 +195,12 @@ func (c *wsClient) handleRelayerMessage(ctx context.Context, msg []byte) (err er
}
case SendQuoteOp:
err = c.handleSendQuote(ctx, rfqMsg.Content)
if err != nil {
return fmt.Errorf("error handling send quote: %w", err)
}
logger.Errorf("error handling send quote: %v", err)
case PongOp:
c.lastPong = time.Now()
default:
return fmt.Errorf("received unexpected operation from relayer: %s", rfqMsg.Op)
logger.Errorf("received unexpected operation from relayer: %s", rfqMsg.Op)
return nil
}

return nil
Expand Down Expand Up @@ -273,7 +276,7 @@ func (c *wsClient) handleSendQuote(ctx context.Context, content json.RawMessage)
}

func (c *wsClient) trySendPing(lastPong time.Time) (err error) {
if time.Since(lastPong) > PingPeriod {
if time.Since(lastPong) > PingPeriod && !lastPong.IsZero() {
err = c.conn.Close()
if err != nil {
return fmt.Errorf("error closing websocket connection: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion services/rfq/e2e/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (i *IntegrationSuite) getRelayerConfig() relconfig.Config {
},
// generated ex-post facto
QuotableTokens: map[string][]string{},
RfqAPIURL: i.apiServer,
RFQAPIURL: i.apiServer,
Signer: signerConfig.SignerConfig{
Type: signerConfig.FileType.String(),
File: filet.TmpFile(i.T(), "", i.relayerWallet.PrivateKeyHex()).Name(),
Expand Down
Loading
Loading