Skip to content

Commit

Permalink
Add support for packetfilter
Browse files Browse the repository at this point in the history
To allow smooth transition to nftables a new packetfilter component
is introduced, packetfilter provides an API to create chains, rules, TCP
mss clamping and GlobalNet egress IP rules in a generic way.

Since we still want to support platforms using iptables, the packetfilter
component should be capable of auto-detecting whether the platform is
using nftables or iptables and use the appropriate driver.

The clients (e.g: GlobalNet) should be updated to use packetfilter API
instead of the iptables,ipsets API, so they will only use the
packetfilter APIs and will be abstracted from the underlying implementation.

This commit includes:

1. Packetfilter component, auto-detecting hardcoded to iptables.
2. iptables driver, which is based on current iptables driver.

Signed-off-by: Yossi Boaron <[email protected]>
  • Loading branch information
yboaron committed Jan 16, 2024
1 parent 50111b0 commit 516689d
Show file tree
Hide file tree
Showing 6 changed files with 765 additions and 217 deletions.
6 changes: 6 additions & 0 deletions pkg/globalnet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"github.com/submariner-io/submariner/pkg/cidr"
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/globalnet/controllers"
packetfilter "github.com/submariner-io/submariner/pkg/packetfilter"
iptables "github.com/submariner-io/submariner/pkg/packetfilter/iptables"
"github.com/submariner-io/submariner/pkg/versions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -92,6 +94,10 @@ func main() {

logger.Info("Starting submariner-globalnet", spec)

// set packetfilter driver to iptables
// TODO: check which driver is supported on platform
packetfilter.SetNewDriverFn(iptables.New)

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler().Done()

Expand Down
135 changes: 57 additions & 78 deletions pkg/packetfilter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,74 @@ import (
"k8s.io/utils/set"
)

