obfuscator-cli/obfuscation_manager.py (557 lines of code) (raw):
from abc import ABC, abstractmethod
import argparse
import csv
import email
import email.message
import email.policy
import gzip
import importlib
import itertools
import json
import logging
import logging.config
import os
from pathlib import Path
import subprocess
import sys
from typing import Generator, List, Optional, TYPE_CHECKING, Tuple, Union, cast
import uuid
openpyxl = None
if TYPE_CHECKING:
import openpyxl
from openpyxl.worksheet.worksheet import Worksheet
# Version string of this module.
__version__ = '1.0.0'
# Resource keys that survive obfuscation when not all fields are kept:
# the patch_* methods pass this set to keep_only().
REPORT_FIELDS = {'id', 'name', 'arn', 'namespace'}
class TermColor:
    """ANSI escape sequences and helpers that wrap a string in a color."""

    # Raw escape codes.
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    FAIL = '\033[91m'
    DEBUG = '\033[90m'
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    UNDERLINE = '\033[4m'
    BOLD_RED = '\x1b[31;1m'

    # Template: color code, then the text, then the reset code.
    _pattern = '{color}{string}' + ENDC

    @classmethod
    def _paint(cls, color: str, st: str) -> str:
        """Render ``st`` surrounded by ``color`` and the reset sequence."""
        return cls._pattern.format(color=color, string=st)

    @classmethod
    def blue(cls, st: str) -> str:
        """Return ``st`` colored blue."""
        return cls._paint(cls.OKBLUE, st)

    @classmethod
    def cyan(cls, st: str) -> str:
        """Return ``st`` colored cyan."""
        return cls._paint(cls.OKCYAN, st)

    @classmethod
    def green(cls, st: str) -> str:
        """Return ``st`` colored green."""
        return cls._paint(cls.OKGREEN, st)

    @classmethod
    def yellow(cls, st: str) -> str:
        """Return ``st`` colored yellow."""
        return cls._paint(cls.WARNING, st)

    @classmethod
    def red(cls, st: str) -> str:
        """Return ``st`` colored red."""
        return cls._paint(cls.FAIL, st)

    @classmethod
    def gray(cls, st: str) -> str:
        """Return ``st`` colored gray."""
        return cls._paint(cls.DEBUG, st)

    @classmethod
    def bold_red(cls, st: str) -> str:
        """Return ``st`` colored bold red."""
        return cls._paint(cls.BOLD_RED, st)
class ColorFormatter(logging.Formatter):
    """Formatter that colors the whole formatted record by its level."""

    # Maps a standard logging level to the function that colors the text.
    formats = {
        logging.DEBUG: TermColor.gray,
        logging.INFO: TermColor.green,
        logging.WARNING: TermColor.yellow,
        logging.ERROR: TermColor.red,
        logging.CRITICAL: TermColor.bold_red
    }

    def format(self, record):
        """
        Format the record and wrap the result in the color of its level.
        Records with a level that has no entry in ``formats`` (custom
        levels) are returned uncolored instead of raising KeyError.
        :param record: logging.LogRecord to format
        :return: formatted, possibly colored, string
        """
        res = super().format(record)
        paint = self.formats.get(record.levelno)
        if paint is None:
            return res
        return paint(res)
def get_logger(name: str, level=os.getenv('LOG_LEVEL', logging.DEBUG)):
    """
    Returns a logger with the given name which writes colored records
    to a stream handler.
    :param name: logger name
    :param level: numeric level or level name. The default is taken from
        the LOG_LEVEL environment variable when this function is defined,
        falling back to DEBUG
    :return: configured logging.Logger
    """
    if isinstance(level, str):
        # environment values come as text: accept "10" and "debug" as well
        # as "DEBUG", which is the only form setLevel understands
        level = level.strip()
        level = int(level) if level.isdigit() else level.upper()
    log = logging.getLogger(name)
    log.setLevel(level)
    # logging.getLogger returns the same object for the same name, so a
    # repeated call must not attach a second handler (duplicated output)
    already_colored = any(
        isinstance(h.formatter, ColorFormatter) for h in log.handlers
    )
    if not already_colored:
        handler = logging.StreamHandler()
        handler.setFormatter(ColorFormatter('%(levelname)s - %(message)s'))
        log.addHandler(handler)
    return log
