Skip to content

Commit

Permalink
prepare for rework
Browse files Browse the repository at this point in the history
  • Loading branch information
everesio committed Jan 26, 2025
1 parent b8646e4 commit 93770ec
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 74 deletions.
5 changes: 1 addition & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ type ListenerConfig struct {
ListenerAddress string
AdvertisedAddress string
}
type IdListenerConfig struct {
BrokerAddress string
Listener net.Listener
}

type DialAddressMapping struct {
SourceAddress string
DestinationAddress string
Expand Down
12 changes: 6 additions & 6 deletions proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (

func createMetadataResponseSchemaVersions() []Schema {
metadataBrokerV0 := NewSchema("metadata_broker_v0",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)
Expand All @@ -52,14 +52,14 @@ func createMetadataResponseSchemaVersions() []Schema {
)

metadataBrokerV1 := NewSchema("metadata_broker_v1",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeNullableStr},
)

metadataBrokerSchema9 := NewSchema("metadata_broker_schema9",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
Expand Down Expand Up @@ -249,13 +249,13 @@ func createMetadataResponseSchemaVersions() []Schema {

func createFindCoordinatorResponseSchemaVersions() []Schema {
findCoordinatorBrokerV0 := NewSchema("find_coordinator_broker_v0",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)

findCoordinatorBrokerSchema9 := NewSchema("find_coordinator_broker_schema9",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)
Expand Down Expand Up @@ -341,7 +341,7 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
}
}
if port != newPort {
err = broker.Replace(portKeyName, int32(newPort))
err = broker.Replace(portKeyName, newPort)
if err != nil {
return err
}
Expand Down
131 changes: 72 additions & 59 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"crypto/tls"
"fmt"
"net"
"strconv"
"sync"

"github.com/grepplabs/kafka-proxy/config"
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
"github.com/sirupsen/logrus"
)

type ListenFunc func(cfg config.ListenerConfig) (l net.Listener, err error)
type ListenFunc func(cfg *ListenerConfig) (l net.Listener, err error)

type Listeners struct {
// Source of new connections to Kafka broker.
Expand All @@ -29,16 +30,11 @@ type Listeners struct {
disableDynamicListeners bool
dynamicSequentialMinPort int

brokerToListenerConfig map[string]config.ListenerConfig
brokerIdToIdListenerConfig map[int32]config.IdListenerConfig
lock sync.RWMutex
brokerToListenerConfig map[string]*ListenerConfig
lock sync.RWMutex
}

func NewListeners(cfg *config.Config) (*Listeners, error) {

defaultListenerIP := cfg.Proxy.DefaultListenerIP
dynamicAdvertisedListener := cfg.Proxy.DynamicAdvertisedListener

tcpConnOptions := TCPConnOptions{
KeepAlive: cfg.Proxy.ListenerKeepAlive,
ReadBufferSize: cfg.Proxy.ListenerReadBufferSize,
Expand All @@ -54,7 +50,7 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
}
}

listenFunc := func(cfg config.ListenerConfig) (net.Listener, error) {
listenFunc := func(cfg *ListenerConfig) (net.Listener, error) {
if tlsConfig != nil {
return tls.Listen("tcp", cfg.ListenerAddress, tlsConfig)
}
Expand All @@ -66,34 +62,31 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
return nil, err
}

brokerIdToIdListenerConfig := make(map[int32]config.IdListenerConfig)

return &Listeners{
defaultListenerIP: defaultListenerIP,
dynamicAdvertisedListener: dynamicAdvertisedListener,
connSrc: make(chan Conn, 1),
brokerToListenerConfig: brokerToListenerConfig,
brokerIdToIdListenerConfig: brokerIdToIdListenerConfig,
tcpConnOptions: tcpConnOptions,
listenFunc: listenFunc,
deterministicListeners: cfg.Proxy.DeterministicListeners,
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
defaultListenerIP: cfg.Proxy.DefaultListenerIP,
dynamicAdvertisedListener: cfg.Proxy.DynamicAdvertisedListener,
connSrc: make(chan Conn, 1),
brokerToListenerConfig: brokerToListenerConfig,
tcpConnOptions: tcpConnOptions,
listenFunc: listenFunc,
deterministicListeners: cfg.Proxy.DeterministicListeners,
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
}, nil
}

func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerConfig, error) {
brokerToListenerConfig := make(map[string]config.ListenerConfig)
func getBrokerToListenerConfig(cfg *config.Config) (map[string]*ListenerConfig, error) {
brokerToListenerConfig := make(map[string]*ListenerConfig)

for _, v := range cfg.Proxy.BootstrapServers {
if lc, ok := brokerToListenerConfig[v.BrokerAddress]; ok {
if lc.ListenerAddress != v.ListenerAddress || lc.AdvertisedAddress != v.AdvertisedAddress {
return nil, fmt.Errorf("bootstrap server mapping %s configured twice: %v and %v", v.BrokerAddress, v, lc)
return nil, fmt.Errorf("bootstrap server mapping %s configured twice: %v and %v", v.BrokerAddress, v, lc.ToListenerConfig())
}
continue
}
logrus.Infof("Bootstrap server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
brokerToListenerConfig[v.BrokerAddress] = v
brokerToListenerConfig[v.BrokerAddress] = FromListenerConfig(v)
}

externalToListenerConfig := make(map[string]config.ListenerConfig)
Expand All @@ -118,7 +111,7 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo
continue
}
logrus.Infof("External server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
brokerToListenerConfig[v.BrokerAddress] = v
brokerToListenerConfig[v.BrokerAddress] = FromListenerConfig(v)
}
return brokerToListenerConfig, nil
}
Expand All @@ -132,32 +125,28 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, br

p.lock.RLock()
listenerConfig, ok := p.brokerToListenerConfig[brokerAddress]
idListenerConfig, brokerIdFound := p.brokerIdToIdListenerConfig[brokerId]
p.lock.RUnlock()

if ok {
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress)
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s, brokerId=%d", listenerConfig.GetBrokerAddress(), listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress, brokerId)
return util.SplitHostPort(listenerConfig.AdvertisedAddress)
}
if !p.disableDynamicListeners {
if brokerIdFound {
logrus.Infof("Broker ID %d has a new advertised listener, closing existing dynamic listener", brokerId)
// Existing broker ID found, but with a different upstream broker
// Close existing listener, remove two mappings:
// * ID to removed upstream broker
// * removed upstream broker
idListenerConfig.Listener.Close()
p.lock.Lock()
delete(p.brokerIdToIdListenerConfig, brokerId)
delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress)
p.lock.Unlock()
}
logrus.Infof("Starting dynamic listener for broker %s", brokerAddress)
return p.ListenDynamicInstance(brokerAddress, brokerId)
}
return "", 0, fmt.Errorf("net address mapping for %s:%d was not found", brokerHost, brokerPort)
}

func (p *Listeners) findListenerConfig(brokerId int32) *ListenerConfig {
for _, listenerConfig := range p.brokerToListenerConfig {
if listenerConfig.BrokerID == brokerId {
return listenerConfig
}
}
return nil
}

func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) {
p.lock.Lock()
defer p.lock.Unlock()
Expand All @@ -166,18 +155,34 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
return util.SplitHostPort(v.AdvertisedAddress)
}

