Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Frontera jobs #388

Merged
merged 17 commits into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions minsar/defaults/job_defaults.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ create_runfiles 00:10:00 0 1000 0
execute_runfiles 04:00:00 0 1000 0 2

# topsStack
unpack_topo_master 0 00:01:00 4000 0 8
unpack_slave_slc 0 00:00:30 4000 0 2
unpack_topo_reference 0 00:01:00 4000 0 8
unpack_secondary_slc 0 00:00:30 4000 0 2
average_baseline 0 00:00:10 1000 0 2
extract_burst_overlaps 0 00:00:10 4000 0 2
overlap_geo2rdr 0 00:00:50 4000 0 4
Expand All @@ -21,7 +21,7 @@ timeseries_misreg 00:10:00 0 4000 0
fullBurst_geo2rdr 0 00:03:00 5000 0 4
fullBurst_resample 0 00:01:00 5000 0 4
extract_stack_valid_region 0 00:01:00 4000 0 4
merge_master_slave_slc 00:02:45 0 4000 0 2
merge_reference_secondary_slc 00:02:45 0 4000 0 2
generate_burst_igram 0 00:00:30 4000 0 2
merge_burst_igram 0 00:00:10 4000 0 8 # for num_threads=4 got error with memory
filter_coherence 0 00:00:40 6000 0 8 # for num_threads=4 got error with memory
Expand Down
10 changes: 5 additions & 5 deletions minsar/defaults/queues.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ pegasus bigmem 64 2 25000
eos_sanghoon batch 32 2 192000 999 1
beijing_server batch 16 2 192000 999 1
deqing_server batch 32 2 192000 999 1
frontera normal 48 1 192000 50 1
frontera normal 100 1 192000 50 1
frontera nvdimm 48 1 2100000 2 1
frontera development 48 1 192000 1 1
frontera rtx 48 1 128000 5 1
frontera rtx-dev 48 1 128000 2 1
frontera flex 48 1 192000 50 1
frontera development 1 1 192000 1 1
frontera rtx 15 1 128000 5 1
frontera rtx-dev 1 1 128000 2 1
frontera flex 100 1 192000 50 1


#nvdimm # features 16 large-memory (2.1TB) nodes and jobs in this queue are charged at twice the normal
Expand Down
3 changes: 0 additions & 3 deletions minsar/export_ortho_geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ def main(iargs=None):
putils.concatenate_error_files(run_file=item, work_dir=inps.work_dir)
putils.move_out_job_files_to_stdout(run_file=item)

# upload_to_s3(pic_dir)
minsar.upload_data_products.main([inps.custom_template_file, '--imageProducts'])

return


Expand Down
44 changes: 26 additions & 18 deletions minsar/job_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ def __init__(self, inps):
self.number_of_cores_per_node, self.number_of_threads_per_core, self.max_jobs_per_queue, \
self.max_memory_per_node, self.wall_time_factor = set_job_queue_values(inps)

if not 'num_bursts' in inps:
if not 'num_bursts' in inps or not inps.num_bursts:
self.num_bursts = None
if not 'wall_time' in inps:
if not 'wall_time' in inps or not inps.wall_time:
self.wall_time = None
if not 'memory' in inps:
if not 'memory' in inps or not inps.memory:
self.memory = None
if not 'queue' in inps:
if not 'queue' in inps or not inps.queue:

self.queue = self.queue_name
if not 'out_dir' in inps:
self.out_dir = '.'
Expand Down Expand Up @@ -186,13 +187,13 @@ def submit_batch_jobs(self, batch_file=None, email_notif=None):
number_of_tasks = len(tasks)

number_of_nodes = np.int(np.ceil(number_of_tasks * float(self.default_num_threads) / (
(self.number_of_cores_per_node - 1) * self.number_of_threads_per_core)))
self.number_of_cores_per_node * self.number_of_threads_per_core)))

if 'singleTask' in self.submission_scheme:

self.write_batch_singletask_jobs(batch_file)

elif 'multiTask_multiNode' in self.submission_scheme or number_of_nodes == 1:
elif 'multiTask_multiNode' in self.submission_scheme: # or number_of_nodes == 1:

batch_file_name = batch_file + '_0'
job_name = os.path.basename(batch_file_name)
Expand Down Expand Up @@ -601,13 +602,20 @@ def add_tasks_to_job_file_lines(self, job_file_lines, tasks, batch_file=None):

if self.remora:
job_file_lines.append("\n\nmodule load remora")

job_file_lines.append("\n\nmodule load launcher")
#if self.queue in ['gpu', 'rtx', 'rtx-dev']:
# job_file_lines.append("\n\nmodule load launcher_gpu")
#else:
# job_file_lines.append("\n\nmodule load launcher")

job_file_lines.append("\nexport OMP_NUM_THREADS={0}".format(self.default_num_threads))
job_file_lines.append("\nexport PATH={0}:$PATH".format(self.stack_path))
job_file_lines.append("\nexport LAUNCHER_WORKDIR={0}".format(self.out_dir))
job_file_lines.append("\nexport LAUNCHER_JOB_FILE={0}\n".format(batch_file))
if self.platform_name == 'stampede2':

if self.scheduler == 'SLURM':

