Skip to content


Merge pull request #396 from dsavransky/JPL_dev
Browse files Browse the repository at this point in the history
JPL updates to scheduler prototype
  • Loading branch information
dsavransky authored Nov 18, 2024
2 parents 50d460c + e73fdbd commit 86cc76c
Show file tree
Hide file tree
Showing 2 changed files with 254 additions and 9 deletions.
242 changes: 233 additions & 9 deletions EXOSIMS/Prototypes/
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class SurveySimulation(object):
record_counts_path (str, optional):
If set, write out photon count info to file specified by this keyword.
Defaults to None.
revisit_wait (float, optional):
If set, it is the minimum time (in days) to wait before revisiting current target. Defaults to None.
nokoMap (bool):
Skip generating keepout map. Only useful if you're not planning on
actually running a mission simulation. Defaults to False.
Expand Down Expand Up @@ -100,6 +102,8 @@ class SurveySimulation(object):
The mission simulation. List of observation dictionaries.
dt_max (astropy.units.quantity.Quantity):
Maximum time for revisit window.
revisit_wait (astropy.units.quantity.Quantity):
Minimum time (in days) to wait before revisiting.
find_known_RV (bool):
Identify stars with known planets.
fullSpectra (numpy.ndarray(bool)):
Expand Down Expand Up @@ -194,6 +198,7 @@ def __init__(
Expand Down Expand Up @@ -339,6 +344,12 @@ def __init__(
self.dt_max = float(dt_max) * u.week
self._outspec["dt_max"] = self.dt_max.value

# minimum time for revisit window
if revisit_wait is not None:
self.revisit_wait = revisit_wait * u.d
self.revisit_wait = revisit_wait

# list of detected earth-like planets aroung promoted stars
self.known_earths = np.array([])

Expand Down Expand Up @@ -1968,6 +1979,8 @@ def scheduleRevisit(self, sInd, smin, det, pInds):
T = 2.0 * np.pi * np.sqrt(sp**3.0 / mu)
t_rev = TK.currentTimeNorm.copy() + 0.75 * T

if self.revisit_wait is not None:
t_rev = TK.currentTimeNorm.copy() + self.revisit_wait
# finally, populate the revisit list (NOTE: sInd becomes a float)
revisit = np.array([sInd,"day").value])
if self.starRevisit.size == 0: # If starRevisit has nothing in it
Expand Down Expand Up @@ -2647,7 +2660,7 @@ def revisitFilter(self, sInds, tmpCurrentTimeNorm):
sInds (~numpy.ndarray(int)):
Indices of stars still in observation list
tmpCurrentTimeNorm (astropy.units.Quantity):
The simulation time after overhead was added in MJD form
Normalized simulation time
Expand All @@ -2664,19 +2677,26 @@ def revisitFilter(self, sInds, tmpCurrentTimeNorm):
self.starVisits[sInds] < self.nVisitsMax
if self.starRevisit.size != 0:
dt_rev = np.abs(self.starRevisit[:, 1] * - tmpCurrentTimeNorm)
ind_rev = [
for x in self.starRevisit[dt_rev < self.dt_max, 0]
if x in sInds
ind_rev = self.revisit_inds(sInds, tmpCurrentTimeNorm)
tovisit[ind_rev] = self.starVisits[ind_rev] < self.nVisitsMax
sInds = np.where(tovisit)[0]
return sInds

def is_earthlike(self, plan_inds, sInd):
Is the planet earthlike?
"""Is the planet earthlike? Returns boolean array that's True for Earth-like
Planet indices
sInd (int):
Star index
Array of same dimension as plan_inds input that's True for Earthlike
planets and false otherwise.
TL = self.TargetList
SU = self.SimulatedUniverse
Expand Down Expand Up @@ -2762,6 +2782,210 @@ def find_known_plans(self):
known_rocky = np.intersect1d(HIP_sInds, known_rocky)
return known_stars.astype(int), known_rocky.astype(int)

def find_char_SNR(self, sInd, pIndsChar, startTime, intTime, mode):
"""Finds the SNR achieved by an observing mode after intTime days
The observation window (which includes settling and overhead times)
is a superset of the integration window (in which photons are collected).
The observation window begins at startTime. The integration window
begins at startTime + Obs.settlingTime + mode["ohTime"],
and the integration itself has a duration of intTime.
sInd (int):
Integer index of the star of interest
pIndsChar (int numpy.ndarray):
Observable planets to characterize. Place (-1) at end to put
False Alarm parameters at end of returned tuples.
startTime (astropy.units.Quantity):
Beginning of observation window in units of day.
intTime (astropy.units.Quantity):
Selected star characterization integration time in units of day.
mode (dict):
Observing mode for the characterization
SNR (float numpy.ndarray):
Characterization signal-to-noise ratio of the observable planets.
Defaults to None. [TBD]
fZ (astropy.units.Quantity):
Surface brightness of local zodiacal light in units of 1/arcsec2
systemParams (dict):
Dictionary of time-dependent planet properties averaged over the
duration of the integration.

OS = self.OpticalSystem
ZL = self.ZodiacalLight
TL = self.TargetList
SU = self.SimulatedUniverse
Obs = self.Observatory
TK = self.TimeKeeping

# time at start of integration window
currentTimeNorm = startTime
currentTimeAbs = TK.missionStart + startTime

# first, calculate SNR for observable planets (without false alarm)
planinds = pIndsChar[:-1] if pIndsChar[-1] == -1 else pIndsChar
SNRplans = np.zeros(len(planinds))
if len(planinds) > 0:
# initialize arrays for SNR integration
fZs = np.zeros(self.ntFlux) / u.arcsec**2.0
systemParamss = np.empty(self.ntFlux, dtype="object")
Ss = np.zeros((self.ntFlux, len(planinds)))
Ns = np.zeros((self.ntFlux, len(planinds)))
# integrate the signal (planet flux) and noise
dt = intTime / float(self.ntFlux)
timePlus = (
Obs.settlingTime.copy() + mode["syst"]["ohTime"].copy()
) # accounts for the time since the current time
for i in range(self.ntFlux):
# calculate signal and noise (electron count rates)
if SU.lucky_planets:
fZs[i] = ZL.fZ(Obs, TL, sInd, currentTimeAbs, mode)[0]
Ss[i, :], Ns[i, :] = self.calc_signal_noise(
sInd, planinds, dt, mode, fZ=fZs[i]
# allocate first half of dt
timePlus += dt / 2.0
# calculate current zodiacal light brightness
fZs[i] = ZL.fZ(Obs, TL, sInd, currentTimeAbs + timePlus, mode)[0]
# propagate the system to match up with current time
sInd, currentTimeNorm + timePlus - self.propagTimes[sInd]
self.propagTimes[sInd] = currentTimeNorm + timePlus
# time-varying planet params (WA, dMag, phi, fEZ, d)
systemParamss[i] = SU.dump_system_params(sInd)
# calculate signal and noise (electron count rates)
if not SU.lucky_planets:
Ss[i, :], Ns[i, :] = self.calc_signal_noise(
sInd, planinds, dt, mode, fZ=fZs[i]
# allocate second half of dt
timePlus += dt / 2.0

# average output parameters
fZ = np.mean(fZs)
systemParams = {
key: sum([systemParamss[x][key] for x in range(self.ntFlux)])
/ float(self.ntFlux)
for key in sorted(systemParamss[0])
# calculate planets SNR
S = Ss.sum(0)
N = Ns.sum(0)
SNRplans[N > 0] = S[N > 0] / N[N > 0]
# allocate extra time for timeMultiplier

# if only a FA, just save zodiacal brightness
# in the middle of the integration
totTime = intTime * (mode["timeMultiplier"])
fZ = ZL.fZ(Obs, TL, sInd, currentTimeAbs.copy() + totTime / 2.0, mode)[0]

# calculate the false alarm SNR (if any)
SNRfa = []
if pIndsChar[-1] == -1:
# Note: these attributes may not exist for all schedulers
fEZ = self.lastDetected[sInd, 1][-1] / u.arcsec**2.0
dMag = self.lastDetected[sInd, 2][-1]
WA = self.lastDetected[sInd, 3][-1] * u.arcsec
C_p, C_b, C_sp = OS.Cp_Cb_Csp(TL, sInd, fZ, fEZ, dMag, WA, mode)
S = (C_p * intTime).decompose().value
N = np.sqrt((C_b * intTime + (C_sp * intTime) ** 2.0).decompose().value)
SNRfa = S / N if N > 0.0 else 0.0

# SNR vector is of length (#planets), plus 1 if FA
# This routine leaves SNR = 0 if unknown or not found
pInds = np.where(SU.plan2star == sInd)[0]
FA_present = pIndsChar[-1] == -1
# there will be one SNR for each entry of pInds_FA
pInds_FA = np.append(pInds, [-1] if FA_present else np.zeros(0, dtype=int))

# boolean index vector into SNR
# True iff we have computed a SNR for that planet
# False iff we didn't find an SNR (and will leave 0 there)
# if FA_present, SNR_plug_in[-1] = True
SNR_plug_in = np.isin(pInds_FA, pIndsChar)

# save all SNRs (planets and FA) to one array
SNR = np.zeros(len(pInds_FA))
# plug in the SNR's we computed (pIndsChar and optional FA)
SNR[SNR_plug_in] = np.append(SNRplans, SNRfa)

return SNR, fZ, systemParams

def revisit_inds(self, sInds, tmpCurrentTimeNorm):
"""Return subset of star indices that are scheduled for revisit.
sInds (~numpy.ndarray(int)):
Indices of stars to consider
tmpCurrentTimeNorm (astropy.units.Quantity):
Normalized simulation time
Indices of stars whose revisit is scheduled for within `self.dt_max` of
the current time
dt_rev = np.abs(self.starRevisit[:, 1] * - tmpCurrentTimeNorm)
ind_rev = [
int(x) for x in self.starRevisit[dt_rev < self.dt_max, 0] if x in sInds

return ind_rev

def keepout_filter(self, sInds, startTimes, endTimes, koMap):
"""Filters stars not observable due to keepout
sInds (~numpy.ndarray(int)):
indices of stars still in observation list
startTimes (~numpy.ndarray(float)):
start times of observations
endTimes (~numpy.ndarray(float)):
end times of observations
koMap (~numpy.ndarray(bool)):
map where True is for a target unobstructed and observable,
False is for a target obstructed and unobservable due to keepout zone
sInds - filtered indices of stars still in observation list
# find the indices of keepout times that pertain to the start and end times of
# observation
startInds = np.searchsorted(self.koTimes.value, startTimes)
endInds = np.searchsorted(self.koTimes.value, endTimes)

# boolean array of available targets (unobstructed between observation start and
# end time) we include a -1 in the start and +1 in the end to include keepout
# days enclosing start and end times
availableTargets = np.array(
max(startInds[sInd] - 1, 0) : min(
endInds[sInd] + 1, len(self.koTimes.value) + 1
for sInd in sInds

return sInds[availableTargets]

def array_encoder(obj):
r"""Encodes numpy arrays, astropy Times, and astropy Quantities, into JSON.
Expand Down
21 changes: 21 additions & 0 deletions tests/Prototypes/
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,27 @@ def test_reset_sim(self):
self.assertEqual(sim.TimeKeeping.currentTimeNorm, 0.0 * u.d)
self.assertEqual(len(sim.DRM), 0)

def revisit_wait_check(self):
r"""Test the revisit_wait definition.
Approach: Initialize the SurveySim object with different definitions of
evisit_wait and assess different cases.

self.dev_null = open(os.devnull, "w")
sim = self.fixture(SimpleScript, revisit_wait=0.5)
for sInd in range(100):
pInds = np.where(sim.SimulatedUniverse.plan2star == sInd)[0]
if len(pInds) > 0:
smin = np.min(sim.SimulatedUniverse.s[pInds[0]])
self.assertEqual(sim.revisit_wait, 0.5 * u.d)
sim.scheduleRevisit(sInd, smin, 0, pInds)

sim = self.fixture(SimpleScript)

def validate_outspec(self, outspec, sim):
r"""Validation of an output spec dictionary.
Expand Down

0 comments on commit 86cc76c

Please sign in to comment.