Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Commit

Permalink
Ruler sync implementation done
Browse files Browse the repository at this point in the history
  • Loading branch information
rbprado authored and rbenhurd committed Jul 5, 2023
1 parent e4af367 commit 2f64406
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 47 deletions.
33 changes: 31 additions & 2 deletions src/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def execute(script_path):
exception=(
Exception,
),
max_time=600,
max_time=60,
on_backoff=lambda details: logger.warning(f"backoff try {details['tries']} waiting {details['wait']:.1f}s")
)
def request_post(url, headers, data):
Expand All @@ -213,7 +213,7 @@ def request_post(url, headers, data):
exception=(
Exception,
),
max_time=600,
max_time=60,
on_backoff=lambda details: logger.warning(f"backoff try {details['tries']} waiting {details['wait']:.1f}s")
)
def request_delete(url, headers):
Expand All @@ -225,3 +225,32 @@ def request_delete(url, headers):
logger.info(f'delete request {url} with headers {headers} giving response {response.status_code}')
response.raise_for_status()
return response


@backoff.on_exception(
    wait_gen=backoff.expo,
    exception=(Exception,),
    max_time=60,
    on_backoff=lambda details: logger.warning(f"backoff try {details['tries']} waiting {details['wait']:.1f}s")
)
def request_get(url, headers):
    """Send a GET request and return the response.

    A 404 response whose body contains "no rule groups found" is treated
    as a valid answer and returned as-is; any other 4xx/5xx response
    raises ``requests.exceptions.HTTPError``. The ``backoff`` decorator
    retries the call on any ``Exception`` with exponential waits, for up
    to ``max_time=60`` seconds.

    Args:
        url: Address to request.
        headers: HTTP headers sent with the request.

    Returns:
        The ``requests`` response object.
    """
    # TODO: move this 404 special-casing into resources.py
    # (timeout gets strange)
    response = requests.get(url, auth=None, headers=headers)

    # "No rule groups" is reported as a 404; that is an expected state,
    # not a failure, so it must not go through raise_for_status().
    no_rule_groups = (
        response.status_code == 404
        and "no rule groups found" in response.text
    )
    if not no_rule_groups:
        response.raise_for_status()

    logger.info(f'get request {url} with headers {headers} giving response {response.status_code}')
    return response
218 changes: 173 additions & 45 deletions src/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,36 @@
import traceback
import json
import yaml
import pprint
import requests
from collections import defaultdict
from multiprocessing import Process
from time import sleep

from kubernetes import client, watch
from kubernetes.client.rest import ApiException
from urllib3.exceptions import MaxRetryError, ProtocolError

from helpers import (CONTENT_TYPE_BASE64_BINARY, CONTENT_TYPE_TEXT,
WATCH_CLIENT_TIMEOUT, WATCH_SERVER_TIMEOUT, execute,
remove_file, request, unique_filename, write_data_to_file)
from helpers import request_delete, request_post
from helpers import request_get, request_delete, request_post
from logger import get_logger

RESOURCE_SECRET = "secret"
RESOURCE_CONFIGMAP = "configmap"

_list_namespace = defaultdict(lambda: {
RESOURCE_SECRET: "list_namespaced_secret",
RESOURCE_CONFIGMAP: "list_namespaced_config_map"
}, {'ALL': {
RESOURCE_SECRET: "list_secret_for_all_namespaces",
RESOURCE_CONFIGMAP: "list_config_map_for_all_namespaces"
}})
_list_namespace = defaultdict(
lambda: {
RESOURCE_SECRET: "list_namespaced_secret",
RESOURCE_CONFIGMAP: "list_namespaced_config_map"
},
{
'ALL': {
RESOURCE_SECRET: "list_secret_for_all_namespaces",
RESOURCE_CONFIGMAP: "list_config_map_for_all_namespaces"
}
}
)