type Adapter struct {
Basic
func (a *Adapter) PrependUnique(table TableType, chain string, ruleSpec *Rule) error {
return a.InsertUnique(table, chain, 1, ruleSpec)
}

func (a *Adapter) CreateChainIfNotExists(table, chain string) error {
exists, err := a.ChainExists(table, chain)
if err == nil && exists {
return nil
func (a *Adapter) UpdateChainRules(table TableType, chain string, rules []*Rule) error {
existingRules, err := a.List(table, chain)
if err != nil {
return errors.Wrapf(err, "error listing the rules in table %q, chain %q", table, chain)
}

if err != nil {
return errors.Wrapf(err, "error finding IP table chain %q in table %q", chain, table)
ruleStrings := set.New[string]()

for _, existingRule := range existingRules {
ruleStrings.Insert(existingRule)
}

for _, rule := range rules {
ruleSpec, err := a.RuleToRuleSpec(rule)
if err != nil {
return errors.Wrapf(err, "Failed to translate Rule to RuleSpec, rule: %+v ", rule)
}

ruleString := strings.Join(ruleSpec, " ")

if ruleStrings.Has(ruleString) {
ruleStrings.Delete(ruleString)
} else {
logger.V(level.DEBUG).Infof("Adding packetfilter rule in %q, %q: %q", table, chain, strings.Join(ruleSpec, " "))

if err := a.Append(table, chain, ruleSpec...); err != nil {
return errors.Wrapf(err, "error adding rule %q to %q, %q", strings.Join(ruleSpec, " "), table, chain)
}
}
}

return errors.Wrap(a.NewChain(table, chain), "error creating IP table chain")
// remaining elements should not be there, remove them
for _, rule := range ruleStrings.UnsortedList() {
logger.V(level.DEBUG).Infof("Deleting stale packetfilter rule in %q, %q: %q", table, chain, rule)
ruleSpec := strings.Split(rule, " ")

if err := a.DeleteRuleSpec(table, chain, ruleSpec...); err != nil {
// Log and let go, as this is not a fatal error, or something that will make real harm,
// it's more harmful to keep retrying. At this point on next update deletion of stale rules
// will happen again
logger.Warningf("Unable to delete packetfilter entry from table %q, chain %q: %q", table, chain, rule)
}
}

return nil
}

func (a *Adapter) InsertUnique(table, chain string, position int, ruleSpec []string) error {
rules, err := a.List(table, chain)
func (a *Adapter) InsertUnique(table TableType, chain string, position int, rule *Rule) error {
existingRules, err := a.List(table, chain)
if err != nil {
return errors.Wrapf(err, "error listing the rules in table %q, chain %q", table, chain)
}

ruleSpec, err := a.RuleToRuleSpec(rule)
if err != nil {
return errors.Wrapf(err, "error listing the rules in %s chain", chain)
return errors.Wrapf(err, "Failed to translate Rule to RuleSpec, rule: %+v ", rule)
}

isPresentAtRequiredPosition := false
numOccurrences := 0

for index, rule := range rules {
for index, rule := range existingRules {
if strings.Contains(rule, strings.Join(ruleSpec, " ")) {
logger.V(level.DEBUG).Infof("In %s table, iptables rule \"%s\", exists at index %d.", table, strings.Join(ruleSpec, " "), index)
logger.V(level.DEBUG).Infof("In %q table, rule \"%s\", exists at index %d.", table, strings.Join(ruleSpec, " "), index)
numOccurrences++

if index == position {
Expand All @@ -67,78 +106,18 @@ func (a *Adapter) InsertUnique(table, chain string, position int, ruleSpec []str
// not at the desired location
if numOccurrences > 1 || !isPresentAtRequiredPosition {
for i := 0; i < numOccurrences; i++ {
if err = a.Delete(table, chain, ruleSpec...); err != nil {
return errors.Wrapf(err, "error deleting stale IP table rule %q", strings.Join(ruleSpec, " "))
if err = a.DeleteRuleSpec(table, chain, ruleSpec...); err != nil {
return errors.Wrapf(err, "error deleting stale rule %q", strings.Join(ruleSpec, " "))
}
}
}

// The required rule is present only once and is at the desired location
if numOccurrences == 1 && isPresentAtRequiredPosition {
logger.V(level.DEBUG).Infof("In %s table, iptables rule \"%s\", already exists.", table, strings.Join(ruleSpec, " "))
logger.V(level.DEBUG).Infof("In %q table, rule \"%s\", already exists.", table, strings.Join(ruleSpec, " "))
return nil
} else if err := a.Insert(table, chain, position, ruleSpec...); err != nil {
return errors.Wrapf(err, "error inserting IP table rule %q", strings.Join(ruleSpec, " "))
}

return nil
}

func (a *Adapter) PrependUnique(table, chain string, ruleSpec []string) error {
// Submariner requires certain iptable rules to be programmed at the beginning of an iptables Chain
// so that we can preserve the sourceIP for inter-cluster traffic and avoid K8s SDN making changes
// to the traffic.
// In this API, we check if the required iptable rule is present at the beginning of the chain.
// If the rule is already present and there are no stale[1] flows, we simply return. If not, we create one.
// [1] Sometimes after we program the rule at the beginning of the chain, K8s SDN might insert some
// new rules ahead of the rule that we programmed. In such cases, the rule that we programmed will
// not be the first rule to hit and Submariner behavior might get affected. So, we query the rules
// in the chain to see if the rule slipped its position, and if so, delete all such occurrences.
// We then re-program a new rule at the beginning of the chain as required.
return a.InsertUnique(table, chain, 1, ruleSpec)
}

func (a *Adapter) UpdateChainRules(table, chain string, rules [][]string) error {
existingRules, err := a.List(table, chain)
if err != nil {
return errors.Wrapf(err, "error listing the rules in table %q, chain %q", table, chain)
}

ruleStrings := set.New[string]()

for _, existingRule := range existingRules {
ruleSpec := strings.Split(existingRule, " ")
if ruleSpec[0] == "-A" {
ruleSpec = ruleSpec[2:] // remove "-A", "$chain"
ruleStrings.Insert(strings.Trim(strings.Join(ruleSpec, " "), " "))
}
}

for _, ruleSpec := range rules {
ruleString := strings.Join(ruleSpec, " ")

if ruleStrings.Has(ruleString) {
ruleStrings.Delete(ruleString)
} else {
logger.V(level.DEBUG).Infof("Adding iptables rule in %q, %q: %q", table, chain, ruleSpec)

if err := a.Append(table, chain, ruleSpec...); err != nil {
return errors.Wrapf(err, "error adding rule to %v to %q, %q", ruleSpec, table, chain)
}
}
}

// remaining elements should not be there, remove them
for _, rule := range ruleStrings.UnsortedList() {
logger.V(level.DEBUG).Infof("Deleting stale iptables rule in %q, %q: %q", table, chain, rule)
ruleSpec := strings.Split(rule, " ")

if err := a.Delete(table, chain, ruleSpec...); err != nil {
// Log and let go, as this is not a fatal error, or something that will make real harm,
// it's more harmful to keep retrying. At this point on next update deletion of stale rules
// will happen again
logger.Warningf("Unable to delete iptables entry from table %q, chain %q: %q", table, chain, rule)
}
return errors.Wrapf(err, "error inserting IP rule %q", strings.Join(ruleSpec, " "))
}

return nil
Expand Down
Loading

0 comments on commit 516689d

Please sign in to comment.