_apycotlib/atest.py
author David Douard <david.douard@logilab.fr>
Fri, 21 Nov 2014 20:07:15 +0100
changeset 1782 69394bafe4bd
parent 1769 8e817371cba1
permissions -rw-r--r--
[narval] the lgp_suffix option of the lgp.build checker is now used as is: it is now the responsibility of the caller to build the desired suffix, instead of relying on a hardcoded ~rev<csid>.

"""APyCoT task / test

A task is a queued (pending) test

A test defines:
* a unit of sources to test (a project)
* a list of checks to apply to this unit
* how to build the test environment (dependencies...)
"""
from __future__ import with_statement

__docformat__ = "restructuredtext en"

import os
import os.path
import sys
from tempfile import mkdtemp, NamedTemporaryFile
from shutil import rmtree

from logilab.common.proc import ResourceError

from __pkginfo__ import version as apycot_version

from apycotlib import (ConfigError, Command, get_registered,
                       SUCCESS, SKIPPED, ERROR, KILLED)


def clean_path(path):
    """Return `path` without its trailing path separator, if any.

    At most one trailing ``os.sep`` is removed. A path made of the
    separator alone (the filesystem root) is returned unchanged: stripping
    it would give an empty string, which designates the current directory
    when used as a ``sys.path`` entry.

    Falsy values (``None``, ``''``) are returned as is.
    """
    if path and len(path) > 1 and path.endswith(os.sep):
        return path[:-1]
    return path

def update_path(old_path, new_path):
    """Synchronize ``sys.path`` with a change of search path.

    Both arguments are ``os.pathsep`` separated strings, or None to skip
    the corresponding step:

    * every entry of `old_path` is removed from ``sys.path`` (entries that
      are not in ``sys.path`` are ignored);
    * every entry of `new_path` is then put in front of ``sys.path``,
      keeping the order the entries have in `new_path`.

    Entries go through `clean_path` before being removed or inserted.
    """
    if old_path is not None:
        for entry in old_path.split(os.pathsep):
            stale = clean_path(entry)
            if stale in sys.path:
                sys.path.remove(stale)
    if new_path is not None:
        # insert from the last entry to the first so that the first entry
        # ends up at the very beginning of sys.path
        for entry in reversed(new_path.split(os.pathsep)):
            sys.path.insert(0, clean_path(entry))


from narvalbot import options_dict

