Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove greenlet from asr1kl3 agent, use python threads instead #63

Open
wants to merge 4 commits into
base: stable/ussuri-m3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions asr1k_neutron_l3/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from oslo_log import log as logging
from oslo_config import cfg
from oslo_service import _options as oslo_opts
from neutron.conf.agent import common
from neutron.conf import service
from neutron_lib._i18n import _
Expand Down Expand Up @@ -145,6 +146,7 @@ def register_l3_opts():
cfg.CONF.register_opts(ASR1K_L2_OPTS, "asr1k_l2")
cfg.CONF.register_opts(AGENT_STATE_OPTS, 'AGENT')
cfg.CONF.register_opts(AVAILABILITY_ZONE_OPTS, 'AGENT')
cfg.CONF.register_opts(oslo_opts.eventlet_backdoor_opts)
common.register_interface_opts()
common.register_interface_driver_opts_helper(cfg.CONF)

Expand Down
5 changes: 0 additions & 5 deletions asr1k_neutron_l3/models/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os

if not os.environ.get('DISABLE_EVENTLET_PATCHING'):
import eventlet
eventlet.monkey_patch()

import datetime
from retrying import retry
Expand Down
204 changes: 106 additions & 98 deletions asr1k_neutron_l3/plugins/l3/agents/asr1k_l3_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,20 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os

if not os.environ.get('DISABLE_EVENTLET_PATCHING'):
import eventlet
eventlet.monkey_patch()

import datetime
import eventlet
import gc
import re
import requests
import os
import threading
import signal
import sys
import time
import traceback
import urllib3

from greenlet import greenlet
import manhole
from futurist import periodics, ThreadPoolExecutor
from neutron.agent.common import resource_processing_queue as queue
from neutron.agent.l3 import agent as l3_agent
from neutron.agent.linux import external_process
from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron import service as neutron_service
from neutron_lib.agent import constants as agent_consts
from neutron_lib.agent import topics
from neutron_lib.callbacks import events
Expand All @@ -46,14 +36,10 @@
from neutron_lib import constants as lib_constants
from neutron_lib.exceptions import l3 as l3_exc
from neutron_lib import rpc as n_rpc
from neutron import manager
from oslo_config import cfg
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
import oslo_messaging
from oslo_service import loopingcall
from oslo_service import periodic_task
from oslo_service import service
from oslo_utils import timeutils

from asr1k_neutron_l3.plugins.l3.agents import router_processing_queue as asr1k_queue
Expand All @@ -79,30 +65,19 @@
# from neutron.services.firewall.agents.l3reference import firewall_l3_agent

LOG = logging.getLogger(__name__)
CONF = cfg.CONF

requests.packages.urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Number of routers to fetch from server at a time on resync.
# Needed to reduce load on server side and to speed up resync on agent side.
SYNC_ROUTERS_MIN_CHUNK_SIZE = 1


def main(manager='asr1k_neutron_l3.plugins.l3.agents.asr1k_l3_agent.L3ASRAgentWithStateReport'):
def main():
asr1k_config.register_l3_opts()
common_config.init(sys.argv[1:])
common_config.setup_logging()
# set periodic interval to 10 seconds, as I understand the code this means
# the
server = neutron_service.Service.create(
binary='neutron-asr1k-l3-agent',
topic=topics.L3_AGENT,
report_interval=cfg.CONF.AGENT.report_interval,
periodic_interval=10,
periodic_fuzzy_delay=10,
manager=manager)
service.launch(cfg.CONF, server).wait()


# Number of routers to fetch from server at a time on resync.
# Needed to reduce load on server side and to speed up resync on agent side.
SYNC_ROUTERS_MIN_CHUNK_SIZE = 1
server = L3ASRAgentWithStateReport(CONF.host)
server.run()


class L3PluginApi(object):
Expand Down Expand Up @@ -263,13 +238,16 @@ def get_all_router_ids(self, context):
return cctxt.call(context, 'get_all_router_ids', host=self.host)


class L3ASRAgent(manager.Manager, operations.OperationsMixin, DeviceCleanerMixin):
class L3ASRAgent(operations.OperationsMixin, DeviceCleanerMixin):
"""Manager for L3 ASR Agent

API version history:
1.0 initial Version
"""
target = oslo_messaging.Target(version='1.3')
# We need to register and parse the asr1k_l3 config options for annotation configs
asr1k_config.register_l3_opts()
common_config.init(sys.argv[1:])

def __init__(self, host, conf=None):
if conf:
Expand All @@ -286,21 +264,32 @@ def __init__(self, host, conf=None):
max_age=cfg.CONF.asr1k.connection_max_age)
LOG.debug("Connection pool initialized")

