Skip to content

Commit

Permalink
API: Sandbox routing: Use Redis for client-proxy DNS instead of local…
Browse files Browse the repository at this point in the history
… map.
  • Loading branch information
jaytaylor committed Jan 17, 2025
1 parent 8d1235f commit c942f6b
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 45 deletions.
14 changes: 12 additions & 2 deletions .terraform.lock.hcl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 64 additions & 23 deletions packages/api/internal/dns/server.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,81 @@
package dns

import (
"context"
"fmt"
"log"
"net"
"strings"
"sync"

redis "github.com/go-redis/redis/v8"
resolver "github.com/miekg/dns"

"github.com/e2b-dev/infra/packages/shared/pkg/smap"
"go.uber.org/zap"
)

const ttl = 0

const defaultRoutingIP = "127.0.0.1"

type FallbackResolverFn = func(sandboxID string) (string, bool)

type DNS struct {
mu sync.Mutex
records *smap.Map[string]
ctx context.Context
rdb *redis.Client
fallbackResolverFn FallbackResolverFn
logger *zap.SugaredLogger
}

func New() *DNS {
func New(ctx context.Context, rdbOpts *redis.Options, fallbackResolverFn FallbackResolverFn, logger *zap.SugaredLogger) *DNS {
return &DNS{
records: smap.New[string](),
ctx: ctx,
rdb: redis.NewClient(rdbOpts),
fallbackResolverFn: fallbackResolverFn,
logger: logger,
}
}

func (d *DNS) Add(sandboxID, ip string) {
d.records.Insert(d.hostname(sandboxID), ip)
func (d *DNS) Add(sandboxID, ip string) error {
d.logger.Infof("DNS: Adding entry, sandboxID=%s -> %s", sandboxID, ip)
if err := d.rdb.Set(d.ctx, d.dnsKeyFor(sandboxID), ip, 86400).Err(); err != nil {
return err
}
return nil
}

func (d *DNS) Remove(sandboxID, ip string) {
d.records.RemoveCb(d.hostname(sandboxID), func(key string, v string, exists bool) bool {
return v == ip
})
func (d *DNS) Remove(sandboxID string) error {
d.logger.Infof("DNS: Removing entry, sandboxID=%s", sandboxID)
if err := d.rdb.Del(d.ctx, d.dnsKeyFor(sandboxID)).Err(); err != nil {
return err
}
return nil
}

func (d *DNS) get(hostname string) (string, bool) {
return d.records.Get(hostname)
func (d *DNS) get(sandboxID string) (string, bool) {
res, err := d.rdb.Get(d.ctx, d.dnsKeyFor(sandboxID)).Result()
if err == nil {
return res, true
}
if err != redis.Nil {
d.logger.Warnf("DNS: Redis error getting key for sandbox '%s' (will try fallback resolver..): %s", sandboxID, err)
}

if d.fallbackResolverFn != nil {
if rec, ok := d.fallbackResolverFn(sandboxID); ok {
d.logger.Infof("DNS: Not found in redis, using fallback lookup for sandbox '%s' succeeded: record=%q", sandboxID, rec)
go func() {
if err := d.Add(sandboxID, rec); err != nil {
d.logger.Errorf("DNS: Problem adding entry: %s", err)
}
}()
return rec, true
} else {
d.logger.Errorf("DNS: Fallback lookup for sandbox '%s' failed", sandboxID)
}
}
return "", false
}

func (*DNS) hostname(sandboxID string) string {
return fmt.Sprintf("%s.", sandboxID)
func (d *DNS) dnsKeyFor(sandboxID string) string {
return fmt.Sprintf("dns.%s", sandboxID)
}

func (d *DNS) handleDNSRequest(w resolver.ResponseWriter, r *resolver.Msg) {
Expand All @@ -63,8 +96,12 @@ func (d *DNS) handleDNSRequest(w resolver.ResponseWriter, r *resolver.Msg) {
}

sandboxID := strings.Split(q.Name, "-")[0]
ip, found := d.get(sandboxID)
if found {
// Trim trailing period to facilitate key consistency.
if strings.HasSuffix(sandboxID, ".") {
sandboxID = sandboxID[0 : len(sandboxID)-1]
}

if ip, found := d.get(sandboxID); found {
a.A = net.ParseIP(ip).To4()
} else {
a.A = net.ParseIP(defaultRoutingIP).To4()
Expand All @@ -76,7 +113,7 @@ func (d *DNS) handleDNSRequest(w resolver.ResponseWriter, r *resolver.Msg) {

err := w.WriteMsg(m)
if err != nil {
log.Printf("Failed to write message: %s\n", err.Error())
d.logger.Errorf("DNS: Failed to write message: %w", err)
}
}

Expand All @@ -85,11 +122,15 @@ func (d *DNS) Start(address string, port int) error {

mux.HandleFunc(".", d.handleDNSRequest)

server := resolver.Server{Addr: fmt.Sprintf("%s:%d", address, port), Net: "udp", Handler: mux}
server := resolver.Server{
Addr: fmt.Sprintf("%s:%d", address, port),
Net: "udp",
Handler: mux,
}

err := server.ListenAndServe()
if err != nil {
return fmt.Errorf("failed to start DNS server: %w", err)
return fmt.Errorf("DNS: failed to start server: %w", err)
}

return nil
Expand Down
8 changes: 6 additions & 2 deletions packages/api/internal/orchestrator/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func (o *Orchestrator) getDeleteInstanceFunction(ctx context.Context, posthogCli
node.CPUUsage.Add(-info.VCpu)
node.RamUsage.Add(-info.RamMB)

o.dns.Remove(info.Instance.SandboxID, node.Info.IPAddress)
if err := o.dns.Remove(info.Instance.SandboxID); err != nil {
return err
}
}

req := &orchestrator.SandboxDeleteRequest{SandboxId: info.Instance.SandboxID}
Expand Down Expand Up @@ -179,7 +181,9 @@ func (o *Orchestrator) getInsertInstanceFunction(ctx context.Context, logger *za
node.CPUUsage.Add(info.VCpu)
node.RamUsage.Add(info.RamMB)

o.dns.Add(info.Instance.SandboxID, node.Info.IPAddress)
if err := o.dns.Add(info.Instance.SandboxID, node.Info.IPAddress); err != nil {
return err
}
}

_, err := o.analytics.Client.InstanceStarted(ctx, &analyticscollector.InstanceStartedEvent{
Expand Down
47 changes: 29 additions & 18 deletions packages/api/internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package orchestrator
import (
"context"
"errors"
"fmt"
"log"

redis "github.com/go-redis/redis/v8"
nomadapi "github.com/hashicorp/nomad/api"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -39,28 +39,12 @@ func New(
logger.Errorf("Error initializing Analytics client\n: %v", err)
}

dnsServer := dns.New()

if env.IsLocal() {
fmt.Printf("Running locally, skipping starting DNS server\n")
} else {
go func() {
fmt.Printf("Starting DNS server\n")

dnsErr := dnsServer.Start("127.0.0.4", 53)
if dnsErr != nil {
log.Fatalf("Failed running DNS server: %v\n", dnsErr)
}
}()
}

o := Orchestrator{
analytics: analyticsInstance,
nomadClient: nomadClient,
logger: logger,
tracer: tracer,
nodes: smap.New[*Node](),
dns: dnsServer,
}

cache := instance.NewCache(
Expand All @@ -72,9 +56,36 @@ func New(

o.instanceCache = cache

rdbOpts := &redis.Options{Addr: "127.0.0.1:6379"}

fallbackResolverFn := func(sandboxID string) (string, bool) {
for _, apiNode := range o.GetNodes() {
detail := o.GetNodeDetail(apiNode.NodeID)
for _, sb := range detail.Sandboxes {
if sandboxID == sb.SandboxID {
if node := o.GetNode(apiNode.NodeID); node != nil {
return node.Info.IPAddress, true
}
}
}
}
return "", false
}

o.dns = dns.New(ctx, rdbOpts, fallbackResolverFn, logger)

if env.IsLocal() {
logger.Info("Skipping syncing sandboxes, running locally")
logger.Info("Running locally, skipping starting DNS server")
logger.Info("Running locally, skipping syncing sandboxes")
} else {
go func() {
logger.Info("Starting DNS server")

if err := o.dns.Start("127.0.0.4", 53); err != nil {
log.Fatalf("Failed running DNS server: %v\n", err)
}
}()

go o.keepInSync(cache)
}

Expand Down
10 changes: 10 additions & 0 deletions packages/nomad/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ resource "nomad_job" "api" {
}
}

resource "nomad_job" "redis" {
jobspec = file("${path.module}/redis.hcl")

hcl2 {
vars = {
gcp_zone = var.gcp_zone
}
}
}

resource "nomad_job" "docker_reverse_proxy" {
jobspec = file("${path.module}/docker-reverse-proxy.hcl")

Expand Down
Loading

0 comments on commit c942f6b

Please sign in to comment.