Skip to content

Commit

Permalink
Added a dumb little timer thread
Browse files Browse the repository at this point in the history
  • Loading branch information
migurski committed Oct 22, 2014
1 parent ffbd8c0 commit 14ab963
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,32 @@
from multiprocessing import cpu_count
from argparse import ArgumentParser
from openaddr import conform
from time import sleep

def run_thread(lock, source_files, destination_files):
def run_conform(lock, source_files, destination_files):
'''
'''
while True:
with lock:
if not source_files:
return
path = source_files.pop()
path = source_files.pop(0)

getLogger('openaddr').info(path)
csv_path = conform(path, 'out')

with lock:
destination_files[path] = csv_path

def run_timer(source_files, interval):
'''
'''
sleep(interval)

while source_files:
getLogger('openaddr').debug('{0} source files remain'.format(len(source_files)))
sleep(interval)

def setup_logger(logfile):
''' Set up logging stream and optional file for 'openaddr' logger.
'''
Expand All @@ -45,16 +55,18 @@ def setup_logger(logfile):

source_files = [
'/var/opt/openaddresses/sources/us-ca-san_francisco.json',
#'/var/opt/openaddresses/sources/us-ca-alameda_county.json',
'/var/opt/openaddresses/sources/us-ca-alameda_county.json',
'/var/opt/openaddresses/sources/us-ca-oakland.json',
'/var/opt/openaddresses/sources/us-ca-berkeley.json'
]

destination_files = OrderedDict()
args = Lock(), source_files, destination_files

threads = [Thread(target=run_thread, args=args)
threads = [Thread(target=run_conform, args=args)
for i in range(cpu_count() + 1)]

threads.append(Thread(target=run_timer, args=(source_files, 15)))

for thread in threads:
thread.start()
Expand Down

0 comments on commit 14ab963

Please sign in to comment.