in src/validators/swagger_request_models.py [0:0]
def sanitize_schedule(schedule: str) -> str:
"""
May raise ValueError
:param schedule:
:return:
"""
_rate_error_message = (
'Invalid rate expression. Use `rate(value, unit)` where '
'value is a positive number, '
'unit is one of: minute, minutes, hour, hours, day, days. '
'Valid examples are: rate(1 hour), rate(2 hours). '
'If the value is equal to 1, then the unit must be singular.'
)
if 'rate' in schedule:
# consider the value to be rate expression only if explicitly
# specified "rate"
try:
value, unit = schedule.replace('rate', '').strip(' ()').split()
value = int(value)
if unit not in ('minute', 'minutes', 'hour', 'hours', 'day',
'days', 'week', 'weeks', 'second', 'seconds',):
raise ValueError
if value < 1:
raise ValueError
if value == 1 and unit.endswith('s') or value > 1 and not unit.endswith('s'):
raise ValueError
except ValueError:
raise ValueError(_rate_error_message)
return schedule
# considering it to be a cron expression.
# Currently, on-prem and saas cron expressions differ. On-prem only
# accepts standard crontab that contains five fields without year
# (https://en.wikipedia.org/wiki/Cron),
# whereas saas accepts expressions that are valid for EventBridge
# rule (https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-cron-expressions.html).
# The validator below does not 100% ensure that the expression if
# valid, but it does some things to make the difference less visible
raw = schedule.replace('cron', '').strip(' ()').split()
if len(raw) not in (5, 6):
raise ValueError('Invalid cron expression. '
'Must contain 5 or 6 fields: '
'(minute hour day-of-month month day-of-week [year])')
if SERVICE_PROVIDER.environment_service.is_docker():
# on-prem supports only 5 fields and does not support "?"
raw = ['*' if i == '?' else i for i in raw]
if len(raw) == 6:
raw.pop()
else:
# saas supports only 6 fields
if len(raw) == 5:
raw.append('*')
return f'cron({" ".join(raw)})'