Skip to content

Commit

Permalink
adds manager
Browse files Browse the repository at this point in the history
  • Loading branch information
amir20 committed Feb 7, 2025
1 parent 9dcbd8e commit e639678
Show file tree
Hide file tree
Showing 10 changed files with 416 additions and 17 deletions.
20 changes: 18 additions & 2 deletions examples/k8s.dozzle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,32 @@ spec:
- name: DOZZLE_LEVEL
value: "debug"
---
# headless-service.yaml
apiVersion: v1
kind: Service
metadata:
name: dozzle-headless
labels:
app: nginx
spec:
clusterIP: None
selector:
app: dozzle
ports:
- name: internal
port: 7007
targetPort: 7007
---
# service.yaml
apiVersion: v1
kind: Service
metadata:
name: dozzle-service
spec:
type: LoadBalancer
type: NodePort
selector:
app: dozzle
ports:
- port: 8080
- port: 8081
targetPort: 8080
protocol: TCP
6 changes: 3 additions & 3 deletions internal/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import (
)

type server struct {
client *docker.DockerClient
client container.Client
store *container.ContainerStore
version string

pb.UnimplementedAgentServiceServer
}

func newServer(client *docker.DockerClient, dozzleVersion string, filter container.ContainerFilter) pb.AgentServiceServer {
func newServer(client container.Client, dozzleVersion string, filter container.ContainerFilter) pb.AgentServiceServer {
return &server{
client: client,
version: dozzleVersion,
Expand Down Expand Up @@ -325,7 +325,7 @@ func (s *server) ContainerAction(ctx context.Context, in *pb.ContainerActionRequ
return &pb.ContainerActionResponse{}, nil
}

func NewServer(client *docker.DockerClient, certificates tls.Certificate, dozzleVersion string, filter container.ContainerFilter) (*grpc.Server, error) {
func NewServer(client container.Client, certificates tls.Certificate, dozzleVersion string, filter container.ContainerFilter) (*grpc.Server, error) {
caCertPool := x509.NewCertPool()
c, err := x509.ParseCertificate(certificates.Certificate[0])
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package docker_support
package container_support

import (
"context"
Expand All @@ -8,7 +8,6 @@ import (

"github.com/amir20/dozzle/internal/agent"
"github.com/amir20/dozzle/internal/container"
container_support "github.com/amir20/dozzle/internal/support/container"
"github.com/rs/zerolog/log"
)

Expand All @@ -17,7 +16,7 @@ type agentService struct {
host container.Host
}

func NewAgentService(client *agent.Client) container_support.ClientService {
func NewAgentService(client *agent.Client) ClientService {
return &agentService{
client: client,
}
Expand Down
4 changes: 2 additions & 2 deletions internal/support/docker/docker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/amir20/dozzle/internal/container"
"github.com/amir20/dozzle/internal/docker"
container_support "github.com/amir20/dozzle/internal/support/container"

"github.com/docker/docker/pkg/stdcopy"
"github.com/rs/zerolog/log"
)
Expand All @@ -17,7 +17,7 @@ type DockerClientService struct {
store *container.ContainerStore
}

func NewDockerClientService(client *docker.DockerClient, filter container.ContainerFilter) container_support.ClientService {
func NewDockerClientService(client *docker.DockerClient, filter container.ContainerFilter) *DockerClientService {
return &DockerClientService{
client: client,
store: container.NewContainerStore(context.Background(), client, filter),
Expand Down
161 changes: 161 additions & 0 deletions internal/support/docker/multi_host_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package docker_support

import (
"context"
"fmt"
"time"

"github.com/amir20/dozzle/internal/container"
container_support "github.com/amir20/dozzle/internal/support/container"
"github.com/rs/zerolog/log"
)

type ContainerFilter = func(*container.Container) bool

type HostUnavailableError struct {
Host container.Host
Err error
}

func (h *HostUnavailableError) Error() string {
return fmt.Sprintf("host %s unavailable: %v", h.Host.ID, h.Err)
}

type ClientManager interface {
Find(id string) (container_support.ClientService, bool)
List() []container_support.ClientService
RetryAndList() ([]container_support.ClientService, []error)
Subscribe(ctx context.Context, channel chan<- container.Host)
Hosts(ctx context.Context) []container.Host
LocalClients() []container.Client
}

type MultiHostService struct {
manager ClientManager
timeout time.Duration
}

func NewMultiHostService(manager ClientManager, timeout time.Duration) *MultiHostService {
m := &MultiHostService{
manager: manager,
timeout: timeout,
}

return m
}

func (m *MultiHostService) FindContainer(host string, id string, filter container.ContainerFilter) (*container_support.ContainerService, error) {
client, ok := m.manager.Find(host)
if !ok {
return nil, fmt.Errorf("host %s not found", host)
}
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
defer cancel()
container, err := client.FindContainer(ctx, id, filter)
if err != nil {
return nil, err
}

return container_support.NewContainerService(client, container), nil
}

func (m *MultiHostService) ListContainersForHost(host string, filter container.ContainerFilter) ([]container.Container, error) {
client, ok := m.manager.Find(host)
if !ok {
return nil, fmt.Errorf("host %s not found", host)
}
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
defer cancel()

return client.ListContainers(ctx, filter)
}

func (m *MultiHostService) ListAllContainers(filter container.ContainerFilter) ([]container.Container, []error) {
containers := make([]container.Container, 0)
clients, errors := m.manager.RetryAndList()

for _, client := range clients {
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
defer cancel()
list, err := client.ListContainers(ctx, filter)
if err != nil {
host, _ := client.Host(ctx)
log.Debug().Err(err).Str("host", host.Name).Msg("error listing containers")
host.Available = false
errors = append(errors, &HostUnavailableError{Host: host, Err: err})
continue
}

containers = append(containers, list...)
}

return containers, errors
}

func (m *MultiHostService) ListAllContainersFiltered(userFilter container.ContainerFilter, filter ContainerFilter) ([]container.Container, []error) {
containers, err := m.ListAllContainers(userFilter)
filtered := make([]container.Container, 0, len(containers))
for _, container := range containers {
if filter(&container) {
filtered = append(filtered, container)
}
}
return filtered, err
}

func (m *MultiHostService) SubscribeEventsAndStats(ctx context.Context, events chan<- container.ContainerEvent, stats chan<- container.ContainerStat) {
for _, client := range m.manager.List() {
client.SubscribeEvents(ctx, events)
client.SubscribeStats(ctx, stats)
}
}

func (m *MultiHostService) SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container, filter ContainerFilter) {
newContainers := make(chan container.Container)
for _, client := range m.manager.List() {
client.SubscribeContainersStarted(ctx, newContainers)
}
go func() {
<-ctx.Done()
close(newContainers)
}()

go func() {
for container := range newContainers {
if filter(&container) {
select {
case containers <- container:
case <-ctx.Done():
return
}
}
}
}()
}

func (m *MultiHostService) TotalClients() int {
return len(m.manager.List())
}

func (m *MultiHostService) Hosts() []container.Host {
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
defer cancel()
return m.manager.Hosts(ctx)
}

func (m *MultiHostService) LocalHost() (container.Host, error) {
for _, host := range m.Hosts() {
if host.Type == "local" {
return host, nil
}
}
return container.Host{}, fmt.Errorf("local host not found")
}

func (m *MultiHostService) SubscribeAvailableHosts(ctx context.Context, hosts chan<- container.Host) {
m.manager.Subscribe(ctx, hosts)
}

func (m *MultiHostService) LocalClients() []container.Client {
return m.manager.LocalClients()
}
4 changes: 2 additions & 2 deletions internal/support/docker/retriable_client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewRetriableClientManager(agents []string, timeout time.Duration, certs tls
if _, ok := clientMap[host.ID]; ok {
log.Warn().Str("host", host.Name).Str("id", host.ID).Msg("duplicate host with same ID found")
} else {
clientMap[host.ID] = NewAgentService(agent)
clientMap[host.ID] = container_support.NewAgentService(agent)
}
}

Expand Down Expand Up @@ -112,7 +112,7 @@ func (m *RetriableClientManager) RetryAndList() ([]container_support.ClientServi
continue
}

m.clients[host.ID] = NewAgentService(agent)
m.clients[host.ID] = container_support.NewAgentService(agent)
m.subscribers.Range(func(ctx context.Context, channel chan<- container.Host) bool {
host.Available = true

Expand Down
2 changes: 1 addition & 1 deletion internal/support/docker/swarm_client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (m *SwarmClientManager) RetryAndList() ([]container_support.ClientService,
continue
}

client := NewAgentService(agent)
client := container_support.NewAgentService(agent)
m.clients[host.ID] = client
log.Info().Stringer("ip", ip).Str("id", host.ID).Str("name", host.Name).Msg("added new swarm agent")

Expand Down
Loading

0 comments on commit e639678

Please sign in to comment.