_apycotlib/atest.py
author David Douard <david.douard@logilab.fr>
Wed, 14 May 2014 16:05:06 +0200
changeset 1518 fd0a087f6117
parent 1488 05ddbef90877
child 1574 cd33e04f6433
child 1798 68e03b22b4dd
permissions -rw-r--r--
[apycotlib] keep a reference on the options dict in ATest So it can be easily used from recipes

"""APyCoT task / test

A task is a queue (pending) test

A test defines :
* a unit of sources to test (a project)
* a list of checks to apply to this unit
* how to build the test environment (preprocessing, dependencies...)
"""
from __future__ import with_statement

__docformat__ = "restructuredtext en"

import os
import os.path
import sys
import tempfile
from shutil import rmtree

from logilab.common.proc import ResourceError

from __pkginfo__ import version as apycot_version

from apycotlib import (ConfigError, Command, get_registered,
                       SUCCESS, SKIPPED, ERROR, KILLED)


def substitute(value, substitutions):
    if hasattr(value, 'replace') and value:
        for key, val in substitutions.iteritems():
            value = value.replace('${%s}' % key, val)
    return value

def substitute_dict(dict, substitutions):
    for key, value in dict.iteritems():
        dict[key] = substitute(value, substitutions)
    return dict

def clean_path(path):
    """remove trailing path separator from path"""
    if path and path[-1] == os.sep:
        return path[:-1]
    return path

def update_path(old_path, new_path):
    """update sys.path"""
    if old_path is not None:
        for path in old_path.split(os.pathsep):
            try:
                sys.path.remove(clean_path(path))
            except ValueError:
                continue
    if new_path is not None:
        new_path = new_path.split(os.pathsep)
        new_path.reverse()
        for path in new_path:
            sys.path.insert(0, clean_path(path))


from narvalbot import options_dict

