Skip to content

Commit

Permalink
Support mocha test case on IoT Linux devices. (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
BruceDai authored Oct 28, 2016
1 parent 392ce9f commit 97a8e76
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 0 deletions.
161 changes: 161 additions & 0 deletions testkitlite/engines/mocha.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#!/usr/bin/python
#
# Copyright (C) 2012 Intel Corporation
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# Authors:
# Bruce Dai <[email protected]>
""" The implementation of mocha test engine"""

import commands
import os
import time
import sys
import threading
import uuid
from testkitlite.util.log import LOGGER
from testkitlite.util.result import TestSetResut
from testkitlite.util import tr_utils
import subprocess

STR_PASS = 'PASS'
STR_FAIL = 'FAIL'
STR_BLOCK = 'BLOCK'
DEFAULT_TIMEOUT = 90

def _mocha_test_exec(test_session, cases, result_obj, session_dir):
"""function for running mocha tests"""
result_obj.set_status(0)
result_list = []
for i_case in cases['cases']:
i_case_timeout = i_case.get('timeout', DEFAULT_TIMEOUT)

try:
case_entry = i_case['entry']
status, output = commands.getstatusoutput("ssh %s 'ls %s'" % (os.environ["DEVICE_ID"], case_entry))
if status != 0:
i_case['result'] = STR_BLOCK
i_case['stdout'] = "[Message]No such file or dirctory: %s" % case_entry
result_list.append(i_case)
continue

case_id = i_case['case_id']
tmp_result_dir = "%s/json_results" % session_dir
os.makedirs(tmp_result_dir)
popen_args = "ssh %s 'mocha %s --reporter json' > %s/%s.json" % (os.environ["DEVICE_ID"], case_entry, tmp_result_dir, case_id)
i_case_proc = subprocess.Popen(args=popen_args, shell=True)
i_case_pre_time = time.time()
while True:
i_case_exit_code = i_case_proc.poll()
i_case_elapsed_time = time.time() - i_case_pre_time
if i_case_exit_code == None:
if i_case_elapsed_time >= i_case_timeout:
tr_utils.KillAllProcesses(ppid=i_case_proc.pid)
i_case['result'] = STR_BLOCK
i_case['stdout'] = "[Message]Timeout"
LOGGER.debug("Run %s timeout" % case_id)
break
else:
i_case['result'] = STR_FAIL
i_case['stdout'] = tmp_result_dir
break
time.sleep(1)
except Exception, e:
i_case['result'] = STR_BLOCK
i_case['stdout'] = "[Message]%s" % e
LOGGER.error(
"Run %s: failed: %s, exit from executer" % (case_id, e))
result_list.append(i_case)
result_obj.extend_result(result_list)
result_obj.set_status(1)


class TestWorker(object):

"""Test executor for testkit-lite"""

def __init__(self, conn):
super(TestWorker, self).__init__()
self.conn = conn
self.server_url = None
self.result_obj = None
self.session_dir = None
self.opts = dict({'block_size': 300,
'test_type': None,
'auto_iu': False,
'fuzzy_match': False,
'self_exec': False,
'self_repeat': False,
'debug_mode': False
})

def init_test(self, params):
"""init the test envrionment"""
self.session_dir =params.get('session_dir', '')
self.opts['testset_name'] = params.get('testset-name', '')
self.opts['testsuite_name'] = params.get('testsuite-name', '')
self.opts['debug_log_base'] = params.get("debug-log-base", '')
return str(uuid.uuid1())

def run_test(self, sessionid, test_set):
"""
process the execution for a test set
"""
if sessionid is None:
return False
disabledlog = os.environ.get("disabledlog","")
# start debug trace thread
if len(disabledlog) > 0 :
pass
else:
self.conn.start_debug(self.opts['debug_log_base'])
time.sleep(1)
self.result_obj = TestSetResut(
self.opts['testsuite_name'], self.opts['testset_name'])
self.opts['async_th'] = threading.Thread(
target=_mocha_test_exec,
args=(sessionid, test_set, self.result_obj, self.session_dir)
)

self.opts['async_th'].start()
return True

def get_test_status(self, sessionid):
"""poll the test task status"""
if sessionid is None:
return None
result = {}
result["msg"] = []
result["finished"] = str(self.result_obj.get_status())
return result

def get_test_result(self, sessionid):
"""get the test result for a test set """
result = {}
if sessionid is None:
return result

result = self.result_obj.get_result()
return result

def finalize_test(self, sessionid):
"""clear the test stub and related resources"""
if sessionid is None:
return False

if self.result_obj is not None:
self.result_obj.set_status(1)

# stop debug thread
self.conn.stop_debug()

return True
60 changes: 60 additions & 0 deletions testkitlite/util/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from testkitlite.util.result import TestSetResut
import subprocess
import glob
import json

if platform.system().startswith("Linux"):
import fcntl
Expand Down Expand Up @@ -110,6 +111,7 @@ def __init__(self, connector, worker):
self.bdd_test_files = []
self.xcunit_test_files = []
self.iosuiauto_test_files = []
self.mocha_test_files = []
self.testresult_dict = {"pass": 0, "fail": 0,
"block": 0, "not_run": 0}
self.current_test_xml = "none"
Expand Down Expand Up @@ -397,6 +399,15 @@ def run_case(self, latest_dir):
else:
self.testworker = TestWorker(self.connector)
self.__run_with_worker(self.iosuiauto_test_files)
if len(self.mocha_test_files) > 0:
try:
exec "from testkitlite.engines.mocha import TestWorker"
LOGGER.info("TestWorker is mocha")
except Exception as error:
raise TestEngineException("mocha")
else:
self.testworker = TestWorker(self.connector)
self.__run_with_worker(self.mocha_test_files)

def __run_with_worker(self, test_xml_set_list):
try:
Expand Down Expand Up @@ -465,6 +476,7 @@ def __split_xml_to_set(self, webapi_file):
bdd_test_set_list = []
xcunit_set_list = []
iosuiauto_set_list = []
mocha_set_list = []
auto_webdriver_flag = self.is_webdriver and webapi_file.split('.')[-3] == 'auto'
if len(test_xml_set_list) > 1:
test_xml_set_list.reverse()
Expand Down Expand Up @@ -508,6 +520,8 @@ def __split_xml_to_set(self, webapi_file):
xcunit_set_list.append(test_xml_set)
elif set_type == "iosuiauto":
iosuiauto_set_list.append(test_xml_set)
elif set_type == "mocha":
mocha_set_list.append(test_xml_set)

set_keep_number += 1
set_number -= 1
Expand Down Expand Up @@ -537,6 +551,8 @@ def __split_xml_to_set(self, webapi_file):
self.xcunit_test_files.extend(xcunit_set_list)
iosuiauto_set_list.reverse()
self.iosuiauto_test_files.extend(iosuiauto_set_list)
mocha_set_list.reverse()
self.mocha_test_files.extend(mocha_set_list)

def lock(self, fl):
try:
Expand Down Expand Up @@ -1013,6 +1029,8 @@ def __prepare_external_test_json(self, resultfile):
value = 'xcunit'
elif parameters['type'] == 'iosuiauto' :
value = 'iosuiauto'
elif parameters['type'] == 'mocha' :
value = 'mocha'
elif parameters['type'] == 'qunit':
value = 'default'
if value != None:
Expand Down Expand Up @@ -1437,6 +1455,46 @@ def write_file_result(set_result_xml, set_result, debug_log_file):
LOGGER.error(
"[ Error: fail to write cases result, error: %s ]\n" % error)

def __expand_subcases_mocha(tset, tcase, sub_num, result_msg):
sub_case_index = 1

if os.path.isdir(result_msg):
case_id = tcase.get("id")
case_purpose = tcase.get("purpose")
result_json_file = "%s/%s.json" % (result_msg, case_id)
with open(result_json_file) as js_handle:
result_content = json.load(js_handle)
for sub_test_details in result_content['tests']:
sub_case = copy.deepcopy(tcase)
sub_case.set("id", "/".join([case_id, str(sub_case_index)]))
sub_case.set("purpose", "/".join([case_purpose, sub_test_details['fullTitle']]))
sub_case.remove(sub_case.find("./result_info"))
result_info = etree.SubElement(sub_case, "result_info")
actual_result = etree.SubElement(result_info, "actual_result")
stdout = etree.SubElement(result_info, "stdout")
if len(sub_test_details['err']) == 0:
actual_result.text = 'PASS'
else:
actual_result.text = 'FAIL'
stderr = etree.SubElement(result_info, "stderr")
stderr.text = "%s" % sub_test_details['err']['stack']
sub_case.set("result", actual_result.text)
sub_case_index += 1
tset.append(sub_case)
for block_case_index in range(sub_case_index, sub_num + 1):
sub_case = copy.deepcopy(tcase)
sub_case.set("id", "/".join([case_id, str(block_case_index)]))
sub_case.set("purpose", "/".join([case_purpose, str(block_case_index)]))
sub_case.remove(sub_case.find("./result_info"))
result_info = etree.SubElement(sub_case, "result_info")
actual_result = etree.SubElement(result_info, "actual_result")
actual_result.text = 'BLOCK'
sub_case.set("result", actual_result.text)
stdout = etree.SubElement(result_info, "stdout")
stdout.text = "None result of this sub test case, please check the test case or \"subcase\" number."
block_case_index += 1
tset.append(sub_case)
tset.remove(tcase)

def __expand_subcases_bdd(tset, tcase, sub_num, result_msg):
sub_case_index = 1
Expand Down Expand Up @@ -1732,6 +1790,8 @@ def __write_by_caseid(tset, case_results):
else:
if tset.get('type') == 'nodeunit':
__expand_subcases_nodeunit(tset, tcase, sub_num, result_msg)
elif tset.get('type') == 'mocha':
__expand_subcases_mocha(tset, tcase, sub_num, result_msg)
else:
__expand_subcases(tset, tcase, sub_num, result_msg)

Expand Down

0 comments on commit 97a8e76

Please sign in to comment.