author David Douard <david.douard@logilab.fr>
Tue, 21 Oct 2014 23:17:46 +0200
changeset 1738 dbf8a63bd907
parent 1736 72ec6c7e956c
child 1690 24caf193755e
permissions -rw-r--r--
[recipe] improve recipes' comments

"""Writer classes: send data (execution reports) to an apycot cubicweb
instance which stores and provides test execution reports

:organization: Logilab
:copyright: 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import with_statement
__docformat__ = "restructuredtext en"

import os
import logging
import tarfile
import tempfile
import traceback
from datetime import datetime
from StringIO import StringIO
from io import BytesIO
from threading import RLock
from logilab.mtconverter import xml_escape
from logilab.common.date import ustrftime

from cwclientlib import builders

    # NOTE(review): these entries map `logging` severity levels to unicode
    # labels, but the statement opening the mapping (presumably
    # `<NAME> = {`) and its closing brace are missing from this view.
    # Restore them from the original file before relying on this block.
    logging.DEBUG :   u'DEBUG',
    logging.INFO :    u'INFO',
    logging.WARNING : u'WARNING',
    logging.ERROR :   u'ERROR',
    logging.FATAL :   u'FATAL'

# Execution archives are bzip2-compressed tarballs; ARCHIVE_MODE is the
# mode handed to tarfile.open() when the archive is written.
ARCHIVE_EXT = '.tar.bz2'
ARCHIVE_MODE = 'w:bz2'
# File name template, filled in by make_archive_name().
ARCHIVE_NAME = "apycot-archive-%(instance-id)s-%(exec-id)s" + ARCHIVE_EXT
ARCHIVE_MIME_TYPE = u'application/x-tar'


def make_archive_name(cwinstid, execution_id):
    """Return the archive file name for an execution.

    :param cwinstid: instance identifier inserted in the file name
    :param execution_id: execution identifier inserted in the file name
    :return: ``ARCHIVE_NAME`` filled in with both identifiers, with every
      ':' replaced by '.'
    """
    exec_data = {'exec-id': execution_id,
                 'instance-id': cwinstid}
    # replace ':' as tar uses them to fetch archives over the network
    return (ARCHIVE_NAME % exec_data).replace(':', '.')

class AbstractLogWriter(object):
    """Mixin to be used as a logging-like reporter.

    The ``debug``/``info``/``warning``/``error``/``fatal`` entry points all
    funnel into :meth:`log`, which builds the message and hands it over to
    :meth:`_log`.  Concrete writers must implement :meth:`_log`.
    """

    def _unicode(self, something):
        """Return `something` as a unicode string.

        Byte strings are decoded as UTF-8 (undecodable bytes are replaced);
        any other non-unicode object is converted with ``unicode()``.
        """
        if isinstance(something, str):
            return unicode(something, 'utf-8', 'replace')
        if not isinstance(something, unicode):
            return unicode(something)
        return something

    def debug(self, *args, **kwargs):
        """log a debug message"""
        self.log(logging.DEBUG, *args, **kwargs)

    def info(self, *args, **kwargs):
        """log an info"""
        self.log(logging.INFO, *args, **kwargs)

    def warning(self, *args, **kwargs):
        """log a warning"""
        self.log(logging.WARNING, *args, **kwargs)

    def error(self, *args, **kwargs):
        """log an error"""
        self.log(logging.ERROR, *args, **kwargs)

    def fatal(self, *args, **kwargs):
        """log a fatal error"""
        self.log(logging.FATAL, *args, **kwargs)

    critical = fatal

    def _msg_info(self, *args, **kwargs):
        """Build the message described by logging-style arguments.

        :param args: the message, optionally followed by values that are
          interpolated into it with ``%``
        :param kwargs: only ``path``, ``line`` and ``tb`` are accepted; when
          ``tb`` is true the traceback of the exception currently being
          handled is appended to the message
        :return: a ``(path, line, msg)`` tuple
        """
        path = kwargs.pop('path', None)
        line = kwargs.pop('line', None)
        tb = kwargs.pop('tb', False)
        assert not kwargs
        if len(args) > 1:
            args = [self._unicode(string) for string in args]
            msg = args[0] % tuple(args[1:])
        else:
            # a lone argument is the message itself, no interpolation
            assert args
            msg = self._unicode(args[0])
        if tb:
            stream = StringIO()
            traceback.print_exc(file=stream)
            # decode explicitly so a non-ASCII traceback cannot break the
            # concatenation with the unicode message
            msg += u'\n' + self._unicode(stream.getvalue())
        return path, line, msg

    def log(self, severity, *args, **kwargs):
        """log a message of a given severity"""
        path, line, msg = self._msg_info(*args, **kwargs)
        self._log(severity, path, line, msg)

    def _log(self, severity, path, line, msg):
        """Record one message; must be implemented by concrete writers.

        :raise NotImplementedError: always, in this base mixin
        """
        raise NotImplementedError()

class BaseDataWriter(AbstractLogWriter):
    """A logging-like writer that prints execution messages and stores
    test execution data to a CubicWeb instance (using apycot).

    Each logging statement is uploaded as a 4 columns tab-separated row:

    <severity>TAB<path>TAB<line>TAB<logging message>
    """

    def __init__(self, cnxh, target_eid):
        """Keep the connection handler and the eid that data is attached to.

        :param cnxh: connection handler; its ``rqlio`` method is used to
          send queries to the instance
        :param target_eid: eid of the execution entity
        """
        self.cnxh = cnxh
        # eid of the execution entity
        self.target_eid = target_eid
        # eid of the File entity holding the log, unknown until created
        self._log_file_eid = None
        # encoded log rows, and how many of them were already uploaded
        self._logs = []
        self._logs_sent = 0

    def start(self):
        """Start hook.

        NOTE(review): the body of this method is missing from this view
        (presumably a bare ``pass``) -- confirm against the original file.
        """

    def end(self):
        """End hook.

        NOTE(review): the body of this method is missing from this view
        (presumably a bare ``pass``) -- confirm against the original file.
        """

    def execution_info(self, *args, **kwargs):
        """Print a message instead of recording it.

        Takes the same arguments as the logging methods; the resulting
        message is encoded as UTF-8 when it is unicode, then printed.
        """
        msg = self._msg_info(*args, **kwargs)[-1]
        if isinstance(msg, unicode):
            msg = msg.encode('utf-8')
        print msg

    def _log(self, severity, path, line, msg):
        """Build the tab-separated row for one log message.

        `path` and `line` are XML-escaped; missing values become empty
        strings.
        """
        encodedmsg = u'%s\t%s\t%s\t%s<br/>' % (severity, xml_escape(path or u''),
                                               xml_escape(u'%s' % (line or u'')),
        # NOTE(review): the end of this statement (the value for the fourth
        # `%s` and the closing parenthesis) and whatever stores `encodedmsg`
        # are missing from this view.

    def raw(self, name, value, type=None):
        """give some raw data"""
        # NOTE(review): the beginning of this statement is missing from this
        # view; only the tail of what looks like a `self.cnxh.rqlio([...])`
        # call remains.  The remaining query links a `CRI` entity to the
        # target entity through `for_check`.
                                                type=type and unicode(type)),
                         ('SET CRI for_check TE WHERE CRI eid %(cri)s, TE eid %(te)s',
                          {'cri': '__r0', 'te': self.target_eid})])

    def refresh_log(self):
        """Upload the log rows that were not sent yet.

        On first use a `File` entity named ``log_file.txt`` is inserted and
        linked to the target entity through ``log_file``; its eid is kept
        for later calls.  Rows recorded since the previous upload are then
        joined, encoded as UTF-8 and sent, and the sent counter is updated.
        """
        log = self._logs
        if not self._log_file_eid:
            # create the (initially empty) log file entity
            data = self.cnxh.rqlio([('INSERT File F: F data %(content)s, '
                                     'F data_name %(name)s, '
                                     'F data_encoding %(coding)s, '
                                     'F data_format %(fmt)s, '
                                     'TE log_file F WHERE TE eid %(eid)s',
                                     {'name': u'log_file.txt',
                                      'fmt': u'text/plain',
                                      'coding': 'utf-8',
                                      'content': BytesIO(''),
                                      'eid': self.target_eid,
            # NOTE(review): the line closing the dict, tuple, list and call
            # above is missing from this view.
            self._log_file_eid = data.json()[0][0][0]

        if self._logs_sent < len(log):
            # only send what was appended since the last refresh
            logcontent = u''.join(log[self._logs_sent:]).encode('utf-8')
            files = {'to_append': logcontent}
            # NOTE(review): the line opening the call that the next line
            # continues is missing from this view (presumably
            # `self.cnxh.http_post(<url>,`) -- confirm.
                                files=files, eid=self._log_file_eid)
            self._logs_sent = len(log)

class CheckDataWriter(BaseDataWriter):
    """Writer intended to report Check level log and result.

    A Check is one step of a Test session.
    """

    def start(self, checker, name=None):
        """Register the given checker as started.

        A `CheckResult` entity is created with status ``processing`` and
        the current time as start time, and linked to the current target
        entity through ``during_execution``.  The writer then retargets
        itself to the new entity and forgets any previous log file.

        :param checker: the checker, or the checked id
        :param name: explicit name for the check result, if any
        """
        if name is not None:
            crname = name
        # NOTE(review): an `else:` line appears to be missing here; as
        # received both assignments sit in the `if` body.
            crname = getattr(checker, 'id', checker) # may be the checked id
        eid = self.cnxh.rqlio([builders.create_entity('CheckResult',
                                                      name=self._unicode(crname), status=u'processing',
                                                      starttime=ustrftime(datetime.now(), '%Y-%m-%d %H:%M:%S')),
                               ('SET CR during_execution TE WHERE CR eid %(cr)s, TE eid %(te)s',
                                {'cr': '__r0', 'te': self.target_eid})]).json()[0][0][0]
        # from now on, logs and raw data are attached to the check result
        self.target_eid = eid
        self._log_file_eid = None
        if hasattr(checker, 'options'):
            # keep only declared options whose value differs from the default
            options = ['%s=%s' % (k, v) for k, v in checker.options.iteritems()
                       if k in checker.options_def
                       and v != checker.options_def[k].get('default')]
            if options:
                # NOTE(review): the body of this `if` is missing from this
                # view (presumably it reports `options`) -- confirm.

    def end(self, status):
        """Register the given checker as closed with status <status>.

        The end time is set to the current time.
        """
        self.cnxh.rqlio([('SET CR status %(status)s, CR endtime %(endtime)s WHERE CR eid %(eid)s',
                          {'status': self._unicode(status),
                           'endtime': ustrftime(datetime.now(), '%Y-%m-%d %H:%M:%S'),
                           'eid': self.target_eid})])

class TestDataWriter(BaseDataWriter):
    """Writer intended to report Test level log and result.

    A test consist in a setup procedure (the install step) followed by
    one or more checkers, each one executed in the environment set up
    during the install step.
    """

    def make_check_writer(self):
        """Return a CheckDataWriter suitable to write checker log and
        result within this test"""
        return CheckDataWriter(self.cnxh, self.target_eid)

    def link_to_revision(self, environment, vcsrepo):
        """Record the changeset of `vcsrepo` as raw 'revision' data.

        Nothing is recorded when the repository reports no changeset.
        `environment` is not used by the visible body.
        """
        changeset = vcsrepo.changeset()
        if changeset is not None:
            self.raw(repr(vcsrepo), changeset, 'revision')

    def end(self, status, archivedir=None):
        """mark the current test as closed (with status <status>) and upload
        archive if requested."""

        self.cnxh.debug('End test with status %s' % status)
        self.cnxh.rqlio([('SET CR status %(status)s WHERE CR eid %(eid)s',
                          {'status': self._unicode(status),
                           'eid': self.target_eid})])
        if archivedir:
            self.cnxh.debug('Archive the apycot temp directory')
            # the archive is first written to the system temp directory
            archive = make_archive_name(self.cnxh.instance_id, self.target_eid)
            archivefpath = os.path.join(tempfile.gettempdir(), archive)
            tarball = tarfile.open(archivefpath, ARCHIVE_MODE)
            # NOTE(review): the line introducing the deeper-indented block
            # below (presumably `try:`) is missing from this view.
                self.cnxh.debug('archive file for %r is %r' % (archive, archivefpath))
                self.cnxh.debug('** adding %r' % archivedir)
                tarball.add(archivedir, arcname=os.path.basename(archivedir))
                self.cnxh.debug('** archive file size: %s' % (os.stat(archivefpath).st_size))
                data = self.cnxh.http_post(self._url, vid='create_subentity',
                # NOTE(review): the remaining arguments and closing
                # parenthesis of the call above are missing from this view;
                # `self._url` is not set anywhere in the visible code.
                url = self.cnxh.instance_url + 'narval-file-append'
                files = {'data': ('dummy', open(archivefpath, 'rb'))}
                self.cnxh.http_post(url=url, files=files, eid=data['eid'])
                self.cnxh.debug('** archive file %s uploaded' % archive)
                # NOTE(review): the next call looks like the body of an
                # `except` clause whose header is missing from this view; the
                # method may also continue past the last visible line.
                self.cnxh.error('while archiving execution directory', tb=True)