class Test(object):
    """the single source unit test class"""

    def __init__(self, texec, writer):
        """Prepare a test run described by `texec`.

        :param texec: description of the test execution; the keys read here
          are ``'options'``, ``'configuration'`` and ``'environment'``
        :param writer: IWriter object used to report on the run

        Side effects: a temporary directory is created (inside the
        ``test_dir`` option when it is set), ``APYCOT_ROOT`` is exported in
        ``os.environ`` and the process umask is set to octal 022.
        """
        options = options_dict(texec['options'])
        # directory where the test environment will be built
        self.tmpdir = mkdtemp(dir=options.get('test_dir'))
        # notify some subprocesses they're executed by apycot through an
        # environment variable
        os.environ['APYCOT_ROOT'] = self.tmpdir
        # test config / tested project environment
        self.texec = texec
        self.tconfig = texec['configuration']
        self.penvironment = texec['environment']
        self._project_path = None
        # IWriter object
        self.writer = writer
        # local caches
        self._configs_cache = {}
        # environment variables as a dictionary
        # NOTE: this is the configuration's own dictionary, it is updated in
        # place with the project environment's variables just below
        self.environ = self.tconfig['apycot_process_environment']
        self.environ.update(self.penvironment['apycot_process_environment'])
        self.environ.setdefault('LC_ALL', 'fr_FR.UTF-8') # XXX force utf-8
        # Track environment change to be able to restore it later.
        # Notice sys.path is synchronized with the PYTHONPATH environment variable
        self._tracks = {}
        # flag indicating whether to clean test environment after test execution
        # or if an archive containing it should be uploaded
        self.keep_test_dir = options.get('keep_test_dir', False)
        self.archive = options.get('archive', False)
        # status of each checker that ran to completion, indexed by checker id
        self.executed_checkers = {}
        self.global_status = SUCCESS
        self.options = options
        # 0o22 is the octal notation accepted by Python >= 2.6 as well as by
        # Python 3 (the former 022 spelling is a syntax error there)
        os.umask(0o22)

    def project_path(self):
        return self._project_path

    def set_project_path(self, path):
        self._project_path = path

    def __str__(self):
        return repr(self.apycot_repository())

    # resource accessors #######################################################

    def src_configuration(self):
        '''Return the sources configuration needed for setting the
        project environment in place for testing.

        Return a list of (cset, url, repo eid, checkout recipe, setup recipe)

        cset: the changeset to consider
        url: the URL of the repository to clone
        repoeid: the EID of the Repository entity
        checkout recpe: the name of the Recipe to be used to perform the checkout
                        (clone and update to given cset)
        setup recipe: the name of the recipe to be used to make the installation
        '''
        for pe in self.writer.cnxh.rql('Any E WHERE E eid %s' % self.texec['eid'],
                                       vid='apycot.get_dependencies').json()[0]:
            repoeid = pe['repository']['eid']
            repourl = pe['repository']['source_url']
            co_rcp = self.writer.cnxh.rql('Any RN WHERE REPO eid %(eid)s, '
                                          'REPO checkout_recipe R, R name RN' %
                                          {'eid': repoeid}).json()[0][0]
            setup_rcp = self.writer.cnxh.rql('Any RN WHERE PE eid %(eid)s, '
                                             'PE setup_recipe R, R name RN' %
                                             {'eid': pe['eid']}).json()[0][0]
            cset = self.texec['branch']
            yield cset, repourl, repoeid, co_rcp, setup_rcp


    def get_entity(self, eid):
        """Retrieve the entity from cw"""
        return self.writer.cnxh.rql('Any E WHERE E eid %s' % eid,
                                    vid='ejsonexport')

    def rql(self, rql, **args):
        return self.writer.cnxh.rql(rql, **args)

    def apycot_config(self, pe_eid=None):
        if pe_eid is None:
            pe_eid = self.penvironment['eid']
        try:
            return self._configs_cache[pe_eid]
        except KeyError:
            env = self.writer.cnxh.rql('Any E WHERE E eid %s' % self.tconfig['eid'],
                                          vid='apycot.get_configuration').json()[0]
            pe_env = self.writer.cnxh.rql('Any E WHERE E eid %s' % pe_eid,
                                          vid='apycot.get_configuration').json()[0]
            env.update(pe_env)
            self._configs_cache[pe_eid] = env
            return env

    def exec_recipe(self, recipe, working_directory=None, **kwargs):
        cwd = os.getcwd()
        resp = self.writer.cnxh.rql('Recipe R WHERE R name "%s"' % recipe,
                                    vid='ejsonexport')
        if resp.status_code > 399:
            msg = "Cannot retrieve recipe %s: %s" % (recipe, resp.text)
            self.writer.error(msg)
            raise ValueError(msg)
        recipe = resp.json()[0]

        try:
            if working_directory is not None:
                os.chdir(working_directory)
            with NamedTemporaryFile(suffix='.py') as f:
                f.write(recipe['script'])
                f.flush()
                try:
                    execfile(f.name, kwargs)
                except Exception as exc:
                    url = self.writer.cnxh.base_url + '/%s'%recipe['eid']
                    self.writer.error("Execution of recipe %s (%s) FAILED" % (recipe['name'], url))
                    raise
        finally:
            os.chdir(cwd)

    # test initialisation / cleanup ############################################

    def setup(self):
        """Set up the test environment.

        The writer is started, the apycot version is recorded as raw data,
        then every variable of `self.environ` is exported through
        `update_env`, using the name of the test configuration as tracking
        key.
        """
        self.writer.start()
        self.writer.raw('apycot', apycot_version, 'version')
        # setup environment variables
        if self.environ:
            # items() rather than the Python 2 only iteritems(): update_env
            # does not touch self.environ, so both iterate over the same pairs
            for key, val in self.environ.items():
                self.update_env(self.tconfig['name'], key, val)

    def clean(self):
        """clean the test environment"""
        try:
            self.writer.end(self.global_status, self.archive and self.tmpdir)
        except:
            # XXX log error
            pass
        if not self.keep_test_dir:
            rmtree(self.tmpdir)
        else:
            self.writer.execution_info('temporary directory not removed: %s',
                                       self.tmpdir)
            self.writer.raw('tmpdir', self.tmpdir, 'output')

    # environment tracking #####################################################

    def update_env(self, key, envvar, value, separator=None):
        """update an environment variable"""
        envvar = envvar.upper()
        orig_value = os.environ.get(envvar)
        if orig_value is None:
            orig_value = ''
        uid = self._make_key(key, envvar)
        assert not self._tracks.has_key(uid)
        if separator is not None:
            if orig_value:
                orig_values = orig_value.split(separator)
            else:
                orig_values = [] # don't want a list with an empty string
            if not value in orig_values:
                orig_values.insert(0, value)
                self._set_env(uid, envvar, separator.join(orig_values))
        elif orig_value != value:
            self._set_env(uid, envvar, value)

    def clean_env(self, key, envvar):
        """reinitialize an environment variable"""
        envvar = envvar.upper()
        uid = self._make_key(key, envvar)
        if self._tracks.has_key(uid):
            orig_value = self._tracks[uid]
            if envvar == 'PYTHONPATH':
                update_path(os.environ.get(envvar), orig_value)
            if self.writer:
                self.writer.debug('Reset %s=%r', envvar, orig_value)
            if orig_value is None:
                del os.environ[envvar]
            else:
                os.environ[envvar] = self._tracks[uid]
            del self._tracks[uid]

    def _make_key(self, key, envvar):
        """build a key for an environment variable"""
        return '%s-%s' % (key, envvar)

    def _set_env(self, uid, envvar, value):
        """set a new value for an environment variable
        """
        if self.writer:
            self.writer.debug(repr(value), path=envvar)
        orig_value = os.environ.get(envvar)
        self._tracks[uid] = orig_value
        os.environ[envvar]  = value
        if envvar == 'PYTHONPATH':
            update_path(orig_value, value)

    # api to call a particular checker #########################

    def run_checker(self, id, displayname=None, nonexecuted=False, **kwargs):
        """Run the checker registered as `id` in the test environment.

        :param id: identifier of the checker in the 'checker' registry
        :param displayname: given as `name` to the check writer when the
          check starts
        :param nonexecuted: when true, the checker is not run but reported
          as skipped
        :param kwargs: options overriding those of the test for this checker
        :return: a (checker, status) tuple, (None, SKIPPED) when
          `nonexecuted` is true
        :raise ResourceError, MemoryError: re-raised once reported, the
          check being closed with the KILLED status

        Configuration errors and other exceptions raised by the checker are
        reported through the check writer and turned into the ERROR status.
        In all cases the check is closed and the global status of the test
        is updated with the status of the checker.
        """
        assert self.project_path() is not None, 'project_path must be set before running any checker'
        options = self.options.copy()
        options.update(kwargs)
        check_writer = self.writer.make_check_writer()
        if nonexecuted:
            check_writer.start(id)
            check_writer.end(SKIPPED)
            return None, SKIPPED # XXX
        checker = get_registered('checker', id)(check_writer, options)
        check_writer.start(checker, name=displayname)
        # status reported by the finally clause when the checker is
        # interrupted by something not handled below (KeyboardInterrupt...):
        # without it, the unbound name would hide the actual exception
        status = ERROR
        try:
            checker.check_options()
            status = checker.check(self)
            self.executed_checkers[checker.id] = status
        except ConfigError as ex:
            msg = 'Config error for %s checker: %s'
            check_writer.fatal(msg, checker.id, ex)
            status = ERROR
        except ResourceError as ex:
            check_writer.fatal('%s resource limit reached, aborted', ex.limit)
            status = KILLED
            raise
        except MemoryError:
            check_writer.fatal('memory resource limit reached, aborted')
            status = KILLED
            raise
        except Exception as ex:
            msg = 'Error while running checker %s: %s'
            check_writer.fatal(msg, checker.id, ex, tb=True)
            status = ERROR
        finally:
            check_writer.end(status)
            self.writer.execution_info('%s [%s]', checker.id, status)
            self.global_status = min(self.global_status, status)
        return checker, status