Skip to content

Commit

Permalink
feat(rfq-relayer): relayer supports active quoting (#3198)
Browse files Browse the repository at this point in the history
* Feat: add active rfq subscription on quoter

* Feat: relayer subscribes to active quotes upon starting

* [goreleaser]

* Feat: specify ws url in relayer

* [goreleaser]

* [goreleaser]

* Fix: build

* [goreleaser]

* Feat: relayer tracing

* [goreleaser]

* Feat: use supports_active_quoting instead of ws url

* [goreleaser]

* WIP: add logs

* [goreleaser]

* WIP: more logs

* [goreleaser]

* More logs

* [goreleaser]

* More logs

* [goreleaser]

* More logs

* [goreleaser]

* Close conn when encountering write err

* [goreleaser]

* More logs

* [goreleaser]

* More logs

* [goreleaser]

* More logs

* [goreleaser]

* More logs

* [goreleaser]

* Logs with ts

* [goreleaser]

* More tracing

* [goreleaser]

* Fix: send to reqChan

* [goreleaser]

* Check for zero pong time

* Fix: make close_at and closed_quote_id optional

* [goreleaser]

* Feat: remove extra fields from responses

* [goreleaser]

* Fix: skip passive quote

* [goreleaser]

* Cleanup: remove logs

* Fix: use correct span

* Cleanup: remove logs
  • Loading branch information
dwasse authored Sep 30, 2024
1 parent 925617e commit 7ff7c81
Show file tree
Hide file tree
Showing 13 changed files with 180 additions and 44 deletions.
4 changes: 2 additions & 2 deletions services/rfq/api/db/api_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ type ActiveQuoteRequest struct {
ExpirationWindow time.Duration `gorm:"column:expiration_window"`
CreatedAt time.Time `gorm:"column:created_at"`
Status ActiveQuoteRequestStatus `gorm:"column:status"`
ClosedAt time.Time `gorm:"column:fulfilled_at"`
ClosedQuoteID string `gorm:"column:fulfilled_quote_id"`
ClosedAt *time.Time `gorm:"column:closed_at"`
ClosedQuoteID *string `gorm:"column:closed_quote_id"`
}

// FromUserRequest converts a model.PutRFQRequest to an ActiveQuoteRequest.
Expand Down
4 changes: 2 additions & 2 deletions services/rfq/api/db/sql/base/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func (s *Store) UpdateActiveQuoteRequestStatus(ctx context.Context, requestID st
if quoteID == nil {
return fmt.Errorf("quote id is required for fulfilled status")
}

Check warning on line 104 in services/rfq/api/db/sql/base/store.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/db/sql/base/store.go#L97-L104

Added lines #L97 - L104 were not covered by tests
updates["fulfilled_quote_id"] = quoteID
updates["fulfilled_at"] = time.Now().UTC()
updates["closed_quote_id"] = quoteID
updates["closed_at"] = time.Now().UTC()
}
result := s.db.WithContext(ctx).
Model(&db.ActiveQuoteRequest{}).
Expand Down
12 changes: 6 additions & 6 deletions services/rfq/api/model/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ type ActiveRFQMessage struct {

// PutUserQuoteResponse represents a response to a user quote request.
type PutUserQuoteResponse struct {
Success bool `json:"success"`
Reason string `json:"reason"`
UserAddress string `json:"user_address"`
QuoteType string `json:"quote_type"`
Data QuoteData `json:"data"`
Success bool `json:"success"`
Reason string `json:"reason,omitempty"`
QuoteType string `json:"quote_type,omitempty"`
DestAmount string `json:"dest_amount,omitempty"`
RelayerAddress string `json:"relayer_address,omitempty"`
}

// WsRFQResponse represents a response to a quote request.
type WsRFQResponse struct {
RequestID string `json:"request_id"`
QuoteID string `json:"quote_id"`
QuoteID string `json:"quote_id,omitempty"`
DestAmount string `json:"dest_amount"`
UpdatedAt time.Time `json:"updated_at"`
}
Expand Down
14 changes: 12 additions & 2 deletions services/rfq/api/rest/rfq.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ func (r *QuoterAPIServer) handleActiveRFQ(ctx context.Context, request *model.Pu
responses := r.collectRelayerResponses(ctx, request, requestID)
var quoteID string
var isUpdated bool
for _, resp := range responses {
for relayerAddr, resp := range responses {
quote, isUpdated = getBestQuote(quote, getRelayerQuoteData(request, resp))
if isUpdated {
quoteID = resp.QuoteID
}
quote.RelayerAddress = &relayerAddr
}
err = r.recordActiveQuote(ctx, quote, requestID, quoteID)
if err != nil {
Expand Down Expand Up @@ -90,13 +91,14 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *
wg.Add(1)
go func(client WsClient) {
var respStatus db.ActiveQuoteResponseStatus
var err error
_, clientSpan := r.handler.Tracer().Start(collectionCtx, "collectRelayerResponses", trace.WithAttributes(
attribute.String("relayer_address", relayerAddr),
attribute.String("request_id", requestID),
))
defer func() {
clientSpan.SetAttributes(attribute.String("status", respStatus.String()))
metrics.EndSpan(clientSpan)
metrics.EndSpanWithErr(clientSpan, err)
}()

defer wg.Done()
Expand All @@ -105,6 +107,11 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *
logger.Errorf("Error receiving quote response: %v", err)
return
}

Check warning on line 109 in services/rfq/api/rest/rfq.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/rest/rfq.go#L107-L109

Added lines #L107 - L109 were not covered by tests
clientSpan.AddEvent("received quote response", trace.WithAttributes(
attribute.String("relayer_address", relayerAddr),
attribute.String("request_id", requestID),
attribute.String("dest_amount", resp.DestAmount),
))

// validate the response
respStatus = getQuoteResponseStatus(expireCtx, resp)
Expand Down Expand Up @@ -247,6 +254,9 @@ func (r *QuoterAPIServer) handlePassiveRFQ(ctx context.Context, request *model.P
)

rawDestAmountInt, _ := rawDestAmount.Int(nil)
if rawDestAmountInt.Cmp(quote.FixedFee.BigInt()) < 0 {
continue
}
destAmount := new(big.Int).Sub(rawDestAmountInt, quote.FixedFee.BigInt()).String()
//nolint:gosec
quoteData := &model.QuoteData{
Expand Down
18 changes: 6 additions & 12 deletions services/rfq/api/rest/rfq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (c *ServerSuite) TestActiveRFQSingleRelayer() {
}

// Prepare the relayer quote response
originAmount := userRequestAmount.String()
destAmount := new(big.Int).Sub(userRequestAmount, big.NewInt(1000)).String()
quoteResp := &model.WsRFQResponse{
DestAmount: destAmount,
Expand All @@ -119,8 +118,7 @@ func (c *ServerSuite) TestActiveRFQSingleRelayer() {
// Assert the response
c.Assert().True(userQuoteResp.Success)
c.Assert().Equal("active", userQuoteResp.QuoteType)
c.Assert().Equal(destAmount, *userQuoteResp.Data.DestAmount)
c.Assert().Equal(originAmount, userQuoteResp.Data.OriginAmount)
c.Assert().Equal(destAmount, userQuoteResp.DestAmount)

// Verify ActiveQuoteRequest insertion
activeQuoteRequests, err := c.database.GetActiveQuoteRequests(c.GetTestContext())
Expand Down Expand Up @@ -206,7 +204,6 @@ func (c *ServerSuite) TestActiveRFQMultipleRelayers() {
}

// Prepare the relayer quote responses
originAmount := userRequestAmount.String()
destAmount := "999000"
quoteResp := model.WsRFQResponse{
DestAmount: destAmount,
Expand Down Expand Up @@ -234,8 +231,7 @@ func (c *ServerSuite) TestActiveRFQMultipleRelayers() {
// Assert the response
c.Assert().True(userQuoteResp.Success)
c.Assert().Equal("active", userQuoteResp.QuoteType)
c.Assert().Equal(destAmount, *userQuoteResp.Data.DestAmount)
c.Assert().Equal(originAmount, userQuoteResp.Data.OriginAmount)
c.Assert().Equal(destAmount, userQuoteResp.DestAmount)

// Verify ActiveQuoteRequest insertion
activeQuoteRequests, err := c.database.GetActiveQuoteRequests(c.GetTestContext())
Expand Down Expand Up @@ -309,9 +305,8 @@ func (c *ServerSuite) TestActiveRFQFallbackToPassive() {
// Assert the response
c.Assert().True(userQuoteResp.Success)
c.Assert().Equal("passive", userQuoteResp.QuoteType)
c.Assert().Equal("998000", *userQuoteResp.Data.DestAmount) // destAmount is quote destAmount minus fixed fee
c.Assert().Equal(userRequestAmount.String(), userQuoteResp.Data.OriginAmount)
c.Assert().Equal(c.relayerWallets[0].Address().Hex(), *userQuoteResp.Data.RelayerAddress)
c.Assert().Equal("998000", userQuoteResp.DestAmount) // destAmount is quote destAmount minus fixed fee
c.Assert().Equal(c.relayerWallets[0].Address().Hex(), userQuoteResp.RelayerAddress)
}

func (c *ServerSuite) TestActiveRFQPassiveBestQuote() {
Expand Down Expand Up @@ -389,7 +384,6 @@ func (c *ServerSuite) TestActiveRFQPassiveBestQuote() {
// Assert the response
c.Assert().True(userQuoteResp.Success)
c.Assert().Equal("passive", userQuoteResp.QuoteType)
c.Assert().Equal("998900", *userQuoteResp.Data.DestAmount) // destAmount is quote destAmount minus fixed fee
c.Assert().Equal(userRequestAmount.String(), userQuoteResp.Data.OriginAmount)
c.Assert().Equal(c.relayerWallets[0].Address().Hex(), *userQuoteResp.Data.RelayerAddress)
c.Assert().Equal("998900", userQuoteResp.DestAmount) // destAmount is quote destAmount minus fixed fee
c.Assert().Equal(c.relayerWallets[0].Address().Hex(), userQuoteResp.RelayerAddress)
}
18 changes: 15 additions & 3 deletions services/rfq/api/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,16 +533,23 @@ func (r *QuoterAPIServer) PutRFQRequest(c *gin.Context) {
var activeQuote *model.QuoteData
if isActiveRFQ {
activeQuote = r.handleActiveRFQ(ctx, &req, requestID)
if activeQuote != nil && activeQuote.DestAmount != nil {
span.SetAttributes(attribute.String("active_quote_dest_amount", *activeQuote.DestAmount))
}
}
passiveQuote, err := r.handlePassiveRFQ(ctx, &req)
if err != nil {
logger.Error("Error handling passive RFQ", "error", err)
}

Check warning on line 543 in services/rfq/api/rest/server.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/rest/server.go#L542-L543

Added lines #L542 - L543 were not covered by tests
if passiveQuote != nil && passiveQuote.DestAmount != nil {
span.SetAttributes(attribute.String("passive_quote_dest_amount", *passiveQuote.DestAmount))
}
quote, _ := getBestQuote(activeQuote, passiveQuote)

// construct the response
var resp model.PutUserQuoteResponse
if quote == nil {
span.AddEvent("no quotes found")
resp = model.PutUserQuoteResponse{
Success: false,
Reason: "no quotes found",
Expand All @@ -552,10 +559,15 @@ func (r *QuoterAPIServer) PutRFQRequest(c *gin.Context) {
if activeQuote == nil {
quoteType = quoteTypePassive
}
span.SetAttributes(
attribute.String("quote_type", quoteType),
attribute.String("quote_dest_amount", *quote.DestAmount),
)
resp = model.PutUserQuoteResponse{
Success: true,
Data: *quote,
QuoteType: quoteType,
Success: true,
QuoteType: quoteType,
DestAmount: *quote.DestAmount,
RelayerAddress: *quote.RelayerAddress,
}
}
c.JSON(http.StatusOK, resp)
Expand Down
3 changes: 0 additions & 3 deletions services/rfq/api/rest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type ServerSuite struct {
handler metrics.Handler
QuoterAPIServer *rest.QuoterAPIServer
port uint16
wsPort uint16
originChainID int
destChainID int
}
Expand Down Expand Up @@ -75,8 +74,6 @@ func (c *ServerSuite) SetupTest() {
c.True(ok)
port, err := freeport.GetFreePort()
c.port = uint16(port)
wsPort, err := freeport.GetFreePort()
c.wsPort = uint16(wsPort)
c.Require().NoError(err)

testConfig := config.Config{
Expand Down
17 changes: 10 additions & 7 deletions services/rfq/api/rest/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ const (

// Run runs the WebSocket client.
func (c *wsClient) Run(ctx context.Context) (err error) {
c.lastPong = time.Now()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
messageChan := make(chan []byte)
pingTicker := time.NewTicker(PingPeriod)
defer pingTicker.Stop()
Expand All @@ -122,11 +123,12 @@ func (c *wsClient) Run(ctx context.Context) (err error) {
err = c.handleRelayerMessage(ctx, msg)
if err != nil {
logger.Error("Error handling relayer message: %s", err)
return fmt.Errorf("error handling relayer message: %w", err)
}
case <-pingTicker.C:
err = c.trySendPing(c.lastPong)
if err != nil {

Check warning on line 130 in services/rfq/api/rest/ws.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L128-L130

Added lines #L128 - L130 were not covered by tests
logger.Error("Error sending ping message: %s", err)
logger.Error("Error sending ping message: %w", err)
}

Check warning on line 132 in services/rfq/api/rest/ws.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L132

Added line #L132 was not covered by tests
}
}
Expand Down Expand Up @@ -169,6 +171,8 @@ func (c *wsClient) sendRelayerRequest(ctx context.Context, req *model.WsRFQReque
return nil
}

// handleRelayerMessage handles messages from the relayer.
// An error returned will result in the websocket connection being closed.
func (c *wsClient) handleRelayerMessage(ctx context.Context, msg []byte) (err error) {
var rfqMsg model.ActiveRFQMessage
err = json.Unmarshal(msg, &rfqMsg)
Expand All @@ -191,13 +195,12 @@ func (c *wsClient) handleRelayerMessage(ctx context.Context, msg []byte) (err er
}

Check warning on line 195 in services/rfq/api/rest/ws.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L192-L195

Added lines #L192 - L195 were not covered by tests
case SendQuoteOp:
err = c.handleSendQuote(ctx, rfqMsg.Content)
if err != nil {
return fmt.Errorf("error handling send quote: %w", err)
}
logger.Errorf("error handling send quote: %v", err)
case PongOp:
c.lastPong = time.Now()
default:

Check warning on line 201 in services/rfq/api/rest/ws.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L199-L201

Added lines #L199 - L201 were not covered by tests
return fmt.Errorf("received unexpected operation from relayer: %s", rfqMsg.Op)
logger.Errorf("received unexpected operation from relayer: %s", rfqMsg.Op)
return nil
}

return nil
Expand Down Expand Up @@ -273,7 +276,7 @@ func (c *wsClient) handleSendQuote(ctx context.Context, content json.RawMessage)
}

func (c *wsClient) trySendPing(lastPong time.Time) (err error) {

Check warning on line 278 in services/rfq/api/rest/ws.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/rest/ws.go#L278

Added line #L278 was not covered by tests
if time.Since(lastPong) > PingPeriod {
if time.Since(lastPong) > PingPeriod && !lastPong.IsZero() {
err = c.conn.Close()
if err != nil {
return fmt.Errorf("error closing websocket connection: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion services/rfq/e2e/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (i *IntegrationSuite) getRelayerConfig() relconfig.Config {
},
// generated ex-post facto
QuotableTokens: map[string][]string{},
RfqAPIURL: i.apiServer,
RFQAPIURL: i.apiServer,
Signer: signerConfig.SignerConfig{
Type: signerConfig.FileType.String(),
File: filet.TmpFile(i.T(), "", i.relayerWallet.PrivateKeyHex()).Name(),
Expand Down
Loading

0 comments on commit 7ff7c81

Please sign in to comment.