Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better sync of pixel tasks #590

Merged
merged 3 commits into from
Apr 19, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 143 additions & 44 deletions py/desispec/pipeline/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ def task_types():
list: The list of supported task types.

"""
from .tasks.base import task_classes, task_type
return list(sorted(task_classes.keys()))
from . import tasks
from .tasks.base import default_task_chain
ttypes = ["fibermap", "rawdata"]
ttypes.extend(tasks.base.default_task_chain)
return ttypes


def all_tasks(night, nside):
Expand Down Expand Up @@ -74,7 +77,6 @@ def all_tasks(night, nside):

for ex in sorted(expid):


# get the fibermap for this exposure
fibermap = io.get_raw_files("fibermap", night, ex)

Expand All @@ -89,12 +91,8 @@ def all_tasks(night, nside):
log.error("Do not know what do to with fibermap flavor '{}' for file '{}".format(flavor,fibermap))
raise ValueError("Do not know what do to with fibermap flavor '{}' for file '{}".format(flavor,fibermap))


fmpix = dict()
if (flavor != "arc") and (flavor != "flat"):



# This will be used to track which healpix pixels are
# touched by fibers from each spectrograph.
ra = np.array(fmdata["RA_TARGET"], dtype=np.float64)
Expand Down Expand Up @@ -136,8 +134,6 @@ def all_tasks(night, nside):
break
if not exists : full["redshift"].append(props)



fmprops = dict()
fmprops["night"] = int(night)
fmprops["expid"] = int(ex)
Expand All @@ -146,7 +142,6 @@ def all_tasks(night, nside):

full["fibermap"].append(fmprops)


rdprops = dict()
rdprops["night"] = int(night)
rdprops["expid"] = int(ex)
Expand Down Expand Up @@ -263,8 +258,6 @@ def all_tasks(night, nside):
props["state"] = "waiting" # see defs.task_states
full["starfit"].append(props)



log.debug("done")
return full , healpix_frames

Expand Down Expand Up @@ -435,7 +428,6 @@ def set_states_type(self, tasktype, tasks, postprocessing=True):
Nothing.

"""

from .tasks.base import task_classes

log = get_logger()
Expand Down Expand Up @@ -637,26 +629,133 @@ def sync(self, night):
night (str): The night to scan for updates.

"""
from .tasks.base import task_classes
log = get_logger()

# Get the list of task types excluding spectra and redshifts,
# which will be handled separately.
ttypes = [ t for t in task_types() if (t != "spectra") \
and (t != "redshift") ]

tasks_in_db = None
# Grab existing nightly tasks
with self.cursor() as cur:
tasks_in_db = {}
for tt in task_types():
if (tt == "spectra") or (tt == "redshift"):
continue
cur.execute(\
"select name from {} where night = {}"\
.format(tt, night))
for tt in ttypes:
cur.execute("select name from {} where night = {}"\
.format(tt, night))
tasks_in_db[tt] = [ x for (x, ) in cur.fetchall() ]

# For each task type, check status WITHOUT the DB, then set state.
for tt in tasks_in_db.keys() :
if (tt == "spectra") or (tt == "redshift"):
continue

# Save out the cframe states for later use with the healpix_frame table
cfstates = None
for tt in ttypes:
tstates = check_tasks(tasks_in_db[tt], db=None)
st = [ (x, tstates[x]) for x in tasks_in_db[tt] ]
self.set_states_type(tt, st)
if tt == "cframe":
cfstates = tstates.copy()

# Now examine the spectra and redshift files. If the files exist,
# we assume they are done and completely up to date. If the files
# are not up to date, they must be manually deleted in order for the
# sync to correctly reconstruct the database state.

pixrows = self.select_healpix_frame({"night" : night})

# First check the existence of the files touched by this night
spec_exists = dict()
red_exists = dict()
for row in pixrows:
if row["pixel"] in spec_exists:
continue
spec_name = task_classes["spectra"].name_join(row)
red_name = task_classes["redshift"].name_join(row)

# Check spectra outputs
outfiles = task_classes["spectra"].paths(spec_name)
spec_exists[row["pixel"]] = True
for out in outfiles:
if not os.path.isfile(out):
spec_exists[row["pixel"]] = False
break

# Check redshift outputs
outfiles = task_classes["redshift"].paths(red_name)
red_exists[row["pixel"]] = True
for out in outfiles:
if not os.path.isfile(out):
red_exists[row["pixel"]] = False
break

# Now use all this info. Some internal helpers to avoid code
# duplication
def set_hpx_frame_0(row, spec, red, cur):
self.update_healpix_frame_state(row, 0, cur)
task_classes["spectra"].state_set(
self, spec, "waiting", cur)
task_classes["redshift"].state_set(
self, red, "waiting", cur)
return

def set_hpx_frame_1(row, spec, red, cur):
self.update_healpix_frame_state(row, 1, cur)
# getready() will do this for us:
#task_classes["spectra"].state_set(
# self, spec, "ready", cur)
task_classes["redshift"].state_set(
self, red, "waiting", cur)
return

def set_hpx_frame_2(row, spec, red, cur):
self.update_healpix_frame_state(row, 2, cur)
task_classes["spectra"].state_set(
self, spec, "done", cur)
# getready() will do this:
#task_classes["redshift"].state_set(
# self, red, "ready", cur)
return

def set_hpx_frame_3(row, spec, red, cur):
self.update_healpix_frame_state(row, 3, cur)
task_classes["spectra"].state_set(
self, spec, "done", cur)
task_classes["redshift"].state_set(
self, red, "done", cur)
return

with self.cursor() as cur:
for row in pixrows:
cfdone = True
cfprops = row.copy()
for band in ["b", "r", "z"]:
cfprops["band"] = band
cf_name = task_classes["cframe"].name_join(cfprops)
if cfstates[cf_name] != "done":
cfdone = False

spec_name = task_classes["spectra"].name_join(row)
red_name = task_classes["redshift"].name_join(row)

if not cfdone:
# The cframes do not exist, so reset the state of the
# spectra and redshift tasks.
set_hpx_frame_0(row, spec_name, red_name, cur)
else:
# The cframe exists...
if spec_exists[row["pixel"]]:
if red_exists[row["pixel"]]:
# We are all done (state 3)
set_hpx_frame_3(row, spec_name, red_name, cur)
else:
# We are only at state 2
set_hpx_frame_2(row, spec_name, red_name, cur)
else:
# We are at just at state 1
set_hpx_frame_1(row, spec_name, red_name, cur)

# Update ready state of tasks
self.getready()

return

Expand Down Expand Up @@ -706,31 +805,31 @@ def getready(self):

"""
from .tasks.base import task_classes, task_type

