Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address event correlation issues in the awx_display callback #1345

Draft
wants to merge 6 commits into
base: devel
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 65 additions & 61 deletions src/ansible_runner/display_callback/callback/awx_display.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

from __future__ import (absolute_import, division, print_function)

# Python
import inspect
import json
import stat
import multiprocessing
import threading
import base64
import functools
Expand All @@ -32,13 +31,15 @@
import os
import sys
import uuid
from copy import copy

# Ansible
from ansible import constants as C
from ansible.plugins.callback import CallbackBase
from ansible.plugins.loader import callback_loader
from ansible.utils.display import Display
from ansible.utils.multiprocessing import context as multiprocessing

display = Display()


DOCUMENTATION = '''
Expand All @@ -48,8 +49,7 @@
description:
- This callback is necessary for ansible-runner to work
type: stdout
extends_documentation_fragment:
- default_callback
options: {}
requirements:
- Set as stdout in config
'''
Expand All @@ -64,7 +64,7 @@
else:
default_stdout_callback = 'default'

DefaultCallbackModule: CallbackBase = callback_loader.get(default_stdout_callback).__class__
DefaultCallbackModule: CallbackBase = callback_loader.get(default_stdout_callback, class_only=True)

CENSORED = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"

Expand Down Expand Up @@ -132,11 +132,16 @@ def __init__(self):
self._local = threading.local()
if os.getenv('AWX_ISOLATED_DATA_DIR'):
self.cache = IsolatedFileWrite()
self._global_ctx = {}

def add_local(self, **kwargs):
tls = vars(self._local)
ctx = tls.setdefault('_ctx', {})
ctx.update(kwargs)
try:
ctx = self._local._ctx
except AttributeError:
ctx = self._local._ctx = {}

# Don't overwrite an earlier event context, this is only additive
ctx.update(kwargs | ctx)

def remove_local(self, **kwargs):
for key in kwargs:
Expand Down Expand Up @@ -172,22 +177,13 @@ def get_global(self):
return self._global_ctx

def get(self):
ctx = {}
ctx.update(self.get_global())
ctx.update(self.get_local())
return ctx
return self.get_global() | self.get_local()

def get_begin_dict(self):
omit_event_data = os.getenv("RUNNER_OMIT_EVENTS", "False").lower() == "true"
include_only_failed_event_data = os.getenv("RUNNER_ONLY_FAILED_EVENTS", "False").lower() == "true"
event_data = self.get()
event = event_data.pop('event', None)
if not event:
event = 'verbose'
for key in ('debug', 'verbose', 'deprecated', 'warning', 'system_warning', 'error'):
if event_data.get(key, False):
event = key
break
event_dict = {'event': event}
should_process_event_data = (include_only_failed_event_data and event in ('runner_on_failed', 'runner_on_async_failed', 'runner_on_item_failed')) \
or not include_only_failed_event_data
Expand Down Expand Up @@ -247,62 +243,50 @@ def dump_end(self, fileobj):
event_context = EventContext()


def with_context(**context):
global event_context # pylint: disable=W0602

