################################################################################
# This script takes a directory of JWST calints files as input and builds a #
# database of reference info for each file, which can then be read via the #
# pipeline script. #
# #
# Basically it will hold the target-specific info needed by the pipeline, as #
# well as info needed to choose reference observations for a given science #
# target. #
# #
# Written 2024-07-10 by Ellis Bogat #
################################################################################
# TODO:
# - set up pip installability
# - finish commenting/documentation
# - make a readme describing each column & its units
# imports
import os
import glob
import re
import warnings

import numpy as np
import pandas as pd
from astropy.io import fits
from astroquery.simbad import Simbad

import mocapy
sp_class_letters = ['O','B','A','F','G','K','M','T','Y']
def isnone(series):
    """Return a boolean mask that is True where a value is missing (NaN, None, or '')."""
    try:
        return np.isnan(series)
    except TypeError:
        # Object/string dtype: flag empty strings and null values elementwise.
        return (series == '') | pd.isnull(series)
def not_isnone(series):
    return ~isnone(series)
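# Illustrative behavior (a sketch, not executed here; uses the pandas/numpy
# imports above):
#     isnone(pd.Series([1.0, np.nan]))      -> [False, True]
#     isnone(pd.Series(['A3V', '', None]))  -> [False, True, True]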
def load_refdb_csv(fpath):
    """
    Reads the database of target- and observation-specific reference info for
    each observation in the PSF library.

    Args:
        fpath (str or os.path): Path to the .csv file containing the reference
            database.

    Returns:
        pandas.DataFrame: database containing target- and observation-specific
            info for each file.
    """
    # TODO:
    # - Write tests
    refdb = pd.read_csv(fpath)
    refdb.set_index('TARGNAME', inplace=True)
    return refdb
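# Example (assumes a ref_lib.csv previously written by build_refdb below):
#     refdb = load_refdb_csv('ref_lib.csv')
#     print(refdb.loc[:, ['FILENAME', 'FILTER']].head())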
def decode_simbad_sptype(input_sptypes):
    """
    Parse SIMBAD spectral type strings into spectral class, subclass, and
    luminosity class. Strings that don't match the pattern yield empty strings
    in all three output lists.

    Args:
        input_sptypes (str or list): SIMBAD spectral type string(s), e.g. 'M4.5Ve'.

    Returns:
        tuple: lists of spectral classes, subclasses, and luminosity classes.
    """
    if isinstance(input_sptypes, str):
        input_sptypes = [input_sptypes]

    sp_classes = []
    sp_subclasses = []
    sp_lclasses = []

    for simbad_spectype in input_sptypes:
        m = re.search(r'([OBAFGKMTY])(\d\.*\d*)[-+/]*(\d\.*\d*)*[-+/]*(I*V*I*)[-/]*(I*V*I*)',
                      simbad_spectype)
        if m is None:
            sp_classes.append('')
            sp_subclasses.append('')
            sp_lclasses.append('')
        else:
            res = m.group(1, 2, 3, 4, 5)
            sp_classes.append(res[0])
            sp_subclasses.append(res[1])

            # Default to main sequence (V) if no luminosity class is matched.
            if 'V' in res[3:] or res[3:] == ('', ''):
                sp_lclasses.append('V')
            elif res[4] != '':
                sp_lclasses.append(res[4])
            else:
                sp_lclasses.append(res[3])

    return (sp_classes, sp_subclasses, sp_lclasses)
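# Illustrative parses (checked against the regex above):
#     decode_simbad_sptype('M4.5Ve') -> (['M'], ['4.5'], ['V'])
#     decode_simbad_sptype(['K2IV']) -> (['K'], ['2'], ['IV'])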
def update_db_sptypes(refdb):
    """Decode the SPTYPE column into SP_CLASS, SP_SUBCLASS, and SP_LCLASS columns."""
    simbad_sptypes = refdb.SPTYPE.values
    sp_classes, sp_subclasses, sp_lclasses = decode_simbad_sptype(simbad_sptypes)
    refdb_copy = refdb.copy()
    refdb_copy['SP_CLASS'] = sp_classes
    refdb_copy['SP_SUBCLASS'] = sp_subclasses
    refdb_copy['SP_LCLASS'] = sp_lclasses
    return refdb_copy
def build_refdb(idir, odir='.', uncal=False, overwrite=False):
    """
    Constructs a database of target-specific reference info for each
    calints file in the input directory.

    Args:
        idir (path): Path to pre-processed (stage 2) JWST images to be added
            to the database.
        odir (path, optional): Location to save the database. Defaults to '.'.
        uncal (bool, optional): Toggle using uncal files as inputs instead of
            calints. Defaults to False.
        overwrite (bool, optional): If True, overwrite the existing caldb.

    Raises:
        Exception: If the output csv already exists and overwrite is False.
        Exception: If no SIMBAD object is found for a target name.
        Exception: If no 2MASS ID, or multiple 2MASS IDs, are found for a
            target name.

    Returns:
        pandas.DataFrame: database containing target- and observation-specific
            info for each file.
    """
    # TODO:
    # - write tests for build_refdb()
    #     - nonexistent input directory
    #     - empty input directory
    #     - no calints files in input directory
    #     - header kw missing
    #     - duplicate science target with different program names
    #     - calints vs uncal toggle
    #     - (warning if no calints present but uncals are:
    #        'did you mean to set uncal=True?')

    # Check that you won't accidentally overwrite an existing csv.
    outpath = os.path.join(odir, 'ref_lib.csv')
    if os.path.exists(outpath):
        if overwrite:
            msg = (f'\nThis operation will overwrite {outpath}.'
                   '\nIf this is not what you want, abort now!')
            warnings.warn(msg)
        else:
            raise Exception(f'This operation is trying to overwrite {outpath}.'
                            '\nIf this is what you want, set overwrite=True.')
    # Read in the input files (calints by default, uncal if requested)
    print('Reading input files...')
    suffix = 'uncal' if uncal else 'calints'
    fpaths = sorted(glob.glob(os.path.join(idir, f"*_{suffix}.fits")))

    # Start a dataframe with the header info we want from each file
    csv_list = []
    fits_cols = [
        'TARGPROP',
        'TARGNAME',  # Save 2MASS ID also
        'FILENAME',
        'DATE-OBS',
        'TIME-OBS',
        'DURATION',  # Total exposure time
        'TARG_RA',
        'TARG_DEC',
        'TARGURA',   # RA uncertainty
        'TARGUDEC',  # Dec uncertainty
        'MU_RA',     # Proper motion
        'MU_DEC',
        'MU_EPOCH',
        'INSTRUME',
        'DETECTOR',
        'MODULE',
        'CHANNEL',
        'FILTER',
        'PUPIL',
    ]
    for fpath in fpaths:
        row = []
        hdr = fits.getheader(fpath)
        for col in fits_cols:
            row.append(hdr[col])
        csv_list.append(row)
    df = pd.DataFrame(csv_list, columns=fits_cols)
    # Make a df with only one entry for each unique target
    targnames = np.unique(df['TARGNAME'])
    df_unique = pd.DataFrame(np.transpose([targnames]), columns=['TARGNAME'])

    # Get 2MASS IDs
    print('Collecting 2MASS IDs...')
    twomass_ids = []
    for targname in targnames:
        result_table = Simbad.query_objectids(targname)
        if result_table is None:
            raise Exception(f'No SIMBAD object found for targname {targname}.')
        tmids_found = []
        for name in list(result_table['ID']):
            if name[:6] == '2MASS ':
                twomass_ids.append(name)
                tmids_found.append(name)
        if len(tmids_found) < 1:
            raise Exception(f'No 2MASS ID found for targname {targname}.')
        elif len(tmids_found) > 1:
            raise Exception(f'Multiple 2MASS IDs found for targname {targname}: {tmids_found}')
    df_unique['2MASS_ID'] = twomass_ids
    df_unique.set_index('2MASS_ID', inplace=True)
    # Query SIMBAD
    print('Querying SIMBAD...')
    customSimbad = Simbad()
    customSimbad.add_votable_fields('sptype',
                                    'flux(K)', 'flux_error(K)',
                                    'plx', 'plx_error')
    simbad_list = list(df_unique.index)
    scistar_simbad_table = customSimbad.query_objects(simbad_list)

    # Convert to pandas df and make 2MASS IDs the index
    df_simbad = scistar_simbad_table.to_pandas()
    df_simbad['2MASS_ID'] = simbad_list
    df_simbad.set_index('2MASS_ID', inplace=True)

    # Rename some columns
    # Full column list here: http://simbad.u-strasbg.fr/Pages/guide/sim-fscript.htx
    simbad_cols = {
        'SPTYPE': 'SP_TYPE',         # maybe use 'simple_spt' or 'complete_spt'?
        'KMAG': 'FLUX_K',            # 'kmag'
        'KMAG_ERR': 'FLUX_ERROR_K',  # 'ekmag'
        'PLX': 'PLX_VALUE',          # 'plx'
        'PLX_ERR': 'PLX_ERROR',      # 'eplx'
    }
    for col, simbad_col in simbad_cols.items():
        df_simbad[col] = list(df_simbad[simbad_col])

    # Add the values we want to df_unique
    df_unique = pd.concat([df_unique, df_simbad.loc[:, list(simbad_cols.keys())]], axis=1)
    # Query mocadb.ca for extra info
    print('Querying MOCADB (this may take a minute)...')
    names_df = pd.DataFrame(list(df_unique.index), columns=['designation'])
    moca = mocapy.MocaEngine()
    mdf = moca.query("SELECT tt.designation AS input_designation, sam.* "
                     "FROM tmp_table AS tt "
                     "LEFT JOIN mechanics_all_designations AS mad ON(mad.designation LIKE tt.designation) "
                     "LEFT JOIN summary_all_objects AS sam ON(sam.moca_oid=mad.moca_oid)",
                     tmp_table=names_df)
    mdf.set_index('input_designation', inplace=True)

    moca_cols = {
        'SPTYPE': 'spt',  # maybe use 'simple_spt' or 'complete_spt'?
        'PLX': 'plx',
        'PLX_ERR': 'eplx',
        'AGE': 'age',
        'AGE_ERR': 'eage',
    }

    # Update the column names for consistency
    for col, moca_col in moca_cols.items():
        mdf[col] = list(mdf[moca_col])
    # Fill in values missing from SIMBAD with MOCA
    df_unique['COMMENTS'] = ''

    # Sort all the dfs by index so they match up
    df_unique.sort_index(inplace=True)
    df_simbad.sort_index(inplace=True)
    mdf.sort_index(inplace=True)

    # Replace values and update comments
    cols_overlap = list(set(simbad_cols.keys()).intersection(moca_cols.keys()))
    for col in cols_overlap:
        mask = isnone(df_simbad[col]) & not_isnone(mdf[col])
        df_unique.loc[mask, 'COMMENTS'] += f"{col} adopted from MOCA. "
        df_unique.loc[mask, col] = mdf.loc[mask, col]
    for col in ['AGE', 'AGE_ERR']:
        df_unique[col] = mdf[col]
        df_unique.loc[not_isnone(mdf[col]), 'COMMENTS'] += f"{col} adopted from MOCA. "
    # Calculate distances (pc) from parallax (mas)
    df_unique['DIST'] = 1. / (df_unique['PLX'] / 1000)
    df_unique['DIST_ERR'] = df_unique['PLX_ERR'] / 1000 / ((df_unique['PLX'] / 1000)**2)
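    # Worked example: PLX = 100 mas -> DIST = 1 / 0.1 = 10 pc, and with
    # PLX_ERR = 1 mas, DIST_ERR = 0.001 / 0.1**2 = 0.1 pc.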
    # Decode spectral types
    df_unique = update_db_sptypes(df_unique)

    # Add empty columns
    empty_cols = [
        'FLAGS',
        'HAS_DISK',
        'HAS_CANDS']
    for col in empty_cols:
        df_unique[col] = ''

    # Apply dataframe of unique targets to the original file list
    df.set_index('TARGNAME', inplace=True)
    df_unique.reset_index(inplace=True)
    df_unique.set_index('TARGNAME', inplace=True)
    df_unique = df_unique.reindex(df.index)
    df_out = pd.concat([df, df_unique], axis=1)

    # Save dataframe
    df_out.to_csv(outpath)
    print(f'Database saved to {outpath}')
    print('Done.')

    return df_out
def get_sciref_files(sci_target, refdb, idir=None, spt_choice=None, filters=None, exclude_disks=False):
    """Construct a list of science files and reference files to input to a PSF subtraction routine.

    Args:
        sci_target (str):
            Name of the science target to be PSF subtracted. Can be the proposal target name,
            JWST resolved target name, or 2MASS ID.
        refdb (pandas.DataFrame or str):
            Pandas dataframe or filepath to csv containing the reference database generated by
            the build_refdb() function.
        idir (str, optional):
            Path to directory of input data, to be prepended to file names.
        spt_choice (str, optional):
            None (default): use all spectral types.
            'near': use only references with the same spectral class letter.
            'nearer': use only refs within +-1 spectral subtype, i.e. M3-M5 for an M4 science target.
            'nearest': use only refs with the exact same spectral type.
        filters (str or list, optional):
            None (default): include all filters.
            'F444W' or another filter name: include only that filter.
            ['filt1','filt2']: include only filt1 and filt2.
        exclude_disks (bool, optional): Exclude references that are known to have disks.
            Defaults to False.

    Returns:
        list: filepaths of science observations (prefixed with idir, if given).
        list: filepaths of reference observations (prefixed with idir, if given).
    """
    # TODO:
    # - filter out flags?
    # - sptype filters
    # - tests:
    #     - sci_target in index
    #     - sci_target in 2MASS_ID
    #     - sci_target in TARGPROP
    #     - sci_target not in refdb
    #     - exceptions if 0 science fnames are returned
    #     - warnings if 0 reference observations are returned

    # Read the reference database from file if needed
    if isinstance(refdb, str):
        refdb = load_refdb_csv(refdb)
    # Locate input target 2MASS ID
    # (input name could be in index, TARGPROP, or 2MASS_ID column)
    if sci_target in refdb['2MASS_ID'].to_list():
        targ_2mass = sci_target
    elif sci_target in refdb.index.to_list():
        targ_2mass = refdb.loc[[sci_target], '2MASS_ID'].to_list()[0]
    elif sci_target in refdb['TARGPROP'].to_list():
        refdb_temp = refdb.reset_index()
        refdb_temp.set_index('TARGPROP', inplace=True)
        targ_2mass = refdb_temp.loc[[sci_target], '2MASS_ID'].to_list()[0]
    else:
        raise Exception(f'Science target {sci_target} not found in reference database.')

    refdb_temp = refdb.reset_index()
    refdb_temp.set_index('FILENAME', inplace=True)

    # Collect all the science files
    sci_fnames = refdb_temp.index[refdb_temp['2MASS_ID'] == targ_2mass].to_list()

    # Start list of reference files
    ref_fnames = refdb_temp.index[refdb_temp['2MASS_ID'] != targ_2mass].to_list()

    # Collect the reference files
    if spt_choice is not None:
        raise Exception('Only spt_choice=None is currently supported.')
    # Remove observations with disks flagged
    if exclude_disks:
        disk_fnames = refdb_temp.index[refdb_temp['HAS_DISK'] == True].to_list()
        ref_fnames = list(set(ref_fnames) - set(disk_fnames))

    # Keep only the requested filters
    if filters is not None:
        if isinstance(filters, str):
            filters = [filters]
        filter_fnames = []
        for filt in filters:
            filter_fnames.extend(refdb_temp.index[refdb_temp['FILTER'] == filt].to_list())
        if len(filter_fnames) == 0:
            warnings.warn(f'No observations found for filters {filters}.')
        sci_fnames = list(set(sci_fnames).intersection(filter_fnames))
        ref_fnames = list(set(ref_fnames).intersection(filter_fnames))

    # Make sure no observations are in both sci_fnames and ref_fnames
    if len(set(sci_fnames).intersection(ref_fnames)) > 0:
        raise Exception('One or more filenames exists in both the science and '
                        'reference file list. Something is wrong.')

    if idir is not None:
        sci_fpaths = [os.path.join(idir, sci_fname) for sci_fname in sci_fnames]
        ref_fpaths = [os.path.join(idir, ref_fname) for ref_fname in ref_fnames]
    else:
        sci_fpaths = sci_fnames
        ref_fpaths = ref_fnames

    return [sci_fpaths, ref_fpaths]
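# Example call (hypothetical 2MASS ID; refdb as returned by build_refdb or a
# csv path):
#     sci, refs = get_sciref_files('2MASS J00000000+0000000', refdb,
#                                  idir='DATA', filters='F444W',
#                                  exclude_disks=True)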
if __name__ == '__main__':
    # Basic end-to-end test: build the reference database from a local
    # directory of pre-processed calints files.
    idir = 'DATA/NANREPLACED_v0'
    ref_db = build_refdb(idir, overwrite=True)