# built-in
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, NamedTuple, Optional, Tuple
# external
from flake8.checker import FileChecker, Manager
from flake8.utils import filenames_from, fnmatch
# app
from .._logic import (
Snapshot, check_include, get_exceptions, get_plugin_name,
get_plugin_rules, make_baseline, prepare_cache,
)
from ._processor import FlakeHeavenProcessor
# Plugin name attributed to a result when no specific plugin is known:
# it is the initial `_processed_plugin` of a file checker and the fallback
# used for raw result tuples that carry no plugin name.
DEFAULT_PLUGIN = 'pycodestyle'
class Result(NamedTuple):
    """A single violation reported by a checker.

    The field order matters: cached results are restored positionally
    with ``Result(*values)``.
    """
    # name of the plugin that produced the violation
    plugin_name: str
    # violation code, for example ``E902``
    error_code: str
    # line number the violation is reported for
    line_number: int
    # column the violation is reported for
    column: int
    # human-readable message
    text: str
    # physical source line; `None` when the file checker has no processor
    # to fetch the line from at the time of reporting
    line: Optional[str]
def is_relative_to(path: Path, maybe_parent: Path):
    """Tell whether `path` can be expressed relative to `maybe_parent`.

    Returns True when ``path.relative_to(maybe_parent)`` succeeds and
    False when it raises ``ValueError``.
    """
    try:
        path.relative_to(maybe_parent)
    except ValueError:
        return False
    return True
