Skip to content

Commit

Permalink
feat(rfq): active quoting API (#3128)
Browse files Browse the repository at this point in the history
* WIP: initial websocket wiring

* WIP: add ws client and handling

* Fix: receive respsects context

* Cleanup: split into rfq.go

* Fix: build

* Feat: add mocked ws client

* Fix: build

* Feat: add SubscribeActiveQuotes() to client

* Feat: add PutUserQuoteRequest() to api client

* Fix: build

* WIP: rfq tests with ws auth

* WIP: test fixes

* Feat: working TestHandleActiveRFQ

* Feat: add expired request case

* WIP: functionalize test relayer resps

* Feat: add runMockRelayer with multiple relayers

* Feat: add MultipleRelayers case

* Feat: add FallbackToPassive case

* Fix: bigint ptr issue

* Cleanup: bump expiration window

* WIP: logs

* Feat: split into separate tests

* Cleanup: logs

* Feat: add PassiveBestQuote case

* WIP: update db interface with new models

* Feat: impl new db funcs

* Feat: insert models within api server

* Feat: update quote request / response statuses

* Fix: db error handling

* Fix: api tests

* Feat: add initial response validation

* Feat: impl pingpong

* Fix: register models

* Feat: verify quote request in SingleRelayer case

* Feat: verify more db requests

* Cleanup: common vars

* Cleanup: break down handleActiveRFQ into sub funcs

* Cleanup: comments

* Cleanup: remove unused mock

* Fix: builds

* Feat: make relayer response data optional to signify null resp

* Fix: response primary key on quote id

* Fix: build

* Feat: update swagger docs

* WIP: generic pubsub

* Feat: add basic PubSubManager

* Feat: implement subscription / unsubscription operations

* Feat: respond to subscribe operation

* Feat: add runWsListener helper

* Cleanup: reduce chan buffer

* Cleanup: lints

* Cleanup: break down into smaller funcs

* Cleanup: refactor ws client

* Cleanup: more lints

* Fix: build

* Cleanup: lints

* Feat: mark as fulfilled when updating request status

* Cleanup: lint

* Skip broken test for now

* Cleanup: lint

* Feat: add open_quote_requests endpoint with test

* Feat: add new open request model

* Update swagger

* go mod tidy

* fix error

* Fix: respecting context

* Replace: Fulfilled -> Closed

* Cleanup: use errors.New()

* Feat: ReceiveQuoteResponse specifies request id

* Cleanup: remove logs

* Feat: add some tracing

* Feat: add IntegratorID

* Feat: remove repetitive fields from relayer quote response, move requests to requests.go

* Cleanup: use new routes

* Cleanup: migrate req/res struct naming

* Cleanup: update swagger

* Cleanup: lint

* [goreleaser]

* Feat: run ws endpoint within existing server

* [goreleaser]

* Fix: build

* [goreleaser]

* Feat: add more tracing

* [goreleaser]

* feat(rfq-relayer): relayer supports active quoting (#3198)

* Feat: add active rfq subscription on quoter

* Feat: relayer subscribes to active quotes upon starting

* [goreleaser]

* Feat: specify ws url in relayer

* [goreleaser]

* [goreleaser]

* Fix: build

* [goreleaser]

* Feat: relayer tracing

* [goreleaser]

* Feat: use supports_active_quoting instead of ws url

* [goreleaser]

* WIP: add logs

* [goreleaser]

* WIP: more logs

* [goreleaser]

* More logs

* [goreleaser]

* More logs

* [goreleaser]

* More logs

* [goreleaser]

* Close conn when encountering write err

* [goreleaser]

* More logs

* [goreleaser]

* More logs

* [goreleaser]

* More logs

* [goreleaser]

* More logs

* [goreleaser]

* Logs with ts

* [goreleaser]

* More tracing

* [goreleaser]

* Fix: send to reqChan

* [goreleaser]

* Check for zero pong time

* Fix: make close_at and closed_quote_id optional

* [goreleaser]

* Feat: remove extra fields from responses

* [goreleaser]

* Fix: skip passive quote

* [goreleaser]

* Cleanup: remove logs

* Fix: use correct span

* Cleanup: remove logs

* Fix: build

* Cleanup: lint

* Cleanup: lint

* Cleanup: update swagger

* Feat: client sends pings, server sends pongs

* [goreleaser]

* Cleanup: remove unused func

* WIP: ws error handling

* [goreleaser]

* Feat: ws client uses errgroup

* Cleanup: remove log

* [goreleaser]

* Replace: PutUserQuoteResponse -> PutRFQResponse

* Feat: add QuoteID to PutRFQResponse

* [goreleaser]

* Cleanup: lint

* Fix: build

* Cleanup: lint

* [goreleaser]

* Add logs

* [goreleaser]

* Add logs

* [goreleaser]

* Cleanup: remove logs

---------

Co-authored-by: Trajan0x <[email protected]>
  • Loading branch information
dwasse and trajan0x authored Oct 2, 2024
1 parent da18f00 commit ad48cb0
Show file tree
Hide file tree
Showing 33 changed files with 2,814 additions and 122 deletions.
1 change: 1 addition & 0 deletions contrib/opbot/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.54.0 // indirect
github.com/prometheus/procfs v0.15.0 // indirect
github.com/puzpuzpuz/xsync v1.5.2 // indirect
github.com/puzpuzpuz/xsync/v2 v2.5.1 // indirect
github.com/richardwilkes/toolbox v1.74.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
Expand Down
241 changes: 222 additions & 19 deletions services/rfq/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ package client

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/ipfs/go-log"

"github.com/google/uuid"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

Expand All @@ -17,17 +21,23 @@ import (

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/go-resty/resty/v2"
"github.com/gorilla/websocket"
"github.com/synapsecns/sanguine/ethergo/signer/signer"
"github.com/synapsecns/sanguine/services/rfq/api/model"
"github.com/synapsecns/sanguine/services/rfq/api/rest"
)

var logger = log.Logger("rfq-client")

const pingPeriod = 20 * time.Second

// AuthenticatedClient is an interface for the RFQ API.
// It provides methods for creating, retrieving and updating quotes.
type AuthenticatedClient interface {
PutQuote(ctx context.Context, q *model.PutQuoteRequest) error
PutQuote(ctx context.Context, q *model.PutRelayerQuoteRequest) error
PutBulkQuotes(ctx context.Context, q *model.PutBulkQuotesRequest) error
PutRelayAck(ctx context.Context, req *model.PutAckRequest) (*model.PutRelayAckResponse, error)
SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error)
UnauthenticatedClient
}

Expand All @@ -37,6 +47,7 @@ type UnauthenticatedClient interface {
GetSpecificQuote(ctx context.Context, q *model.GetQuoteSpecificRequest) ([]*model.GetQuoteResponse, error)
GetQuoteByRelayerAddress(ctx context.Context, relayerAddr string) ([]*model.GetQuoteResponse, error)
GetRFQContracts(ctx context.Context) (*model.GetContractsResponse, error)
PutRFQRequest(ctx context.Context, q *model.PutRFQRequest) (*model.PutRFQResponse, error)
resty() *resty.Client
}

Expand All @@ -50,7 +61,8 @@ func (c unauthenticatedClient) resty() *resty.Client {

type clientImpl struct {
UnauthenticatedClient
rClient *resty.Client
rClient *resty.Client
reqSigner signer.Signer
}

// NewAuthenticatedClient creates a new client for the RFQ quoting API.
Expand All @@ -65,33 +77,40 @@ func NewAuthenticatedClient(metrics metrics.Handler, rfqURL string, reqSigner si
// to a new variable for clarity.
authedClient := unauthedClient.resty().
OnBeforeRequest(func(client *resty.Client, request *resty.Request) error {
// if request.Method == "PUT" && request.URL == rfqURL+rest.QUOTE_ROUTE {
// i.e. signature (hex encoded) = keccak(bytes.concat("\x19Ethereum Signed Message:\n", len(strconv.Itoa(time.Now().Unix()), strconv.Itoa(time.Now().Unix())))
// so that full auth header string: auth = strconv.Itoa(time.Now().Unix()) + ":" + signature
// Get the current Unix timestamp as a string.
now := strconv.Itoa(int(time.Now().Unix()))

// Prepare the data to be signed.
data := "\x19Ethereum Signed Message:\n" + strconv.Itoa(len(now)) + now

sig, err := reqSigner.SignMessage(request.Context(), []byte(data), true)

authHeader, err := getAuthHeader(request.Context(), reqSigner)
if err != nil {
return fmt.Errorf("failed to sign request: %w", err)
return fmt.Errorf("failed to get auth header: %w", err)
}

res := fmt.Sprintf("%s:%s", now, hexutil.Encode(signer.Encode(sig)))
request.SetHeader("Authorization", res)

request.SetHeader(rest.AuthorizationHeader, authHeader)
return nil
})

return &clientImpl{
UnauthenticatedClient: unauthedClient,
rClient: authedClient,
reqSigner: reqSigner,
}, nil
}

func getAuthHeader(ctx context.Context, reqSigner signer.Signer) (string, error) {
// if request.Method == "PUT" && request.URL == rfqURL+rest.QUOTE_ROUTE {
// i.e. signature (hex encoded) = keccak(bytes.concat("\x19Ethereum Signed Message:\n", len(strconv.Itoa(time.Now().Unix()), strconv.Itoa(time.Now().Unix())))
// so that full auth header string: auth = strconv.Itoa(time.Now().Unix()) + ":" + signature
// Get the current Unix timestamp as a string.
now := strconv.Itoa(int(time.Now().Unix()))

// Prepare the data to be signed.
data := "\x19Ethereum Signed Message:\n" + strconv.Itoa(len(now)) + now

sig, err := reqSigner.SignMessage(ctx, []byte(data), true)

if err != nil {
return "", fmt.Errorf("failed to sign request: %w", err)
}

return fmt.Sprintf("%s:%s", now, hexutil.Encode(signer.Encode(sig))), nil
}

// NewUnauthenticatedClient creates a new client for the RFQ quoting API.
func NewUnauthenticatedClient(metricHandler metrics.Handler, rfqURL string) (UnauthenticatedClient, error) {
client := resty.New().
Expand All @@ -115,7 +134,7 @@ func NewUnauthenticatedClient(metricHandler metrics.Handler, rfqURL string) (Una
}

// PutQuote puts a new quote in the RFQ quoting API.
func (c *clientImpl) PutQuote(ctx context.Context, q *model.PutQuoteRequest) error {
func (c *clientImpl) PutQuote(ctx context.Context, q *model.PutRelayerQuoteRequest) error {
res, err := c.rClient.R().
SetContext(ctx).
SetBody(q).
Expand Down Expand Up @@ -159,6 +178,171 @@ func (c *clientImpl) PutRelayAck(ctx context.Context, req *model.PutAckRequest)
return ack, nil
}

func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error) {
conn, err := c.connectWebsocket(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}
// first, subscrbe to the given chains
sub := model.SubscriptionParams{
Chains: req.ChainIDs,
}
subJSON, err := json.Marshal(sub)
if err != nil {
return respChan, fmt.Errorf("error marshaling subscription params: %w", err)
}
err = conn.WriteJSON(model.ActiveRFQMessage{
Op: rest.SubscribeOp,
Content: json.RawMessage(subJSON),
})
if err != nil {
return nil, fmt.Errorf("error sending subscribe message: %w", err)
}
// make sure subscription is successful
var resp model.ActiveRFQMessage
err = conn.ReadJSON(&resp)
if err != nil {
return nil, fmt.Errorf("error reading subscribe response: %w", err)
}
if !resp.Success || resp.Op != rest.SubscribeOp {
return nil, fmt.Errorf("subscription failed")
}

respChan = make(chan *model.ActiveRFQMessage)
go func() {
wsErr := c.processWebsocket(ctx, conn, reqChan, respChan)
if wsErr != nil {
logger.Error("Error running websocket listener: %s", wsErr)
}
}()

return respChan, nil
}

func (c *clientImpl) connectWebsocket(ctx context.Context, req *model.SubscribeActiveRFQRequest) (conn *websocket.Conn, err error) {
if len(req.ChainIDs) == 0 {
return nil, fmt.Errorf("chain IDs are required")
}

header, err := c.getWsHeaders(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get auth header: %w", err)
}

reqURL := strings.Replace(c.rClient.BaseURL, "http", "ws", 1) + rest.RFQStreamRoute
conn, httpResp, err := websocket.DefaultDialer.Dial(reqURL, header)
if err != nil {
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}
err = httpResp.Body.Close()
if err != nil {
logger.Warnf("error closing websocket connection: %v", err)
}

return conn, nil
}

func (c *clientImpl) getWsHeaders(ctx context.Context, req *model.SubscribeActiveRFQRequest) (header http.Header, err error) {
header = http.Header{}
chainIDsJSON, err := json.Marshal(req.ChainIDs)
if err != nil {
return header, fmt.Errorf("failed to marshal chain IDs: %w", err)
}
header.Set(rest.ChainsHeader, string(chainIDsJSON))
authHeader, err := getAuthHeader(ctx, c.reqSigner)
if err != nil {
return header, fmt.Errorf("failed to get auth header: %w", err)
}
header.Set(rest.AuthorizationHeader, authHeader)
return header, nil
}

func (c *clientImpl) processWebsocket(ctx context.Context, conn *websocket.Conn, reqChan, respChan chan *model.ActiveRFQMessage) (err error) {
defer func() {
close(respChan)
err := conn.Close()
if err != nil {
logger.Warnf("error closing websocket connection: %v", err)
}
}()

readChan := make(chan []byte)
go c.listenWsMessages(ctx, conn, readChan)
go c.sendPings(ctx, reqChan)

for {
select {
case <-ctx.Done():
return nil
case msg, ok := <-reqChan:
if !ok {
return fmt.Errorf("error reading from reqChan: %w", ctx.Err())
}
err := conn.WriteJSON(msg)
if err != nil {
return fmt.Errorf("error sending message to websocket: %w", err)
}
case msg, ok := <-readChan:
if !ok {
return nil
}
err = c.handleWsMessage(ctx, msg, respChan)
if err != nil {
return fmt.Errorf("error handling websocket message: %w", err)
}
}
}
}

func (c *clientImpl) sendPings(ctx context.Context, reqChan chan *model.ActiveRFQMessage) {
pingTicker := time.NewTicker(pingPeriod)
defer pingTicker.Stop()

for {
select {
case <-pingTicker.C:
pingMsg := model.ActiveRFQMessage{
Op: rest.PingOp,
}
reqChan <- &pingMsg
case <-ctx.Done():
return
}
}
}
func (c *clientImpl) listenWsMessages(ctx context.Context, conn *websocket.Conn, readChan chan []byte) {
defer close(readChan)
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Warnf("websocket connection closed unexpectedly: %v", err)
}
return
}
select {
case readChan <- message:
case <-ctx.Done():
return
}
}
}

