Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSDK-8294] Use Global Connection to App in Internal Cloud Service and Local Robot #4782

Merged
merged 37 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c6a7cb3
only acquire connection if connection is nil but cloud config is defined
bashar-515 Feb 13, 2025
4484f8d
use nil conns in functions that call New() and newWithResources()
bashar-515 Feb 13, 2025
fe83e57
pass conn in local robot
bashar-515 Feb 13, 2025
da298d8
pass nil as connection in RobotFromConfigPath()
bashar-515 Feb 13, 2025
cfde0c8
create global app connection in example server
bashar-515 Feb 13, 2025
7ff0029
pass nil connection in tests
bashar-515 Feb 13, 2025
2ec5c6a
add comment
bashar-515 Feb 13, 2025
b08f684
provide global app connection to cloud service
bashar-515 Feb 13, 2025
090edd1
update service tests
bashar-515 Feb 13, 2025
73038c8
only close connection if not nil
bashar-515 Feb 13, 2025
82ffad8
pass nil connection
bashar-515 Feb 13, 2025
81c7e45
lint
bashar-515 Feb 13, 2025
73facf7
change connection expectations
bashar-515 Feb 14, 2025
ec1aecc
pass actual connection in test
bashar-515 Feb 14, 2025
bcaea48
remove TODO comments
bashar-515 Feb 14, 2025
0f75a6e
pass nil connection
bashar-515 Feb 14, 2025
ed75da4
no longer close connection
bashar-515 Feb 14, 2025
1201c72
no longer track shared connection
bashar-515 Feb 14, 2025
c44a5a8
expect no managed error
bashar-515 Feb 14, 2025
6c57c86
handle app connection internally
bashar-515 Feb 14, 2025
ad4b3e1
lint
bashar-515 Feb 14, 2025
b8df4aa
fix bug
bashar-515 Feb 14, 2025
f2b1c6f
update service tests
bashar-515 Feb 14, 2025
13e8ee3
only close connection if it is not nil
bashar-515 Feb 14, 2025
2e2cd6e
manage app connection
bashar-515 Feb 14, 2025
9124464
create auth connection
bashar-515 Feb 14, 2025
d94aa1b
make changes to cloudmanaged test
bashar-515 Feb 14, 2025
f45cf7c
lint
bashar-515 Feb 18, 2025
fe7ea97
early return on error
bashar-515 Feb 18, 2025
827e743
check conn2 instead of conn1
bashar-515 Feb 18, 2025
2aae7df
clean up echoclient tests
bashar-515 Feb 18, 2025
6ad3d96
lint
bashar-515 Feb 18, 2025
0d38c61
document behavior of AcquireConnection()
bashar-515 Feb 18, 2025
c3e8c39
remove TODO comment
bashar-515 Feb 18, 2025
3377d59
check for ErrNotConnected
bashar-515 Feb 18, 2025
26c9c6b
lint
bashar-515 Feb 18, 2025
962b21a
modify auth tests
bashar-515 Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func setupWithRunningPart(
Model: resource.DefaultServiceModel,
},
},
}, logging.NewInMemoryLogger(t))
}, nil, logging.NewInMemoryLogger(t))
test.That(t, err, test.ShouldBeNil)