class FlakeHeavenCheckersManager(Manager):
    """
    Patched flake8.checker.Manager to provide `plugins` support
    """

    def __init__(self, baseline: Optional[str], **kwargs):
        """Read the baseline (if any) and initialise the parent manager.

        Args:
            baseline: path to a baseline file. Every stripped line of the
                file is stored as one baseline digest. Falsy to disable.
            **kwargs: forwarded to `Manager.__init__`. The optional
                `relative` key is consumed here and not forwarded.
        """
        self.baseline = set()
        # Always define the attribute so it exists even without a baseline.
        self.root_path: Optional[Path] = None
        self.relative = kwargs.pop('relative', False)
        if baseline:
            with open(baseline) as stream:
                self.baseline = {line.strip() for line in stream}
            # directory containing the baseline file
            self.root_path = Path(baseline).resolve().parent
        super().__init__(**kwargs)

    def make_checkers(self, paths: Optional[List[str]] = None) -> None:
        """
        Reloaded checkers generator to provide one checker per file per rule.

        Original `make_checkers` provides checker per file with all rules mixed.
        It makes difficult to filter checks by codes after all.

        Args:
            paths: paths to check. Defaults to `self.arguments`, and to
                the current directory when that is empty too.
        """
        if paths is None:
            paths = self.arguments
        if not paths:
            paths = ['.']
        prepare_cache()

        # `checkers` is list of checks to run (and then cache)
        # check is a combination of plugin and file.
        self.checkers = []
        # `snapshots` is the list of checks that have cache and should not be run
        self.snapshots = []
        for argument in paths:
            for filename in filenames_from(argument, self.is_path_excluded):
                # get checks list
                selected_checks: Dict[str, List[Dict[str, Any]]]
                selected_checks = dict(
                    ast_plugins=[],
                    logical_line_plugins=[],
                    physical_line_plugins=[],
                )
                has_checks = False
                for check_type, checks in self.checks.to_dictionary().items():
                    for check in checks:
                        should_process = self._should_process(
                            argument=argument,
                            filename=filename,
                            check_type=check_type,
                            check=check,
                        )
                        if not should_process:
                            continue
                        selected_checks[check_type].append(check)
                        has_checks = True

                # Create checker with selected checks
                if not has_checks:
                    continue
                checker = FlakeHeavenFileChecker(
                    filename=filename,
                    checks=selected_checks,
                    options=self.options,
                )
                # ignore files with top-level `# flake8: noqa`
                if not checker.should_process:
                    continue

                checker.snapshot = Snapshot.create(
                    checker=checker,
                    options=self.options,
                )
                # cached checks are only reported, never re-run
                if checker.snapshot.exists():
                    self.snapshots.append(checker)
                    continue

                self.checkers.append(checker)

    def _should_process(
        self, argument: str, filename: str, check_type: str, check: Dict[str, Any],
    ) -> bool:
        """Decide whether `check` has to be run for `filename`.

        Args:
            argument: the path argument that `filename` was discovered from.
            filename: the file to be checked (`-` means stdin).
            check_type: kind of the check; not used for the decision.
            check: the plugin description as provided by flake8.
        """
        # do not run plugins without rules specified
        plugin_name = get_plugin_name(check)
        rules = self._get_rules(plugin_name=plugin_name, filename=filename)
        if not rules or set(rules) == {'-*'}:
            return False

        if filename == '-':
            return True
        if fnmatch(filename=filename, patterns=self.options.filename):
            return True

        if self.options._running_from_vcs:
            return False
        if self.options.diff:
            return False
        # the file does not match the filename patterns:
        # check it only if it was passed explicitly
        return argument == filename

    def _get_rules(self, plugin_name: str, filename: str) -> List[str]:
        """Get rules of the plugin for the file, with exceptions applied.

        When the exceptions found for the path contain `-*` for the plugin,
        the exception rules replace the plugin rules. Otherwise, they are
        appended to a copy of the plugin rules.
        """
        rules = get_plugin_rules(
            plugin_name=plugin_name,
            plugins=self.options.plugins,
        )
        exceptions = get_exceptions(
            path=filename,
            exceptions=self.options.exceptions,
        )
        if exceptions:
            exception_rules = get_plugin_rules(
                plugin_name=plugin_name,
                plugins=exceptions,
            )
            if '-*' in exception_rules:
                rules = exception_rules
            else:
                # copy to keep the list returned by `get_plugin_rules` intact
                rules = rules.copy()
                rules += exception_rules
        return rules

    def is_path_excluded(self, filename: str) -> bool:
        """Patched `is_path_excluded`.

        We patch it to exclude files not specified explicitly.
        It is helpful when you want to run flakeheaven with `--diff`
        and explicitly passed paths at the same time.

        Run flakeheaven only on changes in example.py:

            git diff | flakeheaven lint --diff example.py
        """
        if filename == '-':
            filename = self.options.stdin_display_name
        if super().is_path_excluded(path=filename):
            return True

        # skip subpath check below for stdin
        arguments = set(self.arguments) - {'.', '-', 'stdin'}
        if not arguments:
            return False

        # include file only if it is a subpath of
        # an explicitly specified CLI argument.
        path = Path(filename).absolute()
        parents = path.parents
        for base_filename in self.arguments:
            base_path = Path(base_filename).absolute()
            if base_path == path:
                return False
            if base_path in parents:
                return False
        return True

    def report(self) -> Tuple[int, int]:
        """Reloaded report generation to filter out excluded error codes.

        Differences from the parent implementation:

        + use checker.filename as path instead of checker.display_name
        + group results by plugin and pass the plugin name
          into `_handle_results`.

        Returns:
            A tuple `(results_found, results_reported)`.
        """
        results_reported = results_found = 0
        for checker in self.checkers + self.snapshots:
            if not checker.results and not checker.snapshot.exists():
                continue

            # get results either from cache or actual run
            if checker.snapshot.exists():
                all_results = checker.snapshot.results
            else:
                # NOTE(review): for `Result` entries indexes 1 and 2 are
                # `error_code` and `line_number`; for a raw 5-item tuple
                # they would be line number and column. Confirm that this
                # ordering is the intended one.
                all_results = sorted(
                    checker.results, key=lambda tup: (tup[1], tup[2]))
                checker.snapshot.dump(all_results)

            # group results by plugin name
            grouped_results = defaultdict(list)
            for result in all_results:
                if not isinstance(result, Result):
                    if len(result) == 6:
                        # cache entry is deserialized into list
                        result = Result(*result)
                    else:
                        # flake8 sets custom error codes in a few places
                        # where we didn't set `_processed_plugin`
                        result = Result(DEFAULT_PLUGIN, *result)
                grouped_results[result.plugin_name].append(result)

            # get filename
            filename = checker.filename
            if filename is None or filename == '-':
                filename = self.options.stdin_display_name or 'stdin'

            with self.style_guide.processing_file(filename):
                ignored = checker.processor.parser.ignore
                for plugin_name, results in sorted(grouped_results.items()):
                    results_reported += self._handle_results(
                        filename=filename,
                        results=results,
                        plugin_name=plugin_name,
                        ignored_codes=ignored.get(plugin_name, ()),
                    )
            results_found += len(all_results)
        return (results_found, results_reported)

    def _handle_results(
        self, filename: str, results: List[Result], plugin_name: str,
        ignored_codes: Tuple[str, ...],
    ) -> int:
        """Pass results of one plugin for one file into the style guide.

        A result is skipped when its code is in `ignored_codes`, when its
        digest is in the baseline, or when its code is not included
        by the rules of the plugin.

        Returns:
            The sum of values returned by `style_guide.handle_error`.
        """
        rules = self._get_rules(plugin_name=plugin_name, filename=filename)

        # The path used for baseline digests is the same for every result
        # of the file, so calculate it once instead of once per result.
        baseline_path = None
        if self.baseline:
            baseline_path = Path(filename)
            if self.relative and is_relative_to(baseline_path, self.root_path):
                baseline_path = baseline_path.relative_to(self.root_path)

        reported_results_count = 0
        for result in results:
            # Some codes are ignored for a specific parser.
            # For example, lack of blank lines for YAML parser.
            if result.error_code in ignored_codes:
                continue

            # skip baselined errors
            if self.baseline:
                digest = make_baseline(
                    path=baseline_path,
                    context=result.line,
                    code=result.error_code,
                    line=result.line_number,
                )
                if digest in self.baseline:
                    continue

            # skip explicitly excluded codes
            if not check_include(code=result.error_code, rules=rules):
                continue

            # report
            reported_results_count += self.style_guide.handle_error(
                code=result.error_code,
                filename=filename,
                line_number=result.line_number,
                column_number=result.column,
                text=result.text,
                physical_line=result.line,
                plugin=plugin_name,
            )
        return reported_results_count
