From ff60f8f4e65a21d74bd6639920fe37abdfda9a0a Mon Sep 17 00:00:00 2001 From: Juan Caballero Date: Thu, 23 Feb 2023 11:07:18 +0100 Subject: [PATCH] Format docstrings appropriately --- avclass/common.py | 184 ++++++++++++++++++++++----------------------- avclass/labeler.py | 10 +-- avclass/update.py | 41 +++++----- 3 files changed, 116 insertions(+), 119 deletions(-) diff --git a/avclass/common.py b/avclass/common.py index 51b869c..4a04f78 100644 --- a/avclass/common.py +++ b/avclass/common.py @@ -38,7 +38,7 @@ } class Tag: - ''' A Tag in the taxonomy ''' + """A Tag in the taxonomy""" def __init__(self, s): word_list = s.strip().split(":") if len(word_list) > 1: @@ -56,51 +56,49 @@ def __init__(self, s): self._path = self._name def __hash__(self): - ''' Return hash ''' + """Return hash""" return hash((self._path)) @property def name(self): - ''' Return tag name ''' + """Return tag name""" return self._name @property def cat(self): - ''' Return tag category ''' + """Return tag category""" return self._cat @property def path(self): - ''' Return tag path ''' + """Return tag path""" return self._path @property def prefix_l(self): - ''' Return tag prefix list ''' + """Return tag prefix list""" return self._prefix_l class Taxonomy: - ''' - A taxonomy of tags and generic tokens read from file - ''' + """A taxonomy of tags and generic tokens read from file""" def __init__(self, filepath): - ''' Map tag.name | tag.path -> Tag ''' - self._tags = set() + """Initialize and populate _tag_map from input file""" + self._tags = set() # Map tag.name | tag.path -> Tag self._tag_map = {} if filepath: self.read_taxonomy(filepath) def __len__(self): - ''' Taxonomy length is the number of tags it contains ''' + """Taxonomy length is the number of tags it contains""" return len(self._tags) def __iter__(self): - ''' Iterator over the alphabetically sorted tags in the taxonomy ''' + """Iterator over the alphabetically sorted tags in the taxonomy""" return (t for t in sorted(self._tags)) def is_generic(self, t): - ''' Return true if input is generic, false otherwise ''' + """Whether the input tag is generic""" tag = self._tag_map.get(t, None) if tag: return tag.cat == "GEN" @@ -108,13 +106,15 @@ def is_generic(self, t): return False def is_tag(self, t): - ''' Return true if input is tag, false otherwise ''' + """Whether the input tag exists in the taxonomy""" return t in self._tag_map def add_tag(self, s, override=False): - ''' Add tag to taxonomy - If tag already exists with different path, - only replaces if override True ''' + """Add the input tag to the taxonomy + + If tag already exists with different path, + it is only replaced if override is True + """ tag = Tag(s) t = self._tag_map.get(tag.name, None) if t and (t.path != tag.path): @@ -131,19 +131,17 @@ def add_tag(self, s, override=False): return def remove_tag(self, t): - ''' Remove tag from taxonomy. Returns 1 if removed, zero if unknown ''' + """Remove tag from taxonomy. Returns whether tag was removed""" tag = self._tag_map.get(t, None) if tag: log.debug("[Taxonomy] Removing tag: %s" % tag.path) del self._tag_map[tag.name] del self._tag_map[tag.path] self._tags.remove(tag) - return 1 - else: - return 0 + return tag is not None def get_category(self, t): - ''' Return category of input tag, UNK if not a tag ''' + """Return category of input tag, UNK if not a tag""" tag = self._tag_map.get(t, None) if tag: return tag.cat @@ -151,7 +149,7 @@ def get_category(self, t): return "UNK" def get_path(self, t): - ''' Return full path for given tag, or empty string if not a tag ''' + """Return full path for given tag, or empty string if not a tag""" tag = self._tag_map.get(t, None) if tag: return tag.path @@ -159,7 +157,7 @@ def get_path(self, t): return ("UNK:" + t) def get_prefix_l(self, t): - ''' Return prefix list for given tag, or empty string if not a tag ''' + """Return prefix list for given tag, or empty string if not a tag""" tag = self._tag_map.get(t, None) if tag: return tag.prefix_l @@ -167,8 +165,7 @@ def get_prefix_l(self, t): return [] def get_prefix(self, t): - ''' Return prefix string for given tag, - or empty string if not a tag ''' + """Return prefix string for given tag, or empty string if not a tag""" tag = self._tag_map.get(t, None) if tag: return tag.prefix_l @@ -176,9 +173,11 @@ def get_prefix(self, t): return t.path[0:t.path.rfind(':')] def get_depth(self, t): - ''' Return depth of tag in taxonomy. - Returns zero if tag not in taxonomy. - A normal tag CAT:name has depth two ''' + """Return depth of tag in taxonomy. + + Returns zero if tag not in taxonomy. + A normal tag CAT:name has depth two + """ tag = self._tag_map.get(t, None) if tag: return len(tag.prefix_l) + 2 @@ -186,7 +185,7 @@ def get_depth(self, t): return 0 def get_info(self, t): - ''' Return (path,category) for given tag, or UNK:t if not a tag ''' + """Return (path,category) for given tag, or UNK:t if not a tag""" tag = self._tag_map.get(t, None) if tag: return tag.path, tag.cat @@ -194,7 +193,7 @@ def get_info(self, t): return "UNK:" + t, "UNK" def expand(self, t): - ''' Return list of tags in prefix list that are leaves ''' + """Return list of tags in prefix list that are leaves""" tag = self._tag_map.get(t, None) if tag: return [t for t in tag.prefix_l if t in self._tag_map] @@ -202,7 +201,7 @@ def expand(self, t): return [] def platform_tags(self): - ''' Returns list with platform tags in taxonomy ''' + """Returns list with platform tags in taxonomy""" acc = set() for idx,tag in self._tag_map.items(): if tag.path.startswith(platform_prefix): @@ -210,13 +209,13 @@ def platform_tags(self): return acc def overlaps(self, t1, t2): - ''' Returns true if the path of the given tags overlaps ''' + """Returns true if the path of the given tags overlaps""" m1 = self.get_prefix_l(t1) m2 = self.get_prefix_l(t2) return (t1 in m2) or (t2 in m1) def remove_overlaps(self, l): - ''' Returns list with overlapping tags removed ''' + """Returns list with overlapping tags removed""" if not l: return l pair_l = sorted([(self.get_depth(t),t) for t in l]) @@ -228,7 +227,7 @@ def remove_overlaps(self, l): return out_l def read_taxonomy(self, filepath): - '''Read taxonomy from given file ''' + """Read taxonomy from given file""" with open(filepath, 'r') as fd: for line in fd: if line.startswith('#') or line == '\n': @@ -237,7 +236,7 @@ def read_taxonomy(self, filepath): return def to_file(self, filepath): - ''' Output sorted taxonomy to given file ''' + """Output sorted taxonomy to given file""" # Open output file fd = open(filepath, 'w') # Write sorted tags @@ -253,23 +252,24 @@ def to_file(self, filepath): fd.close() class Rules: - ''' - Rules are src -> dst1, dst2, ... relations - ''' + """A relation from one source to one or more destinations""" def __init__(self, filepath): - ''' Map src -> set(dst) ''' - self._src_map = {} + """Initialize rule map and read rules from input file""" + self._src_map = {} # src -> set(dst) if filepath: self.read_rules(filepath) def __len__(self): - ''' Length is number of rules, i.e., number of src ''' + """The number of rules (i.e., source tags)""" return len(self._src_map) def add_rule(self, src, dst_l, overwrite=False): - ''' Add rule. If rule exists: - if overwrite==True, replace destination list - else append dst_l to current target set ''' + """Add rule. + + If rule exists: + if overwrite==True, replace destination list + else append dst_l to current target set + """ # Remove src from dst_l if it exists dst_l = filter(lambda x: x != src, dst_l) # If no destinations, nothing to do @@ -289,6 +289,7 @@ def add_rule(self, src, dst_l, overwrite=False): return def remove_rule(self, src): + """Remove the rule for the input source""" l = self._src_map.get(src, []) if l: log.debug("[Rules] Removing rule: %s -> %s" % (src, l)) @@ -298,11 +299,11 @@ def remove_rule(self, src): return 0 def get_dst(self, src): - ''' Returns dst list for given src, or empty list if no expansion ''' + """Returns dst list for given src, or empty list if no expansion""" return list(self._src_map.get(src, [])) def read_rules(self, filepath): - '''Read rules from given file''' + """Read rules from given file""" with open(filepath, 'r') as fd: for line in fd: if line.startswith('#') or line == '\n': @@ -313,8 +314,10 @@ def read_rules(self, filepath): return def to_file(self, filepath, taxonomy=None): - ''' Output sorted rules to given file - If taxonomy is provided, it outputs full tag path ''' + """Output sorted rules to given file + + If taxonomy is provided, it outputs full tag path + """ fd = open(filepath, 'w') for src,dst_set in sorted(self._src_map.items()): dst_l = sorted(dst_set, reverse=False) @@ -329,8 +332,10 @@ def to_file(self, filepath, taxonomy=None): fd.close() def expand_src_destinations(self, src): - ''' Return destination list for given src after recursively - following any rules for destinations ''' + """Return a list of all expanded destinations for given source + + Recursively follows any rules for destinations + """ dst_set = self._src_map.get(src, set()) out = set() while dst_set: @@ -345,22 +350,20 @@ def expand_src_destinations(self, src): return out def expand_all_destinations(self): - ''' Return destination list for given src after recursively - following any rules for destinations ''' + """Expand all sources""" src_l = self._src_map.keys() for src in src_l: dst_l = self.expand_src_destinations(src) self._src_map[src] = dst_l class Tagging(Rules): - ''' - Tagging rules have src UNK and dst in taxonomy - ''' + """A rule with an unknown source and a destination in the taxonomy""" def __init__(self, filepath): + """Initialize rules from input file""" Rules.__init__(self, filepath) def validate(self, taxonomy): - ''' Check that tags in tagging rules are in given taxonomy ''' + """Check that tags in tagging rules are in given taxonomy""" for tok,tag_l in self._src_map.items(): if taxonomy.is_tag(tok): sys.stdout.write("[Tagging] SRC %s in taxonomy\n" % tok) @@ -369,15 +372,12 @@ def validate(self, taxonomy): sys.stdout.write("[Tagging] %s not in taxonomy\n" % t) class Expansion(Rules): - ''' - Expansion rules have src and dst in taxonomy and - src.category != dst.category - ''' + """A rule where source different than destination and both in taxonomy""" def __init__(self, filepath): Rules.__init__(self, filepath) def validate(self, taxonomy): - ''' Check that tags in expansion rules are in given taxonomy ''' + """Check that tags in expansion rules are in given taxonomy""" for src,dst_set in self._src_map.items(): if (not taxonomy.is_tag(src)): sys.stdout.write("[Expansion] %s not in taxonomy\n" % src) @@ -386,12 +386,10 @@ def validate(self, taxonomy): sys.stdout.write("[Expansion] %s not in taxonomy\n" % dst) class AvLabels: - ''' - Class to operate on AV labels, - such as extracting the most likely family name. - ''' + """Primary class to process AV labels""" def __init__(self, tag_file, exp_file = None, tax_file = None, av_file = None, aliasdetect=False): + """Initialize using given files and options""" # Read taxonomy self.taxonomy = Taxonomy(tax_file) # Read tag rules @@ -405,7 +403,7 @@ def __init__(self, tag_file, exp_file = None, tax_file = None, @staticmethod def read_avs(avs_file): - '''Read AV engine set from given file''' + """Read AV engine set from given file""" with open(avs_file) as fd: avs = set(map(str.strip, fd.readlines())) sys.stderr.write("[-] Using %d AV engines in %s\n" % (len(avs), @@ -414,17 +412,13 @@ def read_avs(avs_file): @staticmethod def get_sample_info_lb(vt_rep): - '''Parse and extract sample information from JSON line - Returns a SampleInfo named tuple - ''' + """Parse sample information from basic report""" return SampleInfo(vt_rep['md5'], vt_rep['sha1'], vt_rep['sha256'], vt_rep['av_labels'], []) @staticmethod def get_sample_info_vt_v2(vt_rep): - '''Parse and extract sample information from JSON line - Returns a SampleInfo named tuple - ''' + """Parse sample information from VT v2 report""" label_pairs = [] # Obtain scan results, if available try: @@ -449,9 +443,7 @@ def get_sample_info_vt_v2(vt_rep): @staticmethod def get_sample_info_vt_v3(vt_rep): - '''Parse and extract sample information from JSON line - Returns a SampleInfo named tuple - ''' + """Parse sample information from VT v3 report""" # VT file reports in APIv3 contain all info under 'data' # but reports from VT file feed (also APIv3) don't have it # Handle both cases silently here @@ -481,12 +473,13 @@ def get_sample_info_vt_v3(vt_rep): @staticmethod def is_pup(tag_pairs, taxonomy): - '''This function classifies the sample as PUP or not - by checking if highest ranked CLASS tag contains "grayware" - and is above a predefined threshold - Return: - True/False/None - ''' + """Whether the sample is PUP + + Checks if highest ranked CLASS tag contains "grayware" + and is above a predefined threshold + Return: + True/False/None + """ threshold = 0.5 # If no tags, return false if len(tag_pairs) < 1: @@ -503,9 +496,7 @@ def is_pup(tag_pairs, taxonomy): @staticmethod def _remove_suffixes(av_name, label): - '''Remove AV specific suffixes from given label - Returns updated label''' - + """Returns input label without AV specific suffixes""" # Truncate after last '.' if av_name in suffix_removal_av_set: label = label.rsplit('.', 1)[0] @@ -525,10 +516,11 @@ def _remove_suffixes(av_name, label): def get_label_tags(self, label, hashes): - ''' Return list of tags in given label - Tokenizes label, filters unneeded tokens, and - applies tagging rules ''' - + """Return list of tags in given label + + Tokenizes label, filters unneeded tokens, and + applies tagging rules + """ # Initialize set of tags to return # We use a set to avoid duplicate tokens in the same AV label # This avoids "potentially unwanted" contributing twice BEH:pup @@ -579,7 +571,7 @@ def get_label_tags(self, label, hashes): def _expand(self, tag_set): - ''' Return expanded set of tags ''' + """Return expanded set of tags""" ret = set() for t in tag_set: # Include tag @@ -595,7 +587,7 @@ def _expand(self, tag_set): return ret def get_sample_tags(self, sample_info): - ''' Returns dictionary tag -> AV list of tags for the given sample ''' + """Returns dictionary tag -> AV list of tags for the given sample""" # Whitelist the AVs to filter the ones with meaningful labels av_whitelist = self.avs @@ -672,16 +664,16 @@ def get_sample_tags(self, sample_info): return av_dict def rank_tags(self, av_dict, threshold=1): - ''' Return list of (tag, confidence) ranked by decreasing confidence - and filter tags with less or equal threshold confidence ''' + """Return list of (tag, confidence) ranked by decreasing confidence + Filters tags with less or equal threshold confidence + """ pairs = ((t, len(avs)) for (t,avs) in av_dict.items() if len(avs) > threshold) return sorted(pairs, key=itemgetter(1,0), reverse=True) def get_sample_vt_count(self, sample_info): - ''' Return number of detections for sample - in the provided AV whitelist (if any) ''' + """Return number of detections for sample using AV whitelist""" if self.avs is None: return len(sample_info.labels) else: diff --git a/avclass/labeler.py b/avclass/labeler.py index 577b9bb..516956f 100755 --- a/avclass/labeler.py +++ b/avclass/labeler.py @@ -21,7 +21,7 @@ from avclass import evaluate as ec def guess_hash(h): - ''' Given a hash string, guess the hash type based on the string length ''' + """Guess the hash type of input string""" hlen = len(h) if hlen == 32: return 'md5' @@ -33,7 +33,7 @@ def guess_hash(h): return None def format_tag_pairs(l, taxonomy=None): - ''' Return ranked tags as string ''' + """Return ranked tags as string""" if not l: return "" if taxonomy is not None: @@ -50,7 +50,7 @@ def format_tag_pairs(l, taxonomy=None): return out def list_str(l, sep=", ", prefix=""): - ''' Return list as a string ''' + """Return list as a string""" if not l: return "" out = prefix + l[0] @@ -364,8 +364,8 @@ def main(): def parse_args(): argparser = argparse.ArgumentParser(prog='avclass', - description='''Extracts tags for a set of samples. - Also calculates precision and recall if ground truth available''') + description='Extracts tags for a set of samples. + Also calculates precision and recall if ground truth available') argparser.add_argument('-vt', action='append', help='file with VT reports ' diff --git a/avclass/update.py b/avclass/update.py index 93cf8e7..94e0a6b 100755 --- a/avclass/update.py +++ b/avclass/update.py @@ -38,7 +38,7 @@ 'nalias_num', 'talias_num', 'tinv_alias_num']) class Update: - ''' Update Module ''' + """Update Module""" def __init__(self, rel_filepath, in_taxonomy, in_tagging, in_expansion, n, t): # Initialize inputs @@ -55,20 +55,20 @@ def __init__(self, rel_filepath, in_taxonomy, in_tagging, in_expansion, self.rel_set = self.read_relations(rel_filepath) def num_rules(self): + """Number of relations""" return len(self.rel_set) def is_weak_rel(self, rel): - ''' Return true if relationship is weak, - i.e., does not meet thresholds ''' + """Whether input relationship is weak""" return ((int(rel.nalias_num) < self._n) or (float(rel.talias_num) < self._t)) def is_blacklisted_rel(self, rel): - ''' Return true if relationship is blacklisted ''' + """Whether input relationship is blacklisted""" return (rel.t1 in self.blist) or (rel.t2 in self.blist) def is_known_rel(self, rel): - ''' Return true if relationship is known ''' + """Whether input relationship is known""" t1 = rel.t1 t2 = rel.t2 # Known taxonomy relation @@ -90,13 +90,13 @@ def is_known_rel(self, rel): return False def add_tag(self, name, path): - ''' Add tag to taxonomy if not in tagging ''' + """Add tag to taxonomy if not in tagging""" l = self._out_tagging.get_dst(name) if (not l): self._out_taxonomy.add_tag(path) def add_expansion(self, src, dst_l): - ''' Add expansion rule fixing destination if src in tagging ''' + """Add expansion rule fixing destination if src in tagging""" # Select source handling aliases l = self._out_tagging.get_dst(src) if l: @@ -113,7 +113,7 @@ def add_expansion(self, src, dst_l): self._out_expansion.add_rule(new_src, dst_l, True) def add_alias(self, src, dst, dst_prefix): - ''' Add alias relation to taxonomy, tagging ''' + """Add alias relation to taxonomy, tagging""" # If src in tagging, use most popular target l = self._out_tagging.get_dst(src) target = dst @@ -137,7 +137,7 @@ def add_alias(self, src, dst, dst_prefix): self._out_tagging.add_rule(src, target_l, True) def is_expansion_rel(self, rel): - ''' Return true if relation implies expansion rule ''' + """Whether input relation implies expansion rule""" c1 = self._out_taxonomy.get_category(rel.t1) c2 = self._out_taxonomy.get_category(rel.t2) return (((c1 == "FAM") and (c2 != c1) and (c2 != "UNK")) or @@ -145,7 +145,7 @@ def is_expansion_rel(self, rel): ((c1 == "UNK") and ((c2 == "BEH") or (c2 == "CLASS")))) def find_expansions(self): - ''' Find expansions among relations ''' + """Find expansions among relations""" acc = [] for rel in self.rel_set: p1 = self._out_taxonomy.get_path(rel.t1) @@ -163,7 +163,7 @@ def find_expansions(self): self.rel_set.remove(rel) #def is_alias_rel(self, rel): - # ''' Return true if relation implies alias rule ''' + # """Whether input relation implies alias rule""" # c1 = self._out_taxonomy.get_category(rel.t1) # c2 = self._out_taxonomy.get_category(rel.t2) # return (((c1 == "UNK") and (c2 == "FAM")) or @@ -171,7 +171,7 @@ def find_expansions(self): #def find_aliases(self): - # ''' Find aliases among relations ''' + # """Find aliases among relations""" # for rel in self.rel_set: # c1 = self._out_taxonomy.get_category(rel.t1) # c2 = self._out_taxonomy.get_category(rel.t2) @@ -182,8 +182,7 @@ def find_expansions(self): # self.output_components("comp") def process_relation(self, rel): - ''' Process relation and update taxonomy/tagging correspondingly ''' - + """Process relation and update taxonomy/tagging correspondingly""" # Obtain tag info t1 = rel.t1 t2 = rel.t2 @@ -272,6 +271,7 @@ def process_relation(self, rel): def run(self): + """Identify updates""" num_iter = 0 while self.rel_set: # Do a pass in remaining relations @@ -310,8 +310,10 @@ def run(self): def read_relations(self, filepath): - ''' Returns relations in file as a set - Filters weak and blacklisted relations ''' + """Returns relations in file as a set + + Filters weak and blacklisted relations + """ rel_set = set() with open(filepath, 'r') as fd: for line in fd: @@ -344,6 +346,7 @@ def read_relations(self, filepath): return rel_set def output_relations(self, filepath): + """Output relations to given file""" fd = open(filepath, 'w') fd.write("# t1\tt2\t|t1|\t|t2|\t|t1^t2|\t|t1^t2|/|t1|\t" "|t1^t2|/|t2|\n") @@ -361,6 +364,7 @@ def output_relations(self, filepath): fd.close() def output_rule_stats(self, fd): + """Output rule statistics to given file descriptor""" # Initialize maps for statistics self.dst_map = {} self.cat_pairs_map = {} @@ -384,6 +388,7 @@ def output_rule_stats(self, fd): fd.write("%s\t%03d\n" % (taxonomy.get_path(dst), cnt)) def output(self, out_prefix): + """Output updated taxonomy/tagging/expansions files""" if (not out_prefix): tax_filepath = DEFAULT_TAX_PATH tag_filepath = DEFAULT_TAG_PATH @@ -406,8 +411,8 @@ def output(self, out_prefix): def main(): argparser = argparse.ArgumentParser( - description='''Given a .alias file from the labeler, - generates updates for the taxonomy, tagging, and expansion files.''') + description='Given a .alias file from the labeler, + generates updates for the taxonomy, tagging, and expansion files.') argparser.add_argument('-alias', help='input file with alias from labeler. Mandatory.')