--- a/test/test_functional.py Fri Jul 25 11:06:29 2014 +0200
+++ b/test/test_functional.py Fri Jul 25 12:05:02 2014 +0200
@@ -31,35 +31,39 @@
class ApycotTC(utils.ApycotBaseTC):
def test_quick_recipe(self):
- te = self.lgc.start(self.lgce)
- self.commit()
- self.run_plan(te)
- self.assertEqual(dict((checker.name, checker.status) for checker in te.checkers),
- {'pyunit': 'nodata'})
-
+ with self.admin_access.client_cnx() as cnx:
+ lgc = cnx.entity_from_eid(self.lgc)
+ lgce = cnx.entity_from_eid(self.lgce)
+ te = lgc.start(lgce)
+ cnx.commit()
+ self.run_plan(te)
+ self.assertEqual(dict((checker.name, checker.status) for checker in te.checkers),
+ {'pyunit': 'nodata'})
def test_full_recipe(self):
- recipe = self.req.execute('Recipe X WHERE X name "apycot.recipe.full"').get_entity(0, 0)
- # reset recipe content
- recipe.cw_set(script=full_script)
- tc = self.add_test_config(u'full config', env=self.lgce, group=self.pyp,
- use_recipe=recipe)
- te = tc.start(self.lgce)
- self.commit()
- self.run_plan(te)
- exp = {u'pycoverage': u'error', u'pyunit': u'nodata'}
- try:
- from pylint import checkers as pycheckers
- from pylint.lint import PyLinter
- from pylint.__pkginfo__ import version as pylint_version
- from pylint.interfaces import IReporter
- from pylint.reporters import BaseReporter
- exp['pylint'] = u'error'
- except:
- pass
- self.assertDictEqual(exp,
- dict((checker.name, checker.status) for checker in te.checkers),
- )
+ with self.admin_access.client_cnx() as cnx:
+ lgce = cnx.entity_from_eid(self.lgce)
+ recipe = cnx.execute('Recipe X WHERE X name "apycot.recipe.full"').get_entity(0, 0)
+ # reset recipe content
+ recipe.cw_set(script=full_script)
+ tc = self.add_test_config(cnx, u'full config', env=self.lgce, group=self.pyp,
+ use_recipe=recipe)
+ te = tc.start(lgce)
+ cnx.commit()
+ self.run_plan(te)
+ exp = {u'pycoverage': u'error', u'pyunit': u'nodata'}
+ try:
+ from pylint import checkers as pycheckers
+ from pylint.lint import PyLinter
+ from pylint.__pkginfo__ import version as pylint_version
+ from pylint.interfaces import IReporter
+ from pylint.reporters import BaseReporter
+ exp['pylint'] = u'error'
+ except:
+ pass
+ self.assertDictEqual(exp,
+ dict((checker.name, checker.status) for checker in te.checkers),
+ )
if __name__ == '__main__':
unittest_main()
--- a/test/unittest_apycot.py Fri Jul 25 11:06:29 2014 +0200
+++ b/test/unittest_apycot.py Fri Jul 25 12:05:02 2014 +0200
@@ -28,41 +28,45 @@
def setUp(self):
super(ApycotBaseTC, self).setUp()
- te = self.lgc.start(self.lgce)
- self.commit()
- with self.login('narval', password='narval0') as cu:
- cnxh = HTTPConnectionHandler('narval')
- writer = TestDataWriter(cnxh, te.eid)
- writer.start()
- cwriter = writer.make_check_writer()
- cwriter.start(MockChecker(u'pylint', {}))
- cwriter.raw('pylint_version', '0.18.1', type=u'version')
- cwriter.debug('hip', path='/tmp/something', line=12)
- cwriter.info('hop', path='/tmp/something')
- cwriter.warning('''momo\n\n<br/>''')
- cwriter.end(SUCCESS)
- cwriter = writer.make_check_writer()
- cwriter.start(MockChecker(u'lintian', {'option': 'value'}))
- cwriter.raw('lintian_version', '1.0')
- cwriter.error('bouh')
- cwriter.fatal('di&d')
- cwriter.end(FAILURE)
- writer.end(FAILURE)
- self.checks = self.execute('Any X, N ORDERBY N WHERE X is CheckResult, X name N')
+ with self.admin_access.client_cnx() as cnx:
+ lgc = cnx.entity_from_eid(self.lgc)
+ lgce = cnx.entity_from_eid(self.lgce)
+ te = lgc.start(lgce).eid
+ cnx.commit()
+ cnxh = HTTPConnectionHandler('narval')
+ writer = TestDataWriter(cnxh, te)
+ writer.start()
+ cwriter = writer.make_check_writer()
+ cwriter.start(MockChecker(u'pylint', {}))
+ cwriter.raw('pylint_version', '0.18.1', type=u'version')
+ cwriter.debug('hip', path='/tmp/something', line=12)
+ cwriter.info('hop', path='/tmp/something')
+ cwriter.warning('''momo\n\n<br/>''')
+ cwriter.end(SUCCESS)
+ cwriter = writer.make_check_writer()
+ cwriter.start(MockChecker(u'lintian', {'option': 'value'}))
+ cwriter.raw('lintian_version', '1.0')
+ cwriter.error('bouh')
+ cwriter.fatal('di&d')
+ cwriter.end(FAILURE)
+ writer.end(FAILURE)
def test_writer_log_content(self):
- checks = self.checks
- self.assertEqual(len(checks), 2)
- self.assertEqual(checks.get_entity(0, 0).log_file[0].data.getvalue(),
- '20\t\t\toption=value<br/>40\t\t\tbouh<br/>50\t\t\tdi&d<br/>')
- self.assertEqual(checks.get_entity(1, 0).log_file[0].data.getvalue(),
- '10\t/tmp/something\t12\thip<br/>20\t/tmp/something\t\thop<br/>30\t\t\tmomo\n\n<br/><br/>')
+ with self.admin_access.client_cnx() as cnx:
+ checks = cnx.execute('Any X, N ORDERBY N WHERE X is CheckResult, X name N')
+ self.assertEqual(len(checks), 2)
+ self.assertEqual(checks.get_entity(0, 0).log_file[0].data.getvalue(),
+ '20\t\t\toption=value<br/>40\t\t\tbouh<br/>50\t\t\tdi&d<br/>')
+ self.assertEqual(checks.get_entity(1, 0).log_file[0].data.getvalue(),
+ '10\t/tmp/something\t12\thip<br/>20\t/tmp/something\t\thop<br/>30\t\t\tmomo\n\n<br/><br/>')
def test_log_formatting_first_check(self):
stream = []
- log_to_html(self.request(), '',
- self.checks.get_entity(0, 0).log_file[0].data.read(),
- stream.append)
+ with self.admin_access.web_request() as req:
+ checks = req.execute('Any X, N ORDERBY N WHERE X is CheckResult, X name N')
+ log_to_html(req, '',
+ checks.get_entity(0, 0).log_file[0].data.read(),
+ stream.append)
log_html = '\n'.join(stream)
self.assertWellFormed(self.get_validator(content_type='application/xml'),
CW_NAMESPACE_DIV % log_html)
@@ -84,9 +88,11 @@
def test_log_formatting_second_check(self):
stream = []
- log_to_html(self.request(), '',
- self.checks.get_entity(1, 0).log_file[0].data.read(),
- stream.append)
+ with self.admin_access.web_request() as req:
+ checks = req.execute('Any X, N ORDERBY N WHERE X is CheckResult, X name N')
+ log_to_html(req, '',
+ checks.get_entity(1, 0).log_file[0].data.read(),
+ stream.append)
log_html = '\n'.join(stream)
self.assertWellFormed(self.get_validator(content_type='application/xml'),
CW_NAMESPACE_DIV % log_html)
--- a/test/unittest_entities.py Fri Jul 25 11:06:29 2014 +0200
+++ b/test/unittest_entities.py Fri Jul 25 12:05:02 2014 +0200
@@ -24,82 +24,107 @@
def setup_database(self):
ApycotBaseTC.setup_database(self)
- self.lgd = self.add_test_config(u'lgd', check_config=None,
- env=self.lgce, group=self.pyp)
+ with self.admin_access.repo_cnx() as cnx:
+ self.lgd = self.add_test_config(cnx, u'lgd', check_config=None,
+ env=self.lgce, group=self.pyp).eid
+ cnx.commit()
def test_refinement_base(self):
- self.assertEqual(self.lgce.apycot_process_environment(),
- {'DISPLAY': ':2.0', 'SETUPTOOLS': '1'})
- self.assertEqual(self.lgc.apycot_process_environment(),
- {'DISPLAY': ':1.0', 'NO_SETUPTOOLS': '1'})
+ with self.admin_access.client_cnx() as cnx:
+ lgce = cnx.entity_from_eid(self.lgce)
+ lgc = cnx.entity_from_eid(self.lgc)
+ lgd = cnx.entity_from_eid(self.lgd)
+ self.assertEqual(lgce.apycot_process_environment(),
+ {'DISPLAY': ':2.0', 'SETUPTOOLS': '1'})
+ self.assertEqual(lgc.apycot_process_environment(),
+ {'DISPLAY': ':1.0', 'NO_SETUPTOOLS': '1'})
- self.assertEqual(self.lgd.apycot_configuration(self.lgce),
- {'install': 'python_setup',
- 'python_lint_treshold': '7',
- 'python_lint_ignore': 'thirdparty',
- 'python_test_coverage_treshold': '70',
- 'env-option': 'value'})
+ self.assertEqual(lgd.apycot_configuration(lgce),
+ {'install': 'python_setup',
+ 'python_lint_treshold': '7',
+ 'python_lint_ignore': 'thirdparty',
+ 'python_test_coverage_treshold': '70',
+ 'env-option': 'value'})
def test_refinement_override(self):
- self.assertEqual(self.lgc.apycot_configuration(self.lgce),
- {'install': 'python_setup',
- 'python_lint_treshold': '8',
- 'python_lint_ignore': 'thirdparty',
- 'python_test_coverage_treshold': '70',
- 'pouet': '5',
- 'env-option': 'value'})
+ with self.admin_access.client_cnx() as cnx:
+ lgce = cnx.entity_from_eid(self.lgce)
+ lgc = cnx.entity_from_eid(self.lgc)
+ self.assertEqual(lgc.apycot_configuration(lgce),
+ {'install': 'python_setup',
+ 'python_lint_treshold': '8',
+ 'python_lint_ignore': 'thirdparty',
+ 'python_test_coverage_treshold': '70',
+ 'pouet': '5',
+ 'env-option': 'value'})
def test_all_check_results(self):
- ex1 = self.lgc.start(self.lgce, check_duplicate=False)
- ex2 = self.lgc.start(self.lgce, check_duplicate=False)
- self.commit()
- self.login('narval', password='narval0')
- self.dumb_execution(ex1, [('unittest', 'success'), ('coverage', 'success')])
- self.commit()
- covcr = ex1.check_result_by_name('coverage').eid
- self.dumb_execution(ex2, [('unittest', 'failure')])
- self.commit()
- ucr = ex2.check_result_by_name('unittest').eid
- self.commit()
- self.restore_connection()
- self.assertEqual([cr.eid for cr in all_check_results(self.lgc)],
- [covcr, ucr])
+ with self.admin_access.client_cnx() as cnx:
+ lgce = cnx.entity_from_eid(self.lgce)
+ lgc = cnx.entity_from_eid(self.lgc)
+ ex1eid = lgc.start(lgce, check_duplicate=False).eid
+ ex2eid = lgc.start(lgce, check_duplicate=False).eid
+ cnx.commit()
+
+ with self.new_access('narval').client_cnx() as cnx:
+ ex1 = cnx.entity_from_eid(ex1eid)
+ ex2 = cnx.entity_from_eid(ex2eid)
+ self.dumb_execution(cnx, ex1, [('unittest', 'success'), ('coverage', 'success')])
+ cnx.commit()
+ covcr = ex1.check_result_by_name('coverage').eid
+ self.dumb_execution(cnx, ex2, [('unittest', 'failure')])
+ cnx.commit()
+ ucr = ex2.check_result_by_name('unittest').eid
+ cnx.commit()
+
+ with self.admin_access.client_cnx() as cnx:
+ lgc = cnx.entity_from_eid(self.lgc)
+ self.assertEqual([cr.eid for cr in all_check_results(lgc)],
+ [covcr, ucr])
def test_duplicated_tc_same_env(self):
tcncstrs = self.schema['TestConfig'].rdef('name').constraints
self.assertEqual([cstr.type() for cstr in tcncstrs], ['RQLUniqueConstraint', 'SizeConstraint'])
- self.request().create_entity('TestConfig', name=u'lgd', use_environment=self.lgce)
- self.assertRaises(ValidationError, self.commit)
+ with self.admin_access.client_cnx() as cnx:
+ cnx.create_entity('TestConfig', name=u'lgd', use_environment=self.lgce)
+ self.assertRaises(ValidationError, cnx.commit)
def test_status_change(self):
- ex1 = self.lgc.start(self.lgce, check_duplicate=False)
- ex2 = self.lgc.start(self.lgce, check_duplicate=False)
- ex3 = self.lgc.start(self.lgce, check_duplicate=False)
- self.commit()
- self.login('narval', password='narval0')
- self.dumb_execution(ex1, [('unittest', 'success'), ('coverage', 'success')])
- self.dumb_execution(ex2, [('unittest', 'failure'), ('coverage', 'success')])
- self.dumb_execution(ex3, [('unittest', 'success'), ('coverage', 'error')])
- self.commit()
+ with self.admin_access.client_cnx() as cnx:
+ lgce = cnx.entity_from_eid(self.lgce)
+ lgc = cnx.entity_from_eid(self.lgc)
+ ex1eid = lgc.start(lgce, check_duplicate=False).eid
+ ex2eid = lgc.start(lgce, check_duplicate=False).eid
+ ex3eid = lgc.start(lgce, check_duplicate=False).eid
+ cnx.commit()
+
+ with self.new_access('narval').client_cnx() as cnx:
+ ex1 = cnx.entity_from_eid(ex1eid)
+ ex2 = cnx.entity_from_eid(ex2eid)
+ ex3 = cnx.entity_from_eid(ex3eid)
+ self.dumb_execution(cnx, ex1, [('unittest', 'success'), ('coverage', 'success')])
+ self.dumb_execution(cnx, ex2, [('unittest', 'failure'), ('coverage', 'success')])
+ self.dumb_execution(cnx, ex3, [('unittest', 'success'), ('coverage', 'error')])
+ cnx.commit()
- status_changes = ex2.status_changes()
- for key, values in status_changes.iteritems():
- self.assertTrue(hasattr(values[0], 'eid'))
- self.assertTrue(hasattr(values[1], 'eid'))
- status_changes[key] = (values[0].eid, values[1].eid)
- self.assertDictEqual(status_changes,
- {'unittest': (ex1.check_result_by_name('unittest').eid,
- ex2.check_result_by_name('unittest').eid)})
- status_changes = ex3.status_changes()
- for key, values in status_changes.iteritems():
- self.assertTrue(hasattr(values[0], 'eid'))
- self.assertTrue(hasattr(values[1], 'eid'))
- status_changes[key] = (values[0].eid, values[1].eid)
- self.assertDictEqual(status_changes,
- {'unittest': (ex2.check_result_by_name('unittest').eid,
- ex3.check_result_by_name('unittest').eid),
- 'coverage': (ex2.check_result_by_name('coverage').eid,
- ex3.check_result_by_name('coverage').eid)})
+ status_changes = ex2.status_changes()
+ for key, values in status_changes.iteritems():
+ self.assertTrue(hasattr(values[0], 'eid'))
+ self.assertTrue(hasattr(values[1], 'eid'))
+ status_changes[key] = (values[0].eid, values[1].eid)
+ self.assertDictEqual(status_changes,
+ {'unittest': (ex1.check_result_by_name('unittest').eid,
+ ex2.check_result_by_name('unittest').eid)})
+ status_changes = ex3.status_changes()
+ for key, values in status_changes.iteritems():
+ self.assertTrue(hasattr(values[0], 'eid'))
+ self.assertTrue(hasattr(values[1], 'eid'))
+ status_changes[key] = (values[0].eid, values[1].eid)
+ self.assertDictEqual(status_changes,
+ {'unittest': (ex2.check_result_by_name('unittest').eid,
+ ex3.check_result_by_name('unittest').eid),
+ 'coverage': (ex2.check_result_by_name('coverage').eid,
+ ex3.check_result_by_name('coverage').eid)})
# XXX to backport
# def test_branch_for_pe(self):
# #check that branch defined in ProjectEnvironement are propertly retrieved
--- a/test/unittest_hooks.py Fri Jul 25 11:06:29 2014 +0200
+++ b/test/unittest_hooks.py Fri Jul 25 12:05:02 2014 +0200
@@ -22,52 +22,58 @@
def setup_database(self):
super(NotificationTC, self).setup_database()
- self.request().create_entity('EmailAddress',
- address=u'admin@cubicweb.org',
- reverse_use_email=self.user())
+ with self.admin_access.repo_cnx() as cnx:
+ cnx.create_entity('EmailAddress',
+ address=u'admin@cubicweb.org',
+ reverse_use_email=cnx.user)
+ cnx.commit()
def start_lgc_tests(self):
- plan = self.lgc.start(self.lgce)
- self.lgc._cw.cnx.commit()
- return plan
+ with self.admin_access.client_cnx() as cnx:
+ lgc = cnx.entity_from_eid(self.lgc)
+ lgce = cnx.entity_from_eid(self.lgce)
+ plan = lgc.start(lgce)
+ cnx.commit()
+ return plan.eid
def test_exec_status_change(self):
- self.login('narval', password='narval0')
- plan = self.start_lgc_tests()
- plan = self.request().entity_from_eid(plan.eid)
- self.dumb_execution(plan, [('unittest', 'success'), ('coverage', 'success')])
- plan.cw_adapt_to('IWorkflowable').fire_transition('start')
- self.commit()
- plan.cw_adapt_to('IWorkflowable').fire_transition('end')
- self.commit()
- self.assertEqual(MAILBOX, [])
- self.commit()
- self.assertEqual(len(MAILBOX), 0)
- plan = self.start_lgc_tests()
- plan = self.request().entity_from_eid(plan.eid)
- self.dumb_execution(plan, [('unittest', 'success'), ('coverage', 'success')])
- plan.cw_adapt_to('IWorkflowable').fire_transition('start')
- self.commit()
- plan.cw_adapt_to('IWorkflowable').fire_transition('end')
- self.commit()
- self.assertEqual(len(MAILBOX), 0)
- self.commit()
- self.assertEqual(len(MAILBOX), 0)
- plan = self.start_lgc_tests()
- plan = self.request().entity_from_eid(plan.eid)
- self.dumb_execution(plan, [('unittest', 'failure'), ('coverage', 'failure')])
- self.assertEqual(len(MAILBOX), 0, MAILBOX)
- self.commit()
- plan.cw_adapt_to('IWorkflowable').fire_transition('start')
- self.commit()
- plan.cw_adapt_to('IWorkflowable').fire_transition('end')
- self.commit()
- self.assertEqual(len(MAILBOX), 1)
- self.assertEqual(MAILBOX[0].recipients, ['admin@cubicweb.org'])
- self.assertEqual(MAILBOX[0].message.get('Subject'),
- '[data] lgce/lgc#default now has 2 failure')
- self.assertMultiLineEqual(clean_str(MAILBOX[0].message.get_payload(decode=True)),
- '''The following changes occured between executions on branch default:
+ planeid = self.start_lgc_tests()
+ with self.new_access('narval').client_cnx() as cnx:
+ plan = cnx.entity_from_eid(planeid)
+ self.dumb_execution(cnx, plan, [('unittest', 'success'), ('coverage', 'success')])
+ plan.cw_adapt_to('IWorkflowable').fire_transition('start')
+ cnx.commit()
+ plan.cw_adapt_to('IWorkflowable').fire_transition('end')
+ cnx.commit()
+ self.assertEqual(MAILBOX, [])
+ self.assertEqual(len(MAILBOX), 0)
+ planeid = self.start_lgc_tests()
+ with self.new_access('narval').client_cnx() as cnx:
+ plan = cnx.entity_from_eid(planeid)
+ self.dumb_execution(cnx, plan, [('unittest', 'success'), ('coverage', 'success')])
+ plan.cw_adapt_to('IWorkflowable').fire_transition('start')
+ cnx.commit()
+ plan.cw_adapt_to('IWorkflowable').fire_transition('end')
+ cnx.commit()
+ self.assertEqual(len(MAILBOX), 0)
+ cnx.commit()
+ self.assertEqual(len(MAILBOX), 0)
+ planeid = self.start_lgc_tests()
+ with self.new_access('narval').client_cnx() as cnx:
+ plan = cnx.entity_from_eid(planeid)
+ self.dumb_execution(cnx, plan, [('unittest', 'failure'), ('coverage', 'failure')])
+ self.assertEqual(len(MAILBOX), 0, MAILBOX)
+ cnx.commit()
+ plan.cw_adapt_to('IWorkflowable').fire_transition('start')
+ cnx.commit()
+ plan.cw_adapt_to('IWorkflowable').fire_transition('end')
+ cnx.commit()
+ self.assertEqual(len(MAILBOX), 1)
+ self.assertEqual(MAILBOX[0].recipients, ['admin@cubicweb.org'])
+ self.assertEqual(MAILBOX[0].message.get('Subject'),
+ '[data] lgce/lgc#default now has 2 failure')
+ self.assertMultiLineEqual(clean_str(MAILBOX[0].message.get_payload(decode=True)),
+ '''The following changes occured between executions on branch default:
* coverage status changed from success to failure
* unittest status changed from success to failure
@@ -79,29 +85,30 @@
def test_exec_one_status_change(self):
- self.login('narval', password='narval0')
- plan = self.start_lgc_tests()
- plan = self.request().entity_from_eid(plan.eid)
- self.dumb_execution(plan, [('unittest', 'success'), ('coverage', 'success')])
- self.commit()
- plan.cw_adapt_to('IWorkflowable').fire_transition('start')
- self.commit()
- plan.cw_adapt_to('IWorkflowable').fire_transition('end')
- self.commit()
- plan = self.start_lgc_tests()
- plan = self.request().entity_from_eid(plan.eid)
- self.dumb_execution(plan, [('unittest', 'failure')])
- self.commit()
- plan.cw_adapt_to('IWorkflowable').fire_transition('start')
- self.commit()
- plan.cw_adapt_to('IWorkflowable').fire_transition('end')
- self.commit()
- self.assertEqual(len(MAILBOX), 1)
- self.assertEqual(MAILBOX[0].recipients, ['admin@cubicweb.org'])
- self.assertEqual(MAILBOX[0].message.get('Subject'),
- '[data] lgce/lgc#default: success -> failure (unittest)')
- self.assertMultiLineEqual(clean_str(MAILBOX[0].message.get_payload(decode=True)),
- '''The following changes occured between executions on branch default:
+ planeid = self.start_lgc_tests()
+ with self.new_access('narval').client_cnx() as cnx:
+ plan = cnx.entity_from_eid(planeid)
+ self.dumb_execution(cnx, plan, [('unittest', 'success'), ('coverage', 'success')])
+ cnx.commit()
+ plan.cw_adapt_to('IWorkflowable').fire_transition('start')
+ cnx.commit()
+ plan.cw_adapt_to('IWorkflowable').fire_transition('end')
+ cnx.commit()
+ planeid = self.start_lgc_tests()
+ with self.new_access('narval').client_cnx() as cnx:
+ plan = cnx.entity_from_eid(planeid)
+ self.dumb_execution(cnx, plan, [('unittest', 'failure')])
+ cnx.commit()
+ plan.cw_adapt_to('IWorkflowable').fire_transition('start')
+ cnx.commit()
+ plan.cw_adapt_to('IWorkflowable').fire_transition('end')
+ cnx.commit()
+ self.assertEqual(len(MAILBOX), 1)
+ self.assertEqual(MAILBOX[0].recipients, ['admin@cubicweb.org'])
+ self.assertEqual(MAILBOX[0].message.get('Subject'),
+ '[data] lgce/lgc#default: success -> failure (unittest)')
+ self.assertMultiLineEqual(clean_str(MAILBOX[0].message.get_payload(decode=True)),
+ '''The following changes occured between executions on branch default:
* unittest status changed from success to failure
@@ -111,15 +118,14 @@
URL: http://testing.fr/cubicweb/testexecution/XXXX''')
def test_nosy_list_propagation(self):
- self.login('narval', password='narval0')
- plan = self.start_lgc_tests()
- self.commit()
- self.restore_connection()
- self.execute('SET X interested_in PE WHERE X login "anon", PE eid %(pe)s',
- {'pe': self.lgce.eid})
- self.commit()
- # do not propagate to existing test execution
- self.assertFalse('anon' in [u.login for u in plan.nosy_list])
+ planeid = self.start_lgc_tests()
+ with self.admin_access.client_cnx() as cnx:
+ cnx.execute('SET X interested_in PE WHERE X login "anon", PE eid %(pe)s',
+ {'pe': self.lgce})
+ cnx.commit()
+ # do not propagate to existing test execution
+ plan = cnx.entity_from_eid(planeid)
+ self.assertNotIn('anon', [u.login for u in plan.nosy_list])
class StartTestTC(ApycotBaseTC):
@@ -127,14 +133,16 @@
def setUp(self):
super(StartTestTC, self).setUp()
r = self.repo
- r.path = self.vcsrepo.path
+ with self.admin_access.repo_cnx() as cnx:
+ vcsrepo = cnx.entity_from_eid(self.vcsrepo)
+ r.path = vcsrepo.path
os.system('hg init %s' % r.path)
os.system('echo data > %s/tutu.png' % r.path)
os.system('hg -R %s add %s/tutu.png' % (r.path, r.path))
os.system('hg -R %s commit -m rien -u narval' % r.path)
# reset vcsrepo
- with r.internal_session(safe=True) as session:
- bridge.import_content(session, commitevery=1, raise_on_error=True)
+ with r.internal_cnx() as cnx:
+ bridge.import_content(cnx, commitevery=1, raise_on_error=True)
def tearDown(self):
r = self.repo
@@ -142,7 +150,7 @@
os.system('rm -rf %s/*.png' % r.path)
super(StartTestTC, self).tearDown()
- def get_tc(self, period=None):
+ def get_tc(self, cnx, period=None):
rql = ('Any PEN, TCN, TCS WHERE TC in_state S, '
'TC use_environment PE, '
'PE name PEN, '
@@ -151,24 +159,22 @@
'TC start_rev_deps TCS')
if period:
rql +=', TC computed_start_mode "%s"' % period
- return set(tuple(tc) for tc in self.execute(rql))
+ return set(tuple(tc) for tc in cnx.execute(rql))
- def get_te_for_tc_name(self, eid):
+ def get_te_for_tc_name(self, cnx, eid):
rql = 'Any TE where TE using_config TC, TC eid %(eid)s'
- return self.execute(rql, {'eid': eid})
+ return cnx.execute(rql, {'eid': eid})
- def launch_all_tests(self, period):
+ def launch_all_tests(self, cnx, period):
""" launch all tests in the correct state and and make sure no
TestExecutions are left in status waiting execution. Also fill-in the
revision which should normally be filled by StartTestOp.
"""
- session = self.session
- session.set_cnxset()
- start_period_tests(session, period)
- self.commit() #put new te in base
+ start_period_tests(cnx, period)
+ cnx.commit() #put new te in base
## make sure they are not waiting execution and REV is filled
- for te, rev in self.execute('Any TE, REV WHERE TE status %(st)s, '
+ for te, rev in cnx.execute('Any TE, REV WHERE TE status %(st)s, '
'TE using_config TC, '
'TE branch BR, '
'TC use_environment PE, '
@@ -177,116 +183,119 @@
'REV branch BR, '
'NOT REV parent_revision REV2',
{'st': 'waiting execution'}):
- te_e = self.request().entity_from_eid(te)
- rev_e = self.request().entity_from_eid(rev)
+ te_e = cnx.entity_from_eid(te)
+ rev_e = cnx.entity_from_eid(rev)
te_e.cw_adapt_to('IWorkflowable').fire_transition('start')
te_e.cw_set(status=u'running')
te_e.cw_set(using_revision=rev)
- self.commit() #change new te status
+ cnx.commit() #change new te status
def test_new_vc_trigger(self):
""" check the on new revision start mode. Run all testconfigs, add new
revision, re-run and check there are new test configs"""
- self.lgc.cw_set(start_mode=u'on new revision')
- lgc2 = self.add_test_config(u'lgc2',
- start_mode=u'manual',
- env=self.lgce,
- use_recipe=self.recipe)
- ## same repo, branch stable
- lgc3 = self.add_test_config(u'lgc3',
- start_mode=u'on new revision',
- check_config=u'branch=stable',
- env=self.lgce,
- use_recipe=self.recipe)
- ## same repo, branch default
- lgc4 = self.add_test_config(u'lgc4',
- start_mode=u'on new revision',
- check_config=u'branch=default',
- env=self.lgce,
- use_recipe=self.recipe)
- ## on different repo
- req = self.request()
- lgce2 = req.create_entity(
- 'ProjectEnvironment', name=u'lgce2',
- check_config=u'install=setup_install',
- vcs_path=u'dir1', #dummy
- )
- lgc5 = self.add_test_config(u'lgc5',
- start_mode=u'on new revision',
- check_config=u'branch=default',
- env=lgce2,
- start_rev_deps=True,
- use_recipe=self.recipe)
- ## not sure this is still useful
- self.commit()
- ## not reaally necessary now, as they all appear, helps understanding,
- ## we check all the necessary testconfig are there
- self.assertEqual(self.get_tc(period=u'on new revision'),
- set([('lgce', 'lgc', None),
- ('lgce', 'lgc3', None),
- ('lgce', 'lgc4', None),
- ('lgce2', 'lgc5', 1)]))
- ## run everything
+ with self.admin_access.client_cnx() as cnx:
+ lgc = cnx.entity_from_eid(self.lgc)
+ lgc.cw_set(start_mode=u'on new revision')
+ lgc2 = self.add_test_config(cnx, u'lgc2',
+ start_mode=u'manual',
+ env=self.lgce,
+ use_recipe=self.recipe)
+ ## same repo, branch stable
+ lgc3 = self.add_test_config(cnx, u'lgc3',
+ start_mode=u'on new revision',
+ check_config=u'branch=stable',
+ env=self.lgce,
+ use_recipe=self.recipe)
+ ## same repo, branch default
+ lgc4 = self.add_test_config(cnx, u'lgc4',
+ start_mode=u'on new revision',
+ check_config=u'branch=default',
+ env=self.lgce,
+ use_recipe=self.recipe)
+ ## on different repo
+ lgce2 = cnx.create_entity(
+ 'ProjectEnvironment', name=u'lgce2',
+ check_config=u'install=setup_install',
+ vcs_path=u'dir1', #dummy
+ )
+ lgc5 = self.add_test_config(cnx, u'lgc5',
+ start_mode=u'on new revision',
+ check_config=u'branch=default',
+ env=lgce2,
+ start_rev_deps=True,
+ use_recipe=self.recipe)
+ ## not sure this is still useful
+ cnx.commit()
+ ## not reaally necessary now, as they all appear, helps understanding,
+ ## we check all the necessary testconfig are there
+ self.assertEqual(self.get_tc(cnx, period=u'on new revision'),
+ set([('lgce', 'lgc', None),
+ ('lgce', 'lgc3', None),
+ ('lgce', 'lgc4', None),
+ ('lgce2', 'lgc5', 1)]))
+ ## run everything
- self.launch_all_tests('on new revision')
+ self.launch_all_tests(cnx, 'on new revision')
- nb_tc_lgc_before = len(self.get_te_for_tc_name(self.lgc.eid))
- nb_tc_lgc3_before = len(self.get_te_for_tc_name(lgc3.eid))
- nb_tc_lgc4_before = len(self.get_te_for_tc_name(lgc4.eid))
- nb_tc_lgc5_before = len(self.get_te_for_tc_name(lgc5.eid))
- ##check you do not add anything on rerun (no new revision)
- self.launch_all_tests('on new revision')
- self.assertEqual(nb_tc_lgc_before,
- len(self.get_te_for_tc_name(self.lgc.eid)))
- self.assertEqual(nb_tc_lgc3_before,
- len(self.get_te_for_tc_name(lgc3.eid)))
- self.assertEqual(nb_tc_lgc4_before,
- len(self.get_te_for_tc_name(lgc4.eid)))
- self.assertEqual(nb_tc_lgc5_before,
- len(self.get_te_for_tc_name(lgc5.eid)))
+ nb_tc_lgc_before = len(self.get_te_for_tc_name(cnx, self.lgc))
+ nb_tc_lgc3_before = len(self.get_te_for_tc_name(cnx, lgc3.eid))
+ nb_tc_lgc4_before = len(self.get_te_for_tc_name(cnx, lgc4.eid))
+ nb_tc_lgc5_before = len(self.get_te_for_tc_name(cnx, lgc5.eid))
+ ##check you do not add anything on rerun (no new revision)
+ self.launch_all_tests(cnx, 'on new revision')
+ self.assertEqual(nb_tc_lgc_before,
+ len(self.get_te_for_tc_name(cnx, self.lgc)))
+ self.assertEqual(nb_tc_lgc3_before,
+ len(self.get_te_for_tc_name(cnx, lgc3.eid)))
+ self.assertEqual(nb_tc_lgc4_before,
+ len(self.get_te_for_tc_name(cnx, lgc4.eid)))
+ self.assertEqual(nb_tc_lgc5_before,
+ len(self.get_te_for_tc_name(cnx, lgc5.eid)))
+ ## add a new revision in stable
+ #r.vcs_add(u'dir1', u'tutu.png', Binary('data'), branch=u'stable')
+ cnx.commit()
+
r = self.repo
-
- ## add a new revision in stable
- #r.vcs_add(u'dir1', u'tutu.png', Binary('data'), branch=u'stable')
- self.commit()
os.system('echo data > %s/tutu1.png' % r.path)
os.system('hg -R %s branch stable' % r.path)
os.system('hg -R %s add %s/tutu1.png' % (r.path, r.path))
os.system('hg -R %s commit -m rien -u narval' % r.path)
- with r.internal_session(safe=True) as session:
- bridge.import_content(session, commitevery=1, raise_on_error=True)
+ with r.internal_cnx() as cnx:
+ bridge.import_content(cnx, commitevery=1, raise_on_error=True)
- ## now it should add te for the tc linked to lgce and running on new
- ## revision and not the others
- self.launch_all_tests('on new revision')
- self.assertEqual(nb_tc_lgc_before + 1,
- len(self.get_te_for_tc_name(self.lgc.eid)))
- self.assertEqual(nb_tc_lgc3_before + 1,
- len(self.get_te_for_tc_name(lgc3.eid)))
- self.assertEqual(nb_tc_lgc4_before,
- len(self.get_te_for_tc_name(lgc4.eid)))
- self.assertEqual(nb_tc_lgc5_before,
- len(self.get_te_for_tc_name(lgc5.eid)))
+ with self.admin_access.client_cnx() as cnx:
+ ## now it should add te for the tc linked to lgce and running on new
+ ## revision and not the others
+ self.launch_all_tests(cnx, 'on new revision')
+ self.assertEqual(nb_tc_lgc_before + 1,
+ len(self.get_te_for_tc_name(cnx, self.lgc)))
+ self.assertEqual(nb_tc_lgc3_before + 1,
+ len(self.get_te_for_tc_name(cnx, lgc3.eid)))
+ self.assertEqual(nb_tc_lgc4_before,
+ len(self.get_te_for_tc_name(cnx, lgc4.eid)))
+ self.assertEqual(nb_tc_lgc5_before,
+ len(self.get_te_for_tc_name(cnx, lgc5.eid)))
- ## add a new revision in default
- #r.vcs_add(u'dir1', u'tutu1.png', Binary('data'))
- self.commit()
+ ## add a new revision in default
+ #r.vcs_add(u'dir1', u'tutu1.png', Binary('data'))
+ cnx.commit()
self.assertEqual(0, os.system('echo data > %s/tutu2.png' % r.path))
self.assertEqual(0,os.system('hg -R %s up default' % r.path))
self.assertEqual(0,os.system('hg -R %s add %s/tutu2.png' % (r.path, r.path)))
self.assertEqual(0,os.system('hg -R %s commit -m rien -u narval' % r.path))
- with r.internal_session(safe=True) as session:
- bridge.import_content(session, commitevery=1, raise_on_error=True)
+ with r.internal_cnx() as cnx:
+ bridge.import_content(cnx, commitevery=1, raise_on_error=True)
## it should add te to the tc on default (lgc and lgc4)
- self.launch_all_tests('on new revision')
- self.assertEqual(nb_tc_lgc_before + 2,
- len(self.get_te_for_tc_name(self.lgc.eid)))
- self.assertEqual(nb_tc_lgc3_before + 1,
- len(self.get_te_for_tc_name(lgc3.eid)))
- self.assertEqual(nb_tc_lgc4_before + 1,
- len(self.get_te_for_tc_name(lgc4.eid)))
- self.assertEqual(nb_tc_lgc5_before,
- len(self.get_te_for_tc_name(lgc5.eid)))
+ with self.admin_access.client_cnx() as cnx:
+ self.launch_all_tests(cnx, 'on new revision')
+ self.assertEqual(nb_tc_lgc_before + 2,
+ len(self.get_te_for_tc_name(cnx, self.lgc)))
+ self.assertEqual(nb_tc_lgc3_before + 1,
+ len(self.get_te_for_tc_name(cnx, lgc3.eid)))
+ self.assertEqual(nb_tc_lgc4_before + 1,
+ len(self.get_te_for_tc_name(cnx, lgc4.eid)))
+ self.assertEqual(nb_tc_lgc5_before,
+ len(self.get_te_for_tc_name(cnx, lgc5.eid)))
def test_datetime_trigger(self):
"""test the registering of TC depending on their start_mode
@@ -295,86 +304,92 @@
- check they are removed when deactivated
- check they are removed when finished
"""
- self.lgc.cw_set(start_mode=u'hourly')
- lgc2 = self.add_test_config(u'lgc2', start_mode=u'hourly', env=self.lgce)
- req = self.request()
- lgce2 = req.create_entity(
- 'ProjectEnvironment', name=u'lgce2',
- check_config=u'install=setup_install',
- vcs_path=u'dir1',
- )
- lgc3 = self.add_test_config(u'lgc3', start_mode=u'hourly',
- check_config=u'branch=default',
- env=lgce2, start_rev_deps=True,
- use_recipe=self.recipe)
- self.execute('SET PE local_repository R WHERE PE name "lgce2", R eid %(r)s',
- {'r': self.vcsrepo.eid})
- self.commit()
- self.assertEqual(self.get_tc('daily'),
- set([]))
- self.assertEqual(self.get_tc('hourly'),
- set((('lgce', 'lgc', None),
- ('lgce', 'lgc2', None),
- ('lgce2', 'lgc3', 1)))
- )
- lgc2.cw_adapt_to('IWorkflowable').fire_transition('deactivate')
- self.commit()
- self.assertEqual(self.get_tc('hourly'),
- set((('lgce', 'lgc', None),
- ('lgce2', 'lgc3', 1)))
- )
- self.commit()
- rset_before_lgc = self.get_te_for_tc_name(self.lgc.eid)
- te = lgc3.start(lgce2, branch=u'default')
- lgc3._cw.cnx.commit()
- ###not really useful as the method directly check the existence of a
- ###relevant tc before running the tests,
- ### we should instead test the launched test configs
- self.assertEqual(self.get_tc('hourly'),
- set((('lgce', 'lgc', None),
- ('lgce2', 'lgc3', 1)))
- )
- rset_before_lgc = self.get_te_for_tc_name(self.lgc.eid)
- rset_before_lgc3 = self.get_te_for_tc_name(lgc3.eid)
- self.launch_all_tests('hourly')
- self.commit()
- rset_after_lgc = self.get_te_for_tc_name(self.lgc.eid)
- rset_after_lgc3 = self.get_te_for_tc_name(lgc3.eid)
- ### check there is one new TestConfig for lgc and the same number for lgc3
- #(alredy launched)
- self.assertEqual(len(rset_after_lgc3), len(rset_before_lgc3))
- self.assertEqual(len(rset_after_lgc), 1 + len(rset_before_lgc))
+ with self.admin_access.client_cnx() as cnx:
+ lgc = cnx.entity_from_eid(self.lgc)
+ lgc.cw_set(start_mode=u'hourly')
+ lgc2 = self.add_test_config(cnx, u'lgc2', start_mode=u'hourly', env=self.lgce)
+ lgce2 = cnx.create_entity(
+ 'ProjectEnvironment', name=u'lgce2',
+ check_config=u'install=setup_install',
+ vcs_path=u'dir1',
+ )
+ lgc3 = self.add_test_config(cnx, u'lgc3', start_mode=u'hourly',
+ check_config=u'branch=default',
+ env=lgce2, start_rev_deps=True,
+ use_recipe=self.recipe)
+ cnx.execute('SET PE local_repository R WHERE PE name "lgce2", R eid %(r)s',
+ {'r': self.vcsrepo})
+ cnx.commit()
+ self.assertEqual(self.get_tc(cnx, 'daily'),
+ set([]))
+ self.assertEqual(self.get_tc(cnx, 'hourly'),
+ set((('lgce', 'lgc', None),
+ ('lgce', 'lgc2', None),
+ ('lgce2', 'lgc3', 1)))
+ )
+ lgc2.cw_adapt_to('IWorkflowable').fire_transition('deactivate')
+ cnx.commit()
+ self.assertEqual(self.get_tc(cnx, 'hourly'),
+ set((('lgce', 'lgc', None),
+ ('lgce2', 'lgc3', 1)))
+ )
+ cnx.commit()
+ rset_before_lgc = self.get_te_for_tc_name(cnx, self.lgc)
+ te = lgc3.start(lgce2, branch=u'default')
+ cnx.commit()
+ ###not really useful as the method directly check the existence of a
+ ###relevant tc before running the tests,
+ ### we should instead test the launched test configs
+ self.assertEqual(self.get_tc(cnx, 'hourly'),
+ set((('lgce', 'lgc', None),
+ ('lgce2', 'lgc3', 1)))
+ )
+ rset_before_lgc = self.get_te_for_tc_name(cnx, self.lgc)
+ rset_before_lgc3 = self.get_te_for_tc_name(cnx, lgc3.eid)
+ self.launch_all_tests(cnx, 'hourly')
+ rset_after_lgc = self.get_te_for_tc_name(cnx, self.lgc)
+ rset_after_lgc3 = self.get_te_for_tc_name(cnx, lgc3.eid)
+ ### check there is one new TestConfig for lgc and the same number for lgc3
+ #(alredy launched)
+ self.assertEqual(len(rset_after_lgc3), len(rset_before_lgc3))
+ self.assertEqual(len(rset_after_lgc), 1 + len(rset_before_lgc))
def test_error_in_recipe(self):
"""this testcase should check what happens when a recipe does not have
the correct code and crashes before sending any status, the returned
status should NOT be waiting execution"""
- te = self.lgc.start(self.lgce, branch=u'default')
- self.commit()
- te.cw_adapt_to('IWorkflowable').fire_transition('start')
- self.commit()
- te.cw_adapt_to('IWorkflowable').fire_transition('fail')
- self.commit()
- #the status should not be waiting execution !
- self.assertNotEqual(u'waiting execution', te.status)
+ with self.admin_access.client_cnx() as cnx:
+ lgc = cnx.entity_from_eid(self.lgc)
+ lgce = cnx.entity_from_eid(self.lgce)
+ te = lgc.start(lgce, branch=u'default')
+ cnx.commit()
+ te.cw_adapt_to('IWorkflowable').fire_transition('start')
+ cnx.commit()
+ te.cw_adapt_to('IWorkflowable').fire_transition('fail')
+ cnx.commit()
+ #the status should not be waiting execution !
+ self.assertNotEqual(u'waiting execution', te.status)
class ComputedStartModeTC(ApycotBaseTC):
def test_start_mode(self):
- self.pyp.cw_set(start_mode=u'monthly')
- self.commit()
- self.assertEqual(self.pyp.computed_start_mode, self.pyp.start_mode)
- self.lgc.cw_set(start_mode=u'inherited')
- self.commit()
- self.lgc.cw_clear_all_caches()
- self.assertEqual(self.lgc.computed_start_mode, self.pyp.start_mode)
- self.pyp.cw_set(start_mode=u'hourly')
- self.commit()
- self.pyp.cw_clear_all_caches()
- self.lgc.cw_clear_all_caches()
- self.assertEqual(self.pyp.computed_start_mode, u'hourly')
- self.assertEqual(self.lgc.computed_start_mode, u'hourly')
+ with self.admin_access.client_cnx() as cnx:
+ pyp = cnx.entity_from_eid(self.pyp)
+ lgc = cnx.entity_from_eid(self.lgc)
+ pyp.cw_set(start_mode=u'monthly')
+ cnx.commit()
+ self.assertEqual(pyp.computed_start_mode, pyp.start_mode)
+ lgc.cw_set(start_mode=u'inherited')
+ cnx.commit()
+ lgc.cw_clear_all_caches()
+ self.assertEqual(lgc.computed_start_mode, pyp.start_mode)
+ pyp.cw_set(start_mode=u'hourly')
+ cnx.commit()
+ pyp.cw_clear_all_caches()
+ lgc.cw_clear_all_caches()
+ self.assertEqual(pyp.computed_start_mode, u'hourly')
+ self.assertEqual(lgc.computed_start_mode, u'hourly')
if __name__ == '__main__':
unittest_main()
--- a/test/unittest_preprocessors.py Fri Jul 25 11:06:29 2014 +0200
+++ b/test/unittest_preprocessors.py Fri Jul 25 12:05:02 2014 +0200
@@ -39,50 +39,52 @@
self.cnxh = FakeCnxh(msgcfg, msgdep)
class FileCheckerTest(ApycotBaseTC):
- def start_lgc_tests(self):
- plan = self.lgc.start(self.lgce)
- self.lgc._cw.cnx.commit()
- return plan
+ def start_lgc_tests(self, cnx):
+ with self.admin_access.client_cnx() as cnx:
+ lgc = cnx.entity_from_eid(self.lgc)
+ lgce = cnx.entity_from_eid(self.lgce)
+ plan = lgc.start(lgce)
+ cnx.commit()
+ return plan.eid
def test_exec_status_change(self):
- req = self.request()
- tc = req.create_entity('TestConfig',
- name=u'TC',
- label=u'full python tests',
- start_mode=u'manual',
- check_config=u"""pylint_threshold=7
-pycoverage_threshold=70
-install=python_setup""")
- pe = req.create_entity('ProjectEnvironment',
- name=u'gna',
- reverse_use_environment=tc.eid,
- local_repository=self.vcsrepo)
+ with self.admin_access.client_cnx() as cnx:
+ tceid = cnx.create_entity('TestConfig',
+ name=u'TC',
+ label=u'full python tests',
+ start_mode=u'manual',
+ check_config=u"""pylint_threshold=7
+ pycoverage_threshold=70
+ install=python_setup""").eid
+ peeid = cnx.create_entity('ProjectEnvironment',
+ name=u'gna',
+ reverse_use_environment=tceid,
+ local_repository=self.vcsrepo).eid
- self.vcsrepo2 = req.create_entity('Repository', type=u'mercurial',
- # use path to avoid clone attempt when using url
- path=unicode(self.datapath('project1')), source_url=u'http://gna.com',
- reverse_local_repository=self.lgce)
+ self.vcsrepo2 = cnx.create_entity('Repository', type=u'mercurial',
+ # use path to avoid clone attempt when using url
+ path=unicode(self.datapath('project1')), source_url=u'http://gna.com',
+ reverse_local_repository=self.lgce)
- pe_dep1 = req.create_entity('ProjectEnvironment',
- name=u'regna',
- reverse_use_environment=tc.eid,
- local_repository=self.vcsrepo2)
-
+ pe_dep1 = cnx.create_entity('ProjectEnvironment',
+ name=u'regna',
+ reverse_use_environment=tceid,
+ local_repository=self.vcsrepo2)
+ cnx.commit()
- self.commit()
- self.vreg['views']
- req_env = self.request(environment=pe.eid)
- req_env_dep = self.request(environment=pe_dep1.eid)
- tconf = json.loads(req_env.view('apycot.get_configuration', rset=tc.as_rset()))
- penv = json.loads(req_env.view('ejsonexport', rset=pe.as_rset()))[0]
- penv_dep = json.loads(req_env.view('ejsonexport', rset=pe.as_rset()))
- penv['title'] = 'gna'
- self.login('narval', password='narval0')
- plan = self.start_lgc_tests()
- plan = json.loads(json_dumps(req.entity_from_eid(plan.eid)))
- test = atest.Test(plan, FakeWriter(tconf, penv_dep))
- test.call_preprocessor('install', penv)
+ with self.admin_access.web_request(environment=peeid) as req_env:
+ tc = req_env.entity_from_eid(tceid)
+ pe = req_env.entity_from_eid(peeid)
+ tconf = json.loads(req_env.view('apycot.get_configuration', rset=tc.as_rset()))
+ penv = json.loads(req_env.view('ejsonexport', rset=pe.as_rset()))[0]
+ penv_dep = json.loads(req_env.view('ejsonexport', rset=pe.as_rset()))
+ penv['title'] = 'gna'
+ planeid = self.start_lgc_tests(req_env)
+ with self.new_access('narval').client_cnx() as cnx:
+ plan = json.loads(json_dumps(cnx.entity_from_eid(planeid)))
+ test = atest.Test(plan, FakeWriter(tconf, penv_dep))
+ test.call_preprocessor('install', penv)
if __name__ == '__main__':
--- a/testutils.py Fri Jul 25 11:06:29 2014 +0200
+++ b/testutils.py Fri Jul 25 12:05:02 2014 +0200
@@ -153,50 +153,50 @@
""" self.repo: used to get the session to connect to cw
self.vcsrepo: new entity
"""
+ with self.admin_access.repo_cnx() as cnx:
+ self.lgce = cnx.create_entity(
+ 'ProjectEnvironment', name=u'lgce',
+ check_config=u'install=python_setup\nenv-option=value',
+ check_environment=u'SETUPTOOLS=1\nDISPLAY=:2.0'
+ ).eid
+ self.vcsrepo = cnx.create_entity('Repository', type=u'mercurial',
+ # use path to avoid clone attempt when using url
+ path=unicode(self.datapath('project')),
+ reverse_local_repository=self.lgce).eid
+ self.pyp = cnx.create_entity('TestConfig', name=u'PYTHONPACKAGE',
+ check_config=u'python_lint_treshold=7\n'
+ 'python_lint_ignore=thirdparty\n'
+ 'python_test_coverage_treshold=70\n',
+ check_environment=u'NO_SETUPTOOLS=1\nDISPLAY=:1.0').eid
+ self.recipe = cnx.execute('Recipe X WHERE X name "apycot.recipe.quick"')[0][0]
+ # reset vcsrepo (using the session )
+ init_vcsrepo(self.repo)
+ # reset recipe content
+ cnx.execute('SET X script %(script)s WHERE X eid %(recipe)s',
+ {'recipe': self.recipe,
+ 'script': self.recipescript})
+ self.lgc = self.add_test_config(cnx, u'lgc', env=self.lgce, group=self.pyp, use_recipe=self.recipe).eid
+ cnx.commit()
- req = self.request()
- self.lgce = req.create_entity(
- 'ProjectEnvironment', name=u'lgce',
- check_config=u'install=python_setup\nenv-option=value',
- check_environment=u'SETUPTOOLS=1\nDISPLAY=:2.0'
- )
- self.vcsrepo = req.create_entity('Repository', type=u'mercurial',
- # use path to avoid clone attempt when using url
- path=unicode(self.datapath('project')),
- reverse_local_repository=self.lgce)
- self.pyp = req.create_entity('TestConfig', name=u'PYTHONPACKAGE',
- check_config=u'python_lint_treshold=7\n'
- 'python_lint_ignore=thirdparty\n'
- 'python_test_coverage_treshold=70\n',
- check_environment=u'NO_SETUPTOOLS=1\nDISPLAY=:1.0')
- self.recipe = req.execute('Recipe X WHERE X name "apycot.recipe.quick"').get_entity(0, 0)
- # reset vcsrepo (using the session )
- init_vcsrepo(self.repo)
- # reset recipe content
- self.recipe.cw_set(script=self.recipescript)
- self.lgc = self.add_test_config(u'lgc', env=self.lgce, group=self.pyp, use_recipe=self.recipe)
+ self.repo.threaded_task = lambda func: func() # XXX move to cw
- self.repo.threaded_task = lambda func: func() # XXX move to cw
-
- def add_test_config(self, name,
+ def add_test_config(self, cnx, name,
check_config=u'python_lint_treshold=8\npouet=5',
env=None, group=None, **kwargs):
"""add a TestConfig instance"""
- req = self.request()
if group is not None:
kwargs['refinement_of'] = group
if env is not None:
kwargs['use_environment'] = env
- return req.create_entity('TestConfig', name=name,
+ return cnx.create_entity('TestConfig', name=name,
check_config=check_config, **kwargs)
- def dumb_execution(self, ex, check_defs, setend=True):
+ def dumb_execution(self, cnx, ex, check_defs, setend=True):
"""add a TestExecution instance"""
- req = self.request()
for name, status in check_defs:
- cr = req.create_entity('CheckResult', name=unicode(name), status=unicode(status))
- req.execute('SET X during_execution Y WHERE X eid %(x)s, Y eid %(e)s',
+ cr = cnx.create_entity('CheckResult', name=unicode(name), status=unicode(status))
+ cnx.execute('SET X during_execution Y WHERE X eid %(x)s, Y eid %(e)s',
{'x': cr.eid, 'e': ex.eid})
if setend:
- req.execute('SET X status "success" '
+ cnx.execute('SET X status "success" '
'WHERE X eid %(x)s', {'x': ex.eid})