Skip to content

Commit

Permalink
Moved _local.pyx back to _local.py as there were no noticeable speed …
Browse files Browse the repository at this point in the history
…gains without further optimization

Signed-off-by: Brandyn A. White <[email protected]>
  • Loading branch information
Brandyn A. White committed Aug 3, 2012
1 parent 7c73e76 commit 94a2642
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
2 changes: 1 addition & 1 deletion hadoopy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
__license__ = 'GPL V3'

from _runner import launch, launch_frozen
from _hadoopy_local import launch_local
from _local import launch_local
from _hdfs import get, put, readtb, writetb, ls, exists, rmr, isempty, abspath, isdir, mv, mkdir
from _job_cli import run
from _reporter import status, counter
Expand Down
20 changes: 13 additions & 7 deletions hadoopy/_local.pyx → hadoopy/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def chdir(path):

class LocalTask(object):

def __init__(self, script_path, task, files, max_input, pipe, python_cmd, remove_tempdir=True):
def __init__(self, script_path, task, files=(), max_input=None, pipe=True, python_cmd='python', remove_tempdir=True):
self.remove_tempdir = remove_tempdir
self.temp_dir = tempfile.mkdtemp()
self.script_path = script_path
Expand Down Expand Up @@ -62,7 +62,7 @@ def _setup_env(self, cmdenvs):
env.update(cmdenvs)
return env

def run_task(self, kvs, cmdenvs=()):
def run_task(self, kvs, cmdenvs=(), poll=None):
env = self._setup_env(cmdenvs)
# Setup pipes
task = 'pipe %s' % self.task if self.pipe else self.task
Expand All @@ -87,12 +87,17 @@ def run_task(self, kvs, cmdenvs=()):
for num, kv in enumerate(kvs):
if self.max_input is not None and self.max_input <= num:
break
timeout = None
wrote = False
while True:
r, w, _ = select.select([out_r_fd], [in_w_fd], [])
r, w, _ = select.select([out_r_fd], [in_w_fd], [], timeout)
if r: # If data is available to be read, than get it
yield tbfp_r.next()
elif w:
elif w and not wrote:
tbfp_w.write(kv)
wrote = True
timeout = .001
if wrote and (poll is None or poll()):
break
# Get any remaining values
while True:
Expand All @@ -105,7 +110,7 @@ def run_task(self, kvs, cmdenvs=()):
p.wait()


def launch_local(in_name, out_name, script_path, max_input=None,
def launch_local(in_name, out_name, script_path, poll=None, max_input=None,
files=(), cmdenvs=(), pipe=True, python_cmd='python', remove_tempdir=True,
**kw):
"""A simple local emulation of hadoop
Expand All @@ -129,6 +134,7 @@ def launch_local(in_name, out_name, script_path, max_input=None,
:param in_name: Input path (string or list of strings) or Iterator of (key, value). If it is an iterator then no input is taken from HDFS.
:param out_name: Output path or None. If None then output is not placed on HDFS, it is available through the 'output' key of the return value.
:param script_path: Path to the script (e.g., script.py)
:param poll: If not None, then only attempt to get a kv pair from kvs if when called, poll returns True.
:param max_input: Maximum number of Mapper inputs, None (default) then unlimited.
:param files: Extra files (other than the script) (iterator). NOTE: Hadoop copies the files into working directory
:param cmdenvs: Extra cmdenv parameters (iterator)
Expand Down Expand Up @@ -156,7 +162,7 @@ def launch_local(in_name, out_name, script_path, max_input=None,
in_kvs = in_name
if 'reduce' in script_info['tasks']:
kvs = list(LocalTask(script_path, 'map', files, max_input, pipe,
python_cmd, remove_tempdir).run_task(in_kvs, cmdenvs))
python_cmd, remove_tempdir).run_task(in_kvs, cmdenvs, poll))
if 'combine' in script_info['tasks']:
kvs = hadoopy.Test.sort_kv(kvs)
kvs = list(LocalTask(script_path, 'combine', files, max_input, pipe,
Expand All @@ -166,7 +172,7 @@ def launch_local(in_name, out_name, script_path, max_input=None,
python_cmd, remove_tempdir).run_task(kvs, cmdenvs)
else:
kvs = LocalTask(script_path, 'map', files, max_input, pipe,
python_cmd, remove_tempdir).run_task(in_kvs, cmdenvs)
python_cmd, remove_tempdir).run_task(in_kvs, cmdenvs, poll)
out = {}
if out_name is not None:
hadoopy.writetb(out_name, kvs)
Expand Down
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ def _remove_prefix(string, prefix='hadoopy/'):
ext_modules = [Extension("_hadoopy_main", ["hadoopy/_main" + source_ext,
"hadoopy/getdelim.c"]),
Extension("_hadoopy_typedbytes", ["hadoopy/_typedbytes" + source_ext],
extra_compile_args=tb_extra_args),
Extension("_hadoopy_local", ["hadoopy/_local" + source_ext])]
extra_compile_args=tb_extra_args)]
setup(name='hadoopy',
cmdclass=cmdclass,
version='0.6.0',
Expand Down

0 comments on commit 94a2642

Please sign in to comment.