-
Notifications
You must be signed in to change notification settings - Fork 154
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Echo method to send inactivity probe #368
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |
"github.com/go-logr/logr" | ||
"github.com/go-logr/stdr" | ||
"github.com/ovn-org/libovsdb/cache" | ||
syscall "github.com/ovn-org/libovsdb/internal" | ||
"github.com/ovn-org/libovsdb/mapper" | ||
"github.com/ovn-org/libovsdb/model" | ||
"github.com/ovn-org/libovsdb/ovsdb" | ||
|
@@ -87,6 +88,7 @@ type ovsdbClient struct { | |
metrics metrics | ||
connected bool | ||
rpcClient *rpc2.Client | ||
conn net.Conn | ||
rpcMutex sync.RWMutex | ||
// endpoints contains all possible endpoints; the first element is | ||
// the active endpoint if connected=true | ||
|
@@ -351,7 +353,10 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) (string, erro | |
return "", fmt.Errorf("failed to open connection: %w", err) | ||
} | ||
|
||
o.createRPC2Client(c) | ||
err = o.createRPC2Client(c) | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
serverDBNames, err := o.listDbs(ctx) | ||
if err != nil { | ||
|
@@ -422,11 +427,24 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) (string, erro | |
// createRPC2Client creates an rpcClient using the provided connection | ||
// It is also responsible for setting up go routines for client-side event handling | ||
// Should only be called when the mutex is held | ||
func (o *ovsdbClient) createRPC2Client(conn net.Conn) { | ||
func (o *ovsdbClient) createRPC2Client(conn net.Conn) error { | ||
o.stopCh = make(chan struct{}) | ||
if o.options.inactivityTimeout > 0 { | ||
o.trafficSeen = make(chan struct{}) | ||
} | ||
o.conn = conn | ||
// set TCP_USER_TIMEOUT socket option for connection so that | ||
// channel write doesn't block indefinitely on network disconnect. | ||
var userTimeout time.Duration | ||
if o.options.timeout > 0 { | ||
userTimeout = o.options.timeout * 3 | ||
} else { | ||
userTimeout = defaultTimeout | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that to start with we shouldn't have a default, and just set it to |
||
} | ||
err := syscall.SetTCPUserTimeout(conn, userTimeout) | ||
if err != nil { | ||
return err | ||
} | ||
o.rpcClient = rpc2.NewClientWithCodec(jsonrpc.NewJSONCodec(conn)) | ||
o.rpcClient.SetBlocking(true) | ||
o.rpcClient.Handle("echo", func(_ *rpc2.Client, args []interface{}, reply *[]interface{}) error { | ||
|
@@ -442,6 +460,7 @@ func (o *ovsdbClient) createRPC2Client(conn net.Conn) { | |
return o.update3(args, reply) | ||
}) | ||
go o.rpcClient.Run() | ||
return nil | ||
} | ||
|
||
// isEndpointLeader returns true if the currently connected endpoint is leader, | ||
|
@@ -748,7 +767,7 @@ func (o *ovsdbClient) update3(params []json.RawMessage, reply *[]interface{}) er | |
func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (ovsdb.DatabaseSchema, error) { | ||
args := ovsdb.NewGetSchemaArgs(dbName) | ||
var reply ovsdb.DatabaseSchema | ||
err := o.rpcClient.CallWithContext(ctx, "get_schema", args, &reply) | ||
err := o.CallWithContext(ctx, "get_schema", args, &reply) | ||
if err != nil { | ||
if err == rpc2.ErrShutdown { | ||
return ovsdb.DatabaseSchema{}, ErrNotConnected | ||
|
@@ -763,7 +782,7 @@ func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (ovsdb.Datab | |
// Should only be called when mutex is held | ||
func (o *ovsdbClient) listDbs(ctx context.Context) ([]string, error) { | ||
var dbs []string | ||
err := o.rpcClient.CallWithContext(ctx, "list_dbs", nil, &dbs) | ||
err := o.CallWithContext(ctx, "list_dbs", nil, &dbs) | ||
if err != nil { | ||
if err == rpc2.ErrShutdown { | ||
return nil, ErrNotConnected | ||
|
@@ -836,7 +855,7 @@ func (o *ovsdbClient) transact(ctx context.Context, dbName string, skipChWrite b | |
if dbgLogger.Enabled() { | ||
dbgLogger.Info("transacting operations", "operations", fmt.Sprintf("%+v", operation)) | ||
} | ||
err := o.rpcClient.CallWithContext(ctx, "transact", args, &reply) | ||
err := o.CallWithContext(ctx, "transact", args, &reply) | ||
if err != nil { | ||
if err == rpc2.ErrShutdown { | ||
return nil, ErrNotConnected | ||
|
@@ -869,7 +888,7 @@ func (o *ovsdbClient) MonitorCancel(ctx context.Context, cookie MonitorCookie) e | |
if o.rpcClient == nil { | ||
return ErrNotConnected | ||
} | ||
err := o.rpcClient.CallWithContext(ctx, "monitor_cancel", args, &reply) | ||
err := o.CallWithContext(ctx, "monitor_cancel", args, &reply) | ||
if err != nil { | ||
if err == rpc2.ErrShutdown { | ||
return ErrNotConnected | ||
|
@@ -981,15 +1000,15 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne | |
switch monitor.Method { | ||
case ovsdb.MonitorRPC: | ||
var reply ovsdb.TableUpdates | ||
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply) | ||
err = o.CallWithContext(ctx, monitor.Method, args, &reply) | ||
tableUpdates = reply | ||
case ovsdb.ConditionalMonitorRPC: | ||
var reply ovsdb.TableUpdates2 | ||
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply) | ||
err = o.CallWithContext(ctx, monitor.Method, args, &reply) | ||
tableUpdates = reply | ||
case ovsdb.ConditionalMonitorSinceRPC: | ||
var reply ovsdb.MonitorCondSinceReply | ||
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply) | ||
err = o.CallWithContext(ctx, monitor.Method, args, &reply) | ||
if err == nil && reply.Found { | ||
monitor.LastTransactionID = reply.LastTransactionID | ||
lastTransactionFound = true | ||
|
@@ -1080,7 +1099,7 @@ func (o *ovsdbClient) Echo(ctx context.Context) error { | |
if o.rpcClient == nil { | ||
return ErrNotConnected | ||
} | ||
err := o.rpcClient.CallWithContext(ctx, "echo", args, &reply) | ||
err := o.CallWithContext(ctx, "echo", args, &reply) | ||
if err != nil { | ||
if err == rpc2.ErrShutdown { | ||
return ErrNotConnected | ||
|
@@ -1197,72 +1216,33 @@ func (o *ovsdbClient) handleClientErrors(stopCh <-chan struct{}) { | |
} | ||
} | ||
|
||
func (o *ovsdbClient) sendEcho(args []interface{}, reply *[]interface{}) *rpc2.Call { | ||
o.rpcMutex.RLock() | ||
defer o.rpcMutex.RUnlock() | ||
if o.rpcClient == nil { | ||
return nil | ||
} | ||
return o.rpcClient.Go("echo", args, reply, make(chan *rpc2.Call, 1)) | ||
} | ||
|
||
func (o *ovsdbClient) handleInactivityProbes() { | ||
defer o.handlerShutdown.Done() | ||
echoReplied := make(chan string) | ||
var lastEcho string | ||
stopCh := o.stopCh | ||
trafficSeen := o.trafficSeen | ||
timer := time.NewTimer(o.options.inactivityTimeout) | ||
for { | ||
select { | ||
case <-stopCh: | ||
return | ||
case <-trafficSeen: | ||
// We got some traffic from the server, restart our timer | ||
case ts := <-echoReplied: | ||
// Got a response from the server, check it against lastEcho; if same clear lastEcho; if not same Disconnect() | ||
if ts != lastEcho { | ||
o.Disconnect() | ||
return | ||
if !timer.Stop() { | ||
<-timer.C | ||
} | ||
lastEcho = "" | ||
case <-time.After(o.options.inactivityTimeout): | ||
// If there's a lastEcho already, then we didn't get a server reply, disconnect | ||
if lastEcho != "" { | ||
o.Disconnect() | ||
return | ||
} | ||
// Otherwise send an echo | ||
thisEcho := fmt.Sprintf("%d", time.Now().UnixMicro()) | ||
args := []interface{}{"libovsdb echo", thisEcho} | ||
var reply []interface{} | ||
// Can't use o.Echo() because it blocks; we need the Call object direct from o.rpcClient.Go() | ||
call := o.sendEcho(args, &reply) | ||
if call == nil { | ||
o.Disconnect() | ||
return | ||
} | ||
lastEcho = thisEcho | ||
case <-timer.C: | ||
// Otherwise send an echo in a goroutine so that transactions don't block | ||
go func() { | ||
// Wait for the echo reply | ||
select { | ||
case <-stopCh: | ||
return | ||
case <-call.Done: | ||
if call.Error != nil { | ||
// RPC timeout; disconnect | ||
o.logger.V(3).Error(call.Error, "server echo reply error") | ||
o.Disconnect() | ||
} else if !reflect.DeepEqual(args, reply) { | ||
o.logger.V(3).Info("warning: incorrect server echo reply", | ||
"expected", args, "reply", reply) | ||
o.Disconnect() | ||
} else { | ||
// Otherwise stuff thisEcho into the echoReplied channel | ||
echoReplied <- thisEcho | ||
} | ||
ctx, cancel := context.WithTimeout(context.Background(), o.options.timeout) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. two nuances:
I don't think any of these are necessarily a problem but can be disconcerting if we end up seeing logs and troubleshooting scenarios where any of that is happening. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The first concern is fixed with commit e0bed3f. For the 2nd one, Do you think echo fails whereas transaction would succeed. Does that really happen (both goes via same tcp channel) ? Do you want to have some retries before going for a disconnect ? If this is not really a concern for now, we can skip doing it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I am just happy if we check that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
err := o.Echo(ctx) | ||
if err != nil { | ||
o.logger.V(3).Error(err, "server echo reply error") | ||
o.Disconnect() | ||
} | ||
cancel() | ||
}() | ||
} | ||
timer.Reset(o.options.inactivityTimeout) | ||
} | ||
} | ||
|
||
|
@@ -1478,3 +1458,19 @@ func (o *ovsdbClient) WhereAll(m model.Model, conditions ...model.Condition) Con | |
func (o *ovsdbClient) WhereCache(predicate interface{}) ConditionalAPI { | ||
return o.primaryDB().api.WhereCache(predicate) | ||
} | ||
|
||
// CallWithContext invokes the named function, waits for it to complete, and | ||
// returns its error status, or an error from Context timeout. | ||
func (o *ovsdbClient) CallWithContext(ctx context.Context, method string, args interface{}, reply interface{}) error { | ||
// Set up read/write deadline for tcp connection before making | ||
// a rpc request to the server. | ||
if tcpConn, ok := o.conn.(*net.TCPConn); ok { | ||
if o.options.timeout > 0 { | ||
err := tcpConn.SetDeadline(time.Now().Add(o.options.timeout * 3)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to BTW: How does this affect There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, The correct, The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found a related issue in issue: grpc/grpc#15889 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @halfcrazy , yes, it's the same issue that we are trying to fix in libovsdb client as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pperiyasamy have you considered @halfcrazy concerns about needing a mutex? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why |
||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
return o.rpcClient.CallWithContext(ctx, method, args, reply) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, why * 3? Wouldn't it make sense to use inactivityTimeout for this as well?