_apycotlib/writer.py
author David Douard <david.douard@logilab.fr>
Tue, 13 Jan 2015 14:19:00 +0100
changeset 1783 e3aad84adf03
parent 1744 aa2677d6b9dc
permissions -rw-r--r--
[pkg] prepare 4.0.0

"""Writer classes: send data (execution reports) to an apycot cubicweb
instance which stores and provides test execution reports

:organization: Logilab
:copyright: 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import with_statement
__docformat__ = "restructuredtext en"

import os
import logging
import tarfile
import tempfile
import traceback
from datetime import datetime
from StringIO import StringIO
from io import BytesIO
from threading import RLock
from logilab.mtconverter import xml_escape
from logilab.common.date import ustrftime

from cwclientlib import builders

REVERSE_SEVERITIES = {
    logging.DEBUG :   u'DEBUG',
    logging.INFO :    u'INFO',
    logging.WARNING : u'WARNING',
    logging.ERROR :   u'ERROR',
    logging.FATAL :   u'FATAL'
    }

ARCHIVE_EXT = '.tar.bz2'
ARCHIVE_MODE = 'w:bz2'
ARCHIVE_NAME = "apycot-archive-%(instance-id)s-%(exec-id)s"+ARCHIVE_EXT
ARCHIVE_MIME_TYPE = u'application/x-tar'

def make_archive_name(cwinstid, execution_id):
    # replace ':' as tar use them to fetch archive over network
    exec_data = {'exec-id':     execution_id,
                 'instance-id': cwinstid,
                }
    return (ARCHIVE_NAME % exec_data).replace(':', '.')



class AbstractLogWriter(object):
    """Mixin to be used as a logging-like reporter"""
    def _unicode(self, something):
        if isinstance(something, str):
            return unicode(something, 'utf-8', 'replace')
        if not isinstance(something, unicode):
            return unicode(something)
        return something

    def debug(self, *args, **kwargs):
        """log an debug"""
        self.log(logging.DEBUG, *args, **kwargs)

    def info(self, *args, **kwargs):
        """log an info"""
        self.log(logging.INFO, *args, **kwargs)

    def warning(self, *args, **kwargs):
        """log a warning"""
        self.log(logging.WARNING, *args, **kwargs)

    def error(self, *args, **kwargs):
        """log an error"""
        self.log(logging.ERROR, *args, **kwargs)

    def fatal(self, *args, **kwargs):
        """log a fatal error"""
        self.log(logging.FATAL, *args, **kwargs)

    critical = fatal

    def _msg_info(self, *args, **kwargs):
        path = kwargs.pop('path', None)
        line = kwargs.pop('line', None)
        tb = kwargs.pop('tb', False)
        assert not kwargs
        if len(args) > 1:
            args = [self._unicode(string) for string in args]
            msg = args[0] % tuple(args[1:])
        else:
            assert args
            msg = self._unicode(args[0])
        if tb:
            stream = StringIO()
            traceback.print_exc(file=stream)
            msg += '\n' + stream.getvalue()
        return path, line, msg

    def log(self, severity, *args, **kwargs):
        """log a message of a given severity"""
        path, line, msg = self._msg_info(*args, **kwargs)
        self._log(severity, path, line, msg)

    def _log(self, severity, path, line, msg):
        raise NotImplementedError()


class BaseDataWriter(AbstractLogWriter):
    """A logging-like that print execution messages on stderr and
    store Test execution data to a CubicWeb instance (using the apycot
    cube)

    Each logging statement is uploaded as a 4 columns tab-separated row:

    <severity>TAB<path>TAB<line>TAB<logging message>
    """
    def __init__(self, cnxh, target_eid):
        self.cnxh = cnxh
        # eid of the execution entity
        self.target_eid = target_eid
        self._log_file_eid = None
        self._logs = []
        self._logs_sent = 0

    def start(self):
        pass

    def end(self):
        pass

    def execution_info(self, *args, **kwargs):
        msg = self._msg_info(*args, **kwargs)[-1]
        if isinstance(msg, unicode):
            msg = msg.encode('utf-8')
        print msg

    def _log(self, severity, path, line, msg):
        encodedmsg = u'%s\t%s\t%s\t%s<br/>' % (severity, xml_escape(path or u''),
                                               xml_escape(u'%s' % (line or u'')),
                                               xml_escape(msg))
        self._logs.append(encodedmsg)

    def raw(self, name, value, type=None):
        """give some raw data"""
        self.cnxh.rqlio([builders.create_entity('CheckResultInfo',
                                                label=self._unicode(name),
                                                value=self._unicode(value),
                                                type=type and unicode(type)),
                         ('SET CRI for_check TE WHERE CRI eid %(cri)s, TE eid %(te)s',
                          {'cri': '__r0', 'te': self.target_eid})])

    def refresh_log(self):
        log = self._logs
        if not self._log_file_eid:
            data = self.cnxh.rqlio([('INSERT File F: F data %(content)s, '
                                     'F data_name %(name)s, '
                                     'F data_encoding %(coding)s, '
                                     'F data_format %(fmt)s, '
                                     'TE log_file F WHERE TE eid %(eid)s',
                                     {'name': u'log_file.txt',
                                      'fmt': u'text/plain',
                                      'coding': 'utf-8',
                                      'content': BytesIO(''),
                                      'eid': self.target_eid,
                                      })])
            self._log_file_eid = data.json()[0][0][0]

        if self._logs_sent < len(log):
            logcontent = u''.join(log[self._logs_sent:]).encode('utf-8')
            files = {'to_append': logcontent}
            self.cnxh.http_post('narval-file-append',
                                files=files, eid=self._log_file_eid)
            self._logs_sent = len(log)


class CheckDataWriter(BaseDataWriter):
    """Writer intended to report Check level log and result.

    A Check is one step of a Test session
    """

    def start(self, checker, name=None):
        """Register the given checker as started"""
        if name is not None:
            crname = name
        else:
            crname = getattr(checker, 'id', checker) # may be the checked id
        eid = self.cnxh.rqlio([builders.create_entity('CheckResult',
                                                      name=self._unicode(crname), status=u'processing',
                                                      starttime=ustrftime(datetime.now(), '%Y-%m-%d %H:%M:%S')),
                               ('SET CR during_execution TE WHERE CR eid %(cr)s, TE eid %(te)s',
                                {'cr': '__r0', 'te': self.target_eid})]).json()[0][0][0]
        self.target_eid = eid
        self._log_file_eid = None
        if hasattr(checker, 'options'):
            options = ['%s=%s' % (k, v) for k, v in checker.options.iteritems()
                       if k in checker.options_def
                       and v != checker.options_def[k].get('default')]
            if options:
                self.info('\n'.join(options))
                self.refresh_log()

    def end(self, status):
        """Register the given checker as closed with status <status>"""
        self.refresh_log()
        self.cnxh.rqlio([('SET CR status %(status)s, CR endtime %(endtime)s WHERE CR eid %(eid)s',
                          {'status': self._unicode(status),
                           'endtime': ustrftime(datetime.now(), '%Y-%m-%d %H:%M:%S'),
                           'eid': self.target_eid})])



class TestDataWriter(BaseDataWriter):
    """Writer intended to report Test level log and result.

    A test consist in a setup procedure (the install step) followed by
    one or more checkers, each one executed in the environment set up
    during the install step

    self.target_eid is the eid of the TestExecution
    """

    def make_check_writer(self):
        """Return a CheckDataWriter suitable to write checker log and
        result within this test"""
        self.refresh_log()
        return CheckDataWriter(self.cnxh, self.target_eid)

    def link_to_revision(self, environment, vcsrepo):
        changeset = vcsrepo.changeset()
        if changeset is not None:
            self.raw(repr(vcsrepo), changeset, 'revision')

    def end(self, status, archivedir=None):
        """mark the current test as closed (with status <status>) and upload
        archive if requested."""

        self.cnxh.debug('End test with status %s' % status)
        self.refresh_log()
        self.cnxh.rqlio([('SET CR status %(status)s WHERE CR eid %(eid)s',
                          {'status': self._unicode(status),
                           'eid': self.target_eid})])

        if archivedir:
            self.cnxh.debug('Archive the apycot temp directory')
            archive = make_archive_name(self.cnxh.instance_id, self.target_eid)
            archivefpath = os.path.join(tempfile.gettempdir(), archive)
            tarball = tarfile.open(archivefpath, ARCHIVE_MODE)
            try:
                self.cnxh.debug('archive file for %r is %r' % (archive, archivefpath))
                self.cnxh.debug('** adding %r' % archivedir)
                tarball.add(archivedir, arcname=os.path.basename(archivedir))
                tarball.close()
                self.cnxh.debug('** archive file size: %s' % (os.stat(archivefpath).st_size))
                self.cnxh.rqlio([('INSERT File F: F data %(content)s, '
                                  'F data_name %(fname)s, F data_format %(fmt)s, '
                                  'TE execution_archive F WHERE TE eid %(eid)s',
                                  {'content': open(archivefpath, 'rb'),
                                   'fname':archive, 'fmt': ARCHIVE_MIME_TYPE,
                                   'eid': self.target_eid})])
                self.cnxh.debug('** archive file %s uploaded' % archive)
            except:
                self.cnxh.error('while archiving execution directory', tb=True)
            finally:
                os.unlink(archivefpath)