if CONF.backdoor_socket:
try:
backdoor_socket_path = CONF.backdoor_socket.format(pid=os.getpid())
except (KeyError, IndexError, ValueError) as e:
backdoor_socket_path = CONF.backdoor_socket
LOG.warning("Could not apply format string to manhole "
"backdoor socket path ({}) - continuing with "
"unformatted path"
"".format(e))
manhole.install(socket_path=backdoor_socket_path)

self.router_info = {}
self.host = host
self.process_monitor = external_process.ProcessMonitor(
config=self.conf,
resource_type='router')

self.context = n_context.get_admin_context_without_session()
self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
self.l3_agent_rpc = None
self.fullsync = cfg.CONF.asr1k_l3.sync_active
self.pause_process = False
self.sync_routers_chunk_size = cfg.CONF.asr1k_l3.sync_chunk_size
self.sync_until_queue_size = cfg.CONF.asr1k_l3.sync_until_queue_size

self.asr1k_pair = asr1k_pair.ASR1KPair()

self._router_loop_thread = None
self._router_loop_stop = False
self._queue = asr1k_queue.RouterProcessingQueue()
self._requeue = {}
self._last_full_sync = timeutils.now()
Expand Down Expand Up @@ -338,63 +327,70 @@ def __init__(self, host, conf=None):
# continue
# break
self.monitor = self._initialize_monitor()
self.worker = periodics.PeriodicWorker([

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you instantiate the worker directly here instead of using PeriodicWorker.create()?

Wouldn't it make sense to create the worker with a ThreadPoolExecutor instead of the default SynchronousExecutor? As I understand it, the previous periodic tasks were able to run in parallel, so this would better mimic the previous behavior.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking further into it, neutron.service.Service starts two looping calls in start(): one for reporting state and one for running the periodic tasks synchronously. This makes sure that we always report state, even if some periodic tasks take a while. Therefore, the default SynchronousExecutor might be fine here, but then self._report_state() must not be handled by self.worker as well.

(self.periodic_requeue_routers_task, None, None),
(self.periodic_refresh_address_scope_config, None, None),
(self.periodic_worker_report, None, None)
])

super(L3ASRAgent, self).__init__()

signal.signal(signal.SIGUSR1, self.trigger_sync)
signal.signal(signal.SIGUSR2, self.dump_greenlets)
signal.signal(signal.SIGUSR2, self.periodic_worker_report)
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)

@log_helpers.log_method_call
def after_start(self):
if cfg.CONF.asr1k.init_mode:
LOG.info("Init mode is activated")
eventlet.spawn_n(self._init_noop)
else:
self.periodic_refresh_address_scope_config(self.context)
self.periodic_refresh_address_scope_config()

if cfg.CONF.asr1k_l3.sync_active and cfg.CONF.asr1k_l3.sync_interval > 0:
self.sync_loop = loopingcall.FixedIntervalLoopingCall(self._periodic_sync_routers_task)
self.sync_loop.start(interval=cfg.CONF.asr1k_l3.sync_interval, stop_on_exception=False)

self.scavenge_loop = loopingcall.FixedIntervalLoopingCall(self._periodic_scavenge_task)
self.scavenge_loop.start(interval=cfg.CONF.asr1k_l3.sync_interval, stop_on_exception=False)
self.worker.add(self._periodic_sync_routers_task)
self.worker.add(self._periodic_scavenge_task)

self.device_check_loop = loopingcall.FixedIntervalLoopingCall(self._check_devices_alive, self.context)
self.device_check_loop.start(interval=cfg.CONF.asr1k_l3.sync_interval / 2, stop_on_exception=False)
self.worker.add(self._check_devices_alive)
self.setup_rpc()

if cfg.CONF.asr1k.clean_orphans:
LOG.info("Orphan clean is active, starting cleaning loop")
self.orphan_loop = loopingcall.FixedIntervalLoopingCall(self.clean_device, dry_run=False)
self.orphan_loop.start(interval=cfg.CONF.asr1k.clean_orphan_interval, stop_on_exception=False)
self.worker.add(self._clean_device)

self.clean_deleted_routers_dict_loop = loopingcall.FixedIntervalLoopingCall(
self._clean_deleted_routers_dict)
self.clean_deleted_routers_dict_loop.start(interval=3600, stop_on_exception=False)

eventlet.spawn_n(self._process_routers_loop)
self._router_loop_thread = threading.Thread(target=self._process_routers_loop, daemon=True)
self._router_loop_thread.start()

LOG.info("L3 agent started")

