libs/pagerduty/__init__.py (159 lines of code) (raw):
import json
from http_controller import HttpController
class PagerDutyApi(HttpController):
    """
    Small client for the PagerDuty REST API, layered on HttpController.

    Every call goes through ``self.api_request`` (not defined in this class;
    presumably inherited from HttpController) using the headers stored in
    ``self.HEADERS``.
    """

    def __init__(self, config=None, **kwargs):
        """
        Builds the client from a configuration mapping.
        :param config: mapping providing "base_url" and "api_key"; required
                       even though the parameter defaults to None
        :param kwargs: forwarded unchanged to HttpController.__init__
        :raises TypeError: if config is None
        """
        if config is None:
            # Fail with an explicit message instead of the opaque
            # "'NoneType' object is not subscriptable" (same exception type).
            raise TypeError('config is required and must provide "base_url" and "api_key"')
        self.config = config
        self.base_url = self.config["base_url"]
        self.api_key = self.config["api_key"]
        self.HEADERS = {"Authorization": "Token token={0}".format(self.api_key),
                        "Accept": "application/vnd.pagerduty+json;version=2",
                        "Content-type": "application/json"}
        super(PagerDutyApi, self).__init__(base_url=self.base_url, **kwargs)

    def find_user_id(self, user_email):
        """
        Looks up the ID of the user whose email equals the given one.
        A ValueError raised during the lookup is printed and swallowed.
        :param user_email: user email
        :return: user ID, or None when no user matched or the lookup failed
        """
        user_id = None
        params = {"query": user_email}
        try:
            list_users = self.api_request(method="get",
                                          endpoint="users",
                                          params=params,
                                          headers=self.HEADERS)
            if "users" in list_users:
                # Only an exact email match among the returned candidates counts.
                for user in list_users["users"]:
                    if user["email"] == user_email:
                        user_id = user["id"]
        except ValueError as e:
            print(e)
        return user_id

    def get_schedule_by_id(self, schedule_id):
        """
        Retrieves a schedule by its ID.
        :param schedule_id: schedule ID
        :return: response of the GET request on schedules/<id>
        """
        return self.api_request(method="get",
                                endpoint="schedules/{id}".format(id=schedule_id),
                                headers=self.HEADERS)

    def get_escalation_policy_by_id(self, policy_id):
        """
        Retrieves an escalation policy by its ID.
        :param policy_id: policy ID
        :return: response of the GET request on escalation_policies/<id>
        """
        return self.api_request(method="get",
                                endpoint="escalation_policies/{id}".format(id=policy_id),
                                headers=self.HEADERS)

    def get_service_by_id(self, service_id):
        """
        Retrieves a service by its ID, requesting its escalation policies
        through the "include[]" query parameter.
        :param service_id: service ID
        :return: response of the GET request on services/<id>
        """
        params = {"include[]": "escalation_policies"}
        return self.api_request(method="get",
                                endpoint="services/{id}".format(id=service_id),
                                params=params,
                                headers=self.HEADERS)

    def update_schedule_by_id(self, schedule_id, schedule):
        """
        Updates a schedule by ID.
        :param schedule_id: schedule ID
        :param schedule: schedule object, sent JSON-encoded as the request body
        :return: bool, False when the response contains "error"
        """
        r = self.api_request(method="put",
                             endpoint="schedules/{id}".format(id=schedule_id),
                             data=json.dumps(schedule),
                             headers=self.HEADERS)
        return "error" not in r

    def update_escalation_policy_by_id(self, policy_id, policy):
        """
        Updates an escalation policy by ID.
        :param policy_id: policy ID
        :param policy: escalation_policy object, sent JSON-encoded as the request body
        :return: bool, False when the response contains "error"
        """
        r = self.api_request(method="put",
                             endpoint="escalation_policies/{id}".format(id=policy_id),
                             data=json.dumps(policy),
                             headers=self.HEADERS)
        return "error" not in r

    def update_service_by_id(self, service_id, service):
        """
        Updates a service by ID.
        :param service_id: service ID
        :param service: service object, sent JSON-encoded as the request body
        :return: bool, False when the response contains "error"
        """
        r = self.api_request(method="put",
                             endpoint="services/{id}".format(id=service_id),
                             data=json.dumps(service),
                             headers=self.HEADERS)
        return "error" not in r

    def delete_escalation_policy_by_id(self, policy_id):
        """
        Deletes an escalation policy by ID.
        :param policy_id: policy ID
        :return: bool, True only when the response body is empty
        """
        r = self.api_request(method="delete",
                             endpoint="escalation_policies/{id}".format(id=policy_id),
                             response_type="text",
                             headers=self.HEADERS)
        # An empty body is the success signal (same convention as
        # delete_service); any other body, JSON or not, means the policy was
        # not deleted. The previous JSON-parsing version referenced the
        # non-existent json.JSONDecoderError and therefore raised
        # AttributeError precisely when the deletion had succeeded.
        return r == ""

    def remove_user_from_schedule(self, schedule_id, user_id):
        """
        Removes a user from every layer of a schedule and saves the schedule.
        :param schedule_id: schedule_id
        :param user_id: user_id
        :return: bool, result of update_schedule_by_id
        """
        schedule = self.get_schedule_by_id(schedule_id=schedule_id)
        for layer in schedule["schedule"]["schedule_layers"]:
            # Rebuild the list in place instead of popping while iterating,
            # which skipped the entry following each removed one.
            layer["users"][:] = [entry for entry in layer["users"]
                                 if entry["user"]["id"] != user_id]
            if not layer["users"]:
                # A layer left without users gets the first schedule-level
                # user as a placeholder.
                # NOTE(review): that entry may be the user being removed, and
                # its shape may differ from layer entries - confirm.
                layer["users"].append(schedule["schedule"]["users"][0])
        return self.update_schedule_by_id(schedule_id=schedule_id, schedule=schedule)

    def remove_user_from_escalation_policy(self, policy_id, user_id, delete_empty=False):
        """
        Removes a user from an escalation policy.
        Only policies with exactly one escalation rule are handled; any other
        policy is left untouched and False is returned.
        :param policy_id: policy_id
        :param user_id: user_id
        :param delete_empty: when True and the single rule has at most one
                             target, delete the policy's services and then the
                             policy itself instead of updating it
        :return: bool
        """
        policy = self.get_escalation_policy_by_id(policy_id=policy_id)
        rules = policy["escalation_policy"]["escalation_rules"]
        if len(rules) != 1:
            return False

        rule = rules[0]
        if len(rule["targets"]) > 1:
            # Filter in place; popping while iterating skipped the target
            # that followed each removed one.
            rule["targets"][:] = [target for target in rule["targets"]
                                  if target["id"] != user_id]
            return self.update_escalation_policy_by_id(policy_id=policy_id, policy=policy)

        if delete_empty is True:
            # NOTE(review): the remaining target is not checked against
            # user_id before the policy is deleted - confirm this is intended.
            for service in policy["escalation_policy"]["services"]:
                self.delete_service(service_id=service["id"])
            # NOTE(review): the result of this request is discarded; it looks
            # redundant - confirm before removing.
            self.get_escalation_policy_by_id(policy_id=policy_id)
            return self.delete_escalation_policy_by_id(policy_id=policy_id)

        return False

    def delete_service(self, service_id):
        """
        Deletes a service.
        :param service_id: id
        :return: bool, True only when the response body is empty
        """
        r = self.api_request(method="delete",
                             endpoint="services/{id}".format(id=service_id),
                             response_type="text",
                             headers=self.HEADERS)
        return r == ""

    def delete_user(self, user_id):
        """
        Deletes a user.
        :param user_id: id
        :return: response of the DELETE request, requested with response_type="text"
        """
        return self.api_request(method="delete",
                                endpoint="users/{id}".format(id=user_id),
                                response_type="text",
                                headers=self.HEADERS)

    def remove_from_oncalls(self, user_email):
        """
        Removes a user from oncalls, and then deletes the user.
        When the first delete attempt reports an error, the user is removed
        from every schedule / escalation policy listed in the error's
        conflicts and the delete is attempted once more.
        :param user_email: user email
        :return: bool, also True when find_user_id returns no ID for the email
        """
        user_id = self.find_user_id(user_email=user_email)
        if user_id is None:
            # No user ID was found, so there is nothing to delete.
            return True

        deleted = False
        try:
            # A body that is not JSON (e.g. the empty body of a successful
            # delete) makes json.loads raise ValueError, handled below.
            userdel = json.loads(self.delete_user(user_id=user_id))
            if "error" in userdel:
                # Tolerate error payloads without a conflicts list.
                for conflict in userdel["error"].get("conflicts") or []:
                    if conflict["type"] == "schedule":
                        self.remove_user_from_schedule(schedule_id=conflict["id"], user_id=user_id)
                    elif conflict["type"] == "escalation_policy":
                        self.remove_user_from_escalation_policy(policy_id=conflict["id"],
                                                                user_id=user_id,
                                                                delete_empty=True)
                userdel = self.delete_user(user_id=user_id)
                if userdel == "":
                    deleted = True
        except ValueError:
            # NOTE(review): any ValueError raised inside the block above is
            # also reported as success - confirm this is intended.
            deleted = True
        return deleted

    def list_oncalls(self, escalation_policy_ids):
        """
        Returns the oncalls for the given escalation policies.
        :param escalation_policy_ids: list of escalation policy ids
        :return: value of "oncalls" in the response, or None when it is absent
        """
        params = {"escalation_policy_ids[]": escalation_policy_ids}
        r = self.api_request(method="get",
                             endpoint="oncalls",
                             params=params,
                             headers=self.HEADERS)
        return r["oncalls"] if "oncalls" in r else None