modular_api_cli/modular_handler/audit_handler.py (87 lines of code) (raw):
import json
from os import linesep
from datetime import datetime
import click
from modular_api.helpers.log_helper import get_logger
from modular_api.services.audit_service import AuditService
from modular_api.helpers.decorators import CommandResponse
from modular_api.helpers.exceptions import ModularApiBadRequestException
from modular_api.helpers.utilities import parse_date
_LOG = get_logger(__name__)
class AuditHandler:
def __init__(self, audit_service):
self.audit_service: AuditService = audit_service
def describe_audit_handler(
self,
group: str | None = None,
command: str | None = None,
from_date: str | None = None,
to_date: str | None = None,
limit: int = 10,
invalid: bool = False,
) -> CommandResponse:
"""
Querying audit events by provided filters
"""
_LOG.info(f'Describing audit events. Parameters: group \'{group}\', '
f'command \'{command}\', from_date \'{from_date}\', '
f'to_date \'{to_date}\', limit \'{limit}\', '
f'invalid \'{invalid}\'')
if from_date:
from_date = parse_date(from_date).replace(
hour=0, minute=0, second=0, microsecond=0)
if to_date:
to_date = parse_date(to_date).replace(
hour=23, minute=59, second=59, microsecond=0)
if (from_date and to_date) and from_date >= to_date:
message = 'The \'from_date\' parameter must not be greater ' \
'than or equal to the \'to_date\' parameter'
_LOG.error(message)
raise ModularApiBadRequestException(message)
audit_list, invalid_list = self.audit_service.filter_audit(
group=group, command=command, from_date=from_date,
to_date=to_date, limit=limit)
if invalid:
if not invalid_list:
_LOG.info('All events are valid')
return CommandResponse(message='All events are valid')
invalid_list.sort(key=lambda elem: elem.get('Timestamp'))
file_name = self._save_file(invalid_list)
if file_name:
_LOG.info(f'Audit was successfully saved in: {file_name}')
return CommandResponse(
message=f'Audit was successfully saved in: {file_name}')
_LOG.warning('Invalid audit events detected')
return CommandResponse(
table_title='Invalid audit events', items=invalid_list)
else:
if not audit_list:
_LOG.info('There are not events by provided filters')
return CommandResponse(
message='There are not events by provided filters',
error=True,
)
audit_list.sort(key=lambda elem: elem.get('Timestamp'))
file_name = self._save_file(audit_list)
if file_name:
_LOG.info(f'Audit was successfully saved in: {file_name}')
return CommandResponse(
message=f'Audit was successfully saved in: {file_name}')
valid_title = 'Audit events'
compromised_title = f'Audit events{linesep}WARNING! ' \
f'Compromised event(s) have been detected.' \
f'{linesep}To view only invalid results ' \
f'use \'--invalid\' flag'
_LOG.warning('Invalid audit events detected')
return CommandResponse(
table_title=valid_title if not invalid_list else compromised_title,
items=audit_list,
)
@staticmethod
def _save_file(items) -> str:
"""
Writes audit events to the file and returns created file name
"""
events_total = len(items)
if events_total > 100 and click.confirm(
text=f'Found {events_total} audit events. Do you want '
f'to save these records to a file??'):
file_name = 'audit_' + datetime.utcnow(). \
strftime("%d%m%Y_%H%M%S") + '.json'
with open(file_name, 'w') as f:
json.dump(items, f, indent=4)
return file_name