Skip to content

Commit

Permalink
Add IP family support to cableengine
Browse files Browse the repository at this point in the history
Signed-off-by: Yossi Boaron <[email protected]>
  • Loading branch information
yboaron committed Feb 26, 2025
1 parent 0d3888b commit eb93fdd
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 60 deletions.
10 changes: 10 additions & 0 deletions pkg/apis/submariner.io/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
k8snet "k8s.io/utils/net"
)

// +genclient
Expand Down Expand Up @@ -240,6 +241,15 @@ func (c *Connection) SetStatus(status ConnectionStatus, messageFormat string, a
c.StatusMessage = fmt.Sprintf(messageFormat, a...)
}

func (c *Connection) GetFamily() k8snet.IPFamily {
family := k8snet.IPv4
parsedIP := net.ParseIP(c.UsingIP)
if parsedIP.To4() == nil {
family = k8snet.IPv6
}
return family
}

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:resource:shortName="geip"
Expand Down
3 changes: 2 additions & 1 deletion pkg/cable/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/submariner-io/submariner/pkg/endpoint"
"github.com/submariner-io/submariner/pkg/natdiscovery"
"github.com/submariner-io/submariner/pkg/types"
k8snet "k8s.io/utils/net"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

Expand All @@ -46,7 +47,7 @@ type Driver interface {
ConnectToEndpoint(endpointInfo *natdiscovery.NATEndpointInfo) (string, error)

// DisconnectFromEndpoint disconnects from the connection to the given endpoint.
DisconnectFromEndpoint(endpoint *types.SubmarinerEndpoint) error
DisconnectFromEndpoint(endpoint *types.SubmarinerEndpoint, family k8snet.IPFamily) error

// GetName returns driver's name
GetName() string
Expand Down
22 changes: 11 additions & 11 deletions pkg/cable/libreswan/libreswan.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,14 @@ func (i *libreswan) refreshConnectionStatus() error {
}
}

cable.RecordConnection(cableDriverName, &i.localEndpoint, &i.connections[j].Endpoint, string(i.connections[j].Status), false)
cable.RecordRxBytes(cableDriverName, &i.localEndpoint, &i.connections[j].Endpoint, rx)
cable.RecordTxBytes(cableDriverName, &i.localEndpoint, &i.connections[j].Endpoint, tx)
cable.RecordConnection(cableDriverName, &i.localEndpoint, &i.connections[j].Endpoint, string(i.connections[j].Status), false, i.connections[j].GetFamily())
cable.RecordRxBytes(cableDriverName, &i.localEndpoint, &i.connections[j].Endpoint, rx, i.connections[j].GetFamily())
cable.RecordTxBytes(cableDriverName, &i.localEndpoint, &i.connections[j].Endpoint, tx, i.connections[j].GetFamily())

if !isConnected {
// Pluto should be connecting for us
i.connections[j].Status = subv1.Connecting
cable.RecordConnection(cableDriverName, &i.localEndpoint, &i.connections[j].Endpoint, string(i.connections[j].Status), false)
cable.RecordConnection(cableDriverName, &i.localEndpoint, &i.connections[j].Endpoint, string(i.connections[j].Status), false, i.connections[j].GetFamily())
logger.V(log.DEBUG).Infof("Connection %q not found in active connections obtained from whack: %v, %v",
i.connections[j].Endpoint.CableName, activeConnectionsRx, activeConnectionsTx)
}
Expand Down Expand Up @@ -407,7 +407,7 @@ func (i *libreswan) ConnectToEndpoint(endpointInfo *natdiscovery.NATEndpointInfo

i.connections = append(i.connections,
subv1.Connection{Endpoint: endpoint.Spec, Status: subv1.Connected, UsingIP: endpointInfo.UseIP, UsingNAT: endpointInfo.UseNAT})
cable.RecordConnection(cableDriverName, &i.localEndpoint, &endpoint.Spec, string(subv1.Connected), true)
cable.RecordConnection(cableDriverName, &i.localEndpoint, &endpoint.Spec, string(subv1.Connected), true, i.connections[j].GetFamily())

return endpointInfo.UseIP, nil
}
Expand Down Expand Up @@ -551,7 +551,7 @@ func (i *libreswan) clientConnectToEndpoint(connectionName string, endpointInfo
}

