_apycotlib/writer.py
author Paul Tonelli <paul.tonelli@logilab.fr>
Mon, 24 Mar 2014 10:17:06 +0100
changeset 1475 2a7c0ab3f126
parent 1455 4adba1b659f9
child 1476 419c81116273
permissions -rw-r--r--
[apycotlib] improve calls to http_get /post when communicating with cubicweb: - use get to get information from the cubicweb instance - use post to send information to modify the cubicweb instance - do not use object cwuri but concatenate cnxh url with eid

"""Writer sending data to a cubicweb instance which store it and may be used
to display reports

:organization: Logilab
:copyright: 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
from __future__ import with_statement
__docformat__ = "restructuredtext en"

import os
import logging
import tarfile
import tempfile
import traceback
from datetime import datetime
from StringIO import StringIO
from threading import RLock

from logilab.mtconverter import xml_escape

REVERSE_SEVERITIES = {
    logging.DEBUG :   u'DEBUG',
    logging.INFO :    u'INFO',
    logging.WARNING : u'WARNING',
    logging.ERROR :   u'ERROR',
    logging.FATAL :   u'FATAL'
    }

ARCHIVE_EXT = '.tar.bz2'
ARCHIVE_MODE = 'w:bz2'
ARCHIVE_NAME = "apycot-archive-%(instance-id)s-%(exec-id)s"+ARCHIVE_EXT

def make_archive_name(cwinstid, execution_id):
    # replace ':' as tar use them to fetch archive over network
    exec_data = {'exec-id':     execution_id,
                 'instance-id': cwinstid,
                }
    return (ARCHIVE_NAME % exec_data).replace(':', '.')



class AbstractLogWriter(object):

    def _unicode(self, something):
        if isinstance(something, str):
            return unicode(something, 'utf-8', 'replace')
        if not isinstance(something, unicode):
            return unicode(something)
        return something

    def debug(self, *args, **kwargs):
        """log an debug"""
        self.log(logging.DEBUG, *args, **kwargs)

    def info(self, *args, **kwargs):
        """log an info"""
        self.log(logging.INFO, *args, **kwargs)

    def warning(self, *args, **kwargs):
        """log a warning"""
        self.log(logging.WARNING, *args, **kwargs)

    def error(self, *args, **kwargs):
        """log an error"""
        self.log(logging.ERROR, *args, **kwargs)

    def fatal(self, *args, **kwargs):
        """log a fatal error"""
        self.log(logging.FATAL, *args, **kwargs)

    critical = fatal

    def _msg_info(self, *args, **kwargs):
        path = kwargs.pop('path', None)
        line = kwargs.pop('line', None)
        tb = kwargs.pop('tb', False)
        assert not kwargs
        if len(args) > 1:
            args = [self._unicode(string) for string in args]
            msg = args[0] % tuple(args[1:])
        else:
            assert args
            msg = self._unicode(args[0])
        if tb:
            stream = StringIO()
            traceback.print_exc(file=stream)
            msg += '\n' + stream.getvalue()
        return path, line, msg

    def log(self, severity, *args, **kwargs):
        """log a message of a given severity"""
        path, line, msg = self._msg_info(*args, **kwargs)
        self._log(severity, path, line, msg)

    def _log(self, severity, path, line, msg):
        raise NotImplementedError()


class BaseDataWriter(AbstractLogWriter):
    """print execution message on stderr and store Test execution data to
    a CubicWeb instance (using the apycot cube)
    """

    def __init__(self, cnxh, target_url):
        self._cnxh = cnxh
        # eid of the execution entity
        self._url = target_url
        self._logs = []
        self._logs_sent = 0

    def target_eid(self):
        return self._url.rsplit('/',1)[-1]

    def instance_url(self):
        return '{0}/'.format(self._url.rsplit('/',1)[0])

    def start(self):
        pass

    def end(self):
        pass

    def set_exec_status(self, status):
        self._cnxh.http_post(self._url, vid='set_attributes', status=status)

    def execution_info(self, *args, **kwargs):
        msg = self._msg_info(*args, **kwargs)[-1]
        if isinstance(msg, unicode):
            msg = msg.encode('utf-8')
        print msg

    def _log(self, severity, path, line, msg):
        encodedmsg = u'%s\t%s\t%s\t%s<br/>' % (severity, xml_escape(path or u''),
                                               xml_escape(u'%s' % (line or u'')),
                                               xml_escape(msg))
        self._logs.append(encodedmsg)

    def raw(self, name, value, type=None):
        """give some raw data"""
        self._cnxh.http_post(self._url, vid='create_subentity',
                             __cwetype__='CheckResultInfo',
                             __cwrel__='for_check',
                             label=self._unicode(name),
                             value=self._unicode(value),
                             type=type and unicode(type))

    def refresh_log(self):
        log = self._logs
        if self._logs_sent < len(log):
            files = {'log': ('file', u'\n'.join(log[self._logs_sent:]) + '\n' )}
            self._cnxh.http_post(self.instance_url() +
                    'file-upload',
                    files=files,
                    field='log_file',
                    eid=self.target_eid(),
                    _cw_fields='eid,field,log')
            self._logs_sent = len(log)


class CheckDataWriter(BaseDataWriter):
    """Writer intended to report Check level log and result."""

    def start(self, checker, name=None):
        """Register the given checker as started"""
        if name is not None:
            crname = name
        else:
            crname = getattr(checker, 'id', checker) # may be the checked id
        data = self._cnxh.http_post(self._url, vid='create_subentity',
                                    __cwetype__='CheckResult',
                                    __cwrel__='during_execution',
                                    name=self._unicode(crname), status=u'processing',
                                    starttime=datetime.now())
        self._url = self._cnxh.instance_url + str(data[0]['eid'])
        if hasattr(checker, 'options'):
            options = ['%s=%s' % (k, v) for k, v in checker.options.iteritems()
                       if k in checker.options_def
                       and v != checker.options_def[k].get('default')]
            if options:
                self.info('\n'.join(options))
                self.refresh_log()

    def end(self, status):
        """Register the given checker as closed with status <status>"""
        self.refresh_log()
        self._cnxh.http_post(self._url, vid='set_attributes',
                             status=self._unicode(status),
                             endtime=datetime.now(),)


class TestDataWriter(BaseDataWriter):
    """Writer intended to report Test level log and result."""

    def make_check_writer(self):
        """Return a CheckDataWriter suitable to write checker log and result within this test"""
        self.refresh_log()
        return CheckDataWriter(self._cnxh, self._url)

    def link_to_revision(self, environment, vcsrepo):
        changeset = vcsrepo.changeset()
        if changeset is not None:
            self.raw(repr(vcsrepo), changeset, 'revision')

    def start(self):
        self.set_exec_status(u'set up')

    def end(self, status, archivedir=None):
        """mark the current test as closed (with status <status>) and archive if requested."""
        self.refresh_log()
        self._cnxh.http_post(self._url, vid='set_attributes',
                            status = self._unicode(status))
        if False and archivedir: # XXX this should be refactored! (mostly) useless as is
            archive = make_archive_name(self._cnxh.cwinstid, self._url)
            archivefpath = os.path.join(tempfile.gettempdir(), archive)
            tarball = tarfile.open(archivefpath, ARCHIVE_MODE)
            try:
                tarball.add(archivedir)
                tarball.close()
                files = {'log': (archive, StringIO(open(archivefpath,
                                 'rb').read()))}
                self._cnxh.http_post(self.instance_url() + 'file-upload',
                                     files=files,
                                     field='execution_archive',
                                     eid=self.target_eid())
            except:
                self.error('while archiving execution directory', tb=True)
            finally:
                os.unlink(archivefpath)