_apycotlib/atest.py
author David Douard <david.douard@logilab.fr>
Thu, 13 Nov 2014 14:34:08 +0100
changeset 1743 a9b471081580
parent 1742 3cdc3ef300a4
child 1587 f37f05827b22
permissions -rw-r--r--
[atest] ensure an error in a recipe execution (even a SyntaxError) is reported instead of crashing the process

"""APyCoT task / test

A task is a queued (pending) test

A test defines :
* a unit of sources to test (a project)
* a list of checks to apply to this unit
* how to build the test environment (preprocessing, dependencies...)
"""
from __future__ import with_statement

__docformat__ = "restructuredtext en"

import os
import os.path
import sys
from tempfile import mkdtemp, NamedTemporaryFile
from shutil import rmtree

from logilab.common.proc import ResourceError

from __pkginfo__ import version as apycot_version

from apycotlib import (ConfigError, Command, get_registered,
                       SUCCESS, SKIPPED, ERROR, KILLED)


def clean_path(path):
    """Return `path` without its trailing path separator.

    At most one trailing `os.sep` is removed. Empty values (``''`` or None)
    are returned unchanged, and so is a filesystem root such as ``'/'``:
    stripping its separator would give ``''``, which designates the current
    working directory once inserted into sys.path.
    """
    if not path or not path.endswith(os.sep):
        return path
    if os.path.dirname(path) == path:
        # a path which is its own parent is a root: nothing can be stripped
        return path
    return path[:-1]

def update_path(old_path, new_path):
    """synchronize sys.path with the change of a PYTHONPATH like value

    Both arguments are `os.pathsep` separated strings, or None. Each entry of
    `old_path` is removed from sys.path (entries which are not there are
    ignored), then the entries of `new_path` are put in front of sys.path,
    in the order they are listed.
    """
    if old_path is not None:
        for entry in old_path.split(os.pathsep):
            entry = clean_path(entry)
            if entry in sys.path:
                sys.path.remove(entry)
    if new_path is not None:
        # insert from last to first so the first entry ends up at index 0
        for entry in reversed(new_path.split(os.pathsep)):
            sys.path.insert(0, clean_path(entry))


from narvalbot import options_dict