class Test(object):
    """the single source unit test class"""

    def __init__(self, texec, writer):
        options = options_dict(texec['options'])
        # directory where the test environment will be built
        self.tmpdir = tempfile.mkdtemp(dir=options.get('test_dir'))
        # notify some subprocesses they're executed by apycot through an
        # environment variable
        os.environ['APYCOT_ROOT'] = self.tmpdir
        # test config / tested project environment
        self.texec = texec
        self.tconfig = texec['configuration']
        self.environment = texec['environment']
        # IWriter object
        self.writer = writer
        # local caches
        self._configs = {}
        self._repositories = {}
        # environment variables as a dictionary
        self.environ = self.tconfig['apycot_process_environment']
        self.environ.update(self.environment['apycot_process_environment'])
        self.environ.setdefault('LC_ALL', 'fr_FR.UTF-8') # XXX force utf-8
        self._substitute(self.environment, self.environ)
        # Track environment change to be able to restore it later.
        # Notice sys.path is synchronized with the PYTHONPATH environment variable
        self._tracks = {}
        # flag indicating whether to clean test environment after test execution
        # or if an archive containing it should be uploaded
        self.keep_test_dir = options.get('keep_test_dir', False)
        self.archive = options.get('archive', False)
        # set of preprocessors which have failed
        self._failed_pp = set()
        self.executed_checkers = {}
        self.global_status = SUCCESS
        self.options = options
        os.umask(022)

    def __str__(self):
        return repr(self.apycot_repository())

    def _substitute(self, pe, configdict):
        substitute_dict(configdict,
                        {'NAME': pe['name'], 'TESTDIR': self.tmpdir,
                         'SRCDIR': self.apycot_repository(pe).co_path})

    # resource accessors #######################################################

    def apycot_config(self, pe=None):
        if pe is None:
            pe = self.environment
        try:
            return self._configs[pe['eid']]
        except KeyError:
            tconfig_url = self.writer.cnxh.instance_url + str(self.tconfig['eid'])
            config = self.writer.cnxh.http_get(tconfig_url,
                                                vid='apycot.get_configuration',
                                                environment=pe['eid'])[0]
            self._configs[pe['eid']] = config
            self._substitute(pe, config)
            return config

    def apycot_repository(self, pe=None):
        if pe is None:
            pe = self.environment
        try:
            return self._repositories[pe['eid']]
        except KeyError:
            from apycotlib.repositories import get_repository
            if 'repository' not in pe or not pe['repository'] :
                raise Exception('Project environment %s has no repository' % pe['title'])
            repdef = {'repository': pe['repository'],
                      'path': pe['vcs_path'],
                      'branch': self.texec['branch']}
            # don't overwrite branch hardcoded on the environment: have to be
            # done here, not only in when starting plan (eg in entities.py)
            # since project env may not be the tested project env
            pecfg = pe['apycot_configuration']
            if pecfg.get('branch'):
                repdef['branch'] = pecfg['branch']
            apyrep = get_repository(repdef)
            self._repositories[pe['eid']] = apyrep
            return apyrep

    def project_path(self, subpath=False):
        path = self.apycot_repository().co_path
        if subpath and self.apycot_config().get('subpath'):
            return os.path.join(path, self.apycot_config()['subpath'])
        return path

    # test initialisation / cleanup ############################################

    def setup(self):
        """setup the test environment"""
        self.writer.start()
        self.writer.raw('apycot', apycot_version, 'version')
        # setup environment variables
        if self.environ:
            for key, val in self.environ.iteritems():
                self.update_env(self.tconfig['name'], key, val)

    def clean(self):
        """clean the test environment"""
        try:
            self.writer.end(self.global_status, self.archive and self.tmpdir)
        except:
            # XXX log error
            pass
        if not self.keep_test_dir:
            rmtree(self.tmpdir)
        else:
            self.writer.execution_info('temporary directory not removed: %s',
                                       self.tmpdir)

    # environment tracking #####################################################

    def update_env(self, key, envvar, value, separator=None):
        """update an environment variable"""
        envvar = envvar.upper()
        orig_value = os.environ.get(envvar)
        if orig_value is None:
            orig_value = ''
        uid = self._make_key(key, envvar)
        assert not self._tracks.has_key(uid)
        if separator is not None:
            if orig_value:
                orig_values = orig_value.split(separator)
            else:
                orig_values = [] # don't want a list with an empty string
            if not value in orig_values:
                orig_values.insert(0, value)
                self._set_env(uid, envvar, separator.join(orig_values))
        elif orig_value != value:
            self._set_env(uid, envvar, value)

    def clean_env(self, key, envvar):
        """reinitialize an environment variable"""
        envvar = envvar.upper()
        uid = self._make_key(key, envvar)
        if self._tracks.has_key(uid):
            orig_value = self._tracks[uid]
            if envvar == 'PYTHONPATH':
                update_path(os.environ.get(envvar), orig_value)
            if self.writer:
                self.writer.debug('Reset %s=%r', envvar, orig_value)
            if orig_value is None:
                del os.environ[envvar]
            else:
                os.environ[envvar] = self._tracks[uid]
            del self._tracks[uid]

    def _make_key(self, key, envvar):
        """build a key for an environment variable"""
        return '%s-%s' % (key, envvar)

    def _set_env(self, uid, envvar, value):
        """set a new value for an environment variable
        """
        if self.writer:
            self.writer.debug('Set %s=%r', envvar, value)
        orig_value = os.environ.get(envvar)
        self._tracks[uid] = orig_value
        os.environ[envvar]  = value
        if envvar == 'PYTHONPATH':
            update_path(orig_value, value)

    # api to call a particular preprocessor / checker #########################

    def checkout(self, pe):
        vcsrepo = self.apycot_repository(pe)
        cocmd = vcsrepo.co_command()
        if cocmd:
            Command(self.writer, cocmd, raises=True, shell=True,
                    cwd=self.tmpdir).run()
        movebranchcmd = vcsrepo.co_move_to_branch_command()
        if movebranchcmd:
            Command(self.writer, movebranchcmd, shell=True,
                    cwd=self.tmpdir).run()
        self.writer.link_to_revision(pe, vcsrepo)
        self.writer.refresh_log()

    def call_preprocessor(self, pptype, penv):
        cfg = self.apycot_config(penv)
        ppid = cfg.get(pptype)
        if ppid is not None:
            # fetch preprocessors options set on the project environment
            preprocessor = get_registered('preprocessor', ppid)(self.writer, cfg)
        else:
            # XXX log?
            return
        path = self.apycot_repository(penv).co_path
        dependency = path != self.project_path()
        msg = 'running preprocessor %(pp)s to perform %(pptype)s'
        msg_data = {'pptype': pptype,
                    'pp': preprocessor.id,}
        if dependency:
            msg + ' on dependency %(pe)s'
            msg_data['pe'] = penv['name']
        self.writer.info(msg % msg_data, path=path)
        try:
            preprocessor.run(self, path)
        except Exception, ex:
            msg = '%s while running preprocessor %s: %s'
            self.writer.fatal(msg, ex.__class__.__name__, preprocessor.id, ex,
                              path=path, tb=True)
            self._failed_pp.add(pptype)
            self.global_status = min(self.global_status, ERROR)

    def run_checker(self, id, displayname=None, nonexecuted=False, **kwargs):
        """run all checks in the test environment"""
        options = self.options.copy()
        options.update(kwargs)
        self._substitute(self.environment, options)
        check_writer = self.writer.make_check_writer()
        if nonexecuted:
            check_writer.start(id)
            check_writer.end(SKIPPED)
            return None, SKIPPED # XXX
        checker = get_registered('checker', id)(check_writer, options)
        check_writer.start(checker, name=displayname)
        if checker.need_preprocessor in self._failed_pp:
            msg = 'Can\'t run checker %s: preprocessor %s have failed'
            check_writer.fatal(msg, checker.id, checker.need_preprocessor)
            check_writer.end(SKIPPED)
            return checker, SKIPPED # XXX
        try:
            checker.check_options()
            status = checker.check(self)
            self.executed_checkers[checker.id] = status
        except ConfigError, ex:
            msg = 'Config error for %s checker: %s'
            check_writer.fatal(msg, checker.id, ex)
            status = ERROR
        except ResourceError, ex:
            check_writer.fatal('%s resource limit reached, aborted', ex.limit)
            status = KILLED
            raise
        except MemoryError:
            check_writer.fatal('memory resource limit reached, aborted')
            status = KILLED
            raise
        except Exception, ex:
            msg = 'Error while running checker %s: %s'
            check_writer.fatal(msg, checker.id, ex, tb=True)
            status = ERROR
        finally:
            check_writer.end(status)
            #globstatus = min(globstatus, status)
            self.writer.execution_info('%s [%s]', checker.id, status)
            self.global_status = min(self.global_status, status)
        return checker, status