Skip to content

Commit

Permalink
[FR][DAC] Import Rules Verbose Message (#4093)
Browse files Browse the repository at this point in the history
* Draft Verbose Message

* Fix Linting

* Made more descriptive

* Updated for readability
  • Loading branch information
eric-forte-elastic authored Oct 9, 2024
1 parent 2819260 commit 4edef2e
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions detection_rules/kbwrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,28 +94,9 @@ def kibana_import_rules(ctx: click.Context, rules: RuleCollection, overwrite: Op
overwrite_exceptions: Optional[bool] = False,
overwrite_action_connectors: Optional[bool] = False) -> (dict, List[RuleResource]):
"""Import custom rules into Kibana."""
kibana = ctx.obj['kibana']
rule_dicts = [r.contents.to_api_format() for r in rules]
with kibana:
cl = GenericCollection.default()
exception_dicts = [
d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLExceptionContents)
]
action_connectors_dicts = [
d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLActionConnectorContents)
]
response, successful_rule_ids, results = RuleResource.import_rules(
rule_dicts,
exception_dicts,
action_connectors_dicts,
overwrite=overwrite,
overwrite_exceptions=overwrite_exceptions,
overwrite_action_connectors=overwrite_action_connectors
)

def handle_response_errors(response: dict):
def _handle_response_errors(response: dict):
"""Handle errors from the import response."""
def parse_list_id(s: str):
def _parse_list_id(s: str):
"""Parse the list ID from the error message."""
match = re.search(r'list_id: "(.*?)"', s)
return match.group(1) if match else None
Expand All @@ -132,7 +113,7 @@ def parse_list_id(s: str):
click.echo(f' - {error["rule_id"]}: ({error["error"]["status_code"]}) {error["error"]["message"]}')

if "references a non existent exception list" in error["error"]["message"]:
list_id = parse_list_id(error["error"]["message"])
list_id = _parse_list_id(error["error"]["message"])
if list_id in all_exception_list_ids:
workaround_errors.append(error["rule_id"])

Expand All @@ -144,12 +125,42 @@ def parse_list_id(s: str):
click.echo(' '.join(f'-id {rule_id}' for rule_id in workaround_errors))
click.echo()

def _process_imported_items(imported_items_list, item_type_description, item_key):
"""Displays appropriately formatted success message that all items imported successfully."""
all_ids = {item[item_key] for sublist in imported_items_list for item in sublist}
if all_ids:
click.echo(f'{len(all_ids)} {item_type_description} successfully imported')
ids_str = '\n - '.join(all_ids)
click.echo(f' - {ids_str}')

kibana = ctx.obj['kibana']
rule_dicts = [r.contents.to_api_format() for r in rules]
with kibana:
cl = GenericCollection.default()
exception_dicts = [
d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLExceptionContents)
]
action_connectors_dicts = [
d.contents.to_api_format() for d in cl.items if isinstance(d.contents, TOMLActionConnectorContents)
]
response, successful_rule_ids, results = RuleResource.import_rules(
rule_dicts,
exception_dicts,
action_connectors_dicts,
overwrite=overwrite,
overwrite_exceptions=overwrite_exceptions,
overwrite_action_connectors=overwrite_action_connectors
)

if successful_rule_ids:
click.echo(f'{len(successful_rule_ids)} rule(s) successfully imported')
rule_str = '\n - '.join(successful_rule_ids)
click.echo(f' - {rule_str}')
if response['errors']:
handle_response_errors(response)
_handle_response_errors(response)
else:
_process_imported_items(exception_dicts, 'exception list(s)', 'list_id')
_process_imported_items(action_connectors_dicts, 'action connector(s)', 'id')

return response, results

Expand Down

0 comments on commit 4edef2e

Please sign in to comment.