From 94a264241427e508587f0e510a42b9b7e2f55666 Mon Sep 17 00:00:00 2001 From: "Brandyn A. White" Date: Fri, 3 Aug 2012 17:04:26 -0400 Subject: [PATCH] Moved _local.pyx back to _local.py as there were no noticeable speed gains without further optimization Signed-off-by: Brandyn A. White --- hadoopy/__init__.py | 2 +- hadoopy/{_local.pyx => _local.py} | 20 +++++++++++++------- setup.py | 3 +-- 3 files changed, 15 insertions(+), 10 deletions(-) rename hadoopy/{_local.pyx => _local.py} (90%) diff --git a/hadoopy/__init__.py b/hadoopy/__init__.py index 5b1ce3a..4434fd3 100644 --- a/hadoopy/__init__.py +++ b/hadoopy/__init__.py @@ -18,7 +18,7 @@ __license__ = 'GPL V3' from _runner import launch, launch_frozen -from _hadoopy_local import launch_local +from _local import launch_local from _hdfs import get, put, readtb, writetb, ls, exists, rmr, isempty, abspath, isdir, mv, mkdir from _job_cli import run from _reporter import status, counter diff --git a/hadoopy/_local.pyx b/hadoopy/_local.py similarity index 90% rename from hadoopy/_local.pyx rename to hadoopy/_local.py index d9e63ae..42c678c 100644 --- a/hadoopy/_local.pyx +++ b/hadoopy/_local.py @@ -21,7 +21,7 @@ def chdir(path): class LocalTask(object): - def __init__(self, script_path, task, files, max_input, pipe, python_cmd, remove_tempdir=True): + def __init__(self, script_path, task, files=(), max_input=None, pipe=True, python_cmd='python', remove_tempdir=True): self.remove_tempdir = remove_tempdir self.temp_dir = tempfile.mkdtemp() self.script_path = script_path @@ -62,7 +62,7 @@ def _setup_env(self, cmdenvs): env.update(cmdenvs) return env - def run_task(self, kvs, cmdenvs=()): + def run_task(self, kvs, cmdenvs=(), poll=None): env = self._setup_env(cmdenvs) # Setup pipes task = 'pipe %s' % self.task if self.pipe else self.task @@ -87,12 +87,17 @@ def run_task(self, kvs, cmdenvs=()): for num, kv in enumerate(kvs): if self.max_input is not None and self.max_input <= num: break + timeout = None + wrote = False while True: - r, w, _ = select.select([out_r_fd], [in_w_fd], []) + r, w, _ = select.select([out_r_fd], [in_w_fd], [], timeout) if r: # If data is available to be read, than get it yield tbfp_r.next() - elif w: + elif w and not wrote: tbfp_w.write(kv) + wrote = True + timeout = .001 + if wrote and (poll is None or poll()): break # Get any remaining values while True: @@ -105,7 +110,7 @@ def run_task(self, kvs, cmdenvs=()): p.wait() -def launch_local(in_name, out_name, script_path, max_input=None, +def launch_local(in_name, out_name, script_path, poll=None, max_input=None, files=(), cmdenvs=(), pipe=True, python_cmd='python', remove_tempdir=True, **kw): """A simple local emulation of hadoop @@ -129,6 +134,7 @@ def launch_local(in_name, out_name, script_path, max_input=None, :param in_name: Input path (string or list of strings) or Iterator of (key, value). If it is an iterator then no input is taken from HDFS. :param out_name: Output path or None. If None then output is not placed on HDFS, it is available through the 'output' key of the return value. :param script_path: Path to the script (e.g., script.py) + :param poll: If not None, then only attempt to get a kv pair from kvs if when called, poll returns True. :param max_input: Maximum number of Mapper inputs, None (default) then unlimited. :param files: Extra files (other than the script) (iterator). NOTE: Hadoop copies the files into working directory :param cmdenvs: Extra cmdenv parameters (iterator) @@ -156,7 +162,7 @@ def launch_local(in_name, out_name, script_path, max_input=None, in_kvs = in_name if 'reduce' in script_info['tasks']: kvs = list(LocalTask(script_path, 'map', files, max_input, pipe, - python_cmd, remove_tempdir).run_task(in_kvs, cmdenvs)) + python_cmd, remove_tempdir).run_task(in_kvs, cmdenvs, poll)) if 'combine' in script_info['tasks']: kvs = hadoopy.Test.sort_kv(kvs) kvs = list(LocalTask(script_path, 'combine', files, max_input, pipe, @@ -166,7 +172,7 @@ def launch_local(in_name, out_name, script_path, max_input=None, python_cmd, remove_tempdir).run_task(kvs, cmdenvs) else: kvs = LocalTask(script_path, 'map', files, max_input, pipe, - python_cmd, remove_tempdir).run_task(in_kvs, cmdenvs) + python_cmd, remove_tempdir).run_task(in_kvs, cmdenvs, poll) out = {} if out_name is not None: hadoopy.writetb(out_name, kvs) diff --git a/setup.py b/setup.py index 8050e8b..e02f3c3 100644 --- a/setup.py +++ b/setup.py @@ -82,8 +82,7 @@ def _remove_prefix(string, prefix='hadoopy/'): ext_modules = [Extension("_hadoopy_main", ["hadoopy/_main" + source_ext, "hadoopy/getdelim.c"]), Extension("_hadoopy_typedbytes", ["hadoopy/_typedbytes" + source_ext], - extra_compile_args=tb_extra_args), - Extension("_hadoopy_local", ["hadoopy/_local" + source_ext])] + extra_compile_args=tb_extra_args)] setup(name='hadoopy', cmdclass=cmdclass, version='0.6.0',