def exit_gracefully(self, *args):
    """Shut down the router loop, the periodic worker and the RPC server.

    Registered as the SIGINT/SIGTERM handler, so any positional arguments
    (signal number, frame) are accepted and ignored.
    """
    # Tell the router processing loop to leave its while-loop.
    self._router_loop_stop = True
    self.worker.stop()

    loop_thread = self._router_loop_thread
    if loop_thread:
        loop_thread.join()

    rpc_server = self.l3_agent_rpc
    if rpc_server:
        rpc_server.stop()
        # process all remaining messages
        rpc_server.wait()

@periodics.periodic(CONF.asr1k.clean_orphan_interval)
def _clean_device(self):
    """Periodic wrapper around ``clean_device`` with ``dry_run=False``.

    The spacing comes from ``CONF.asr1k.clean_orphan_interval`` and is read
    when the decorator is evaluated, i.e. when the method is defined.
    """
    self.clean_device(dry_run=False)

def setup_rpc(self):
    """Create and start the RPC server that receives L3 agent calls.

    The server listens on the L3 agent topic for ``CONF.host``, exposes this
    agent instance as its only endpoint and uses the ``'threading'``
    executor. The started server is kept in ``self.l3_agent_rpc``.
    """
    self.topic = topics.L3_AGENT
    self.endpoints = [self]

    rpc_server = oslo_messaging.get_rpc_server(
        n_rpc.TRANSPORT,
        oslo_messaging.Target(topic=self.topic, server=CONF.host),
        self.endpoints,
        'threading',
        n_rpc.RequestContextSerializer(),
        access_policy=oslo_messaging.DefaultRPCAccessPolicy)

    self.l3_agent_rpc = rpc_server
    rpc_server.start()

def trigger_sync(self, signum, frame):
    """Signal handler that requests a full sync by setting ``self.fullsync``.

    ``signum`` and ``frame`` are the usual signal-handler arguments; neither
    is used.
    """
    LOG.info("Setup full sync based on external signal")
    self.fullsync = True

def dump_greenlets(self, signum, frame):
count = 0
total_count = 0
for ob in gc.get_objects():
if not isinstance(ob, greenlet):
continue
if not ob:
continue

LOG.debug(''.join(traceback.format_stack(ob.gr_frame)))
if re.search('ncclient/transport/ssh.py', traceback.format_stack(ob.gr_frame).__str__(), re.I):

count += 1
total_count += 1
LOG.debug("************* Total SSH Greenlets : {} out of {}".format(count, total_count))

def _initialize_monitor(self):
monitor = PrometheusMonitor(host=self.host, namespace="neutron_asr1k", type=prometheus_monitor.L3)
monitor.start()
Expand Down Expand Up @@ -430,10 +426,12 @@ def router_added_to_agent(self, context, payload):
LOG.debug('Got router added to agent :%r', payload)
self.routers_updated(context, payload)

def _check_devices_alive(self, context):
device_info = self.plugin_rpc.get_device_info(context)
@periodics.periodic(CONF.asr1k_l3.sync_interval / 2)
velp marked this conversation as resolved.
Show resolved Hide resolved
def _check_devices_alive(self):
device_info = self.plugin_rpc.get_device_info(self.context)
connection.check_devices(device_info)

@periodics.periodic(cfg.CONF.asr1k_l3.sync_interval)
def _periodic_scavenge_task(self):
try:
LOG.debug('Starting to scavenge orphans from extra atts')
Expand All @@ -443,11 +441,13 @@ def _periodic_scavenge_task(self):
except Exception as e:
LOG.exception(e)

@periodics.periodic(3600)
def _clean_deleted_routers_dict(self):
    """Forget deleted routers whose timestamp is more than 3600 seconds old.

    ``self._deleted_routers`` maps a router id to a ``datetime``; each value
    is compared against the naive ``datetime.datetime.now()``.
    """
    # Work on a snapshot so entries can be popped while walking it.
    snapshot = list(self._deleted_routers.items())
    for router_id, created_at in snapshot:
        age = datetime.datetime.now() - created_at
        if age.total_seconds() <= 3600:
            continue
        self._deleted_routers.pop(router_id, None)

