Fix rescheduling: SelfIPs and subnet routes not cleaned up before creation #214

Merged
merged 10 commits on Apr 14, 2023
1 change: 0 additions & 1 deletion octavia_f5/api/drivers/f5_driver/tasks/reschedule_tasks.py
@@ -19,7 +19,6 @@
from taskflow import task
from taskflow.types import failure

from octavia.common import constants
from octavia.common import data_models as models
from octavia.db import api as db_apis
from octavia_f5.db import repositories as repo
35 changes: 28 additions & 7 deletions octavia_f5/controller/worker/controller_worker.py
@@ -276,7 +276,7 @@ def pending_sync(self):
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def _get_all_loadbalancer(self, network_id):
LOG.debug("Get load balancers from DB for network id: %s ", network_id)
LOG.debug("Get load balancers from DB for this host for network id: %s ", network_id)
return self._loadbalancer_repo.get_all_by_network(
db_apis.get_session(), network_id=network_id, show_deleted=False)

@@ -612,7 +612,15 @@ def add_loadbalancer(self, load_balancer_id):

selfips = list(chain.from_iterable(
self.network_driver.ensure_selfips(loadbalancers, CONF.host, cleanup_orphans=False)))
self.l2sync.ensure_l2_flow(selfips, network_id)

# If other LBs in the same network already exist on this host, just ensure correct selfips and subnet routes
lbs_already_present = [lb for lb in loadbalancers if lb.id != load_balancer_id]
if lbs_already_present:
LOG.debug(f'Only syncing SelfIPs and subnet routes on network {network_id}')
self.l2sync.sync_l2_selfips_and_subnet_routes_flow(selfips, network_id)
else:
LOG.debug(f'Running complete ensure_l2_flow on network {network_id}')
self.l2sync.ensure_l2_flow(selfips, network_id)
self.sync.tenant_update(network_id, selfips=selfips, loadbalancers=loadbalancers).raise_for_status()
self.network_driver.invalidate_cache()
return True
@@ -635,18 +643,31 @@ def remove_loadbalancer(self, load_balancer_id):
LOG.debug("remove_loadbalancer: force removing loadbalancer '%s' for tenant '%s'",
load_balancer_id, network_id)

# all LBs on this device, including the one to be removed
loadbalancers = self._get_all_loadbalancer(network_id)
loadbalancers = [_lb for _lb in loadbalancers if _lb.id != load_balancer_id]

selfips = list(chain.from_iterable(
self.network_driver.ensure_selfips(loadbalancers, CONF.host, cleanup_orphans=False)))
if loadbalancers:
self.l2sync.ensure_l2_flow(selfips, network_id)
self.sync.tenant_update(network_id, selfips=selfips, loadbalancers=loadbalancers).raise_for_status()

loadbalancers_remaining = [lb for lb in loadbalancers if lb.id != load_balancer_id]
if loadbalancers_remaining:
# if there are still load balancers we only need to sync SelfIPs and subnet routes

# If the subnet of the LB to be removed is now empty, remove its SelfIPs by treating them as orphaned
selfips = list(chain.from_iterable(
self.network_driver.ensure_selfips(loadbalancers_remaining, CONF.host, cleanup_orphans=True)))

# provision the rest to the device
self.l2sync.sync_l2_selfips_and_subnet_routes_flow(selfips, network_id)
self.sync.tenant_update(
network_id, selfips=selfips, loadbalancers=loadbalancers_remaining).raise_for_status()

else:
# this was the last load balancer - delete everything
self.sync.tenant_delete(network_id).raise_for_status()
self.l2sync.remove_l2_flow(network_id)
self.network_driver.cleanup_selfips(selfips)

# invalidate cache so that workers forget about the old host
self.network_driver.invalidate_cache()
return True

18 changes: 10 additions & 8 deletions octavia_f5/controller/worker/l2_sync_manager.py
@@ -147,10 +147,12 @@ def ensure_l2_flow(self, selfips: [network_models.Port], network_id: str, device
if not selfips:
return

# get and check network
network = self._network_driver.get_network(network_id)
if not network.has_bound_segment():
raise Exception(f"Failed ensure_l2_flow for network_id={network_id}: No segment bound")

# run l2 flow for all devices in parallel
fs = {}
for bigip in self._bigips:
if device and bigip.hostname != device:
@@ -161,15 +163,16 @@
store = {'bigip': bigip, 'network': network, 'subnet_id': subnet_ids.pop()}
fs[self.executor.submit(self._do_ensure_l2_flow, selfips=selfips_for_host, store=store)] = bigip

if CONF.networking.override_vcmp_guest_names:
guest_names = CONF.networking.override_vcmp_guest_names
else:
guest_names = [bigip.hostname for bigip in self._bigips]

# run VCMP l2 flow for all VCMPs in parallel
for vcmp in self._vcmps:
store = {'bigip': vcmp, 'network': network, 'bigip_guest_names': guest_names}
store = {'bigip': vcmp, 'network': network}
if CONF.networking.override_vcmp_guest_names:
store['bigip_guest_names'] = CONF.networking.override_vcmp_guest_names
else:
store['bigip_guest_names'] = [bigip.hostname for bigip in self._bigips]
fs[self.executor.submit(self._do_ensure_vcmp_l2_flow, store=store)] = vcmp

# wait for all flows to finish
failed_bigips = []
done, not_done = futures.wait(fs, timeout=CONF.networking.l2_timeout)
for f in done | not_done:
@@ -181,10 +184,9 @@
LOG.error("Failed running ensure_l2_flow for host %s: %s", bigip.hostname, e)
failed_bigips.append(bigip)

# We raise error only if all pairs failed
# raise error only if all pairs failed
if self._bigips and all(bigip in failed_bigips for bigip in self._bigips):
raise Exception(f"Failed ensure_l2_flow for all bigip devices of network_id={network_id}")

if self._vcmps and all(vcmp in failed_bigips for vcmp in self._vcmps):
raise Exception(f"Failed ensure_l2_flow for all vcmp devices of network_id={network_id}")

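Note: the sync_l2_selfips_and_subnet_routes_flow that the controller worker now calls is not part of this excerpt. Purely as an illustration, the sketch below shows how such an L2SyncManager method could reuse the parallel-executor pattern of ensure_l2_flow above; the _do_sync_selfips_and_subnet_routes task runner and the hostname-based SelfIP filter are assumptions, not the PR's actual implementation.

```python
def sync_l2_selfips_and_subnet_routes_flow(self, selfips: [network_models.Port], network_id: str):
    """Sketch only: refresh SelfIPs and subnet routes without a full L2 rebuild."""
    if not selfips:
        return

    network = self._network_driver.get_network(network_id)

    # one lightweight flow per device, run in parallel as in ensure_l2_flow
    fs = {}
    for bigip in self._bigips:
        # assumption: SelfIP port names carry the hostname of the device they belong to
        selfips_for_host = [sip for sip in selfips if bigip.hostname in sip.name]
        store = {'bigip': bigip, 'network': network}
        fs[self.executor.submit(self._do_sync_selfips_and_subnet_routes,
                                selfips=selfips_for_host, store=store)] = bigip

    # tolerate single-device failures, raise only if every device failed
    failed_bigips = []
    done, not_done = futures.wait(fs, timeout=CONF.networking.l2_timeout)
    for f in done | not_done:
        try:
            f.result(0)
        except Exception as e:
            LOG.error("Failed SelfIP/subnet-route sync for host %s: %s", fs[f].hostname, e)
            failed_bigips.append(fs[f])
    if self._bigips and all(bigip in failed_bigips for bigip in self._bigips):
        raise Exception(f"Failed SelfIP/subnet-route sync for all devices of network_id={network_id}")
```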
3 changes: 3 additions & 0 deletions octavia_f5/network/drivers/noop_driver_f5/driver.py
@@ -48,3 +48,6 @@ def cleanup_selfips(self, selfips):

def create_vip(self, load_balancer, candidate):
return self.driver.create_port(load_balancer.vip.network_id)

def invalidate_cache(self, hard=True):
pass
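
The no-op invalidate_cache above exists so that the controller worker's new invalidate_cache() calls do not break unit tests running against the noop driver. For illustration only, a caching driver could implement it roughly as below; the _cache attribute and the soft-expiry behaviour are assumptions, not taken from the real Neutron-backed driver.

```python
import time


class CachingNetworkDriverSketch:
    """Illustrative only, not the octavia-f5 Neutron driver."""

    def __init__(self, soft_ttl=30):
        self._cache = {}          # key -> (timestamp, value)
        self._soft_ttl = soft_ttl

    def invalidate_cache(self, hard=True):
        if hard:
            # drop everything so the next lookup hits Neutron again
            self._cache.clear()
        else:
            # soft invalidation: only age out entries older than the TTL
            now = time.time()
            self._cache = {key: (ts, val) for key, (ts, val) in self._cache.items()
                           if now - ts < self._soft_ttl}
```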
39 changes: 33 additions & 6 deletions octavia_f5/tests/unit/controller/worker/test_controller_worker.py
@@ -24,19 +24,19 @@

CONF = cfg.CONF

_health_mon_mock = mock.MagicMock()
LB_ID = uuidutils.generate_uuid()
NETWORK_ID = uuidutils.generate_uuid()
_status_manager = mock.MagicMock()
_vip_mock = mock.MagicMock()
_vip_mock.network_id = NETWORK_ID
_listener_mock = mock.MagicMock()
_load_balancer_mock = mock.MagicMock()
_load_balancer_mock.id = LB_ID
_load_balancer_mock.listeners = [_listener_mock]
_load_balancer_mock.vip = _vip_mock
_load_balancer_mock.flavor_id = None
_load_balancer_mock.availability_zone = None
_member_mock = mock.MagicMock()
_pool_mock = mock.MagicMock()
_l7policy_mock = mock.MagicMock()
_l7rule_mock = mock.MagicMock()
_az_mock = mock.MagicMock()
_selfip = mock.MagicMock()
_db_session = mock.MagicMock()


@@ -49,6 +49,8 @@ def setUp(self):
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
conf.config(group="f5_agent", prometheus=False)
conf.config(group="controller_worker", network_driver='network_noop_driver_f5')
# prevent ControllerWorker() from spawning threads
conf.config(group="f5_agent", sync_immediately=False)

@mock.patch('octavia.db.repositories.AvailabilityZoneRepository')
@mock.patch('octavia.db.repositories.AvailabilityZoneProfileRepository')
@@ -79,3 +81,28 @@ def test_register_in_availability_zone(self,
cw.register_in_availability_zone(az)
mock_az_repo.return_value.create.assert_called_once()
mock_azp_repo.return_value.create.assert_called_once()

@mock.patch('octavia.db.repositories.LoadBalancerRepository.get',
return_value=_load_balancer_mock)
@mock.patch('octavia_f5.db.repositories.LoadBalancerRepository.get_all_by_network',
return_value=[_load_balancer_mock])
@mock.patch("octavia_f5.network.drivers.noop_driver_f5.driver.NoopNetworkDriverF5"
".ensure_selfips",
return_value=([_selfip], []))
@mock.patch("octavia_f5.network.drivers.noop_driver_f5.driver.NoopNetworkDriverF5"
".cleanup_selfips")
def test_remove_loadbalancer_last(self,
mock_cleanup_selfips,
mock_ensure_selfips,
mock_lb_repo_get_all_by_network,
mock_lb_repo_get,
mock_api_get_session,
mock_sync_manager,
mock_status_manager):
cw = controller_worker.ControllerWorker()
cw.remove_loadbalancer(LB_ID)

mock_lb_repo_get_all_by_network.assert_called_once_with(_db_session, network_id=NETWORK_ID, show_deleted=False)
mock_lb_repo_get.assert_called_once_with(_db_session, id=LB_ID)
mock_ensure_selfips.assert_called_with([_load_balancer_mock], CONF.host, cleanup_orphans=False)
mock_cleanup_selfips.assert_called_with([_selfip])
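
The test above covers the last-load-balancer path of remove_loadbalancer. A companion sketch for the branch where other load balancers remain on the network could look roughly like the following; the L2SyncManager patch target and the extra load-balancer mock are assumptions about code not shown in this diff, so treat it as a starting point rather than part of the PR.

```python
# Sketch only: assumes the class in l2_sync_manager.py is named L2SyncManager and that
# ControllerWorker keeps an instance of it as self.l2sync (not shown in this excerpt).
@mock.patch('octavia_f5.controller.worker.l2_sync_manager.L2SyncManager'
            '.sync_l2_selfips_and_subnet_routes_flow')
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get',
            return_value=_load_balancer_mock)
@mock.patch('octavia_f5.db.repositories.LoadBalancerRepository.get_all_by_network')
@mock.patch("octavia_f5.network.drivers.noop_driver_f5.driver.NoopNetworkDriverF5"
            ".ensure_selfips",
            return_value=([_selfip], []))
def test_remove_loadbalancer_not_last(self,
                                      mock_ensure_selfips,
                                      mock_lb_repo_get_all_by_network,
                                      mock_lb_repo_get,
                                      mock_l2_selfip_flow,
                                      mock_api_get_session,
                                      mock_sync_manager,
                                      mock_status_manager):
    # a second load balancer stays behind on the network after LB_ID is removed
    other_lb_mock = mock.MagicMock()
    mock_lb_repo_get_all_by_network.return_value = [_load_balancer_mock, other_lb_mock]

    cw = controller_worker.ControllerWorker()
    cw.remove_loadbalancer(LB_ID)

    # remaining LBs keep their SelfIPs; SelfIPs on now-empty subnets are treated as orphans
    mock_ensure_selfips.assert_called_with([other_lb_mock], CONF.host, cleanup_orphans=True)
    # only SelfIPs and subnet routes are synced, no full L2 teardown
    mock_l2_selfip_flow.assert_called_once_with([_selfip], NETWORK_ID)
```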