From 96668a16e92b96b6cfffd3b529f8a9eb6fe80f59 Mon Sep 17 00:00:00 2001 From: Michal Stolarczyk Date: Wed, 17 Oct 2018 15:21:34 -0400 Subject: [PATCH 01/34] Add function that handles the shell pipes. Any failure in the shell pipe will cause the pipeline to stop. Resolves #22 --- pypiper/manager.py | 167 ++++++++++++++++++++++++++++----------------- pypiper/utils.py | 9 +++ 2 files changed, 115 insertions(+), 61 deletions(-) diff --git a/pypiper/manager.py b/pypiper/manager.py index 35b3ed08..342232cf 100755 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -25,7 +25,7 @@ from .exceptions import PipelineHalt, SubprocessError from .flags import * from .utils import \ - check_shell, checkpoint_filepath, clear_flags, flag_name, make_lock_name, \ + check_shell, check_pipes, checkpoint_filepath, clear_flags, flag_name, make_lock_name, \ pipeline_filepath, CHECKPOINT_SPECIFICATIONS from ._version import __version__ import __main__ @@ -807,6 +807,7 @@ def callprint(self, cmd, shell="guess", nofail=False, container=None, lock_name= likely_shell = check_shell(cmd) + pipes = check_pipes(cmd) if shell == "guess": shell = likely_shell @@ -825,78 +826,122 @@ def callprint(self, cmd, shell="guess", nofail=False, container=None, lock_name= returncode = -1 # set default return values for failed command local_maxmem = -1 - try: - # Capture the subprocess output in
 tags to make it format nicely