@periodics.periodic(CONF.asr1k_l3.sync_interval)
def _periodic_sync_routers_task(self):
try:
LOG.debug("Starting partial sync, last partial sync started {} seconds ago"
Expand Down Expand Up @@ -573,17 +573,26 @@ def fetch_and_sync_routers_partial(self, context):

self.fullsync = cfg.CONF.asr1k_l3.sync_active

@periodic_task.periodic_task(spacing=60, run_immediately=True)
def periodic_requeue_routers_task(self, context):
@periodics.periodic(spacing=60, run_immediately=True)
def periodic_requeue_routers_task(self):
    """Move every router update parked in ``self._requeue`` back onto the queue.

    The parked updates are detached from ``self._requeue`` before they are
    queued. An update that gets parked while this task is running therefore
    lands in the fresh dict and is handled by the next run, instead of being
    discarded by a reset performed after the loop.
    """
    # Swap first, iterate afterwards: resetting the dict after the loop would
    # drop any entry added in the meantime.
    # NOTE(review): presumably router-processing threads park updates here via
    # _requeue_router - confirm against the rest of the class.
    pending, self._requeue = self._requeue, {}

    # Iterate a list copy so a late write to the detached dict cannot break
    # the iteration.
    for update in list(pending.values()):
        LOG.debug("Adding requeued router {} to processing queue".format(update.id))
        self._queue.add(update)

@periodic_task.periodic_task(spacing=5, run_immediately=False)
def periodic_refresh_address_scope_config(self, context):
self.address_scopes = utils.get_address_scope_config(self.plugin_rpc, context)
@periodics.periodic(spacing=5, run_immediately=False)
def periodic_refresh_address_scope_config(self):
    """Reload ``self.address_scopes`` through ``utils.get_address_scope_config``.

    The lookup uses the agent's plugin RPC client and its stored context.
    """
    refreshed = utils.get_address_scope_config(self.plugin_rpc, self.context)
    self.address_scopes = refreshed

@periodics.periodic(spacing=600)
def periodic_worker_report(self, *args):
    """Log the periodic worker's registered callables and all live thread names.

    Also installed as the SIGUSR2 handler, which is why extra positional
    arguments are accepted and ignored.
    """
    LOG.debug("Currently running %s periodic workers:\n%s", len(self.worker), self.worker.pformat())

    thread_names = [thread.name for thread in threading.enumerate()]
    LOG.debug("Currently running %s threads: %s", len(thread_names), thread_names)

def agent_updated(self, context, payload):
"""Handle the agent_updated notification event."""
Expand Down Expand Up @@ -724,9 +733,14 @@ def _process_routers_loop(self):
LOG.warning("The processing thread pool size has been reduced to match 'yang_connection_pool_size' "
"its now {}".format(poolsize))

pool = eventlet.GreenPool(size=poolsize)
while True:
pool.spawn_n(self._process_router_update)
with ThreadPoolExecutor(max_workers=poolsize) as executer:
velp marked this conversation as resolved.
Show resolved Hide resolved
while not self._router_loop_stop:
# the executer worker queue is infinite and doesn't block
if self._queue.get_size():
executer.submit(self._process_router_update)
else:
time.sleep(5)
velp marked this conversation as resolved.
Show resolved Hide resolved
LOG.warn("Router loop stopped")

def _requeue_router(self, router_update,
priority=l3_agent.PRIORITY_SYNC_ROUTERS_TASK):
Expand Down Expand Up @@ -797,16 +811,6 @@ def _clean(self, router):
def check_success(self, results):
    """Return True only if ``results`` is not None and every entry has a truthy ``success``.

    An empty (but not None) ``results`` counts as success.
    """
    if results is None:
        return False
    for result in results:
        if not result.success:
            return False
    return True

def _init_noop(self):
LOG.debug("Init mode active - in noop mode")
pool = eventlet.GreenPool(size=1)
while True:
pool.spawn_n(self._agent_init)

def _agent_init(self):
if not self.init_complete:
time.sleep(5)


class L3ASRAgentWithStateReport(L3ASRAgent):
def __init__(self, host, conf=None):
Expand All @@ -821,13 +825,17 @@ def __init__(self, host, conf=None):
'log_agent_heartbeats': self.conf.AGENT.log_agent_heartbeats},
'start_flag': True,
'agent_type': constants.AGENT_TYPE_ASR1K_L3}
report_interval = self.conf.AGENT.report_interval
if self.conf.AGENT.report_interval:
self.worker.add(self._report_state)

if report_interval:
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
self.heartbeat.start(interval=report_interval, stop_on_exception=False)
def run(self):
    """Run the agent until the periodic worker stops.

    Starts ``self.worker`` in a daemon thread (an empty worker is allowed),
    calls ``after_start()`` in the calling thread, then blocks until the
    worker thread finishes.
    """
    worker_thread = threading.Thread(
        target=self.worker.start,
        name="ASR1k Worker Thread",
        kwargs=dict(allow_empty=True),
        daemon=True,
    )
    worker_thread.start()
    self.after_start()
    worker_thread.join()

@periodics.periodic(CONF.AGENT.report_interval)
def _report_state(self):
num_ex_gw_ports = 0
num_interfaces = 0
Expand Down
Loading