author David Douard <david.douard@logilab.fr>
Sun, 26 Oct 2014 22:46:44 +0100
changeset 1768 48066736c862
parent 1766 42e60f760703
child 1715 1c5da41171e2
permissions -rw-r--r--
[recipe] make recipe names a bit more consistant

"""checkers for python source files

:organization: Logilab
:copyright: 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: General Public License version 2 - http://www.gnu.org/licenses
from __future__ import with_statement

__docformat__ = "restructuredtext en"

import os
import sys
import re
from commands import getoutput
from os.path import join, exists, basename
from test import test_support
from warnings import warn
from lxml import etree

from logilab.common.testlib import find_tests, DEFAULT_PREFIXES
from logilab.common.modutils import get_module_files
from logilab.common.decorators import cached
from logilab.common.proc import RESOURCE_LIMIT_EXCEPTION

    import coverage
    if coverage.__version__ < "3":
        raise ImportError
    COVERAGE_CMD = "/usr/bin/python-coverage" # XXX
except ImportError:
    coverage = None

from apycotlib import register, INSTALL_PREFIX, pyversions
from apycotlib import SimpleOutputParser, ParsedCommand

from checkers.apycot import BaseChecker, AbstractFilteredFileChecker

def pyinstall_path(test):
    path = _pyinstall_path(test)
    if not exists(path):
        raise Exception('path %s doesn\'t exist' %  path)
    return path

def _pyinstall_path(test):
    """return the project's installation path"""
    config = test.apycot_config()
    if 'install_path' in config:
        return config['install_path']
    modname = config.get('python_modname')
    project_path = config['project_path']
    if not modname and exists(join(project_path, '__pkginfo__.py')):
        from logilab.devtools.lib.pkginfo import PackageInfo
        pkginfo = PackageInfo(directory=project_path)
        modname = pkginfo.modname
        distname = pkginfo.distname or modname
        package = pkginfo.subpackage_of
        if modname and package:
            modname = '%s.%s' % (package, modname)
        elif distname.startswith('cubicweb-'):
            cubespdir = join(os.environ['APYCOT_ROOT'], 'local', 'share', 'cubicweb')
            pypath = cubespdir + os.pathsep + os.environ.get('PYTHONPATH', '')
            test.update_env(test.tconfig['name'], 'PYTHONPATH', pypath, os.pathsep)
            return join(cubespdir, 'cubes', modname)
    if modname:
            path = join(INSTALL_PREFIX[project_path], *modname.split('.'))
        except KeyError:
            cfg = test.apycot_config()
            if cfg.get('subpath'):
                path = join(path, cfg['subpath'])
            return path
    return project_path

class PythonSyntaxChecker(AbstractFilteredFileChecker):
    """check syntax of python file

       Use Pylint to check a score for python package. The check fails if the score is
       inferior to a given threshold.
    id = 'python_syntax'
    checked_extensions = ('.py', )

    def check_file(self, filepath):
        """try to compile the given file to see if it's syntaxicaly correct"""
        # Try to compile it. If compilation fails, then there's a
        # SyntaxError
            compile(open(filepath, 'U').read() + '\n', filepath, "exec")
            return SUCCESS
        except SyntaxError, error:
            self.writer.error(error.msg, path=filepath, line=error.lineno)
            return FAILURE

    def version_info(self):
        self.record_version_info('python', sys.version)

register('checker', PythonSyntaxChecker)