class FlakeHeavenFileChecker(FileChecker):
    """
    A little bit patched FileChecker to support `--safe`
    """
    # cache entry of the checker, assigned by the manager after creation
    snapshot: Snapshot
    # name of the plugin which is being run, see `run_check`
    _processed_plugin: str = DEFAULT_PLUGIN

    def _make_processor(self) -> Optional[FlakeHeavenProcessor]:
        """Make a processor for the file.

        Returns:
            The processor, or `None` if the file cannot be read.
            In the latter case, an `E902` result is reported.
        """
        try:
            return FlakeHeavenProcessor(self.filename, self.options)
        except OSError as e:
            message = f'{type(e).__name__}: {e}'
            self.report('E902', 0, 0, message)
            return None

    def run_checks(self) -> Tuple[str, List[Result], Dict[str, Any]]:
        """Run the checks for the file.

        Nothing is run when there is no processor or the processor has
        no lines. With the `safe` option enabled, an exception raised by
        the checks is reported as `E902` instead of being propagated.

        Returns:
            A tuple `(filename, results, statistics)`.
        """
        if not self.processor:
            return self.filename, self.results, self.statistics
        if not self.processor.lines:
            return self.filename, self.results, self.statistics
        try:
            return super().run_checks()
        except Exception as exc:
            if self.options.safe:
                message = f'{type(exc).__name__}: {exc}'
                self.report('E902', 0, 0, message)
            else:
                raise
        return self.filename, self.results, self.statistics

    def run_check(self, plugin: Dict[str, Any], **arguments):
        """Remember the name of the plugin and run it.

        The remembered name is what `report` stores in every `Result`.
        """
        self._processed_plugin = get_plugin_name(plugin)
        return super().run_check(plugin=plugin, **arguments)

    def report(self, error_code: Optional[str], line_number: int, column: int, text: str) -> str:
        """
        Copy-pasted `report` method to store `Result` in `self.results` instead of tuple
        and provide `plugin_name`.

        Args:
            error_code: code of the violation. If `None`, the code is
                taken from the first word of `text`.
            line_number: line number of the violation.
            column: column of the violation.
            text: message of the violation.

        Returns:
            The error code of the stored result.
        """
        if error_code is None:
            error_code, text = text.split(' ', 1)

        # If we're recovering from a problem in _make_processor, we will not
        # have this attribute. After such a problem, the attribute is `None`.
        processor = getattr(self, 'processor', None)
        if processor is not None:
            line = processor.noqa_line_for(line_number)
        else:
            line = None

        self.results.append(Result(
            plugin_name=self._processed_plugin,
            error_code=error_code,
            line_number=line_number,
            column=column,
            text=text,
            line=line,
        ))
        return error_code