Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rfq-relayer): relayer supports active quoting #3198

Merged
merged 50 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
d93e20f
Feat: add active rfq subscription on quoter
dwasse Sep 26, 2024
481f043
Feat: relayer subscribes to active quotes upon starting
dwasse Sep 26, 2024
fdbc865
[goreleaser]
dwasse Sep 26, 2024
77c51e8
Feat: specify ws url in relayer
dwasse Sep 26, 2024
d10d56d
[goreleaser]
dwasse Sep 26, 2024
d2b1701
Merge branch 'feat/active-rfq-api' into feat/active-rfq-relayer
dwasse Sep 27, 2024
f06a64b
[goreleaser]
dwasse Sep 27, 2024
f6300a1
Fix: build
dwasse Sep 27, 2024
2646149
[goreleaser]
dwasse Sep 27, 2024
ea61286
Merge branch 'feat/active-rfq-api' into feat/active-rfq-relayer
dwasse Sep 27, 2024
dcd264a
Feat: relayer tracing
dwasse Sep 27, 2024
02bf53c
[goreleaser]
dwasse Sep 27, 2024
a83253f
Feat: use supports_active_quoting instead of ws url
dwasse Sep 27, 2024
6febf7b
[goreleaser]
dwasse Sep 27, 2024
3ce1bd3
WIP: add logs
dwasse Sep 27, 2024
460f5ac
[goreleaser]
dwasse Sep 27, 2024
9ec49cb
WIP: more logs
dwasse Sep 27, 2024
92b49ec
[goreleaser]
dwasse Sep 30, 2024
e85ff62
More logs
dwasse Sep 30, 2024
f4ed5b5
[goreleaser]
dwasse Sep 30, 2024
0c0b562
More logs
dwasse Sep 30, 2024
1742fe3
[goreleaser]
dwasse Sep 30, 2024
0d7a7c4
More logs
dwasse Sep 30, 2024
4828dfc
[goreleaser]
dwasse Sep 30, 2024
ab9513d
Close conn when encountering write err
dwasse Sep 30, 2024
a2a2079
[goreleaser]
dwasse Sep 30, 2024
69ed171
More logs
dwasse Sep 30, 2024
fbccdf8
[goreleaser]
dwasse Sep 30, 2024
4b098f9
More logs
dwasse Sep 30, 2024
135f1ac
[goreleaser]
dwasse Sep 30, 2024
b4969e9
More logs
dwasse Sep 30, 2024
bf3adaa
[goreleaser]
dwasse Sep 30, 2024
82ed6bb
More logs
dwasse Sep 30, 2024
896d52d
[goreleaser]
dwasse Sep 30, 2024
0018269
Logs with ts
dwasse Sep 30, 2024
c3f0eb3
[goreleaser]
dwasse Sep 30, 2024
50b969e
More tracing
dwasse Sep 30, 2024
64389c4
[goreleaser]
dwasse Sep 30, 2024
0c07fe4
Fix: send to reqChan
dwasse Sep 30, 2024
719361a
[goreleaser]
dwasse Sep 30, 2024
26c9174
Check for zero pong time
dwasse Sep 30, 2024
ccd24b3
Fix: make close_at and closed_quote_id optional
dwasse Sep 30, 2024
9688fa7
[goreleaser]
dwasse Sep 30, 2024
b98ca8c
Feat: remove extra fields from responses
dwasse Sep 30, 2024
739472d
[goreleaser]
dwasse Sep 30, 2024
7d7f6df
Fix: skip passive quote
dwasse Sep 30, 2024
6a5590f
[goreleaser]
dwasse Sep 30, 2024
56afeff
Cleanup: remove logs
dwasse Sep 30, 2024
bdb7539
Fix: use correct span
dwasse Sep 30, 2024
ab15e5d
Cleanup: remove logs
dwasse Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions services/rfq/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func (c *clientImpl) connectWebsocket(ctx context.Context, req *model.SubscribeA
}