log = get_logger()

with self.cursor() as cur:

for tt in task_types():

if tt == "spectra" or tt == "redshift" : continue # ignore those, they are treated separatly
# Get the list of task types excluding spectra and redshifts,
# which will be handled separately.
ttypes = [ t for t in task_types() if (t != "spectra") \
and (t != "redshift") ]

with self.cursor() as cur:
for tt in ttypes:
# for each type of task, get the list of tasks in waiting mode
cur.execute('select name from {} where state = {}'.format(tt,task_state_to_int["waiting"]))
tasks = [ x for (x, ) in cur.fetchall()]

if len(tasks)>0 :
if len(tasks) > 0:
log.debug("checking {} {} tasks ...".format(len(tasks),tt))
for tsk in tasks :
for tsk in tasks:
task_classes[tt].getready( db=self,name=tsk,cur=cur)


for tt in [ "spectra" , "redshift" ] :

if tt == "spectra" :
required_healpix_frame_state = 1 # means we have a cframe
elif tt == "redshift" :
required_healpix_frame_state = 2 # means we have an updated spectra file
for tt in [ "spectra" , "redshift" ]:
if tt == "spectra":
required_healpix_frame_state = 1
# means we have a cframe
elif tt == "redshift":
required_healpix_frame_state = 2
# means we have an updated spectra file

cur.execute('select nside,pixel from healpix_frame where state = {}'.format(required_healpix_frame_state))
entries = cur.fetchall()
Expand All @@ -740,7 +839,7 @@ def getready(self):

return

def update_healpix_frame_state(self,props,state,cur) :
def update_healpix_frame_state(self, props, state, cur):
if "expid" in props :
# update from a cframe
cmd = "update healpix_frame set state = {} where expid = {} and spec = {} and state = {}".format(state,props["expid"],props["spec"],props["state"])
Expand All @@ -755,7 +854,7 @@ def update_healpix_frame_state(self,props,state,cur) :
cur.execute(cmd)
return

def select_healpix_frame(self,props) :
def select_healpix_frame(self, props):
res = []
with self.cursor() as cur:
cmd = "select * from healpix_frame where "
Expand All @@ -764,13 +863,13 @@ def select_healpix_frame(self,props) :
if not first : cmd += " and "
first=False
cmd += "{}={}".format(k,props[k])
### I AM HERE
cur.execute(cmd)
entries = cur.fetchall()
# convert that to list of dictionnaries
# convert that to list of dictionaries
for entry in entries :
tmp=dict()
for i,k in enumerate(["night","expid","spec","nside","pixel"]) :
tmp = dict()
for i, k in enumerate(["night", "expid", "spec", "nside",
"pixel", "ntargets", "state"]):
tmp[k] = entry[i]
res.append(tmp)
return res
Expand Down