Skip to content

Commit

Permalink
Update clients to use Packetfilter
Browse files Browse the repository at this point in the history
Signed-off-by: Yossi Boaron <[email protected]>
  • Loading branch information
yboaron committed Jan 4, 2024
1 parent 6de7de1 commit b935856
Show file tree
Hide file tree
Showing 33 changed files with 1,074 additions and 1,074 deletions.
6 changes: 3 additions & 3 deletions pkg/globalnet/controllers/base_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/submariner-io/admiral/pkg/federate"
"github.com/submariner-io/admiral/pkg/util"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
iptiface "github.com/submariner-io/submariner/pkg/globalnet/controllers/iptables"
pfiface "github.com/submariner-io/submariner/pkg/globalnet/controllers/packetfilter"
"github.com/submariner-io/submariner/pkg/ipam"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -60,11 +60,11 @@ func newBaseSyncerController() *baseSyncerController {
}
}

func newBaseIPAllocationController(pool *ipam.IPPool, iptIface iptiface.Interface) *baseIPAllocationController {
func newBaseIPAllocationController(pool *ipam.IPPool, pfIface pfiface.Interface) *baseIPAllocationController {
return &baseIPAllocationController{
baseSyncerController: newBaseSyncerController(),
pool: pool,
iptIface: iptIface,
pfIface: pfIface,
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/globalnet/controllers/cluster_egressip_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/submariner-io/admiral/pkg/util"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/globalnet/constants"
"github.com/submariner-io/submariner/pkg/globalnet/controllers/iptables"
gnpacketfilter "github.com/submariner-io/submariner/pkg/globalnet/controllers/packetfilter"
"github.com/submariner-io/submariner/pkg/globalnet/metrics"
"github.com/submariner-io/submariner/pkg/ipam"
corev1 "k8s.io/api/core/v1"
Expand All @@ -49,13 +49,13 @@ func NewClusterGlobalEgressIPController(config *syncer.ResourceSyncerConfig, loc

logger.Info("Creating ClusterGlobalEgressIP controller")

iptIface, err := iptables.New()
pfIface, err := gnpacketfilter.New()
if err != nil {
return nil, errors.WithMessage(err, "error creating the IPTablesInterface handler")
}

controller := &clusterGlobalEgressIPController{
baseIPAllocationController: newBaseIPAllocationController(pool, iptIface),
baseIPAllocationController: newBaseIPAllocationController(pool, pfIface),
localSubnets: localSubnets,
}

Expand Down Expand Up @@ -215,7 +215,7 @@ func (c *clusterGlobalEgressIPController) flushClusterGlobalEgressRules(allocate

func (c *clusterGlobalEgressIPController) deleteClusterGlobalEgressRules(srcIPList []string, snatIP string) error {
for _, srcIP := range srcIPList {
if err := c.iptIface.RemoveClusterEgressRules(srcIP, snatIP, globalNetIPTableMark); err != nil {
if err := c.pfIface.RemoveClusterEgressRules(srcIP, snatIP, globalNetIPTableMark); err != nil {
return err //nolint:wrapcheck // Let the caller wrap it
}
}
Expand All @@ -228,7 +228,7 @@ func (c *clusterGlobalEgressIPController) programClusterGlobalEgressRules(alloca
egressRulesProgrammed := []string{}

for _, srcIP := range c.localSubnets {
if err := c.iptIface.AddClusterEgressRules(srcIP, snatIP, globalNetIPTableMark); err != nil {
if err := c.pfIface.AddClusterEgressRules(srcIP, snatIP, globalNetIPTableMark); err != nil {
_ = c.deleteClusterGlobalEgressRules(egressRulesProgrammed, snatIP)

return err //nolint:wrapcheck // Let the caller wrap it
Expand Down
12 changes: 6 additions & 6 deletions pkg/globalnet/controllers/cluster_egressip_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ var _ = Describe("ClusterGlobalEgressIP controller", func() {
Context("and programming the IP table rules fails", func() {
BeforeEach(func() {
t.createClusterGlobalEgressIP(existing)
t.ipt.AddFailOnAppendRuleMatcher(ContainSubstring(existing.Status.AllocatedIPs[0]))
t.pFakeFilter.AddFailOnAppendRuleMatcher(ContainSubstring(existing.Status.AllocatedIPs[0]))
})

It("should reallocate the global IPs", func() {
Expand Down Expand Up @@ -272,7 +272,7 @@ var _ = Describe("ClusterGlobalEgressIP controller", func() {
Context("and IP tables cleanup of previously allocated IPs initially fails", func() {
BeforeEach(func() {
numberOfIPs = *existing.Spec.NumberOfIPs + 1
t.ipt.AddFailOnDeleteRuleMatcher(ContainSubstring(existing.Status.AllocatedIPs[0]))
t.pFakeFilter.AddFailOnDeleteRuleMatcher(ContainSubstring(existing.Status.AllocatedIPs[0]))
})

It("should eventually cleanup the IP tables and reallocate", func() {
Expand All @@ -285,7 +285,7 @@ var _ = Describe("ClusterGlobalEgressIP controller", func() {
Context("and programming of IP tables initially fails", func() {
BeforeEach(func() {
numberOfIPs = *existing.Spec.NumberOfIPs + 1
t.ipt.AddFailOnAppendRuleMatcher(Not(ContainSubstring(existing.Status.AllocatedIPs[0])))
t.pFakeFilter.AddFailOnAppendRuleMatcher(Not(ContainSubstring(existing.Status.AllocatedIPs[0])))
})

It("should eventually reallocate the global IPs", func() {
Expand Down Expand Up @@ -394,13 +394,13 @@ func (t *clusterGlobalEgressIPControllerTestDriver) start() {
}

func (t *clusterGlobalEgressIPControllerTestDriver) awaitIPTableRules(ips ...string) {
t.ipt.AwaitRule("nat", constants.SmGlobalnetEgressChainForCluster, ContainSubstring(getSNATAddress(ips...)))
t.pFakeFilter.AwaitRule("nat", constants.SmGlobalnetEgressChainForCluster, ContainSubstring(getSNATAddress(ips...)))

for _, localSubnet := range t.localSubnets {
t.ipt.AwaitRule("nat", constants.SmGlobalnetEgressChainForCluster, ContainSubstring(localSubnet))
t.pFakeFilter.AwaitRule("nat", constants.SmGlobalnetEgressChainForCluster, ContainSubstring(localSubnet))
}
}

func (t *clusterGlobalEgressIPControllerTestDriver) awaitNoIPTableRules(ips ...string) {
t.ipt.AwaitNoRule("nat", constants.SmGlobalnetEgressChainForCluster, ContainSubstring(getSNATAddress(ips...)))
t.pFakeFilter.AwaitNoRule("nat", constants.SmGlobalnetEgressChainForCluster, ContainSubstring(getSNATAddress(ips...)))
}
49 changes: 25 additions & 24 deletions pkg/globalnet/controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ import (
"github.com/submariner-io/submariner/pkg/globalnet/constants"
"github.com/submariner-io/submariner/pkg/globalnet/controllers"
"github.com/submariner-io/submariner/pkg/ipam"
"github.com/submariner-io/submariner/pkg/ipset"
fakeIPSet "github.com/submariner-io/submariner/pkg/ipset/fake"
"github.com/submariner-io/submariner/pkg/iptables"
fakeIPT "github.com/submariner-io/submariner/pkg/iptables/fake"
"github.com/submariner-io/submariner/pkg/packetfilter"
fakePF "github.com/submariner-io/submariner/pkg/packetfilter/fake"
routeAgent "github.com/submariner-io/submariner/pkg/routeagent_driver/constants"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -84,12 +82,15 @@ func TestControllers(t *testing.T) {
}

type testDriverBase struct {
controller controllers.Interface
restMapper meta.RESTMapper
dynClient *dynamicfake.FakeDynamicClient
scheme *runtime.Scheme
ipt *fakeIPT.IPTables
ipSet *fakeIPSet.IPSet
controller controllers.Interface
restMapper meta.RESTMapper
dynClient *dynamicfake.FakeDynamicClient
scheme *runtime.Scheme
pFilter packetfilter.Interface
pFakeFilter *fakePF.IptablesImpl

// ipt *fakeIPT.IPTables
// ipSet *fakeIPSet.IPSet
pool *ipam.IPPool
localSubnets []string
globalCIDR string
Expand All @@ -105,12 +106,17 @@ type testDriverBase struct {
}

func newTestDriverBase() *testDriverBase {
pFakeFilter := fakePF.SetAsDriver()
pFilter, _ := packetfilter.New()

t := &testDriverBase{
restMapper: test.GetRESTMapperFor(&submarinerv1.Endpoint{}, &corev1.Service{}, &corev1.Node{}, &corev1.Pod{}, &corev1.Endpoints{},
&submarinerv1.GlobalEgressIP{}, &submarinerv1.ClusterGlobalEgressIP{}, &submarinerv1.GlobalIngressIP{}, &mcsv1a1.ServiceExport{}),
scheme: runtime.NewScheme(),
ipt: fakeIPT.New(),
ipSet: fakeIPSet.New(),
scheme: runtime.NewScheme(),
pFilter: pFilter,
pFakeFilter: pFakeFilter,
// ipt: fakeIPT.New(),
// ipSet: fakeIPSet.New(),
globalCIDR: localCIDR,
localSubnets: []string{},
}
Expand Down Expand Up @@ -140,21 +146,12 @@ func newTestDriverBase() *testDriverBase {

t.nodes = t.dynClient.Resource(*test.GetGroupVersionResourceFor(t.restMapper, &corev1.Node{}))

iptables.NewFunc = func() (iptables.Interface, error) {
return t.ipt, nil
}

ipset.NewFunc = func() ipset.Interface {
return t.ipSet
}

return t
}

func (t *testDriverBase) afterEach() {
t.controller.Stop()

iptables.NewFunc = nil
fakePF.ClearDriver()
}

func (t *testDriverBase) verifyIPsReservedInPool(ips ...string) {
Expand Down Expand Up @@ -232,7 +229,11 @@ func (t *testDriverBase) createServiceExport(s *corev1.Service) {
}

func (t *testDriverBase) createIPTableChain(table, chain string) {
_ = t.ipt.NewChain(table, chain)
strToTableType := map[string]packetfilter.TableType{"nat": packetfilter.TableTypeNAT}
regChain := packetfilter.ChainRegular{
Name: chain,
}
_ = t.pFilter.CreateRegularChainIfNotExists(strToTableType[table], &regChain)
}

func (t *testDriverBase) getGlobalIngressIPStatus(name string) *submarinerv1.GlobalIngressIPStatus {
Expand Down
19 changes: 10 additions & 9 deletions pkg/globalnet/controllers/egress_pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,21 @@ import (
"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/watcher"
"github.com/submariner-io/submariner/pkg/ipset"
"github.com/submariner-io/submariner/pkg/globalnet/controllers/packetfilter"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)

func startEgressPodWatcher(name, namespace string, namedIPSet ipset.Named, config *watcher.Config,
podSelector *metav1.LabelSelector,
func startEgressPodWatcher(name, namespace string, pfIface packetfilter.Interface,
gnHandle packetfilter.GlobalEgressHandle, config *watcher.Config, podSelector *metav1.LabelSelector,
) (*egressPodWatcher, error) {
pw := &egressPodWatcher{
stopCh: make(chan struct{}),
namedIPSet: namedIPSet,
stopCh: make(chan struct{}),
pfIface: pfIface,
gnHandle: gnHandle,
}

sel, err := metav1.LabelSelectorAsSelector(podSelector)
Expand Down Expand Up @@ -95,8 +96,8 @@ func (w *egressPodWatcher) onCreateOrUpdate(obj runtime.Object, _ int) bool {

logger.V(log.DEBUG).Infof("Pod %q with IP %s created/updated", key, pod.Status.PodIP)

if err := w.namedIPSet.AddEntry(pod.Status.PodIP, true); err != nil {
logger.Errorf(err, "Error adding pod IP %q to IP set %q", pod.Status.PodIP, w.ipSetName)
if err := w.pfIface.AddEgressPodIP(w.gnHandle, pod.Status.PodIP); err != nil {
logger.Errorf(err, "Error adding pod IP %q to gnHandle %q", pod.Status.PodIP, w.gnHandle)
return true
}

Expand All @@ -109,8 +110,8 @@ func (w *egressPodWatcher) onDelete(obj runtime.Object, _ int) bool {

logger.V(log.DEBUG).Infof("Pod %q removed", key)

if err := w.namedIPSet.DelEntry(pod.Status.PodIP); err != nil {
logger.Errorf(err, "Error deleting pod IP %q from IP set %q", pod.Status.PodIP, w.ipSetName)
if err := w.pfIface.RemoveEgressPodIP(w.gnHandle, pod.Status.PodIP); err != nil {
logger.Errorf(err, "Error deleting pod IP %q from gnHandle %q", pod.Status.PodIP, w.gnHandle)
return true
}

Expand Down
Loading

0 comments on commit b935856

Please sign in to comment.