reqURL := strings.Replace(c.rClient.BaseURL, "http", "ws", 1) + rest.RFQStreamRoute
fmt.Printf("dialing websocket: %s\n", reqURL)
fmt.Printf("headers: %v\n", header)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve logging approach for WebSocket connection details

While the added print statements can be helpful for debugging, using fmt.Printf directly is not ideal for production code. Consider the following improvements:

  1. Use the existing logger variable (which is an instance of log.Logger) instead of fmt.Printf. This allows for better log management and potential log level control.
  2. Consider logging at a debug or trace level to avoid cluttering production logs.
  3. Be cautious about logging headers, as they may contain sensitive information.

Here's a suggested refactor:

-	fmt.Printf("dialing websocket: %s\n", reqURL)
-	fmt.Printf("headers: %v\n", header)
+	logger.Debugf("Dialing WebSocket: %s", reqURL)
+	// Log headers selectively or at a lower log level
+	logger.Tracef("WebSocket headers: %v", header)

Also, consider adding a helper function to sanitize headers before logging to ensure sensitive information is not exposed.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fmt.Printf("dialing websocket: %s\n", reqURL)
fmt.Printf("headers: %v\n", header)
logger.Debugf("Dialing WebSocket: %s", reqURL)
// Log headers selectively or at a lower log level
logger.Tracef("WebSocket headers: %v", header)

conn, httpResp, err := websocket.DefaultDialer.Dial(reqURL, header)
if err != nil {
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
Expand Down
13 changes: 13 additions & 0 deletions services/rfq/api/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,9 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc {
loggedRequest = &req
}
case RFQRoute, RFQStreamRoute:
fmt.Println("RFQRoute, RFQStreamRoute")
chainsHeader := c.GetHeader(ChainsHeader)
fmt.Printf("got chains header: %s\n", chainsHeader)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using a proper logging framework and removing debug logs.

While adding logs can be helpful for debugging, using fmt.Println and fmt.Printf directly is not ideal for production code. Consider the following improvements:

  1. Use a structured logging framework (e.g., logrus, zap) instead of fmt for better log management and performance.
  2. Use appropriate log levels (e.g., debug, info) instead of print statements.
  3. Consider making these logs conditional based on a debug flag to avoid unnecessary logging in production.

Example using the existing logger:

if debugMode {
    logger.Debug("RFQRoute or RFQStreamRoute hit")
    logger.Debugf("Got chains header: %s", chainsHeader)
}

if chainsHeader != "" {
var chainIDs []int
err = json.Unmarshal([]byte(chainsHeader), &chainIDs)
Expand Down Expand Up @@ -434,28 +436,34 @@ func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) {
// @Header 101 {string} X-Api-Version "API Version Number - See docs for more info"
// @Router /quote_requests [get].
func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Context) {
fmt.Println("GetActiveRFQWebsocket")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance logging with proper framework and more context.

The current debug print statement can be improved for better logging practices:

  1. Replace fmt.Println with a proper logging framework, preferably the same one used elsewhere in the codebase (e.g., the logger variable).
  2. Consider adding more context to the log, such as the relayer address or any relevant request parameters.
  3. Use appropriate log levels to allow for easy filtering in different environments.

Example improvement:

logger.WithFields(log.Fields{
    "relayerAddr": relayerAddr,
    "remoteAddr": c.Request.RemoteAddr,
}).Debug("Handling WebSocket connection for active quote requests")

Also, consider wrapping this log in a debug flag check to avoid unnecessary logging in production environments.

ctx, span := r.handler.Tracer().Start(ctx, "GetActiveRFQWebsocket")
defer func() {
metrics.EndSpan(span)
}()

fmt.Println("upgrading websocket")
ws, err := r.upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logger.Error("Failed to set websocket upgrade", "error", err)
return
}
fmt.Println("upgraded websocket")

// use the relayer address as the ID for the connection
rawRelayerAddr, exists := c.Get("relayerAddr")
if !exists {
fmt.Println("no relayer address recovered from signature")
c.JSON(http.StatusBadRequest, gin.H{"error": "No relayer address recovered from signature"})
return
}
relayerAddr, ok := rawRelayerAddr.(string)
if !ok {
fmt.Println("invalid relayer address type")
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid relayer address type"})
return
}
fmt.Printf("relayer address: %s\n", relayerAddr)

span.SetAttributes(
attribute.String("relayer_address", relayerAddr),
Expand All @@ -464,6 +472,7 @@ func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Cont
// only one connection per relayer allowed
_, ok = r.wsClients.Load(relayerAddr)
if ok {
fmt.Println("relayer already connected")
c.JSON(http.StatusBadRequest, gin.H{"error": "relayer already connected"})
return
}
Expand All @@ -474,12 +483,16 @@ func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Cont
}()

client := newWsClient(relayerAddr, ws, r.pubSubManager, r.handler)
fmt.Println("registered ws client")
r.wsClients.Store(relayerAddr, client)
span.AddEvent("registered ws client")
fmt.Println("running ws client")
err = client.Run(ctx)
if err != nil {
fmt.Printf("error running ws client: %v\n", err)
logger.Error("Error running websocket client", "error", err)
}
fmt.Println("ws client done")
}

