Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use http mode as default to sync registry contents #3659

Merged
merged 1 commit into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 0 additions & 72 deletions pkg/filesystem/registry/ssh.go

This file was deleted.

129 changes: 81 additions & 48 deletions pkg/filesystem/registry/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/labring/sealos/pkg/constants"
"github.com/labring/sealos/pkg/filesystem"
"github.com/labring/sealos/pkg/registry/handler"
"github.com/labring/sealos/pkg/registry/sync"
"github.com/labring/sealos/pkg/ssh"
Expand All @@ -45,8 +46,7 @@ const (
defaultTemporaryPort = "5050"
)

// TODO: fallback to ssh mode when HTTP is not available
type syncMode struct {
type impl struct {
pathResolver PathResolver
ssh ssh.Interface
mounts []v2.MountImage
Expand All @@ -61,14 +61,11 @@ func shouldSkip(mounts []v2.MountImage) bool {
return true
}

func (s *syncMode) Sync(ctx context.Context, hosts ...string) error {
func (s *impl) Sync(ctx context.Context, hosts ...string) error {
if shouldSkip(s.mounts) {
return nil
}
sys := &types.SystemContext{
DockerInsecureSkipTLSVerify: types.OptionalBoolTrue,
}
logger.Info("using sync mode syncing images to hosts %v", hosts)
logger.Info("trying default http mode to sync images to hosts %v", hosts)
// run `sealctl registry serve` to start a temporary registry
for i := range hosts {
cmdCtx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -100,8 +97,12 @@ func (s *syncMode) Sync(ctx context.Context, hosts ...string) error {
return nil
})
}
var syncFn func(context.Context, string) error
if err := eg.Wait(); err != nil {
return err
logger.Warn("cannot connect to remote temporary registry: %v, fallback using ssh mode instead", err)
syncFn = syncViaSSH(s, hosts)
} else {
syncFn = syncViaHTTP(endpoints)
}

outerEg, ctx := errgroup.WithContext(ctx)
Expand All @@ -111,46 +112,7 @@ func (s *syncMode) Sync(ctx context.Context, hosts ...string) error {
continue
}
outerEg.Go(func() error {
config, err := handler.NewConfig(registryDir, 0)
if err != nil {
return err
}
config.Log.AccessLog.Disabled = true
errCh := handler.Run(ctx, config)

eg, inner := errgroup.WithContext(ctx)
for j := range endpoints {
dst := endpoints[j]
eg.Go(func() error {
src := sync.ParseRegistryAddress(localhost, config.HTTP.Addr)
probeCtx, cancel := context.WithTimeout(inner, time.Second*3)
defer cancel()
if err = httputils.WaitUntilEndpointAlive(probeCtx, "http://"+src); err != nil {
return err
}
opts := &sync.Options{
SystemContext: sys,
Source: src,
Target: dst,
SelectionOptions: []copy.ImageListSelection{
copy.CopyAllImages, copy.CopySystemImage,
},
OmitError: true,
}

if err = sync.ToRegistry(inner, opts); err == nil {
return nil
}
if !strings.Contains(err.Error(), "manifest unknown") {
return err
}
return nil
})
}
go func() {
errCh <- eg.Wait()
}()
return <-errCh
return syncFn(ctx, registryDir)
})
}
return outerEg.Wait()
Expand All @@ -177,3 +139,74 @@ func loginRegistry(ctx context.Context, sys *types.SystemContext, username, pass
Stdout: io.Discard,
}, []string{registry})
}

func syncViaSSH(s *impl, targets []string) func(context.Context, string) error {
return func(ctx context.Context, localDir string) error {
eg, _ := errgroup.WithContext(ctx)
for i := range targets {
target := targets[i]
eg.Go(func() error {
return ssh.CopyDir(s.ssh, target, localDir, s.pathResolver.RootFSPath(), constants.IsRegistryDir)
})
}
return eg.Wait()
}
}

func syncViaHTTP(targets []string) func(context.Context, string) error {
sys := &types.SystemContext{
DockerInsecureSkipTLSVerify: types.OptionalBoolTrue,
}
return func(ctx context.Context, localDir string) error {
config, err := handler.NewConfig(localDir, 0)
if err != nil {
return err
}
config.Log.AccessLog.Disabled = true
errCh := handler.Run(ctx, config)

eg, inner := errgroup.WithContext(ctx)
for i := range targets {
target := targets[i]
eg.Go(func() error {
src := sync.ParseRegistryAddress(localhost, config.HTTP.Addr)
probeCtx, cancel := context.WithTimeout(inner, time.Second*3)
defer cancel()
if err = httputils.WaitUntilEndpointAlive(probeCtx, "http://"+src); err != nil {
return err
}
opts := &sync.Options{
SystemContext: sys,
Source: src,
Target: target,
SelectionOptions: []copy.ImageListSelection{
copy.CopyAllImages, copy.CopySystemImage,
},
OmitError: true,
}

if err = sync.ToRegistry(inner, opts); err == nil {
return nil
}
if !strings.Contains(err.Error(), "manifest unknown") {
return err
}
return nil
})
}
go func() {
errCh <- eg.Wait()
}()
return <-errCh
}
}

type PathResolver interface {
RootFSSealctlPath() string
RootFSRegistryPath() string
RootFSPath() string
}

func New(pathResolver PathResolver, ssh ssh.Interface, mounts []v2.MountImage) filesystem.RegistrySyncer {
return &impl{pathResolver, ssh, mounts}
}
20 changes: 7 additions & 13 deletions pkg/system/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ var configOptions = []ConfigOption{
Description: "whether to check the md5sum value is consistent during the copy process.",
DefaultValue: "true",
},
{
Key: RegistrySyncExperimentalConfigKey,
Description: "enable registry sync experimental feature, using containers/image module to save and sync images",
DefaultValue: "false",
},
{
Key: ContainerStorageConfEnvKey,
Description: "path of container storage config file, setting this env will override the default location",
Expand All @@ -100,14 +95,13 @@ var configOptions = []ConfigOption{
}

const (
PromptConfigKey = "PROMPT"
RuntimeRootConfigKey = "RUNTIME_ROOT"
DataRootConfigKey = "DATA_ROOT"
BuildahFormatConfigKey = "BUILDAH_FORMAT"
BuildahLogLevelConfigKey = "BUILDAH_LOG_LEVEL"
ContainerStorageConfEnvKey = "CONTAINERS_STORAGE_CONF"
ScpChecksumConfigKey = "SCP_CHECKSUM"
RegistrySyncExperimentalConfigKey = "REGISTRY_SYNC_EXPERIMENTAL"
PromptConfigKey = "PROMPT"
RuntimeRootConfigKey = "RUNTIME_ROOT"
DataRootConfigKey = "DATA_ROOT"
BuildahFormatConfigKey = "BUILDAH_FORMAT"
BuildahLogLevelConfigKey = "BUILDAH_LOG_LEVEL"
ContainerStorageConfEnvKey = "CONTAINERS_STORAGE_CONF"
ScpChecksumConfigKey = "SCP_CHECKSUM"
)

func (*envSystemConfig) getValueOrDefault(key string) (*ConfigOption, error) {
Expand Down
Loading