Skip to content

Commit

Permalink
first step for improve the code
Browse files Browse the repository at this point in the history
  • Loading branch information
rshunim committed Nov 17, 2024
1 parent 6411782 commit a1cca6d
Showing 1 changed file with 38 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from CommonServerPython import *
from CommonServerUserPython import *


''' CONSTANTS '''
VENDOR = 'Jamf'
PRODUCT = 'Protect'
Expand Down Expand Up @@ -407,11 +406,11 @@ def parse_response(response: dict) -> tuple:
return page_info, items


def get_events_alert_type(client: Client, start_date: str, max_fetch: int, last_run: dict) -> tuple:
def get_events_for_specific_type(client: Client, start_date: str, max_fetch: int, last_run: dict, event_type: str) -> tuple:
"""
Fetches alert type events from the Jamf Protect API within a specified date range.
Fetches events for specific type from the Jamf Protect API within a specified date range.
This function fetches alert type events from the Jamf Protect API based on the provided start date.
This function fetches the specific type events from the Jamf Protect API based on the provided start date.
It fetches events up to the maximum number specified by max_fetch.
The function also uses the information from the last run to continue fetching from where it left off in the previous run.
Expand All @@ -420,25 +419,34 @@ def get_events_alert_type(client: Client, start_date: str, max_fetch: int, last_
start_date (str): The start date for fetching events in '%Y-%m-%dT%H:%M:%SZ' format.
max_fetch (int): The maximum number of events to fetch.
last_run (dict): A dictionary containing information about the last run.
event_type (str): A specific type of event to fetch.
Returns:
tuple: A tuple containing two elements:
- A list of dictionaries. Each dictionary represents an event.
- A dictionary with new last run values,
the end date of the fetched events and a continuance token if the fetched reached the max limit.
"""
created, current_date = calculate_fetch_dates(start_date, last_run=last_run, last_run_key="alert")

mapping_event_type = {
"alert": ("alert", client.get_alerts),
"computer": ("computer", client.get_computer),
"audit": ("alert", client.get_audits)
}

event_type = mapping_event_type[event_type]
created, current_date = calculate_fetch_dates(start_date, last_run=last_run, last_run_key=mapping_event_type[event_type][0])
command_args = {"created": created}
client_event_type_func = client.get_alerts
next_page = last_run.get("alert", {}).get("next_page", "")
client_event_type_func = mapping_event_type[event_type][1]
next_page = last_run.get(event_type, {}).get("next_page", "")

demisto.debug(f"Jamf Protect- Fetching alerts from {created}")
demisto.debug(f"Jamf Protect- Fetching {event_type}s from {created}")
events, next_page = get_events(command_args, client_event_type_func, max_fetch, next_page)
for event in events:
event["source_log_type"] = "alert"
event["source_log_type"] = event_type
if next_page:
demisto.debug(
f"Jamf Protect- Fetched {len(events)} which is the maximum number of alerts."
f"Jamf Protect- Fetched {len(events)} which is the maximum number of events."
f" Will keep the fetching in the next fetch.")
new_last_run_with_next_page = {"next_page": next_page, "last_fetch": created}
return events, new_last_run_with_next_page
Expand All @@ -447,97 +455,7 @@ def get_events_alert_type(client: Client, start_date: str, max_fetch: int, last_
for event in events) if dt is not None]).strftime(
DATE_FORMAT) if events else current_date
new_last_run_without_next_page = {"last_fetch": new_last_fetch_date}
demisto.debug(f"Jamf Protect- Fetched {len(events)} alerts")
return events, new_last_run_without_next_page


def get_events_computer_type(client: Client, start_date: str, max_fetch: int, last_run: dict) -> tuple[list[dict], dict]:
"""
Fetches computer type events from the Jamf Protect API within a specified date range.
This function fetches computer type events from the Jamf Protect API based on the provided start date.
It fetches events up to the maximum number specified by max_fetch.
The function also uses the information from the last run to continue fetching from where it left off in the previous run.
Args:
client (Client): An instance of the Client class for interacting with the API.
start_date (str): The start date for fetching events in '%Y-%m-%dT%H:%M:%SZ' format.
max_fetch (int): The maximum number of events to fetch.
last_run (dict): A dictionary containing information about the last run.
Returns:
tuple: A tuple containing two elements:
- A list of dictionaries. Each dictionary represents an event.
- A dictionary with new last run values,
the end date of the fetched events and a continuance token if the fetched reached the max limit.
"""
created, current_date = calculate_fetch_dates(start_date, last_run=last_run, last_run_key="computer")
command_args = {"created": created}
client_event_type_func = client.get_computer
next_page = last_run.get("computer", {}).get("next_page", "")

demisto.debug(f"Fetching computers since {created}")
events, next_page = get_events(command_args, client_event_type_func, max_fetch, next_page)
for event in events:
event["source_log_type"] = "computers"
if next_page:
demisto.debug(
f"Fetched the maximal number of computers ({len(events)} events). "
f"Fetching will continue using {next_page=} and last_fetch={created}, in the next iteration")
new_last_run_with_next_page = {"next_page": next_page, "last_fetch": created}
return events, new_last_run_with_next_page