var defaultListenerAddress string

var listenerAddress string
if p.deterministicListeners {
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort+int(brokerId)))
if brokerId < 0 {
return "", 0, fmt.Errorf("brokerId is negative %s %d", brokerAddress, brokerId)
}
deterministicPort := p.dynamicSequentialMinPort + int(brokerId)
if deterministicPort < p.dynamicSequentialMinPort {
return "", 0, fmt.Errorf("port assignment overflow %s %d: %d", brokerAddress, brokerId, deterministicPort)
}
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(deterministicPort))
cfg := p.findListenerConfig(brokerId)
if cfg != nil {
oldBrokerAddress := cfg.GetBrokerAddress()
if oldBrokerAddress != brokerAddress {
delete(p.brokerToListenerConfig, oldBrokerAddress)
cfg.SetBrokerAddress(brokerAddress)
p.brokerToListenerConfig[brokerAddress] = cfg
logrus.Infof("Broker address changed listener %s for new address %s old address %s brokerId %d advertised as %s", cfg.ListenerAddress, cfg.GetBrokerAddress(), oldBrokerAddress, cfg.BrokerID, cfg.AdvertisedAddress)
}
return util.SplitHostPort(cfg.AdvertisedAddress)
}
} else {
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(p.dynamicSequentialMinPort))
if p.dynamicSequentialMinPort != 0 {
p.dynamicSequentialMinPort += 1
}
}

cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress}
cfg := NewListenerConfig(brokerAddress, listenerAddress, "", brokerId)
l, err := listenInstance(p.connSrc, cfg, p.tcpConnOptions, p.listenFunc)
if err != nil {
return "", 0, err
Expand All @@ -189,12 +194,11 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
if dynamicAdvertisedListener == "" {
dynamicAdvertisedListener = p.defaultListenerIP
}
cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
cfg.ListenerAddress = address

advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
p.brokerIdToIdListenerConfig[brokerId] = config.IdListenerConfig{BrokerAddress: brokerAddress, Listener: l}

logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress)
p.brokerToListenerConfig[brokerAddress] = cfg
logrus.Infof("Dynamic listener %s for broker %s brokerId %d advertised as %s", cfg.ListenerAddress, cfg.GetBrokerAddress(), cfg.BrokerID, cfg.AdvertisedAddress)