// DisconnectFromEndpoint disconnects from the connection to the given endpoint.
func (i *libreswan) DisconnectFromEndpoint(endpoint *types.SubmarinerEndpoint) error {
func (i *libreswan) DisconnectFromEndpoint(endpoint *types.SubmarinerEndpoint, family k8snet.IPFamily) error {
// We'll panic if endpoint is nil, this is intentional
leftSubnets := extractSubnets(&i.localEndpoint)
rightSubnets := extractSubnets(&endpoint.Spec)
Expand All @@ -561,7 +561,7 @@ func (i *libreswan) DisconnectFromEndpoint(endpoint *types.SubmarinerEndpoint) e
if len(leftSubnets) > 0 && len(rightSubnets) > 0 {
for lsi := range leftSubnets {
for rsi := range rightSubnets {
connectionName := toConnectionName(endpoint.Spec.CableName, lsi, rsi)
connectionName := toConnectionName(endpoint.Spec.GetFamilyCableName(family), lsi, rsi)
args := []string{"--delete", nameArg, connectionName}

if err := whack(args...); err != nil {
Expand All @@ -576,15 +576,15 @@ func (i *libreswan) DisconnectFromEndpoint(endpoint *types.SubmarinerEndpoint) e
}
}

i.connections = removeConnectionForEndpoint(i.connections, endpoint)
cable.RecordDisconnected(cableDriverName, &i.localEndpoint, &endpoint.Spec)
i.connections = removeConnectionForEndpoint(i.connections, endpoint, family)
cable.RecordDisconnected(cableDriverName, &i.localEndpoint, &endpoint.Spec, family)

return nil
}

func removeConnectionForEndpoint(connections []subv1.Connection, endpoint *types.SubmarinerEndpoint) []subv1.Connection {
func removeConnectionForEndpoint(connections []subv1.Connection, endpoint *types.SubmarinerEndpoint, family k8snet.IPFamily) []subv1.Connection {
for j := range connections {
if connections[j].Endpoint.CableName == endpoint.Spec.CableName {
if connections[j].Endpoint.CableName == endpoint.Spec.CableName && connections[j].GetFamily() == family {
copy(connections[j:], connections[j+1:])
return connections[:len(connections)-1]
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/cable/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ limitations under the License.
package cable

import (
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
submv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
k8snet "k8s.io/utils/net"
)

const (
Expand Down Expand Up @@ -132,15 +132,15 @@ func init() {
connectionLatencySecondsGauge)
}

func getLabels(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec) prometheus.Labels {
func getLabels(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec, family k8snet.IPFamily) prometheus.Labels {
return prometheus.Labels{
cableDriverLabel: cableDriverName,
localClusterLabel: localEndpoint.ClusterID,
localHostnameLabel: localEndpoint.Hostname,
localEndpointIPLabel: strings.Join(localEndpoint.PublicIPs, ","),
localEndpointIPLabel: localEndpoint.GetPublicIP(family),
remoteClusterLabel: remoteEndpoint.ClusterID,
remoteHostnameLabel: remoteEndpoint.Hostname,
remoteEndpointIPLabel: strings.Join(remoteEndpoint.PublicIPs, ","),
remoteEndpointIPLabel: remoteEndpoint.GetPublicIP(family),
}
}

Expand All @@ -150,20 +150,20 @@ func getShortLabels(cableDriverName string) prometheus.Labels {
}
}

func RecordRxBytes(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec, bytes int) {
rxGauge.With(getLabels(cableDriverName, localEndpoint, remoteEndpoint)).Set(float64(bytes))
func RecordRxBytes(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec, bytes int, family k8snet.IPFamily) {
rxGauge.With(getLabels(cableDriverName, localEndpoint, remoteEndpoint, family)).Set(float64(bytes))
}

func RecordTxBytes(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec, bytes int) {
txGauge.With(getLabels(cableDriverName, localEndpoint, remoteEndpoint)).Set(float64(bytes))
func RecordTxBytes(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec, bytes int, family k8snet.IPFamily) {
txGauge.With(getLabels(cableDriverName, localEndpoint, remoteEndpoint, family)).Set(float64(bytes))
}

func RecordConnectionLatency(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec, latencySeconds float64) {
connectionLatencySecondsGauge.With(getLabels(cableDriverName, localEndpoint, remoteEndpoint)).Set(latencySeconds)
func RecordConnectionLatency(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec, latencySeconds float64, family k8snet.IPFamily) {
connectionLatencySecondsGauge.With(getLabels(cableDriverName, localEndpoint, remoteEndpoint, family)).Set(latencySeconds)
}

func RecordConnection(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec, status string, isNew bool) {
labels := getLabels(cableDriverName, localEndpoint, remoteEndpoint)
func RecordConnection(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec, status string, isNew bool, family k8snet.IPFamily) {
labels := getLabels(cableDriverName, localEndpoint, remoteEndpoint, family)

if isNew {
connectionEstablishedTimestampGauge.With(labels).Set(float64(time.Now().Unix()))
Expand All @@ -177,8 +177,8 @@ func RecordConnection(cableDriverName string, localEndpoint, remoteEndpoint *sub
shortConnectionsGauge.With(shortLabels).Set(1)
}

func RecordDisconnected(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec) {
labels := getLabels(cableDriverName, localEndpoint, remoteEndpoint)
func RecordDisconnected(cableDriverName string, localEndpoint, remoteEndpoint *submv1.EndpointSpec, family k8snet.IPFamily) {
labels := getLabels(cableDriverName, localEndpoint, remoteEndpoint, family)
shortLabels := getShortLabels(cableDriverName)

connectionLatencySecondsGauge.Delete(labels)
Expand Down
6 changes: 3 additions & 3 deletions pkg/cable/vxlan/vxlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (v *vxLan) ConnectToEndpoint(endpointInfo *natdiscovery.NATEndpointInfo) (s
v.mutex.Lock()
defer v.mutex.Unlock()

cable.RecordConnection(CableDriverName, &v.localEndpoint, &remoteEndpoint.Spec, string(v1.Connected), true)
cable.RecordConnection(CableDriverName, &v.localEndpoint, &remoteEndpoint.Spec, string(v1.Connected), true, k8snet.IPv4)

privateIP := endpointInfo.Endpoint.Spec.GetPrivateIP(k8snet.IPv4)

Expand Down Expand Up @@ -204,7 +204,7 @@ func (v *vxLan) ConnectToEndpoint(endpointInfo *natdiscovery.NATEndpointInfo) (s
return endpointInfo.UseIP, nil
}

func (v *vxLan) DisconnectFromEndpoint(remoteEndpoint *types.SubmarinerEndpoint) error {
func (v *vxLan) DisconnectFromEndpoint(remoteEndpoint *types.SubmarinerEndpoint, family k8snet.IPFamily) error {
// We'll panic if remoteEndpoint is nil, this is intentional
logger.V(log.DEBUG).Infof("Removing endpoint %#v", remoteEndpoint)

Expand Down Expand Up @@ -248,7 +248,7 @@ func (v *vxLan) DisconnectFromEndpoint(remoteEndpoint *types.SubmarinerEndpoint)
}

v.connections = removeConnectionForEndpoint(v.connections, remoteEndpoint)
cable.RecordDisconnected(CableDriverName, &v.localEndpoint, &remoteEndpoint.Spec)
cable.RecordDisconnected(CableDriverName, &v.localEndpoint, &remoteEndpoint.Spec, k8snet.IPv4)

logger.V(log.DEBUG).Infof("Done removing endpoint for cluster %s", remoteEndpoint.Spec.ClusterID)

Expand Down
6 changes: 3 additions & 3 deletions pkg/cable/wireguard/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (w *wireguard) ConnectToEndpoint(endpointInfo *natdiscovery.NATEndpointInfo

logger.V(log.DEBUG).Infof("Done connecting endpoint peer %s@%s", *remoteKey, remoteIP)

cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(v1.Connected), true)
cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(v1.Connected), true, k8snet.IPv4)

return ip, nil
}
Expand All @@ -323,7 +323,7 @@ func keyFromSpec(ep *v1.EndpointSpec) (*wgtypes.Key, error) {
return &key, nil
}

func (w *wireguard) DisconnectFromEndpoint(remoteEndpoint *types.SubmarinerEndpoint) error {
func (w *wireguard) DisconnectFromEndpoint(remoteEndpoint *types.SubmarinerEndpoint, family k8snet.IPFamily) error {
// We'll panic if remoteEndpoint is nil, this is intentional
logger.V(log.DEBUG).Infof("Removing endpoint %v+", remoteEndpoint)

Expand Down Expand Up @@ -353,7 +353,7 @@ func (w *wireguard) DisconnectFromEndpoint(remoteEndpoint *types.SubmarinerEndpo
delete(w.connections, remoteEndpoint.Spec.ClusterID)

logger.V(log.DEBUG).Infof("Done removing endpoint for cluster %s", remoteEndpoint.Spec.ClusterID)
cable.RecordDisconnected(cableDriverName, &w.localEndpoint, &remoteEndpoint.Spec)
cable.RecordDisconnected(cableDriverName, &w.localEndpoint, &remoteEndpoint.Spec, k8snet.IPv4)

return nil
}
Expand Down
17 changes: 9 additions & 8 deletions pkg/cable/wireguard/getconnections.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/cable"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
k8snet "k8s.io/utils/net"
)

func (w *wireguard) GetConnections() ([]v1.Connection, error) {
Expand Down Expand Up @@ -95,15 +96,15 @@ func (w *wireguard) updateConnectionForPeer(p *wgtypes.Peer, connection *v1.Conn
if lc > handshakeTimeout.Milliseconds() {
// No initial handshake for too long.
connection.SetStatus(v1.ConnectionError, "no initial handshake for %.1f seconds", lcSec)
cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(connection.Status), false)
cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(connection.Status), false, k8snet.IPv4)

return
}

if tx > 0 || rx > 0 {
// No handshake, but at least some communication in progress.
connection.SetStatus(v1.Connecting, "no initial handshake yet")
cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(connection.Status), false)
cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(connection.Status), false, k8snet.IPv4)

return
}
Expand All @@ -112,7 +113,7 @@ func (w *wireguard) updateConnectionForPeer(p *wgtypes.Peer, connection *v1.Conn
if tx > 0 || rx > 0 {
// All is good.
connection.SetStatus(v1.Connected, "Rx=%d Bytes, Tx=%d Bytes", p.ReceiveBytes, p.TransmitBytes)
cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(connection.Status), false)
cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(connection.Status), false, k8snet.IPv4)
saveAndRecordPeerTraffic(&w.localEndpoint, &connection.Endpoint, now, p.TransmitBytes, p.ReceiveBytes)

return
Expand All @@ -124,7 +125,7 @@ func (w *wireguard) updateConnectionForPeer(p *wgtypes.Peer, connection *v1.Conn
// Hard error, really long time since handshake.
connection.SetStatus(v1.ConnectionError, "no handshake for %.1f seconds",
handshakeDelta.Seconds())
cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(connection.Status), false)
cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(connection.Status), false, k8snet.IPv4)

return
}
Expand All @@ -138,14 +139,14 @@ func (w *wireguard) updateConnectionForPeer(p *wgtypes.Peer, connection *v1.Conn
// Soft error, no traffic, stale handshake.
connection.SetStatus(v1.ConnectionError, "no bytes sent or received for %.1f seconds",
lcSec)
cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(connection.Status), false)
cable.RecordConnection(cableDriverName, &w.localEndpoint, &connection.Endpoint, string(connection.Status), false, k8snet.IPv4)
}

func (w *wireguard) updatePeerStatus(c *v1.Connection, key *wgtypes.Key) {
p, err := w.peerByKey(key)
if err != nil {
c.SetStatus(v1.ConnectionError, "cannot fetch status for peer %s: %v", key, err)
cable.RecordConnection(cableDriverName, &w.localEndpoint, &c.Endpoint, string(c.Status), false)
cable.RecordConnection(cableDriverName, &w.localEndpoint, &c.Endpoint, string(c.Status), false, k8snet.IPv4)

return
}
Expand Down Expand Up @@ -174,6 +175,6 @@ func saveAndRecordPeerTraffic(localEndpoint, remoteEndpoint *v1.EndpointSpec, lc
remoteEndpoint.BackendConfig[transmitBytes] = strconv.FormatInt(tx, 10)
remoteEndpoint.BackendConfig[receiveBytes] = strconv.FormatInt(rx, 10)

cable.RecordTxBytes(cableDriverName, localEndpoint, remoteEndpoint, int(tx))
cable.RecordRxBytes(cableDriverName, localEndpoint, remoteEndpoint, int(rx))
cable.RecordTxBytes(cableDriverName, localEndpoint, remoteEndpoint, int(tx), k8snet.IPv4)
cable.RecordRxBytes(cableDriverName, localEndpoint, remoteEndpoint, int(rx), k8snet.IPv4)
}
Loading

0 comments on commit eb93fdd

Please sign in to comment.