-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
70 lines (57 loc) · 1.92 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import argparse
import multiprocessing as mp
import os
import time
from datetime import datetime
import pandas as pd
def calculate_age(dob, today=None):
    """Return the age in completed years of someone born on *dob*.

    Args:
        dob: Date of birth as a ``'YYYY-MM-DD'`` string.
        today: Reference date (any object with ``year``/``month``/``day``,
            e.g. a ``datetime``). Defaults to the current local date; it is
            exposed mainly so the calculation can be tested deterministically.

    Returns:
        Number of full years between *dob* and *today*. A *dob* later than
        *today* yields a negative number.

    Raises:
        ValueError: if *dob* does not match ``'%Y-%m-%d'``.
        TypeError: if *dob* is not a string (e.g. NaN for a missing value).
    """
    born = datetime.strptime(dob, '%Y-%m-%d')
    if today is None:
        today = datetime.today()
    # Calendar arithmetic instead of days / 365: that approximation drifts
    # by one day per leap year and over-reports the age shortly before a
    # birthday. Subtract one year if this year's birthday is still ahead.
    had_birthday = (today.month, today.day) >= (born.month, born.day)
    return today.year - born.year - (0 if had_birthday else 1)
def process_df(chunk):
    """Add an ``age`` column computed from the ``dob`` column of *chunk*.

    ``run`` submits this function to a multiprocessing pool, one call per
    chunk of the input CSV. The chunk is modified in place and also
    returned so the parent process receives the processed frame.

    Args:
        chunk: DataFrame with a ``dob`` column of ``'YYYY-MM-DD'`` strings.

    Returns:
        The same DataFrame with an ``age`` column (whole years, as computed
        by ``calculate_age``).
    """
    # Iterate the column values directly instead of enumerating the index
    # (whose position was unused) and looking each cell up with ``.at``.
    chunk['age'] = [calculate_age(dob) for dob in chunk['dob']]
    return chunk
def run(param):
    """Compute ages for every row of ``param.input_csv`` and write the result.

    The input is read in chunks of ``param.chunk_size`` rows. Each chunk is
    handed to ``process_df`` in a pool of ``param.pool`` worker processes.
    The processed chunks are concatenated in input order and written to
    ``param.output_csv``, replacing any existing file.

    Errors are not raised: the message is printed to stdout and the
    traceback to stderr.

    Args:
        param: Configuration object (see ``RunConfig``) exposing
            ``input_csv``, ``output_csv``, ``chunk_size`` and ``pool``.
    """
    try:
        reader = pd.read_csv(param.input_csv, chunksize=param.chunk_size)
        # The context manager terminates the workers once every result has
        # been collected (or an error escapes), so no processes are leaked.
        with mp.Pool(param.pool) as pool:
            # Submit every chunk first so they run in parallel, then collect
            # the results in submission (= file) order.
            pending = [pool.apply_async(process_df, (df,)) for df in reader]
            result = [job.get() for job in pending]
        if not result:
            # pd.concat([]) would fail with "No objects to concatenate".
            print("no data read from", param.input_csv)
            return
        training = pd.concat(result)
        # mode='w' truncates any previous output in one step, unlike the
        # remove-then-append approach whose failures were silently ignored.
        training.to_csv(param.output_csv, mode='w', header=True, index=False)
    except Exception as e:
        import traceback
        print("something wrong", e)
        traceback.print_exc()
class RunConfig(object):
    """Holder for the settings consumed by ``run``.

    Copies ``input_csv``, ``output_csv``, ``chunk_size`` and ``pool`` from
    *args* (an object with those attributes, such as the parsed
    ``argparse`` namespace) onto the new instance.
    """

    def __init__(self, args):
        for option in ("input_csv", "output_csv", "chunk_size", "pool"):
            # The same name is used on both sides; a missing attribute on
            # *args* raises AttributeError.
            setattr(self, option, getattr(args, option))
if __name__ == '__main__':
    # Every option falls back to an environment variable of the same name.
    # argparse applies ``type`` to string defaults, so env values are
    # converted (e.g. "1000" -> 1000) just like command-line values.
    parser = argparse.ArgumentParser()
    for opt_name, opt_type, mandatory in (
        ("input_csv", str, True),
        ("output_csv", str, True),
        # chunk_size=None would make read_csv return one DataFrame instead
        # of an iterator of chunks, which breaks run(); so it is mandatory.
        ("chunk_size", int, True),
        # pool=None lets multiprocessing pick the number of workers.
        ("pool", int, False),
    ):
        env_default = os.getenv(opt_name)
        parser.add_argument(
            "--" + opt_name,
            type=opt_type,
            default=env_default,
            # Only demand the flag when the environment supplies no value.
            required=mandatory and env_default is None,
        )
    args = parser.parse_args()
    print(args)
    # perf_counter is the monotonic clock intended for measuring elapsed time.
    start_timestamp = time.perf_counter()
    run(RunConfig(args))
    print(str(time.perf_counter() - start_timestamp))