-            # if the markdown log file is displayed as HTML.
+        def shell_pipeline(args):
+            '''
+            Takes a list of dict(s), each with the same
+            parameters passed to psutil.Popen().
+
+            Spawns the processes connecting the stdout of every process with the
+            stdin of the next process. Waits for all the processes to finish and records their PIDs,
+            returncodes and measures the duration of all the processes combined.
+            '''
+            processes = [psutil.Popen(**args[0])]
+            start_time = time.time()
+            for i in range(1, len(args)):
+                args[i]["stdin"] = processes[i - 1].stdout
+                processes.append(psutil.Popen(**args[i]))
+                processes[i - 1].stdout.close()
+            pids = [x.pid for x in processes]
+            retcodes = [0] * len(processes)
+            while processes:
+                proc = processes.pop(-1)
+                retcodes[len(processes)] = proc.wait()
+                duration = str(datetime.timedelta(seconds=self.time_elapsed(start_time)))
+            return retcodes, duration, pids
+
+        if shell and pipes:
             print("
")
-            p = psutil.Popen(cmd, shell=shell)
-            # Keep track of the running process ID in case we need to kill it
-            # when the pipeline is interrupted.
-            self.procs[p.pid] = {
-                "proc_name":proc_name,
-                "start_time":time.time(),
-                "pre_block":True,
-                "container":container,
-                "p": p}
-
-            def check_me(proc, sleeptime):
-                try:
-                    proc.wait(timeout=sleeptime)
-                except psutil.TimeoutExpired:
-                    return True
-                return False
+            split_cmds = cmd.split('|')
+            args_count = len(split_cmds)
+            process_list = [0] * args_count
+            for i in range(args_count):
+                process_list[i] = dict(args=split_cmds[i].strip(), stdout=subprocess.PIPE, shell=True)
+            retcodes, duration, pids = shell_pipeline(process_list)
+            if max(retcodes) != 0:
+                msg = "One of the shell pipe subprocesses returned nonzero result. Check above output for details"
+                self._triage_error(SubprocessError(msg))
+            # return a list of the greatest returncode (catch the error code if any)
+            # and memory usage (0 since it's not supported yet)
+            info = "\nProcesses in the shell pipe " + str(pids) + " returned: (" + str(retcodes) + ")."
+            info += " Elapsed: " + str(duration) + ".\n"
+            print("
") + print(info) + return [max(retcodes), 0] + + else: + try: + # Capture the subprocess output in
 tags to make it format nicely
+                # if the markdown log file is displayed as HTML.
+                print("
")
+
+                p = psutil.Popen(cmd, shell=shell)
+                # Keep track of the running process ID in case we need to kill it
+                # when the pipeline is interrupted.
+                self.procs[p.pid] = {
+                    "proc_name":proc_name,
+                    "start_time":time.time(),
+                    "pre_block":True,
+                    "container":container,
+                    "p": p}
+
+                def check_me(proc, sleeptime):
+                    try:
+                        proc.wait(timeout=sleeptime)
+                    except psutil.TimeoutExpired:
+                        return True
+                    return False
 
-            sleeptime = .25
+                sleeptime = .25
 
-            if not self.wait:
-                print("
") - print ("Not waiting for subprocess: " + str(p.pid)) - return [0, -1] + if not self.wait: + print("
") + print ("Not waiting for subprocess: " + str(p.pid)) + return [0, -1] - - # This will wait on the process and break out as soon as the process - # returns, but every so often will break out of the wait to check - # the memory use and record a memory high water mark - while check_me(p, sleeptime): + # This will wait on the process and break out as soon as the process + # returns, but every so often will break out of the wait to check + # the memory use and record a memory high water mark + + while check_me(p, sleeptime): + if not shell: + local_maxmem = max(local_maxmem, self._memory_usage(p.pid, container=container)/1e6) + sleeptime = min(sleeptime + 5, 60) + + returncode = p.returncode + info = "Process " + str(p.pid) + " returned: (" + str(p.returncode) + ")." + info += " Elapsed: " + str(datetime.timedelta(seconds=self.time_elapsed(self.procs[p.pid]["start_time"]))) + "." if not shell: - local_maxmem = max(local_maxmem, self._memory_usage(p.pid, container=container)/1e6) - sleeptime = min(sleeptime + 5, 60) + info += " Peak memory: (Process: " + str(round(local_maxmem, 3)) + "GB;" + info += " Pipeline: " + str(round(self.peak_memory, 3)) + "GB)" + # Close the preformat tag for markdown output + print("
") + print(info) + # set self.maxmem + self.peak_memory = max(self.peak_memory, local_maxmem) - returncode = p.returncode - info = "Process " + str(p.pid) + " returned: (" + str(p.returncode) + ")." - info += " Elapsed: " + str(datetime.timedelta(seconds=self.time_elapsed(self.procs[p.pid]["start_time"]))) + "." - if not shell: - info += " Peak memory: (Process: " + str(round(local_maxmem, 3)) + "GB;" - info += " Pipeline: " + str(round(self.peak_memory, 3)) + "GB)" - # Close the preformat tag for markdown output - print("") - print(info) - # set self.maxmem - self.peak_memory = max(self.peak_memory, local_maxmem) + # report process profile + self._report_profile(self.procs[p.pid]["proc_name"], lock_name, time.time() - self.procs[p.pid]["start_time"], local_maxmem) - # report process profile - self._report_profile(self.procs[p.pid]["proc_name"], lock_name, time.time() - self.procs[p.pid]["start_time"], local_maxmem) - - # Remove this as a running subprocess - del self.procs[p.pid] + # Remove this as a running subprocess + del self.procs[p.pid] - if p.returncode != 0: - msg = "Subprocess returned nonzero result. Check above output for details" - self._triage_error(SubprocessError(msg), nofail) + if p.returncode != 0: + msg = "Subprocess returned nonzero result. 
Check above output for details" + self._triage_error(SubprocessError(msg), nofail) - except OSError as e: - print(e) - if (e.args[0] == 2): - errmsg = "Check to make sure you have '{}' installed.".format(cmd[0]) - else: - errmsg = str(e) - print(errmsg) - print("") - self._triage_error(OSError(errmsg), nofail) + except OSError as e: + print(e) + if (e.args[0] == 2): + errmsg = "Check to make sure you have '{}' installed.".format(cmd[0]) + else: + errmsg = str(e) + print(errmsg) + print("") + self._triage_error(OSError(errmsg), nofail) - return [returncode, local_maxmem] + return [returncode, local_maxmem] ################################### diff --git a/pypiper/utils.py b/pypiper/utils.py index 1e66e8e1..eb7cc674 100644 --- a/pypiper/utils.py +++ b/pypiper/utils.py @@ -212,6 +212,15 @@ def check_shell(cmd): return "|" in cmd or ">" in cmd or r"*" in cmd +def check_pipes(cmd): + """ + Determine whether a command appears to contain shell pipes . + + :param str cmd: Command to investigate. + :return bool: Whether the command appears to contain shell pipes. + """ + return "|" in cmd + def clear_flags(pm, flag_names=None): """ From 93aee197a3e999db15a33f888ddc67b20972b970 Mon Sep 17 00:00:00 2001 From: Michal Stolarczyk Date: Mon, 22 Oct 2018 16:16:25 -0400 Subject: [PATCH 02/34] Add shell redirection and piping These two support the memory usage and time tracking for all the sub commands in pipes. 
Need to add '*' support --- pypiper/manager.py | 198 +++++++++++++++++---------------------------- pypiper/utils.py | 87 +++++++++++--------- 2 files changed, 121 insertions(+), 164 deletions(-) diff --git a/pypiper/manager.py b/pypiper/manager.py index 342232cf..835e3344 100755 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -25,8 +25,8 @@ from .exceptions import PipelineHalt, SubprocessError from .flags import * from .utils import \ - check_shell, check_pipes, checkpoint_filepath, clear_flags, flag_name, make_lock_name, \ - pipeline_filepath, CHECKPOINT_SPECIFICATIONS + check_shell, check_shell_pipes, checkpoint_filepath, clear_flags, flag_name, make_lock_name, \ + pipeline_filepath, CHECKPOINT_SPECIFICATIONS, check_shell_aterisk, check_shell_redirection from ._version import __version__ import __main__ @@ -34,11 +34,9 @@ __all__ = ["PipelineManager"] - LOCK_PREFIX = "lock." - class PipelineManager(object): """ Base class for instantiating a PipelineManager object, @@ -185,19 +183,19 @@ def __init__( self.pipeline_log_file = pipeline_filepath(self, suffix="_log.md") self.pipeline_profile_file = \ - pipeline_filepath(self, suffix="_profile.tsv") + pipeline_filepath(self, suffix="_profile.tsv") # Stats and figures are general and so lack the pipeline name. self.pipeline_stats_file = \ - pipeline_filepath(self, filename="stats.tsv") + pipeline_filepath(self, filename="stats.tsv") self.pipeline_figures_file = \ - pipeline_filepath(self, filename="figures.tsv") + pipeline_filepath(self, filename="figures.tsv") self.pipeline_objects_file = \ - pipeline_filepath(self, filename="objects.tsv") + pipeline_filepath(self, filename="objects.tsv") # Record commands used and provide manual cleanup script. 
self.pipeline_commands_file = \ - pipeline_filepath(self, suffix="_commands.sh") + pipeline_filepath(self, suffix="_commands.sh") self.cleanup_file = pipeline_filepath(self, suffix="_cleanup.sh") # Pipeline status variables @@ -797,151 +795,103 @@ def callprint(self, cmd, shell="guess", nofail=False, container=None, lock_name= # if shell=False, then we format the command (with split()) to be a list of command and its arguments. # Split the command to use shell=False; # leave it together to use shell=True; + def check_me(proc, sleeptime): + try: + proc.wait(timeout=sleeptime) + except psutil.TimeoutExpired: + return True + return False if container: cmd = "docker exec " + container + " " + cmd self._report_command(cmd) # self.proc_name = cmd[0] + " " + cmd[1] - self.proc_name = "".join(cmd).split()[0] - proc_name = "".join(cmd).split()[0] - + # self.proc_name = "".join(cmd).split()[0] + # proc_name = "".join(cmd).split()[0] - likely_shell = check_shell(cmd) - pipes = check_pipes(cmd) - - if shell == "guess": - shell = likely_shell - if not shell: - if likely_shell: - print("Should this command run in a shell instead of directly in a subprocess?") - #cmd = cmd.split() - cmd = shlex.split(cmd) - # call(cmd, shell=shell) # old way (no memory profiling) - - # Try to execute the command: - # Putting it in a try block enables us to catch exceptions from bad subprocess - # commands, as well as from valid commands that just fail + if check_shell_pipes(cmd): + stdout = subprocess.PIPE + else: + stdout = None - returncode = -1 # set default return values for failed command - local_maxmem = -1 + stderr=None - def shell_pipeline(args): - ''' - Takes a list of dict(s), each with the same - parameters passed to psutil.Popen(). - - Spawns the processes connecting the stdout of every process with the - stdin of the next process. Waits for all the processes to finish and records their PIDs, - returncodes and measures the duration of all the processes combined. 
- ''' - processes = [psutil.Popen(**args[0])] - start_time = time.time() - for i in range(1, len(args)): - args[i]["stdin"] = processes[i - 1].stdout - processes.append(psutil.Popen(**args[i])) - processes[i - 1].stdout.close() - pids = [x.pid for x in processes] - retcodes = [0] * len(processes) - while processes: - proc = processes.pop(-1) - retcodes[len(processes)] = proc.wait() - duration = str(datetime.timedelta(seconds=self.time_elapsed(start_time))) - return retcodes, duration, pids - - if shell and pipes: - print("
")
+        if not check_shell_aterisk(cmd):
             split_cmds = cmd.split('|')
-            args_count = len(split_cmds)
-            process_list = [0] * args_count
-            for i in range(args_count):
-                process_list[i] = dict(args=split_cmds[i].strip(), stdout=subprocess.PIPE, shell=True)
-            retcodes, duration, pids = shell_pipeline(process_list)
-            if max(retcodes) != 0:
-                msg = "One of the shell pipe subprocesses returned nonzero result. Check above output for details"
-                self._triage_error(SubprocessError(msg))
-            # return a list of the greatest returncode (catch the error code if any)
-            # and memory usage (0 since it's not supported yet)
-            info = "\nProcesses in the shell pipe " + str(pids) + " returned: (" + str(retcodes) + ")."
-            info += " Elapsed: " + str(duration) + ".\n"
-            print("
") - print(info) - return [max(retcodes), 0] - - else: - try: + cmds_count = len(split_cmds) + param_list = [0] * cmds_count + for i in range(cmds_count): + # shell redirection handling - no shell needed + if check_shell_redirection(split_cmds[i]): + if re.search(r'2>',split_cmds[i]): + split_cmds_redir_error = split_cmds[i].split('2>') + split_cmds[i] = split_cmds_redir_error[0].strip() + error_file_name = split_cmds_redir_error[1].strip() + error_file = open(error_file_name,"w") + stderr=error_file + split_cmds_redir = split_cmds[i].split('>') + split_cmds[i] = split_cmds_redir[0].strip() + file_name = split_cmds_redir[1].strip() + file = open(file_name,"w") + split_cmds[i]=split_cmds[i].replace("(","").replace(")","") + param_list[i] = dict(args=shlex.split(split_cmds[i]),stdout=file, stderr=stderr, shell=False) + else: + split_cmds[i]=split_cmds[i].replace("(","").replace(")","") + param_list[i] = dict(args=shlex.split(split_cmds[i]),stdout=stdout, stderr=stderr, shell=False) + + + returncode = -1 # set default return values for failed command + start_times = [] + stop_times = [] + processes = [] + local_maxmems = [] + returncodes = [] + for i in range(len(param_list)): + start_times.append(time.time()) + if i==0: + processes.append(psutil.Popen(**param_list[i])) + else: + param_list[i]["stdin"] = processes[i - 1].stdout + processes.append(psutil.Popen(**param_list[i])) # Capture the subprocess output in
 tags to make it format nicely
                 # if the markdown log file is displayed as HTML.
                 print("
")
 
-                p = psutil.Popen(cmd, shell=shell)
-                # Keep track of the running process ID in case we need to kill it
-                # when the pipeline is interrupted.
-                self.procs[p.pid] = {
-                    "proc_name":proc_name,
-                    "start_time":time.time(),
-                    "pre_block":True,
-                    "container":container,
-                    "p": p}
-
-                def check_me(proc, sleeptime):
-                    try:
-                        proc.wait(timeout=sleeptime)
-                    except psutil.TimeoutExpired:
-                        return True
-                    return False
-
                 sleeptime = .25
 
-                if not self.wait:
-                    print("
") - print ("Not waiting for subprocess: " + str(p.pid)) - return [0, -1] - - # This will wait on the process and break out as soon as the process # returns, but every so often will break out of the wait to check # the memory use and record a memory high water mark - - while check_me(p, sleeptime): - if not shell: - local_maxmem = max(local_maxmem, self._memory_usage(p.pid, container=container)/1e6) + for i in range(len(param_list)): + local_maxmem = -1 + while check_me(processes[i], sleeptime): + local_maxmem = max(local_maxmem, self._memory_usage(processes[i].pid, container=container)/1e6) sleeptime = min(sleeptime + 5, 60) - returncode = p.returncode - info = "Process " + str(p.pid) + " returned: (" + str(p.returncode) + ")." - info += " Elapsed: " + str(datetime.timedelta(seconds=self.time_elapsed(self.procs[p.pid]["start_time"]))) + "." - if not shell: - info += " Peak memory: (Process: " + str(round(local_maxmem, 3)) + "GB;" - info += " Pipeline: " + str(round(self.peak_memory, 3)) + "GB)" + stop_times.append(time.time()) + returncode = processes[i].returncode + info = "Process " + str(processes[i].pid) + " returned: (" + str(processes[i].returncode) + ")." + if i>0: + info += " Elapsed: " + str(datetime.timedelta(seconds=stop_times[i]-stop_times[i-1])) + "." + else: + info += " Elapsed: " + str(datetime.timedelta(seconds=self.time_elapsed(start_times[i]))) + "." + # if not shell: + info += " Peak memory: (Process: " + str(round(local_maxmem, 3)) + "GB;" + info += " Pipeline: " + str(round(self.peak_memory, 3)) + "GB)" # Close the preformat tag for markdown output print("
") print(info) - # set self.maxmem self.peak_memory = max(self.peak_memory, local_maxmem) - # report process profile - self._report_profile(self.procs[p.pid]["proc_name"], lock_name, time.time() - self.procs[p.pid]["start_time"], local_maxmem) - - # Remove this as a running subprocess - del self.procs[p.pid] - - - if p.returncode != 0: + if returncode != 0: msg = "Subprocess returned nonzero result. Check above output for details" self._triage_error(SubprocessError(msg), nofail) - except OSError as e: - print(e) - if (e.args[0] == 2): - errmsg = "Check to make sure you have '{}' installed.".format(cmd[0]) - else: - errmsg = str(e) - print(errmsg) - print("") - self._triage_error(OSError(errmsg), nofail) + returncodes.append(returncode) + local_maxmems.append(local_maxmem) - return [returncode, local_maxmem] + return [returncodes, local_maxmems] ################################### diff --git a/pypiper/utils.py b/pypiper/utils.py index eb7cc674..52ada23d 100644 --- a/pypiper/utils.py +++ b/pypiper/utils.py @@ -3,6 +3,7 @@ from collections import Iterable import os import sys +import re from .const import \ CHECKPOINT_EXTENSION, PIPELINE_CHECKPOINT_DELIMITER, \ @@ -20,11 +21,9 @@ __all__ = ["add_pypiper_args", "build_command", "get_first_value"] - CHECKPOINT_SPECIFICATIONS = ["start_point", "stop_before", "stop_after"] - def add_pypiper_args(parser, groups=("pypiper", ), args=None, required=None, all_args=False): """ @@ -56,7 +55,6 @@ def add_pypiper_args(parser, groups=("pypiper", ), args=None, return parser - def build_command(chunks): """ Create a command from various parts. 
@@ -76,7 +74,8 @@ def build_command(chunks): """ if not chunks: - raise ValueError("No command parts: {} ({})".format(chunks, type(chunks))) + raise ValueError( + "No command parts: {} ({})".format(chunks, type(chunks))) if isinstance(chunks, str): return chunks @@ -101,7 +100,6 @@ def build_command(chunks): return " ".join(parsed_pieces) - def build_sample_paths(sample): """ Ensure existence of folders for a Sample. @@ -119,7 +117,6 @@ def build_sample_paths(sample): os.makedirs(base) - def checkpoint_filename(checkpoint, pipeline_name=None): """ Translate a checkpoint to a filename. @@ -148,11 +145,10 @@ def checkpoint_filename(checkpoint, pipeline_name=None): base = translate_stage_name(checkpoint) if pipeline_name: base = "{}{}{}".format( - pipeline_name, PIPELINE_CHECKPOINT_DELIMITER, base) + pipeline_name, PIPELINE_CHECKPOINT_DELIMITER, base) return base + CHECKPOINT_EXTENSION - def checkpoint_filepath(checkpoint, pm): """ Create filepath for indicated checkpoint. @@ -201,7 +197,6 @@ def checkpoint_filepath(checkpoint, pm): return pipeline_filepath(pm, filename=chkpt_name) - def check_shell(cmd): """ Determine whether a command appears to involve shell process(es). @@ -212,9 +207,19 @@ def check_shell(cmd): return "|" in cmd or ">" in cmd or r"*" in cmd -def check_pipes(cmd): +def check_shell_aterisk(cmd): + """ + Determine whether a command appears to involve shell stars. + + :param str cmd: Command to investigate. + :return bool: Whether the command appears to involve shell stars. + """ + return r"*" in cmd + + +def check_shell_pipes(cmd): """ - Determine whether a command appears to contain shell pipes . + Determine whether a command appears to contain shell pipes. :param str cmd: Command to investigate. :return bool: Whether the command appears to contain shell pipes. 
@@ -222,6 +227,25 @@ def check_pipes(cmd): return "|" in cmd +def check_shell_redirection(cmd): + """ + Determine whether a command appears to contain shell redirection symbol outsite of curlt brackets + + :param str cmd: Command to investigate. + :return bool: Whether the command appears to contain shell redirection. + """ + curly_brackets = True + while curly_brackets: + SRE_match_obj = re.search(r'\{(.*?)}',cmd) + if SRE_match_obj is not None: + cmd = cmd[:SRE_match_obj.start()] + cmd[(SRE_match_obj.end()+1):] + if re.search(r'\{(.*?)}',cmd) is None: + curly_brackets = False + else: + curly_brackets = False + return ">" in cmd + + def clear_flags(pm, flag_names=None): """ @@ -255,7 +279,6 @@ def clear_flags(pm, flag_names=None): return removed - def flag_name(status): """ Determine the name for a flag file of the status indicated. @@ -268,7 +291,6 @@ def flag_name(status): return status + ".flag" - def get_first_value(param, param_pools, on_missing=None, error=True): """ Get the value for a particular parameter from the first pool in the provided @@ -316,7 +338,6 @@ def get_first_value(param, param_pools, on_missing=None, error=True): return on_missing - def is_in_file_tree(fpath, folder): """ Determine whether a file is in a folder. @@ -330,7 +351,6 @@ def is_in_file_tree(fpath, folder): return other_folder.startswith(file_folder) - def is_fastq(file_name): """ Determine whether indicated file appears to be in FASTQ format. @@ -344,7 +364,6 @@ def is_fastq(file_name): return is_unzipped_fastq(file_name) or is_gzipped_fastq(file_name) - def is_gzipped_fastq(file_name): """ Determine whether indicated file appears to be a gzipped FASTQ. @@ -358,7 +377,6 @@ def is_gzipped_fastq(file_name): return file_name.endswith(".fastq.gz") or file_name.endswith(".fq.gz") - def is_unzipped_fastq(file_name): """ Determine whether indicated file appears to be an unzipped FASTQ. 
@@ -372,7 +390,6 @@ def is_unzipped_fastq(file_name): return ext in [".fastq", ".fq"] - def is_sam_or_bam(file_name): """ Determine whether a file appears to be in a SAM format. @@ -386,7 +403,6 @@ def is_sam_or_bam(file_name): return ext in [".bam", ".sam"] - def make_lock_name(original_path, path_base_folder): """ Create name for lock file from an absolute path. @@ -405,8 +421,6 @@ def make_lock_name(original_path, path_base_folder): return original_path.replace(path_base_folder, "").replace(os.sep, "__") - - def parse_cores(cores, pm, default): """ Framework to finalize number of cores for an operation. @@ -431,7 +445,6 @@ def parse_cores(cores, pm, default): return int(cores) - def parse_stage_name(stage): """ Determine the name of a stage. @@ -455,7 +468,6 @@ def parse_stage_name(stage): raise TypeError("Unsupported stage type: {}".format(type(stage))) - def pipeline_filepath(pm, filename=None, suffix=None): """ Derive path to file for managed pipeline. @@ -487,8 +499,7 @@ def pipeline_filepath(pm, filename=None, suffix=None): # In fact, a Pipeline just references its manager's outfolder. # So we can handle argument of either type to pm parameter. return filename if os.path.isabs(filename) \ - else os.path.join(pm.outfolder, filename) - + else os.path.join(pm.outfolder, filename) def translate_stage_name(stage): @@ -510,7 +521,6 @@ def translate_stage_name(stage): return str(name).lower().replace(" ", STAGE_NAME_SPACE_REPLACEMENT) - # TODO: implement as context manager. class Tee(object): def __init__(self, log_file): @@ -532,7 +542,7 @@ def fileno(self): return self.stdout.fileno() -def uniqify(seq): # Dave Kirby +def uniqify(seq): # Dave Kirby # Order preserving seen = set() return [x for x in seq if x not in seen and not seen.add(x)] @@ -560,13 +570,13 @@ def _determine_args(argument_groups, arguments, use_all_args=False): # Define the argument groups. 
args_by_group = { - "pypiper" : ["recover", "new-start", "dirty", "force-follow"], - "config" : ["config"], - "checkpoint" : ["stop-before", "stop-after"], - "resource" : ["mem", "cores"], - "looper" : ["config", "output-parent", "mem", "cores"], - "common" : ["input", "sample-name"], - "ngs" : ["sample-name", "input", "input2", "genome", "single-or-paired"] + "pypiper": ["recover", "new-start", "dirty", "force-follow"], + "config": ["config"], + "checkpoint": ["stop-before", "stop-after"], + "resource": ["mem", "cores"], + "looper": ["config", "output-parent", "mem", "cores"], + "common": ["input", "sample-name"], + "ngs": ["sample-name", "input", "input2", "genome", "single-or-paired"] } # Handle various types of group specifications. @@ -601,12 +611,9 @@ def _determine_args(argument_groups, arguments, use_all_args=False): elif arguments: raise TypeError("arguments must be a str or a list.") - - return uniqify(final_args) - def _add_args(parser, args, required): """ Add new arguments to an ArgumentParser. @@ -701,9 +708,9 @@ def _add_args(parser, args, required): short_opt, argdata = argdata except ValueError: raise TypeError( - "Option name must map to dict or two-tuple (short " - "name and dict) of argument command-line argument " - "specification data.") + "Option name must map to dict or two-tuple (short " + "name and dict) of argument command-line argument " + "specification data.") argdata["required"] = arg in required From bcdd182e25510afc7c77c226ccd5b5d0d2db3e75 Mon Sep 17 00:00:00 2001 From: nsheff Date: Thu, 25 Oct 2018 14:30:25 -0400 Subject: [PATCH 03/34] change order of exit handler registration. 
Fix #91 --- pypiper/_version.py | 2 +- pypiper/manager.py | 24 +++++++++++++++++------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/pypiper/_version.py b/pypiper/_version.py index 8088f751..baadd088 100644 --- a/pypiper/_version.py +++ b/pypiper/_version.py @@ -1 +1 @@ -__version__ = "0.8.1" +__version__ = "0.8.1-dev" diff --git a/pypiper/manager.py b/pypiper/manager.py index 35b3ed08..455b9de7 100755 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -219,11 +219,7 @@ def __init__( # In-memory holder for report_result self.stats_dict = {} - # Register handler functions to deal with interrupt and termination signals; - # If received, we would then clean up properly (set pipeline status to FAIL, etc). - atexit.register(self._exit_handler) - signal.signal(signal.SIGINT, self._signal_int_handler) - signal.signal(signal.SIGTERM, self._signal_term_handler) + # Checkpoint-related parameters self.overwrite_checkpoints = overwrite_checkpoints @@ -234,6 +230,13 @@ def __init__( # Pypiper can keep track of intermediate files to clean up at the end self.cleanup_list = [] self.cleanup_list_conditional = [] + + + # Register handler functions to deal with interrupt and termination signals; + # If received, we would then clean up properly (set pipeline status to FAIL, etc). + signal.signal(signal.SIGINT, self._signal_int_handler) + signal.signal(signal.SIGTERM, self._signal_term_handler) + self.start_pipeline(args, multi) # Handle config file if it exists @@ -267,6 +270,7 @@ def __init__( pass if config_to_load is not None: print("Using custom config file: {}".format(config_to_load)) + print("test") else: # No custom config file specified. 
Check for default pipe_path_base, _ = os.path.splitext(os.path.basename(sys.argv[0])) @@ -395,6 +399,10 @@ def start_pipeline(self, args=None, multi=False): os.dup2(tee.stdin.fileno(), sys.stdout.fileno()) os.dup2(tee.stdin.fileno(), sys.stderr.fileno()) + # For some reason, this exit handler function MUST be registered after + # the one that kills the tee process. + atexit.register(self._exit_handler) + # A future possibility to avoid this tee, is to use a Tee class; this works for anything printed here # by pypiper, but can't tee the subprocess output. For this, it would require using threading to # simultaneously capture and display subprocess output. I shelve this for now and stick with the tee option. @@ -1657,6 +1665,7 @@ def _exit_handler(self): # TODO (cont.): order of interpreter vs. subprocess shutdown signal receipt. # TODO (cont.): see https://bugs.python.org/issue11380 + # Make the cleanup file executable if it exists if os.path.isfile(self.cleanup_file): # Make the cleanup file self destruct. @@ -1669,8 +1678,9 @@ def _exit_handler(self): if not self.has_exit_status: print("Pipeline status: {}".format(self.status)) - self.fail_pipeline(Exception("Unknown exit failure")) - + self.fail_pipeline(Exception("Pipeline failure. See details above.")) + else: + print("Pipeline has already exited. Status: {}".format(self.status)) def _terminate_running_subprocesses(self): From c57c130b7c4b0d70a6fddd4b266eb9954adbfa76 Mon Sep 17 00:00:00 2001 From: nsheff Date: Thu, 25 Oct 2018 14:57:41 -0400 Subject: [PATCH 04/34] Remove extra info on failure --- pypiper/manager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pypiper/manager.py b/pypiper/manager.py index 55df1054..68cca02e 100755 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -1674,8 +1674,6 @@ def _exit_handler(self): if not self.has_exit_status: print("Pipeline status: {}".format(self.status)) self.fail_pipeline(Exception("Pipeline failure. 
See details above.")) - else: - print("Pipeline has already exited. Status: {}".format(self.status)) def _terminate_running_subprocesses(self): From c3dda6518171521c0da38565806ad61e4d0d87b9 Mon Sep 17 00:00:00 2001 From: Michal Stolarczyk Date: Thu, 25 Oct 2018 15:54:39 -0400 Subject: [PATCH 05/34] add support for shell pipes only, use psutil to monitor RSS memory, see #22 and #27 The command is devided by pipes and then each element is evaluated by check_shell if there are any other shell indicators (like >). If so, this part of the original shell pipe is run in the shell. Otherwise as python pipe which causes the whole pypiper pipeline to stop if this shell pipe element fails. Needs testing! --- pypiper/manager.py | 140 ++++++++++++++++++++------------------------- pypiper/utils.py | 2 +- 2 files changed, 63 insertions(+), 79 deletions(-) diff --git a/pypiper/manager.py b/pypiper/manager.py index 835e3344..cf6461c2 100755 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -26,7 +26,7 @@ from .flags import * from .utils import \ check_shell, check_shell_pipes, checkpoint_filepath, clear_flags, flag_name, make_lock_name, \ - pipeline_filepath, CHECKPOINT_SPECIFICATIONS, check_shell_aterisk, check_shell_redirection + pipeline_filepath, CHECKPOINT_SPECIFICATIONS, check_shell_asterisk, check_shell_redirection from ._version import __version__ import __main__ @@ -805,91 +805,75 @@ def check_me(proc, sleeptime): if container: cmd = "docker exec " + container + " " + cmd self._report_command(cmd) - # self.proc_name = cmd[0] + " " + cmd[1] - # self.proc_name = "".join(cmd).split()[0] - # proc_name = "".join(cmd).split()[0] - + param_list=[] if check_shell_pipes(cmd): + shell = False stdout = subprocess.PIPE - else: - stdout = None - - stderr=None - - if not check_shell_aterisk(cmd): split_cmds = cmd.split('|') cmds_count = len(split_cmds) - param_list = [0] * cmds_count for i in range(cmds_count): - # shell redirection handling - no shell needed - if 
check_shell_redirection(split_cmds[i]): - if re.search(r'2>',split_cmds[i]): - split_cmds_redir_error = split_cmds[i].split('2>') - split_cmds[i] = split_cmds_redir_error[0].strip() - error_file_name = split_cmds_redir_error[1].strip() - error_file = open(error_file_name,"w") - stderr=error_file - split_cmds_redir = split_cmds[i].split('>') - split_cmds[i] = split_cmds_redir[0].strip() - file_name = split_cmds_redir[1].strip() - file = open(file_name,"w") - split_cmds[i]=split_cmds[i].replace("(","").replace(")","") - param_list[i] = dict(args=shlex.split(split_cmds[i]),stdout=file, stderr=stderr, shell=False) - else: - split_cmds[i]=split_cmds[i].replace("(","").replace(")","") - param_list[i] = dict(args=shlex.split(split_cmds[i]),stdout=stdout, stderr=stderr, shell=False) - - - returncode = -1 # set default return values for failed command - start_times = [] - stop_times = [] - processes = [] - local_maxmems = [] - returncodes = [] - for i in range(len(param_list)): - start_times.append(time.time()) - if i==0: - processes.append(psutil.Popen(**param_list[i])) - else: - param_list[i]["stdin"] = processes[i - 1].stdout - processes.append(psutil.Popen(**param_list[i])) - # Capture the subprocess output in
 tags to make it format nicely
-                # if the markdown log file is displayed as HTML.
-                print("<pre>")
-
-                sleeptime = .25
-
-                # This will wait on the process and break out as soon as the process
-                # returns, but every so often will break out of the wait to check
-                # the memory use and record a memory high water mark
-            for i in range(len(param_list)):
-                local_maxmem = -1
-                while check_me(processes[i], sleeptime):
-                    local_maxmem = max(local_maxmem, self._memory_usage(processes[i].pid, container=container)/1e6)
-                    sleeptime = min(sleeptime + 5, 60)
-
-                stop_times.append(time.time())
-                returncode = processes[i].returncode
-                info = "Process " + str(processes[i].pid) + " returned: (" + str(processes[i].returncode) + ")."
-                if i>0:
-                    info += " Elapsed: " + str(datetime.timedelta(seconds=stop_times[i]-stop_times[i-1])) + "."
-                else:
-                    info += " Elapsed: " + str(datetime.timedelta(seconds=self.time_elapsed(start_times[i]))) + "."
-                # if not shell:
-                info += " Peak memory: (Process: " + str(round(local_maxmem, 3)) + "GB;"
-                info += " Pipeline: " + str(round(self.peak_memory, 3)) + "GB)"
-                # Close the preformat tag for markdown output
-                print("
") - print(info) - self.peak_memory = max(self.peak_memory, local_maxmem) + if check_shell(split_cmds[i]): + param_list.append(dict(args=split_cmds[i],stdout=stdout, shell=True)) + else: + param_list.append(dict(args=shlex.split(split_cmds[i]),stdout=stdout, shell=shell)) + else: + shell = True + stdout = None + param_list.append(dict(args=cmd,stdout=stdout,shell=shell)) + + + returncode = -1 # set default return values for failed command + start_times = [] + stop_times = [] + processes = [] + local_maxmems = [] + returncodes = [] + for i in range(len(param_list)): + start_times.append(time.time()) + if i == 0: + processes.append(psutil.Popen(**param_list[i])) + else: + param_list[i]["stdin"] = processes[i - 1].stdout + processes.append(psutil.Popen(**param_list[i])) + # Capture the subprocess output in
 tags to make it format nicely
+            # if the markdown log file is displayed as HTML.
+        print("<pre>")
+
+        if not self.wait:
+            print("
") + ids = [x.pid for x in processes] + print ("Not waiting for subprocess(es): " + str(ids)) + return [0, -1] + for i in range(len(param_list)): + local_maxmem = -1 + sleeptime = .25 + while check_me(processes[i], sleeptime): + local_maxmem = max(local_maxmem, (processes[i].memory_info().rss)/1e9) + # local_maxmem = max(local_maxmem, self._memory_usage(processes[i].pid, container=container)/1e6) + sleeptime = min(sleeptime + 5, 60) + + stop_times.append(time.time()) + returncode = processes[i].returncode + info = "Process " + str(processes[i].pid) + " returned: (" + str(processes[i].returncode) + ")." + if i>0: + info += " Elapsed: " + str(datetime.timedelta(seconds=round(stop_times[i]-stop_times[i-1],0))) + "." + else: + info += " Elapsed: " + str(datetime.timedelta(seconds=self.time_elapsed(start_times[i]))) + "." + self.peak_memory = max(self.peak_memory, local_maxmem) + info += " Peak memory: (Process: " + str(round(local_maxmem, 3)) + "GB;" + info += " Pipeline: " + str(round(self.peak_memory, 3)) + "GB)" + # Close the preformat tag for markdown output + print("
") + print(info) + - if returncode != 0: - msg = "Subprocess returned nonzero result. Check above output for details" - self._triage_error(SubprocessError(msg), nofail) + if returncode != 0: + msg = "Subprocess returned nonzero result. Check above output for details" + self._triage_error(SubprocessError(msg), nofail) - returncodes.append(returncode) - local_maxmems.append(local_maxmem) + returncodes.append(returncode) + local_maxmems.append(local_maxmem) return [returncodes, local_maxmems] diff --git a/pypiper/utils.py b/pypiper/utils.py index 52ada23d..2f06fd1a 100644 --- a/pypiper/utils.py +++ b/pypiper/utils.py @@ -207,7 +207,7 @@ def check_shell(cmd): return "|" in cmd or ">" in cmd or r"*" in cmd -def check_shell_aterisk(cmd): +def check_shell_asterisk(cmd): """ Determine whether a command appears to involve shell stars. From 6004ce65e840206286aef798f95949b80e8f017a Mon Sep 17 00:00:00 2001 From: Nathan Sheffield Date: Tue, 30 Oct 2018 21:34:30 -0400 Subject: [PATCH 06/34] add sample-name as pypiper arg --- doc/source/develop-arguments.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/source/develop-arguments.rst b/doc/source/develop-arguments.rst index 88d5c8f6..28523c95 100644 --- a/doc/source/develop-arguments.rst +++ b/doc/source/develop-arguments.rst @@ -51,7 +51,7 @@ Individual arguments that are understood and used by pypiper: - ``-C, --config``: Pypiper pipeline config yaml file. Individual arguments just provided for convenience and standardization: - +- ``-S, --sample-name``: name of the sample - ``-I, --input``: primary input file (e.g. read1) - ``-I2, --input2``: secondary input file (e.g. 
read2) - ``-O, --output-parent``: parent folder for pipeline results (the pipeline will use this as the parent directory for a folder named ``sample-name``) @@ -100,4 +100,4 @@ Examples parser = pypiper.add_pypiper_args(parser, args=["genome"]) # add some groups and some individual arguments - parser = pypiper.add_pypiper_args(parser, args=["genome"], groups=["looper", "ngs"]) \ No newline at end of file + parser = pypiper.add_pypiper_args(parser, args=["genome"], groups=["looper", "ngs"]) From b69045bde58311a5370baddf51379b74e707d3e8 Mon Sep 17 00:00:00 2001 From: nsheff Date: Wed, 31 Oct 2018 08:57:18 -0400 Subject: [PATCH 07/34] switch from zcat for macos compabitibility. See #92 --- pypiper/ngstk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pypiper/ngstk.py b/pypiper/ngstk.py index b98d8232..6f95a254 100755 --- a/pypiper/ngstk.py +++ b/pypiper/ngstk.py @@ -662,7 +662,7 @@ def count_lines_zip(self, file_name): For compressed files. :param file: file_name """ - x = subprocess.check_output("zcat " + file_name + " | wc -l | cut -f1 -d' '", shell=True) + x = subprocess.check_output("gunzip -c " + file_name + " | wc -l | cut -f1 -d' '", shell=True) return x def get_chrs_from_bam(self, file_name): From 15831f96db8a8eb2b70439413059ee1e142a412c Mon Sep 17 00:00:00 2001 From: Michal Stolarczyk Date: Fri, 2 Nov 2018 11:57:01 -0400 Subject: [PATCH 08/34] use psutil to track memory of the process and its children, change the sleeptime between checks to grow exponentially. 
Solves #27 --- pypiper/manager.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) mode change 100755 => 100644 pypiper/manager.py diff --git a/pypiper/manager.py b/pypiper/manager.py old mode 100755 new mode 100644 index 1826c850..4e0caa33 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -810,6 +810,15 @@ def check_me(proc, sleeptime): return True return False + def get_mem_child_sum(proc): + # get children processes + children=proc.children(recursive=True) + # get RSS memory of each child proc and sum all + mem_sum=sum([x.memory_info().rss for x + in children]) + # return in gigs + return mem_sum/1e9 + if container: cmd = "docker exec " + container + " " + cmd self._report_command(cmd) @@ -855,11 +864,13 @@ def check_me(proc, sleeptime): return [0, -1] for i in range(len(param_list)): local_maxmem = -1 - sleeptime = .25 + sleeptime = .0001 while check_me(processes[i], sleeptime): - local_maxmem = max(local_maxmem, (processes[i].memory_info().rss)/1e9) - # local_maxmem = max(local_maxmem, self._memory_usage(processes[i].pid, container=container)/1e6) - sleeptime = min(sleeptime + 5, 60) + local_maxmem = max(local_maxmem, (get_mem_child_sum(processes[i]))) + # the sleeptime is extremely short at the beginning and gets longer exponentially + # (+ constant to prevent copious checks at the very beginning) + # = more precise mem tracing for short processes + sleeptime = min((sleeptime + 0.25) * 3 , 60) stop_times.append(time.time()) returncode = processes[i].returncode From f88f476d250f238f47fd448f940e29f1e76c868a Mon Sep 17 00:00:00 2001 From: Michal Stolarczyk Date: Mon, 5 Nov 2018 13:13:28 -0500 Subject: [PATCH 09/34] fix #95 make the memory message report None instead of -1 when the process ends before its memory usage is checked. The -1 is still returned. 
--- pypiper/manager.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pypiper/manager.py b/pypiper/manager.py index 4e0caa33..c28a9906 100644 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -880,8 +880,17 @@ def get_mem_child_sum(proc): else: info += " Elapsed: " + str(datetime.timedelta(seconds=self.time_elapsed(start_times[i]))) + "." self.peak_memory = max(self.peak_memory, local_maxmem) - info += " Peak memory: (Process: " + str(round(local_maxmem, 3)) + "GB;" - info += " Pipeline: " + str(round(self.peak_memory, 3)) + "GB)" + # Check if the local_maxmem and/or peak_memory are still -1 (in case the process ended before it was checked: < 0.0001s) + # this way of handling it does not cause any problems with comparisons/round. + # Also, -1 is used for comparison of the returned local_maxmems later on in run() + if local_maxmem < 0: + info += " Peak memory: (Process: None;" + else: + info += " Peak memory: (Process: " + str(round(local_maxmem, 3)) + "GB;" + if self.peak_memory < 0: + info += " Pipeline: None)" + else: + info += " Pipeline: " + str(round(self.peak_memory, 3)) + "GB)" # Close the preformat tag for markdown output print("
") print(info) From 4402e69d06732cb7eeec0ebe09a5c6e4d04ec59a Mon Sep 17 00:00:00 2001 From: Michal Stolarczyk Date: Mon, 5 Nov 2018 13:39:18 -0500 Subject: [PATCH 10/34] resolves #90 check for the dynamic recovery flag in the lock_file wait routine and stop waiting if spotted --- pypiper/manager.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pypiper/manager.py b/pypiper/manager.py index c28a9906..3f334109 100644 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -944,7 +944,7 @@ def _wait_for_process(self, p, shell=False): def _wait_for_lock(self, lock_file): """ - Just sleep until the lock_file does not exist. + Just sleep until the lock_file does not exist or a lock_file-related dynamic recovery flag is spotted :param lock_file: Lock file to wait upon. :type lock_file: str @@ -952,6 +952,7 @@ def _wait_for_lock(self, lock_file): sleeptime = .5 first_message_flag = False dot_count = 0 + recover_file = self._recoverfile_from_lockfile(lock_file) while os.path.isfile(lock_file): if first_message_flag is False: self.timestamp("Waiting for file lock: " + lock_file) @@ -962,6 +963,11 @@ def _wait_for_lock(self, lock_file): dot_count = dot_count + 1 if dot_count % 60 == 0: print("") # linefeed + # prevents the issue of pypier waiting for the lock file to be gone infinitely + # in case the recovery flag is sticked by other pipeline when it's interrupted + if os.path.isfile(recover_file): + sys.stdout.write(" Dynamic recovery flag found") + break time.sleep(sleeptime) sleeptime = min(sleeptime + 2.5, 60) From 004b874321587ea2b4dcaaedd7cc54ae36ce9e4b Mon Sep 17 00:00:00 2001 From: Michal Stolarczyk Date: Mon, 5 Nov 2018 14:49:23 -0500 Subject: [PATCH 11/34] fix #89 work on log formatting, the Markdown file converts to the HTML correctly now --- pypiper/manager.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/pypiper/manager.py b/pypiper/manager.py index 3f334109..d5daf0ad 100644 --- 
a/pypiper/manager.py +++ b/pypiper/manager.py @@ -267,8 +267,7 @@ def __init__( #print("Can't find custom config file: " + abs_config) pass if config_to_load is not None: - print("Using custom config file: {}".format(config_to_load)) - print("test") + print("\nUsing custom config file: {}".format(config_to_load)) else: # No custom config file specified. Check for default pipe_path_base, _ = os.path.splitext(os.path.basename(sys.argv[0])) @@ -280,7 +279,7 @@ def __init__( # Finally load the config we found. if config_to_load is not None: - print("Loading config file: {}".format(config_to_load)) + print("\nLoading config file: {}\n".format(config_to_load)) with open(config_to_load, 'r') as conf: # Set the args to the new config file, so it can be used # later to pass to, for example, toolkits @@ -437,8 +436,8 @@ def start_pipeline(self, args=None, multi=False): # Print out a header section in the pipeline log: # Wrap things in backticks to prevent markdown from interpreting underscores as emphasis. 
- print("----------------------------------------") - print("##### [Pipeline run code and environment:]") + # print("----------------------------------------") + print("### [Pipeline run code and environment:]\n") print("* " + "Command".rjust(20) + ": " + "`" + str(" ".join(sys.argv)) + "`") print("* " + "Compute host".rjust(20) + ": " + platform.node()) print("* " + "Working dir".rjust(20) + ": " + os.getcwd()) @@ -446,7 +445,7 @@ def start_pipeline(self, args=None, multi=False): self.timestamp("* " + "Pipeline started at".rjust(20) + ": ") - print("\n##### [Version log:]") + print("\n### [Version log:]\n") print("* " + "Python version".rjust(20) + ": " + platform.python_version()) try: print("* " + "Pypiper dir".rjust(20) + ": " + "`" + gitvars['pypiper_dir'].strip() + "`") @@ -473,7 +472,7 @@ def start_pipeline(self, args=None, multi=False): pass # Print all arguments (if any) - print("\n##### [Arguments passed to pipeline:]") + print("\n### [Arguments passed to pipeline:]\n") for arg, val in (vars(args) if args else dict()).items(): argtext = "`{}`".format(arg) valtext = "`{}`".format(val) @@ -698,9 +697,9 @@ def call_follow(): # If you make it past these tests, we should proceed to run the process. 
if target is not None: - print("\nTarget to produce: `" + target + "`") + print("\nTarget to produce: `" + target + "`\n") else: - print("\nTargetless command, running...") + print("\nTargetless command, running...\n") if isinstance(cmd, list): # Handle command lists for cmd_i in cmd: @@ -888,9 +887,9 @@ def get_mem_child_sum(proc): else: info += " Peak memory: (Process: " + str(round(local_maxmem, 3)) + "GB;" if self.peak_memory < 0: - info += " Pipeline: None)" + info += " Pipeline: None)\n" else: - info += " Pipeline: " + str(round(self.peak_memory, 3)) + "GB)" + info += " Pipeline: " + str(round(self.peak_memory, 3)) + "GB)\n" # Close the preformat tag for markdown output print("") print(info) @@ -934,7 +933,7 @@ def _wait_for_process(self, p, shell=False): info = "Process " + str(p.pid) + " returned: (" + str(p.returncode) + ")." if not shell: info += " Peak memory: (Process: " + str(round(local_maxmem,3)) + "GB;" - info += " Pipeline: " + str(round(self.peak_memory,3)) + "GB)" + info += " Pipeline: " + str(round(self.peak_memory,3)) + "GB)\n" print(info + "\n") if p.returncode != 0: @@ -1139,7 +1138,7 @@ def report_result(self, key, value, annotation=None): message_raw = "{key}\t{value}\t{annotation}".format( key=key, value=value, annotation=annotation) - message_markdown = "> `{key}`\t{value}\t{annotation}\t_RES_".format( + message_markdown = "\n> `{key}`\t{value}\t{annotation}\t_RES_".format( key=key, value=value, annotation=annotation) print(message_markdown) @@ -1282,7 +1281,7 @@ def _report_command(self, cmd): :type cmd: str """ - print("> `" + cmd + "`\n") + print("\n> `" + cmd + "`\n") with open(self.pipeline_commands_file, "a") as myfile: myfile.write(cmd + "\n\n") From 154b0f95aa07d12c896ab5179c6ea98825b01ba4 Mon Sep 17 00:00:00 2001 From: Michal Stolarczyk Date: Mon, 5 Nov 2018 15:10:20 -0500 Subject: [PATCH 12/34] references #95 wrap building the memory reporting string in function --- pypiper/manager.py | 16 +++++----------- 1 file changed, 5 
insertions(+), 11 deletions(-) diff --git a/pypiper/manager.py b/pypiper/manager.py index d5daf0ad..55488450 100644 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -818,6 +818,9 @@ def get_mem_child_sum(proc): # return in gigs return mem_sum/1e9 + def display_memory(memval): + return None if memval < 0 else "{}GB".format(round(memval, 3)) + if container: cmd = "docker exec " + container + " " + cmd self._report_command(cmd) @@ -879,17 +882,8 @@ def get_mem_child_sum(proc): else: info += " Elapsed: " + str(datetime.timedelta(seconds=self.time_elapsed(start_times[i]))) + "." self.peak_memory = max(self.peak_memory, local_maxmem) - # Check if the local_maxmem and/or peak_memory are still -1 (in case the process ended before it was checked: < 0.0001s) - # this way of handling it does not cause any problems with comparisons/round. - # Also, -1 is used for comparison of the returned local_maxmems later on in run() - if local_maxmem < 0: - info += " Peak memory: (Process: None;" - else: - info += " Peak memory: (Process: " + str(round(local_maxmem, 3)) + "GB;" - if self.peak_memory < 0: - info += " Pipeline: None)\n" - else: - info += " Pipeline: " + str(round(self.peak_memory, 3)) + "GB)\n" + + info += " Peak memory: (Process: {proc}; Pipeline: {pipe})".format(proc=display_memory(local_maxmem), pipe=display_memory(self.peak_memory)) # Close the preformat tag for markdown output print("") print(info) From 4e91305363b6e288b63eda623f1e3b5ccda9bce4 Mon Sep 17 00:00:00 2001 From: Michal Stolarczyk Date: Tue, 6 Nov 2018 20:22:53 -0500 Subject: [PATCH 13/34] references #97, work on
 <pre> tags

incorporate new shorter params list creation
---
 pypiper/manager.py | 25 ++++++++-----------------
 1 file changed, 8 insertions(+), 17 deletions(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index 55488450..54051e96 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -820,27 +820,17 @@ def get_mem_child_sum(proc):
 
         def display_memory(memval):
             return None if memval < 0 else "{}GB".format(round(memval, 3))
-            
+
+        def make_dict(command):
+            a, s = (command, True) if check_shell(command) else (shlex.split(command), False)
+            return dict(args=a, stdout=subprocess.PIPE, shell=s)
+
+
         if container:
             cmd = "docker exec " + container + " " + cmd
         self._report_command(cmd)
 
-        param_list=[]
-        if check_shell_pipes(cmd):
-            shell = False
-            stdout = subprocess.PIPE
-            split_cmds = cmd.split('|')
-            cmds_count = len(split_cmds)
-            for i in range(cmds_count):
-                if check_shell(split_cmds[i]):
-                    param_list.append(dict(args=split_cmds[i],stdout=stdout, shell=True))
-                else:       
-                    param_list.append(dict(args=shlex.split(split_cmds[i]),stdout=stdout, shell=shell))
-        else:
-            shell = True
-            stdout = None
-            param_list.append(dict(args=cmd,stdout=stdout,shell=shell))
-
+        param_list = [make_dict(c) for c in cmd.split("|")] if check_shell_pipes(cmd) else [dict(args=cmd, stdout=None, shell=True)]
 
         returncode = -1  # set default return values for failed command
         start_times = []
@@ -887,6 +877,7 @@ def display_memory(memval):
             # Close the preformat tag for markdown output
             print("</pre>")
             print(info)
+            if i != len(param_list)-1: print("<pre>")
             
 
             if returncode != 0:

From 67bbf23cffeca55a7aabee74e3973551f23e00dd Mon Sep 17 00:00:00 2001
From: Michal Stolarczyk 
Date: Wed, 7 Nov 2018 09:22:45 -0500
Subject: [PATCH 14/34] list the no cleanup items together, addresses #98

---
 pypiper/manager.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index 54051e96..89130073 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -1881,6 +1881,7 @@ def _cleanup(self, dry_run=False):
                 print("\nConditional flag found: " + str([os.path.basename(i) for i in flag_files]))
                 print("\nThese conditional files were left in place:" + str(self.cleanup_list_conditional))
                 # Produce a cleanup script.
+                no_clenup_script = []
                 for cleandir in self.cleanup_list_conditional:
                     try:
                         items_to_clean = glob.glob(cleandir)
@@ -1889,8 +1890,8 @@ def _cleanup(self, dry_run=False):
                                 if os.path.isfile(file): clean_script.write("rm " + clean_item + "\n")
                                 elif os.path.isdir(file): clean_script.write("rmdir " + clean_item + "\n")
                     except Exception:
-                        print("Could not produce cleanup script for item '{}', "
-                              "skipping".format(cleandir))
+                        no_clenup_script.append(cleandir)
+                print('Could not produce cleanup script for item(s):', *no_clenup_script, sep='\n* ')
 
 
     def _memory_usage(self, pid='self', category="hwm", container=None):

From 9e4a8f114d9d338b0910f1ce757e7661ea09ad85 Mon Sep 17 00:00:00 2001
From: Michal Stolarczyk 
Date: Wed, 7 Nov 2018 09:26:15 -0500
Subject: [PATCH 15/34] correct var name typo

---
 pypiper/manager.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index 89130073..f893931d 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -1881,7 +1881,7 @@ def _cleanup(self, dry_run=False):
                 print("\nConditional flag found: " + str([os.path.basename(i) for i in flag_files]))
                 print("\nThese conditional files were left in place:" + str(self.cleanup_list_conditional))
                 # Produce a cleanup script.
-                no_clenup_script = []
+                no_cleanup_script = []
                 for cleandir in self.cleanup_list_conditional:
                     try:
                         items_to_clean = glob.glob(cleandir)
@@ -1890,8 +1890,8 @@ def _cleanup(self, dry_run=False):
                                 if os.path.isfile(file): clean_script.write("rm " + clean_item + "\n")
                                 elif os.path.isdir(file): clean_script.write("rmdir " + clean_item + "\n")
                     except Exception:
-                        no_clenup_script.append(cleandir)
-                print('Could not produce cleanup script for item(s):', *no_clenup_script, sep='\n* ')
+                        no_cleanup_script.append(cleandir)
+                print('Could not produce cleanup script for item(s):', *no_cleanup_script, sep='\n* ')
 
 
     def _memory_usage(self, pid='self', category="hwm", container=None):

From fa80acdb777c0165ed8569354d84498b5eb9bbc8 Mon Sep 17 00:00:00 2001
From: Michal Stolarczyk 
Date: Wed, 7 Nov 2018 10:18:46 -0500
Subject: [PATCH 16/34] make no cleanup script listing python2 compatible

---
 pypiper/manager.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index f893931d..a87f347d 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -1891,7 +1891,8 @@ def _cleanup(self, dry_run=False):
                                 elif os.path.isdir(file): clean_script.write("rmdir " + clean_item + "\n")
                     except Exception:
                         no_cleanup_script.append(cleandir)
-                print('Could not produce cleanup script for item(s):', *no_cleanup_script, sep='\n* ')
+                if no_cleanup_script: 
+                    print('Could not produce cleanup script for item(s):\n* ' + '\n* '.join(no_cleanup_script))
 
 
     def _memory_usage(self, pid='self', category="hwm", container=None):

From 134a930f250b400998488572593b2ed15884b64e Mon Sep 17 00:00:00 2001
From: Vince Reuter 
Date: Wed, 7 Nov 2018 14:09:46 -0500
Subject: [PATCH 17/34] add docs for test execution; close #99

---
 doc/source/support.rst | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/doc/source/support.rst b/doc/source/support.rst
index 7c00b213..f5110ffc 100644
--- a/doc/source/support.rst
+++ b/doc/source/support.rst
@@ -8,4 +8,8 @@ If you find a bug or want request a feature, open an issue at https://github.com
 
 Contributing
 *************
-We welcome contributions in the form of pull requests.
\ No newline at end of file
+We welcome contributions in the form of pull requests.
+If proposing changes to package source code, please run the test suite in `python2` and `python3` by running ``pytest`` or ``python setup.py test`` from within the repository root.
+
+If using ``pytest`` directly, we suggest first activating the appropriate Python version's virtual environment and running ``pip install --upgrade ./``.
+Otherwise, simply specify the appropriate Python version, i.e. ``python2 setup.py test`` or ``python3 setup.py test``.

From e65d214f56259261b95875914cac25ff630a71fc Mon Sep 17 00:00:00 2001
From: Michal Stolarczyk 
Date: Mon, 12 Nov 2018 15:37:32 -0500
Subject: [PATCH 18/34] addresses #104

implement function for splitting command with pipes
---
 pypiper/manager.py |  2 +-
 pypiper/utils.py   | 11 ++++++++++-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index a87f347d..8bdb5220 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -830,7 +830,7 @@ def make_dict(command):
             cmd = "docker exec " + container + " " + cmd
         self._report_command(cmd)
 
-        param_list = [make_dict(c) for c in cmd.split("|")] if check_shell_pipes(cmd) else [dict(args=cmd, stdout=None, shell=True)]
+        param_list = [make_dict(c) for c in split_by_pipes(cmd)] if check_shell_pipes(cmd) else [dict(args=cmd, stdout=None, shell=True)]
 
         returncode = -1  # set default return values for failed command
         start_times = []
diff --git a/pypiper/utils.py b/pypiper/utils.py
index 2f06fd1a..edc0093b 100644
--- a/pypiper/utils.py
+++ b/pypiper/utils.py
@@ -216,6 +216,15 @@ def check_shell_asterisk(cmd):
     """
     return r"*" in cmd
 
+def split_by_pipes(cmd):
+    """
+    Split the command by shell pipes, but preserve the contents of the parentheses.
+
+    :param str cmd: Command to investigate.
+    :return list: List of sub commands to be linked
+    """
+    r = re.compile(r'(?:[^|(]|\([^)]*\))+')
+    return r.findall(cmd)
 
 def check_shell_pipes(cmd):
     """
@@ -229,7 +238,7 @@ def check_shell_pipes(cmd):
 
 def check_shell_redirection(cmd):
     """
-    Determine whether a command appears to contain shell redirection symbol outsite of curlt brackets
+    Determine whether a command appears to contain shell redirection symbol outside of curly brackets
 
     :param str cmd: Command to investigate.
     :return bool: Whether the command appears to contain shell redirection.

From dbda82739a337f9c833acb8313432b8cca25583a Mon Sep 17 00:00:00 2001
From: nsheff 
Date: Mon, 12 Nov 2018 21:27:41 -0500
Subject: [PATCH 19/34] elevate contribution section; fix links; fix #100

---
 doc/source/contributing.rst       | 5 +++++
 doc/source/hello-world.rst        | 4 ++--
 doc/source/index.rst              | 3 ++-
 doc/source/ngstk.rst              | 4 ++--
 doc/source/support.rst            | 8 +-------
 doc/source/tutorials-advanced.rst | 2 +-
 doc/source/tutorials-basic.rst    | 2 +-
 7 files changed, 14 insertions(+), 14 deletions(-)
 create mode 100644 doc/source/contributing.rst

diff --git a/doc/source/contributing.rst b/doc/source/contributing.rst
new file mode 100644
index 00000000..de471c88
--- /dev/null
+++ b/doc/source/contributing.rst
@@ -0,0 +1,5 @@
+
+Contributing
+=========================
+
+We welcome contributions in the form of pull requests.
diff --git a/doc/source/hello-world.rst b/doc/source/hello-world.rst
index 0cb1dc6b..c83f58a4 100644
--- a/doc/source/hello-world.rst
+++ b/doc/source/hello-world.rst
@@ -3,7 +3,7 @@
 |logo| Installing and Hello World
 ==============================
 
-Release versions are hosted on  `pypi (under the name piper) `_, and posted on the GitHub `releases page `_. Install directly using pip:
+Release versions are hosted on  `pypi (under the name piper) `_, and posted on the GitHub `releases page `_. Install directly using pip:
 
 .. code-block:: bash
 
@@ -24,7 +24,7 @@ Now, to test pypiper, follow the commands in the ``Hello, Pypiper!`` tutorial: j
 	pip install --user piper
 
 	# download hello_pypiper.py
-	wget https://raw.githubusercontent.com/epigen/pypiper/master/example_pipelines/hello_pypiper.py
+	wget https://raw.githubusercontent.com/databio/pypiper/master/example_pipelines/hello_pypiper.py
 	
 	# Run it:
 	python hello_pypiper.py
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 55ef6920..ce6569d2 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -54,7 +54,8 @@ Contents
    faq.rst
    changelog.rst
    support.rst
-   GitHub 
+   contributing.rst
+   GitHub 
 
 
 
diff --git a/doc/source/ngstk.rst b/doc/source/ngstk.rst
index 0075dbcd..02b3d4d5 100644
--- a/doc/source/ngstk.rst
+++ b/doc/source/ngstk.rst
@@ -2,7 +2,7 @@
 NGSTk
 =========================
 
-An optional feature of pypiper is the accompanying toolkits, such as the next-gen sequencing toolkit, `NGSTk `_, which simply provides some convenient helper functions to create common commands, like converting from file formats (*e.g.* bam to fastq), merging files (*e.g.* merge_bams), counting reads, etc. These make it faster to design bioinformatics pipelines in Pypiper, but are entirely optional.
+An optional feature of pypiper is the accompanying toolkits, such as the next-gen sequencing toolkit, `NGSTk `_, which simply provides some convenient helper functions to create common commands, like converting from file formats (*e.g.* bam to fastq), merging files (*e.g.* merge_bams), counting reads, etc. These make it faster to design bioinformatics pipelines in Pypiper, but are entirely optional.
 
 Example:
 
@@ -18,6 +18,6 @@ Example:
 	ngstk.index_bam("sample.bam")
 
 
-A list of available functions can be found in the :doc:`API ` or in the source code for `NGSTk `_.
+A list of available functions can be found in the :doc:`API ` or in the source code for `NGSTk `_.
 
 Contributions of additional toolkits or functions in an existing toolkit are welcome.
diff --git a/doc/source/support.rst b/doc/source/support.rst
index 7c00b213..d13c525d 100644
--- a/doc/source/support.rst
+++ b/doc/source/support.rst
@@ -2,10 +2,4 @@
 Support
 =========================
 
-Bug Reports
-*************
-If you find a bug or want request a feature, open an issue at https://github.com/epigen/pypiper/issues.
-
-Contributing
-*************
-We welcome contributions in the form of pull requests.
\ No newline at end of file
+If you find a bug or want to request a feature, open an issue at https://github.com/databio/pypiper/issues.
diff --git a/doc/source/tutorials-advanced.rst b/doc/source/tutorials-advanced.rst
index 29451d02..9bf5882f 100644
--- a/doc/source/tutorials-advanced.rst
+++ b/doc/source/tutorials-advanced.rst
@@ -3,7 +3,7 @@ Advanced tutorial
 
 Here we have a more advanced bioinformatics pipeline that adds some new concepts. This is a simple script that takes an input file and returns the file size and the number of sequencing reads in that file. This example uses a function from from the built-in :doc:`NGSTk toolkit `. In particular, this toolkit contains a few handy functions that make it easy for a pipeline to accept inputs of various types. So, this pipeline can count the number of reads from files in ``BAM`` format, or ``fastq`` format, or ``fastq.gz`` format. You can also use the same functions from NGSTk to develop a pipeline to do more complicated things, and handle input of any of these types.
 
-First, grab this pipeline. Download `count_reads.py `_, make it executable (``chmod 755 count_reads.py``), and then run it with ``./count_reads.py``). 
+First, grab this pipeline. Download `count_reads.py `_, make it executable (``chmod 755 count_reads.py``), and then run it with ``./count_reads.py``). 
 
 You can grab a few small data files in the `microtest repository `_. Run a few of these files like this:
 
diff --git a/doc/source/tutorials-basic.rst b/doc/source/tutorials-basic.rst
index f8eb238f..9555ba7f 100644
--- a/doc/source/tutorials-basic.rst
+++ b/doc/source/tutorials-basic.rst
@@ -1,7 +1,7 @@
 Basic tutorial
 *****************
 
-Now, download `basic.py `_ and run it with `python basic.py` (or, better yet, make it executable (`chmod 755 basic.py`) and then run it directly with `./basic.py`). This example is a documented vignette; so just read it and run it to get an idea of how things work.
+Now, download `basic.py `_ and run it with `python basic.py` (or, better yet, make it executable (`chmod 755 basic.py`) and then run it directly with `./basic.py`). This example is a documented vignette; so just read it and run it to get an idea of how things work.
 
 .. literalinclude:: ../../example_pipelines/basic.py
 

From 52c711af52816e1b3b30436f7e6cfb7a4b3ab496 Mon Sep 17 00:00:00 2001
From: nsheff 
Date: Mon, 12 Nov 2018 21:47:19 -0500
Subject: [PATCH 20/34] docs tweaks

---
 doc/source/functions.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/doc/source/functions.rst b/doc/source/functions.rst
index 2a8eb84c..478d3fb2 100644
--- a/doc/source/functions.rst
+++ b/doc/source/functions.rst
@@ -2,7 +2,7 @@
 Basic functions
 =========================
 
-Pypiper is simple, but powerful. You only need 3 functions to get started. ``PipelineManager`` can do:
+Pypiper is simple, but powerful. You really only need to know about 3 functions to get started. ``PipelineManager`` can do:
 
 .. currentmodule:: pypiper.manager.PipelineManager
 .. autosummary:: 
@@ -11,7 +11,7 @@ Pypiper is simple, but powerful. You only need 3 functions to get started. ``Pip
 	stop_pipeline
 
 
-With that you can create a simple pipeline. You can click on each function to view the in-depth documentation for that function. There are quite a few optional parameters to the ``run`` function, which is where most of Pypiper's power comes from
+With those 3 functions, you can create a simple pipeline. Click on each function to view its in-depth documentation. There are quite a few optional parameters to the ``run`` function, which is where most of Pypiper's power comes from.
 
 When you've mastered the basics and are ready to get more powerful, add in a few new (optional) commands that make debugging and development easier:
 

From 211ac5a5993f310798486b7e7d1abfb19d3df58e Mon Sep 17 00:00:00 2001
From: Michal Stolarczyk 
Date: Tue, 13 Nov 2018 13:53:04 -0500
Subject: [PATCH 21/34] import the split_by_pipes function, references #104

---
 pypiper/manager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index 8bdb5220..26b69227 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -26,7 +26,7 @@
 from .flags import *
 from .utils import \
     check_shell, check_shell_pipes, checkpoint_filepath, clear_flags, flag_name, make_lock_name, \
-    pipeline_filepath, CHECKPOINT_SPECIFICATIONS, check_shell_asterisk, check_shell_redirection
+    pipeline_filepath, CHECKPOINT_SPECIFICATIONS, check_shell_asterisk, check_shell_redirection, split_by_pipes
 from ._version import __version__
 import __main__
 

From 6a6b5037a2fbf5ebf74545410e25c60eb152f895 Mon Sep 17 00:00:00 2001
From: Vince 
Date: Wed, 14 Nov 2018 15:13:52 -0500
Subject: [PATCH 22/34] remove unimplemented label; #105

---
 pypiper/manager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index 26b69227..5708b0f4 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -56,7 +56,7 @@ class PipelineManager(object):
     :param bool recover: Specify recover mode, to overwrite lock files.
         If pypiper encounters a locked target, it will ignore the lock and
         recompute this step. Useful to restart a failed pipeline.
-    :param bool new_start: NOT IMPLEMENTED
+    :param bool new_start: start over and run every command even if output exists
     :param bool force_follow: Force run all follow functions
         even if  the preceding command is not run. By default,
         following functions  are only run if the preceding command is run.

From 10130e4875922b634913a0db644d42cfe14aec1f Mon Sep 17 00:00:00 2001
From: Vince 
Date: Fri, 16 Nov 2018 11:25:37 -0500
Subject: [PATCH 23/34] remove unused imports

---
 pypiper/manager.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index 5708b0f4..6ad3d333 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -25,8 +25,8 @@
 from .exceptions import PipelineHalt, SubprocessError
 from .flags import *
 from .utils import \
-    check_shell, check_shell_pipes, checkpoint_filepath, clear_flags, flag_name, make_lock_name, \
-    pipeline_filepath, CHECKPOINT_SPECIFICATIONS, check_shell_asterisk, check_shell_redirection, split_by_pipes
+    check_shell, check_shell_pipes, checkpoint_filepath, clear_flags, flag_name, \
+    make_lock_name, pipeline_filepath, CHECKPOINT_SPECIFICATIONS, split_by_pipes
 from ._version import __version__
 import __main__
 

From d7d53e067ba4466883f26432db02032aea2b82b8 Mon Sep 17 00:00:00 2001
From: Vince 
Date: Fri, 16 Nov 2018 11:29:31 -0500
Subject: [PATCH 24/34] remove unused variable preset; tidy

---
 pypiper/manager.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index 6ad3d333..6e4dd9f0 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -832,7 +832,6 @@ def make_dict(command):
 
         param_list = [make_dict(c) for c in split_by_pipes(cmd)] if check_shell_pipes(cmd) else [dict(args=cmd, stdout=None, shell=True)]
 
-        returncode = -1  # set default return values for failed command
         start_times = []
         stop_times = []
         processes = []
@@ -868,17 +867,20 @@ def make_dict(command):
             returncode = processes[i].returncode
             info = "Process " + str(processes[i].pid) + " returned: (" + str(processes[i].returncode) + ")."
             if i>0:
-                info += " Elapsed: " + str(datetime.timedelta(seconds=round(stop_times[i]-stop_times[i-1],0))) + "."
+                info += " Elapsed: " + str(datetime.timedelta(
+                    seconds=round(stop_times[i] - stop_times[i - 1], 0))) + "."
             else:
-                info += " Elapsed: " + str(datetime.timedelta(seconds=self.time_elapsed(start_times[i]))) + "."
+                info += " Elapsed: " + str(datetime.timedelta(
+                    seconds=self.time_elapsed(start_times[i]))) + "."
             self.peak_memory = max(self.peak_memory, local_maxmem)
 
-            info += " Peak memory: (Process: {proc}; Pipeline: {pipe})".format(proc=display_memory(local_maxmem), pipe=display_memory(self.peak_memory))
+            info += " Peak memory: (Process: {proc}; Pipeline: {pipe})".format(
+                proc=display_memory(local_maxmem), pipe=display_memory(self.peak_memory))
             # Close the preformat tag for markdown output
             print("
") print(info) - if i != len(param_list)-1: print("
") 
-            
+            if i != len(param_list) - 1:
+                print("
")
 
             if returncode != 0:
                 msg = "Subprocess returned nonzero result. Check above output for details"

From 7e7443825dda6dc41b989fa126ecc7040a9fec99 Mon Sep 17 00:00:00 2001
From: Vince 
Date: Fri, 16 Nov 2018 11:35:12 -0500
Subject: [PATCH 25/34] remove unused parameters and function

---
 pypiper/manager.py | 53 +++++-----------------------------------------
 1 file changed, 5 insertions(+), 48 deletions(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index 6e4dd9f0..af384805 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -704,13 +704,13 @@ def call_follow():
             if isinstance(cmd, list):  # Handle command lists
                 for cmd_i in cmd:
                     list_ret, list_maxmem = \
-                        self.callprint(cmd_i, shell, nofail, container)
+                        self.callprint(cmd_i, nofail, container)
                     local_maxmem = max(local_maxmem, list_maxmem)
                     process_return_code = max(process_return_code, list_ret)
 
             else:  # Single command (most common)
                 process_return_code, local_maxmem = \
-                    self.callprint(cmd, shell, nofail, container)  # Run command
+                    self.callprint(cmd, nofail, container)  # Run command
 
             # For temporary files, you can specify a clean option to automatically
             # add them to the clean list, saving you a manual call to clean_add
@@ -769,7 +769,7 @@ def checkprint(self, cmd, shell="guess", nofail=False, errmsg=None):
             self._triage_error(e, nofail, errmsg)
 
 
-    def callprint(self, cmd, shell="guess", nofail=False, container=None, lock_name=None, errmsg=None):
+    def callprint(self, cmd, nofail=False, container=None):
         """
         Prints the command, and then executes it, then prints the memory use and
         return code of the command.
@@ -783,20 +783,12 @@ def callprint(self, cmd, shell="guess", nofail=False, container=None, lock_name=
 
         :param cmd: Bash command(s) to be run.
         :type cmd: str or list
-        :param shell: If command is required to be run in its own shell. Optional. Default: "guess", which
-            will make a best guess on whether it should run in a shell or not, based on presence of shell
-            utils, like asterisks, pipes, or output redirects. Force one way or another by specifying True or False
-        :type shell: bool
         :param nofail: Should the pipeline bail on a nonzero return from a process? Default: False
             Nofail can be used to implement non-essential parts of the pipeline; if these processes fail,
             they will not cause the pipeline to bail out.
         :type nofail: bool
         :param container: Named Docker container in which to execute.
         :param container: str
-        :param lock_name: Name of the relevant lock file.
-        :type lock_name: str
-        :param errmsg: Message to print if there's an error.
-        :type errmsg: str
         """
         # The Popen shell argument works like this:
         # if shell=False, then we format the command (with split()) to be a list of command and its arguments.
@@ -830,7 +822,8 @@ def make_dict(command):
             cmd = "docker exec " + container + " " + cmd
         self._report_command(cmd)
 
-        param_list = [make_dict(c) for c in split_by_pipes(cmd)] if check_shell_pipes(cmd) else [dict(args=cmd, stdout=None, shell=True)]
+        param_list = [make_dict(c) for c in split_by_pipes(cmd)] \
+            if check_shell_pipes(cmd) else [dict(args=cmd, stdout=None, shell=True)]
 
         start_times = []
         stop_times = []
@@ -962,42 +955,6 @@ def _wait_for_lock(self, lock_file):
             self.set_status_flag(RUN_FLAG)
 
 
-    def _wait_for_file(self, file_name, lock_name=None):
-        """
-        Just sleep until the file_name DOES exist.
-
-        :param file_name: File to wait for.
-        :type file_name: str
-        :param lock_name: Name of lock file to wait for.
-        :type lock_name: str
-        """
-
-        # Waiting loop/state variables
-        sleeptime = .5
-        first_message_flag = False
-        dot_count = 0
-
-        while not os.path.isfile(file_name):
-            if first_message_flag is False:
-                self.timestamp("Waiting for file: " + file_name)
-                first_message_flag = True
-            else:
-                sys.stdout.write(".")
-                dot_count = dot_count + 1
-                if dot_count % 60 == 0:
-                    print("")  # linefeed
-            time.sleep(sleeptime)
-            sleeptime = min(sleeptime + 2.5, 60)
-
-        if first_message_flag:
-            self.timestamp("File exists.")
-
-        # Finalize lock file path and begin waiting.
-        lock_name = lock_name or make_lock_name(file_name, self.outfolder)
-        lock_file = self._make_lock_path(lock_name)
-        self._wait_for_lock(lock_file)
-
-
     ###################################
     # Logging functions
     ###################################

From 46cafcb88f2b53b6725333c906971ce8c5d91cc9 Mon Sep 17 00:00:00 2001
From: Vince 
Date: Fri, 16 Nov 2018 12:01:08 -0500
Subject: [PATCH 26/34] tweak messaging

---
 pypiper/manager.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index af384805..a716d101 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -664,16 +664,18 @@ def call_follow():
 
             # Scenario 1: Lock file exists, but we're supposed to overwrite target; Run process.
             if os.path.isfile(lock_file):
+                print("Found lock file: {}".format(lock_file))
                 if self.overwrite_locks:
-                    print("Found lock file; overwriting this target...")
+                    print("Overwriting target...")
                 elif os.path.isfile(recover_file):
-                    print("Found lock file. Found dynamic recovery file. Overwriting this target...")
+                    print("Found dynamic recovery file ({}); "
+                          "overwriting target...".format(recover_file))
                     # remove the lock file which will then be promptly re-created for the current run.
                     recover_mode = True
                     # the recovery flag is now spent, so remove so we don't accidentally re-recover a failed job
                     os.remove(recover_file)
                 elif self.new_start:
-                    print("New start mode, overwriting this target...")
+                    print("New start mode; overwriting target...")
                 else:  # don't overwrite locks
                     self._wait_for_lock(lock_file)
                     # when it's done loop through again to try one more time (to see if the target exists now)

From ceab8cda99ae302689f3870907ed2cc92e087101 Mon Sep 17 00:00:00 2001
From: nsheff 
Date: Fri, 16 Nov 2018 16:03:31 -0500
Subject: [PATCH 27/34] Add fastq results to object report

---
 pypiper/manager.py | 2 +-
 pypiper/ngstk.py   | 9 ++++++++-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/pypiper/manager.py b/pypiper/manager.py
index a716d101..9d1ab1dd 100644
--- a/pypiper/manager.py
+++ b/pypiper/manager.py
@@ -1171,7 +1171,7 @@ def report_object(self, key, filename, anchor_text=None, anchor_image=None,
             relative_anchor_image = os.path.relpath(anchor_image, self.outfolder) \
                 if os.path.isabs(anchor_image) else anchor_image
         else:
-            anchor_image = ""
+            relative_anchor_image = ""
 
 
         message_raw = "{key}\t{filename}\t{anchor_text}\t{anchor_image}\t{annotation}".format(
diff --git a/pypiper/ngstk.py b/pypiper/ngstk.py
index b8bf5e7a..d9caf8b1 100755
--- a/pypiper/ngstk.py
+++ b/pypiper/ngstk.py
@@ -557,9 +557,16 @@ def temp_func():
                     self.make_sure_path_exists(fastqc_folder)
                 cmd = self.fastqc(trimmed_fastq, fastqc_folder)
                 self.pm.run(cmd, lock_name="trimmed_fastqc", nofail=True)
+                fname, ext = os.path.splitext(os.path.basename(trimmed_fastq))
+                fastqc_html = os.path.join(fastqc_folder, fname + "_fastqc.html")
+                self.pm.report_object("FastQC report r1", fastqc_html)
+
                 if paired_end and trimmed_fastq_R2:
                     cmd = self.fastqc(trimmed_fastq_R2, fastqc_folder)
                     self.pm.run(cmd, lock_name="trimmed_fastqc_R2", nofail=True)
+                    fname, ext = os.path.splitext(os.path.basename(trimmed_fastq_R2))
+                    fastqc_html = os.path.join(fastqc_folder, fname + "_fastqc.html")
+                    self.pm.report_object("FastQC report r2", fastqc_html)
 
         return temp_func
 
@@ -915,7 +922,7 @@ def bam_conversions(self, bam_file, depth=True):
 
     def fastqc(self, file, output_dir):
         """
-        Create command to run fastqc on a BAM file (or FASTQ file, right?)
+        Create command to run fastqc on a FASTQ file
 
         :param str file: Path to file with sequencing reads
         :param str output_dir: Path to folder in which to place output

From 9f9465096d925d97819d5ab9ba18c95b4696e2fa Mon Sep 17 00:00:00 2001
From: Vince 
Date: Fri, 16 Nov 2018 16:30:11 -0500
Subject: [PATCH 28/34] context manager for working directory

---
 pypiper/folder_context.py | 35 +++++++++++++++++++++++++++++++++++
 1 file changed, 35 insertions(+)
 create mode 100644 pypiper/folder_context.py

diff --git a/pypiper/folder_context.py b/pypiper/folder_context.py
new file mode 100644
index 00000000..360d6c0c
--- /dev/null
+++ b/pypiper/folder_context.py
@@ -0,0 +1,35 @@
+""" Context manager for temporarily changing folder. """
+
+import os
+
+
+__author__ = "Vince Reuter"
+__email__ = "vreuter@virginia.edu"
+
+
+
+class FolderContext(object):
+    """ Context manager for temporarily changing directory. """
+
+    def __init__(self, folder):
+        """
+        Store the previous working path to restore upon exit.
+
+        :param str folder: Path to set as new working directory
+        """
+        if not os.path.isdir(folder):
+            raise ValueError(
+                "Requested temp entry to non-folder: {}".format(folder))
+        self._prevdir = os.getcwd()
+        self._currdir = folder
+
+    def __enter__(self):
+        """ Make the working directory switch. """
+        os.chdir(self._currdir)
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """ Switch back to the previous working directory. """
+        if not os.path.isdir(self._prevdir):
+            raise RuntimeError("Return path is no longer a directory: {}".
+                               format(self._prevdir))
+        os.chdir(self._prevdir)

From d24d9d875c511fad8c5244053d9281436db8b552 Mon Sep 17 00:00:00 2001
From: nsheff 
Date: Mon, 19 Nov 2018 10:22:13 -0500
Subject: [PATCH 29/34] update changelog, release prep

---
 doc/source/changelog.rst | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/doc/source/changelog.rst b/doc/source/changelog.rst
index 625221c0..59e0d3f4 100644
--- a/doc/source/changelog.rst
+++ b/doc/source/changelog.rst
@@ -1,6 +1,21 @@
 Changelog
 ******************************
 
+- **v0.9.** (*2018-11-19*):
+
+    - Use ``psutil`` to track aggregate memory usage for processes that spawn children
+
+    - Implement pipes in python so that individual commands in a series of shell 
+      pipes that fail will now also halt the pipeline, instead of proceeding
+      with potentially bad files.
+
+    - Various other small improvements (like waiting checking for dynamic recover
+      flags)
+    
+    - Improved the ``run`` waiting method to immediately stop upon job
+      completion, rather than minute-increment polling. This should improve
+      performance particularly in pipelines with many, medium-runtime steps, and
+      improve accuracy of timing profiles.
 
 - **v0.8.1** (*2018-09-20*):
 

From 3f16d0d065c1423ed1f3fbea0deccb9a5f08b8c1 Mon Sep 17 00:00:00 2001
From: nsheff 
Date: Mon, 19 Nov 2018 10:22:32 -0500
Subject: [PATCH 30/34] typo

---
 doc/source/changelog.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/doc/source/changelog.rst b/doc/source/changelog.rst
index 59e0d3f4..a3900508 100644
--- a/doc/source/changelog.rst
+++ b/doc/source/changelog.rst
@@ -1,7 +1,7 @@
 Changelog
 ******************************
 
-- **v0.9.** (*2018-11-19*):
+- **v0.9.0** (*2018-11-19*):
 
     - Use ``psutil`` to track aggregate memory usage for processes that spawn children
 

From cbd25c2b150d447f70ef05d1c95611f128a1f5c7 Mon Sep 17 00:00:00 2001
From: nsheff 
Date: Mon, 19 Nov 2018 10:28:18 -0500
Subject: [PATCH 31/34] add travis badge. See #107

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index b8de7407..c2859436 100644
--- a/README.md
+++ b/README.md
@@ -3,5 +3,6 @@
 # Pypiper
 
 [![Documentation Status](https://readthedocs.org/projects/pypiper/badge/?version=latest)](http://pypiper.readthedocs.org/en/latest/?badge=latest)
+[![Build Status](https://travis-ci.org/databio/pypiper.svg?branch=master)](https://travis-ci.org/databio/pypiper)
 
 A lightweight python toolkit for gluing together restartable, robust command line pipelines. The best place to learn more is at the [documentation hosted at Read the Docs](http://pypiper.readthedocs.org/).

From ba58a6395beceeb59c0d592c9b836bfa0a671dc9 Mon Sep 17 00:00:00 2001
From: nsheff 
Date: Mon, 19 Nov 2018 10:51:03 -0500
Subject: [PATCH 32/34] Change test error type; Fix #107

---
 tests/pipeline_manager/test_pipeline_manager.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/pipeline_manager/test_pipeline_manager.py b/tests/pipeline_manager/test_pipeline_manager.py
index 61985c3e..205c70aa 100755
--- a/tests/pipeline_manager/test_pipeline_manager.py
+++ b/tests/pipeline_manager/test_pipeline_manager.py
@@ -10,6 +10,7 @@
 
 import pypiper
 from pypiper.utils import pipeline_filepath
+from pypiper.exceptions import SubprocessError
 
 
 __author__ = "Nathan Sheffield"
@@ -246,7 +247,7 @@ def test_me(self):
         self.pp.callprint(cmd, nofail=True)
 
         # Should raise an error
-        with self.assertRaises(OSError):
+        with self.assertRaises(SubprocessError):
             self.pp.run(cmd, target=None, lock_name="badcommand")
 
         print("Test dynamic recovery...")

From dfb8c36de83ed4e3007c5a407df2777ec059c322 Mon Sep 17 00:00:00 2001
From: nsheff 
Date: Mon, 19 Nov 2018 10:54:18 -0500
Subject: [PATCH 33/34] version bump for release 0.9

---
 pypiper/_version.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pypiper/_version.py b/pypiper/_version.py
index baadd088..3e2f46a3 100644
--- a/pypiper/_version.py
+++ b/pypiper/_version.py
@@ -1 +1 @@
-__version__ = "0.8.1-dev"
+__version__ = "0.9.0"

From 2d5d81b3652d60eeb09e9a6370027b3b9840834f Mon Sep 17 00:00:00 2001
From: nsheff 
Date: Mon, 19 Nov 2018 10:57:38 -0500
Subject: [PATCH 34/34] update changelog

---
 doc/source/changelog.rst | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/doc/source/changelog.rst b/doc/source/changelog.rst
index a3900508..84bdf3b4 100644
--- a/doc/source/changelog.rst
+++ b/doc/source/changelog.rst
@@ -3,19 +3,17 @@ Changelog
 
 - **v0.9.0** (*2018-11-19*):
 
-    - Use ``psutil`` to track aggregate memory usage for processes that spawn children
+    - Use ``psutil`` to track aggregate memory usage for processes that spawn
+      children. This results in accurate memory records for these processes.
 
-    - Implement pipes in python so that individual commands in a series of shell 
-      pipes that fail will now also halt the pipeline, instead of proceeding
-      with potentially bad files.
+    - Individual commands in a string of commands connected by shell pipes are
+      now treated as individual commands, and are monitored individually for
+      time and memory; if a single component fails, the entire string will
+      fail. Previously, only the final command's return code was recorded, as in ``bash``.
 
     - Various other small improvements (like waiting checking for dynamic recover
-      flags)
-    
-    - Improved the ``run`` waiting method to immediately stop upon job
-      completion, rather than minute-increment polling. This should improve
-      performance particularly in pipelines with many, medium-runtime steps, and
-      improve accuracy of timing profiles.
+      flags)
+
 
 - **v0.8.1** (*2018-09-20*):