Skip to content

Commit

Permalink
refactor: use http mode as default to sync registry contents (#3659)
Browse files Browse the repository at this point in the history
Signed-off-by: fengxsong <[email protected]>
  • Loading branch information
fengxsong authored and [email protected] committed Aug 10, 2023
1 parent a40c5ed commit 2d6c37a
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 133 deletions.
72 changes: 0 additions & 72 deletions pkg/filesystem/registry/ssh.go

This file was deleted.

129 changes: 81 additions & 48 deletions pkg/filesystem/registry/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/labring/sealos/pkg/constants"
"github.com/labring/sealos/pkg/filesystem"
"github.com/labring/sealos/pkg/registry/handler"
"github.com/labring/sealos/pkg/registry/sync"
"github.com/labring/sealos/pkg/ssh"
Expand All @@ -45,8 +46,7 @@ const (
defaultTemporaryPort = "5050"
)

// TODO: fallback to ssh mode when HTTP is not available
type syncMode struct {
type impl struct {
pathResolver PathResolver
ssh ssh.Interface
mounts []v2.MountImage
Expand All @@ -61,14 +61,11 @@ func shouldSkip(mounts []v2.MountImage) bool {
return true
}

func (s *syncMode) Sync(ctx context.Context, hosts ...string) error {
func (s *impl) Sync(ctx context.Context, hosts ...string) error {
if shouldSkip(s.mounts) {
return nil
}
sys := &types.SystemContext{
DockerInsecureSkipTLSVerify: types.OptionalBoolTrue,
}
logger.Info("using sync mode syncing images to hosts %v", hosts)
logger.Info("trying default http mode to sync images to hosts %v", hosts)
// run `sealctl registry serve` to start a temporary registry
for i := range hosts {
cmdCtx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -100,8 +97,12 @@ func (s *syncMode) Sync(ctx context.Context, hosts ...string) error {
return nil
})
}
var syncFn func(context.Context, string) error
if err := eg.Wait(); err != nil {
return err
logger.Warn("cannot connect to remote temporary registry: %v, fallback using ssh mode instead", err)
syncFn = syncViaSSH(s, hosts)
} else {
syncFn = syncViaHTTP(endpoints)
}

outerEg, ctx := errgroup.WithContext(ctx)
Expand All @@ -111,46 +112,7 @@ func (s *syncMode) Sync(ctx context.Context, hosts ...string) error {
continue
}
outerEg.Go(func() error {
config, err := handler.NewConfig(registryDir, 0)
if err != nil {
return err
}
config.Log.AccessLog.Disabled = true
errCh := handler.Run(ctx, config)

eg, inner := errgroup.WithContext(ctx)
for j := range endpoints {
dst := endpoints[j]
eg.Go(func() error {
src := sync.ParseRegistryAddress(localhost, config.HTTP.Addr)
probeCtx, cancel := context.WithTimeout(inner, time.Second*3)
defer cancel()
if err = httputils.WaitUntilEndpointAlive(probeCtx, "http://"+src); err != nil {
return err
}
opts := &sync.Options{
SystemContext: sys,
Source: src,
Target: dst,
SelectionOptions: []copy.ImageListSelection{
copy.CopyAllImages, copy.CopySystemImage,
},
OmitError: true,
}

if err = sync.ToRegistry(inner, opts); err == nil {
return nil
}
if !strings.Contains(err.Error(), "manifest unknown") {
return err
}
return nil
})
}
go func() {
errCh <- eg.Wait()
}()
return <-errCh
return syncFn(ctx, registryDir)
})
}
return outerEg.Wait()
Expand All @@ -177,3 +139,74 @@ func loginRegistry(ctx context.Context, sys *types.SystemContext, username, pass
Stdout: io.Discard,
}, []string{registry})
}

func syncViaSSH(s *impl, targets []string) func(context.Context, string) error {
return func(ctx context.Context, localDir string) error {
eg, _ := errgroup.WithContext(ctx)
for i := range targets {
target := targets[i]
eg.Go(func() error {
return ssh.CopyDir(s.ssh, target, localDir, s.pathResolver.RootFSPath(), constants.IsRegistryDir)
})
}
return eg.Wait()
}
}

func syncViaHTTP(targets []string) func(context.Context, string) error {
sys := &types.SystemContext{
DockerInsecureSkipTLSVerify: types.OptionalBoolTrue,
}
return func(ctx context.Context, localDir string) error {
config, err := handler.NewConfig(localDir, 0)
if err != nil {
return err
}
config.Log.AccessLog.Disabled = true
errCh := handler.Run(ctx, config)

eg, inner := errgroup.WithContext(ctx)
for i := range targets {
target := targets[i]
eg.Go(func() error {
src := sync.ParseRegistryAddress(localhost, config.HTTP.Addr)
probeCtx, cancel := context.WithTimeout(inner, time.Second*3)
defer cancel()
if err = httputils.WaitUntilEndpointAlive(probeCtx, "http://"+src); err != nil {
return err
}
opts := &sync.Options{
SystemContext: sys,
Source: src,
Target: target,
SelectionOptions: []copy.ImageListSelection{
copy.CopyAllImages, copy.CopySystemImage,
},
OmitError: true,
}

if err = sync.ToRegistry(inner, opts); err == nil {
return nil
}
if !strings.Contains(err.Error(), "manifest unknown") {
return err
}
return nil
})
}
go func() {
errCh <- eg.Wait()
}()
return <-errCh
}
}

type PathResolver interface {
RootFSSealctlPath() string
RootFSRegistryPath() string
RootFSPath() string
}

func New(pathResolver PathResolver, ssh ssh.Interface, mounts []v2.MountImage) filesystem.RegistrySyncer {
return &impl{pathResolver, ssh, mounts}
}
20 changes: 7 additions & 13 deletions pkg/system/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ var configOptions = []ConfigOption{
Description: "whether to check the md5sum value is consistent during the copy process.",
DefaultValue: "true",
},
{
Key: RegistrySyncExperimentalConfigKey,
Description: "enable registry sync experimental feature, using containers/image module to save and sync images",
DefaultValue: "false",
},
{
Key: ContainerStorageConfEnvKey,
Description: "path of container storage config file, setting this env will override the default location",
Expand All @@ -100,14 +95,13 @@ var configOptions = []ConfigOption{
}

const (
PromptConfigKey = "PROMPT"
RuntimeRootConfigKey = "RUNTIME_ROOT"
DataRootConfigKey = "DATA_ROOT"
BuildahFormatConfigKey = "BUILDAH_FORMAT"
BuildahLogLevelConfigKey = "BUILDAH_LOG_LEVEL"
ContainerStorageConfEnvKey = "CONTAINERS_STORAGE_CONF"
ScpChecksumConfigKey = "SCP_CHECKSUM"
RegistrySyncExperimentalConfigKey = "REGISTRY_SYNC_EXPERIMENTAL"
PromptConfigKey = "PROMPT"
RuntimeRootConfigKey = "RUNTIME_ROOT"
DataRootConfigKey = "DATA_ROOT"
BuildahFormatConfigKey = "BUILDAH_FORMAT"
BuildahLogLevelConfigKey = "BUILDAH_LOG_LEVEL"
ContainerStorageConfEnvKey = "CONTAINERS_STORAGE_CONF"
ScpChecksumConfigKey = "SCP_CHECKSUM"
)

func (*envSystemConfig) getValueOrDefault(key string) (*ConfigOption, error) {
Expand Down

0 comments on commit 2d6c37a

Please sign in to comment.