job_file_lines.append("export LD_PRELOAD=/home1/apps/tacc-patches/python_cacher/myopen.so\n")

if self.remora:
Expand All @@ -626,7 +634,8 @@ def add_tasks_to_job_file_lines(self, job_file_lines, tasks, batch_file=None):
os.path.abspath(batch_file) + '_{}.o'.format(count),
os.path.abspath(batch_file) + '_{}.e'.format(count)))

if self.platform_name == 'stampede2':
if self.scheduler == 'SLURM':

job_file_lines.append("\nexport LD_PRELOAD=/home1/apps/tacc-patches/python_cacher/myopen.so")

job_file_lines.append("\n\nexport OMP_NUM_THREADS={0}".format(self.default_num_threads))
Expand Down Expand Up @@ -668,18 +677,16 @@ def set_job_queue_values(args):

inps = putils.create_or_update_template(args)
submission_scheme = inps.template['job_submission_scheme']
hostname = subprocess.Popen("hostname -f", shell=True, stdout=subprocess.PIPE).stdout.read().decode("utf-8")

hostname = 'local'
host_keys = ['hostname', 'HOSTNAME', 'uname']
for key in host_keys:
if os.getenv(key):
hostname = os.getenv(key)

work_system = os.path.basename(os.getenv('WORK'))
platform_name = hostname
for platform in supported_platforms:
if work_system and platform in work_system:
if platform in hostname:
platform_name = platform
break

if inps.queue:
inps.template['QUEUENAME'] = inps.queue


check_auto = {'queue_name': inps.template['QUEUENAME'],
'number_of_cores_per_node': inps.template['JOB_CPUS_PER_NODE'],
Expand All @@ -706,7 +713,8 @@ def set_job_queue_values(args):
check_auto['number_of_threads_per_core'] = int(split_values[queue_header.index('THREADS_PER_CORE')])
check_auto['max_jobs_per_queue'] = int(split_values[queue_header.index('MAX_JOBS_PER_QUEUE')])
check_auto['max_memory_per_node'] = int(split_values[queue_header.index('MEM_PER_NODE')])
check_auto['wall_time_factor'] = int(split_values[queue_header.index('WALLTIME_FACTOR')])
check_auto['wall_time_factor'] = float(split_values[queue_header.index('WALLTIME_FACTOR')])

break
else:
continue
Expand Down
19 changes: 19 additions & 0 deletions minsar/process_rsmas.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,18 @@ def run_timeseries(self):
else:
import minsar.minopy_wrapper as minopy_wrapper
minopy_wrapper.main(scp_args)

return

def run_upload_data_products(self):
    """ upload data to jetstream server for data download

    Nothing is done unless ``self.template['upload_flag']`` is ``True`` or the
    string ``'True'``.  Otherwise ``upload_data_products.py --mintpyProducts``
    is started through the shell for ``self.custom_template_file``, with its
    stdout and stderr redirected to ``out_upload_data_products.o`` and
    ``out_upload_data_products.e`` in the current working directory.  The
    command is logged first; this method does not wait for it to finish.
    """
    import shlex  # function-scope import: the file's import block is not visible from here

    if self.template['upload_flag'] not in ['True', True]:
        return

    # A separate process is used instead of calling upload_data_products.main()
    # directly so that the upload can proceed in the background.
    # The template path is quoted so that spaces or shell metacharacters in it
    # cannot split or alter the command line (plain paths are left unchanged).
    command = ('upload_data_products.py --mintpyProducts '
               + shlex.quote(self.custom_template_file)
               + ' > out_upload_data_products.o 2> out_upload_data_products.e')
    message_rsmas.log(os.getcwd(), command)

    # The shell redirection above already captures both output streams, so no
    # pipes are requested here; they would never be read or closed.
    subprocess.Popen(command, shell=True)

    return

def run_insarmaps(self):
Expand All @@ -248,6 +260,10 @@ def run_image_products(self):
if self.remora:
scp_args += ['--remora']
minsar.export_ortho_geo.main(scp_args)

# upload_to_s3(pic_dir)
minsar.upload_data_products.main([inps.custom_template_file, '--imageProducts'])

return

def run(self, steps=step_list):
Expand All @@ -268,6 +284,9 @@ def run(self, steps=step_list):
elif sname == 'timeseries':
self.run_timeseries()

elif sname == 'upload':
self.run_upload_data_products()

elif sname == 'insarmaps':
self.run_insarmaps()

Expand Down
11 changes: 9 additions & 2 deletions minsar/utils/summarize_job_run_times.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,15 @@ def main(iargs=None):
reserved_time_list = []
elapsed_time_list = []

if not (os.getenv('PLATFORM_NAME') == "STAMPEDE2"):
print('Not on stampede2 - return from summarize_job_run_times.py')
hostname = subprocess.Popen("hostname -f", shell=True, stdout=subprocess.PIPE).stdout.read().decode("utf-8")

scheduler = None
for platform in ['frontera', 'stampede2', 'comet']:
if platform in hostname:
scheduler = 'SLURM'
break
if not scheduler == 'SLURM':
print('Not on TACC system - return from summarize_job_run_times.py')
return None

for fname in run_stdout_files:
Expand Down