-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathpareq
executable file
·86 lines (65 loc) · 2.5 KB
/
pareq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python3
"""
Checks that two or more files have the same number of lines.
This is really useful in creating idempotent scripts.
For example, only run an expensive translation job if it's
needed:
in=source.txt
out=translations.txt
if ! pareq $in $out; then
# do the translation here
# write to $out
fi
It's safe to run this script since the file won't be
retranslated unless there was some error the first time
(in which case you want to retranslate anyway).
Usage:
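pareq [-v] [-z] FILE1 FILE2 [FILE3...]
By default nothing is printed: the script only exits with success (0)
when all files have the same number of lines, or failure (1) when they
differ or a file cannot be found.
When -v/--verbose is present, it will also print details of the outcome.
When -z/--zero is present, files that are equal but empty exit with 2.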
"""
import sys
import gzip
from itertools import zip_longest
def smart_open(filepath, mode='rt', encoding='utf-8'):
    """Open a plain or gzip-compressed file, text mode by default.

    Files whose name ends in ``.gz`` are opened with :func:`gzip.open`;
    everything else goes through the built-in :func:`open`.

    :param filepath: The file to open, as a ``str`` or an ``os.PathLike``
        object such as ``pathlib.Path``.
    :param mode: The file mode (read, write). Defaults to text reading.
    :param encoding: The file encoding.
    :return: The opened file object; the caller is responsible for
        closing it (it can be used as a context manager).
    """
    # str() lets path-like objects through: calling .endswith() directly on
    # a pathlib.Path raises AttributeError, although both openers accept it.
    opener = gzip.open if str(filepath).endswith('.gz') else open
    # newline="\n" means only "\n" terminates a line when reading, so a
    # stray "\r" inside a line does not split it in two.
    return opener(filepath, mode=mode, encoding=encoding, newline="\n")
def count_lines(filename):
    """Return the number of lines in ``filename``.

    The file is opened with ``smart_open`` using its defaults and is
    iterated line by line, so it is never loaded into memory as a whole.

    :param filename: Path of the file to count.
    :return: The line count as an ``int`` (0 for an empty file).
    """
    with smart_open(filename) as stream:
        return sum(1 for _ in stream)
def main(args):
    """Check that all files have the same number of lines, then exit.

    The files are read in lockstep, so the check stops at the first
    position where at least one file has run out of lines.

    :param args: Parsed command-line arguments providing ``files`` (list of
        paths), ``verbose`` (print details) and ``zero`` (treat equal but
        empty files as a failure).
    :raises SystemExit: Always. The exit status is 0 when all files have
        the same number of lines, 1 when they differ or a file is missing,
        and 2 when ``args.zero`` is set and the files have no lines.
    """
    from contextlib import ExitStack

    linecount = 0
    try:
        # ExitStack closes every handle on all exit paths, including the
        # early sys.exit(1) below and a failure to open a later file.
        with ExitStack() as stack:
            handles = [stack.enter_context(smart_open(path)) for path in args.files]
            for linecount, lines in enumerate(zip_longest(*handles), 1):
                # zip_longest pads exhausted files with None, so a None
                # means at least one file is shorter than another.
                if None in lines:
                    if args.verbose:
                        print("pareq failure: files are NOT of equal length", file=sys.stderr)
                        # Per-file counts go to stdout, as in the original.
                        for file in args.files:
                            print("->", count_lines(file), file)
                    sys.exit(1)
    except FileNotFoundError as e:
        if args.verbose:
            print(f"pareq failure: {e}", file=sys.stderr)
        sys.exit(1)

    if args.zero and linecount == 0:
        sys.exit(2)

    if args.verbose:
        print(f"pareq success: all files have {linecount} lines", file=sys.stderr)
    sys.exit(0)
if __name__ == "__main__":
    import argparse

    # The first positional parameter of ArgumentParser is `prog`, not the
    # description. Passing the text positionally made the usage line print
    # the whole sentence as the program name, so it is passed by keyword.
    parser = argparse.ArgumentParser(
        description="Utility to test line-parallelism of two or more files"
    )
    parser.add_argument("files", nargs="+", help="List of file paths (plain text or .gz)")
    parser.add_argument("--zero", "-z", action="store_true", help="Fail if files are equal but have zero length")
    parser.add_argument("--verbose", "-v", action="store_true", help="Print details about the outcome")
    args = parser.parse_args()
    # main() never returns: it always terminates through sys.exit().
    main(args)