[SOAR-18657] mimecast v2 (#3068)
* Update threads, error handling, custom config, rate limiting

* Add connection test

* Fix lint

* Add unit tests

* Add unit tests

* Fix requirements

* Fix requirements
ablakley-r7 authored Feb 4, 2025
1 parent 05b2bc2 commit d21db72
Showing 13 changed files with 541 additions and 66 deletions.
25 changes: 25 additions & 0 deletions plugins/mimecast_v2/icon_mimecast_v2/connection/connection.py
@@ -4,6 +4,7 @@
from icon_mimecast_v2.util.api import API

# Custom imports below
from datetime import datetime, timezone


class Connection(insightconnect_plugin_runtime.Connection):
@@ -23,3 +24,27 @@ def test(self):
return {"success": True}
except PluginException as error:
raise ConnectionTestException(cause=error.cause, assistance=error.assistance, data=error.data)

def test_task(self):
try:
now_date = datetime.now(tz=timezone.utc).date()
self.api.get_siem_logs(log_type="receipt", query_date=now_date, page_size=1, max_threads=1, next_page=None)
self.logger.info("The connection test to Mimecast was successful.")
return {"success": True}
except PluginException as error:
return_message = ""
failed_message = "The connection test to Mimecast has failed."
self.logger.info(failed_message)
return_message += f"{failed_message}\n"

cause_message = f"This failure was caused by: '{error.cause}'"
self.logger.info(cause_message)
return_message += f"{cause_message}\n"

self.logger.info(error.assistance)
return_message += f"{error.assistance}\n"
raise ConnectionTestException(
cause="Configured credentials do not have permission for this API endpoint.",
assistance="Please ensure credentials have required permissions.",
data=return_message,
)
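The new test_task aggregates the failure notice, cause, and assistance text into one report before raising ConnectionTestException. A minimal standalone sketch of that aggregation pattern, assuming only the PluginException attributes used above (the helper name is illustrative, not part of the commit):

from insightconnect_plugin_runtime.exceptions import PluginException


def build_test_failure_message(error: PluginException) -> str:
    # Collect the failure notice, cause, and assistance text into one
    # newline-separated report, mirroring test_task above.
    lines = [
        "The connection test to Mimecast has failed.",
        f"This failure was caused by: '{error.cause}'",
        error.assistance,
    ]
    return "\n".join(lines) + "\n"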
137 changes: 85 additions & 52 deletions plugins/mimecast_v2/icon_mimecast_v2/tasks/monitor_siem_logs/task.py
@@ -1,16 +1,32 @@
import insightconnect_plugin_runtime
from insightconnect_plugin_runtime.exceptions import APIException, PluginException
from insightconnect_plugin_runtime.helper import compare_and_dedupe_hashes, hash_sha1
from .schema import MonitorSiemLogsInput, MonitorSiemLogsOutput, MonitorSiemLogsState, Input, Output, Component, State
from typing import Dict, List, Tuple
from datetime import datetime, timezone, timedelta
import copy

# Date format for conversion
DATE_FORMAT = "%Y-%m-%d"
# Default and max values
LOG_TYPES = ["receipt", "url protect", "attachment protect"]
DEFAULT_THREAD_COUNT = 10
DEFAULT_PAGE_SIZE = 100
MAX_LOOKBACK_DAYS = 7
INITIAL_MAX_LOOKBACK_DAYS = 1
# Run type
INITIAL_RUN = "initial_run"
SUBSEQUENT_RUN = "subsequent_run"
PAGINATION_RUN = "pagination_run"
# Access keys for state and custom config
LOG_HASHES = "log_hashes"
QUERY_CONFIG = "query_config"
QUERY_DATE = "query_date"
CAUGHT_UP = "caught_up"
NEXT_PAGE = "next_page"
# Access keys for custom config
THREAD_COUNT = "thread_count"
PAGE_SIZE = "page_size"


class MonitorSiemLogs(insightconnect_plugin_runtime.Task):
@@ -24,21 +40,21 @@ def __init__(self):
)

def run(self, params={}, state={}, custom_config={}): # pylint: disable=unused-argument
self.logger.info(f"TASK: Received State: {state}")
self.logger.info(f"TASK: Received State: {state.get(QUERY_CONFIG)}")
existing_state = state.copy()
try:
# TODO: Additional error handling
run_condition = self.detect_run_condition(state.get("query_config", {}))
self.logger.info(f"TASK: Current run state is {run_condition}")
state = self.update_state(state, custom_config)
self.logger.info(f"NEW STATE: {state}")
now_date = datetime.now(tz=timezone.utc).date()
run_condition = self.detect_run_condition(state.get(QUERY_CONFIG, {}), now_date)
self.logger.info(f"TASK: Run state is {run_condition}")
state = self.update_state(state)
page_size, thread_count = self.apply_custom_config(state, custom_config)
max_run_lookback_date = self.get_max_lookback_date(now_date, run_condition, bool(custom_config))
query_config = self.prepare_query_params(state.get("query_config", {}), max_run_lookback_date, now_date)
logs, query_config = self.get_all_logs(run_condition, query_config)
# TODO: Dedupe
query_config = self.prepare_query_params(state.get(QUERY_CONFIG, {}), max_run_lookback_date, now_date)
logs, query_config = self.get_all_logs(run_condition, query_config, page_size, thread_count)
self.logger.info(f"TASK: Total logs collected this run {len(logs)}")
exit_state, has_more_pages = self.prepare_exit_state(state, query_config, now_date)
logs, log_hashes = compare_and_dedupe_hashes(state.get(LOG_HASHES, []), logs)
self.logger.info(f"TASK: Total logs after deduplication {len(logs)}")
exit_state, has_more_pages = self.prepare_exit_state(state, query_config, now_date, log_hashes)
return logs, exit_state, has_more_pages, 200, None
except APIException as error:
self.logger.info(
@@ -47,39 +63,40 @@ def run(self, params={}, state={}, custom_config={}): # pylint: disable=unused-
return [], existing_state, False, error.status_code, error
except PluginException as error:
self.logger.info(f"Error: A Plugin exception has occurred. Cause: {error.cause} Error data: {error.data}.")
return [], existing_state, False, error.status_code, error
return [], existing_state, False, 500, error
except Exception as error:
self.logger.info(f"Error: Unknown exception has occurred. No results returned. Error Data: {error}")
return [], existing_state, False, 500, PluginException(preset=PluginException.Preset.UNKNOWN, data=error)

def detect_run_condition(self, query_config: Dict) -> str:
def detect_run_condition(self, query_config: Dict, now_date: datetime) -> str:
"""
Return run type based on query configuration
:param query_config:
:param now_date:
:return: run type string
"""
if not query_config:
return INITIAL_RUN
for log_type_config in query_config.values():
if not log_type_config.get("caught_up"):
if not log_type_config.get(CAUGHT_UP) or log_type_config.get(QUERY_DATE) not in str(now_date):
return PAGINATION_RUN
return SUBSEQUENT_RUN

def update_state(self, state: Dict, custom_config: Dict) -> Dict:
def update_state(self, state: Dict) -> Dict:
"""
Initialise and validate state
:param state:
:param custom_config:
:return:
:return: State
"""
initial_log_type_config = {"caught_up": False}
initial_log_type_config = {CAUGHT_UP: False}
if not state:
state = {"query_config": {log_type: copy.deepcopy(initial_log_type_config) for log_type in LOG_TYPES}}
self.apply_custom_config(state, custom_config)
self.logger.info("TASK: Initializing first state...")
state = {QUERY_CONFIG: {log_type: copy.deepcopy(initial_log_type_config) for log_type in LOG_TYPES}}
else:
for log_type in LOG_TYPES:
if log_type not in state.get("query_config", {}).keys():
state["query_config"][log_type] = copy.deepcopy(initial_log_type_config)
if log_type not in state.get(QUERY_CONFIG, {}).keys():
self.logger.info(f"TASK: {log_type} missing from state. Initializing...")
state[QUERY_CONFIG][log_type] = copy.deepcopy(initial_log_type_config)
return state

def get_max_lookback_date(self, now_date: datetime, run_condition: str, custom_config: bool) -> datetime:
@@ -97,18 +114,23 @@ def get_max_lookback_date(self, now_date: datetime, run_condition: str, custom_c
max_run_lookback_date = now_date - timedelta(days=max_run_lookback_days)
return max_run_lookback_date

def apply_custom_config(self, state: Dict, custom_config: Dict) -> None:
def apply_custom_config(self, state: Dict, custom_config: Dict = {}) -> Tuple[int, int]:
"""
Apply custom configuration for lookback; the query date applies to the start and end time of the query
:param current_query_config:
:param custom_config:
:return: N/A
:return: Page size and thread count
"""
# TODO: Additional custom config for page size, thread size, limit
current_query_config = state.get("query_config")
for log_type, lookback_date_string in custom_config.items():
self.logger.info(f"TASK: Supplied lookback date of {lookback_date_string} for {log_type} log type")
current_query_config[log_type] = {"query_date": lookback_date_string}
if custom_config:
self.logger.info("TASK: Custom config detected")
if not state:
current_query_config = state.get(QUERY_CONFIG)
for log_type, query_date_string in custom_config.items():
self.logger.info(f"TASK: Supplied lookback date of {query_date_string} for log type {log_type}")
current_query_config[log_type] = {QUERY_DATE: query_date_string}
page_size = max(1, min(custom_config.get(PAGE_SIZE, DEFAULT_PAGE_SIZE), DEFAULT_PAGE_SIZE))
thread_count = max(1, custom_config.get(THREAD_COUNT, DEFAULT_THREAD_COUNT))
return page_size, thread_count

def prepare_query_params(self, query_config: Dict, max_lookback_date: Dict, now_date: datetime) -> Dict:
"""
@@ -119,18 +141,19 @@ def prepare_query_params(self, query_config: Dict, max_lookback_date: Dict, now_
:return:
"""
for log_type, log_type_config in query_config.items():
query_date_str = log_type_config.get("query_date")
self.logger.info(f"PREPPING {log_type_config}")
self.logger.info(f"{log_type}, {query_date_str}")
query_date_str = log_type_config.get(QUERY_DATE)
if query_date_str:
query_date = datetime.strptime(query_date_str, "%Y-%m-%d").date()
query_date = datetime.strptime(query_date_str, DATE_FORMAT).date()
if not query_date_str:
log_type_config["query_date"] = max_lookback_date
elif query_date < now_date and log_type_config.get("caught_up") is True:
self.logger.info(
f"TASK: Query date for {log_type} log type is not present. Initializing a {max_lookback_date}"
)
log_type_config[QUERY_DATE] = max_lookback_date
elif query_date < now_date and log_type_config.get(CAUGHT_UP) is True:
self.logger.info(f"TASK: Log type {log_type} has caught up for {query_date}")
log_type_config["query_date"] = query_date + timedelta(days=1)
log_type_config["caught_up"] = False
log_type_config.pop("next_page")
log_type_config[QUERY_DATE] = query_date + timedelta(days=1)
log_type_config[CAUGHT_UP] = False
log_type_config.pop(NEXT_PAGE)
query_config[log_type] = self.validate_config_lookback(log_type_config, max_lookback_date, now_date)
return query_config

@@ -142,51 +165,61 @@ def validate_config_lookback(self, log_type_config: Dict, max_lookback_date: dat
:param now_date:
:return: log_type_config
"""
query_date = log_type_config.get("query_date")
query_date = log_type_config.get(QUERY_DATE)
if isinstance(query_date, str):
query_date = datetime.strptime(query_date, "%Y-%m-%d").date()
query_date = datetime.strptime(query_date, DATE_FORMAT).date()
if query_date < max_lookback_date:
return {"query_date": max_lookback_date}
return {QUERY_DATE: max_lookback_date}
if query_date > now_date:
log_type_config["query_date"] = now_date
log_type_config[QUERY_DATE] = now_date
return log_type_config

def get_all_logs(self, run_condition: str, query_config: Dict) -> Tuple[List, Dict]:
def get_all_logs(
self, run_condition: str, query_config: Dict, page_size: int, thread_count: int
) -> Tuple[List, Dict]:
"""
Gets all logs of provided log type. First retrieves batch URLs. Then downloads and reads batches, pooling logs.
:param run_condition:
:param query_config:
:param page_size:
:param thread_count:
:return: Logs, updated query configuration (state)
"""
complete_logs = []
for log_type, log_type_config in query_config.items():
if (not log_type_config.get("caught_up")) or (run_condition != PAGINATION_RUN):
if (not log_type_config.get(CAUGHT_UP)) or (run_condition != PAGINATION_RUN):
logs, results_next_page, caught_up = self.connection.api.get_siem_logs(
log_type=log_type,
query_date=log_type_config.get("query_date"),
next_page=log_type_config.get("next_page"),
query_date=log_type_config.get(QUERY_DATE),
next_page=log_type_config.get(NEXT_PAGE),
page_size=page_size,
max_threads=thread_count,
)
complete_logs.extend(logs)
log_type_config.update({"next_page": results_next_page, "caught_up": caught_up})
log_type_config.update({NEXT_PAGE: results_next_page, CAUGHT_UP: caught_up})
else:
self.logger.info(f"TASK: Query for {log_type} is caught up. Skipping as we are currently paginating")
return complete_logs, query_config

def prepare_exit_state(self, state: dict, query_config: dict, now_date: datetime) -> Tuple[Dict, bool]:
def prepare_exit_state(
self, state: dict, query_config: dict, now_date: datetime, log_hashes: List[str]
) -> Tuple[Dict, bool]:
"""
Prepare state and pagination for task completion. Format date time.
:param state:
:param query_config:
:param now_date:
:param log_hashes:
:return: state, has_more_pages
"""
has_more_pages = False
for log_type_config in query_config.values():
query_date = log_type_config.get("query_date")
query_date = log_type_config.get(QUERY_DATE)
if isinstance(query_date, str):
query_date = datetime.strptime(query_date, "%Y-%m-%d").date()
if (not log_type_config.get("caught_up")) or query_date < now_date:
query_date = datetime.strptime(query_date, DATE_FORMAT).date()
if (not log_type_config.get(CAUGHT_UP)) or query_date < now_date:
has_more_pages = True
log_type_config["query_date"] = query_date.strftime("%Y-%m-%d")
state["query_config"] = query_config
log_type_config[QUERY_DATE] = query_date.strftime(DATE_FORMAT)
state[QUERY_CONFIG] = query_config
state[LOG_HASHES] = log_hashes
return state, has_more_pages
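The run-condition logic above decides how a run resumes: an empty query config means a first run, and any log type that is not caught up, or that is parked on an older query date, keeps the task in a pagination run. A self-contained sketch of that decision (sample state values are illustrative):

from datetime import date

INITIAL_RUN = "initial_run"
SUBSEQUENT_RUN = "subsequent_run"
PAGINATION_RUN = "pagination_run"


def detect_run_condition(query_config: dict, now_date: date) -> str:
    # No saved query config -> first ever run.
    if not query_config:
        return INITIAL_RUN
    # A log type that is still downloading pages, or whose query date
    # is not today, keeps the task paginating.
    for log_type_config in query_config.values():
        if not log_type_config.get("caught_up") or log_type_config.get("query_date") not in str(now_date):
            return PAGINATION_RUN
    return SUBSEQUENT_RUN


# "receipt" is caught up for today, but "url protect" still has pages
# to collect, so the next run resumes pagination.
state = {
    "receipt": {"caught_up": True, "query_date": "2025-02-04"},
    "url protect": {"caught_up": False, "query_date": "2025-02-04"},
}
print(detect_run_condition(state, date(2025, 2, 4)))  # -> pagination_run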
35 changes: 21 additions & 14 deletions plugins/mimecast_v2/icon_mimecast_v2/util/api.py
@@ -5,12 +5,13 @@
HTTPStatusCodes,
ResponseExceptionData,
)
from insightconnect_plugin_runtime.helper import make_request, extract_json
from insightconnect_plugin_runtime.helper import extract_json, make_request, rate_limiting
from logging import Logger
from requests import Response, Request
from io import BytesIO
from icon_mimecast_v2.util.endpoints import Endpoints
from icon_mimecast_v2.util.constants import Endpoints
from typing import Dict, List, Tuple
from multiprocessing.dummy import Pool
import gzip
import json

@@ -39,17 +40,19 @@ def authenticate(self) -> None:
self.logger.info("API: Authenticated")

def get_siem_logs(
self, log_type: str, query_date: str, next_page: str, page_size: int = 100
self, log_type: str, query_date: str, next_page: str, page_size: int = 100, max_threads: int = 10
) -> Tuple[List[str], str, bool]:
batch_download_urls, result_next_page, caught_up = self.get_siem_batches(
log_type, query_date, next_page, page_size
)
logs = []
self.logger.info(f"API: Getting SIEM logs from batches for log type {log_type}...")
for url in batch_download_urls:
batch_logs = self.get_siem_logs_from_batch(url=url)
if isinstance(batch_logs, (List, Dict)):
logs.extend(batch_logs)
self.logger.info(f"API: Applying page size limit of {page_size}")
with Pool(max_threads) as pool:
batch_logs = pool.imap(self.get_siem_logs_from_batch, batch_download_urls)
for result in batch_logs:
if isinstance(result, (List, Dict)):
logs.extend(result)
self.logger.info(f"API: Discovered {len(logs)} logs for log type {log_type}")
return logs, result_next_page, caught_up

@@ -77,7 +80,6 @@ def get_siem_batches(
return urls, batch_response.get("@nextPage"), caught_up

def get_siem_logs_from_batch(self, url: str):
# TODO: Threading
response = requests.request(method=GET, url=url, stream=False)
with gzip.GzipFile(fileobj=BytesIO(response.content), mode="rb") as file_:
logs = []
@@ -87,6 +89,7 @@ def get_siem_logs_from_batch(self, url: str):
logs.append(json.loads(decoded_line))
return logs

@rate_limiting(5)
def make_api_request(
self,
url: str,
@@ -101,7 +104,6 @@ def make_api_request(
if auth:
headers["Authorization"] = f"Bearer {self.access_token}"
request = Request(url=url, method=method, headers=headers, params=params, data=data, json=json)
# TODO: Handle rate limit, handle retry backoff
try:
response = make_request(
_request=request,
@@ -117,11 +119,16 @@
status_code=exception.data.status_code,
)
raise exception
if (
response.status_code == HTTPStatusCodes.UNAUTHORIZED
and extract_json(response).get("fail", [{}])[0].get("code") == "token_expired"
):
self.authenticate()
if response.status_code == HTTPStatusCodes.UNAUTHORIZED:
json_data = extract_json(response)
if json_data.get("fail", [{}])[0].get("code") == "token_expired":
self.authenticate()
self.logger.info("API: Token has expired, attempting re-authentication...")
return self.make_api_request(url, method, headers, json, data, params, return_json, auth)
if response.status_code == HTTPStatusCodes.UNAUTHORIZED:
raise APIException(
preset=PluginException.Preset.API_KEY, data=response.text, status_code=response.status_code
)
if return_json:
json_data = extract_json(response)
return json_data
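The batch download now fans out across a thread-backed pool (multiprocessing.dummy.Pool), each worker fetching and un-gzipping one batch URL while pool.imap streams results back in order. A self-contained sketch of the same pattern, with illustrative helper names in place of the plugin's classes:

import gzip
import json
from io import BytesIO
from multiprocessing.dummy import Pool  # thread-backed, not process-backed

import requests


def fetch_batch(url: str) -> list:
    # Download one gzipped batch and parse it line by line as JSON,
    # as get_siem_logs_from_batch does above.
    response = requests.get(url)
    logs = []
    with gzip.GzipFile(fileobj=BytesIO(response.content), mode="rb") as file_:
        for line in file_:
            decoded_line = line.decode("utf-8").strip()
            if decoded_line:
                logs.append(json.loads(decoded_line))
    return logs


def fetch_all_batches(batch_urls: list, max_threads: int = 10) -> list:
    # Mirrors the Pool(max_threads) / imap pattern in get_siem_logs.
    logs = []
    with Pool(max_threads) as pool:
        for batch_logs in pool.imap(fetch_batch, batch_urls):
            logs.extend(batch_logs)
    return logs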
2 changes: 2 additions & 0 deletions plugins/mimecast_v2/requirements.txt
@@ -1,3 +1,5 @@
# List third-party dependencies here, separated by newlines.
# All dependencies must be version-pinned, eg. requests==1.2.0
# See: https://pip.pypa.io/en/stable/user_guide/#requirements-files
parameterized==0.8.1
freezegun==1.5.1
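The two new pins support the added unit tests: freezegun freezes "now" so date-window logic is deterministic, and parameterized drives several cases through one test method. A hypothetical sketch of how they combine (the helper and expected dates are illustrative, not taken from the commit's tests):

import unittest
from datetime import datetime, timedelta, timezone

from freezegun import freeze_time
from parameterized import parameterized

MAX_LOOKBACK_DAYS = 7
INITIAL_MAX_LOOKBACK_DAYS = 1


def max_lookback_date(run_condition: str):
    # Illustrative stand-in for the task's lookback rule: initial runs
    # look back one day, later runs up to seven.
    days = INITIAL_MAX_LOOKBACK_DAYS if run_condition == "initial_run" else MAX_LOOKBACK_DAYS
    return datetime.now(tz=timezone.utc).date() - timedelta(days=days)


class TestMaxLookback(unittest.TestCase):
    @parameterized.expand([("initial_run", "2025-02-03"), ("subsequent_run", "2025-01-28")])
    @freeze_time("2025-02-04")
    def test_max_lookback(self, run_condition, expected):
        self.assertEqual(str(max_lookback_date(run_condition)), expected)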