Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first pass of parallelization #416

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ tests/integration/django/dill/garden-db.sqlite
tests/integration/django/grocery/grocery-db.sqlite
.tox
./_public
.idea
150 changes: 148 additions & 2 deletions lettuce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@
import os
import sys
import traceback

import multiprocessing


try:
from imp import reload
except ImportError:
# python 2.5 fallback
pass

import random
import itertools

from lettuce.core import Feature, TotalResult
from lettuce.core import Feature, TotalResult, FeatureResult

from lettuce.terrain import after
from lettuce.terrain import before
Expand Down Expand Up @@ -97,7 +102,6 @@ def __init__(self, base_path, scenarios=None, verbosity=0, random=False,
""" lettuce.Runner will try to find a terrain.py file and
import it from within `base_path`
"""

self.tags = tags
self.single_feature = None

Expand Down Expand Up @@ -197,3 +201,145 @@ def run(self):
raise SystemExit(2)

return total


def grouper(n, iterable):
# http://stackoverflow.com/a/1625013/192791
# args = [iter(iterable)] * n
# return ([e for e in t if e != None] for t in itertools.izip_longest(*args))
# http://www.garyrobinson.net/2008/04/splitting-a-pyt.html
return [iterable[i::n] for i in range(n)]



class ParallelRunner(Runner):

def __init__(self, base_path, scenarios=None, verbosity=0, random=False,
enable_xunit=False, xunit_filename=None,
enable_subunit=False, subunit_filename=None,
tags=None, failfast=False, auto_pdb=False,
smtp_queue=None,workers=None):

super(ParallelRunner, self).__init__( base_path,
scenarios=scenarios,
verbosity=verbosity,
random=random,
enable_xunit=enable_xunit,
xunit_filename=xunit_filename,
enable_subunit=enable_subunit,
subunit_filename=subunit_filename,
failfast=failfast,
auto_pdb=auto_pdb,
tags=tags)
self.workers = workers


def run(self):
""" Find and load step definitions, and them find and load
features under `base_path` specified on constructor
"""
try:
print "look at me!"
self.loader.find_and_load_step_definitions()
except StepLoadingError, e:
print "Error loading step definitions:\n", e
return

if self.single_feature:
features_files = [self.single_feature]
else:
features_files = self.loader.find_feature_files()
if self.random:
random.shuffle(features_files)

if not features_files:
self.output.print_no_features_found(self.loader.base_dir)
return

failed = False
scenarios_to_run = []
try:

for filename in features_files:
feature = Feature.from_file(filename)
feature_scenarios_to_run = feature.scenarios_to_run(self.scenarios,self.tags)
scenarios_to_run.extend(feature_scenarios_to_run)
except exceptions.LettuceSyntaxError, e:
sys.stderr.write(e.msg)
failed = True

batches = grouper(self.workers, scenarios_to_run)

import pdb; pdb.set_trace()

call_hook('before', 'all')

ignore_case = True

manager = multiprocessing.Manager()
errors = manager.list()
results = manager.list()

def process_batch(batch,port_number,results,errors):
print "running batch with port number: {}".format(port_number)
world.port_number = port_number

call_hook('before','batch')

try:
for scenario in batch:
results.append(scenario.run(ignore_case, failfast=self.failfast))
except Exception as e:
if not self.failfast:
e = sys.exc_info()[1]
print "Died with %s" % str(e)
traceback.print_exc()
errors.append(e)
else:
print
print ("Lettuce aborted running any more tests "
"because was called with the `--failfast` option")

failed = True

call_hook('after','batch')

processes = []
i = 0
for batch in batches:
i = i + 1
port_number = 8180 + i
process = multiprocessing.Process(target=process_batch,args=(batch,port_number,results,errors))
processes.append(process)
process.start()

for process in processes:
process.join()

if len(errors) > 0:
print "Exceptions"
for error in errors:
print error
else:
print "Test suite had no errors"

feature_results = []

for feature, scenario_results in itertools.groupby(results, lambda r: r[0].scenario.feature):
all_results = []
for results in scenario_results:
for result in results:
all_results.append(result)

feature_results.append(FeatureResult(feature, *list(all_results)))

total = TotalResult(feature_results)

call_hook('after', 'all', total)

if failed:
raise SystemExit(2)

return total


48 changes: 35 additions & 13 deletions lettuce/bin.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ def main(args=sys.argv[1:]):
default=False,
help="Run scenarios in a more random order to avoid interference")

