dusty/reporters/jira/legacy.py (261 lines of code) (raw):
#!/usr/bin/python3
# coding=utf-8
# pylint: skip-file
# Copyright 2019 getcarrier.io
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Code from Dusty 1.0
"""
import os
from dusty.tools import log as logging # import logging
from copy import deepcopy
from jira import JIRA
from traceback import format_exc
from . import constants as const # from dusty import constants as const
class JiraWrapper(object):
JIRA_REQUEST = 'project={} AND (description ~ "{}" OR labels in ({}))'
def __init__(self, url, user, password, project, fields=None):
self.valid = True
self.url = url
self.password = password
self.user = user
try:
self.connect()
except:
logging.error("Failed to connect to Jira")
self.valid = False
return
self.projects = [project.key for project in self.client.projects()]
self.project = project.upper()
if self.project not in self.projects:
# self.client.close()
logging.warning("Requested project not found in Jira projects")
# self.valid = False
# return
self.fields = {}
self.watchers = []
if isinstance(fields, dict):
if 'watchers' in fields.keys():
self.watchers = [item.strip() for item in fields.pop('watchers').split(",")]
all_jira_fields = self.client.fields()
for key, value in fields.items():
if value:
if isinstance(value, str) and const.JIRA_FIELD_DO_NOT_USE_VALUE in value:
self.fields[key] = value
continue
jira_keys = [item for item in all_jira_fields if item["id"] == key]
if not jira_keys:
jira_keys = [item for item in all_jira_fields
if item["name"].lower() == key.lower().replace('_', ' ')]
if len(jira_keys) == 1:
jira_key = jira_keys[0]
key_type = jira_key['schema']['type']
else:
logging.warning(f'Cannot recognize field {key}. This field will not be used.')
continue
if key_type in ['string', 'number', 'any'] or isinstance(value, dict):
_value = value
elif key_type == 'array':
if isinstance(value, str):
_value = [item.strip() for item in value.split(",")]
elif isinstance(value, int):
_value = [value]
else:
_value = value
else:
_value = {'name': value}
self.fields[jira_key['id']] = _value
if not self.fields.get('issuetype', None):
self.fields['issuetype'] = {'name': '!default_issuetype'}
self.client.close()
self.created_jira_tickets = list()
def connect(self):
self.client = JIRA(self.url, basic_auth=(self.user, self.password))
def markdown_to_jira_markdown(self, content):
return content.replace("###", "h3.").replace("**", "*")
def create_issue(self, title, priority, description, issue_hash, attachments=None, get_or_create=True,
additional_labels=None):
def replace_defaults(value):
if isinstance(value, str) and const.JIRA_FIELD_USE_DEFAULT_VALUE in value:
for default_key in default_fields.keys():
if default_key in value:
value = value.replace(default_key, default_fields[default_key])
return value
default_fields = {
'!default_issuetype': 'Bug',
'!default_summary': title,
'!default_description': description,
'!default_priority': priority}
description = self.markdown_to_jira_markdown(description)
issue_data = {
'project': {'key': self.project},
'issuetype': 'Bug',
'summary': title,
'description': description,
'priority': {'name': priority}
}
fields = deepcopy(self.fields)
for key, value in fields.items():
if isinstance(value, str):
if const.JIRA_FIELD_DO_NOT_USE_VALUE in value:
issue_data.pop(key)
else:
issue_data[key] = replace_defaults(value)
elif isinstance(value, list):
for item in value:
value[value.index(item)] = replace_defaults(item)
if issue_data.get(key):
issue_data[key].extend(value)
else:
issue_data[key] = value
elif isinstance(value, dict):
for _key, _value in value.items():
value[_key] = replace_defaults(_value)
issue_data[key] = value
elif not key in issue_data:
issue_data[key] = value
else:
logging.warning('field {} is already set and has \'{}\' value'.format(key, issue_data[key]))
_labels = []
if additional_labels and isinstance(additional_labels, list):
_labels.extend(additional_labels)
if issue_data.get('labels', None):
issue_data['labels'].extend(_labels)
else:
issue_data['labels'] = _labels
jira_request = self.JIRA_REQUEST.format(issue_data["project"]["key"], issue_hash, issue_hash)
if get_or_create:
issue, created = self.get_or_create_issue(jira_request, issue_data)
else:
issue = self.post_issue(issue_data)
created = True
try:
if attachments:
for attachment in attachments:
if 'binary_content' in attachment:
self.add_attachment(issue.key,
attachment=attachment['binary_content'],
filename=attachment['message'])
for watcher in self.watchers:
self.client.add_watcher(issue.id, watcher)
except:
if os.environ.get("debug", False):
logging.error(format_exc())
finally:
try:
result_priority = issue.fields.priority
except:
result_priority = "Default"
self.created_jira_tickets.append({'description': issue.fields.summary,
'priority': result_priority,
'key': issue.key,
'link': self.url + '/browse/' + issue.key,
'new': created,
'assignee': issue.fields.assignee,
'status': issue.fields.status.name,
'open_date': issue.fields.created})
return issue, created
def add_attachment(self, issue_key, attachment, filename=None):
issue = self.client.issue(issue_key)
for _ in issue.fields.attachment:
if _.filename == filename:
return
self.client.add_attachment(issue, attachment, filename)
def post_issue(self, issue_data):
issue = self.client.create_issue(fields=issue_data)
logging.info(f' \u2713 {issue_data["issuetype"]["name"]} was created: {issue.key}')
return issue
def get_or_create_issue(self, search_string, issue_data):
issuetype = issue_data['issuetype']
created = False
jira_results = self.client.search_issues(search_string)
issues = []
for each in jira_results:
if each.fields.summary == issue_data.get('summary', None):
issues.append(each)
if len(issues) == 1:
issue = issues[0]
if len(issues) > 1:
logging.error(' more then 1 issue with the same summary')
else:
logging.info(f' {issuetype["name"]} already exists: {issue.key}')
else:
issue = self.post_issue(issue_data)
created = True
return issue, created
def add_comment_to_issue(self, issue, data):
return self.client.add_comment(issue, data)
def get_created_tickets(self):
return self.created_jira_tickets
def get_project_priorities(jira_client, project, issue_type="Bug"):
""" Returns list of Jira priorities in project """
try:
meta = jira_client.createmeta(
projectKeys=project,
issuetypeNames=issue_type,
expand="projects.issuetypes.fields"
)
logging.debug("Got metadata for %d projects", len(meta["projects"]))
if not meta["projects"]:
logging.error(
"No meta returned for %s with type %s", project, issue_type
)
return []
project_meta = meta["projects"][0]
logging.debug(
"Got metadata for %d issuetypes", len(project_meta["issuetypes"])
)
if not project_meta["issuetypes"]:
logging.error("No %s in %s", issue_type, project)
return []
issue_types = project_meta["issuetypes"][0]
if "priority" not in issue_types["fields"]:
logging.error("No priority field in %s", project)
return []
priorities = [
priority["name"] for priority in \
issue_types["fields"]["priority"]["allowedValues"]
]
return priorities
except: # pylint: disable=W0702
logging.exception("Failed to get meta for %s", project)
return []
def prepare_jira_mapping(jira_service):
""" Make Jira mapping (for projects that are using custom values) """
if not jira_service or not jira_service.valid:
return dict()
jira_service.connect()
issue_type = "Bug"
if jira_service.fields["issuetype"]["name"] != "!default_issuetype":
issue_type = jira_service.fields["issuetype"]["name"]
project_priorities = get_project_priorities(
jira_service.client,
jira_service.project,
issue_type
)
if not project_priorities:
jira_service.client.close()
return dict()
logging.debug(
"%s %s priorities: %s",
jira_service.project, issue_type, str(project_priorities)
)
c = const
mapping = dict()
for severity in c.JIRA_SEVERITIES:
if severity not in project_priorities:
for alternative in c.JIRA_ALTERNATIVES[severity]:
if alternative in project_priorities:
logging.warning(
"Mapping %s %s Jira priority: %s -> %s",
jira_service.project, issue_type,
severity, alternative
)
mapping[severity] = alternative
break
if severity not in mapping:
mapping[severity] = project_priorities[0]
logging.error(
"Failed to find Jira mapping for %s, using %s as a fallback",
severity, mapping[severity]
)
jira_service.client.close()
return mapping
def cut_jira_comment(comment):
c = const
code_block_ending = "\n\n{code}"
if len(comment) > c.JIRA_COMMENT_MAX_SIZE:
_comment = comment[:c.JIRA_COMMENT_MAX_SIZE - 1]
last_code_block = _comment.rfind("{code:collapse=true}")
if last_code_block > -1 and _comment.find("{code}", last_code_block+1) == -1:
_comment = _comment[:(c.JIRA_COMMENT_MAX_SIZE - len(code_block_ending) - 1)] + code_block_ending
else:
_comment = comment
return _comment