options, _, addr := robottestutils.CreateBaseOptionsAndListener(t)
Expand Down
4 changes: 2 additions & 2 deletions components/camera/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func setupRealRobot(
t.Helper()

ctx := context.Background()
robot, err := robotimpl.RobotFromConfig(ctx, robotConfig, logger)
robot, err := robotimpl.RobotFromConfig(ctx, robotConfig, nil, logger)
test.That(t, err, test.ShouldBeNil)

// We initialize with a stream config such that the stream server is capable of creating video stream and
Expand All @@ -652,7 +652,7 @@ func setupRealRobotWithOptions(
t.Helper()

ctx := context.Background()
robot, err := robotimpl.RobotFromConfig(ctx, robotConfig, logger)
robot, err := robotimpl.RobotFromConfig(ctx, robotConfig, nil, logger)
test.That(t, err, test.ShouldBeNil)

// We initialize with a stream config such that the stream server is capable of creating video stream and
Expand Down
2 changes: 1 addition & 1 deletion components/camera/transformpipeline/classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func buildRobotWithClassifier(logger logging.Logger) (robot.Robot, error) {
defer os.Remove(newConfFile)

// make the robot from new config
r, err := robotimpl.RobotFromConfigPath(context.Background(), newConfFile, logger)
r, err := robotimpl.RobotFromConfigPath(context.Background(), newConfFile, nil, logger)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion components/camera/transformpipeline/detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func buildRobotWithFakeCamera(logger logging.Logger) (robot.Robot, error) {
}
defer os.Remove(newConfFile)
// make the robot from new config
return robotimpl.RobotFromConfigPath(context.Background(), newConfFile, logger)
return robotimpl.RobotFromConfigPath(context.Background(), newConfFile, nil, logger)
}

func TestColorDetectionSource(t *testing.T) {
Expand Down
14 changes: 13 additions & 1 deletion examples/customresources/demos/remoteserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"errors"

goutils "go.viam.com/utils"
"go.viam.com/utils/rpc"

"go.viam.com/rdk/config"
_ "go.viam.com/rdk/examples/customresources/models/mygizmo"
"go.viam.com/rdk/grpc"
"go.viam.com/rdk/logging"
robotimpl "go.viam.com/rdk/robot/impl"
"go.viam.com/rdk/robot/web"
Expand Down Expand Up @@ -40,7 +42,17 @@ func mainWithArgs(ctx context.Context, args []string, logger logging.Logger) (er
return err
}

myRobot, err := robotimpl.RobotFromConfig(ctx, cfg, logger)
var appConn rpc.ClientConn
if cfg.Cloud != nil && cfg.Cloud.AppAddress != "" {
appConn, err = grpc.NewAppConn(ctx, cfg.Cloud.AppAddress, cfg.Cloud.Secret, cfg.Cloud.ID, logger)
if err != nil {
return nil
}

defer goutils.UncheckedErrorFunc(appConn.Close)
}

myRobot, err := robotimpl.RobotFromConfig(ctx, cfg, appConn, logger)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions examples/customresources/demos/remoteserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestGizmo(t *testing.T) {

cfgServer, err := config.Read(ctx, utils.ResolveFile("./examples/customresources/demos/remoteserver/remote.json"), logger, nil)
test.That(t, err, test.ShouldBeNil)
remoteB, err := robotimpl.New(ctx, cfgServer, logger.Sublogger("remoteB"))
remoteB, err := robotimpl.New(ctx, cfgServer, nil, logger.Sublogger("remoteB"))
test.That(t, err, test.ShouldBeNil)
options := weboptions.New()
options.Network.BindAddress = remoteAddrB
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestGizmo(t *testing.T) {
},
},
}
mainPart, err := robotimpl.New(ctx, mainPartConfig, logger.Sublogger("mainPart.client"))
mainPart, err := robotimpl.New(ctx, mainPartConfig, nil, logger.Sublogger("mainPart.client"))
defer func() {
test.That(t, mainPart.Close(context.Background()), test.ShouldBeNil)
}()
Expand Down
9 changes: 4 additions & 5 deletions grpc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ type ReconfigurableClientConn struct {
onTrackCBByTrackName map[string]OnTrackCB
}

// Return this constant such that backoff error logging can compare consecutive errors and reliably
// conclude they are the same.
var errNotConnected = errors.New("not connected")
// ErrNotConnected returns so that backoff error logging can compare consecutive errors and reliably conclude they are the same.
var ErrNotConnected = errors.New("not connected")

// Invoke invokes using the underlying client connection. In the case of c.conn being closed in the middle of
// an Invoke call, it is expected that c.conn can handle that and return a well-formed error.
Expand All @@ -39,7 +38,7 @@ func (c *ReconfigurableClientConn) Invoke(
conn := c.conn
c.connMu.RUnlock()
if conn == nil {
return errNotConnected
return ErrNotConnected
}
return conn.Invoke(ctx, method, args, reply, opts...)
}
Expand All @@ -56,7 +55,7 @@ func (c *ReconfigurableClientConn) NewStream(
conn := c.conn
c.connMu.RUnlock()
if conn == nil {
return nil, errNotConnected
return nil, ErrNotConnected
}
return conn.NewStream(ctx, desc, method, opts...)
}
Expand Down
31 changes: 12 additions & 19 deletions internal/cloud/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,32 @@ type ConnectionService interface {

// NewCloudConnectionService makes a new cloud connection service to get gRPC connections
// to a cloud service managing robots.
func NewCloudConnectionService(cfg *config.Cloud, logger logging.Logger) ConnectionService {
func NewCloudConnectionService(cfg *config.Cloud, conn rpc.ClientConn, logger logging.Logger) ConnectionService {
if cfg == nil || cfg.AppAddress == "" {
return &cloudManagedService{
Named: InternalServiceName.AsNamed(),
}
}
return &cloudManagedService{

cm := &cloudManagedService{
Named: InternalServiceName.AsNamed(),
conn: conn,
managed: true,
dialer: rpc.NewCachedDialer(),
cloudCfg: *cfg,
logger: logger,
}

return cm
}

type cloudManagedService struct {
resource.Named
// we assume the config is immutable for the lifetime of the process
resource.TriviallyReconfigurable

conn rpc.ClientConn

managed bool
cloudCfg config.Cloud
logger logging.Logger
Expand All @@ -69,27 +75,14 @@ type cloudManagedService struct {
dialer rpc.Dialer
}

// AcquireConnection returns the connection provided to `NewCloudConnectionService` regardless of the state of the `cloudManagedService`.
// This means that if `Close` has been called on the `cloudManagedService`, `AcquireConnection` can still return an open connection.
func (cm *cloudManagedService) AcquireConnection(ctx context.Context) (string, rpc.ClientConn, error) {
cm.dialerMu.RLock()
defer cm.dialerMu.RUnlock()
if !cm.managed {
if cm.conn == nil {
return "", nil, ErrNotCloudManaged
}
if cm.dialer == nil {
return "", nil, errors.New("service closed")
}

ctx = rpc.ContextWithDialer(ctx, cm.dialer)
timeout := connectTimeout
// When environment indicates we are behind a proxy, bump timeout. Network
// operations tend to take longer when behind a proxy.
if os.Getenv(rpc.SocksProxyEnvVar) != "" {
timeout = connectTimeoutBehindProxy
}
timeOutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
conn, err := config.CreateNewGRPCClient(timeOutCtx, &cm.cloudCfg, cm.logger)
return cm.cloudCfg.ID, conn, err
return cm.cloudCfg.ID, cm.conn, nil
cheukt marked this conversation as resolved.
Show resolved Hide resolved
}

func (cm *cloudManagedService) AcquireConnectionAPIKey(ctx context.Context,
Expand Down
Loading
Loading