class PyTestParser(SimpleOutputParser):
    status = NODATA
    non_zero_status_code = FAILURE
    # search for following output:
    # 'Ran 42 test cases in 0.07s (0.07s CPU), 3 errors, 31 failures, 3 skipped'
    regex = re.compile(
        r'Ran (?P<total>[0-9]+) test cases '
        'in (?P<time>[0-9]+(.[0-9]+)?)s \((?P<cputime>[0-9]+(.[0-9]+)?)s CPU\)'
        '(, )?'
        '((?P<errors>[0-9]+) errors)?'
        '(, )?'
        '((?P<failures>[0-9]+) failures)?'
        '(, )?'
        '((?P<skipped>[0-9]+) skipped)?'

    total, failures, errors, skipped = 0, 0, 0, 0

    def __init__(self, writer, options=None):
        super(PyTestParser, self).__init__(writer, options)
        self.total    = 0
        self.failures = 0
        self.skipped  = 0
        self.errors   = 0

    def _parse(self, stream):
        self.status = None
        super(PyTestParser, self)._parse(stream)
        if self.errors or self.failures:
        elif self.skipped:
        elif not self.total:
        elif self.total >= 0:

    def success(self):
        return max(0, self.total - sum(( self.failures, self.errors,
    def add_junk(self, line):
        if any(c for c in line if c not in 'EFS. \n\t\r-*'):

    def extract_tests_status(self, values):
        for status in ('failures', 'errors', 'skipped'):
                setattr(self, status,
                        max(getattr(self, status), int(values[status])))
            except TypeError:

    def parse_line(self, line):
        match = self.regex.match(line)
        if match is not None:
            values = match.groupdict()
            total = int(values['total'])
            self.total += total

    'tested_python_versions': {
        'type': 'csv',
        'help': ('comma separated list of python version (such as 2.5) that '
                 'should be considered for testing.'),
    'ignored_python_versions': {
        'type': 'csv',
        'help': ('comma separated list of python version (such as 2.5) that '
                 'should be ignored for testing when '
                 'use_pkginfo_python_versions is set to 1.'),
    'use_pkginfo_python_versions': {
        'type': 'int', 'default': True,
        'help': ('0/1 flag telling if tested python version should be '
                 'determinated according to __pkginfo__.pyversion of the '
                 'tested project. This option is ignored if tested_python_versions '
                 'is set.'),
    'pycoverage': {
        'type': 'int', 'default': False,
        'help': ('Tell if test should be run with pycoverage to gather '
                 'coverage data.'),

class PyTestChecker(BaseChecker):
    """check that unit tests of a python package succeed using the pytest command
    (from logilab.common)
    id = 'pytest'
    parsercls = PyTestParser
    parsed_content = 'stdout'
    options_def = BaseChecker.options_def.copy()
        'pytest_extra_argument': {
            'type': 'csv',
            'help': ('extra argument to give to pytest. Add this option multiple '
                     'times in the correct order to give several arguments.'),

    def __init__(self, writer, options=None):
        BaseChecker.__init__(self, writer, options)
        self.coverage_data = None
        self._path = None
        self.test = None

    def version_info(self):
        if pyversions(self.test):
            self.record_version_info('python', ', '.join(pyversions(self.test)))

    def enable_coverage(self):
        if self.options.get('pycoverage') and coverage:
            self.coverage_data = join(self.project_path, '.coverage')
            # XXX we need the environment variable to be considered by
            # "python-coverage run"
            os.environ['COVERAGE_FILE'] = self.coverage_data
            return True
        return False

    def setup_check(self, test):
        """run the checker against the <project_path> option (usually a directory)"""
        test_support.verbose = 0
        self.test = test
        return SUCCESS

    def do_check(self, test):
        if self.enable_coverage():
            command = ['-c', 'from logilab.common.pytest import run; import sys; sys.argv=["pytest", "--coverage"]; run()']
            command = ['-c', 'from logilab.common.pytest import run; run()']
        extraargs = self.options.get("pytest_extra_argument", [])
        if not isinstance(extraargs, list):
            command += extraargs
        status = SUCCESS
        testresults = {'success': 0, 'failures': 0,
                       'errors': 0, 'skipped': 0}
        total = 0
        for python in pyversions(self.test):
            cmd = self.run_test(command, python)
            for rtype in testresults:
                total += getattr(cmd.parser, rtype)
                testresults[rtype] += getattr(cmd.parser, rtype)
            status = self.merge_status(status, cmd.status)
        self.execution_info(total, testresults)
        return status

    def execution_info(self, total, testresults):
        self.writer.raw('total_test_cases', total, 'result')
        self.writer.raw('succeeded_test_cases', testresults['success'], 'result')
        self.writer.raw('failed_test_cases', testresults['failures'], 'result')
        self.writer.raw('error_test_cases', testresults['errors'], 'result')
        self.writer.raw('skipped_test_cases', testresults['skipped'], 'result')

    def get_command(self, command, python):
        return [python, '-W', 'ignore'] + command

    def run_test(self, command, python='python'):
        """execute the given test file and parse output to detect failed /
        succeed test cases
        if isinstance(command, basestring):
            command = [command]
        command = self.get_command(command, python)
        self.writer.info(' '.join(command), path=basename(command[-1]))
        self.writer.debug(os.environ['PYTHONPATH'], path='PYTHONPATH')
        self.writer.debug(self.project_path, path='CWD')
        cmd = ParsedCommand(self.writer, command,
                            path=self._path, cwd=self.project_path)
        return cmd

register('checker', PyTestChecker)

class PyUnitTestParser(PyTestParser):
    result_regex = re.compile(
        ' \('
        '(, )?'
        '(, )?'

    total_regex = re.compile(
        'Ran (?P<total>[0-9]+) tests?'
        ' in (?P<time>[0-9]+(.[0-9]+)?s)')

    def parse_line(self, line):
        match = self.total_regex.match(line)
        if match is not None:
            self.total = int(match.groupdict()['total'])
        match = self.result_regex.match(line)
        if match is not None:

class PyUnitTestChecker(PyTestChecker):
    """check that unit tests of a python package succeed

    Execute tests found in the "test" or "tests" directory of the package. The
    check succeed if no test cases failed. Note each test module is executed by
    a spawed python interpreter and the output is parsed, so tests should use
    the default text output of the unittest framework, and avoid messages on

    spawn unittest and parse output (expect a standard TextTestRunner)
    id = 'pyunit'
    parsed_content = 'stderr'
    parsercls = PyUnitTestParser
    options_def = PyTestChecker.options_def.copy()
        'test_dirs': {
                'type': 'csv', 'default': ('test', 'tests'),
                'help': ('comma separated list of directories where tests could be '
                         'find. Search in "test" and "tests" by default.'),
        'test_prefixes': {
                'type': 'csv', 'default': DEFAULT_PREFIXES,
                'help': ('comma separated list of directories where tests could be '
                         'find. Defaults to %s.' % ', '.join(DEFAULT_PREFIXES)),

    def do_check(self, test):
        status = SUCCESS
        testdirs = self.options.get("test_dirs")
        basepath = self.project_path
        for testdir in testdirs:
            testdir = join(basepath, testdir)
            if exists(testdir):
                self._path = testdir
                _status = self.run_tests(testdir)
                status = self.merge_status(status, _status)
            self.writer.error('no test directory', path=basepath)
            status = NODATA
        return status

    def run_tests(self, testdir):
        """run a package test suite
        expect to be in the test directory
        tests = find_tests(testdir,
        if not tests:
            self.writer.error('no test found', path=self._path)
            return NODATA
        status = SUCCESS
        testresults = {'success': 0, 'failures': 0,
                       'errors': 0, 'skipped': 0}
        total = 0
        for python in pyversions(self.test):
            for test_file in tests:
                cmd = self.run_test(join(testdir, test_file), python)
                total += cmd.parser.total
                for rtype in testresults:
                    testresults[rtype] += getattr(cmd.parser, rtype)
                if cmd.status == NODATA:
                    self.writer.error('no test found', path=test_file)
                status = self.merge_status(status, cmd.status)
        self.execution_info(total, testresults)
        return status

    def get_command(self, command, python):
        python = [python, '-W', 'ignore']
        if self.enable_coverage():
            python += [COVERAGE_CMD, 'run', '-a', '--branch',
                       '--source=%s' % pyinstall_path(self.test)]
        return python + command

register('checker', PyUnitTestChecker)

class PyDotTestParser(PyUnitTestParser):
    line_regex = re.compile(
            r'(?P<filename>\w+\.py)(\[(?P<ntests>\d+)\] | - )(?P<results>.*)')

    # XXX overwrite property
    success = 0

    def _parse(self, stream):
        for _, _, _, results in self.line_regex.findall(stream.read()):
            if results == "FAILED TO LOAD MODULE":
                self.errors += 1
                self.success += results.count('.')
                self.total += results.count('.')
                self.failures += results.count('F')
                self.total += results.count('F')
                self.errors += results.count('E')
                self.total += results.count('E')
                self.skipped += results.count('s')
                self.total += results.count('s')
        if self.failures or self.errors:
        elif self.skipped:
        elif not self.success:

class PyDotTestChecker(PyUnitTestChecker):
    """check that py.test based unit tests of a python package succeed

    spawn py.test and parse output (expect a standard TextTestRunner)
    id = 'py.test'
    parsercls = PyDotTestParser
    parsed_content = 'stdout'
    options_def = PyUnitTestChecker.options_def.copy()

    def get_command(self, command, python):
        # XXX coverage
        return ['py.test', '--exec=%s' % python, '--nomagic', '--tb=no'] + command

register('checker', PyDotTestChecker)

class PyLintChecker(BaseChecker):
    """check that the python package as a decent pylint evaluation
    id = 'pylint'
    options_def = BaseChecker.options_def.copy()
        'pylintrc': {
            'help': ('path to a pylint configuration file.'),
        'pylint_threshold': {
            'type': 'int', 'default': 7,
            'help': ('integer between 1 and 10 telling expected pylint note to '
                     'pass this check. Default to 7.'),
        'pylint.show_categories': {
            'type': 'csv', 'default': ['E', 'F'],
            'help': ('comma separated list of pylint message categories to add to '
                     'reports. Default to error (E) and failure (F).'),
        'pylint.additional_builtins': {
            'type': 'csv',
            'help': ('comma separated list of additional builtins to give to '
        'pylint.disable': {
            'type': 'csv',
            'help': ('comma separated list of pylint message id that should be '
        'pylint.ignore': {
            'type': 'csv',
            'help': 'comma separated list of files or directories to ignore',

    def version_info(self):
        self.record_version_info('pylint', pylint_version)

    def do_check(self, test):
        """run the checker against <path> (usually a directory)"""
        # XXX should consider python version
        threshold = self.options.get('pylint.threshold')
        pylintrc_path = self.options.get('pylintrc')
        linter = PyLinter(pylintrc=pylintrc_path)
        # register checkers
        # load configuration
        package_wd_path = self.project_path
        if exists(join(package_wd_path, 'pylintrc')):
            linter.load_file_configuration(join(package_wd_path, 'pylintrc'))
        linter.set_option('persistent', False)
        linter.set_option('reports', 0, action='store')
        linter.quiet = 1
        # set file or dir to ignore
        for option in ('ignore', 'additional_builtins', 'disable'):
            value = self.options.get('pylint.' + option)
            if value is not None:
                linter.global_set_option(option.replace('_', '-'), ','.join(value))
        # message categories to record
        categories = self.options.get('pylint.show_categories')
        linter.set_reporter(MyLintReporter(self.writer, test.tmpdir, categories))
        # run pylint
            note = eval(linter.config.evaluation, {}, linter.stats)
            self.writer.raw('pylint.evaluation', '%.2f' % note, 'result')
        except ZeroDivisionError:
            self.writer.raw('pylint.evaluation', '0', 'result')
            note = 0
        except Exception:
            self.writer.error('Error while processing pylint evaluation',
                              path=self.project_path, tb=True)
            note = 0
        self.writer.raw('statements', '%i' % linter.stats['statement'], 'result')
        if note < threshold:
            return FAILURE
        return SUCCESS

    from pylint import checkers as pycheckers
    from pylint.lint import PyLinter
    from pylint.__pkginfo__ import version as pylint_version
    from pylint.interfaces import IReporter
    from pylint.reporters import BaseReporter
    register('checker', PyLintChecker)

    class MyLintReporter(BaseReporter):
        """a partial pylint writer (implements only the message method, not
        methods necessary to display layouts
        __implements__ = IReporter

        def __init__(self, writer, basepath, categories):
            super(MyLintReporter, self).__init__()
            self.writer = writer
            self.categories = set(categories)
            self._to_remove = len(basepath) + 1 # +1 for the leading "/"

        def add_message(self, msg_id, location, msg):
            """ manage message of different type and in the context of path """
            if not msg_id[0] in self.categories:
            path, line = location[0], location[3]
            path = path[self._to_remove:]
            if msg_id[0] == 'I':
                self.writer.info(msg, path=path, line=line)
            elif msg_id[0]  == 'E':
                self.writer.error(msg, path=path, line=line)
            elif msg_id[0] == 'F':
                self.writer.fatal(msg, path=path, line=line)
            else: # msg_id[0] in ('R', 'C', 'W')
                self.writer.warning(msg, path=path, line=line)

        def display_results(self, layout):
except ImportError, e:
    warn("unable to import pylint. Pylint checker disabled : %s" % e)

class PyCoverageChecker(BaseChecker):
    """retrieve the tests coverage data

    The coverage data are coming from the pyunit checker with the "pycoverage"
    configuration variable

    When devtools is available, test will be launched in a coverage mode. This
    test will gather coverage information, and will succeed if the test coverage
    is superior to a given threshold. *This checker must be executed after the
    python_unittest checker.
    id = 'pycoverage'
    options_def = BaseChecker.options_def.copy()
        'pycoverage_threshold': {
            'type': 'int', 'default': 80,
            'help': ('integer between 1 and 100 telling expected percent coverage '
                     'to pass this check. Default to 80.\n'
                     'PARTIAL returned when cover rate between threshold and threshold / 2.\n'
                     'ERROR returned when cover rate under threshold / 2'),
        'coverage_data': {
            'required': True,
            'help': 'collect coverage data file',

    def version_info(self):
        if coverage:
            version = getoutput('%s --version' % COVERAGE_CMD).strip()
            self.record_version_info('python-coverage', version)

    def do_check(self, test):
        """run the checker against <path> (usually a directory)"""
        self.threshold = float(self.options.get('pycoverage_threshold')) / 100
        coverage_data = self.options.get('coverage_data')
        if coverage_data == None or not exists(coverage_data):
            self.writer.fatal('no coverage information', path=coverage_data)
            return NODATA
        line_rate, branch_rate = self._get_cover_info(test)
        # in case of error during coverage reporting
        if line_rate is None:
            return ERROR
        # global summary
        self.writer.raw('cover-line-rate', '%.3f' % line_rate, 'result')
        self.writer.raw('cover-branch-rate', '%.3f' % branch_rate, 'result')
        if line_rate < self.threshold:
            return FAILURE
        return SUCCESS

    def _get_log_method(self, pc_cover):
        if pc_cover < (self.threshold / 2):
            writer = self.writer.error
        elif pc_cover < self.threshold:
            writer = self.writer.warning
            writer = self.writer.info
        return writer

    def _get_cover_info(self, test):
        if coverage is None:
            raise Exception('install python-coverage')
        covertool = coverage.coverage()
            report_file = join(self.project_path, "coverage.xml")
            covertool.xml_report(outfile=report_file, ignore_errors=True)
            report = etree.parse(report_file).getroot()
            pc_cover = float(report.attrib.get('line-rate'))
            br_rate  = float(report.attrib.get('branch-rate'))
            # format of the xml_report file is compatible with Cobertura
            # <http://cobertura.sourceforge.net/>
            # FIXME missing stats: <stat> / <miss>
            # FIXME p_name construction (arbitrary slice)
            for package in report.iter("package"):
                p_name = package.attrib.get('name')
                p_name = ".".join(p_name.split(".")[6:])
                # class *are* files in Coberture (from java world)
                for cls in package.iter("class"):
                    c_name = ".".join([p_name, cls.attrib.get('name')])
                    c_pc_cover = float(cls.attrib.get('line-rate'))
                    c_br_rate  = float(cls.attrib.get('branch-rate'))
                    logger = self._get_log_method(c_pc_cover)
                    logger("line rate: %3.0f %% / branch rate: %3.0f %%"
                           % (c_pc_cover*100, c_br_rate*100), path=c_name)
        except Exception, err:
            pc_cover = br_rate = None
            self.writer.fatal(err, tb=True)
            return (pc_cover, br_rate)

if coverage is not None:
    register('checker', PyCoverageChecker)

class PyCheckerOutputParser(SimpleOutputParser):
    non_zero_status_code = FAILURE
    def parse_line(self, line):
            path, line, msg = line.split(':')
            self.writer.error(msg, path=path, line=line)
            self.status = FAILURE
        except ValueError:

class PyCheckerChecker(BaseChecker):
    """check that unit tests of a python package succeed

    spawn unittest and parse output (expect a standard TextTestRunner)
    id = 'pychecker'

    def do_check(self, test):
        """run the checker against <path> (usually a directory)"""
        command = ['pychecker', '-Qqe', 'Style']
        command += get_module_files(pyinstall_path(test))
        return ParsedCommand(self.writer, command, parsercls=PyCheckerOutputParser).run()

    def version_info(self):
        self.record_version_info('pychecker', getoutput("pychecker --version").strip())

register('checker', PyCheckerChecker)