diff --git a/minsar/defaults/job_defaults.cfg b/minsar/defaults/job_defaults.cfg index 4aa6031f..3fe5f2a6 100644 --- a/minsar/defaults/job_defaults.cfg +++ b/minsar/defaults/job_defaults.cfg @@ -10,8 +10,8 @@ create_runfiles 00:10:00 0 1000 0 execute_runfiles 04:00:00 0 1000 0 2 # topsStack -unpack_topo_master 0 00:01:00 4000 0 8 -unpack_slave_slc 0 00:00:30 4000 0 2 +unpack_topo_reference 0 00:01:00 4000 0 8 +unpack_secondary_slc 0 00:00:30 4000 0 2 average_baseline 0 00:00:10 1000 0 2 extract_burst_overlaps 0 00:00:10 4000 0 2 overlap_geo2rdr 0 00:00:50 4000 0 4 @@ -21,7 +21,7 @@ timeseries_misreg 00:10:00 0 4000 0 fullBurst_geo2rdr 0 00:03:00 5000 0 4 fullBurst_resample 0 00:01:00 5000 0 4 extract_stack_valid_region 0 00:01:00 4000 0 4 -merge_master_slave_slc 00:02:45 0 4000 0 2 +merge_reference_secondary_slc 00:02:45 0 4000 0 2 generate_burst_igram 0 00:00:30 4000 0 2 merge_burst_igram 0 00:00:10 4000 0 8 # for num_threads=4 got error with memory filter_coherence 0 00:00:40 6000 0 8 # for num_threads=4 got error with memory diff --git a/minsar/defaults/queues.cfg b/minsar/defaults/queues.cfg index 4827689b..b3e3e30f 100644 --- a/minsar/defaults/queues.cfg +++ b/minsar/defaults/queues.cfg @@ -12,12 +12,12 @@ pegasus bigmem 64 2 25000 eos_sanghoon batch 32 2 192000 999 1 beijing_server batch 16 2 192000 999 1 deqing_server batch 32 2 192000 999 1 -frontera normal 48 1 192000 50 1 +frontera normal 100 1 192000 50 1 frontera nvdimm 48 1 2100000 2 1 -frontera development 48 1 192000 1 1 -frontera rtx 48 1 128000 5 1 -frontera rtx-dev 48 1 128000 2 1 -frontera flex 48 1 192000 50 1 +frontera development 1 1 192000 1 1 +frontera rtx 15 1 128000 5 1 +frontera rtx-dev 1 1 128000 2 1 +frontera flex 100 1 192000 50 1 #nvidimm # features 16 large-memory (2.1TB) nodes and jobs in this queue are charged at twice the normal diff --git a/minsar/export_ortho_geo.py b/minsar/export_ortho_geo.py index 4327b8e8..335bcf99 100755 --- a/minsar/export_ortho_geo.py +++ b/minsar/export_ortho_geo.py @@ -102,9 +102,6 @@ def main(iargs=None): putils.concatenate_error_files(run_file=item, work_dir=inps.work_dir) putils.move_out_job_files_to_stdout(run_file=item) - # upload_to_s3(pic_dir) - minsar.upload_data_products.main([inps.custom_template_file, '--imageProducts']) - return diff --git a/minsar/job_submission.py b/minsar/job_submission.py index f99161f8..77a25a57 100755 --- a/minsar/job_submission.py +++ b/minsar/job_submission.py @@ -111,13 +111,14 @@ def __init__(self, inps): self.number_of_cores_per_node, self.number_of_threads_per_core, self.max_jobs_per_queue, \ self.max_memory_per_node, self.wall_time_factor = set_job_queue_values(inps) - if not 'num_bursts' in inps: + if not 'num_bursts' in inps or not inps.num_bursts: self.num_bursts = None - if not 'wall_time' in inps: + if not 'wall_time' in inps or not inps.wall_time: self.wall_time = None - if not 'memory' in inps: + if not 'memory' in inps or not inps.memory: self.memory = None - if not 'queue' in inps: + if not 'queue' in inps or not inps.queue: + self.queue = self.queue_name if not 'out_dir' in inps: self.out_dir = '.' @@ -186,13 +187,13 @@ def submit_batch_jobs(self, batch_file=None, email_notif=None): number_of_tasks = len(tasks) number_of_nodes = np.int(np.ceil(number_of_tasks * float(self.default_num_threads) / ( - (self.number_of_cores_per_node - 1) * self.number_of_threads_per_core))) + self.number_of_cores_per_node * self.number_of_threads_per_core))) if 'singleTask' in self.submission_scheme: self.write_batch_singletask_jobs(batch_file) - elif 'multiTask_multiNode' in self.submission_scheme or number_of_nodes == 1: + elif 'multiTask_multiNode' in self.submission_scheme: # or number_of_nodes == 1: batch_file_name = batch_file + '_0' job_name = os.path.basename(batch_file_name) @@ -601,13 +602,20 @@ def add_tasks_to_job_file_lines(self, job_file_lines, tasks, batch_file=None): if self.remora: job_file_lines.append("\n\nmodule load remora") + job_file_lines.append("\n\nmodule load launcher") + #if self.queue in ['gpu', 'rtx', 'rtx-dev']: + # job_file_lines.append("\n\nmodule load launcher_gpu") + #else: + # job_file_lines.append("\n\nmodule load launcher") job_file_lines.append("\nexport OMP_NUM_THREADS={0}".format(self.default_num_threads)) job_file_lines.append("\nexport PATH={0}:$PATH".format(self.stack_path)) job_file_lines.append("\nexport LAUNCHER_WORKDIR={0}".format(self.out_dir)) job_file_lines.append("\nexport LAUNCHER_JOB_FILE={0}\n".format(batch_file)) - if self.platform_name == 'stampede2': + + if self.scheduler == 'SLURM': + job_file_lines.append("export LD_PRELOAD=/home1/apps/tacc-patches/python_cacher/myopen.so\n") if self.remora: @@ -626,7 +634,8 @@ def add_tasks_to_job_file_lines(self, job_file_lines, tasks, batch_file=None): os.path.abspath(batch_file) + '_{}.o'.format(count), os.path.abspath(batch_file) + '_{}.e'.format(count))) - if self.platform_name == 'stampede2': + if self.scheduler == 'SLURM': + job_file_lines.append("\nexport LD_PRELOAD=/home1/apps/tacc-patches/python_cacher/myopen.so") job_file_lines.append("\n\nexport OMP_NUM_THREADS={0}".format(self.default_num_threads)) @@ -668,18 +677,16 @@ def set_job_queue_values(args): inps = putils.create_or_update_template(args) submission_scheme = inps.template['job_submission_scheme'] + hostname = subprocess.Popen("hostname -f", shell=True, stdout=subprocess.PIPE).stdout.read().decode("utf-8") - hostname = 'local' - host_keys = ['hostname', 'HOSTNAME', 'uname'] - for key in host_keys: - if os.getenv(key): - hostname = os.getenv(key) - - work_system = os.path.basename(os.getenv('WORK')) - platform_name = hostname for platform in supported_platforms: - if work_system and platform in work_system: + if platform in hostname: platform_name = platform + break + + if inps.queue: + inps.template['QUEUENAME'] = inps.queue + check_auto = {'queue_name': inps.template['QUEUENAME'], 'number_of_cores_per_node': inps.template['JOB_CPUS_PER_NODE'], @@ -706,7 +713,8 @@ def set_job_queue_values(args): check_auto['number_of_threads_per_core'] = int(split_values[queue_header.index('THREADS_PER_CORE')]) check_auto['max_jobs_per_queue'] = int(split_values[queue_header.index('MAX_JOBS_PER_QUEUE')]) check_auto['max_memory_per_node'] = int(split_values[queue_header.index('MEM_PER_NODE')]) - check_auto['wall_time_factor'] = int(split_values[queue_header.index('WALLTIME_FACTOR')]) + check_auto['wall_time_factor'] = float(split_values[queue_header.index('WALLTIME_FACTOR')]) + break else: continue diff --git a/minsar/process_rsmas.py b/minsar/process_rsmas.py index 2e237f80..3ad94d9a 100755 --- a/minsar/process_rsmas.py +++ b/minsar/process_rsmas.py @@ -228,6 +228,18 @@ def run_timeseries(self): else: import minsar.minopy_wrapper as minopy_wrapper minopy_wrapper.main(scp_args) + + return + + def run_upload_data_products(self): + """ upload data to jetstream server for data download + """ + if self.template['upload_flag'] in ['True', True]: + # upload_data_products.main([self.custom_template_file, '--mintpyProducts']) # this is simpler, but how to put process into background? + command = 'upload_data_products.py --mintpyProducts ' + self.custom_template_file + ' > out_upload_data_products.o 2> out_upload_data_products.e' + message_rsmas.log(os.getcwd(), command) + status = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) + return def run_insarmaps(self): @@ -248,6 +260,10 @@ def run_image_products(self): if self.remora: scp_args += ['--remora'] minsar.export_ortho_geo.main(scp_args) + + # upload_to_s3(pic_dir) + minsar.upload_data_products.main([inps.custom_template_file, '--imageProducts']) + return def run(self, steps=step_list): @@ -268,6 +284,9 @@ def run(self, steps=step_list): elif sname == 'timeseries': self.run_timeseries() + elif sname == 'upload': + self.run_upload_data_products() + elif sname == 'insarmaps': self.run_insarmaps() diff --git a/minsar/utils/summarize_job_run_times.py b/minsar/utils/summarize_job_run_times.py index 17253c04..222b0c7f 100755 --- a/minsar/utils/summarize_job_run_times.py +++ b/minsar/utils/summarize_job_run_times.py @@ -87,8 +87,15 @@ def main(iargs=None): reserved_time_list = [] elapsed_time_list = [] - if not (os.getenv('PLATFORM_NAME') == "STAMPEDE2"): - print('Not on stampede2 - return from summarize_job_run_times.py') + hostname = subprocess.Popen("hostname -f", shell=True, stdout=subprocess.PIPE).stdout.read().decode("utf-8") + + scheduler = None + for platform in ['frontera', 'stampede2', 'comet']: + if platform in hostname: + scheduler = 'SLURM' + break + if not scheduler == 'SLURM': + print('Not on TACC system - return from summarize_job_run_times.py') return None for fname in run_stdout_files: