Skip to content

Commit

Permalink
Update nat discovery to support IP family
Browse files Browse the repository at this point in the history
Current code assumes 1:1 mapping between endpoint to nat-discovery and
endpoint to submariner inter-cluster tunnel.
Since we decided that we continue using a single endpoint for each cluster
also for dual-stack, it is necessary to support mapping endpoint up to 2
nat-discovery and inter-cluster tunnels.

Additionally, nat-discovery and submariner-tunnel also need
to support IP family parameter, as both IPv4 and IPv6 should be supported.

This change update nat-discovery to support IP family,to make it clear,
only IPV4 nat-discovery is supported even after this change.

Signed-off-by: Yossi Boaron <[email protected]>
  • Loading branch information
yboaron committed Feb 20, 2025
1 parent 77c1733 commit eca94c9
Show file tree
Hide file tree
Showing 19 changed files with 237 additions and 124 deletions.
12 changes: 12 additions & 0 deletions pkg/apis/submariner.io/v1/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,15 @@ func (ep *EndpointSpec) GetPrivateIP(family k8snet.IPFamily) string {
func (ep *EndpointSpec) SetPrivateIP(ip string) {
ep.PrivateIPs, ep.PrivateIP = setIP(ep.PrivateIPs, ep.PrivateIP, ip)
}

func (ep *EndpointSpec) GetIPFamilies() [2]k8snet.IPFamily {
var ipFamilies [2]k8snet.IPFamily
// TODO_IPV6: set ipFamilies according to Subnets content
ipFamilies[0] = k8snet.IPv4

return ipFamilies
}

func (ep *EndpointSpec) GetFamilyCableName(family k8snet.IPFamily) string {
return ep.CableName + "-ipv" + string(family)
}
31 changes: 16 additions & 15 deletions pkg/cableengine/cableengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/submariner-io/submariner/pkg/natdiscovery"
"github.com/submariner-io/submariner/pkg/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8snet "k8s.io/utils/net"
logf "sigs.k8s.io/controller-runtime/pkg/log"

// Add supported drivers.
Expand All @@ -50,10 +51,10 @@ type Engine interface {
// InstallCable performs any set up work needed for connecting to given remote endpoint.
// Once InstallCable completes, it should be possible to connect to remote
// Pods or Services behind the given endpoint.
InstallCable(remote *v1.Endpoint) error
InstallCable(remote *v1.Endpoint, family k8snet.IPFamily) error
// RemoveCable disconnects the Engine from the given remote endpoint. Upon completion.
// remote Pods and Service may not be accessible anymore.
RemoveCable(remote *v1.Endpoint) error
RemoveCable(remote *v1.Endpoint, family k8snet.IPFamily) error
// ListCableConnections returns a list of cable connection, and the related status.
ListCableConnections() ([]v1.Connection, error)
// GetLocalEndpoint returns the local endpoint for this cable engine.
Expand Down Expand Up @@ -156,13 +157,13 @@ func (i *engine) installCableWithNATInfo(rnat *natdiscovery.NATEndpointInfo) err
i.Lock()
defer i.Unlock()

if _, ok := i.natDiscoveryPending[rnat.Endpoint.Spec.CableName]; !ok {
if _, ok := i.natDiscoveryPending[rnat.Endpoint.Spec.GetFamilyCableName(rnat.Family)]; !ok {
return nil
}

i.natDiscoveryPending[rnat.Endpoint.Spec.CableName]--
if i.natDiscoveryPending[rnat.Endpoint.Spec.CableName] == 0 {
delete(i.natDiscoveryPending, rnat.Endpoint.Spec.CableName)
i.natDiscoveryPending[rnat.Endpoint.Spec.GetFamilyCableName(rnat.Family)]--
if i.natDiscoveryPending[rnat.Endpoint.Spec.GetFamilyCableName(rnat.Family)] == 0 {
delete(i.natDiscoveryPending, rnat.Endpoint.Spec.GetFamilyCableName(rnat.Family))
}

if !i.running {
Expand Down Expand Up @@ -230,7 +231,7 @@ func (i *engine) installCableWithNATInfo(rnat *natdiscovery.NATEndpointInfo) err
return nil
}

func (i *engine) InstallCable(endpoint *v1.Endpoint) error {
func (i *engine) InstallCable(endpoint *v1.Endpoint, family k8snet.IPFamily) error {
if endpoint.Spec.ClusterID == i.localCluster.ID {
logger.V(log.TRACE).Infof("Not installing cable for local cluster")
return nil
Expand All @@ -242,28 +243,28 @@ func (i *engine) InstallCable(endpoint *v1.Endpoint) error {
}

i.Lock()
i.natDiscoveryPending[endpoint.Spec.CableName]++
i.natDiscoveryPending[endpoint.Spec.GetFamilyCableName(family)]++
i.Unlock()

i.natDiscovery.AddEndpoint(endpoint)
i.natDiscovery.AddEndpoint(endpoint, family)

return nil
}

func (i *engine) RemoveCable(endpoint *v1.Endpoint) error {
func (i *engine) RemoveCable(endpoint *v1.Endpoint, family k8snet.IPFamily) error {
if endpoint.Spec.ClusterID == i.localCluster.ID {
logger.V(log.DEBUG).Infof("Cables are not added/removed for the local cluster, skipping removal")
return nil
}

logger.Infof("Removing Endpoint cable %q", endpoint.Spec.CableName)
logger.Infof("Removing Endpoint IP%v cable %q", family, endpoint.Spec.CableName)

i.natDiscovery.RemoveEndpoint(endpoint.Spec.CableName)
i.natDiscovery.RemoveEndpoint(endpoint.Spec.GetFamilyCableName(family))

i.Lock()
defer i.Unlock()

delete(i.natDiscoveryPending, endpoint.Spec.CableName)
delete(i.natDiscoveryPending, endpoint.Spec.GetFamilyCableName(family))

if _, ok := i.installedCables[endpoint.Spec.CableName]; !ok {
return nil
Expand All @@ -274,9 +275,9 @@ func (i *engine) RemoveCable(endpoint *v1.Endpoint) error {
return errors.Wrapf(err, "error disconnecting Endpoint cable %q", endpoint.Spec.CableName)
}

delete(i.installedCables, endpoint.Spec.CableName)
delete(i.installedCables, endpoint.Spec.GetFamilyCableName(family))

logger.Infof("Successfully removed Endpoint cable %q", endpoint.Spec.CableName)
logger.Infof("Successfully removed IP%v Endpoint cable %q", family, endpoint.Spec.CableName)

return nil
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/cableengine/cableengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ var _ = Describe("Cable Engine", func() {
When("install cable for a remote endpoint", func() {
Context("and no endpoint was previously installed for the cluster", func() {
It("should connect to the endpoint", func() {
Expect(engine.InstallCable(remoteEndpoint)).To(Succeed())
Expect(engine.InstallCable(remoteEndpoint, k8snet.IPv4)).To(Succeed())
fakeDriver.AwaitConnectToEndpoint(natEndpointInfoFor(remoteEndpoint))
})
})
Expand All @@ -144,10 +144,10 @@ var _ = Describe("Cable Engine", func() {
})

JustBeforeEach(func() {
Expect(engine.InstallCable(prevEndpoint)).To(Succeed())
Expect(engine.InstallCable(prevEndpoint, k8snet.IPv4)).To(Succeed())
fakeDriver.AwaitConnectToEndpoint(natEndpointInfoFor(prevEndpoint))

Expect(engine.InstallCable(newEndpoint)).To(Succeed())
Expect(engine.InstallCable(newEndpoint, k8snet.IPv4)).To(Succeed())
})

testTimestamps := func() {
Expand Down Expand Up @@ -226,10 +226,10 @@ var _ = Describe("Cable Engine", func() {
CableName: "submariner-cable-other-1.1.1.1",
}}

Expect(engine.InstallCable(&otherEndpoint)).To(Succeed())
Expect(engine.InstallCable(&otherEndpoint, k8snet.IPv4)).To(Succeed())
fakeDriver.AwaitConnectToEndpoint(natEndpointInfoFor(&otherEndpoint))

Expect(engine.InstallCable(remoteEndpoint)).To(Succeed())
Expect(engine.InstallCable(remoteEndpoint, k8snet.IPv4)).To(Succeed())
fakeDriver.AwaitConnectToEndpoint(natEndpointInfoFor(remoteEndpoint))
fakeDriver.AwaitNoDisconnectFromEndpoint()
})
Expand All @@ -241,11 +241,11 @@ var _ = Describe("Cable Engine", func() {
})

It("should not connect to the endpoint", func() {
Expect(engine.InstallCable(remoteEndpoint)).To(Succeed())
Expect(engine.InstallCable(remoteEndpoint, k8snet.IPv4)).To(Succeed())
Eventually(natDiscovery.captureAddEndpoint).Should(Receive())

Expect(engine.RemoveCable(remoteEndpoint)).To(Succeed())
Eventually(natDiscovery.removeEndpoint).Should(Receive(Equal(remoteEndpoint.Spec.CableName)))
Expect(engine.RemoveCable(remoteEndpoint, k8snet.IPv4)).To(Succeed())
Eventually(natDiscovery.removeEndpoint).Should(Receive(Equal(remoteEndpoint.Spec.GetFamilyCableName(k8snet.IPv4))))
fakeDriver.AwaitNoDisconnectFromEndpoint()

natDiscovery.notifyReady(remoteEndpoint)
Expand All @@ -256,21 +256,21 @@ var _ = Describe("Cable Engine", func() {

When("install cable for a local endpoint", func() {
It("should not connect to the endpoint", func() {
Expect(engine.InstallCable(localEndpoint)).To(Succeed())
Expect(engine.InstallCable(localEndpoint, k8snet.IPv4)).To(Succeed())
fakeDriver.AwaitNoConnectToEndpoint()
})
})

When("remove cable for a remote endpoint", func() {
JustBeforeEach(func() {
Expect(engine.InstallCable(remoteEndpoint)).To(Succeed())
Expect(engine.InstallCable(remoteEndpoint, k8snet.IPv4)).To(Succeed())
fakeDriver.AwaitConnectToEndpoint(natEndpointInfoFor(remoteEndpoint))
})

It("should disconnect from the endpoint", func() {
Expect(engine.RemoveCable(remoteEndpoint)).To(Succeed())
Expect(engine.RemoveCable(remoteEndpoint, k8snet.IPv4)).To(Succeed())
fakeDriver.AwaitDisconnectFromEndpoint(&remoteEndpoint.Spec)
Eventually(natDiscovery.removeEndpoint).Should(Receive(Equal(remoteEndpoint.Spec.CableName)))
Eventually(natDiscovery.removeEndpoint).Should(Receive(Equal(remoteEndpoint.Spec.GetFamilyCableName(k8snet.IPv4))))
})

Context("and the driver fails to disconnect from the endpoint", func() {
Expand All @@ -279,19 +279,19 @@ var _ = Describe("Cable Engine", func() {
})

It("should return an error", func() {
Expect(engine.RemoveCable(remoteEndpoint)).To(HaveOccurred())
Expect(engine.RemoveCable(remoteEndpoint, k8snet.IPv4)).To(HaveOccurred())
})
})
})

When("remove cable for a local endpoint", func() {
JustBeforeEach(func() {
Expect(engine.InstallCable(remoteEndpoint)).To(Succeed())
Expect(engine.InstallCable(remoteEndpoint, k8snet.IPv4)).To(Succeed())
fakeDriver.AwaitConnectToEndpoint(natEndpointInfoFor(remoteEndpoint))
})

It("should not disconnect from the endpoint", func() {
Expect(engine.RemoveCable(localEndpoint)).To(Succeed())
Expect(engine.RemoveCable(localEndpoint, k8snet.IPv4)).To(Succeed())
fakeDriver.AwaitNoDisconnectFromEndpoint()
Consistently(natDiscovery.removeEndpoint).ShouldNot(Receive())
})
Expand Down Expand Up @@ -389,7 +389,7 @@ func (n *fakeNATDiscovery) Run(_ <-chan struct{}) error {
return nil
}

func (n *fakeNATDiscovery) AddEndpoint(endpoint *subv1.Endpoint) {
func (n *fakeNATDiscovery) AddEndpoint(endpoint *subv1.Endpoint, _ k8snet.IPFamily) {
if n.captureAddEndpoint != nil {
n.captureAddEndpoint <- endpoint
return
Expand All @@ -415,5 +415,6 @@ func natEndpointInfoFor(endpoint *subv1.Endpoint) *natdiscovery.NATEndpointInfo
UseIP: endpoint.Spec.GetPublicIP(k8snet.IPv4),
UseNAT: true,
Endpoint: *endpoint,
Family: k8snet.IPv4,
}
}
5 changes: 3 additions & 2 deletions pkg/cableengine/fake/cableengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/submariner-io/submariner/pkg/cableengine"
"github.com/submariner-io/submariner/pkg/natdiscovery"
"github.com/submariner-io/submariner/pkg/types"
k8snet "k8s.io/utils/net"
)

type Engine struct { //nolint:gocritic // This mutex is exposed but we tweak it in tests
Expand Down Expand Up @@ -62,7 +63,7 @@ func (e *Engine) StartEngine() error {
func (e *Engine) Stop() {
}

func (e *Engine) InstallCable(endpoint *v1.Endpoint) error {
func (e *Engine) InstallCable(endpoint *v1.Endpoint, _ k8snet.IPFamily) error {
err := e.ErrOnInstallCable
if err != nil {
e.ErrOnInstallCable = nil
Expand All @@ -74,7 +75,7 @@ func (e *Engine) InstallCable(endpoint *v1.Endpoint) error {
return nil
}

func (e *Engine) RemoveCable(endpoint *v1.Endpoint) error {
func (e *Engine) RemoveCable(endpoint *v1.Endpoint, _ k8snet.IPFamily) error {
err := e.ErrOnRemoveCable
if err != nil {
e.ErrOnRemoveCable = nil
Expand Down
53 changes: 44 additions & 9 deletions pkg/controllers/tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,39 @@ import (
v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/cableengine"
"k8s.io/apimachinery/pkg/runtime"
k8snet "k8s.io/utils/net"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

type controller struct {
engine cableengine.Engine
engine cableengine.Engine
localIPFamilies [2]k8snet.IPFamily
}

var logger = log.Logger{Logger: logf.Log.WithName("Tunnel")}

func findCommonIPFamilies(local, remote [2]k8snet.IPFamily) []k8snet.IPFamily {
common := []k8snet.IPFamily{}

for _, lf := range local {
for _, rf := range remote {
if lf == rf {
common = append(common, lf)
break
}
}
}

return common
}

func StartController(engine cableengine.Engine, namespace string, config *watcher.Config, stopCh <-chan struct{}) error {
logger.Info("Starting the tunnel controller")

c := &controller{engine: engine}

c.localIPFamilies = c.engine.GetLocalEndpoint().Spec.GetIPFamilies()

config.ResourceConfigs = []watcher.ResourceConfig{
{
Name: "Tunnel Controller",
Expand Down Expand Up @@ -76,26 +95,42 @@ func (c *controller) handleCreatedOrUpdatedEndpoint(obj runtime.Object, _ int) b

logger.V(log.TRACE).Infof("Tunnel controller processing added or updated submariner Endpoint object: %#v", endpoint)

err := c.engine.InstallCable(endpoint)
if err != nil {
logger.Errorf(err, "Error installing cable for Endpoint %#v", endpoint)
return true
commonIPFamilies := findCommonIPFamilies(c.localIPFamilies, endpoint.Spec.GetIPFamilies())

var errs []error

for _, family := range commonIPFamilies {
err := c.engine.InstallCable(endpoint, family)
if err != nil {
logger.Errorf(err, "Error installing IP%v cable for Endpoint %#v", family, endpoint)
errs = append(errs, err)
}
}

return false
return len(errs) > 0
}

func (c *controller) handleRemovedEndpoint(obj runtime.Object, _ int) bool {
endpoint := obj.(*v1.Endpoint)

commonIPFamilies := findCommonIPFamilies(c.localIPFamilies, endpoint.Spec.GetIPFamilies())

logger.V(log.DEBUG).Infof("Tunnel controller processing removed submariner Endpoint object: %#v", endpoint)

if err := c.engine.RemoveCable(endpoint); err != nil {
logger.Errorf(err, "Tunnel controller failed to remove Endpoint cable %#v from the engine", endpoint)
var errs []error

for _, family := range commonIPFamilies {
if err := c.engine.RemoveCable(endpoint, family); err != nil {
logger.Errorf(err, "Tunnel controller failed to remove Endpoint IP%v cable %#v from the engine", family, endpoint)
errs = append(errs, err)
}
}

if len(errs) > 0 {
return true
}

logger.V(log.DEBUG).Infof("Tunnel controller successfully removed Endpoint cable %s from the engine", endpoint.Spec.CableName)
logger.V(log.DEBUG).Infof("Tunnel controller processing removed submariner Endpoint object: %#v", endpoint)

return false
}
1 change: 1 addition & 0 deletions pkg/controllers/tunnel/tunnel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ var _ = Describe("Managing tunnels", func() {
UseIP: endpoint.Spec.GetPrivateIP(k8snet.IPv4),
UseNAT: false,
Endpoint: *endpoint,
Family: k8snet.IPv4,
})
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/endpoint/local_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ func GetLocalIPForDestination(dst string) string {
return localAddr.IP.String()
}

func GetLocalIPForDest(dst string, family k8snet.IPFamily) string {
switch family {
case k8snet.IPv4:
return GetLocalIPForDestination(dst)
case k8snet.IPv6:
// TODO_IPV6: add V6 healthcheck IP
case k8snet.IPFamilyUnknown:
}

return ""
}

func GetLocalIP(family k8snet.IPFamily) string {
switch family {
case k8snet.IPv4:
Expand Down
3 changes: 2 additions & 1 deletion pkg/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
dynamicfake "k8s.io/client-go/dynamic/fake"
k8sfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
k8snet "k8s.io/utils/net"
)

const publicIP = "1.2.3.4"
Expand Down Expand Up @@ -487,7 +488,7 @@ func (n *fakeNATDiscovery) Run(_ <-chan struct{}) error {
return nil
}

func (n *fakeNATDiscovery) AddEndpoint(ep *submarinerv1.Endpoint) {
func (n *fakeNATDiscovery) AddEndpoint(ep *submarinerv1.Endpoint, _ k8snet.IPFamily) {
n.readyChannel <- &natdiscovery.NATEndpointInfo{
Endpoint: *ep,
}
Expand Down
Loading

0 comments on commit eca94c9

Please sign in to comment.