Metadata | |
---|---|
cEP | 2 |
Version | 1.1.3 |
Title | Next gen bear design |
Authors | Udayan Tandon mailto:[email protected] |
Status | Implementation Due |
Type | Process |
This document describes how the next generation bears will be designed and implemented.
This cEP describes a new implementation of bears in coala which should make them simpler and make their design more intuitive and better.
Currently the bear design is clunky and bad. The current design makes global bears dependant on local bears through its queuing mechanism. It uses communication signals over a queue to indicate that local bears are done with their execution and global bears can begin. The queueing mechanism doesn't work very well as it sometimes has synchronization problems. Also if local bears fail to run it stalls the global bears. Also we have a permanent logger thread. An extra logger thread consumes time and resources of the system which we can avoid.
The new proposal aims to eliminate the use of the queuing control signals utilising asynchronous I/O and process pool to generate results from the bears. No longer will the global bears be dependent on local bears to finish. The logging will also be handled by the main thread itself instead of utilising an extra thread.
Here is the detailed implementation stepwise:
- A
ProcessPoolExecutor
is created with the maximum number of processes your CPU can run in parallel. - We proceed with getting an asynchronous event loop. This asynchronous event
loop creates
Concurrent.Futures
for tasks and schedules them to the process pool. - Each task is given a callback when it has a result ready. This callback is responsible for processing the result accordingly.
- The
Bear
class will get two new methods.execute_task()
andgenerate_tasks()
.- The
generate_tasks()
is resposible for generating the correct arguments for the upcoming tasks. - The
execute_task()
method is scheduled by the event loop to the process pool using the arguments from thegenerate_tasks()
method.
- The
- Each bear will generate the following type of tasks:
- Global bears fire tasks that process all files.
- Local bears fire tasks for each individual file.
- A bear that has dependencies will only be scheduled to the process pool
once all its dependencies have been resolved. It's done in the following way:
- It's checked if the bear has some dependencies.
- If they haven't been resolved the bear is not scheduled.
- As soon as the dependencies are resolved for a bear it is removed from
the dependency dictionary and scheduled to the
ProcessPoolExecutor
.
- A special purpose
Dependencies
class will be introduced to do the above mentioned task. This will be responsible for keeping track of dependencies. Whenever a bear completes it will be removed from the dependency class. - When all the tasks are done the event loop is stopped and closed.
- You would not need to change any bears because coala will handle the
backwards compatibility. There will be new versions of
LocalBear
andGlobalBear
that will be calledFileWiseBear
andProjectWideBear
respectively.
With the new bear design we are also moving to a new wrapper object to wrap every file. The file proxy is an object that contains properties about the file currently processed (such as the filename). It contains:
- The filename, this is the starting point for every other attribute.
- The content of the file as a string.
- The content of the file, line-splitted.
Note: Other properties can be included later as and when required.
A prototype of the file proxy:
from coala_utils.decorators import generate_eq
@generate_eq("filename")
class FileProxy:
def __init__(self, filename):
self.filename = filename
with open(self.filename) as filehandle:
self.content = filehandle.read()
self.lines = self.content.splitlines(True)
def __iter__(self):
return iter(self.lines)
def __hash__(self):
return hash(self.filename)
def __str__(self):
return self.content
A new class will be responsible for keeping track of the dependencies of each bear and it will work in the following manner:
- The list of all the bears is passed to the class.
- It takes this list and runs a method
check_circular_dependency
on it to check if any bears in the list have circular dependencies. If they do then we stop execution. - Each bear will add its dependencies to this tracker class.
- Whenever a bear completes its execution the
resolve()
method of the new class will be called with this bear as an argument. - This would mark this bear as complete in the dependency tracker.
- Once the marking is complete the tracker returns all bears which have no dependencies remaining as pending, so that their tasks can be scheduled to the process pool.
Here is a prototype:
from collections import defaultdict
class CircularDependencyError(Exception):
def __init__(self, bears):
"""
Creates the CircularDependencyError with a helpful message about the
dependency.
"""
bear_names = (bear.name for bear in bears)
super(CircularDependencyError, self).__init__(
"Circular dependency detected: " + " -> ".join(bear_names))
class Dependencies:
def __init__(self):
self.dependency_dict = defaultdict(list)
self.dependency_set = set()
def _add_dependency(self, bear_instance, dependency):
self.dependency_dict[bear_instance].append(dependency)
self.dependency_set.add(dependency)
def add_bear_dependencies(self, bears):
"""
Take the full bear list and check it for circular dependencies. Then
continue to add the dependencies of the bears to the class to be
resolved later.
:param bears: List of all bears.
"""
self.check_circular_dependency(bears)
for bear_instance in bears:
for bear in bear_instance.BEAR_DEPS:
self._add_dependency(bear_instance, bear)
@staticmethod
def check_circular_dependency(bears, resolved_bears=[], seen=[]):
for bear in bears:
if bear in resolved_bears:
continue
missing = bear.missing_dependencies(resolved_bears)
if not missing:
resolved_bears.append(bear)
continue
if bear in seen:
seen.append(bear)
raise CircularDependencyError(seen)
resolved_bears = self.check_circular_dependency(
missing, resolved_bears, seen + [bear])
resolved_bears.append(bear)
seen.remove(bear) # Already resolved, no candidate for circular dep
return resolved_bears
def resolve(self, bear_instance):
"""
When a bear completes this method is called with the instance of that
bear. The method deletes this bear from the list of dependencies of
each bear in the dependency dictionary. It returns the bears which
have all of its dependencies resolved.
:param bear_instance: The instance of the bear.
:return: Returns a list of bears whose dependencies were
all resolved and are ready to be scheduled.
"""
return_bears = []
bear_type = type(bear_instance)
if bear_type in self.dependency_set:
for bear in self.dependency_dict:
if bear_type in self.dependency_dict[bear]:
self.dependency_dict[bear].remove(bear_type)
if not self.dependency_dict[bear]:
return_bears.append(bear)
for bear in return_bears:
del self.dependency_dict[bear]
return return_bears
A rough look at how the new LocalBear
class implementation will look:
from coalib.bears.Bear import Bear
class FileWiseBear(Bear):
def __init__(self, section, file_set):
"""
Constructs a new bear.
:param section: The section object where bear settings are
contained.
:param file_set: A set of FileProxy objects.
"""
self.section = section
self.file_set = file_set
# May raise RuntimeError so bear doesn't get executed on invalid params
self.kwargs = get_kwargs_for_function(self.analyze, section)
def execute_task(self, task, kwargs):
"""
This method is responsible for getting results from the
analysis routines and populating the result queue.
"""
return list(self.analyze(task, **kwargs))
def analyze(self,
file,
*args,
dependency_results=None,
**kwargs):
"""
Handles the given file.
:param file_proxy: Object containing filename and contents.
:return: An iterable of Result.
"""
raise NotImplementedError("This function has to be implemented for a "
"runnable bear.")
def generate_tasks(self):
"""
This method is responsible for providing the files.
"""
return ((file, self.kwargs) for file in self.file_set)
A test bear:
from coalib.results.Result import Result
class TestBear(FileWiseBear):
def analyze(self, file):
"""
Handles the given file.
:param file_proxy: Object containing filename and contents.
:return: An iterable of Result.
"""
for i, line in enumerate(file):
# This will contain the analysis routine
yield Result.from_values(origin=self,
message="Line has a problem",
file=file.name,
line=i)
A rough idea of how the new bear will be called:
import asyncio
import concurrent.futures
import logging
import sys
from coalib.collecting.Collectors import collect_files
from Fileproxy import FileProxy
from TestBear import TestBear
from AnotherBear import AnotherBear
from Dependencies import Dependencies
from DependentBear import DependentBear
from AnotherDependentBear import AnotherDependentBear
import functools
import multiprocessing
def get_cpu_count():
try:
return multiprocessing.cpu_count()
# cpu_count is not implemented for some CPU architectures/OSes
except NotImplementedError: # pragma: no cover
return 2
def schedule_bears(bears,
dep_resolver,
event_loop,
blocking_tasks,
executor):
"""
Schedules the tasks of bears to the process pool and run the event loop to
complete those tasks.
:param bears: A list of bears to be scheduled onto the process
pool.
:param dep_resolver: The object that keeps track of dependencies.
:param event_loop: The asyncio event loop.
:param blocking_tasks: Dictionary that keeps track of the remaining tasks
of each bear.
:param executor: The process pool to which the bear tasks are
scheduled.
"""
for bear in bears:
if bear not in dep_resolver.dependency_dict:
blocking_tasks[bear] = [
event_loop.run_in_executor(executor,
bear.execute_task,
task[0],
task[1])
for task in bear.generate_tasks()]
for task in blocking_tasks[bear]:
task.add_done_callback(functools.partial(finish_task,
bear,
dep_resolver,
blocking_tasks,
event_loop,
executor))
def finish_task(bear,
dep_resolver,
blocking_tasks,
event_loop,
executor,
task):
"""
The callback for when a task of a bear completes. It is responsible for
checking if the bear completed its execution and the handling of the
result generated by the task.
:param bear: The bear that the task belongs to.
:param dep_resolver: The object that keeps track of dependencies.
:param blocking_tasks: Dictionary that keeps track of the remaining tasks
of each bear.
:param event_loop: The asyncio event loop.
:param executor: The process pool to which the bear tasks are
scheduled.
:param task: The task that completed.
"""
log = logging.getLogger('Consumer task')
log.info(bear)
for result in task.result():
log.info(result)
blocking_tasks[bear].remove(task)
if not blocking_tasks[bear]:
resolved_bears = dep_resolver.resolve(bear)
if resolved_bears:
schedule_bears(resolved_bears,
dep_resolver,
event_loop,
blocking_tasks,
executor)
del blocking_tasks[bear]
if not blocking_tasks:
event_loop.stop()
def main():
# Configure logging to show the name of the thread where the log
# message originates.
logging.basicConfig(
level=logging.INFO,
format='%(process)5s %(name)18s: %(message)s',
stream=sys.stderr,
)
# Create a limited thread pool.
executor = concurrent.futures.ProcessPoolExecutor(
max_workers=get_cpu_count(),
)
event_loop = asyncio.get_event_loop()
file_list = collect_files(["~/Documents/gsoc/coala/*.py",
"~/Documents/gsoc/coala/coalib/*.py",
"~/Documents/gsoc/coala/coalib/**/*.py"], None)
fileproxies = [FileProxy(file) for file in file_list]
bears = [TestBear(fileproxies),
AnotherBear(fileproxies),
AnotherDependentBear(fileproxies),
DependentBear(fileproxies)]
blocking_tasks = {}
dep_resolver = Dependencies()
dep_resolver.add_bear_dependencies(bears)
schedule_bears(bears,
dep_resolver,
event_loop,
blocking_tasks,
executor)
try:
event_loop.run_forever()
finally:
event_loop.close()