parser.add_option("-p", "--parallel",
dest="parallel",
default=None,
help="Run scenarios in a parallel fashion")

parser.add_option("--with-xunit",
dest="enable_xunit",
action="store_true",
Expand Down Expand Up @@ -103,19 +108,36 @@ def main(args=sys.argv[1:]):
if options.tags:
tags = [tag.strip('@') for tag in options.tags]

runner = lettuce.Runner(
base_path,
scenarios=options.scenarios,
verbosity=options.verbosity,
random=options.random,
enable_xunit=options.enable_xunit,
xunit_filename=options.xunit_file,
enable_subunit=options.enable_subunit,
subunit_filename=options.subunit_filename,
failfast=options.failfast,
auto_pdb=options.auto_pdb,
tags=tags,
)
if options.parallel:
print "running Parallel Runner with {} workers".format(options.parallel)
runner = lettuce.ParallelRunner(
base_path,
scenarios=options.scenarios,
verbosity=options.verbosity,
random=options.random,
enable_xunit=options.enable_xunit,
xunit_filename=options.xunit_file,
enable_subunit=options.enable_subunit,
subunit_filename=options.subunit_filename,
failfast=options.failfast,
auto_pdb=options.auto_pdb,
tags=tags,
workers=int(options.parallel)
)
else:
runner = lettuce.Runner(
base_path,
scenarios=options.scenarios,
verbosity=options.verbosity,
random=options.random,
enable_xunit=options.enable_xunit,
xunit_filename=options.xunit_file,
enable_subunit=options.enable_subunit,
subunit_filename=options.subunit_filename,
failfast=options.failfast,
auto_pdb=options.auto_pdb,
tags=tags
)

result = runner.run()
failed = result is None or result.steps != result.steps_passed
Expand Down
23 changes: 15 additions & 8 deletions lettuce/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1170,12 +1170,7 @@ def _parse_remaining_lines(self, lines, original_string, with_file=None):

return background, scenarios, description

def run(self, scenarios=None, ignore_case=True, tags=None, random=False, failfast=False):
scenarios_ran = []

if random:
shuffle(self.scenarios)

def scenarios_to_run(self,scenarios,tags):
scenario_nums_to_run = None
if isinstance(scenarios, (tuple, list)):
if all(map(lambda x: isinstance(x, int), scenarios)):
Expand All @@ -1184,8 +1179,19 @@ def run(self, scenarios=None, ignore_case=True, tags=None, random=False, failfas
def should_run_scenario(num, scenario):
return scenario.matches_tags(tags) and \
(scenario_nums_to_run is None or num in scenario_nums_to_run)
scenarios_to_run = [scenario for num, scenario in enumerate(self.scenarios, start=1)
if should_run_scenario(num, scenario)]
return [scenario for num, scenario in enumerate(self.scenarios, start=1)
if should_run_scenario(num, scenario)]



def run(self, scenarios=None, ignore_case=True, tags=None, random=False, failfast=False):
scenarios_ran = []

if random:
shuffle(self.scenarios)

scenarios_to_run = self.scenarios_to_run(self.scenarios,tags)

# If no scenarios in this feature will run, don't run the feature hooks.
if not scenarios_to_run:
return FeatureResult(self)
Expand Down Expand Up @@ -1247,6 +1253,7 @@ def __init__(self, feature_results):
self.failed_scenario_locations = []
for feature_result in self.feature_results:
for scenario_result in feature_result.scenario_results:

self.scenario_results.append(scenario_result)
self.steps_passed += len(scenario_result.steps_passed)
self.steps_failed += len(scenario_result.steps_failed)
Expand Down
4 changes: 4 additions & 0 deletions lettuce/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ def _is_func_or_method(self, func):
'before': [],
'after': [],
},
'batch': {
'before': [],
'after': [],
},
'step': {
'before_each': [],
'after_each': [],
Expand Down
1 change: 1 addition & 0 deletions lettuce/terrain.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def method(self, fn):
('each_background', 'background', '%(0)s_each'),
('each_feature', 'feature', '%(0)s_each'),
('harvest', 'harvest', '%(0)s'),
('batch', 'batch', '%(0)s'),
('each_app', 'app', '%(0)s_each'),
('runserver', 'runserver', '%(0)s'),
('handle_request', 'handle_request', '%(0)s'),
Expand Down