const (
Expand Down
3 changes: 0 additions & 3 deletions services/rfq/api/rest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type ServerSuite struct {
handler metrics.Handler
QuoterAPIServer *rest.QuoterAPIServer
port uint16
wsPort uint16
originChainID int
destChainID int
}
Expand Down Expand Up @@ -75,8 +74,6 @@ func (c *ServerSuite) SetupTest() {
c.True(ok)
port, err := freeport.GetFreePort()
c.port = uint16(port)
wsPort, err := freeport.GetFreePort()
c.wsPort = uint16(wsPort)
c.Require().NoError(err)

testConfig := config.Config{
Expand Down
8 changes: 8 additions & 0 deletions services/rfq/api/rest/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ const (

// Run runs the WebSocket client.
func (c *wsClient) Run(ctx context.Context) (err error) {
fmt.Println("running ws client")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve logging practices for better observability and consistency.

While the added logging statements enhance visibility into the WebSocket client's operations, there are a few areas for improvement:

  1. Replace fmt.Printf with a proper logging framework for better control over log levels and output formatting.
  2. Avoid duplicating error logs. At lines 121 and 128, you're using both fmt.Printf and logger.Error. Stick to one consistent logging method.
  3. Consider adding log levels (e.g., DEBUG, INFO, ERROR) to differentiate between informational and error logs.

Here's a suggested refactor for improved logging:

import "log" // Or your preferred logging framework

// ...

func (c *wsClient) Run(ctx context.Context) (err error) {
    log.Println("Running ws client")
    // ...
    case req := <-c.requestChan:
        log.Printf("Sending quote request: %v", req)
        err = c.sendRelayerRequest(ctx, req)
        if err != nil {
            log.Printf("Error sending quote request: %v", err)
        }
    case msg := <-messageChan:
        log.Printf("Received message: %s", msg)
        err = c.handleRelayerMessage(ctx, msg)
        if err != nil {
            log.Printf("Error handling relayer message: %v", err)
        }
    // ...
}

This approach provides consistent logging and avoids duplication while maintaining the improved visibility into the WebSocket client's operations.

Also applies to: 118-118, 121-121, 125-125, 128-128

c.lastPong = time.Now()
messageChan := make(chan []byte)
pingTicker := time.NewTicker(PingPeriod)
Expand All @@ -114,13 +115,17 @@ func (c *wsClient) Run(ctx context.Context) (err error) {
close(c.doneChan)
return nil
case req := <-c.requestChan:
fmt.Printf("sending quote request: %v\n", req)
err = c.sendRelayerRequest(ctx, req)
if err != nil {
fmt.Printf("error sending quote request: %v\n", err)
logger.Error("Error sending quote request: %s", err)
}
case msg := <-messageChan:
fmt.Printf("received message: %s\n", msg)
err = c.handleRelayerMessage(ctx, msg)
if err != nil {
fmt.Printf("error handling relayer message: %v\n", err)
logger.Error("Error handling relayer message: %s", err)
}
case <-pingTicker.C:
Expand Down Expand Up @@ -204,6 +209,7 @@ func (c *wsClient) handleRelayerMessage(ctx context.Context, msg []byte) (err er
}

func (c *wsClient) handleSubscribe(ctx context.Context, content json.RawMessage) (resp model.ActiveRFQMessage) {
fmt.Printf("handleSubscribe: %s\n", content)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance logging and error handling in the handleSubscribe method.

The added logging improves visibility into the subscription process, but there are opportunities for improvement:

  1. Replace fmt.Printf with a proper logging framework for consistency with the rest of the application.
  2. Avoid duplicating error information. The error at line 228 is both logged and returned in the error response.
  3. Consider adding more context to the success log at line 231, such as the relayer address.

Here's a suggested refactor for improved logging and error handling:

func (c *wsClient) handleSubscribe(ctx context.Context, content json.RawMessage) (resp model.ActiveRFQMessage) {
    log.Printf("Handling subscribe request: %s", content)
    // ...
    err = c.pubsub.AddSubscription(c.relayerAddr, sub)
    if err != nil {
        log.Printf("Error adding subscription: %v", err)
        return getErrorResponse(SubscribeOp, err)
    }
    log.Printf("Successfully added subscription for relayer %s, chain IDs: %v", c.relayerAddr, sub.Chains)
    return getSuccessResponse(SubscribeOp)
}

This refactor provides more consistent and informative logging while avoiding duplication of error information.

Also applies to: 228-228, 231-231

ctx, span := c.handler.Tracer().Start(ctx, "handleSubscribe", trace.WithAttributes(
attribute.String("relayer_address", c.relayerAddr),
))
Expand All @@ -219,8 +225,10 @@ func (c *wsClient) handleSubscribe(ctx context.Context, content json.RawMessage)
span.SetAttributes(attribute.IntSlice("chain_ids", sub.Chains))
err = c.pubsub.AddSubscription(c.relayerAddr, sub)
if err != nil {
fmt.Printf("error adding subscription: %v\n", err)
return getErrorResponse(SubscribeOp, fmt.Errorf("error adding subscription: %w", err))
}
fmt.Printf("successfully added subscription for chain ids: %v\n", sub.Chains)
return getSuccessResponse(SubscribeOp)
}

Expand Down
2 changes: 1 addition & 1 deletion services/rfq/e2e/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (i *IntegrationSuite) getRelayerConfig() relconfig.Config {
},
// generated ex-post facto
QuotableTokens: map[string][]string{},
RfqAPIURL: i.apiServer,
RFQAPIURL: i.apiServer,
Signer: signerConfig.SignerConfig{
Type: signerConfig.FileType.String(),
File: filet.TmpFile(i.T(), "", i.relayerWallet.PrivateKeyHex()).Name(),
Expand Down
100 changes: 100 additions & 0 deletions services/rfq/relayer/quoter/quoter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quoter

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -31,6 +32,7 @@ import (
"github.com/synapsecns/sanguine/ethergo/signer/signer"
rfqAPIClient "github.com/synapsecns/sanguine/services/rfq/api/client"
"github.com/synapsecns/sanguine/services/rfq/api/model"
"github.com/synapsecns/sanguine/services/rfq/api/rest"
"github.com/synapsecns/sanguine/services/rfq/relayer/inventory"
)

Expand All @@ -42,6 +44,8 @@ var logger = log.Logger("quoter")
type Quoter interface {
// SubmitAllQuotes submits all quotes to the RFQ API.
SubmitAllQuotes(ctx context.Context) (err error)
// SubscribeActiveRFQ subscribes to the RFQ websocket API.
SubscribeActiveRFQ(ctx context.Context) (err error)
Comment on lines +47 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Adding a new method to public interface Quoter may introduce breaking changes

Adding SubscribeActiveRFQ(ctx context.Context) (err error) to the Quoter interface can break existing implementations. Any external packages implementing Quoter will now fail to compile until they implement this new method. Consider the impact on external dependencies and possibly introduce a new interface to extend Quoter.

// ShouldProcess determines if a quote should be processed.
// We do this by either saving all quotes in-memory, and refreshing via GetSelfQuotes() through the API
// The first comparison is does bridge transaction OriginChainID+TokenAddr match with a quote + DestChainID+DestTokenAddr, then we look to see if we have enough amount to relay it + if the price fits our bounds (based on that the Relayer is relaying the destination token for the origin)
Expand Down Expand Up @@ -251,6 +255,102 @@ func (m *Manager) SubmitAllQuotes(ctx context.Context) (err error) {
return m.prepareAndSubmitQuotes(ctx, inv)
}

// SubscribeActiveRFQ subscribes to the RFQ websocket API.
// This function is blocking and will run until the context is cancelled.
func (m *Manager) SubscribeActiveRFQ(ctx context.Context) (err error) {
ctx, span := m.metricsHandler.Tracer().Start(ctx, "SubscribeActiveRFQ")
defer func() {
metrics.EndSpanWithErr(span, err)
}()

chainIDs := []int{}
for chainID := range m.config.Chains {
chainIDs = append(chainIDs, chainID)
}
req := model.SubscribeActiveRFQRequest{
ChainIDs: chainIDs,
}
span.SetAttributes(attribute.IntSlice("chain_ids", chainIDs))

reqChan := make(chan *model.ActiveRFQMessage)
respChan, err := m.rfqClient.SubscribeActiveQuotes(ctx, &req, reqChan)
if err != nil {
return fmt.Errorf("error subscribing to active quotes: %w", err)
}
span.AddEvent("subscribed to active quotes")

for {
select {
case <-ctx.Done():
return
case msg := <-respChan:
resp, err := m.generateActiveRFQ(ctx, msg)
if err != nil {
return fmt.Errorf("error generating active RFQ message: %w", err)
}
Comment on lines +294 to +296
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle errors without terminating the subscription loop

Returning an error here will exit the SubscribeActiveRFQ method, terminating the subscription loop. Consider logging the error and continuing to process other messages to ensure that a single failure does not stop the entire subscription.

Apply this diff to adjust error handling:

 			resp, err := m.generateActiveRFQ(ctx, msg)
 			if err != nil {
-				return fmt.Errorf("error generating active RFQ message: %w", err)
+				span.RecordError(err)
+				logger.Error("error generating active RFQ message", "error", err)
+				continue
 			}

Committable suggestion was skipped due to low confidence.

respChan <- resp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential deadlock: Sending responses to respChan instead of reqChan

In the select loop, you are sending responses back to respChan (line 283), which is the channel from which you receive messages. This could cause a deadlock or unexpected behavior. You should send responses to reqChan instead.

Apply this diff to fix the issue:

     case msg := <-respChan:
         resp, err := m.generateActiveRFQ(ctx, msg)
         if err != nil {
             return fmt.Errorf("error generating active RFQ message: %w", err)
         }
-        respChan <- resp
+        reqChan <- resp
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
case msg := <-respChan:
resp, err := m.generateActiveRFQ(ctx, msg)
if err != nil {
return fmt.Errorf("error generating active RFQ message: %w", err)
}
respChan <- resp
case msg := <-respChan:
resp, err := m.generateActiveRFQ(ctx, msg)
if err != nil {
return fmt.Errorf("error generating active RFQ message: %w", err)
}
reqChan <- resp

}
}
}

// getActiveRFQ handles an active RFQ message.
func (m *Manager) generateActiveRFQ(ctx context.Context, msg *model.ActiveRFQMessage) (resp *model.ActiveRFQMessage, err error) {
ctx, span := m.metricsHandler.Tracer().Start(ctx, "generateActiveRFQ", trace.WithAttributes(
attribute.String("op", msg.Op),
))
defer func() {
metrics.EndSpanWithErr(span, err)
}()

if msg.Op != rest.RequestQuoteOp {
span.AddEvent("not a request quote op")
return nil, nil
}

inv, err := m.inventoryManager.GetCommittableBalances(ctx, inventory.SkipDBCache())
if err != nil {
return nil, fmt.Errorf("error getting committable balances: %w", err)
}

var rfqRequest model.WsRFQRequest
err = json.Unmarshal(msg.Content, &rfqRequest)
if err != nil {
return nil, fmt.Errorf("error unmarshalling quote data: %w", err)
}
span.SetAttributes(attribute.String("request_id", rfqRequest.RequestID))

quoteInput := QuoteInput{
OriginChainID: rfqRequest.Data.OriginChainID,
DestChainID: rfqRequest.Data.DestChainID,
OriginTokenAddr: common.HexToAddress(rfqRequest.Data.OriginTokenAddr),
DestTokenAddr: common.HexToAddress(rfqRequest.Data.DestTokenAddr),
OriginBalance: inv[rfqRequest.Data.OriginChainID][common.HexToAddress(rfqRequest.Data.OriginTokenAddr)],
DestBalance: inv[rfqRequest.Data.DestChainID][common.HexToAddress(rfqRequest.Data.DestTokenAddr)],
Comment on lines +334 to +335
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Possible nil map access without existence checks

Accessing inv maps without checking if the keys exist can lead to a panic if the keys are missing. Before accessing inv[rfqRequest.Data.OriginChainID][...] and inv[rfqRequest.Data.DestChainID][...], ensure that both the chain ID and token address keys exist in the maps.

Apply this diff to safely access the map entries:

     quoteInput := QuoteInput{
         OriginChainID:   rfqRequest.Data.OriginChainID,
         DestChainID:     rfqRequest.Data.DestChainID,
         OriginTokenAddr: common.HexToAddress(rfqRequest.Data.OriginTokenAddr),
         DestTokenAddr:   common.HexToAddress(rfqRequest.Data.DestTokenAddr),
-        OriginBalance:   inv[rfqRequest.Data.OriginChainID][common.HexToAddress(rfqRequest.Data.OriginTokenAddr)],
-        DestBalance:     inv[rfqRequest.Data.DestChainID][common.HexToAddress(rfqRequest.Data.DestTokenAddr)],
+        OriginBalance: func() *big.Int {
+            if chainBalances, ok := inv[rfqRequest.Data.OriginChainID]; ok {
+                return chainBalances[common.HexToAddress(rfqRequest.Data.OriginTokenAddr)]
+            }
+            return nil
+        }(),
+        DestBalance: func() *big.Int {
+            if chainBalances, ok := inv[rfqRequest.Data.DestChainID]; ok {
+                return chainBalances[common.HexToAddress(rfqRequest.Data.DestTokenAddr)]
+            }
+            return nil
+        }(),
     }
     if quoteInput.OriginBalance == nil || quoteInput.DestBalance == nil {
         return nil, fmt.Errorf("insufficient inventory balances for the provided chain IDs and token addresses")
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
OriginBalance: inv[rfqRequest.Data.OriginChainID][common.HexToAddress(rfqRequest.Data.OriginTokenAddr)],
DestBalance: inv[rfqRequest.Data.DestChainID][common.HexToAddress(rfqRequest.Data.DestTokenAddr)],
OriginBalance: func() *big.Int {
if chainBalances, ok := inv[rfqRequest.Data.OriginChainID]; ok {
return chainBalances[common.HexToAddress(rfqRequest.Data.OriginTokenAddr)]
}
return nil
}(),
DestBalance: func() *big.Int {
if chainBalances, ok := inv[rfqRequest.Data.DestChainID]; ok {
return chainBalances[common.HexToAddress(rfqRequest.Data.DestTokenAddr)]
}
return nil
}(),

}

rawQuote, err := m.generateQuote(ctx, quoteInput)
if err != nil {
return nil, fmt.Errorf("error generating quote: %w", err)
}
span.SetAttributes(attribute.String("dest_amount", rawQuote.DestAmount))

rfqResp := model.WsRFQResponse{
RequestID: rfqRequest.RequestID,
DestAmount: rawQuote.DestAmount,
}
respBytes, err := json.Marshal(rfqResp)
if err != nil {
return nil, fmt.Errorf("error serializing response: %w", err)
}
resp = &model.ActiveRFQMessage{
Op: rest.SendQuoteOp,
Content: respBytes,
}
span.AddEvent("generated response")

return resp, nil
}

// GetPrice gets the price of a token.
func (m *Manager) GetPrice(parentCtx context.Context, tokenName string) (_ float64, err error) {
ctx, span := m.metricsHandler.Tracer().Start(parentCtx, "GetPrice")
Expand Down
6 changes: 4 additions & 2 deletions services/rfq/relayer/relconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type Config struct {
BaseChainConfig ChainConfig `yaml:"base_chain_config"`
// OmniRPCURL is the URL of the OmniRPC server.
OmniRPCURL string `yaml:"omnirpc_url"`
// RfqAPIURL is the URL of the RFQ API.
RfqAPIURL string `yaml:"rfq_url"`
// RFQAPIURL is the URL of the RFQ API.
RFQAPIURL string `yaml:"rfq_url"`
// RelayerAPIPort is the port of the relayer API.
RelayerAPIPort string `yaml:"relayer_api_port"`
// Database is the database config.
Expand Down Expand Up @@ -67,6 +67,8 @@ type Config struct {
SubmitSingleQuotes bool `yaml:"submit_single_quotes"`
// VolumeLimit is the maximum dollar value of relayed transactions in the BlockWindow.
VolumeLimit float64 `yaml:"volume_limit"`
// SupportActiveQuoting enables support for active quoting.
SupportActiveQuoting bool `yaml:"support_active_quoting"`
}

// ChainConfig represents the configuration for a chain.
Expand Down
6 changes: 3 additions & 3 deletions services/rfq/relayer/relconfig/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,9 +541,9 @@ func (c Config) GetOmniRPCURL() string {
return c.OmniRPCURL
}

// GetRfqAPIURL returns the RFQ API URL.
func (c Config) GetRfqAPIURL() string {
return c.RfqAPIURL
// GetRFQAPIURL returns the RFQ API URL.
func (c Config) GetRFQAPIURL() string {
return c.RFQAPIURL
}

// GetDatabase returns the database config.
Expand Down
12 changes: 11 additions & 1 deletion services/rfq/relayer/service/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi
priceFetcher := pricer.NewCoingeckoPriceFetcher(cfg.GetHTTPTimeout())
fp := pricer.NewFeePricer(cfg, omniClient, priceFetcher, metricHandler)

apiClient, err := rfqAPIClient.NewAuthenticatedClient(metricHandler, cfg.GetRfqAPIURL(), sg)
apiClient, err := rfqAPIClient.NewAuthenticatedClient(metricHandler, cfg.GetRFQAPIURL(), sg)
if err != nil {
return nil, fmt.Errorf("error creating RFQ API client: %w", err)
}
Expand Down Expand Up @@ -219,6 +219,16 @@ func (r *Relayer) Start(ctx context.Context) (err error) {
}
})

if r.cfg.SupportActiveQuoting {
g.Go(func() error {
err = r.quoter.SubscribeActiveRFQ(ctx)
if err != nil {
return fmt.Errorf("could not subscribe to active RFQ: %w", err)
}
return nil
})
}

g.Go(func() error {
for {
select {
Expand Down
Loading