-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtf
executable file
·96 lines (73 loc) · 3.38 KB
/
tf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/python
"""
Parse a declarative workflow configuration and run it.
"""
def setup(workflow_file):
    """Build a routine registry from a YAML workflow description.

    The workflow document must parse to a mapping of ``label -> config``.
    Each ``config`` must itself be a mapping containing a ``routine`` key;
    that value is looked up in a fresh ``LabeledRegistry`` (presumably the
    registry knows the available routine names -- confirm), and the result
    is called as ``routine(label, **remaining_config)``. The returned object
    is stored back into the registry under ``label``.

    :param workflow_file: open file-like object containing the YAML workflow;
        its ``name`` attribute (if any) is used in error messages.
    :returns: the populated ``LabeledRegistry``.
    :raises ValueError: if the document does not parse to a dict, or if an
        entry's config is not a mapping.
    :raises KeyError: if an entry's config has no ``routine`` key.
    """
    import logging
    import yaml
    from timeflow import LabeledRegistry

    source_name = getattr(workflow_file, 'name', workflow_file)
    routine_registry = LabeledRegistry()

    logging.debug('loading yaml workflow')
    # safe_load: the workflow is declarative data, so it has no business
    # constructing arbitrary Python objects. Bare yaml.load() without a
    # Loader is also deprecated (PyYAML 5.1) and rejected (PyYAML 6).
    workflow = yaml.safe_load(workflow_file)
    if not isinstance(workflow, dict):
        raise ValueError('Workflow "{}" did not parse to a dict (is it yaml?)'
                         .format(source_name))

    # An explicit loop rather than map(): map() is lazy on Python 3, so a
    # bare map(...) call would never build any routine.
    for label, config in workflow.items():
        if not isinstance(config, dict):
            raise ValueError('Workflow "{}": entry "{}" must be a mapping '
                             'with a "routine" key'.format(source_name, label))
        routine_label = config.pop('routine')
        routine = routine_registry[routine_label]
        routine_registry[label] = routine(label, **config)

    return routine_registry
def get_connected(source, filtercolumn):
    """Return a timeflow ``Connector`` attached to the routine labelled *source*.

    :param source: label of the routine whose output is wanted.
    :param filtercolumn: optional column name passed to the Connector as
        ``filter``; a falsy value is passed through unchanged.
    :returns: the ``Connector`` instance.
    :raises NotImplementedError: if *filtercolumn* contains ``=``; such
        filters are not supported by this function.
    """
    # Validate before importing timeflow so bad arguments fail fast.
    if filtercolumn and '=' in filtercolumn:
        # NotImplementedError, not NotImplemented: the latter is a constant
        # and raising it produces an unrelated TypeError.
        raise NotImplementedError(
            'filters containing "=" are not supported: {!r}'
            .format(filtercolumn))
    from timeflow import Connector
    return Connector(source, filter=filtercolumn)
if __name__ == '__main__':
    import argparse
    import logging

    parser = argparse.ArgumentParser(
        description="TimeFlow is a simple utility"
                    " for managing indexed data-processing workflows.",
        epilog="The goal is to make it painless (and even fun?) to write your"
               " genius algorithms as stand-alone routines, which you can plug"
               " together using a declarative YAML workflow definition.")

    # positional arguments
    parser.add_argument('workflow', help='the yaml workflow description')

    # optional arguments
    # action='version' prints and exits while parsing, so `--version` works
    # without the otherwise-required `workflow` argument.
    try:
        # NOTE(review): assumes timeflow exposes __version__ -- confirm.
        from timeflow import __version__ as timeflow_version
    except ImportError:
        timeflow_version = 'unknown'
    parser.add_argument('--version', action='version',
                        version='timeflow {}'.format(timeflow_version),
                        help='report the version of the timeflow installation')
    loudness = parser.add_mutually_exclusive_group()
    loudness.add_argument('-v', '--verbose', action='store_true',
                          help='show lots of running info')
    loudness.add_argument('-q', '--quiet', action='store_true',
                          help='suppress info and warnings')

    # control arguments
    control = parser.add_argument_group('control arguments')
    control.add_argument('-r', '--routine', default='save',
                         help='the routine whose output you want')
    # control.add_argument('-c', '--column', dest='column', nargs='+',
    #                      help='retrieve specified columns')
    control.add_argument('-f', '--filter', dest='filtercolumn',
                         help='remove rows where this column evaluates false')

    # output arguments
    # NOTE(review): --output and --type are parsed but not used below; the
    # result is always printed to stdout.
    out = parser.add_argument_group('output options')
    out.add_argument('-o', '--output', default='timeflowed',
                     help='filename to save')
    out.add_argument('-t', '--type', dest='outputtype', default='csv',
                     choices=['csv', 'plot'],
                     help='type to output')

    args = parser.parse_args()

    # --verbose and --quiet are mutually exclusive (see `loudness` above).
    if args.quiet:
        level = logging.ERROR
    elif args.verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO
    logging.basicConfig(level=level)

    logging.debug('opening workflow file')
    # setup() parses the whole YAML document, so the file can be closed as
    # soon as it returns.
    with open(args.workflow, 'r') as workflow_file:
        logging.debug('registering workflow routines')
        # `registry` is not read below; the reference is kept for the rest
        # of the run. NOTE(review): confirm whether get_connected needs it
        # to stay alive.
        registry = setup(workflow_file)

    logging.debug('connecting to routine "{}"'.format(args.routine))
    data = get_connected(args.routine, args.filtercolumn)
    # Single-argument print(...) behaves identically on Python 2 and 3.
    print(data.get())
    # logging.info('Running workflow for "{}"...'.format(args.routine))