Skip to content

Commit

Permalink
migration helper: handle discovery rules
Browse files Browse the repository at this point in the history
Change-Id: Ice0a04cb161808dd8b8ec1fcad7c011c112648e7
  • Loading branch information
mo-ki committed Jan 31, 2025
1 parent b5062e2 commit c335521
Showing 1 changed file with 37 additions and 3 deletions.
40 changes: 37 additions & 3 deletions doc/treasures/migration_helpers/legacy_vs_to_ruleset_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,13 +432,20 @@ def _make_ruleset_plugin(self, old_ruleset: cst.Call) -> cst.SimpleStatementLine
return self._construct_check_parameters(old_ruleset.args)

args = {k.value: arg.value for arg in old_ruleset.args if (k := arg.keyword) is not None}
group = cst.ensure_type(
cst.ensure_type(args["name"], cst.Call).func, cst.Attribute
).attr.value
group = (
name.value
if isinstance(name := args["name"], cst.SimpleString)
else cst.ensure_type(
cst.ensure_type(args["name"], cst.Call).func, cst.Attribute
).attr.value
)
if rule_type == "HostRulespec" and group == "SpecialAgents":
return self._construct_special_agent(old_ruleset.args)
if rule_type == "HostRulespec" and group == "ActiveChecks":
return self._construct_active_check(old_ruleset.args)
if rule_type == "HostRulespec":
# a guess, but most of the time it's a discovery rule
return self._construct_discovery_parameters(group, old_ruleset.args)

print(f"not yet implemented rulespec type: {rule_type}", file=sys.stderr)
return cst.SimpleStatementLine(
Expand Down Expand Up @@ -476,6 +483,33 @@ def _construct_check_parameters(self, old: Sequence[cst.Arg]) -> cst.SimpleState
)
)

def _construct_discovery_parameters(
self, name: str, old: Sequence[cst.Arg]
) -> cst.SimpleStatementLine:
args = {k.value: arg.value for arg in old if (k := arg.keyword) is not None}
form_spec = args["parameter_valuespec"]

return cst.SimpleStatementLine(
(
cst.Assign(
targets=(cst.AssignTarget(cst.Name(f"rule_spec_{name}")),),
value=cst.Call(
func=cst.Name("DiscoveryParameters"),
args=(
cst.Arg(cst.SimpleString(f'"{name}"'), cst.Name("name")),
*_extract("title", old),
cst.Arg(cst.Name("Topic"), cst.Name("topic")),
cst.Arg(form_spec, cst.Name("parameter_form")),
cst.Arg(
self._make_condition(args.get("item_spec")),
cst.Name("condition"),
),
),
),
),
)
)

def _construct_active_check(self, old: Sequence[cst.Arg]) -> cst.SimpleStatementLine:
args = {k.value: arg.value for arg in old if (k := arg.keyword) is not None}
name = self._extract_string(cst.ensure_type(args["name"], cst.Call).args[0].value)
Expand Down

0 comments on commit c335521

Please sign in to comment.