Skip to content

Commit

Permalink
adds loopy scheduler v2
Browse files Browse the repository at this point in the history
  • Loading branch information
kaushikcfd committed Sep 28, 2021
1 parent dc36a4a commit 3d6839e
Showing 1 changed file with 197 additions and 31 deletions.
228 changes: 197 additions & 31 deletions loopy/schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,161 @@ def is_similar_to_template(insn):

# {{{ scheduling algorithm

def generate_loop_schedules_internal(
sched_state, debug=None):
def _get_outermost_diverging_inames(tree, within1, within2):
"""
For loop nestings *within1* and *within2*, returns the first inames at which
the loops nests diverge in the loop nesting tree *tree*.
:arg tree: A :class:`loopy.tools.Tree` of inames, denoting a loop nesting.
:arg within1: A :class:`frozenset` of inames.
:arg within2: A :class:`frozenset` of inames.
"""
common_ancestors = (within1 & within2) | {""}

innermost_parent = max(common_ancestors,
key=lambda k: tree.depth(k))
iname1, = tree.children(innermost_parent) & within1
iname2, = tree.children(innermost_parent) & within2

return iname1, iname2


class V2SchedulerNotImplementedException(RuntimeError):
pass


def generate_loop_schedules_v2(kernel):
from loopy.schedule.tools import get_loop_nest_tree
from functools import reduce
from pytools.graph import compute_topological_order
from loopy.kernel.data import ConcurrentTag, IlpBaseTag, VectorizeTag

concurrent_inames = {iname for iname in kernel.all_inames()
if kernel.iname_tags_of_type(iname, ConcurrentTag)}
ilp_inames = {iname for iname in kernel.all_inames()
if kernel.iname_tags_of_type(iname, IlpBaseTag)}
vec_inames = {iname for iname in kernel.all_inames()
if kernel.iname_tags_of_type(iname, VectorizeTag)}
parallel_inames = (concurrent_inames - ilp_inames - vec_inames)

# {{{ can v2 scheduler handle??

if any(len(insn.conflicts_with_groups) != 0 for insn in kernel.instructions):
raise V2SchedulerNotImplementedException("v2 scheduler cannot schedule"
" kernels with instruction having conflicts with groups.")

if any(insn.priority != 0 for insn in kernel.instructions):
raise V2SchedulerNotImplementedException("v2 scheduler cannot schedule"
" kernels with instruction priorities set.")

if kernel.schedule is not None:
# cannnot handle preschedule yet
raise V2SchedulerNotImplementedException("v2 scheduler cannot schedule"
" prescheduled kernels.")

if ilp_inames or vec_inames:
raise V2SchedulerNotImplementedException("v2 scheduler cannot schedule"
" loops tagged with 'ilp'/'vec' as they are not guaranteed to"
" be single entry loops.")

# }}}

loop_nest_tree = get_loop_nest_tree(kernel)

# loop_inames: inames that are realized as loops. Concurrent inames aren't
# realized as a loop in the generated code for a loopy.TargetBase.
loop_inames = (reduce(frozenset.union, (insn.within_inames
for insn in kernel.instructions),
frozenset())
- parallel_inames)

# The idea here is to build a DAG, where nodes are schedule items and if
# there exists an edge from schedule item A to schedule item B in the DAG =>
# B *must* come after A in the linearized result.

dag = {}

# LeaveLoop(i) *must* follow EnterLoop(i)
dag.update({EnterLoop(iname=iname): frozenset({LeaveLoop(iname=iname)})
for iname in loop_inames})
dag.update({LeaveLoop(iname=iname): frozenset()
for iname in loop_inames})
dag.update({RunInstruction(insn_id=insn.id): frozenset()
for insn in kernel.instructions})

# {{{ add constraints imposed by the loop nesting

for outer_loop in loop_nest_tree.nodes():
if outer_loop == "":
continue

for child in loop_nest_tree.children(outer_loop):
inner_loop = child
dag[EnterLoop(iname=outer_loop)] |= {EnterLoop(iname=inner_loop)}
dag[LeaveLoop(iname=inner_loop)] |= {LeaveLoop(iname=outer_loop)}

# }}}

# {{{ add deps. b/w schedule items coming from insn. depepdencies

for insn in kernel.instructions:
insn_loop_inames = insn.within_inames & loop_inames
for dep_id in insn.depends_on:
dep = kernel.id_to_insn[dep_id]
dep_loop_inames = dep.within_inames & loop_inames
# Enforce instruction dep:
dag[RunInstruction(insn_id=dep_id)] |= {RunInstruction(insn_id=insn.id)}

# {{{ register deps on loop entry/leave because of insn. deps

if dep_loop_inames < insn_loop_inames:
for iname in insn_loop_inames - dep_loop_inames:
dag[RunInstruction(insn_id=dep.id)] |= {EnterLoop(iname=iname)}
elif insn_loop_inames < dep_loop_inames:
for iname in dep_loop_inames - insn_loop_inames:
dag[LeaveLoop(iname=iname)] |= {RunInstruction(insn_id=insn.id)}
elif dep_loop_inames != insn_loop_inames:
insn_iname, dep_iname = _get_outermost_diverging_inames(
loop_nest_tree, insn_loop_inames, dep_loop_inames)
dag[LeaveLoop(iname=dep_iname)] |= {EnterLoop(iname=insn_iname)}
else:
pass