# Module-level logger built by get_logger() for this module's name.
_LOG = get_logger(__name__)
# Type aliases used in the annotations of the JSON-processing helpers.
NoneType = type(None)
# A scalar value inside a JSON document.
Leaf = Union[str, int, bool, NoneType]
# A JSON value that holds other values.
JsonContainer = Union[list, dict]
# Any JSON value: a scalar or a container.
Json = Union[Leaf, JsonContainer]
def iter_values(finding: 'Json') -> 'Generator[Leaf, Leaf, Json]':
    """
    Yields scalar values from the given finding with an ability to send
    back the desired values, so the replacement logic can live outside
    of this generator.
    >>> gen = iter_values({'1':'q', '2': ['w', 'e'], '3': {'4': 'r'}})
    >>> next(gen)
    'q'
    >>> gen.send('instead of q')
    'w'
    >>> gen.send('instead of w')
    'e'
    >>> gen.send('instead of e')
    'r'
    >>> gen.send('instead of r')
    After the last command StopIteration will be raised, and its value
    will contain the changed finding. Containers (dict, list) are changed
    in-place. Every yielded value is replaced with whatever was sent
    back, so advancing with plain next() replaces the value with None.
    Floats are treated as scalars like str, int, bool and None. A value
    of any other type is not yielded and is kept as is.
    :param finding: JSON-like value to walk through
    :return: generator; its return value is the patched finding
    """
    if finding is None or isinstance(finding, (str, int, float, bool)):
        new = yield finding
        return new
    if isinstance(finding, dict):
        # only values of existing keys are reassigned, so the dict size
        # does not change while it is being iterated
        for k, v in finding.items():
            finding[k] = yield from iter_values(v)
        return finding
    if isinstance(finding, list):
        for i, v in enumerate(finding):
            finding[i] = yield from iter_values(v)
        return finding
    # unknown type: hand it back untouched, otherwise the parent container
    # would get None instead of the original value
    return finding
def flip_dict(d: dict) -> None:
    """
    Swaps keys and values of the given dict in place.
    The flipped mapping is built completely before the dict is touched,
    so the result is correct even when some value is equal to another
    key. If several keys share a value, the last of them wins.
    :param d: dict to flip; all its values must be hashable
    :return: None
    """
    flipped = {value: key for key, value in d.items()}
    d.clear()
    d.update(flipped)
def keep_only(d: dict, keys: set) -> None:
    """
    Removes from the dict, in place, every key that is not in ``keys``.
    An empty ``keys`` means "no filter": the dict is left untouched.
    :param d: dict to filter
    :param keys: keys to preserve
    :return: None
    """
    if keys:
        # collect first: a dict cannot be resized while it is iterated
        for unwanted in [name for name in d if name not in keys]:
            del d[unwanted]