_resources_version_map = {
RESOURCE_SECRET: {},
Expand Down Expand Up @@ -305,6 +310,68 @@ def prepare_payload(payload):
# logger.exception(f"Error when updating from '%s' into '%s'", data_key, dest_folder)
# return False

def _del_rulegroup(namespace, orgid, rules_url, rulegroup_name):
    """Delete a single rule group through the ruler REST endpoint.

    Args:
        namespace: Rule namespace, used as the first path segment.
        orgid: Value sent in the ``X-Scope-OrgID`` header.
        rules_url: Base URL of the rules endpoint.
        rulegroup_name: Name of the rule group, used as the last path segment.
    """
    url = f'{rules_url}/{namespace}/{rulegroup_name}'
    logger.info(f"RULER DEL - namespace: {namespace}, group: {rulegroup_name}, url: {url}")

    response = request_delete(url, {'X-Scope-OrgID': orgid})
    logger.info(f"RULER DEL - namespace: {namespace}, group: {rulegroup_name}, response: {response}")


def _addmod_rulegroup(namespace, orgid, rules_url, rulegroup_name, rulegroup_content):
    """Create or replace a rule group through the ruler REST endpoint.

    The group is posted as a YAML document with ``name`` and ``rules`` keys.

    Args:
        namespace: Rule namespace, appended to ``rules_url``.
        orgid: Value sent in the ``X-Scope-OrgID`` header.
        rules_url: Base URL of the rules endpoint.
        rulegroup_name: Name of the rule group.
        rulegroup_content: The group's rules, dumped under the ``rules`` key.
    """
    url = f'{rules_url}/{namespace}'
    rulegroup = {'name': rulegroup_name, 'rules': rulegroup_content}
    request_headers = {'Content-Type': 'application/yaml', 'X-Scope-OrgID': orgid}

    logger.info(f"RULER ADD/MOD - namespace: {namespace}, group: {rulegroup_name}, url: {url}")
    response = request_post(url, request_headers, yaml.dump(rulegroup))
    logger.info(f"RULER ADD/MOD - namespace: {namespace}, group: {rulegroup_name}, response: {response}")


def _get_cortex_rulegroups_list(namespace_label, rules_url):
    """Fetch the rule groups currently stored behind ``rules_url``.

    Args:
        namespace_label: Value sent in the ``X-Scope-OrgID`` header.
        rules_url: URL of the rules listing endpoint.

    Returns:
        The parsed YAML response body, or an empty dict when the response
        says "no rule groups found" or the body is an empty YAML document.
    """
    headers = {'X-Scope-OrgID': namespace_label}

    logger.info(f"RULER SYNC - CORTEX list rules: url: {rules_url}, headers: {headers}")
    response = request_get(rules_url, headers=headers)
    logger.info(f"RULER SYNC - CORTEX list rules response: {response}")

    if "no rule groups found" in response.text:
        return {}

    content = yaml.safe_load(response.content.decode("utf-8"))
    logger.info(f"RULER SYNC - CORTEX lis rules content: {content}")

    # safe_load() yields None for an empty document; callers iterate the
    # result, so normalise that to "no rule groups" instead of crashing.
    return {} if content is None else content


def _watch_configmap_resources(v1, label, label_value, namespace, resource):
    """Open a watch stream on the labelled resources.

    Args:
        v1: API client object exposing the list methods named in
            ``_list_namespace``.
        label: Label key to filter on.
        label_value: Optional label value; when falsy only the presence
            of ``label`` is required.
        namespace: Namespace to watch, or ``"ALL"`` for every namespace.
        resource: Resource kind key used to look up the list method.

    Returns:
        Whatever ``watch.Watch().stream(...)`` returns for the selected
        list method (presumably an iterator of watch events).
    """
    # Match on "label=value" when a value was supplied, else on the bare label.
    selector = label if not label_value else f"{label}={label_value}"

    additional_args = {
        'label_selector': selector,
        'timeout_seconds': WATCH_SERVER_TIMEOUT,
        '_request_timeout': WATCH_CLIENT_TIMEOUT,
    }
    # The cluster-wide list methods take no namespace argument.
    if namespace != "ALL":
        additional_args['namespace'] = namespace

    stream = watch.Watch().stream
    list_method = getattr(v1, _list_namespace[namespace][resource])
    value = stream(list_method, **additional_args)

    logger.info(f"Configmap loaded from {resource} resources: {additional_args}")
    return value


def _get_namespace_label(v1, namespace, label, default):
# prevent fetching all namespaces; so a filter on name is required
ns = v1.list_namespace(field_selector=f'metadata.name={namespace}').items[0]
Expand Down Expand Up @@ -382,49 +449,71 @@ def _watch_resource_iterator(function, label, label_value, rules_url, alerts_url
url = f'{alerts_url}'
response = request_post(url, headers, yaml.dump(payload))

# # Ignore already processed resource
# # Avoid numerous logs about useless resource processing each time the WATCH loop reconnects
# if ignore_already_processed:
# if _resources_version_map[resource].get(metadata.namespace + metadata.name) == metadata.resource_version:
# if event_type == "ADDED" or event_type == "MODIFIED":
# logger.debug(f"Ignoring {event_type} {resource} {metadata.namespace}/{metadata.name}")
# continue
# elif event_type == "DELETED":
# _resources_version_map[resource].pop(metadata.namespace + metadata.name)

# if event_type == "ADDED" or event_type == "MODIFIED":
# _resources_version_map[resource][metadata.namespace + metadata.name] = metadata.resource_version

# logger.debug(f"Working on {event_type} {resource} {metadata.namespace}/{metadata.name}")

# files_changed = False

# # Get the destination folder
# dest_folder = _get_destination_folder(metadata, target_folder, folder_annotation)

# item_removed = event_type == "DELETED"
# if resource == RESOURCE_CONFIGMAP:
# files_changed |= _process_config_map(dest_folder, item, resource, unique_filenames, enable_5xx,
# item_removed)
# else:
# files_changed |= _process_secret(dest_folder, item, resource, unique_filenames, enable_5xx, item_removed)

# if script and files_changed:
# execute(script)

# if request_url and files_changed:
# request(request_url, request_method, enable_5xx, request_payload)


def _get_namespace_labels(v1, x_scope_orgid_namespace_label):
    """Collect the org-id label value of every namespace carrying the label.

    Args:
        v1: API client object providing ``list_namespace``.
        x_scope_orgid_namespace_label: Label key used both as the label
            selector and to read the value from each namespace.

    Returns:
        A list of single-entry dicts, each mapping a namespace name to
        the value of that label on the namespace.
    """
    logger.info(f"RULER SYNC - label_selector: {x_scope_orgid_namespace_label}")
    namespaces = v1.list_namespace(label_selector=x_scope_orgid_namespace_label)

    namespace_labels = []
    for namespace in namespaces.items:
        logger.info(f"RULER SYNC - NAMESPACE NAME: {namespace.metadata.name}")
        logger.info(f"RULER SYNC - NAMESPACE LABEL: {namespace.metadata.labels[x_scope_orgid_namespace_label]}")

        metadata = namespace.metadata
        entry = {metadata.name: metadata.labels[x_scope_orgid_namespace_label]}
        namespace_labels.append(entry)

    logger.info(f"RULER SYNC - NAMESPACE LABELS: {namespace_labels}")

    return namespace_labels


def _get_rulegropups_from_configmaps(v1, namespace_label):
    """Collect the rule groups declared in a namespace's ConfigMaps.

    Lists the ConfigMaps selected by the ``cortex/rules`` label in the
    given namespace, parses every data entry as YAML and gathers the
    entries of each document's ``groups`` list. Groups from all
    ConfigMaps and all data keys are merged; when two groups share a
    name, the one processed last wins.

    Args:
        v1: API client object providing ``list_namespaced_config_map``.
        namespace_label: Namespace to list ConfigMaps from; also used as
            the top-level key of the returned dict.

    Returns:
        ``{namespace_label: {group_name: rules}}``, or an empty dict when
        no document in the namespace has a non-null ``groups`` key.
    """
    rulegroup_dict_configmap = {}
    configmaps = v1.list_namespaced_config_map(namespace_label, label_selector='cortex/rules').items

    for configmap_item in configmaps or []:
        logger.info(f"RULER SYNC - CONFIGMAP ITEM DATA: {configmap_item.data}")
        logger.info(f"RULER SYNC - CONFIGMAP ITEM METADATA: {configmap_item.metadata}")

        # ``data`` may be None for a ConfigMap that carries no entries.
        for raw_document in (configmap_item.data or {}).values():
            # NOTE(security): yaml.Loader can construct arbitrary Python
            # objects from tagged input. ConfigMap content is written by
            # cluster users; consider yaml.safe_load if no custom tags
            # are required.
            configmap_document = yaml.load(raw_document, Loader=yaml.Loader)
            logger.info(f"RULER SYNC - CONFIGMAP DOC: {configmap_document}")

            # Empty or scalar/list documents cannot hold a 'groups' key.
            if not isinstance(configmap_document, dict):
                logger.warning("RULER SYNC - CONFIGMAP DOC is not a mapping, skipping")
                continue
            logger.info(f"RULER SYNC - CONFIGMAP DOC KEYS: {configmap_document.keys()}")

            groups = configmap_document.get('groups')
            if groups is None:
                continue

            # setdefault keeps groups gathered from earlier documents;
            # re-assigning {} here would drop them and make the sync
            # delete those groups on the remote side.
            namespace_groups = rulegroup_dict_configmap.setdefault(namespace_label, {})
            for configmap_group in groups:
                logger.info(f"RULER SYNC - CONFIGMAP namespace_label: {namespace_label}")
                logger.info(f"RULER SYNC - CONFIGMAP rulegroup name: {configmap_group['name']}")
                logger.info(f"RULER SYNC - CONFIGMAP rulegroup content: {pprint.pformat(configmap_group['rules'])}")
                namespace_groups[configmap_group['name']] = configmap_group['rules']

    logger.info(f"RULER SYNC - CONFIGMAP rulegroup dict: {pprint.pformat(rulegroup_dict_configmap)}")

    return rulegroup_dict_configmap


def _get_rulegropups_from_cortex(rules_url, namespace_label, x_scope_orgid_default, namespace):
    """Index the remote rule groups by namespace and group name.

    Args:
        rules_url: URL of the rules listing endpoint.
        namespace_label: Org id passed on to ``_get_cortex_rulegroups_list``.
        x_scope_orgid_default: Only logged here.
        namespace: Only logged here.

    Returns:
        ``{cortex_namespace: {group_name: rules}}`` built from the
        listing returned by ``_get_cortex_rulegroups_list``.
    """
    logger.info(f"RULER SYNC - CORTEX {namespace_label} {x_scope_orgid_default} {namespace}")

    cortex_rulegroups = _get_cortex_rulegroups_list(namespace_label, rules_url)
    logger.info(f"RULER SYNC - CORTEX rulegroup list: {cortex_rulegroups}")

    rulegroup_dict_cortex = {}
    for cortex_namespace in cortex_rulegroups:
        logger.info(f"RULER SYNC - CORTEX namespace: {cortex_namespace}")
        groups_by_name = rulegroup_dict_cortex[cortex_namespace] = {}

        for cortex_rulegroup in cortex_rulegroups[cortex_namespace]:
            logger.info(f"RULER SYNC - CORTEX rulegroup: {cortex_rulegroup['name']}")
            logger.info(f"RULER SYNC - CORTEX rulegroup: {pprint.pformat(cortex_rulegroup['rules'])}")
            groups_by_name[cortex_rulegroup['name']] = cortex_rulegroup['rules']

    logger.info(f"RULER SYNC - CORTEX rulegroup dict: {pprint.pformat(rulegroup_dict_cortex)}")

    return rulegroup_dict_cortex


def _watch_resource_loop(*args):
while True:
try:
# Always wait to slow down the loop in case of exceptions
sleep(int(os.getenv("ERROR_THROTTLE_SLEEP", 5)))
# if mode == "SLEEP":
# list_resources(*args)
# sleep(int(os.getenv("SLEEP_TIME", 60)))
# else:
_watch_resource_iterator(*args)
except ApiException as e:
if e.status != 500:
Expand All @@ -439,7 +528,44 @@ def _watch_resource_loop(*args):
logger.error(f"Received unknown exception: {e}\n")
traceback.print_exc()


def rulegroup_equalize(namespace, orgid, rulegroup_dict_configmap, rulegroup_dict_cortex, rules_url):
    """Bring the remote rule groups of one namespace in line with the ConfigMaps.

    Every group present in ``rulegroup_dict_configmap[namespace]`` that is
    missing remotely or whose rules differ is posted again; every group
    present in ``rulegroup_dict_cortex[namespace]`` but absent from the
    ConfigMaps is deleted. Groups that already match trigger no request.

    Args:
        namespace: Namespace key looked up in both dicts.
        orgid: Org id forwarded to the add/delete helpers.
        rulegroup_dict_configmap: ``{namespace: {group_name: rules}}``
            describing the desired state.
        rulegroup_dict_cortex: ``{namespace: {group_name: rules}}``
            describing the remote state.
        rules_url: Base URL forwarded to the add/delete helpers.
    """
    # A namespace missing from either side simply has no groups there.
    desired_groups = rulegroup_dict_configmap.get(namespace, {})
    remote_groups = rulegroup_dict_cortex.get(namespace, {})

    for group_name, rules in desired_groups.items():
        if group_name not in remote_groups or remote_groups[group_name] != rules:
            _addmod_rulegroup(namespace, orgid, rules_url, group_name, rules)

    for group_name in remote_groups:
        if group_name not in desired_groups:
            _del_rulegroup(namespace, orgid, rules_url, group_name)


#def _sync_alertmanager(function, label, label_value, rules_url, alerts_url, x_scope_orgid_default,
# x_scope_orgid_namespace_label, namespace, resource):



def _sync_ruler(function, label, label_value, rules_url, alerts_url, x_scope_orgid_default,
                x_scope_orgid_namespace_label, namespace, resource):
    """Reconcile the ruler's rule groups with the ConfigMaps of each labelled namespace.

    For every namespace returned by ``_get_namespace_labels`` the rule
    groups found in its ConfigMaps are compared with the ones listed by
    the ruler for that namespace's org id, and ``rulegroup_equalize``
    applies the difference.

    Only ``rules_url``, ``x_scope_orgid_default``,
    ``x_scope_orgid_namespace_label`` and ``namespace`` are used here;
    the remaining parameters are accepted but not read.
    """
    v1 = client.CoreV1Api()

    for namespace_label in _get_namespace_labels(v1, x_scope_orgid_namespace_label):
        # Each entry is a one-item dict: {namespace name: org id}.
        namespace_name, orgid = list(namespace_label.items())[0]

        wanted = _get_rulegropups_from_configmaps(v1, namespace_name)
        present = _get_rulegropups_from_cortex(rules_url, orgid, x_scope_orgid_default, namespace)

        rulegroup_equalize(namespace_name, orgid, wanted, present, rules_url)


def _sync(*args):

while True:
try:
logger.info(f"Sync back rest api state")
Expand All @@ -449,6 +575,8 @@ def _sync(*args):
# 2. process all of them (iterate + post the content to the rest endpoint)
# 3. List all rules/alerts from the RHS (rest endpoint)
# 4. Remove all rules (groups) or alerts that are no longer present on the LHS (Configmaps / local administration)
_sync_ruler(*args)
#_sync_alertmanager(*args) TODO
sleep(int(os.getenv("SYNC_SLEEP", 60)))
except Exception as e:
logger.exception(f"Exception caught: {e}\n")
Expand Down

0 comments on commit 2f64406

Please sign in to comment.