dusty/reporters/reportportal/legacy.py (78 lines of code) (raw):
#!/usr/bin/python3
# coding=utf-8
# pylint: skip-file
# Copyright 2019 getcarrier.io
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Code from Dusty 1.0
"""
import traceback
from time import time
from reportportal_client import ReportPortalServiceAsync as ReportPortalService
from . import constants # from dusty import constants
rp_service = None
def timestamp():
return str(int(time() * 1000))
def my_error_handler(exc_info):
"""
This callback function will be called by async service client when error occurs.
Return True if error is not critical and you want to continue work.
:param exc_info: result of sys.exc_info() -> (type, value, traceback)
:return:
"""
traceback.print_exception(*exc_info)
def launch_reportportal_service(rp_config):
if not rp_config:
return None
global rp_service
if not rp_service:
rp_service = ReportPortalDataWriter(endpoint=rp_config["rp_url"],
token=rp_config["rp_token"],
project=rp_config["rp_project"],
launch_name=rp_config["rp_launch_name"],
tags=rp_config["rp_launch_tags"])
rp_service.start_test()
return rp_service
class ReportPortalDataWriter:
def __init__(self, endpoint, token, project, log_batch_size=100, launch_name=None, tags=None,
launch_doc=None, launch_id=None, verify_ssl=False):
self.endpoint = endpoint
self.token = token
self.project = project
self.log_batch_size = log_batch_size
self.launch_name = launch_name
self.tags = tags
self.launch_doc = launch_doc
self.service = None
self.test = None
self.verify_ssl = verify_ssl
self.launch_id = launch_id
def start_service(self):
self.service = ReportPortalService(endpoint=self.endpoint,
project=self.project,
token=self.token,
log_batch_size=self.log_batch_size,
verify_ssl=self.verify_ssl)
if self.launch_id:
self.service.launch_id = self.launch_id
def start_test(self):
if not self.service:
self.start_service()
return self.service.start_launch(name=self.launch_name,
start_time=timestamp(),
description=self.launch_doc,
tags=self.tags)
def finish_test(self):
self.service.finish_launch(end_time=timestamp())
self.service.terminate()
self.service = None
def is_test_started(self):
if self.service:
return True
return False
def start_test_item(self, issue, description, tags, item_type='STEP', parameters={}):
self.service.start_test_item(issue, description=description,
tags=tags, start_time=timestamp(),
item_type=item_type, parameters=parameters)
def test_item_message(self, message, level="ERROR", attachment=None):
if len(message) > constants.MAX_MESSAGE_LEN:
index = 0
while index < len(message):
increment = constants.MAX_MESSAGE_LEN
if index + increment > len(message):
increment = len(message) - index
self.service.log(time=timestamp(), message=message[index:index+increment],
level=level, attachment=attachment)
index = index+increment
else:
self.service.log(time=timestamp(), message=message,
level=level, attachment=attachment)
def finish_test_item(self, status="FAILED"):
self.service.finish_test_item(end_time=timestamp(),
status=status)