return dynamicAdvertisedListener, int32(port), nil
}
Expand All @@ -205,15 +209,16 @@ func (p *Listeners) ListenInstances(cfgs []config.ListenerConfig) (<-chan Conn,

// allows multiple local addresses to point to the remote
for _, v := range cfgs {
_, err := listenInstance(p.connSrc, v, p.tcpConnOptions, p.listenFunc)
cfg := FromListenerConfig(v)
_, err := listenInstance(p.connSrc, cfg, p.tcpConnOptions, p.listenFunc)
if err != nil {
return nil, err
}
}
return p.connSrc, nil
}

func listenInstance(dst chan<- Conn, cfg config.ListenerConfig, opts TCPConnOptions, listenFunc ListenFunc) (net.Listener, error) {
func listenInstance(dst chan<- Conn, cfg *ListenerConfig, opts TCPConnOptions, listenFunc ListenFunc) (net.Listener, error) {
l, err := listenFunc(cfg)
if err != nil {
return nil, err
Expand All @@ -222,20 +227,28 @@ func listenInstance(dst chan<- Conn, cfg config.ListenerConfig, opts TCPConnOpti
for {
c, err := l.Accept()
if err != nil {
logrus.Infof("Error in accept for %q on %v: %v", cfg, cfg.ListenerAddress, err)
logrus.Infof("Error in accept for %q on %v: %v", cfg.ToListenerConfig(), cfg.ListenerAddress, err)
l.Close()
return
}
if tcpConn, ok := c.(*net.TCPConn); ok {
if err := opts.setTCPConnOptions(tcpConn); err != nil {
logrus.Infof("WARNING: Error while setting TCP options for accepted connection %q on %v: %v", cfg, l.Addr().String(), err)
logrus.Infof("WARNING: Error while setting TCP options for accepted connection %q on %v: %v", cfg.ToListenerConfig(), l.Addr().String(), err)
}
}
logrus.Infof("New connection for %s", cfg.BrokerAddress)
dst <- Conn{BrokerAddress: cfg.BrokerAddress, LocalConnection: c}
brokerAddress := cfg.GetBrokerAddress()
if cfg.BrokerID != UnknownBrokerID {
logrus.Infof("New connection for %s brokerId %d", brokerAddress, cfg.BrokerID)
} else {
logrus.Infof("New connection for %s", brokerAddress)
}
dst <- Conn{BrokerAddress: brokerAddress, LocalConnection: c}
}
})

logrus.Infof("Listening on %s (%s) for remote %s", cfg.ListenerAddress, l.Addr().String(), cfg.BrokerAddress)
if cfg.BrokerID != UnknownBrokerID {
logrus.Infof("Listening on %s (%s) for remote %s broker %d", cfg.ListenerAddress, l.Addr().String(), cfg.GetBrokerAddress(), cfg.BrokerID)
} else {
logrus.Infof("Listening on %s (%s) for remote %s", cfg.ListenerAddress, l.Addr().String(), cfg.GetBrokerAddress())
}
return l, nil
}
54 changes: 54 additions & 0 deletions proxy/proxy_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package proxy

import (
"github.com/grepplabs/kafka-proxy/config"
"sync/atomic"
)

const UnknownBrokerID = -1

type ListenerConfig struct {
BrokerAddressPtr atomic.Pointer[string]
ListenerAddress string
AdvertisedAddress string
BrokerID int32
}

func FromListenerConfig(listenerConfig config.ListenerConfig) *ListenerConfig {
c := &ListenerConfig{
ListenerAddress: listenerConfig.ListenerAddress,
AdvertisedAddress: listenerConfig.AdvertisedAddress,
BrokerID: UnknownBrokerID,
}
c.BrokerAddressPtr.Store(&listenerConfig.BrokerAddress)
return c
}

func NewListenerConfig(brokerAddress, listenerAddress, advertisedAddress string, brokerID int32) *ListenerConfig {
c := &ListenerConfig{
ListenerAddress: listenerAddress,
AdvertisedAddress: advertisedAddress,
BrokerID: brokerID,
}
c.BrokerAddressPtr.Store(&brokerAddress)
return c
}
func (c *ListenerConfig) ToListenerConfig() config.ListenerConfig {
return config.ListenerConfig{
BrokerAddress: c.GetBrokerAddress(),
ListenerAddress: c.ListenerAddress,
AdvertisedAddress: c.AdvertisedAddress,
}
}

func (c *ListenerConfig) GetBrokerAddress() string {
addressPtr := c.BrokerAddressPtr.Load()
if addressPtr == nil {
return ""
}
return *addressPtr
}

func (c *ListenerConfig) SetBrokerAddress(address string) {
c.BrokerAddressPtr.Store(&address)
}
Loading

0 comments on commit 93770ec

Please sign in to comment.