new_last_fetch_date = max(filter(None, (arg_to_datetime(event.get("created"), DATE_FORMAT)
for event in events))).strftime(DATE_FORMAT) if events else current_date
# If there is no next page, the last fetch date will be the max end date of the fetched events.
new_last_run_without_next_page = {"last_fetch": new_last_fetch_date}
demisto.debug(f"Fetched {len(events)} computers")
return events, new_last_run_without_next_page


def get_events_audit_type(client: Client, start_date: str, end_date: str, max_fetch: int, last_run: dict) -> tuple:
"""
Fetches audit type events from the Jamf Protect API within a specified date range.
This function fetches audit type events from the Jamf Protect API based on the provided start and end dates.
It fetches events up to the maximum number specified by max_fetch.
The function also uses the information from the last run to continue fetching from where it left off in the previous run.
Args:
client (Client): An instance of the Client class for interacting with the API.
start_date (str): The start date for fetching events in '%Y-%m-%dT%H:%M:%SZ' format.
end_date (str): The end date for fetching events in '%Y-%m-%dT%H:%M:%SZ' format.
max_fetch (int): The maximum number of events to fetch.
last_run (dict): A dictionary containing information about the last run.
Returns:
tuple: A tuple containing two elements:
- A list of dictionaries. Each dictionary represents an event.
- A dictionary with new last run values,
the end date of the fetched events and a continuance token if the fetched reached the max limit.
"""
start_date, end_date = calculate_fetch_dates(start_date, end_date=end_date, last_run=last_run, last_run_key="alert")
command_args = {"start_date": start_date, "end_date": end_date}
client_event_type_func = client.get_audits
next_page = last_run.get("audit", {}).get("next_page", "")

demisto.debug(f"Jamf Protect- Fetching audits from {start_date} to {end_date}")
events, next_page = get_events(command_args, client_event_type_func, max_fetch, next_page)
for event in events:
event["source_log_type"] = "audit"
if next_page:
demisto.debug(
f" Jamf Protect - Fetched {len(events)}"
f" which is the maximum number of audits. Will keep the fetching in the next fetch.")
new_last_run_with_next_page = {"next_page": next_page, "last_fetch": start_date}
return events, new_last_run_with_next_page

# If there is no next page, the last fetch date will be the max end date of the fetched events.
new_last_fetch_date = max([dt for dt in (arg_to_datetime(event.get("date"), DATE_FORMAT)
for event in events) if dt is not None]).strftime(
DATE_FORMAT) if events else end_date
new_last_run_without_next_page = {"last_fetch": new_last_fetch_date}
demisto.debug(f"Jamf Protect- Fetched {len(events)} audits")
demisto.debug(f"Jamf Protect- Fetched {len(events)} {event_type}s")
return events, new_last_run_without_next_page


Expand Down Expand Up @@ -621,25 +539,36 @@ def fetch_events(client: Client, max_fetch_alerts: int, max_fetch_audits: int, m
EventResult: A NamedTuple containing four elements
"""
last_run = demisto.getLastRun()

alert_events, alert_next_run = [], {}
audit_events, audit_next_run = [], {}
computer_events: list = []
computer_next_run: dict = {}
computer_events, computer_next_run = [], {}

alert_next_page = last_run.get("alert", {}).get("next_page", "")
audit_next_page = last_run.get("audit", {}).get("next_page", "")
computer_next_page = last_run.get("computer", {}).get("next_page", "")

no_next_pages = not (any((alert_next_page, audit_next_page, computer_next_page)))

# Handle events based on the availability of next page tokens.
if no_next_pages or alert_next_page:
# The only case we don't trigger the alert event type cycle is when have only the audit and computer next page token.
alert_events, alert_next_run = get_events_alert_type(client, start_date_arg, max_fetch_alerts, last_run)
# Trigger the alert event type cycle unless only audit and computer next page tokens exist.
alert_events, alert_next_run = get_events_for_specific_type(
client, start_date_arg, max_fetch_alerts, last_run, "alert"
)

if no_next_pages or audit_next_page:
# The only case we don't trigger the audit event type cycle is when have only the alert and computer next page token.
audit_events, audit_next_run = get_events_audit_type(client, start_date_arg, end_date_arg, max_fetch_audits, last_run)
# Trigger the audit event type cycle unless only alert and computer next page tokens exist.
audit_events, audit_next_run = get_events_for_specific_type(
client, start_date_arg, end_date_arg, max_fetch_audits, last_run, "audit"
)

if no_next_pages or computer_next_page:
# The only case we don't trigger the computer event type cycle is when have only the alert and audit next page token.
computer_events, computer_next_run = get_events_computer_type(client, start_date_arg, max_fetch_computer, last_run)
# Trigger the computer event type cycle unless only alert and audit next page tokens exist.
computer_events, computer_next_run = get_events_for_specific_type(
client, start_date_arg, max_fetch_computer, last_run, "computer"
)

next_run: dict[str, Any] = {"alert": alert_next_run, "audit": audit_next_run, 'computer': computer_next_run}
if "next_page" in (alert_next_run | audit_next_run | computer_next_run):
# Will instantly re-trigger the fetch command.
Expand Down

0 comments on commit a1cca6d

Please sign in to comment.