-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfilter_errors.py
85 lines (69 loc) · 2.2 KB
/
filter_errors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from genericpath import exists
import io
import argparse
from os import listdir, walk, mkdir
from os.path import join as osjoin, isdir, basename, dirname
import shutil
# Byte prefix compared against the start of each file; files that begin with it are not copied
CAPA_ERROR = b"ERROR:capa"
def get_length(stream: io.BytesIO):
    """
    Counts the bytes between the current position and the end of a stream
    stream: seekable binary stream
    return: number of bytes left to read; the stream position is left unchanged
    """
    position = stream.tell()
    # seek() returns the new absolute offset, which here is the end of the stream
    end_offset = stream.seek(0, io.SEEK_END)
    # Put the cursor back where the caller left it
    stream.seek(position, io.SEEK_SET)
    return end_offset - position
def is_capa_error(stream: io.BytesIO):
    """
    Checks whether the next bytes of a stream are the CAPA error marker
    stream: binary stream to read from; it is advanced by the bytes that get read
    return: True if the bytes read equal CAPA_ERROR, False otherwise
    """
    marker_size = len(CAPA_ERROR)
    return stream.read(marker_size) == CAPA_ERROR
def get_all_files(path):
    """
    Collects the path of every file found under a folder
    path: path to the folder with samples
    return: list of file paths (sub-folders are searched recursively);
            an empty list if path is not a directory
    """
    if not isdir(path):
        return []

    collected = []
    for name in listdir(path):
        entry = osjoin(path, name)
        if not isdir(entry):
            collected.append(entry)
            continue
        # A sub-folder: gather everything below it
        for root, _, filenames in walk(entry):
            collected.extend(osjoin(root, filename) for filename in filenames)
    return collected
def process_file(item, out_dir):
    """
    Processes a single file and chooses whether to copy it or not.
    The file is copied unless it is empty or starts with the CAPA error marker.
    The family folder is created even when the file ends up not being copied.
    @item: path of the file to check and copy
    @out_dir: the base dir; the copy lands in <out_dir>/<name of item's parent folder>/
    """
    # Local import: the file-level imports only bring in the single-level mkdir
    from os import makedirs

    mal_family = basename(dirname(item))
    family_dir = osjoin(out_dir, mal_family)
    if not exists(family_dir):
        # makedirs also creates a missing out_dir, and exist_ok tolerates the
        # folder being created by someone else between the check and the call
        makedirs(family_dir, exist_ok=True)

    out_filename = osjoin(family_dir, basename(item))
    with open(item, "rb") as in_stream:
        keep = get_length(in_stream) > 0 and not is_capa_error(in_stream)
    # Copy only after the read handle has been closed
    if keep:
        shutil.copyfile(item, out_filename)
def main(**kwargs):
    """
    Filters every file found under the input folder into the output folder
    directory: (keyword argument) folder that is scanned for files
    output: (keyword argument) base folder that receives the copies
    """
    # Local import: the file-level imports only bring in the single-level mkdir
    from os import makedirs

    in_dir = kwargs["directory"]
    out_dir = kwargs["output"]

    # Make sure the directories exist; makedirs handles nested paths such as
    # "a/b", which the single-level mkdir cannot create
    for folder in (in_dir, out_dir):
        if not exists(folder):
            makedirs(folder, exist_ok=True)

    # Iterate over all the files
    for item in get_all_files(in_dir):
        process_file(item, out_dir)
if __name__ == "__main__":
    # Command-line entry point: parse the two folder options and run the filter
    parser = argparse.ArgumentParser(description='Filter out empty and errored CAPA results')
    # const mirrors default so that a bare "-d" / "-o" (allowed by nargs='?')
    # falls back to the default folder instead of handing None to main()
    parser.add_argument('-d', "--directory", type=str,
                        help='The folder that contains the results to filter',
                        nargs='?', const="out", default="out")
    parser.add_argument('-o', "--output", type=str,
                        help='The folder that the filtered results are copied to',
                        nargs='?', const="filtered_out", default="filtered_out")
    main(**vars(parser.parse_args()))