def wrap(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
with event_context.set_local(**context):
return f(*args, **kwargs)
return wrapper
return wrap


for attr in dir(Display):
if attr.startswith('_') or 'cow' in attr or 'prompt' in attr:
continue
if attr in ('display', 'v', 'vv', 'vvv', 'vvvv', 'vvvvv', 'vvvvvv', 'verbose'):
continue
if not callable(getattr(Display, attr)):
continue
setattr(Display, attr, with_context(**{attr: True})(getattr(Display, attr)))


def with_verbosity(f):
global event_context # pylint: disable=W0602
def display_context(f):

@functools.wraps(f)
def wrapper(*args, **kwargs):
host = args[2] if len(args) >= 3 else kwargs.get('host', None)
caplevel = args[3] if len(args) >= 4 else kwargs.get('caplevel', 2)
context = {'verbose': True, 'verbosity': (caplevel + 1)}
if host is not None:
context['remote_addr'] = host
with event_context.set_local(**context):
if multiprocessing.parent_process() is not None:
return f(*args, **kwargs)
return wrapper

name = f.__name__
ctx = {'event': name}
callargs = inspect.getcallargs(f, *args, **kwargs)
host = callargs.get('host')
caplevel = callargs.get('caplevel')
log_only = callargs.get('log_only')
stderr = callargs.get('stderr')
if host:
ctx['remote_addr'] = host
if caplevel:
ctx['verbosity'] = caplevel + 1

Display.verbose = with_verbosity(Display.verbose)
dump = True
if caplevel and display.verbosity <= caplevel:
dump = False

if name == 'debug' and not C.DEFAULT_DEBUG:
dump = False

def display_with_context(f):
if name == 'system_warning' and not C.SYSTEM_WARNINGS:
dump = False

if name == "deprecated" and not C.DEPRECATION_WARNINGS:
dump = False

if not dump:
return f(*args, **kwargs)

@functools.wraps(f)
def wrapper(*args, **kwargs):
log_only = args[5] if len(args) >= 6 else kwargs.get('log_only', False)
stderr = args[3] if len(args) >= 4 else kwargs.get('stderr', False)
event_uuid = event_context.get().get('uuid', None)
with event_context.display_lock:
# If writing only to a log file or there is already an event UUID
# set (from a callback module method), skip dumping the event data.
if log_only or event_uuid:
return f(*args, **kwargs)
try:
fileobj = sys.stderr if stderr else sys.stdout
event_context.add_local(uuid=str(uuid.uuid4()))
event_context.dump_begin(fileobj)
with event_context.set_local(**ctx):
event_context.dump_begin(fileobj)
return f(*args, **kwargs)
finally:
event_context.dump_end(fileobj)
Expand All @@ -311,7 +295,15 @@ def wrapper(*args, **kwargs):
return wrapper


Display.display = display_with_context(Display.display)
for _display_attr in ('banner', 'deprecated', 'display', 'error',
'system_warning', 'verbose', 'warning'):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The display events "supported" by awx are only:

        (0, 'debug', _('Debug'), False),
        (0, 'verbose', _('Verbose'), False),
        (0, 'deprecated', _('Deprecated'), False),
        (0, 'warning', _('Warning'), False),
        (0, 'system_warning', _('System Warning'), False),
        (0, 'error', _('Error'), True),

This would potentially apply to unclassified_display too. Although the previous monkeypatching didn't exclude banner.

setattr(
Display,
_display_attr,
display_context(
getattr(Display, _display_attr)
)
)


class CallbackModule(DefaultCallbackModule):
Expand Down Expand Up @@ -352,6 +344,18 @@ def __init__(self):
# NOTE: Ansible doesn't generate a UUID for playbook_on_start so do it for them.
self.playbook_uuid = str(uuid.uuid4())

def set_options(self, *args, **kwargs):
base_config = C.config.get_configuration_definition(
DefaultCallbackModule._load_name, plugin_type='callback'
)
my_config = C.config.get_configuration_definition(
self._load_name, plugin_type='callback'
)
C.config.initialize_plugin_configuration_definitions(
'callback', self._load_name, base_config | my_config
)
return super().set_options(*args, **kwargs)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related, but since I'm in here making a mess, this fixes the options issue with callbacks not matching the default from a config perspective.


@contextlib.contextmanager
def capture_event_data(self, event, **event_data):
event_data.setdefault('uuid', str(uuid.uuid4()))
Expand All @@ -365,7 +369,7 @@ def capture_event_data(self, event, **event_data):
if event_data['res'].get('_ansible_no_log', False):
event_data['res'] = {'censored': CENSORED}
if event_data['res'].get('results', []):
event_data['res']['results'] = copy(event_data['res']['results'])
event_data['res']['results'] = event_data['res']['results'][:]
for i, item in enumerate(event_data['res'].get('results', [])):
if isinstance(item, dict) and item.get('_ansible_no_log', False):
event_data['res']['results'][i] = {'censored': CENSORED}
Expand Down
4 changes: 2 additions & 2 deletions src/ansible_runner/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,14 @@ def _emit_event(self,
event_data = self._current_event_data
stdout_chunks = [buffered_stdout]
elif buffered_stdout:
event_data = dict({'event': 'verbose'})
event_data = dict({'event': 'unclassified_display'})
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the CI failures appear to be related to this, where verbose is being excluded, or depended on. I'm not sure we can make this change, but I'd prefer to not just classify any unknown display event as verbose if possible.

stdout_chunks = buffered_stdout.splitlines(True)
else:
event_data = {}
stdout_chunks = []

for stdout_chunk in stdout_chunks:
if event_data.get('event') == 'verbose':
if event_data.get('event') == 'unclassified_display':
event_data['uuid'] = str(uuid.uuid4())
self._counter += 1
event_data['counter'] = self._counter
Expand Down
Loading