def query_yes_no(question: str, default: str = "yes") -> bool:
    """Ask a yes/no question on stdout and read the answer with input().

    "question" is the text shown to the user.
    "default" is the answer assumed when the user just hits <Enter>:
        "yes" (the default), "no", or None, which makes an explicit
        answer mandatory.

    Returns True for "yes" and False for "no". Raises ValueError when
    "default" is none of the allowed values. Keeps asking until a valid
    answer is given.
    """
    answers = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    prompts = {None: " [y/n] ", "yes": " [Y/n] ", "no": " [y/N] "}
    try:
        prompt = prompts[default]
    except (KeyError, TypeError):
        raise ValueError("invalid default answer: '%s'" % default) from None

    while True:
        sys.stdout.write(question + prompt)
        answer = input().lower()
        if answer == "" and default is not None:
            return answers[default]
        if answer in answers:
            return answers[answer]
        sys.stdout.write(
            "Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
def import_openpyxl() -> None:
    """
    Makes the module-level name ``openpyxl`` point to the imported
    package. Does nothing if it is already imported. If the package is
    not installed, asks the user for a permission to install it with pip
    and exits the process with code 1 when the user refuses or the
    installation fails.
    :return: None
    """
    global openpyxl
    if openpyxl:
        return
    _LOG.info('Going to import openpyxl')
    try:
        openpyxl = importlib.import_module('openpyxl')
        return
    except ImportError:
        pass
    if not query_yes_no('Required requirement openpyxl is not found. '
                        'Want to install?'):
        _LOG.error('Aborting...')
        sys.exit(1)
    # "python -m pip" of the running interpreter: a bare "pip" found on
    # PATH can install the package for a different Python
    result = subprocess.run(
        [sys.executable, '-m', 'pip', 'install', 'openpyxl']
    )
    if result.returncode != 0:
        _LOG.error('Could not install openpyxl')
        sys.exit(1)
    # the package appeared while the process is running, so the import
    # system's cached directory listings must be dropped first
    importlib.invalidate_caches()
    openpyxl = importlib.import_module('openpyxl')
def _register_obfuscate(sub_parsers) -> None:
    """Adds the "obfuscate" sub-command and its options."""
    cmd = sub_parsers.add_parser(
        'obfuscate', help='Obfuscates an existing dump'
    )
    cmd.add_argument(
        '--dump-directory',
        type=Path,
        default='custodian_dump',
        help='Path to the folder where custodian dump is sited '
             '(default: %(default)s)',
    )
    cmd.add_argument(
        '--to',
        type=Path,
        required=False,
        help='Path the the patched data must be placed. If not specified, '
             'pathed data will override an existing dump',
    )
    cmd.add_argument(
        '--keep-all-fields',
        action='store_true',
        help='If specified, all the fields will be kept. By default, '
             'only ID fields are kept',
    )
    cmd.add_argument(
        '--dictionary-out',
        type=Path,
        default='dictionary_out.json',
        help='Path where obfuscated keys and their IDs will be places '
             '(default: %(default)s)',
    )
    cmd.add_argument(
        '--dictionary',
        dest='dictionary_path',
        type=Path,
        required=False,
        help='Optional dict, in case you want to give some specific '
             'aliases for concrete names in resources. '
             'Path to file which contains JSON with key-value pairs, '
             'where key is real value of some attribute in resources '
             'and value is a name you want to replace the real value with. '
             '(default: %(default)s)',
    )


def _register_deobfuscate(sub_parsers) -> None:
    """Adds the "deobfuscate" sub-command and its options."""
    cmd = sub_parsers.add_parser(
        'deobfuscate', help='De-obfuscates an existing dump'
    )
    cmd.add_argument(
        '--dump-directory',
        type=Path,
        default='custodian_dump',
        help='Path to the folder where the obfuscated custodian dump is '
             'sited. Can be also a path to concrete file '
             '(default: %(default)s)',
    )
    cmd.add_argument(
        '--to',
        type=Path,
        required=False,
        help='Path where the de-obfuscated data must be placed. '
             'If not specified, pathed data will override an existing dump',
    )
    cmd.add_argument(
        '--dictionary',
        type=Path,
        nargs='+',
        required=True,
        help='Path to a file where obfuscated keys and their IDs are',
    )


def build_parser() -> argparse.ArgumentParser:
    """
    Builds the command line parser. The chosen sub-command name is stored
    under ``action``; the sub-commands are "obfuscate" and "deobfuscate".
    :return: configured argparse.ArgumentParser
    """
    root = argparse.ArgumentParser(
        description='Obfuscation script cli enter-point'
    )
    # -- top level sub-parser
    actions = root.add_subparsers(dest='action', required=True,
                                  help='Available actions')
    _register_obfuscate(actions)
    _register_deobfuscate(actions)
    return root
class ActionHandler(ABC):
    """Interface of a CLI action: a callable that takes keyword arguments."""

    @abstractmethod
    def __call__(self, **kwargs):
        """Execute the action. Concrete handlers must override this."""
class BaseHandler(ActionHandler):
    """
    Contains some common methods
    """

    def __call__(self, *args, **kwargs):
        raise NotImplementedError('BaseHandler must not be used directly')

    # The file type checks below look only at the trailing extension(s),
    # so names with extra dots ("report.v1.json") are recognized as well.

    @staticmethod
    def is_gzipped_json(name: Path) -> bool:
        """Tells whether the file name ends with ".json.gz"."""
        return name.suffixes[-2:] == ['.json', '.gz']

    @staticmethod
    def is_json(name: Path) -> bool:
        """Tells whether the file name ends with ".json"."""
        return name.suffix == '.json'

    @staticmethod
    def is_xlsx(name: Path) -> bool:
        """Tells whether the file name ends with ".xlsx"."""
        return name.suffix == '.xlsx'

    @staticmethod
    def is_csv(name: Path) -> bool:
        """Tells whether the file name ends with ".csv"."""
        return name.suffix == '.csv'

    @staticmethod
    def load_json(path: Path, gzipped: Optional[bool] = None
                  ) -> Optional[Json]:
        """
        Loads JSON from the file.
        :param path: file to read
        :param gzipped: whether the content is gzip-compressed. If it is
            not a bool, the answer is derived from the file name
        :return: parsed content, or None if the file could not be read or
            parsed (a warning is logged in that case)
        """
        if not isinstance(gzipped, bool):
            gzipped = BaseHandler.is_gzipped_json(path)
        try:
            with open(path, 'rb') as fp:
                data = fp.read()
            if gzipped:
                data = gzip.decompress(data)
            return json.loads(data)
        except Exception as e:
            # deliberate best-effort: one broken file must not stop the
            # processing of the whole dump
            _LOG.warning(
                f'Unexpected error occurred trying to load {path}: {e}. '
                f'Skipping...'
            )
            return

    @staticmethod
    def dump_json(to: Path, data: Json, gzipped: Optional[bool] = None):
        """
        Writes the data as compact JSON, creating parent folders if needed.
        :param to: destination file
        :param data: JSON-serializable data
        :param gzipped: whether to gzip the output. If it is not a bool,
            the answer is derived from the file name
        """
        if not isinstance(gzipped, bool):
            gzipped = BaseHandler.is_gzipped_json(to)
        to.parent.mkdir(parents=True, exist_ok=True)
        buf = json.dumps(data, separators=(',', ':')).encode()
        if gzipped:
            buf = gzip.compress(buf)
        with open(to, 'wb') as fp:
            fp.write(buf)

    @staticmethod
    def yield_files(root: Path) -> Generator[Tuple[Path, Path], None, None]:
        """
        Iterates over all the files in root and yields each file full
        path and path relative to root. If root is a file itself, yields
        only its resolved path and its bare name.
        :param root: folder to walk through, or a single file
        :return: generator of (full, relative) tuples
        """
        if root.is_file():
            yield root.resolve(), root.relative_to(root.parent)
            return
        for base, _, files in os.walk(root):
            for file in files:
                _path = Path(base, file)
                yield _path, _path.relative_to(root)
class ObfuscateDump(BaseHandler):
    """
    Handler of the "obfuscate" action: replaces the values inside the
    JSON files of a dump with aliases and stores the alias -> real value
    dictionary.
    """

    @staticmethod
    def is_findings(dct) -> bool:
        """
        Is 'findings' format of report: a dict whose first key starts
        with "ecc-". An empty dict is not considered to be findings.
        :param dct: loaded file content
        :return:
        """
        if not isinstance(dct, dict) or not dct:
            return False
        first = next(iter(dct))
        return isinstance(first, str) and first.startswith('ecc-')

    @staticmethod
    def is_list_of_resources(lst) -> bool:
        """
        Is just list of resources: an empty list or a list whose first
        item is a dict
        :param lst: loaded file content
        :return:
        """
        if not isinstance(lst, list):
            return False
        if len(lst) == 0:
            return True
        if not isinstance(lst[0], dict):
            return False
        return True

    @staticmethod
    def is_list_of_shard_parts(lst) -> bool:
        """
        Is list of shard parts: an empty list or a list whose first item
        is a dict with the keys 'p', 'l', 't' and 'r'
        :param lst: loaded file content
        :return:
        """
        if not isinstance(lst, list):
            return False
        if len(lst) == 0:
            return True
        dct = lst[0]
        if not isinstance(dct, dict):
            return False
        return 'p' in dct and 'l' in dct and 't' in dct and 'r' in dct

    @staticmethod
    def obfuscate_finding(finding: dict, dictionary: dict,
                          dictionary_out: dict) -> None:
        """
        Main business logic. Replaces, in place, every scalar value of
        the finding with an alias. The alias is looked up in this order:
        already assigned one (dictionary_out), user-provided one
        (dictionary), freshly generated UUID.
        :param finding: item to obfuscate
        :param dictionary: user-provided real value -> alias mapping
        :param dictionary_out: accumulates real value -> alias pairs
        :return:
        """
        gen = iter_values(finding)
        try:
            real = next(gen)
            while True:
                if real in dictionary_out:
                    alias = dictionary_out[real]
                else:
                    # a new UUID is generated only when it is really needed
                    alias = dictionary.get(real) or str(uuid.uuid4())
                    dictionary_out[real] = alias
                _LOG.debug(f'"{str(real)[:3]}***" will be '
                           f'replaced with {alias}')
                real = gen.send(alias)
        except StopIteration:
            # the generator is exhausted: every value has been replaced
            pass

    def patch_findings(self, findings: dict, all_fields: bool,
                       dictionary: dict,
                       dictionary_out: dict):
        """
        In place. Obfuscates every resource found under
        findings[...]['resources'][...]
        :param findings: content in 'findings' format
        :param all_fields: if False, only REPORT_FIELDS keys are kept
        :param dictionary: user-provided real value -> alias mapping
        :param dictionary_out: accumulates real value -> alias pairs
        :return:
        """
        for data in findings.values():
            for resources in data['resources'].values():
                for resource in resources:
                    if not all_fields:
                        keep_only(resource, REPORT_FIELDS)
                    self.obfuscate_finding(
                        resource,
                        dictionary,
                        dictionary_out
                    )

    def patch_list_of_resources(self, findings: list, all_fields: bool,
                                dictionary: dict, dictionary_out: dict):
        """
        In place. Obfuscates every resource of the list. Parameters have
        the same meaning as in patch_findings
        """
        for resource in findings:
            if not all_fields:
                keep_only(resource, REPORT_FIELDS)
            self.obfuscate_finding(resource, dictionary, dictionary_out)

    def patch_list_of_shard_parts(self, findings: list, all_fields: bool,
                                  dictionary: dict, dictionary_out: dict):
        """
        In place. Obfuscates every resource stored under the 'r' key of
        each part; a missing 'r' key is created as an empty list.
        Parameters have the same meaning as in patch_findings
        """
        for part in findings:
            for resource in part.setdefault('r', []):
                if not all_fields:
                    keep_only(resource, REPORT_FIELDS)
                self.obfuscate_finding(resource, dictionary, dictionary_out)

    def yield_jsons(self, root: Path
                    ) -> Generator[Tuple[Path, Json], None, None]:
        """
        Yields tuples where the first element is file path relative to root,
        the second element - loaded file content.
        Loads only JSON and gzipped JSON. Skips the file if it cannot be
        loaded, is not json or its content is empty
        :param root:
        :return:
        """
        for full, relative in self.yield_files(root):
            gz = self.is_gzipped_json(full)
            js = self.is_json(full)
            if not (gz or js):
                _LOG.info(f'Skipping: {relative} - not json')
                continue
            data = self.load_json(full)
            if not data:
                continue
            yield relative, data

    def __call__(self, dump_directory: Path, to: Optional[Path],
                 keep_all_fields: bool, dictionary_out: Path,
                 dictionary_path: Optional[Path]):
        """
        Obfuscates the dump.
        :param dump_directory: where the dump is
        :param to: where to put the patched files. If not given, the user
            is asked to confirm overriding of the dump
        :param keep_all_fields: keep all the fields of resources instead
            of only REPORT_FIELDS
        :param dictionary_out: file to write the alias -> real value
            dictionary to
        :param dictionary_path: optional JSON file with real value ->
            desired alias pairs
        """
        # output dump directory validation
        if not to:
            if query_yes_no('Parameter --to was not provided. Patched files '
                            'will override the dump'):
                to = dump_directory
            else:
                _LOG.error('Aborting')
                sys.exit(1)
        _LOG.info(f'Obfuscated dump will be places to: "{to}"')

        # Loading desired values dictionary
        if dictionary_path:
            _LOG.info('Loading dictionary')
            try:
                with open(dictionary_path, 'r') as file:
                    dictionary = json.load(file)
                if not isinstance(dictionary, dict):
                    raise ValueError('The content must be a dict')
            except Exception as e:
                _LOG.error(f'Could not load {dictionary_path}: {e}')
                sys.exit(1)
        else:
            _LOG.info('Dictionary was not provided. All the '
                      'aliases will be randomly generated')
            dictionary = {}

        # Logging whether all the fields will be kept
        if keep_all_fields:
            _LOG.warning('All the fields will be kept')
        else:
            _LOG.info('Only id fields will be kept')

        # obfuscating
        out = {}  # here we will put real names to our randomly generated
        for path, content in self.yield_jsons(dump_directory):
            if self.is_findings(content):
                _LOG.info(f'Findings found by path: {path}. Pathing')
                content = cast(dict, content)  # is_findings ensures it's dict
                self.patch_findings(content, keep_all_fields, dictionary, out)
            elif self.is_list_of_shard_parts(content):
                _LOG.info(f'List of shard parts found by path: {path}. '
                          f'Pathing')
                content = cast(list, content)
                self.patch_list_of_shard_parts(content, keep_all_fields,
                                               dictionary, out)
            elif self.is_list_of_resources(content):
                _LOG.info(f'List of resources found by path: {path}. Pathing')
                content = cast(list, content)
                self.patch_list_of_resources(content, keep_all_fields,
                                             dictionary, out)
            else:
                _LOG.warning(f'Unknown file format: {path}. Skipping')
                # really skip: the content was not obfuscated, so it must
                # not be written next to the obfuscated files
                continue
            self.dump_json(to / path, content)

        # dumping output dict
        _LOG.info(f'Output dictionary will be dumped to {dictionary_out}')
        dictionary_out.parent.mkdir(parents=True, exist_ok=True)
        flip_dict(out)  # real -> alias becomes alias -> real
        with open(dictionary_out, 'w') as file:
            json.dump(out, file, indent=2)
        _LOG.info('Finished!')
class Deobfuscator(ABC):
    """
    Base class of file de-obfuscators with a chained interface.
    >>> Deobfuscator().deobfuscate(Path('here')).to(Path('there')).using({})
    """
    _what: Union[Path, None] = None  # file to read the obfuscated data from
    _to: Union[Path, None] = None  # file to write the result to

    def __init__(self, *args, **kwargs):
        pass

    def deobfuscate(self, what: Path) -> 'Deobfuscator':
        """Remembers the source file. Returns self to continue the chain."""
        self._what = what
        return self

    def to(self, to: Path) -> 'Deobfuscator':
        """Remembers the target file. Returns self to continue the chain."""
        self._to = to
        return self

    def using(self, dictionary: dict) -> None:
        """
        Finishes the chain: de-obfuscates the source into the target.
        :param dictionary: alias -> real value mapping
        :raises AssertionError: if the source or the target was not set
        """
        # raised explicitly: an "assert" statement is removed under
        # "python -O" and the chain would go on with None paths
        if self._what is None or self._to is None:
            raise AssertionError('Invalid calls chain')
        self._make_it(self._what, self._to, dictionary)

    @staticmethod
    def _deobfuscate_str(item: str, dictionary: dict) -> str:
        """
        Replaces every occurrence of each alias inside the string with
        the string form of its real value.
        """
        for k, v in dictionary.items():
            if not k:
                # replacing an empty string would insert the value between
                # all the characters of the item
                continue
            # todo think another more efficient way
            item = item.replace(k, str(v))
        return item

    @staticmethod
    def _deobfuscate_finding(finding: 'Json', dictionary: dict) -> None:
        """
        Deobfuscates one json item in place. Values that are not in the
        dictionary are kept as is (a warning is logged).
        :param finding: item to patch
        :param dictionary: alias -> real value mapping
        :return:
        """
        gen = iter_values(finding)
        try:
            alias = next(gen)
            while True:
                if alias not in dictionary:
                    _LOG.warning(f'{alias} will not be replaced because '
                                 f'there is not corresponding value in '
                                 f'the dictionary')
                    alias = gen.send(alias)
                    continue
                real = dictionary[alias]
                _LOG.debug(f'{alias} will be '
                           f'replaced with "{str(real)[:3]}***"')
                alias = gen.send(real)
        except StopIteration:
            # the generator is exhausted: every value has been visited
            pass

    def _deobfuscate_maybe_json(self, item: str, dictionary: dict) -> str:
        """
        Deobfuscates a piece of text. If the text is a JSON object or
        array, it is patched value by value and re-serialized compactly.
        Any other text is patched by substring replacement.
        :param item: text to patch
        :param dictionary: alias -> real value mapping
        :return: patched text
        """
        try:
            data = json.loads(item)
        except json.JSONDecodeError:
            return self._deobfuscate_str(item, dictionary)
        if isinstance(item, str) and not isinstance(data, (dict, list)):
            # plain text that merely parses as a JSON scalar ("1e5", "007",
            # "true"): re-serializing would reformat it and in-place
            # patching cannot change a scalar, so handle it as a string
            return self._deobfuscate_str(item, dictionary)
        self._deobfuscate_finding(data, dictionary)
        return json.dumps(data, separators=(',', ':'))

    def _deobfuscate_line(self, ln: List[str], dictionary: dict) -> list:
        """
        Deobfuscates every item of the list in place.
        Returns the same object as received
        """
        for i, item in enumerate(ln):
            ln[i] = self._deobfuscate_maybe_json(item, dictionary)
        return ln

    @abstractmethod
    def _make_it(self, what: Path, to: Path, dictionary: dict):
        """
        Should create a deobfuscated file
        :param what: obfuscated source file
        :param to: file to create
        :param dictionary: alias -> real value mapping
        :return:
        """
class JsonDeobfuscator(Deobfuscator):
    """De-obfuscates JSON and gzipped JSON files."""

    def _make_it(self, what: Path, to: Path, dictionary: dict):
        """
        Loads the source, patches its values and dumps the result.
        :param what: obfuscated JSON file
        :param to: file to create
        :param dictionary: alias -> real value mapping
        """
        data = BaseHandler.load_json(what)
        if data is None:
            # load_json returns None when the file could not be read or
            # parsed. Dumping it would write "null" to the target, which
            # is the source file itself when the dump is patched in place
            _LOG.warning(f'Nothing to de-obfuscate in {what}. Skipping')
            return
        self._deobfuscate_finding(data, dictionary)
        BaseHandler.dump_json(to, data)
class XlsxDeobfuscator(Deobfuscator):
    """De-obfuscates the text cells of every worksheet of an xlsx file."""

    def _deobfuscate_worksheet(self, wsh: 'Worksheet', dictionary: dict):
        """
        Patches, in place, every cell of the worksheet that holds a
        string. Cells with no value and with non-string values are left
        as is.
        """
        for cells in wsh.rows:
            for cell in cells:
                text = cell.value
                if isinstance(text, str):
                    cell.value = self._deobfuscate_maybe_json(text,
                                                              dictionary)

    def _make_it(self, what: Path, to: Path, dictionary: dict):
        """
        Loads the workbook, patches all its worksheets and saves it.
        :param what: obfuscated xlsx file
        :param to: file to create
        :param dictionary: alias -> real value mapping
        """
        workbook = openpyxl.load_workbook(what)
        for sheet in workbook:
            _LOG.debug(f'Deobfuscating {sheet.title} worksheet')
            self._deobfuscate_worksheet(sheet, dictionary)
        workbook.save(to)
class CsvDeobfuscator(Deobfuscator):
    """De-obfuscates every field of a csv file."""

    def _make_it(self, what: Path, to: Path, dictionary: dict):
        """
        Reads the source completely, patches all its rows and only then
        writes the result.
        :param what: obfuscated csv file
        :param to: file to create
        :param dictionary: alias -> real value mapping
        """
        # The source must be fully read and closed before the target is
        # opened: when the dump is patched in place both paths point to
        # the same file and opening it for writing truncates it
        with open(what, 'r', newline='') as source:
            reader = csv.reader(source)
            dialect = reader.dialect
            rows = [self._deobfuscate_line(row, dictionary) for row in reader]
        with open(to, 'w', newline='') as target:
            csv.writer(target, dialect=dialect).writerows(rows)
class EmlDeobfuscator(Deobfuscator):
    """De-obfuscates the textual parts of an email message file."""

    def _deobfuscate_text_part(self, part: email.message.EmailMessage,
                               dictionary: dict):
        """
        Replaces the content of the part with its de-obfuscated version,
        passing the previously read content type, transfer encoding,
        disposition, file name, content id and charset back to the part.
        """
        original_charset = part.get_content_charset()
        patched = self._deobfuscate_maybe_json(part.get_content(), dictionary)
        # all the properties are read before the content is replaced
        properties = dict(
            maintype=part.get_content_maintype(),
            subtype=part.get_content_subtype(),
            cte=part.get('Content-Transfer-Encoding'),
            disposition=part.get_content_disposition(),
            filename=part.get_filename(),
            cid=part.get('Content-ID'),
        )
        # NOTE(review): the text is encoded with the default codec of
        # str.encode() while the original charset label is set back
        # below - confirm this is fine for non-UTF-8 messages
        part.set_content(patched.encode(), **properties)
        part.set_charset(original_charset)

    def _make_it(self, what: Path, to: Path, dictionary: dict):
        """
        Parses the message, patches its text/plain, text/html, text/csv
        and application/json parts and writes the message back as bytes.
        :param what: obfuscated message file
        :param to: file to create
        :param dictionary: alias -> real value mapping
        """
        with open(what, 'rb') as source:
            parsed = email.message_from_binary_file(
                source, policy=email.policy.default
            )
        message = cast(email.message.EmailMessage, parsed)

        patchable = ('text/plain', 'text/html', 'text/csv', 'application/json')
        for part in message.walk():
            # multipart/* are just containers
            is_container = part.get_content_maintype() == 'multipart'
            if not is_container and part.get_content_type() in patchable:
                self._deobfuscate_text_part(part, dictionary)

        with open(to, 'wb') as target:
            target.write(message.as_bytes())
class DeobfuscatorFactory:
    """Chooses a de-obfuscator by the extension of the file."""

    def __init__(self, path: Path):
        self._path = path

    @classmethod
    def path(cls, path: Path):
        """Alternative constructor: DeobfuscatorFactory.path(p).build()"""
        return cls(path)

    def _detect_kind(self) -> Optional[str]:
        """
        Returns 'xlsx', 'csv', 'json' or 'eml' depending on the trailing
        extension(s) of the file name, None if the type is not supported.
        Only the end of the name is checked, so names with extra dots
        ("report.v1.json") are recognized as well.
        """
        suffix = self._path.suffix
        if suffix == '.xlsx':
            return 'xlsx'
        if suffix == '.csv':
            return 'csv'
        if suffix == '.json' or self._path.suffixes[-2:] == ['.json', '.gz']:
            return 'json'
        if suffix in ('.eml', '.emltpl'):
            return 'eml'
        return None

    def build(self, *args, **kwargs) -> 'Union[Deobfuscator, None]':
        """
        Creates a de-obfuscator suitable for the file.
        :param args: passed to the de-obfuscator constructor
        :param kwargs: passed to the de-obfuscator constructor
        :return: de-obfuscator or None if the file type is not supported
        """
        kind = self._detect_kind()
        if kind == 'xlsx':
            # openpyxl is optional, it is imported only when it is needed
            import_openpyxl()
            return XlsxDeobfuscator(*args, **kwargs)
        if kind == 'csv':
            return CsvDeobfuscator(*args, **kwargs)
        if kind == 'json':
            return JsonDeobfuscator(*args, **kwargs)
        if kind == 'eml':
            return EmlDeobfuscator(*args, **kwargs)
        return None
class DeObfuscateDump(BaseHandler):
    """
    Handler of the "deobfuscate" action: restores the real values in the
    files of an obfuscated dump.
    """

    def __call__(self, dump_directory: Path, to: Optional[Path],
                 dictionary: List[Path]):
        """
        De-obfuscates the dump.
        :param dump_directory: obfuscated dump: a folder or a single file
        :param to: where to put the result. If not given, the user is
            asked to confirm overriding of the dump
        :param dictionary: JSON files with alias -> real value pairs.
            They are merged; a file that cannot be loaded is reported
            and skipped
        """
        if not to:
            confirmed = query_yes_no('Parameter --to was not provided. '
                                     'De-obfuscated files will override the dump')
            if not confirmed:
                _LOG.error('Aborting')
                sys.exit(1)
            to = dump_directory

        if dump_directory.is_dir() and to.is_file():
            _LOG.error('If --dump-directory is a directory --to must '
                       'be a directory as well')
            sys.exit(1)
        if dump_directory.is_file():
            # make sure the target exists before the files are processed
            to.parent.mkdir(parents=True, exist_ok=True)
            to.touch()

        _LOG.info('Loading dictionary')
        aliases = {}
        for source in dictionary:
            try:
                with open(source, 'r') as fp:
                    aliases.update(json.load(fp))
            except Exception as e:
                _LOG.error(f'Could not load {source}: {e}')

        for full, relative in self.yield_files(dump_directory):
            _LOG.info(f'De-obfuscating {full}')
            target = to if to.is_file() else to / relative
            target.parent.mkdir(parents=True, exist_ok=True)
            deobfuscator = DeobfuscatorFactory(relative).build()
            if deobfuscator:
                deobfuscator.deobfuscate(full).to(target).using(aliases)
            else:
                _LOG.warning(f'Not supported file type: {relative}')
        _LOG.info('Done!')
def main():
    """
    CLI entry point: parses the command line and calls the handler of
    the chosen action with the rest of the parsed arguments as keywords.
    """
    namespace = build_parser().parse_args()
    handlers = {'obfuscate': ObfuscateDump(), 'deobfuscate': DeObfuscateDump()}
    # vars() exposes the namespace's own attribute dict, so popping
    # 'action' removes it from the arguments passed to the handler
    params = vars(namespace)
    handler = handlers[params.pop('action')]
    handler(**params)


if __name__ == '__main__':
    main()