modular_api/services/audit_service.py (88 lines of code) (raw):
import json
from pynamodb.pagination import ResultIterator
from modular_api.models.audit_model import Audit
from modular_api.helpers.date_utils import convert_datetime_to_human_readable
from modular_api.helpers.log_helper import get_logger
from modular_api.helpers.password_util import secure_string
from datetime import datetime, timedelta
_LOG = get_logger(__name__)
class AuditService:
@staticmethod
def prepare_audit_to_be_hashed(audit: Audit) -> dict:
"""
Returns audit event as dictionary from Audit table without hash
attribute. All values in dictionary are string type
"""
_LOG.info(f'Preparing audit event to be hashed')
return audit.response_object_without_hash()
def save_audit(self, timestamp: str, group: str, command: str,
parameters: str,
result: str, warnings=None) -> None:
"""
Saves event and its hash sum to the 'ModularAudit' table
"""
_LOG.info(f'Saving audit item')
audit = Audit(group=group, timestamp=timestamp, command=command,
parameters=parameters, result=result, warnings=warnings)
event_to_be_hashed = self.prepare_audit_to_be_hashed(audit=audit)
event_to_be_hashed = json.dumps(event_to_be_hashed, sort_keys=True)
audit.hash_sum = secure_string(event_to_be_hashed)
audit.save()
@staticmethod
def check_audit_hash(audit: Audit) -> bool:
"""
Compares an existing audit event with its hash sum
"""
_LOG.info(f'Checking audit event hash')
audit_hash = audit.hash_sum
if not audit_hash:
return False
provided_audit = AuditService.prepare_audit_to_be_hashed(audit=audit)
provided_audit = secure_string(json.dumps(provided_audit))
return provided_audit == audit_hash
@staticmethod
def get_audit(group, command, from_date, to_date, limit
) -> ResultIterator[Audit]:
"""
Returns iterator with filtered results
"""
_LOG.info('Describing audit table')
filter_condition = None
range_key_condition = None
# some kludges
if isinstance(from_date, datetime):
from_date = from_date.isoformat()
if isinstance(to_date, datetime):
to_date = to_date.isoformat()
if from_date and to_date:
range_key_condition &= (
Audit.timestamp.between(from_date, to_date))
elif from_date:
range_key_condition &= (Audit.timestamp >= from_date)
elif to_date:
range_key_condition &= (Audit.timestamp <= to_date)
else:
# describe in docs
initial_date = datetime.today().replace(
hour=0, minute=0, second=0, microsecond=0) - timedelta(days=7)
range_key_condition &= (Audit.timestamp >= initial_date.isoformat())
if command:
filter_condition &= (Audit.command == command)
if group:
return Audit.query(hash_key=group,
filter_condition=filter_condition,
range_key_condition=range_key_condition,
limit=limit)
if range_key_condition is not None:
filter_condition &= range_key_condition
return Audit.scan(filter_condition=filter_condition)
def filter_audit(self, group=None, from_date=None, to_date=None,
command=None, limit=None) -> tuple:
"""
Returns audit events by provided filters
"""
_LOG.info(f'Going to filter audit by the following parameters: group: '
f'{group}, command: {command}, from_date: {from_date}, '
f'to_date: {to_date}, limit: {limit}')
audit_list = []
invalid_list = []
audit = self.get_audit(group=group, command=command,
from_date=from_date,
to_date=to_date, limit=limit)
for event in audit:
valid_event = AuditService.check_audit_hash(event)
pretty_value = {
'Group': event.group,
'Command': event.command,
'Timestamp': convert_datetime_to_human_readable(event.timestamp),
'Parameters': event.parameters,
'Execution warnings': event.warnings,
'Result': event.result,
'Consistency status': 'Compromised' if not valid_event else 'OK'
}
if not valid_event:
invalid_list.append(pretty_value)
audit_list.append(pretty_value)
return audit_list, invalid_list