# }}}

for iname in insn_loop_inames:
# For an insn within a loop nest 'i'
# for i
# insn
# end i
# 'insn' *must* come b/w 'for i' and 'end i'
dag[EnterLoop(iname=iname)] |= {RunInstruction(insn_id=insn.id)}
dag[RunInstruction(insn_id=insn.id)] |= {LeaveLoop(iname=iname)}

# }}}

def iname_key(iname):
all_ancestors = sorted(loop_nest_tree.ancestors(iname),
key=lambda x: loop_nest_tree.depth(x))
return ",".join(all_ancestors+[iname])

def key(x):
if isinstance(x, RunInstruction):
iname = max((kernel.id_to_insn[x.insn_id].within_inames & loop_inames),
key=lambda k: loop_nest_tree.depth(k),
default="")
result = (iname_key(iname), x.insn_id)
elif isinstance(x, (EnterLoop, LeaveLoop)):
result = (iname_key(x.iname),)
else:
raise NotImplementedError

return result

return compute_topological_order(dag, key=key)


def generate_loop_schedules_internal(sched_state, debug=None):
# allow_insn is set to False initially and after entering each loop
# to give loops containing high-priority instructions a chance.
kernel = sched_state.kernel
Expand Down Expand Up @@ -1954,6 +2107,39 @@ def generate_loop_schedules(kernel, callables_table, debug_args=None):
callables_table, debug_args=debug_args)


def postprocess_schedule(kernel, callables_table, gen_sched):
from loopy.kernel import KernelState
gen_sched = convert_barrier_instructions_to_barriers(
kernel, gen_sched)

gsize, lsize = kernel.get_grid_size_upper_bounds(callables_table,
return_dict=True)

if (gsize or lsize):
if not kernel.options.disable_global_barriers:
logger.debug("%s: barrier insertion: global" % kernel.name)
gen_sched = insert_barriers(kernel, gen_sched,
synchronization_kind="global", verify_only=True)

logger.debug("%s: barrier insertion: local" % kernel.name)
gen_sched = insert_barriers(kernel, gen_sched,
synchronization_kind="local", verify_only=False)
logger.debug("%s: barrier insertion: done" % kernel.name)

new_kernel = kernel.copy(
schedule=gen_sched,
state=KernelState.LINEARIZED)

from loopy.schedule.device_mapping import \
map_schedule_onto_host_or_device
if kernel.state != KernelState.LINEARIZED:
# Device mapper only gets run once.
new_kernel = map_schedule_onto_host_or_device(new_kernel)

from loopy.schedule.tools import add_extra_args_to_schedule
return add_extra_args_to_schedule(new_kernel)


def generate_loop_schedules_inner(kernel, callables_table, debug_args=None):
if debug_args is None:
debug_args = {}
Expand All @@ -1963,6 +2149,14 @@ def generate_loop_schedules_inner(kernel, callables_table, debug_args=None):
raise LoopyError("cannot schedule a kernel that has not been "
"preprocessed")

try:
gen_sched = generate_loop_schedules_v2(kernel)
yield postprocess_schedule(kernel, callables_table, gen_sched)
return
except V2SchedulerNotImplementedException as e:
from warnings import warn
warn(f"Falling back to a slow scheduler implementation due to: {e}")

schedule_count = 0

debug = ScheduleDebugger(**debug_args)
Expand Down Expand Up @@ -2072,35 +2266,7 @@ def print_longest_dead_end():
sched_state, debug=debug, **schedule_gen_kwargs):
debug.stop()

gen_sched = convert_barrier_instructions_to_barriers(
kernel, gen_sched)

gsize, lsize = kernel.get_grid_size_upper_bounds(callables_table,
return_dict=True)

if (gsize or lsize):
if not kernel.options.disable_global_barriers:
logger.debug("%s: barrier insertion: global" % kernel.name)
gen_sched = insert_barriers(kernel, gen_sched,
synchronization_kind="global", verify_only=True)

logger.debug("%s: barrier insertion: local" % kernel.name)
gen_sched = insert_barriers(kernel, gen_sched,
synchronization_kind="local", verify_only=False)
logger.debug("%s: barrier insertion: done" % kernel.name)

new_kernel = kernel.copy(
linearization=gen_sched,
state=KernelState.LINEARIZED)

from loopy.schedule.device_mapping import \
map_schedule_onto_host_or_device
if kernel.state != KernelState.LINEARIZED:
# Device mapper only gets run once.
new_kernel = map_schedule_onto_host_or_device(new_kernel)

from loopy.schedule.tools import add_extra_args_to_schedule
new_kernel = add_extra_args_to_schedule(new_kernel)
new_kernel = postprocess_schedule(kernel, callables_table, gen_sched)
yield new_kernel

debug.start()
Expand Down

0 comments on commit 3d6839e

Please sign in to comment.