From e696946adf4df4358f8511b70208e96783c6730d Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 20 Nov 2024 08:16:39 -0500 Subject: [PATCH 1/6] ERL: Use IP address instead of IP:port pair --- util/rateLimit.go | 17 ++++++++++------- util/rateLimit_test.go | 28 +++++++++++++++++----------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/util/rateLimit.go b/util/rateLimit.go index 3fea7f8912..a6d7b4dead 100644 --- a/util/rateLimit.go +++ b/util/rateLimit.go @@ -42,7 +42,7 @@ type ElasticRateLimiter struct { MaxCapacity int CapacityPerReservation int sharedCapacity capacityQueue - capacityByClient map[ErlClient]capacityQueue + capacityByClient map[string]capacityQueue clientLock deadlock.RWMutex // CongestionManager and enable flag cm CongestionManager @@ -53,6 +53,7 @@ type ElasticRateLimiter struct { // ErlClient clients must support OnClose for reservation closing type ErlClient interface { OnClose(func()) + RoutingAddr() []byte } // capacity is an empty structure used for loading and draining queues @@ -122,7 +123,7 @@ func NewElasticRateLimiter( ret := ElasticRateLimiter{ MaxCapacity: maxCapacity, CapacityPerReservation: reservedCapacity, - capacityByClient: map[ErlClient]capacityQueue{}, + capacityByClient: map[string]capacityQueue{}, sharedCapacity: capacityQueue(make(chan capacity, maxCapacity)), congestionControlCounter: conmanCount, } @@ -178,7 +179,7 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(c ErlClient) (*ErlCapacityGuard, var isCMEnabled bool // get the client's queue erl.clientLock.RLock() - q, exists = erl.capacityByClient[c] + q, exists = erl.capacityByClient[string(c.RoutingAddr())] isCMEnabled = erl.enableCM erl.clientLock.RUnlock() @@ -234,7 +235,8 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(c ErlClient) (*ErlCapacityGuard, func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, error) { erl.clientLock.Lock() defer erl.clientLock.Unlock() - if _, exists := erl.capacityByClient[c]; exists { + addr := string(c.RoutingAddr()) + if _, exists := erl.capacityByClient[addr]; exists { return capacityQueue(nil), errERLReservationExists } // guard against overprovisioning, if there is less than a reservedCapacity amount left @@ -244,7 +246,7 @@ func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, erro } // make capacity for the provided client q := capacityQueue(make(chan capacity, erl.CapacityPerReservation)) - erl.capacityByClient[c] = q + erl.capacityByClient[addr] = q // create a thread to drain the capacity from sharedCapacity in a blocking way // and move it to the reservation, also in a blocking way go func() { @@ -261,12 +263,13 @@ func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, erro func (erl *ElasticRateLimiter) closeReservation(c ErlClient) { erl.clientLock.Lock() defer erl.clientLock.Unlock() - q, exists := erl.capacityByClient[c] + addr := string(c.RoutingAddr()) + q, exists := erl.capacityByClient[addr] // guard clauses, and preventing the ElasticRateLimiter from draining its own sharedCapacity if !exists || q == erl.sharedCapacity { return } - delete(erl.capacityByClient, c) + delete(erl.capacityByClient, addr) // start a routine to consume capacity from the closed reservation, and return it to the sharedCapacity go func() { for i := 0; i < erl.CapacityPerReservation; i++ { diff --git a/util/rateLimit_test.go b/util/rateLimit_test.go index 8888bfcf4c..938ba6a657 100644 --- a/util/rateLimit_test.go +++ b/util/rateLimit_test.go @@ -38,6 +38,10 @@ func (c 
mockClient) OnClose(func()) { return } +func (c mockClient) RoutingAddr() []byte { + return []byte(c) +} + func TestNewElasticRateLimiter(t *testing.T) { partitiontest.PartitionTest(t) erl := NewElasticRateLimiter(100, 10, time.Second, nil) @@ -49,6 +53,7 @@ func TestNewElasticRateLimiter(t *testing.T) { func TestElasticRateLimiterCongestionControlled(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") + clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(3, 2, time.Second, nil) // give the ERL a congestion controler with well defined behavior for testing erl.cm = mockCongestionControl{} @@ -57,24 +62,24 @@ func TestElasticRateLimiterCongestionControlled(t *testing.T) { // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 1, len(erl.capacityByClient[client])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) erl.EnableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.Error(t, err) erl.DisableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) } @@ -132,46 +137,47 @@ func TestZeroSizeReservations(t *testing.T) { func TestConsumeReleaseCapacity(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") + clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(4, 3, time.Second, nil) c1, _, err := erl.ConsumeCapacity(client) // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 2, len(erl.capacityByClient[client])) + assert.Equal(t, 2, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 1, len(erl.capacityByClient[client])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) // remember this capacity, as it is a shared capacity c4, _, err := erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[client])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.Error(t, err) // now release the capacity and observe the items return 
to the correct places err = c1.Release() - assert.Equal(t, 1, len(erl.capacityByClient[client])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) // now release the capacity and observe the items return to the correct places err = c4.Release() - assert.Equal(t, 1, len(erl.capacityByClient[client])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) From 39911118fa9c3e956b77a6c08d66bce2e476b484 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 20 Nov 2024 08:27:42 -0500 Subject: [PATCH 2/6] fix linter --- util/rateLimit_test.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/util/rateLimit_test.go b/util/rateLimit_test.go index 938ba6a657..7c09ee3e9a 100644 --- a/util/rateLimit_test.go +++ b/util/rateLimit_test.go @@ -53,7 +53,6 @@ func TestNewElasticRateLimiter(t *testing.T) { func TestElasticRateLimiterCongestionControlled(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") - clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(3, 2, time.Second, nil) // give the ERL a congestion controler with well defined behavior for testing erl.cm = mockCongestionControl{} @@ -62,24 +61,24 @@ func TestElasticRateLimiterCongestionControlled(t *testing.T) { // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) erl.EnableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.Error(t, err) erl.DisableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) } @@ -137,47 +136,46 @@ func TestZeroSizeReservations(t *testing.T) { func TestConsumeReleaseCapacity(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") - clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(4, 3, time.Second, nil) c1, _, err := erl.ConsumeCapacity(client) // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 2, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 2, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, 
len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) // remember this capacity, as it is a shared capacity c4, _, err := erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.Error(t, err) // now release the capacity and observe the items return to the correct places err = c1.Release() - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) // now release the capacity and observe the items return to the correct places err = c4.Release() - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) From 145a7f7b51b7ed68063bf6cd33b8a55ba8e8673d Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 14 Feb 2025 11:54:49 -0500 Subject: [PATCH 3/6] Revert "fix linter" This reverts commit 39911118fa9c3e956b77a6c08d66bce2e476b484. --- util/rateLimit_test.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/util/rateLimit_test.go b/util/rateLimit_test.go index 4b3f4007dc..638bc58467 100644 --- a/util/rateLimit_test.go +++ b/util/rateLimit_test.go @@ -53,6 +53,7 @@ func TestNewElasticRateLimiter(t *testing.T) { func TestElasticRateLimiterCongestionControlled(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") + clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(3, 2, time.Second, nil) // give the ERL a congestion controler with well defined behavior for testing erl.cm = mockCongestionControl{} @@ -61,24 +62,24 @@ func TestElasticRateLimiterCongestionControlled(t *testing.T) { // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) erl.EnableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.Error(t, err) erl.DisableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, 
len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) } @@ -136,46 +137,47 @@ func TestZeroSizeReservations(t *testing.T) { func TestConsumeReleaseCapacity(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") + clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(4, 3, time.Second, nil) c1, _, err := erl.ConsumeCapacity(client) // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 2, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 2, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) // remember this capacity, as it is a shared capacity c4, _, err := erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.Error(t, err) // now release the capacity and observe the items return to the correct places err = c1.Release() - assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) // now release the capacity and observe the items return to the correct places err = c4.Release() - assert.Equal(t, 1, len(erl.capacityByClient[string(client.RoutingAddr())])) + assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) From b6abe5af71e7caaea9884f298671895317be6a54 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 14 Feb 2025 11:54:56 -0500 Subject: [PATCH 4/6] Revert "ERL: Use IP address instead of IP:port pair" This reverts commit e696946adf4df4358f8511b70208e96783c6730d. 
--- util/rateLimit.go | 17 +++++++---------- util/rateLimit_test.go | 28 +++++++++++----------------- 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/util/rateLimit.go b/util/rateLimit.go index b062c2cafc..8406711418 100644 --- a/util/rateLimit.go +++ b/util/rateLimit.go @@ -42,7 +42,7 @@ type ElasticRateLimiter struct { MaxCapacity int CapacityPerReservation int sharedCapacity capacityQueue - capacityByClient map[string]capacityQueue + capacityByClient map[ErlClient]capacityQueue clientLock deadlock.RWMutex // CongestionManager and enable flag cm CongestionManager @@ -53,7 +53,6 @@ type ElasticRateLimiter struct { // ErlClient clients must support OnClose for reservation closing type ErlClient interface { OnClose(func()) - RoutingAddr() []byte } // capacity is an empty structure used for loading and draining queues @@ -123,7 +122,7 @@ func NewElasticRateLimiter( ret := ElasticRateLimiter{ MaxCapacity: maxCapacity, CapacityPerReservation: reservedCapacity, - capacityByClient: map[string]capacityQueue{}, + capacityByClient: map[ErlClient]capacityQueue{}, sharedCapacity: capacityQueue(make(chan capacity, maxCapacity)), congestionControlCounter: conmanCount, } @@ -179,7 +178,7 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(c ErlClient) (*ErlCapacityGuard, var isCMEnabled bool // get the client's queue erl.clientLock.RLock() - q, exists = erl.capacityByClient[string(c.RoutingAddr())] + q, exists = erl.capacityByClient[c] isCMEnabled = erl.enableCM erl.clientLock.RUnlock() @@ -235,8 +234,7 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(c ErlClient) (*ErlCapacityGuard, func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, error) { erl.clientLock.Lock() defer erl.clientLock.Unlock() - addr := string(c.RoutingAddr()) - if _, exists := erl.capacityByClient[addr]; exists { + if _, exists := erl.capacityByClient[c]; exists { return capacityQueue(nil), errERLReservationExists } // guard against overprovisioning, if there is less than a reservedCapacity amount left @@ -246,7 +244,7 @@ func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, erro } // make capacity for the provided client q := capacityQueue(make(chan capacity, erl.CapacityPerReservation)) - erl.capacityByClient[addr] = q + erl.capacityByClient[c] = q // create a thread to drain the capacity from sharedCapacity in a blocking way // and move it to the reservation, also in a blocking way go func() { @@ -263,13 +261,12 @@ func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, erro func (erl *ElasticRateLimiter) closeReservation(c ErlClient) { erl.clientLock.Lock() defer erl.clientLock.Unlock() - addr := string(c.RoutingAddr()) - q, exists := erl.capacityByClient[addr] + q, exists := erl.capacityByClient[c] // guard clauses, and preventing the ElasticRateLimiter from draining its own sharedCapacity if !exists || q == erl.sharedCapacity { return } - delete(erl.capacityByClient, addr) + delete(erl.capacityByClient, c) // start a routine to consume capacity from the closed reservation, and return it to the sharedCapacity go func() { for i := 0; i < erl.CapacityPerReservation; i++ { diff --git a/util/rateLimit_test.go b/util/rateLimit_test.go index 638bc58467..5a62cac5b5 100644 --- a/util/rateLimit_test.go +++ b/util/rateLimit_test.go @@ -38,10 +38,6 @@ func (c mockClient) OnClose(func()) { return } -func (c mockClient) RoutingAddr() []byte { - return []byte(c) -} - func TestNewElasticRateLimiter(t *testing.T) { partitiontest.PartitionTest(t) erl := 
NewElasticRateLimiter(100, 10, time.Second, nil) @@ -53,7 +49,6 @@ func TestNewElasticRateLimiter(t *testing.T) { func TestElasticRateLimiterCongestionControlled(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") - clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(3, 2, time.Second, nil) // give the ERL a congestion controler with well defined behavior for testing erl.cm = mockCongestionControl{} @@ -62,24 +57,24 @@ func TestElasticRateLimiterCongestionControlled(t *testing.T) { // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) erl.EnableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.Error(t, err) erl.DisableCongestionControl() _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) } @@ -137,47 +132,46 @@ func TestZeroSizeReservations(t *testing.T) { func TestConsumeReleaseCapacity(t *testing.T) { partitiontest.PartitionTest(t) client := mockClient("client") - clientAddr := string(client.RoutingAddr()) erl := NewElasticRateLimiter(4, 3, time.Second, nil) c1, _, err := erl.ConsumeCapacity(client) // because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share, // wait a moment before testing the size of the sharedCapacity time.Sleep(100 * time.Millisecond) - assert.Equal(t, 2, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 2, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) // remember this capacity, as it is a shared capacity c4, _, err := erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.NoError(t, err) _, _, err = erl.ConsumeCapacity(client) - assert.Equal(t, 0, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 0, len(erl.capacityByClient[client])) assert.Equal(t, 0, len(erl.sharedCapacity)) assert.Error(t, err) // now release the capacity and observe the items return to the correct places err = c1.Release() - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[client])) assert.Equal(t, 0, 
len(erl.sharedCapacity)) assert.NoError(t, err) // now release the capacity and observe the items return to the correct places err = c4.Release() - assert.Equal(t, 1, len(erl.capacityByClient[clientAddr])) + assert.Equal(t, 1, len(erl.capacityByClient[client])) assert.Equal(t, 1, len(erl.sharedCapacity)) assert.NoError(t, err) From 038936e9c5fa8dc733a595ad1fbfabd235ef8dbc Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 14 Feb 2025 15:54:23 -0500 Subject: [PATCH 5/6] ip-based ERL client --- data/txHandler.go | 121 +++++++++++++++++++++++++++++++++-------- data/txHandler_test.go | 93 +++++++++++++++++++++++++++++++ 2 files changed, 191 insertions(+), 23 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 65fd869d7e..53aeba22c5 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -38,6 +38,7 @@ import ( "github.com/algorand/go-algorand/util" "github.com/algorand/go-algorand/util/execpool" "github.com/algorand/go-algorand/util/metrics" + "github.com/algorand/go-deadlock" ) var transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled) @@ -130,6 +131,7 @@ type TxHandler struct { streamVerifierChan chan execpool.InputJob streamVerifierDropped chan *verify.UnverifiedTxnSigJob erl *util.ElasticRateLimiter + erlClientMapper erlClientMapper appLimiter *appRateLimiter appLimiterBacklogThreshold int appLimiterCountERLDrops bool @@ -206,6 +208,10 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second, txBacklogDroppedCongestionManagement, ) + handler.erlClientMapper = erlClientMapper{ + mapping: make(map[string]*erlIPClient), + maxClients: opts.Config.MaxConnectionsPerIP, + } } if opts.Config.EnableTxBacklogAppRateLimiting { handler.appLimiter = makeAppRateLimiter( @@ -623,32 +629,97 @@ func (handler *TxHandler) incomingMsgDupCheck(data []byte) (*crypto.Digest, bool return msgKey, false } +// erlClientMapper handles erlIPClient creation from erlClient +// in order to map multiple clients to a single IP address. 
+// Then that meta erlIPClient is supposed to be supplied to ERL +type erlClientMapper struct { + m deadlock.RWMutex + mapping map[string]*erlIPClient + maxClients int +} + +// getClient returns erlIPClient for a given sender +func (mp *erlClientMapper) getClient(sender network.DisconnectableAddressablePeer) util.ErlClient { + addr := string(sender.RoutingAddr()) + ec := sender.(util.ErlClient) + + mp.m.Lock() + ipClient, has := mp.mapping[addr] + if !has { + ipClient = &erlIPClient{ + clients: make(map[util.ErlClient]struct{}, mp.maxClients), + } + mp.mapping[addr] = ipClient + } + mp.m.Unlock() + + ipClient.register(ec) + return ipClient +} + +type erlIPClient struct { + util.ErlClient + m deadlock.RWMutex + clients map[util.ErlClient]struct{} + closer func() +} + +func (eic *erlIPClient) OnClose(f func()) { + eic.closer = f +} + +// register registers a new client to the erlIPClient +// by adding a helper closer function to track connection closures +func (eic *erlIPClient) register(ec util.ErlClient) { + eic.m.Lock() + _, has := eic.clients[ec] + defer eic.m.Unlock() + if has { + // this peer is known => noop + return + } + eic.clients[ec] = struct{}{} + + ec.OnClose(func() { + eic.connClosed(ec) + }) +} + +// connClosed is called when a connection is closed so that +// erlIPClient removes the client from its list of clients +// and calls the closer function if there are no more clients +func (eic *erlIPClient) connClosed(ec util.ErlClient) { + eic.m.Lock() + delete(eic.clients, ec) + empty := len(eic.clients) == 0 + eic.m.Unlock() + if empty && eic.closer != nil { + eic.closer() + eic.closer = nil + } +} + // incomingMsgErlCheck runs the rate limiting check on a sender. // Returns: // - the capacity guard returned by the elastic rate limiter // - a boolean indicating if the sender is rate limited -func (handler *TxHandler) incomingMsgErlCheck(sender network.DisconnectableAddressablePeer) (*util.ErlCapacityGuard, bool) { - var capguard *util.ErlCapacityGuard - var isCMEnabled bool - var err error - if handler.erl != nil { - congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue)) - // consume a capacity unit - // if the elastic rate limiter cannot vend a capacity, the error it returns - // is sufficient to indicate that we should enable Congestion Control, because - // an issue in vending capacity indicates the underlying resource (TXBacklog) is full - capguard, isCMEnabled, err = handler.erl.ConsumeCapacity(sender.(util.ErlClient)) - if err != nil || // did ERL ask to enable congestion control? - (!isCMEnabled && congestedERL) { // is CM not currently enabled, but queue is congested? 
- handler.erl.EnableCongestionControl() - // if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such - transactionMessagesDroppedFromBacklog.Inc(nil) - return capguard, true - } - // if the backlog Queue has 50% of its buffer back, turn congestion control off - if !congestedERL { - handler.erl.DisableCongestionControl() - } +func (handler *TxHandler) incomingMsgErlCheck(sender util.ErlClient) (*util.ErlCapacityGuard, bool) { + congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue)) + // consume a capacity unit + // if the elastic rate limiter cannot vend a capacity, the error it returns + // is sufficient to indicate that we should enable Congestion Control, because + // an issue in vending capacity indicates the underlying resource (TXBacklog) is full + capguard, isCMEnabled, err := handler.erl.ConsumeCapacity(sender) + if err != nil || // did ERL ask to enable congestion control? + (!isCMEnabled && congestedERL) { // is CM not currently enabled, but queue is congested? + handler.erl.EnableCongestionControl() + // if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such + transactionMessagesDroppedFromBacklog.Inc(nil) + return capguard, true + } + // if the backlog Queue has 50% of its buffer back, turn congestion control off + if !congestedERL { + handler.erl.DisableCongestionControl() } return capguard, false } @@ -738,7 +809,11 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net return network.OutgoingMessage{Action: network.Ignore} } - capguard, shouldDrop := handler.incomingMsgErlCheck(rawmsg.Sender) + var capguard *util.ErlCapacityGuard + if handler.erl != nil { + client := handler.erlClientMapper.getClient(rawmsg.Sender) + capguard, shouldDrop = handler.incomingMsgErlCheck(client) + } accepted := false defer func() { // if we failed to put the item onto the backlog, we should release the capacity if any diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 67e82468fa..4626623204 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -51,6 +51,7 @@ import ( "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" + "github.com/algorand/go-algorand/util" "github.com/algorand/go-algorand/util/execpool" "github.com/algorand/go-algorand/util/metrics" ) @@ -2822,3 +2823,95 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) { require.Equal(t, outmsg.Action, network.Disconnect) }) } + +// Create mock types to satisfy interfaces +type erlMockPeer struct { + network.DisconnectableAddressablePeer + util.ErlClient + addr string + closer func() +} + +func newErlMockPeer(addr string) *erlMockPeer { + return &erlMockPeer{ + addr: addr, + } +} + +// Implement required interface methods +func (m *erlMockPeer) RoutingAddr() []byte { return []byte(m.addr) } +func (m *erlMockPeer) OnClose(f func()) { m.closer = f } + +func TestTxHandlerErlClientMapper(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + t.Run("Same routing address clients share erlIPClient", func(t *testing.T) { + mapper := erlClientMapper{ + mapping: make(map[string]*erlIPClient), + maxClients: 4, + } + + peer1 := newErlMockPeer("192.168.1.1") + peer2 := newErlMockPeer("192.168.1.1") + + client1 := mapper.getClient(peer1) + client2 := mapper.getClient(peer2) + + // Verify both peers got same erlIPClient + 
require.Equal(t, client1, client2, "Expected same erlIPClient for same routing address") + require.Equal(t, 1, len(mapper.mapping)) + + ipClient := mapper.mapping["192.168.1.1"] + require.Equal(t, 2, len(ipClient.clients)) + }) + + t.Run("Different routing addresses get different erlIPClients", func(t *testing.T) { + mapper := erlClientMapper{ + mapping: make(map[string]*erlIPClient), + maxClients: 4, + } + + peer1 := newErlMockPeer("192.168.1.1") + peer2 := newErlMockPeer("192.168.1.2") + + client1 := mapper.getClient(peer1) + client2 := mapper.getClient(peer2) + + // Verify peers got different erlIPClients + require.NotEqual(t, client1, client2, "Expected different erlIPClients for different routing addresses") + require.Equal(t, 2, len(mapper.mapping)) + }) + + t.Run("Client cleanup on connection close", func(t *testing.T) { + mapper := erlClientMapper{ + mapping: make(map[string]*erlIPClient), + maxClients: 4, + } + + peer1 := newErlMockPeer("192.168.1.1") + peer2 := newErlMockPeer("192.168.1.1") + + // Register clients for both peers + mapper.getClient(peer1) + mapper.getClient(peer2) + + ipClient := mapper.mapping["192.168.1.1"] + closerCalled := false + ipClient.OnClose(func() { + closerCalled = true + }) + + require.Equal(t, 2, len(ipClient.clients)) + + // Simulate connection close for peer1 + peer1.closer() + require.Equal(t, 1, len(ipClient.clients)) + require.False(t, closerCalled) + + // Simulate connection close for peer2 + peer2.closer() + require.Equal(t, 0, len(ipClient.clients)) + require.True(t, closerCalled) + }) +} From 5948fcc60689e17625995a38c04836752a31158e Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 14 Feb 2025 17:49:13 -0500 Subject: [PATCH 6/6] fix closer data race --- data/txHandler.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 53aeba22c5..0b1db364db 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" "time" "github.com/algorand/go-algorand/config" @@ -661,11 +662,11 @@ type erlIPClient struct { util.ErlClient m deadlock.RWMutex clients map[util.ErlClient]struct{} - closer func() + closer atomic.Value } func (eic *erlIPClient) OnClose(f func()) { - eic.closer = f + eic.closer.Store(f) } // register registers a new client to the erlIPClient @@ -693,9 +694,14 @@ func (eic *erlIPClient) connClosed(ec util.ErlClient) { delete(eic.clients, ec) empty := len(eic.clients) == 0 eic.m.Unlock() - if empty && eic.closer != nil { - eic.closer() - eic.closer = nil + // if no elements left, call the closer + // use atomic Swap in order to retrieve the closer and call it once + if empty { + // atomic.Value.Swap does not like nil values so use a typed nil + // and cast it to func() in order to compare with nil + if closer := eic.closer.Swap((func())(nil)).(func()); closer != nil { + closer() + } } }
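
---

A standalone sketch of the idea behind erlClientMapper in PATCH 5/6: peers are keyed by their routing address (the IP without the port), so multiple connections from one host collapse into a single rate-limiting identity. The types below (conn, ipBucket, bucketMapper) are simplified stand-ins, not the go-algorand API; only the keying scheme mirrors the patch.

package main

import (
	"fmt"
	"net"
	"sync"
)

// conn stands in for a peer connection; RoutingAddr strips the port so that
// all connections from one IP share a map key, mirroring the patch's intent.
type conn struct{ remote string }

func (c conn) RoutingAddr() string {
	host, _, err := net.SplitHostPort(c.remote)
	if err != nil {
		return c.remote // already host-only
	}
	return host
}

// ipBucket is the per-IP identity handed to the rate limiter (the patch's
// erlIPClient plays this role); bucketMapper is a simplified erlClientMapper.
type ipBucket struct{ ip string }

type bucketMapper struct {
	mu      sync.Mutex
	buckets map[string]*ipBucket
}

// getClient returns the one ipBucket for the caller's routing address,
// creating it on first sight of that address.
func (m *bucketMapper) getClient(c conn) *ipBucket {
	m.mu.Lock()
	defer m.mu.Unlock()
	b, ok := m.buckets[c.RoutingAddr()]
	if !ok {
		b = &ipBucket{ip: c.RoutingAddr()}
		m.buckets[c.RoutingAddr()] = b
	}
	return b
}

func main() {
	m := bucketMapper{buckets: map[string]*ipBucket{}}
	a := m.getClient(conn{remote: "192.168.1.1:4001"})
	b := m.getClient(conn{remote: "192.168.1.1:4002"})
	c := m.getClient(conn{remote: "192.168.1.2:4001"})
	fmt.Println(a == b) // true: same IP, one bucket despite different ports
	fmt.Println(a == c) // false: different IP, separate bucket
}

The patch additionally reference-counts the live connections behind each per-IP identity (erlIPClient.register / connClosed), so the ERL reservation is released only once the last connection from that address goes away.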
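
A minimal sketch of the typed-nil trick from PATCH 6/6 ("fix closer data race"): atomic.Value panics on an untyped nil, so the fix swaps in a (func())(nil), making retrieve-and-clear a single atomic step and guaranteeing the closer runs at most once even under concurrent connection closes. This illustration assumes nothing beyond the standard library; the comma-ok type assertion is a defensive addition (not in the patch) covering the case where no closer was ever registered.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// onceCloser mirrors the shape of erlIPClient's closer field: a callback
// registered via OnClose and fired at most once, even under concurrent calls.
type onceCloser struct {
	closer atomic.Value // always holds a func(), possibly a typed nil
}

func (o *onceCloser) OnClose(f func()) {
	o.closer.Store(f)
}

func (o *onceCloser) fire() {
	// Swap with a typed (func())(nil): atomic.Value rejects untyped nil,
	// and the swap makes retrieve-and-clear atomic, so two concurrent
	// fire() calls cannot both observe a non-nil closer.
	closer, _ := o.closer.Swap((func())(nil)).(func())
	if closer != nil {
		closer()
	}
}

func main() {
	var o onceCloser
	o.OnClose(func() { fmt.Println("closed once") })

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			o.fire() // exactly one of these four calls prints
		}()
	}
	wg.Wait()
}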