diff --git a/hadoopy/_hdfs.py b/hadoopy/_hdfs.py
index fd08f14..95b9a98 100755
--- a/hadoopy/_hdfs.py
+++ b/hadoopy/_hdfs.py
@@ -33,9 +33,7 @@ def _cleaned_hadoop_stderr(hdfs_stderr):
             yield line
 
 
-def _hadoop_fs_command(cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, java_mem_mb=100):
-    env = dict(os.environ)
-    env['HADOOP_OPTS'] = "-Xmx%dm" % java_mem_mb
-    p = subprocess.Popen(cmd, env=env, shell=True, close_fds=True,
+def _hadoop_fs_command(cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
+    p = subprocess.Popen(cmd, shell=True, close_fds=True,
                          stdin=stdin, stdout=stdout,
                          stderr=stderr)
@@ -226,19 +224,18 @@ def ls(path):
     return out
 
 
-def writetb(path, kvs, java_mem_mb=256):
+def writetb(path, kvs):
     """Write typedbytes sequence file to HDFS given an iterator of KeyValue pairs
 
     :param path: HDFS path (string)
     :param kvs: Iterator of (key, value)
-    :param java_mem_mb: Integer of java heap size in MB (default 256)
     :raises: IOError: An error occurred while saving the data.
     """
     read_fd, write_fd = os.pipe()
     read_fp = os.fdopen(read_fd, 'r')
     hstreaming = _find_hstreaming()
     cmd = 'hadoop jar %s loadtb %s' % (hstreaming, path)
-    p = _hadoop_fs_command(cmd, stdin=read_fp, java_mem_mb=java_mem_mb)
+    p = _hadoop_fs_command(cmd, stdin=read_fp)
     read_fp.close()
     with hadoopy.TypedBytesFile(write_fd=write_fd) as tb_fp:
         for kv in kvs:
@@ -262,7 +259,6 @@
     :param path: HDFS path (string)
     :param kvs: Iterator of (key, value)
     :param num_per_file: Max # of kv pairs per file.
-    :param java_mem_mb: Integer of java heap size in MB (default 256)
     :raises: IOError: An error occurred while saving the data.
     """
     out = []
@@ -279,7 +275,7 @@ def _flush(out, part_num):
         out, part_num = _flush(out, part_num)
 
 
-def readtb(paths, num_procs=1, java_mem_mb=256, ignore_logs=True):
+def readtb(paths, num_procs=1, ignore_logs=True):
     """Read typedbytes sequence files on HDFS (with optional compression).
 
     By default, ignores files whose names start with an underscore '_' as they
@@ -290,7 +286,6 @@
 
     :param paths: HDFS path (str) or paths (iterator)
     :param num_procs: Number of reading procs to open (default 1)
-    :param java_mem_mb: Integer of java heap size in MB (default 256)
     :param ignore_logs: If True, ignore all files whose name starts with an underscore. Defaults to True.
     :returns: An iterator of key, value pairs.
     :raises: IOError: An error occurred reading the directory (e.g., not available).
@@ -307,7 +302,7 @@ def _open_tb(cur_path):
         cmd = 'hadoop jar %s dumptb %s' % (hstreaming, cur_path)
         read_fd, write_fd = os.pipe()
         write_fp = os.fdopen(write_fd, 'w')
-        p = _hadoop_fs_command(cmd, stdout=write_fp, java_mem_mb=java_mem_mb)
+        p = _hadoop_fs_command(cmd, stdout=write_fp)
         write_fp.close()
         read_fds.add(read_fd)
         procs[read_fd] = p
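
Usage note (not part of the patch): with java_mem_mb removed from these signatures, the child hadoop process now inherits the parent environment, so a caller that still needs to bound the CLI's JVM heap can export HADOOP_OPTS itself, the same variable the removed code used to set. A minimal sketch of the simplified API follows; the HDFS path and sample data are hypothetical and assume hadoopy is installed and a cluster is reachable:

    import os
    import hadoopy

    # Optional: reproduce the old java_mem_mb=256 behavior externally.
    # The hadoop CLI reads HADOOP_OPTS, which the subprocess now inherits.
    os.environ['HADOOP_OPTS'] = '-Xmx256m'

    # Write (key, value) pairs to a typedbytes sequence file, then read them back.
    kvs = [('key-%d' % i, 'value-%d' % i) for i in range(10)]
    hadoopy.writetb('hdfs://namenode/tmp/demo.tb', kvs)  # hypothetical path
    for key, value in hadoopy.readtb('hdfs://namenode/tmp/demo.tb'):
        print(key, value)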