func (c *clientImpl) handleWsMessage(ctx context.Context, msg []byte, respChan chan *model.ActiveRFQMessage) (err error) {
var rfqMsg model.ActiveRFQMessage
err = json.Unmarshal(msg, &rfqMsg)
if err != nil {
return fmt.Errorf("error unmarshaling message: %w", err)
}

select {
case respChan <- &rfqMsg:
case <-ctx.Done():
return nil
}
return nil
}

// GetAllQuotes retrieves all quotes from the RFQ quoting API.
func (c *unauthenticatedClient) GetAllQuotes(ctx context.Context) ([]*model.GetQuoteResponse, error) {
var quotes []*model.GetQuoteResponse
Expand Down Expand Up @@ -242,6 +426,25 @@ func (c unauthenticatedClient) GetRFQContracts(ctx context.Context) (*model.GetC
return contracts, nil
}

func (c unauthenticatedClient) PutRFQRequest(ctx context.Context, q *model.PutRFQRequest) (*model.PutRFQResponse, error) {
var response model.PutRFQResponse
resp, err := c.rClient.R().
SetContext(ctx).
SetBody(q).
SetResult(&response).
Put(rest.RFQRoute)

if err != nil {
return nil, fmt.Errorf("error from server: %s: %w", getStatus(resp), err)
}

if resp.IsError() {
return nil, fmt.Errorf("error from server: %s", getStatus(resp))
}

return &response, nil
}

func getStatus(resp *resty.Response) string {
if resp == nil {
return "http status unavailable"
Expand Down
8 changes: 4 additions & 4 deletions services/rfq/api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// TODO: @aurelius tese tests make a lot less sesnes w/ a composite index

func (c *ClientSuite) TestPutAndGetQuote() {
req := model.PutQuoteRequest{
req := model.PutRelayerQuoteRequest{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
DestChainID: 42161,
Expand Down Expand Up @@ -40,7 +40,7 @@ func (c *ClientSuite) TestPutAndGetQuote() {

func (c *ClientSuite) TestPutAndGetBulkQuotes() {
req := model.PutBulkQuotesRequest{
Quotes: []model.PutQuoteRequest{
Quotes: []model.PutRelayerQuoteRequest{
{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
Expand Down Expand Up @@ -98,7 +98,7 @@ func (c *ClientSuite) TestPutAndGetBulkQuotes() {
}

func (c *ClientSuite) TestGetSpecificQuote() {
req := model.PutQuoteRequest{
req := model.PutRelayerQuoteRequest{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
DestChainID: 42161,
Expand Down Expand Up @@ -135,7 +135,7 @@ func (c *ClientSuite) TestGetSpecificQuote() {
}

func (c *ClientSuite) TestGetQuoteByRelayerAddress() {
req := model.PutQuoteRequest{
req := model.PutRelayerQuoteRequest{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
DestChainID: 42161,
Expand Down
27 changes: 27 additions & 0 deletions services/rfq/api/db/activequoterequeststatus_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ad48cb0

Please sign in to comment.