Skip to content

Commit

Permalink
[RSDK-8291] Use Global Connection to App in Config Watcher (#4773)
Browse files Browse the repository at this point in the history
  • Loading branch information
bashar-515 authored Feb 7, 2025
1 parent e1f9427 commit edca01a
Show file tree
Hide file tree
Showing 31 changed files with 181 additions and 161 deletions.
2 changes: 1 addition & 1 deletion components/camera/transformpipeline/detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func writeTempConfig(cfg *config.Config) (string, error) {
// make a fake robot with a vision service.
func buildRobotWithFakeCamera(logger logging.Logger) (robot.Robot, error) {
// add a fake camera to the config
cfg, err := config.Read(context.Background(), artifact.MustPath("components/camera/transformpipeline/vision.json"), logger)
cfg, err := config.Read(context.Background(), artifact.MustPath("components/camera/transformpipeline/vision.json"), logger, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (c *Config) StoreToCache() error {
if c.toCache == nil {
return errors.New("no unprocessed config to cache")
}
if err := os.MkdirAll(ViamDotDir, 0o700); err != nil {
if err := os.MkdirAll(rutils.ViamDotDir, 0o700); err != nil {
return err
}
reader := bytes.NewReader(c.toCache)
Expand Down
10 changes: 5 additions & 5 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (

func TestConfigRobot(t *testing.T) {
logger := logging.NewTestLogger(t)
cfg, err := config.Read(context.Background(), "data/robot.json", logger)
cfg, err := config.Read(context.Background(), "data/robot.json", logger, nil)
test.That(t, err, test.ShouldBeNil)

test.That(t, cfg.Components, test.ShouldHaveLength, 3)
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestConfig3(t *testing.T) {
logger := logging.NewTestLogger(t)

test.That(t, os.Setenv("TEST_THING_FOO", "5"), test.ShouldBeNil)
cfg, err := config.Read(context.Background(), "data/config3.json", logger)
cfg, err := config.Read(context.Background(), "data/config3.json", logger, nil)
test.That(t, err, test.ShouldBeNil)

test.That(t, len(cfg.Components), test.ShouldEqual, 4)
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestConfig3(t *testing.T) {

func TestConfigWithLogDeclarations(t *testing.T) {
logger := logging.NewTestLogger(t)
cfg, err := config.Read(context.Background(), "data/config_with_log.json", logger)
cfg, err := config.Read(context.Background(), "data/config_with_log.json", logger, nil)
test.That(t, err, test.ShouldBeNil)

test.That(t, len(cfg.Components), test.ShouldEqual, 4)
Expand Down Expand Up @@ -1207,15 +1207,15 @@ func TestPackageConfig(t *testing.T) {

func TestConfigRobotWebProfile(t *testing.T) {
logger := logging.NewTestLogger(t)
cfg, err := config.Read(context.Background(), "data/config_with_web_profile.json", logger)
cfg, err := config.Read(context.Background(), "data/config_with_web_profile.json", logger, nil)
test.That(t, err, test.ShouldBeNil)

test.That(t, cfg.EnableWebProfile, test.ShouldBeTrue)
}

func TestConfigRobotRevision(t *testing.T) {
logger := logging.NewTestLogger(t)
cfg, err := config.Read(context.Background(), "data/config_with_revision.json", logger)
cfg, err := config.Read(context.Background(), "data/config_with_revision.json", logger, nil)
test.That(t, err, test.ShouldBeNil)

test.That(t, cfg.Revision, test.ShouldEqual, "rev1")
Expand Down
8 changes: 4 additions & 4 deletions config/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,9 @@ func TestDiffConfigs(t *testing.T) {
t.Run(fmt.Sprintf("revealSensitiveConfigDiffs=%t", revealSensitiveConfigDiffs), func(t *testing.T) {
logger.Infof("Test name: %v LeftFile: `%v` RightFile: `%v`", tc.Name, tc.LeftFile, tc.RightFile)
logger := logging.NewTestLogger(t)
left, err := config.Read(context.Background(), tc.LeftFile, logger)
left, err := config.Read(context.Background(), tc.LeftFile, logger, nil)
test.That(t, err, test.ShouldBeNil)
right, err := config.Read(context.Background(), tc.RightFile, logger)
right, err := config.Read(context.Background(), tc.RightFile, logger, nil)
test.That(t, err, test.ShouldBeNil)

diff, err := config.DiffConfigs(*left, *right, revealSensitiveConfigDiffs)
Expand Down Expand Up @@ -445,9 +445,9 @@ func TestDiffConfigHeterogenousTypes(t *testing.T) {
} {
t.Run(tc.Name, func(t *testing.T) {
logger := logging.NewTestLogger(t)
left, err := config.Read(context.Background(), tc.LeftFile, logger)
left, err := config.Read(context.Background(), tc.LeftFile, logger, nil)
test.That(t, err, test.ShouldBeNil)
right, err := config.Read(context.Background(), tc.RightFile, logger)
right, err := config.Read(context.Background(), tc.RightFile, logger, nil)
test.That(t, err, test.ShouldBeNil)

_, err = config.DiffConfigs(*left, *right, true)
Expand Down
88 changes: 27 additions & 61 deletions config/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"path/filepath"
"runtime"
"sort"
"time"

"github.com/a8m/envsubst"
"github.com/pkg/errors"
Expand All @@ -24,6 +23,7 @@ import (
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
rutils "go.viam.com/rdk/utils"
"go.viam.com/rdk/utils/contextutils"
)

// RDK versioning variables which are replaced by LD flags.
Expand All @@ -34,9 +34,6 @@ var (
)

const (
initialReadTimeout = 1 * time.Second
readTimeout = 5 * time.Second
readTimeoutBehindProxy = time.Minute
// PackagesDirName is where packages go underneath viamDotDir.
PackagesDirName = "packages"
// LocalPackagesSuffix is used by the local package manager.
Expand Down Expand Up @@ -81,20 +78,14 @@ func getAgentInfo(logger logging.Logger) (*apppb.AgentInfo, error) {
}, nil
}

var (
// ViamDotDir is the directory for Viam's cached files.
ViamDotDir string
viamPackagesDir string
)
var viamPackagesDir string

func init() {
home := rutils.PlatformHomeDir()
ViamDotDir = filepath.Join(home, ".viam")
viamPackagesDir = filepath.Join(ViamDotDir, PackagesDirName)
viamPackagesDir = filepath.Join(rutils.ViamDotDir, PackagesDirName)
}

func getCloudCacheFilePath(id string) string {
return filepath.Join(ViamDotDir, fmt.Sprintf("cached_cloud_config_%s.json", id))
return filepath.Join(rutils.ViamDotDir, fmt.Sprintf("cached_cloud_config_%s.json", id))
}

func readFromCache(id string) (*Config, error) {
Expand Down Expand Up @@ -124,14 +115,8 @@ func clearCache(id string) {

func readCertificateDataFromCloudGRPC(ctx context.Context,
cloudConfigFromDisk *Cloud,
logger logging.Logger,
conn rpc.ClientConn,
) (tlsConfig, error) {
conn, err := CreateNewGRPCClient(ctx, cloudConfigFromDisk, logger)
if err != nil {
return tlsConfig{}, err
}
defer utils.UncheckedErrorFunc(conn.Close)

service := apppb.NewRobotServiceClient(conn)
res, err := service.Certificate(ctx, &apppb.CertificateRequest{Id: cloudConfigFromDisk.ID})
if err != nil {
Expand Down Expand Up @@ -182,28 +167,6 @@ func isLocationSecretsEqual(prevCloud, cloud *Cloud) bool {
return true
}

// GetTimeoutCtx returns a context [and its cancel function] with a timeout value determined by whether we are behind a proxy and whether a
// cached config exists.
func GetTimeoutCtx(ctx context.Context, shouldReadFromCache bool, id string) (context.Context, func()) {
timeout := readTimeout
// When environment indicates we are behind a proxy, bump timeout. Network
// operations tend to take longer when behind a proxy.
if proxyAddr := os.Getenv(rpc.SocksProxyEnvVar); proxyAddr != "" {
timeout = readTimeoutBehindProxy
}

// use shouldReadFromCache to determine whether this is part of initial read or not, but only shorten timeout
// if cached config exists
cachedConfigExists := false
if _, err := os.Stat(getCloudCacheFilePath(id)); err == nil {
cachedConfigExists = true
}
if shouldReadFromCache && cachedConfigExists {
timeout = initialReadTimeout
}
return context.WithTimeout(ctx, timeout)
}

// readFromCloud fetches a robot config from the cloud based
// on the given config.
func readFromCloud(
Expand All @@ -213,10 +176,11 @@ func readFromCloud(
shouldReadFromCache bool,
checkForNewCert bool,
logger logging.Logger,
conn rpc.ClientConn,
) (*Config, error) {
logger.Debug("reading configuration from the cloud")
cloudCfg := originalCfg.Cloud
unprocessedConfig, cached, err := getFromCloudOrCache(ctx, cloudCfg, shouldReadFromCache, logger)
unprocessedConfig, cached, err := getFromCloudOrCache(ctx, cloudCfg, shouldReadFromCache, logger, conn)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -260,8 +224,8 @@ func readFromCloud(
if !cfg.Cloud.SignalingInsecure && (checkForNewCert || tls.certificate == "" || tls.privateKey == "") {
logger.Debug("reading tlsCertificate from the cloud")

ctxWithTimeout, cancel := GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
certData, err := readCertificateDataFromCloudGRPC(ctxWithTimeout, cloudCfg, logger)
ctxWithTimeout, cancel := contextutils.GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
certData, err := readCertificateDataFromCloudGRPC(ctxWithTimeout, cloudCfg, conn)
if err != nil {
cancel()
if !errors.As(err, &context.DeadlineExceeded) {
Expand Down Expand Up @@ -350,13 +314,14 @@ func Read(
ctx context.Context,
filePath string,
logger logging.Logger,
conn rpc.ClientConn,
) (*Config, error) {
buf, err := envsubst.ReadFile(filePath)
if err != nil {
return nil, err
}

return FromReader(ctx, filePath, bytes.NewReader(buf), logger)
return FromReader(ctx, filePath, bytes.NewReader(buf), logger, conn)
}

// ReadLocalConfig reads a config from the given file but does not fetch any config from the remote servers.
Expand All @@ -370,7 +335,7 @@ func ReadLocalConfig(
return nil, err
}

return fromReader(ctx, filePath, bytes.NewReader(buf), logger, false)
return fromReader(ctx, filePath, bytes.NewReader(buf), logger, nil)
}

// FromReader reads a config from the given reader and specifies
Expand All @@ -380,8 +345,9 @@ func FromReader(
originalPath string,
r io.Reader,
logger logging.Logger,
conn rpc.ClientConn,
) (*Config, error) {
return fromReader(ctx, originalPath, r, logger, true)
return fromReader(ctx, originalPath, r, logger, conn)
}

// fromReader reads a config from the given reader and specifies
Expand All @@ -391,7 +357,7 @@ func fromReader(
originalPath string,
r io.Reader,
logger logging.Logger,
shouldReadFromCloud bool,
conn rpc.ClientConn,
) (*Config, error) {
// First read and process config from disk
unprocessedConfig := Config{
Expand All @@ -406,8 +372,8 @@ func fromReader(
return nil, errors.Wrapf(err, "failed to process Config")
}

if shouldReadFromCloud && cfgFromDisk.Cloud != nil {
cfg, err := readFromCloud(ctx, cfgFromDisk, nil, true, true, logger)
if conn != nil && cfgFromDisk.Cloud != nil {
cfg, err := readFromCloud(ctx, cfgFromDisk, nil, true, true, logger, conn)
return cfg, err
}

Expand Down Expand Up @@ -648,13 +614,19 @@ func processConfig(unprocessedConfig *Config, fromCloud bool, logger logging.Log

// getFromCloudOrCache returns the config from the gRPC endpoint. If failures during cloud lookup fallback to the
// local cache if the error indicates it should.
func getFromCloudOrCache(ctx context.Context, cloudCfg *Cloud, shouldReadFromCache bool, logger logging.Logger) (*Config, bool, error) {
func getFromCloudOrCache(
ctx context.Context,
cloudCfg *Cloud,
shouldReadFromCache bool,
logger logging.Logger,
conn rpc.ClientConn,
) (*Config, bool, error) {
var cached bool

ctxWithTimeout, cancel := GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
ctxWithTimeout, cancel := contextutils.GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
defer cancel()

cfg, errorShouldCheckCache, err := getFromCloudGRPC(ctxWithTimeout, cloudCfg, logger)
cfg, errorShouldCheckCache, err := getFromCloudGRPC(ctxWithTimeout, cloudCfg, logger, conn)
if err != nil {
if shouldReadFromCache && errorShouldCheckCache {
cachedConfig, cacheErr := readFromCache(cloudCfg.ID)
Expand Down Expand Up @@ -687,15 +659,9 @@ func getFromCloudOrCache(ctx context.Context, cloudCfg *Cloud, shouldReadFromCac
}

// getFromCloudGRPC actually does the fetching of the robot config from the gRPC endpoint.
func getFromCloudGRPC(ctx context.Context, cloudCfg *Cloud, logger logging.Logger) (*Config, bool, error) {
func getFromCloudGRPC(ctx context.Context, cloudCfg *Cloud, logger logging.Logger, conn rpc.ClientConn) (*Config, bool, error) {
shouldCheckCacheOnFailure := true

conn, err := CreateNewGRPCClient(ctx, cloudCfg, logger)
if err != nil {
return nil, shouldCheckCacheOnFailure, errors.WithMessage(err, "error creating cloud grpc client")
}
defer utils.UncheckedErrorFunc(conn.Close)

agentInfo, err := getAgentInfo(logger)
if err != nil {
return nil, shouldCheckCacheOnFailure, errors.WithMessage(err, "error getting agent info")
Expand Down
12 changes: 6 additions & 6 deletions config/reader_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ import (

func TestFromReaderValidate(t *testing.T) {
logger := logging.NewTestLogger(t)
_, err := config.FromReader(context.Background(), "somepath", strings.NewReader(""), logger)
_, err := config.FromReader(context.Background(), "somepath", strings.NewReader(""), logger, nil)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "json: EOF")

_, err = config.FromReader(context.Background(), "somepath", strings.NewReader(`{"cloud": 1}`), logger)
_, err = config.FromReader(context.Background(), "somepath", strings.NewReader(`{"cloud": 1}`), logger, nil)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "unmarshal")

conf, err := config.FromReader(context.Background(), "somepath", strings.NewReader(`{}`), logger)
conf, err := config.FromReader(context.Background(), "somepath", strings.NewReader(`{}`), logger, nil)
test.That(t, err, test.ShouldBeNil)
test.That(t, conf, test.ShouldResemble, &config.Config{
ConfigFilePath: "somepath",
Expand All @@ -39,12 +39,12 @@ func TestFromReaderValidate(t *testing.T) {
},
})

_, err = config.FromReader(context.Background(), "somepath", strings.NewReader(`{"cloud": {}}`), logger)
_, err = config.FromReader(context.Background(), "somepath", strings.NewReader(`{"cloud": {}}`), logger, nil)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, resource.GetFieldFromFieldRequiredError(err), test.ShouldEqual, "id")

_, err = config.FromReader(context.Background(),
"somepath", strings.NewReader(`{"disable_partial_start":true,"components": [{}]}`), logger)
"somepath", strings.NewReader(`{"disable_partial_start":true,"components": [{}]}`), logger, nil)
test.That(t, err, test.ShouldNotBeNil)
var fre resource.FieldRequiredError
test.That(t, errors.As(err, &fre), test.ShouldBeTrue)
Expand All @@ -54,7 +54,7 @@ func TestFromReaderValidate(t *testing.T) {
conf, err = config.FromReader(context.Background(),
"somepath",
strings.NewReader(`{"components": [{"name": "foo", "type": "arm", "model": "foo"}]}`),
logger)
logger, nil)
test.That(t, err, test.ShouldBeNil)
expected := &config.Config{
ConfigFilePath: "somepath",
Expand Down
Loading

0 comments on commit edca01a

Please sign in to comment.