workflows/pipe-common/pipeline/log/logger.py (291 lines of code) (raw):
# Copyright 2017-2019 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import traceback
from abc import ABCMeta, abstractmethod
import datetime
from pipeline.api import LogEntry, TaskStatus, PipelineAPI
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
STATUS_COLORS = {TaskStatus.RUNNING: '',
TaskStatus.SUCCESS: OKGREEN,
TaskStatus.FAILURE: FAIL}
@staticmethod
def colored(message, status):
return bcolors.STATUS_COLORS[status] + message + bcolors.ENDC
class Logger:
@staticmethod
def info(message,
task_name=None,
run_id=None,
api_url=None,
log_dir=None,
omit_console=False):
Logger.log_task_event(task_name,
message,
status=TaskStatus.RUNNING,
run_id=run_id,
api_url=api_url,
log_dir=log_dir,
omit_console=omit_console)
@staticmethod
def warn(message,
task_name=None,
run_id=None,
api_url=None,
log_dir=None,
omit_console=False):
Logger.log_task_event(task_name,
bcolors.WARNING + message + bcolors.ENDC,
status=TaskStatus.RUNNING,
run_id=run_id,
api_url=api_url,
log_dir=log_dir,
omit_console=omit_console)
@staticmethod
def success(message,
task_name=None,
run_id=None,
api_url=None,
log_dir=None,
omit_console=False):
Logger.log_task_event(task_name,
message,
status=TaskStatus.SUCCESS,
run_id=run_id,
api_url=api_url,
log_dir=log_dir,
omit_console=omit_console)
@staticmethod
def fail(message,
task_name=None,
run_id=None,
api_url=None,
log_dir=None,
omit_console=False):
Logger.log_task_event(task_name,
message,
status=TaskStatus.FAILURE,
run_id=run_id,
api_url=api_url,
log_dir=log_dir,
omit_console=omit_console)
@staticmethod
def log_task_event(task_name, message, status=TaskStatus.RUNNING, run_id=None, instance=None, api_url=None, log_dir=None, omit_console=False):
_run_id = run_id
_instance = instance
_api_url = api_url
_log_dir = log_dir
_task_name = task_name
_pipeline_name = os.environ.get('PIPELINE_NAME')
if not _pipeline_name:
_pipeline_name = 'Pipeline-output'
if not _task_name:
_task_name = _pipeline_name
if not _run_id:
_run_id = os.environ.get('RUN_ID')
if not _instance:
pipeline_name = _pipeline_name
_instance = "{}-{}".format(pipeline_name, _run_id)
if not _api_url:
_api_url = os.environ.get('API')
if not _log_dir:
_log_dir = os.environ.get('LOG_DIR')
log_entry = LogEntry(_run_id,
status,
bcolors.colored(message, status),
_task_name,
instance)
pipe_api = PipelineAPI(_api_url, _log_dir)
pipe_api.log_event(log_entry, omit_console=omit_console)
class CloudPipelineLogger:
__metaclass__ = ABCMeta
@abstractmethod
def info(self, message, task=None, trace=False):
pass
@abstractmethod
def debug(self, message, task=None, trace=False):
pass
@abstractmethod
def warning(self, message, task=None, trace=False):
pass
@abstractmethod
def success(self, message, task=None, trace=False):
pass
@abstractmethod
def error(self, message, task=None, trace=False):
pass
class PrintLogger(CloudPipelineLogger):
_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
_DATE_WITH_MILLISECONDS = '%s.%03d'
def __init__(self, inner=None):
self._inner = inner
def info(self, message, task=None, trace=False):
self._log(message=message, level='INFO', trace=trace)
if self._inner:
self._inner.info(message, task=task, trace=trace)
def debug(self, message, task=None, trace=False):
self._log(message=message, level='DEBUG', trace=trace)
if self._inner:
self._inner.debug(message, task=task, trace=trace)
def warning(self, message, task=None, trace=False):
self._log(message=message, level='WARNING', trace=trace)
if self._inner:
self._inner.warning(message, task=task, trace=trace)
def success(self, message, task=None, trace=False):
self._log(message=message, level='INFO', trace=trace)
if self._inner:
self._inner.success(message, task=task, trace=trace)
def error(self, message, task=None, trace=False):
self._log(message=message, level='ERROR', trace=trace)
if self._inner:
self._inner.error(message, task=task, trace=trace)
def _log(self, message, level, trace):
now_utc = datetime.datetime.utcnow()
formatted_dt = self._DATE_WITH_MILLISECONDS % (now_utc.strftime(self._DATE_FORMAT), now_utc.microsecond / 1000)
if trace:
stacktrace = traceback.format_exc()
message += ' ' + stacktrace
print(formatted_dt + ' [' + level + '] ' + message)
class LocalLogger(CloudPipelineLogger):
def __init__(self, logger=None, inner=None):
self._logger = logger or logging
self._inner = inner
def info(self, message, task=None, trace=False):
self._logger.info(message, exc_info=trace)
if self._inner:
self._inner.info(message, task=task, trace=trace)
def debug(self, message, task=None, trace=False):
self._logger.debug(message, exc_info=trace)
if self._inner:
self._inner.debug(message, task=task, trace=trace)
def warning(self, message, task=None, trace=False):
self._logger.warning(message, exc_info=trace)
if self._inner:
self._inner.warning(message, task=task, trace=trace)
def success(self, message, task=None, trace=False):
self._logger.info(message, exc_info=trace)
if self._inner:
self._inner.success(message, task=task, trace=trace)
def error(self, message, task=None, trace=False):
self._logger.error(message, exc_info=trace)
if self._inner:
self._inner.error(message, task=task, trace=trace)
class ExplicitLogger(CloudPipelineLogger):
def __init__(self, level, inner):
self._inner = inner
self._level = level.strip().lower()
def info(self, message, task=None, trace=False):
self._log(message=message, task=task, trace=trace)
def debug(self, message, task=None, trace=False):
self._log(message=message, task=task, trace=trace)
def warning(self, message, task=None, trace=False):
self._log(message=message, task=task, trace=trace)
def success(self, message, task=None, trace=False):
self._log(message=message, task=task, trace=trace)
def error(self, message, task=None, trace=False):
self._log(message=message, task=task, trace=trace)
def _log(self, message, task, trace):
getattr(self._inner, self._level)(message, task=task, trace=trace)
class LevelLogger(CloudPipelineLogger):
_levels = ['ERROR', 'WARNING', 'INFO', 'DEBUG']
_error_idx = _levels.index('ERROR')
_warn_idx = _levels.index('WARNING')
_info_idx = _levels.index('INFO')
_debug_idx = _levels.index('DEBUG')
def __init__(self, level, inner):
self._level_idx = self._levels.index(level)
self._inner = inner
def info(self, message, task=None, trace=False):
if self._applies(self._info_idx):
self._inner.info(message, task=task, trace=trace)
def debug(self, message, task=None, trace=False):
if self._applies(self._debug_idx):
self._inner.debug(message, task=task, trace=trace)
def warning(self, message, task=None, trace=False):
if self._applies(self._warn_idx):
self._inner.warning(message, task=task, trace=trace)
def success(self, message, task=None, trace=False):
if self._applies(self._info_idx):
self._inner.success(message, task=task, trace=trace)
def error(self, message, task=None, trace=False):
if self._applies(self._error_idx):
self._inner.error(message, task=task, trace=trace)
def _applies(self, level_idx):
return self._level_idx >= level_idx
class TaskLogger(CloudPipelineLogger):
def __init__(self, task, inner):
self._task = task
self._inner = inner
def info(self, message, task=None, trace=False):
self._inner.info(message, task=task or self._task, trace=trace)
def debug(self, message, task=None, trace=False):
self._inner.debug(message, task=task or self._task, trace=trace)
def warning(self, message, task=None, trace=False):
self._inner.warning(message, task=task or self._task, trace=trace)
def success(self, message, task=None, trace=False):
self._inner.success(message, task=task or self._task, trace=trace)
def error(self, message, task=None, trace=False):
self._inner.error(message, task=task or self._task, trace=trace)
class RunLogger(CloudPipelineLogger):
_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
_DATE_WITH_MILLISECONDS = '%s.%03d'
def __init__(self, api, run_id, inner=None):
self._api = api
self._run_id = run_id
self._inner = inner
def info(self, message, task=None, trace=False):
self._log(message=message, task=task, status='RUNNING', trace=trace)
if self._inner:
self._inner.info(message, task=task, trace=trace)
def debug(self, message, task=None, trace=False):
self._log(message=message, task=task, status='RUNNING', trace=trace)
if self._inner:
self._inner.debug(message, task=task, trace=trace)
def warning(self, message, task=None, trace=False):
self._log(message=message, task=task, status='RUNNING', trace=trace)
if self._inner:
self._inner.warning(message, task=task, trace=trace)
def success(self, message, task=None, trace=False):
self._log(message=message, task=task, status='SUCCESS', trace=trace)
if self._inner:
self._inner.success(message, task=task, trace=trace)
def error(self, message, task=None, trace=False):
self._log(message=message, task=task, status='FAILURE', trace=trace)
if self._inner:
self._inner.error(message, task=task, trace=trace)
def _log(self, message, task, status, trace):
now_utc = datetime.datetime.utcnow()
formatted_dt = self._DATE_WITH_MILLISECONDS % (now_utc.strftime(self._DATE_FORMAT), now_utc.microsecond / 1000)
if trace:
stacktrace = traceback.format_exc()
message += ' ' + stacktrace
self._api.log_efficiently(run_id=self._run_id, message=message, task=task, status=status, date=formatted_dt)
class ResilientLogger(CloudPipelineLogger):
def __init__(self, inner, fallback):
self._inner = inner
self._fallback = fallback
def info(self, message, task=None, trace=False):
self._log(self._inner.info, self._fallback.error, message, task=task, trace=trace)
def debug(self, message, task=None, trace=False):
self._log(self._inner.debug, self._fallback.error, message, task=task, trace=trace)
def warning(self, message, task=None, trace=False):
self._log(self._inner.warning, self._fallback.error, message, task=task, trace=trace)
def success(self, message, task=None, trace=False):
self._log(self._inner.success, self._fallback.error, message, task=task, trace=trace)
def error(self, message, task=None, trace=False):
self._log(self._inner.error, self._fallback.error, message, task=task, trace=trace)
def _log(self, inner_method, fallback_method, message, task=None, trace=False):
try:
inner_method(message, task=task, trace=trace)
except Exception:
try:
fallback_method('Logging error', task=task, trace=True)
except Exception:
pass