From e639678a32332d4f144520013dece863c8960846 Mon Sep 17 00:00:00 2001 From: Amir Raminfar Date: Fri, 7 Feb 2025 08:59:15 -0800 Subject: [PATCH] adds manager --- examples/k8s.dozzle.yml | 20 +- internal/agent/server.go | 6 +- .../{docker => container}/agent_service.go | 5 +- internal/support/docker/docker_service.go | 4 +- internal/support/docker/multi_host_service.go | 161 ++++++++++++++ .../docker/retriable_client_manager.go | 4 +- .../support/docker/swarm_client_manager.go | 2 +- internal/support/k8s/k8s_cluster_manager.go | 206 ++++++++++++++++++ internal/support/k8s/k8s_service.go | 2 +- main.go | 23 +- 10 files changed, 416 insertions(+), 17 deletions(-) rename internal/support/{docker => container}/agent_service.go (93%) create mode 100644 internal/support/docker/multi_host_service.go create mode 100644 internal/support/k8s/k8s_cluster_manager.go diff --git a/examples/k8s.dozzle.yml b/examples/k8s.dozzle.yml index d3d494dd7382..c25b4cd23d2d 100644 --- a/examples/k8s.dozzle.yml +++ b/examples/k8s.dozzle.yml @@ -55,16 +55,32 @@ spec: - name: DOZZLE_LEVEL value: "debug" --- +# headless-service.yaml +apiVersion: v1 +kind: Service +metadata: + name: dozzle-headless + labels: + app: nginx +spec: + clusterIP: None + selector: + app: dozzle + ports: + - name: internal + port: 7007 + targetPort: 7007 +--- # service.yaml apiVersion: v1 kind: Service metadata: name: dozzle-service spec: - type: LoadBalancer + type: NodePort selector: app: dozzle ports: - - port: 8080 + - port: 8081 targetPort: 8080 protocol: TCP diff --git a/internal/agent/server.go b/internal/agent/server.go index 5bef105d3fee..03d3a7089674 100644 --- a/internal/agent/server.go +++ b/internal/agent/server.go @@ -26,14 +26,14 @@ import ( ) type server struct { - client *docker.DockerClient + client container.Client store *container.ContainerStore version string pb.UnimplementedAgentServiceServer } -func newServer(client *docker.DockerClient, dozzleVersion string, filter container.ContainerFilter) pb.AgentServiceServer { +func newServer(client container.Client, dozzleVersion string, filter container.ContainerFilter) pb.AgentServiceServer { return &server{ client: client, version: dozzleVersion, @@ -325,7 +325,7 @@ func (s *server) ContainerAction(ctx context.Context, in *pb.ContainerActionRequ return &pb.ContainerActionResponse{}, nil } -func NewServer(client *docker.DockerClient, certificates tls.Certificate, dozzleVersion string, filter container.ContainerFilter) (*grpc.Server, error) { +func NewServer(client container.Client, certificates tls.Certificate, dozzleVersion string, filter container.ContainerFilter) (*grpc.Server, error) { caCertPool := x509.NewCertPool() c, err := x509.ParseCertificate(certificates.Certificate[0]) if err != nil { diff --git a/internal/support/docker/agent_service.go b/internal/support/container/agent_service.go similarity index 93% rename from internal/support/docker/agent_service.go rename to internal/support/container/agent_service.go index e1761ed5a106..9482e88ba63c 100644 --- a/internal/support/docker/agent_service.go +++ b/internal/support/container/agent_service.go @@ -1,4 +1,4 @@ -package docker_support +package container_support import ( "context" @@ -8,7 +8,6 @@ import ( "github.com/amir20/dozzle/internal/agent" "github.com/amir20/dozzle/internal/container" - container_support "github.com/amir20/dozzle/internal/support/container" "github.com/rs/zerolog/log" ) @@ -17,7 +16,7 @@ type agentService struct { host container.Host } -func NewAgentService(client *agent.Client) container_support.ClientService { +func NewAgentService(client *agent.Client) ClientService { return &agentService{ client: client, } diff --git a/internal/support/docker/docker_service.go b/internal/support/docker/docker_service.go index bd6d6b8b3c76..b28516d5767d 100644 --- a/internal/support/docker/docker_service.go +++ b/internal/support/docker/docker_service.go @@ -7,7 +7,7 @@ import ( "github.com/amir20/dozzle/internal/container" "github.com/amir20/dozzle/internal/docker" - container_support "github.com/amir20/dozzle/internal/support/container" + "github.com/docker/docker/pkg/stdcopy" "github.com/rs/zerolog/log" ) @@ -17,7 +17,7 @@ type DockerClientService struct { store *container.ContainerStore } -func NewDockerClientService(client *docker.DockerClient, filter container.ContainerFilter) container_support.ClientService { +func NewDockerClientService(client *docker.DockerClient, filter container.ContainerFilter) *DockerClientService { return &DockerClientService{ client: client, store: container.NewContainerStore(context.Background(), client, filter), diff --git a/internal/support/docker/multi_host_service.go b/internal/support/docker/multi_host_service.go new file mode 100644 index 000000000000..12e2ae7fff79 --- /dev/null +++ b/internal/support/docker/multi_host_service.go @@ -0,0 +1,161 @@ +package docker_support + +import ( + "context" + "fmt" + "time" + + "github.com/amir20/dozzle/internal/container" + container_support "github.com/amir20/dozzle/internal/support/container" + "github.com/rs/zerolog/log" +) + +type ContainerFilter = func(*container.Container) bool + +type HostUnavailableError struct { + Host container.Host + Err error +} + +func (h *HostUnavailableError) Error() string { + return fmt.Sprintf("host %s unavailable: %v", h.Host.ID, h.Err) +} + +type ClientManager interface { + Find(id string) (container_support.ClientService, bool) + List() []container_support.ClientService + RetryAndList() ([]container_support.ClientService, []error) + Subscribe(ctx context.Context, channel chan<- container.Host) + Hosts(ctx context.Context) []container.Host + LocalClients() []container.Client +} + +type MultiHostService struct { + manager ClientManager + timeout time.Duration +} + +func NewMultiHostService(manager ClientManager, timeout time.Duration) *MultiHostService { + m := &MultiHostService{ + manager: manager, + timeout: timeout, + } + + return m +} + +func (m *MultiHostService) FindContainer(host string, id string, filter container.ContainerFilter) (*container_support.ContainerService, error) { + client, ok := m.manager.Find(host) + if !ok { + return nil, fmt.Errorf("host %s not found", host) + } + ctx, cancel := context.WithTimeout(context.Background(), m.timeout) + defer cancel() + container, err := client.FindContainer(ctx, id, filter) + if err != nil { + return nil, err + } + + return container_support.NewContainerService(client, container), nil +} + +func (m *MultiHostService) ListContainersForHost(host string, filter container.ContainerFilter) ([]container.Container, error) { + client, ok := m.manager.Find(host) + if !ok { + return nil, fmt.Errorf("host %s not found", host) + } + ctx, cancel := context.WithTimeout(context.Background(), m.timeout) + defer cancel() + + return client.ListContainers(ctx, filter) +} + +func (m *MultiHostService) ListAllContainers(filter container.ContainerFilter) ([]container.Container, []error) { + containers := make([]container.Container, 0) + clients, errors := m.manager.RetryAndList() + + for _, client := range clients { + ctx, cancel := context.WithTimeout(context.Background(), m.timeout) + defer cancel() + list, err := client.ListContainers(ctx, filter) + if err != nil { + host, _ := client.Host(ctx) + log.Debug().Err(err).Str("host", host.Name).Msg("error listing containers") + host.Available = false + errors = append(errors, &HostUnavailableError{Host: host, Err: err}) + continue + } + + containers = append(containers, list...) + } + + return containers, errors +} + +func (m *MultiHostService) ListAllContainersFiltered(userFilter container.ContainerFilter, filter ContainerFilter) ([]container.Container, []error) { + containers, err := m.ListAllContainers(userFilter) + filtered := make([]container.Container, 0, len(containers)) + for _, container := range containers { + if filter(&container) { + filtered = append(filtered, container) + } + } + return filtered, err +} + +func (m *MultiHostService) SubscribeEventsAndStats(ctx context.Context, events chan<- container.ContainerEvent, stats chan<- container.ContainerStat) { + for _, client := range m.manager.List() { + client.SubscribeEvents(ctx, events) + client.SubscribeStats(ctx, stats) + } +} + +func (m *MultiHostService) SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container, filter ContainerFilter) { + newContainers := make(chan container.Container) + for _, client := range m.manager.List() { + client.SubscribeContainersStarted(ctx, newContainers) + } + go func() { + <-ctx.Done() + close(newContainers) + }() + + go func() { + for container := range newContainers { + if filter(&container) { + select { + case containers <- container: + case <-ctx.Done(): + return + } + } + } + }() +} + +func (m *MultiHostService) TotalClients() int { + return len(m.manager.List()) +} + +func (m *MultiHostService) Hosts() []container.Host { + ctx, cancel := context.WithTimeout(context.Background(), m.timeout) + defer cancel() + return m.manager.Hosts(ctx) +} + +func (m *MultiHostService) LocalHost() (container.Host, error) { + for _, host := range m.Hosts() { + if host.Type == "local" { + return host, nil + } + } + return container.Host{}, fmt.Errorf("local host not found") +} + +func (m *MultiHostService) SubscribeAvailableHosts(ctx context.Context, hosts chan<- container.Host) { + m.manager.Subscribe(ctx, hosts) +} + +func (m *MultiHostService) LocalClients() []container.Client { + return m.manager.LocalClients() +} diff --git a/internal/support/docker/retriable_client_manager.go b/internal/support/docker/retriable_client_manager.go index 50b98ac0d1a7..37b35f8385ce 100644 --- a/internal/support/docker/retriable_client_manager.go +++ b/internal/support/docker/retriable_client_manager.go @@ -66,7 +66,7 @@ func NewRetriableClientManager(agents []string, timeout time.Duration, certs tls if _, ok := clientMap[host.ID]; ok { log.Warn().Str("host", host.Name).Str("id", host.ID).Msg("duplicate host with same ID found") } else { - clientMap[host.ID] = NewAgentService(agent) + clientMap[host.ID] = container_support.NewAgentService(agent) } } @@ -112,7 +112,7 @@ func (m *RetriableClientManager) RetryAndList() ([]container_support.ClientServi continue } - m.clients[host.ID] = NewAgentService(agent) + m.clients[host.ID] = container_support.NewAgentService(agent) m.subscribers.Range(func(ctx context.Context, channel chan<- container.Host) bool { host.Available = true diff --git a/internal/support/docker/swarm_client_manager.go b/internal/support/docker/swarm_client_manager.go index 0a35d762d675..22022eb271f7 100644 --- a/internal/support/docker/swarm_client_manager.go +++ b/internal/support/docker/swarm_client_manager.go @@ -158,7 +158,7 @@ func (m *SwarmClientManager) RetryAndList() ([]container_support.ClientService, continue } - client := NewAgentService(agent) + client := container_support.NewAgentService(agent) m.clients[host.ID] = client log.Info().Stringer("ip", ip).Str("id", host.ID).Str("name", host.Name).Msg("added new swarm agent") diff --git a/internal/support/k8s/k8s_cluster_manager.go b/internal/support/k8s/k8s_cluster_manager.go new file mode 100644 index 000000000000..15d006eb183a --- /dev/null +++ b/internal/support/k8s/k8s_cluster_manager.go @@ -0,0 +1,206 @@ +package support_k8s + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "sync" + "time" + + "github.com/amir20/dozzle/internal/agent" + "github.com/amir20/dozzle/internal/container" + "github.com/amir20/dozzle/internal/k8s" + container_support "github.com/amir20/dozzle/internal/support/container" + + "github.com/puzpuzpuz/xsync/v3" + "github.com/samber/lo" + lop "github.com/samber/lo/parallel" + + "github.com/rs/zerolog/log" +) + +type K8sClusterManager struct { + clients map[string]container_support.ClientService + certs tls.Certificate + mu sync.RWMutex + subscribers *xsync.MapOf[context.Context, chan<- container.Host] + localClient container.Client + localIPs []string + timeout time.Duration +} + +func localIPs() []string { + addrs, err := net.InterfaceAddrs() + if err != nil { + return []string{} + } + + ips := make([]string, 0) + for _, address := range addrs { + if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + ips = append(ips, ipnet.IP.String()) + } + } + } + return ips +} + +func NewK8sClusterManager(localClient *k8s.K8sClient, certs tls.Certificate, timeout time.Duration, filter container.ContainerFilter) *K8sClusterManager { + clientMap := make(map[string]container_support.ClientService) + localService := NewK8sClientService(localClient, filter) + clientMap[localClient.Host().ID] = localService + + return &K8sClusterManager{ + localClient: localClient, + clients: clientMap, + certs: certs, + subscribers: xsync.NewMapOf[context.Context, chan<- container.Host](), + localIPs: localIPs(), + timeout: timeout, + } +} + +func (m *K8sClusterManager) Subscribe(ctx context.Context, channel chan<- container.Host) { + m.subscribers.Store(ctx, channel) + + go func() { + <-ctx.Done() + m.subscribers.Delete(ctx) + }() +} + +func (m *K8sClusterManager) RetryAndList() ([]container_support.ClientService, []error) { + m.mu.Lock() + + ips, err := net.LookupIP(fmt.Sprintf("%s.default.svc.cluster.local", "dozzle-headless")) + + errors := make([]error, 0) + if err != nil { + log.Fatal().Err(err).Msg("error looking up swarm service tasks") + errors = append(errors, err) + m.mu.Unlock() + return m.List(), errors + } + + clients := lo.Values(m.clients) + endpoints := lo.KeyBy(clients, func(client container_support.ClientService) string { + ctx, cancel := context.WithTimeout(context.Background(), m.timeout) + defer cancel() + host, _ := client.Host(ctx) + return host.Endpoint + }) + + ipStrings := lo.Map(ips, func(ip net.IP, _ int) string { + return ip.String() + }) + + log.Debug().Strs(fmt.Sprintf("%s.default.svc.cluster.local", "dozzle-headless"), ipStrings).Strs("localIPs", m.localIPs).Strs("clients.endpoints", lo.Keys(endpoints)).Msg("found swarm service tasks") + + for _, ip := range ips { + if lo.Contains(m.localIPs, ip.String()) { + log.Debug().Stringer("ip", ip).Msg("skipping local IP") + continue + } + + if _, ok := endpoints[ip.String()+":7007"]; ok { + log.Debug().Stringer("ip", ip).Msg("skipping existing client") + continue + } + + agent, err := agent.NewClient(ip.String()+":7007", m.certs) + if err != nil { + log.Warn().Err(err).Stringer("ip", ip).Msg("error creating agent client") + errors = append(errors, err) + continue + } + + ctx, cancel := context.WithTimeout(context.Background(), m.timeout) + defer cancel() + host, err := agent.Host(ctx) + if err != nil { + log.Warn().Err(err).Stringer("ip", ip).Msg("error getting host from agent client") + errors = append(errors, err) + if err := agent.Close(); err != nil { + log.Warn().Err(err).Stringer("ip", ip).Msg("error closing agent client") + } + continue + } + + if host.ID == m.localClient.Host().ID { + log.Debug().Stringer("ip", ip).Msg("skipping local client") + if err := agent.Close(); err != nil { + log.Warn().Err(err).Stringer("ip", ip).Msg("error closing agent client") + } + continue + } + + client := container_support.NewAgentService(agent) + m.clients[host.ID] = client + log.Info().Stringer("ip", ip).Str("id", host.ID).Str("name", host.Name).Msg("added new swarm agent") + + m.subscribers.Range(func(ctx context.Context, channel chan<- container.Host) bool { + host.Available = true + host.Type = "swarm" + + // We don't want to block the subscribers in event.go + go func() { + select { + case channel <- host: + case <-ctx.Done(): + } + }() + + return true + }) + } + + m.mu.Unlock() + + return m.List(), errors +} + +func (m *K8sClusterManager) List() []container_support.ClientService { + m.mu.RLock() + defer m.mu.RUnlock() + return lo.Values(m.clients) +} + +func (m *K8sClusterManager) Find(id string) (container_support.ClientService, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + + client, ok := m.clients[id] + + return client, ok +} + +func (m *K8sClusterManager) Hosts(ctx context.Context) []container.Host { + m.mu.RLock() + clients := lo.Values(m.clients) + m.mu.RUnlock() + + clusterNodes := lop.Map(clients, func(client container_support.ClientService, _ int) container.Host { + host, err := client.Host(ctx) + if err != nil { + log.Warn().Err(err).Str("id", host.ID).Msg("error getting host from client") + host.Available = false + } else { + host.Available = true + } + host.Type = "swarm" + + return host + }) + + return clusterNodes +} + +func (m *K8sClusterManager) String() string { + return fmt.Sprintf("K8sClusterManager{clients: %d}", len(m.clients)) +} + +func (m *K8sClusterManager) LocalClients() []container.Client { + return []container.Client{m.localClient} +} diff --git a/internal/support/k8s/k8s_service.go b/internal/support/k8s/k8s_service.go index 53196f8dbb32..cc4f373bcf85 100644 --- a/internal/support/k8s/k8s_service.go +++ b/internal/support/k8s/k8s_service.go @@ -1,4 +1,4 @@ -package k8s_support +package support_k8s import ( "context" diff --git a/main.go b/main.go index 515400f8e813..5d595be8113e 100644 --- a/main.go +++ b/main.go @@ -23,7 +23,7 @@ import ( "github.com/amir20/dozzle/internal/support/cli" container_support "github.com/amir20/dozzle/internal/support/container" docker_support "github.com/amir20/dozzle/internal/support/docker" - k8s_support "github.com/amir20/dozzle/internal/support/k8s" + support_k8s "github.com/amir20/dozzle/internal/support/k8s" "github.com/amir20/dozzle/internal/web" "github.com/rs/zerolog/log" ) @@ -197,7 +197,7 @@ func main() { } go cli.StartEvent(args, "swarm", localClient, "") go func() { - log.Info().Msgf("Dozzle agent version %s", args.Version()) + log.Info().Msgf("Dozzle agent version in swarm mode %s", args.Version()) if err := server.Serve(listener); err != nil { log.Error().Err(err).Msg("failed to serve") } @@ -213,8 +213,25 @@ func main() { log.Fatal().Err(err).Msg("Could not read certificates") } - manager := docker_support.NewRetriableClientManager(args.RemoteAgent, args.Timeout, certs, k8s_support.NewK8sClientService(localClient, args.Filter)) + manager := support_k8s.NewK8sClusterManager(localClient, certs, args.Timeout, args.Filter) multiHostService = container_support.NewMultiHostService(manager, args.Timeout) + log.Info().Msg("Starting in k8s mode") + listener, err := net.Listen("tcp", ":7007") + if err != nil { + log.Fatal().Err(err).Msg("failed to listen") + } + server, err := agent.NewServer(localClient, certs, args.Version(), args.Filter) + if err != nil { + log.Fatal().Err(err).Msg("failed to create agent") + } + go cli.StartEvent(args, "k8s", localClient, "") + go func() { + log.Info().Msgf("Dozzle agent version %s in k8s mode", args.Version()) + if err := server.Serve(listener); err != nil { + log.Error().Err(err).Msg("failed to serve") + } + }() + } else { log.Fatal().Str("mode", args.Mode).Msg("Invalid mode") }