class Test(object):
    """the single source unit test class"""

    def __init__(self, texec, writer):
        """Prepare a test run from a test execution description.

        :param texec: mapping describing the execution; the keys read here
          are 'options', 'configuration' and 'environment'
        :param writer: IWriter object used to report the test results

        Side effects: a temporary directory is created (below the 'test_dir'
        option when given), its path is exported as the APYCOT_ROOT
        environment variable and the process umask is set to 022.
        """
        options = options_dict(texec['options'])
        # directory where the test environment will be built
        self.tmpdir = mkdtemp(dir=options.get('test_dir'))
        # notify some subprocesses they're executed by apycot through an
        # environment variable
        os.environ['APYCOT_ROOT'] = self.tmpdir
        # test config / tested project environment
        self.texec = texec
        self.tconfig = texec['configuration']
        self.penvironment = texec['environment']
        # IWriter object
        self.writer = writer
        # local caches
        self._configs_cache = {}
        self._repositories = {}
        # environment variables as a dictionary: values of the project
        # environment override those of the test configuration. Work on a
        # copy so that merging does not modify the data held by `texec`
        self.environ = dict(self.tconfig['apycot_process_environment'])
        self.environ.update(self.penvironment['apycot_process_environment'])
        self.environ.setdefault('LC_ALL', 'fr_FR.UTF-8') # XXX force utf-8
        # Track environment change to be able to restore it later.
        # Notice sys.path is synchronized with the PYTHONPATH environment variable
        self._tracks = {}
        # flag indicating whether to clean test environment after test execution
        # or if an archive containing it should be uploaded
        self.keep_test_dir = options.get('keep_test_dir', False)
        self.archive = options.get('archive', False)
        # set of preprocessors which have failed
        self._failed_pp = set()
        # status of each executed checker, by checker id
        self.executed_checkers = {}
        self.global_status = SUCCESS
        self.options = options
        # 0o22 is the octal notation accepted by python >= 2.6 and python 3
        os.umask(0o22)

    def __str__(self):
        return repr(self.apycot_repository())

    # resource accessors #######################################################

    def get_entity(self, eid):
        """Retrieve the entity from cw"""
        return self.writer.cnxh.rql('Any E WHERE E eid %s' % eid,
                                    vid='ejsonexport')

    def rql(self, rql, **args):
        return self.writer.cnxh.rql(rql, **args)

    def apycot_config(self, pe_eid=None):
        if pe_eid is None:
            pe_eid = self.penvironment['eid']
        try:
            return self._configs_cache[pe_eid]
        except KeyError:
            config = self.writer.cnxh.rql('Any E WHERE E eid %s' % self.tconfig['eid'],
                                          vid='apycot.get_configuration',
                                          environment=pe_eid).json()[0]
            self._configs_cache[pe_eid] = config
            return config

    def exec_recipe(self, recipe, working_directory=None, **kwargs):
        cwd = os.getcwd()
        resp = self.writer.cnxh.rql('Recipe R WHERE R name "%s"' % recipe,
                                    vid='ejsonexport')
        if resp.status_code > 399:
            msg = "Cannot retrieve recipe %s: %s" % (recipe, resp.text)
            self.writer.error(msg)
            raise ValueError(msg)
        recipe = resp.json()[0]

        try:
            if working_directory is not None:
                os.chdir(working_directory)
            with NamedTemporaryFile(suffix='.py') as f:
                f.write(recipe['script'])
                f.flush()
                try:
                    execfile(f.name, kwargs)
                except Exception as exc:
                    url = self.writer.cnxh.base_url + '/%s'%recipe['eid']
                    self.writer.error("Execution of recipe %s (%s) FAILED" % (recipe['name'], url))
                    raise
        finally:
            os.chdir(cwd)

    # test initialisation / cleanup ############################################

    def setup(self):
        """Start the report and install the configured environment variables.

        The writer is started and given the apycot version, then each entry
        of `self.environ` is applied with update_env(), using the name of the
        test configuration as tracking key.
        """
        writer = self.writer
        writer.start()
        writer.raw('apycot', apycot_version, 'version')
        # setup environment variables
        if not self.environ:
            return
        config_name = self.tconfig['name']
        for envvar, value in self.environ.iteritems():
            self.update_env(config_name, envvar, value)

    def clean(self):
        """clean the test environment"""
        try:
            self.writer.end(self.global_status, self.archive and self.tmpdir)
        except:
            # XXX log error
            pass
        if not self.keep_test_dir:
            rmtree(self.tmpdir)
        else:
            self.writer.execution_info('temporary directory not removed: %s',
                                       self.tmpdir)

    # environment tracking #####################################################

    def update_env(self, key, envvar, value, separator=None):
        """update an environment variable"""
        envvar = envvar.upper()
        orig_value = os.environ.get(envvar)
        if orig_value is None:
            orig_value = ''
        uid = self._make_key(key, envvar)
        assert not self._tracks.has_key(uid)
        if separator is not None:
            if orig_value:
                orig_values = orig_value.split(separator)
            else:
                orig_values = [] # don't want a list with an empty string
            if not value in orig_values:
                orig_values.insert(0, value)
                self._set_env(uid, envvar, separator.join(orig_values))
        elif orig_value != value:
            self._set_env(uid, envvar, value)

    def clean_env(self, key, envvar):
        """reinitialize an environment variable"""
        envvar = envvar.upper()
        uid = self._make_key(key, envvar)
        if self._tracks.has_key(uid):
            orig_value = self._tracks[uid]
            if envvar == 'PYTHONPATH':
                update_path(os.environ.get(envvar), orig_value)
            if self.writer:
                self.writer.debug('Reset %s=%r', envvar, orig_value)
            if orig_value is None:
                del os.environ[envvar]
            else:
                os.environ[envvar] = self._tracks[uid]
            del self._tracks[uid]

    def _make_key(self, key, envvar):
        """build a key for an environment variable"""
        return '%s-%s' % (key, envvar)

    def _set_env(self, uid, envvar, value):
        """set a new value for an environment variable
        """
        if self.writer:
            self.writer.debug(repr(value), path=envvar)
        orig_value = os.environ.get(envvar)
        self._tracks[uid] = orig_value
        os.environ[envvar]  = value
        if envvar == 'PYTHONPATH':
            update_path(orig_value, value)

    # api to call a particular preprocessor / checker #########################

    def run_checker(self, id, displayname=None, nonexecuted=False, **kwargs):
        """Run the checker registered as `id` in the test environment.

        :param id: identifier of the checker in the 'checker' registry
        :param displayname: name given to the check writer for the report
        :param nonexecuted: if true the checker is not run, the check is
          only reported as skipped
        :param kwargs: options overriding the test options for this checker

        :return: a `(checker, status)` tuple; checker is None when
          `nonexecuted` is set

        A checker whose required preprocessor has failed is skipped.
        ConfigError and other regular exceptions are reported as ERROR;
        ResourceError and MemoryError are reported as KILLED, then re-raised.
        """
        options = self.options.copy()
        options.update(kwargs)
        check_writer = self.writer.make_check_writer()
        if nonexecuted:
            check_writer.start(id)
            check_writer.end(SKIPPED)
            return None, SKIPPED # XXX
        checker = get_registered('checker', id)(check_writer, options)
        check_writer.start(checker, name=displayname)
        if checker.need_preprocessor in self._failed_pp:
            msg = 'Can\'t run checker %s: preprocessor %s have failed'
            check_writer.fatal(msg, checker.id, checker.need_preprocessor)
            check_writer.end(SKIPPED)
            return checker, SKIPPED # XXX
        # status used by the finally clause when the check is aborted by an
        # exception none of the handlers below catches (KeyboardInterrupt,
        # SystemExit...): without it the clause would fail on an unbound
        # name and hide the original exception
        status = KILLED
        try:
            checker.check_options()
            status = checker.check(self)
            self.executed_checkers[checker.id] = status
        except ConfigError as ex:
            msg = 'Config error for %s checker: %s'
            check_writer.fatal(msg, checker.id, ex)
            status = ERROR
        except ResourceError as ex:
            check_writer.fatal('%s resource limit reached, aborted', ex.limit)
            status = KILLED
            raise
        except MemoryError:
            check_writer.fatal('memory resource limit reached, aborted')
            status = KILLED
            raise
        except Exception as ex:
            msg = 'Error while running checker %s: %s'
            check_writer.fatal(msg, checker.id, ex, tb=True)
            status = ERROR
        finally:
            # always close the check report and fold its status into the
            # global one, even when the exception is re-raised
            check_writer.end(status)
            self.writer.execution_info('%s [%s]', checker.id, status)
            self.global_status = min(self.global_status, status)
        return checker, status