-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rfq-relayer): relayer supports active quoting #3198
Changes from 18 commits
d93e20f
481f043
fdbc865
77c51e8
d10d56d
d2b1701
f06a64b
f6300a1
2646149
ea61286
dcd264a
02bf53c
a83253f
6febf7b
3ce1bd3
460f5ac
9ec49cb
92b49ec
e85ff62
f4ed5b5
0c0b562
1742fe3
0d7a7c4
4828dfc
ab9513d
a2a2079
69ed171
fbccdf8
4b098f9
135f1ac
b4969e9
bf3adaa
82ed6bb
896d52d
0018269
c3f0eb3
50b969e
64389c4
0c07fe4
719361a
26c9174
ccd24b3
9688fa7
b98ca8c
739472d
7d7f6df
6a5590f
56afeff
bdb7539
ab15e5d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -275,7 +275,9 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { | |
loggedRequest = &req | ||
} | ||
case RFQRoute, RFQStreamRoute: | ||
fmt.Println("RFQRoute, RFQStreamRoute") | ||
chainsHeader := c.GetHeader(ChainsHeader) | ||
fmt.Printf("got chains header: %s\n", chainsHeader) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Consider using a proper logging framework and removing debug logs. While adding logs can be helpful for debugging, using
Example using the existing if debugMode {
logger.Debug("RFQRoute or RFQStreamRoute hit")
logger.Debugf("Got chains header: %s", chainsHeader)
} |
||
if chainsHeader != "" { | ||
var chainIDs []int | ||
err = json.Unmarshal([]byte(chainsHeader), &chainIDs) | ||
|
@@ -434,28 +436,34 @@ func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) { | |
// @Header 101 {string} X-Api-Version "API Version Number - See docs for more info" | ||
// @Router /quote_requests [get]. | ||
func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Context) { | ||
fmt.Println("GetActiveRFQWebsocket") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enhance logging with proper framework and more context. The current debug print statement can be improved for better logging practices:
Example improvement: logger.WithFields(log.Fields{
"relayerAddr": relayerAddr,
"remoteAddr": c.Request.RemoteAddr,
}).Debug("Handling WebSocket connection for active quote requests") Also, consider wrapping this log in a debug flag check to avoid unnecessary logging in production environments. |
||
ctx, span := r.handler.Tracer().Start(ctx, "GetActiveRFQWebsocket") | ||
defer func() { | ||
metrics.EndSpan(span) | ||
}() | ||
|
||
fmt.Println("upgrading websocket") | ||
ws, err := r.upgrader.Upgrade(c.Writer, c.Request, nil) | ||
if err != nil { | ||
logger.Error("Failed to set websocket upgrade", "error", err) | ||
return | ||
} | ||
fmt.Println("upgraded websocket") | ||
|
||
// use the relayer address as the ID for the connection | ||
rawRelayerAddr, exists := c.Get("relayerAddr") | ||
if !exists { | ||
fmt.Println("no relayer address recovered from signature") | ||
c.JSON(http.StatusBadRequest, gin.H{"error": "No relayer address recovered from signature"}) | ||
return | ||
} | ||
relayerAddr, ok := rawRelayerAddr.(string) | ||
if !ok { | ||
fmt.Println("invalid relayer address type") | ||
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid relayer address type"}) | ||
return | ||
} | ||
fmt.Printf("relayer address: %s\n", relayerAddr) | ||
|
||
span.SetAttributes( | ||
attribute.String("relayer_address", relayerAddr), | ||
|
@@ -464,6 +472,7 @@ func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Cont | |
// only one connection per relayer allowed | ||
_, ok = r.wsClients.Load(relayerAddr) | ||
if ok { | ||
fmt.Println("relayer already connected") | ||
c.JSON(http.StatusBadRequest, gin.H{"error": "relayer already connected"}) | ||
return | ||
} | ||
|
@@ -474,12 +483,16 @@ func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Cont | |
}() | ||
|
||
client := newWsClient(relayerAddr, ws, r.pubSubManager, r.handler) | ||
fmt.Println("registered ws client") | ||
r.wsClients.Store(relayerAddr, client) | ||
span.AddEvent("registered ws client") | ||
fmt.Println("running ws client") | ||
err = client.Run(ctx) | ||
if err != nil { | ||
fmt.Printf("error running ws client: %v\n", err) | ||
logger.Error("Error running websocket client", "error", err) | ||
} | ||
fmt.Println("ws client done") | ||
} | ||
|
||
const ( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -96,6 +96,7 @@ const ( | |
|
||
// Run runs the WebSocket client. | ||
func (c *wsClient) Run(ctx context.Context) (err error) { | ||
fmt.Println("running ws client") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Improve logging practices for better observability and consistency. While the added logging statements enhance visibility into the WebSocket client's operations, there are a few areas for improvement:
Here's a suggested refactor for improved logging: import "log" // Or your preferred logging framework
// ...
func (c *wsClient) Run(ctx context.Context) (err error) {
log.Println("Running ws client")
// ...
case req := <-c.requestChan:
log.Printf("Sending quote request: %v", req)
err = c.sendRelayerRequest(ctx, req)
if err != nil {
log.Printf("Error sending quote request: %v", err)
}
case msg := <-messageChan:
log.Printf("Received message: %s", msg)
err = c.handleRelayerMessage(ctx, msg)
if err != nil {
log.Printf("Error handling relayer message: %v", err)
}
// ...
} This approach provides consistent logging and avoids duplication while maintaining the improved visibility into the WebSocket client's operations. Also applies to: 118-118, 121-121, 125-125, 128-128 |
||
c.lastPong = time.Now() | ||
messageChan := make(chan []byte) | ||
pingTicker := time.NewTicker(PingPeriod) | ||
|
@@ -114,13 +115,17 @@ func (c *wsClient) Run(ctx context.Context) (err error) { | |
close(c.doneChan) | ||
return nil | ||
case req := <-c.requestChan: | ||
fmt.Printf("sending quote request: %v\n", req) | ||
err = c.sendRelayerRequest(ctx, req) | ||
if err != nil { | ||
fmt.Printf("error sending quote request: %v\n", err) | ||
logger.Error("Error sending quote request: %s", err) | ||
} | ||
case msg := <-messageChan: | ||
fmt.Printf("received message: %s\n", msg) | ||
err = c.handleRelayerMessage(ctx, msg) | ||
if err != nil { | ||
fmt.Printf("error handling relayer message: %v\n", err) | ||
logger.Error("Error handling relayer message: %s", err) | ||
} | ||
case <-pingTicker.C: | ||
|
@@ -204,6 +209,7 @@ func (c *wsClient) handleRelayerMessage(ctx context.Context, msg []byte) (err er | |
} | ||
|
||
func (c *wsClient) handleSubscribe(ctx context.Context, content json.RawMessage) (resp model.ActiveRFQMessage) { | ||
fmt.Printf("handleSubscribe: %s\n", content) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enhance logging and error handling in the The added logging improves visibility into the subscription process, but there are opportunities for improvement:
Here's a suggested refactor for improved logging and error handling: func (c *wsClient) handleSubscribe(ctx context.Context, content json.RawMessage) (resp model.ActiveRFQMessage) {
log.Printf("Handling subscribe request: %s", content)
// ...
err = c.pubsub.AddSubscription(c.relayerAddr, sub)
if err != nil {
log.Printf("Error adding subscription: %v", err)
return getErrorResponse(SubscribeOp, err)
}
log.Printf("Successfully added subscription for relayer %s, chain IDs: %v", c.relayerAddr, sub.Chains)
return getSuccessResponse(SubscribeOp)
} This refactor provides more consistent and informative logging while avoiding duplication of error information. Also applies to: 228-228, 231-231 |
||
ctx, span := c.handler.Tracer().Start(ctx, "handleSubscribe", trace.WithAttributes( | ||
attribute.String("relayer_address", c.relayerAddr), | ||
)) | ||
|
@@ -219,8 +225,10 @@ func (c *wsClient) handleSubscribe(ctx context.Context, content json.RawMessage) | |
span.SetAttributes(attribute.IntSlice("chain_ids", sub.Chains)) | ||
err = c.pubsub.AddSubscription(c.relayerAddr, sub) | ||
if err != nil { | ||
fmt.Printf("error adding subscription: %v\n", err) | ||
return getErrorResponse(SubscribeOp, fmt.Errorf("error adding subscription: %w", err)) | ||
} | ||
fmt.Printf("successfully added subscription for chain ids: %v\n", sub.Chains) | ||
return getSuccessResponse(SubscribeOp) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -3,6 +3,7 @@ package quoter | |||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||||
"encoding/json" | ||||||||||||||||||||||||||||||
"errors" | ||||||||||||||||||||||||||||||
"fmt" | ||||||||||||||||||||||||||||||
"math/big" | ||||||||||||||||||||||||||||||
|
@@ -31,6 +32,7 @@ import ( | |||||||||||||||||||||||||||||
"github.com/synapsecns/sanguine/ethergo/signer/signer" | ||||||||||||||||||||||||||||||
rfqAPIClient "github.com/synapsecns/sanguine/services/rfq/api/client" | ||||||||||||||||||||||||||||||
"github.com/synapsecns/sanguine/services/rfq/api/model" | ||||||||||||||||||||||||||||||
"github.com/synapsecns/sanguine/services/rfq/api/rest" | ||||||||||||||||||||||||||||||
"github.com/synapsecns/sanguine/services/rfq/relayer/inventory" | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
|
@@ -42,6 +44,8 @@ var logger = log.Logger("quoter") | |||||||||||||||||||||||||||||
type Quoter interface { | ||||||||||||||||||||||||||||||
// SubmitAllQuotes submits all quotes to the RFQ API. | ||||||||||||||||||||||||||||||
SubmitAllQuotes(ctx context.Context) (err error) | ||||||||||||||||||||||||||||||
// SubscribeActiveRFQ subscribes to the RFQ websocket API. | ||||||||||||||||||||||||||||||
SubscribeActiveRFQ(ctx context.Context) (err error) | ||||||||||||||||||||||||||||||
Comment on lines
+47
to
+48
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding a new method to public interface Adding |
||||||||||||||||||||||||||||||
// ShouldProcess determines if a quote should be processed. | ||||||||||||||||||||||||||||||
// We do this by either saving all quotes in-memory, and refreshing via GetSelfQuotes() through the API | ||||||||||||||||||||||||||||||
// The first comparison is does bridge transaction OriginChainID+TokenAddr match with a quote + DestChainID+DestTokenAddr, then we look to see if we have enough amount to relay it + if the price fits our bounds (based on that the Relayer is relaying the destination token for the origin) | ||||||||||||||||||||||||||||||
|
@@ -251,6 +255,102 @@ func (m *Manager) SubmitAllQuotes(ctx context.Context) (err error) { | |||||||||||||||||||||||||||||
return m.prepareAndSubmitQuotes(ctx, inv) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
// SubscribeActiveRFQ subscribes to the RFQ websocket API. | ||||||||||||||||||||||||||||||
// This function is blocking and will run until the context is cancelled. | ||||||||||||||||||||||||||||||
func (m *Manager) SubscribeActiveRFQ(ctx context.Context) (err error) { | ||||||||||||||||||||||||||||||
ctx, span := m.metricsHandler.Tracer().Start(ctx, "SubscribeActiveRFQ") | ||||||||||||||||||||||||||||||
defer func() { | ||||||||||||||||||||||||||||||
metrics.EndSpanWithErr(span, err) | ||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
chainIDs := []int{} | ||||||||||||||||||||||||||||||
for chainID := range m.config.Chains { | ||||||||||||||||||||||||||||||
chainIDs = append(chainIDs, chainID) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
req := model.SubscribeActiveRFQRequest{ | ||||||||||||||||||||||||||||||
ChainIDs: chainIDs, | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
span.SetAttributes(attribute.IntSlice("chain_ids", chainIDs)) | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
reqChan := make(chan *model.ActiveRFQMessage) | ||||||||||||||||||||||||||||||
respChan, err := m.rfqClient.SubscribeActiveQuotes(ctx, &req, reqChan) | ||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||
return fmt.Errorf("error subscribing to active quotes: %w", err) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
span.AddEvent("subscribed to active quotes") | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
for { | ||||||||||||||||||||||||||||||
select { | ||||||||||||||||||||||||||||||
case <-ctx.Done(): | ||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||
case msg := <-respChan: | ||||||||||||||||||||||||||||||
resp, err := m.generateActiveRFQ(ctx, msg) | ||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||
return fmt.Errorf("error generating active RFQ message: %w", err) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
Comment on lines
+294
to
+296
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Handle errors without terminating the subscription loop Returning an error here will exit the Apply this diff to adjust error handling: resp, err := m.generateActiveRFQ(ctx, msg)
if err != nil {
- return fmt.Errorf("error generating active RFQ message: %w", err)
+ span.RecordError(err)
+ logger.Error("error generating active RFQ message", "error", err)
+ continue
}
|
||||||||||||||||||||||||||||||
respChan <- resp | ||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential deadlock: Sending responses to In the select loop, you are sending responses back to Apply this diff to fix the issue: case msg := <-respChan:
resp, err := m.generateActiveRFQ(ctx, msg)
if err != nil {
return fmt.Errorf("error generating active RFQ message: %w", err)
}
- respChan <- resp
+ reqChan <- resp 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
// getActiveRFQ handles an active RFQ message. | ||||||||||||||||||||||||||||||
func (m *Manager) generateActiveRFQ(ctx context.Context, msg *model.ActiveRFQMessage) (resp *model.ActiveRFQMessage, err error) { | ||||||||||||||||||||||||||||||
ctx, span := m.metricsHandler.Tracer().Start(ctx, "generateActiveRFQ", trace.WithAttributes( | ||||||||||||||||||||||||||||||
attribute.String("op", msg.Op), | ||||||||||||||||||||||||||||||
)) | ||||||||||||||||||||||||||||||
defer func() { | ||||||||||||||||||||||||||||||
metrics.EndSpanWithErr(span, err) | ||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
if msg.Op != rest.RequestQuoteOp { | ||||||||||||||||||||||||||||||
span.AddEvent("not a request quote op") | ||||||||||||||||||||||||||||||
return nil, nil | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
inv, err := m.inventoryManager.GetCommittableBalances(ctx, inventory.SkipDBCache()) | ||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||
return nil, fmt.Errorf("error getting committable balances: %w", err) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
var rfqRequest model.WsRFQRequest | ||||||||||||||||||||||||||||||
err = json.Unmarshal(msg.Content, &rfqRequest) | ||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||
return nil, fmt.Errorf("error unmarshalling quote data: %w", err) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
span.SetAttributes(attribute.String("request_id", rfqRequest.RequestID)) | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
quoteInput := QuoteInput{ | ||||||||||||||||||||||||||||||
OriginChainID: rfqRequest.Data.OriginChainID, | ||||||||||||||||||||||||||||||
DestChainID: rfqRequest.Data.DestChainID, | ||||||||||||||||||||||||||||||
OriginTokenAddr: common.HexToAddress(rfqRequest.Data.OriginTokenAddr), | ||||||||||||||||||||||||||||||
DestTokenAddr: common.HexToAddress(rfqRequest.Data.DestTokenAddr), | ||||||||||||||||||||||||||||||
OriginBalance: inv[rfqRequest.Data.OriginChainID][common.HexToAddress(rfqRequest.Data.OriginTokenAddr)], | ||||||||||||||||||||||||||||||
DestBalance: inv[rfqRequest.Data.DestChainID][common.HexToAddress(rfqRequest.Data.DestTokenAddr)], | ||||||||||||||||||||||||||||||
Comment on lines
+334
to
+335
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Possible nil map access without existence checks Accessing Apply this diff to safely access the map entries: quoteInput := QuoteInput{
OriginChainID: rfqRequest.Data.OriginChainID,
DestChainID: rfqRequest.Data.DestChainID,
OriginTokenAddr: common.HexToAddress(rfqRequest.Data.OriginTokenAddr),
DestTokenAddr: common.HexToAddress(rfqRequest.Data.DestTokenAddr),
- OriginBalance: inv[rfqRequest.Data.OriginChainID][common.HexToAddress(rfqRequest.Data.OriginTokenAddr)],
- DestBalance: inv[rfqRequest.Data.DestChainID][common.HexToAddress(rfqRequest.Data.DestTokenAddr)],
+ OriginBalance: func() *big.Int {
+ if chainBalances, ok := inv[rfqRequest.Data.OriginChainID]; ok {
+ return chainBalances[common.HexToAddress(rfqRequest.Data.OriginTokenAddr)]
+ }
+ return nil
+ }(),
+ DestBalance: func() *big.Int {
+ if chainBalances, ok := inv[rfqRequest.Data.DestChainID]; ok {
+ return chainBalances[common.HexToAddress(rfqRequest.Data.DestTokenAddr)]
+ }
+ return nil
+ }(),
}
if quoteInput.OriginBalance == nil || quoteInput.DestBalance == nil {
return nil, fmt.Errorf("insufficient inventory balances for the provided chain IDs and token addresses")
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
rawQuote, err := m.generateQuote(ctx, quoteInput) | ||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||
return nil, fmt.Errorf("error generating quote: %w", err) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
span.SetAttributes(attribute.String("dest_amount", rawQuote.DestAmount)) | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
rfqResp := model.WsRFQResponse{ | ||||||||||||||||||||||||||||||
RequestID: rfqRequest.RequestID, | ||||||||||||||||||||||||||||||
DestAmount: rawQuote.DestAmount, | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
respBytes, err := json.Marshal(rfqResp) | ||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||
return nil, fmt.Errorf("error serializing response: %w", err) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
resp = &model.ActiveRFQMessage{ | ||||||||||||||||||||||||||||||
Op: rest.SendQuoteOp, | ||||||||||||||||||||||||||||||
Content: respBytes, | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
span.AddEvent("generated response") | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
return resp, nil | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
// GetPrice gets the price of a token. | ||||||||||||||||||||||||||||||
func (m *Manager) GetPrice(parentCtx context.Context, tokenName string) (_ float64, err error) { | ||||||||||||||||||||||||||||||
ctx, span := m.metricsHandler.Tracer().Start(parentCtx, "GetPrice") | ||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve logging approach for WebSocket connection details
While the added print statements can be helpful for debugging, using
fmt.Printf
directly is not ideal for production code. Consider the following improvements:logger
variable (which is an instance oflog.Logger
) instead offmt.Printf
. This allows for better log management and potential log level control.Here's a suggested refactor:
Also, consider adding a helper function to sanitize headers before logging to ensure sensitive information is not exposed.
📝 Committable suggestion