src/validators/swagger_request_models.py (1,293 lines of code) (raw):
# classes for swagger models are not instantiated directly in code.
# PreparedEvent models are used instead.
from base64 import standard_b64decode
from datetime import date, datetime, timedelta, timezone
from typing import Literal, Generator
from typing_extensions import Annotated, Self
from modular_sdk.commons.constants import Cloud as ModularCloud
from pydantic import (
AmqpDsn,
AnyUrl,
BaseModel as PydanticBaseModel,
ConfigDict,
Field,
HttpUrl,
StringConstraints,
field_validator,
model_validator,
)
from pydantic.json_schema import SkipJsonSchema, WithJsonSchema
from helpers.constants import (
HealthCheckStatus,
JobState,
JobType,
Permission,
PlatformType,
PolicyErrorType,
ReportFormat,
RuleDomain,
RuleSourceType,
GITHUB_API_URL_DEFAULT,
GITLAB_API_URL_DEFAULT,
PolicyEffect,
ReportType
)
from helpers import Version
from helpers.regions import AllRegions, AllRegionsWithGlobal
from helpers.time_helper import utc_datetime
from services import SERVICE_PROVIDER
from services.chronicle_service import ChronicleConverterType
from services.ruleset_service import RulesetName
from models.rule import RuleIndex
class BaseModel(PydanticBaseModel):
model_config = ConfigDict(
coerce_numbers_to_str=True,
populate_by_name=True,
)
customer_id: SkipJsonSchema[str] = Field(
None,
alias='customer',
description='Special parameter. Allows to perform actions on behalf '
'on the specified customer. This can be allowed only '
'for system users. Parameter will be ignored for '
'standard users',
)
@property
def customer(self) -> str | None:
"""
Just backward compatibility
:return:
"""
return self.customer_id
class BasePaginationModel(BaseModel):
limit: int = Field(
50,
ge=1,
le=50,
description='Max number of items to return'
)
next_token: str = Field(
None,
description='Provide next_token received from the previous request'
)
class TimeRangedMixin:
"""
Base model which provides time-range constraint
"""
# start_iso: datetime | date = Field(None, alias='from')
# end_iso: datetime | date = Field(None, alias='to')
@classmethod
def skip_validation_if_no_input(cls) -> bool:
return False
@classmethod
def max_range(cls) -> timedelta:
return timedelta(days=7)
@classmethod
def to_datetime(cls, d: datetime | date | None) -> datetime | None:
if not d:
return
if isinstance(d, datetime):
return d
return datetime.combine(d, datetime.min.time())
@model_validator(mode='after')
def validate_dates(self) -> Self:
"""
What it does:
- converts start_iso and end_iso to utc tz-aware datetime object
- validates that start_iso < end_iso, end_iso <= now
- sets default values in case they are not provided
:param values:
:return:
"""
now = utc_datetime()
max_range = self.max_range()
start = self.to_datetime(self.start_iso)
end = self.to_datetime(self.end_iso)
if start:
start = start.astimezone(timezone.utc)
if start > now:
raise ValueError('value of \'from\' must be less '
'than current date')
if end:
end = end.astimezone(timezone.utc)
if end > now:
raise ValueError('value of \'to\' must be less '
'than current date')
if start and end:
pass
elif start:
end = min(start + max_range, now)
elif end:
start = end - max_range
else: # both not provided
if self.skip_validation_if_no_input:
self.start_iso = None
self.end_iso = None
return self
end = now
start = end - max_range
if start >= end:
raise ValueError('value of \'to\' must '
'be bigger than \'from\' date')
if (end - start) > max_range:
raise ValueError(
f'Time range between \'from\' and \'to\' must '
f'not overflow {max_range}')
self.start_iso = start
self.end_iso = end
return self
class CustomerGetModel(BaseModel):
"""
GET
"""
name: str = Field(None)
@model_validator(mode='after')
def _(self) -> Self:
if name := self.customer: # not system
self.name = name
return self
class MultipleTenantsGetModel(BasePaginationModel):
"""
GET
"""
# cloud_identifier: Optional[str] # TODO API separate endpoint for this
active: bool = Field(None)
cloud: ModularCloud = Field(None)
class TenantPostModel(BaseModel):
name: str
account_id: str
cloud: Literal['AWS', 'AZURE', 'GOOGLE']
display_name: str = Field(None)
primary_contacts: list[str] = Field(default_factory=list)
secondary_contacts: list[str] = Field(default_factory=list)
tenant_manager_contacts: list[str] = Field(default_factory=list)
default_owner: str = Field(None)
@model_validator(mode='after')
def set_display_name(self) -> Self:
if not self.display_name:
self.display_name = self.name
return self
class TenantGetActiveLicensesModel(BaseModel):
limit: int = 1
class TenantRegionPostModel(BaseModel):
region: str # means native region name
@field_validator('region')
@classmethod
def validate_region(cls, value: str) -> str:
"""
Of course, we can use typing "region: AllRegions", but the output is
huge is validation fails
"""
if not AllRegions.has(value):
raise ValueError(f'Not known region: {value}')
return value
class RulesetPostModel(BaseModel):
name: str
cloud: Literal['AWS', 'AZURE', 'GOOGLE', 'GCP', 'KUBERNETES']
version: str = Field(
None,
description='Ruleset version. If not specified, '
'will be generated automatically based on github '
'release of rules or based on the previous ruleset version'
)
rule_source_id: str = Field(
None,
description='Id of rule source object to get rules from. '
'If the type of that source is GITHUB_RELEASE, '
'the version from release tag will be used'
)
git_project_id: str = Field(None)
git_ref: str = Field(None)
rules: set = Field(default_factory=set)
excluded_rules: set = Field(default_factory=set)
platforms: set[str] = Field(
default_factory=set,
description='Platform for k8s rules to filter based on',
)
categories: set[str] = Field(
default_factory=set,
description='Rules category to use'
)
service_sections: set[str] = Field(
default_factory=set,
description='Service section to use'
)
sources: set[str] = Field(
default_factory=set,
description='Sources to use'
)
@field_validator('platforms', mode='after')
@classmethod
def validate_platforms(cls, platforms: set[str]) -> set[str]:
if not platforms: return platforms
all_platforms = {i.lower() for i in RuleIndex.platform_map.values() if i}
platforms = {p.strip().lower() for p in platforms}
not_existing = platforms - all_platforms
if not_existing:
raise ValueError(f'not available platforms: {", ".join(not_existing)}. Choose from: {", ".join(all_platforms)}')
return platforms
@field_validator('categories', mode='after')
@classmethod
def validate_categories(cls, categories: set[str]) -> set[str]:
if not categories: return categories
all_categories = {i.lower() for i in RuleIndex.category_map.values() if i}
categories = {c.strip().lower() for c in categories}
not_existing = categories - all_categories
if not_existing:
raise ValueError(f'not available categories: {", ".join(not_existing)}. Choose from: {", ".join(all_categories)}')
return categories
@field_validator('service_sections', mode='after')
@classmethod
def validate_service_sections(cls, service_sections: set[str]) -> set[str]:
if not service_sections: return service_sections
all_sections = {i.lower() for i in RuleIndex.service_section_map.values() if i}
service_sections = {ss.strip().lower() for ss in service_sections}
not_existing = service_sections - all_sections
if not_existing:
raise ValueError(f'not available service sections: {", ".join(not_existing)}. Choose from: {", ".join(all_sections)}')
return service_sections
@field_validator('sources', mode='after')
@classmethod
def validate_sources(cls, sources: set[str]) -> set[str]:
if not sources: return sources
all_sources = {i.lower() for i in RuleIndex.source_map.values() if i}
sources = {s.strip().lower() for s in sources}
not_existing = sources - all_sources
if not_existing:
raise ValueError(f'not available sources: {", ".join(not_existing)}. Choose from: {", ".join(all_sources)}')
return sources
@field_validator('name', mode='after')
@classmethod
def validate_name(cls, name: str) -> str:
if ':' in name:
raise ValueError('colon in not allowed in ruleset name')
return name
@field_validator('cloud', mode='after')
@classmethod
def validate_cloud(cls, cloud: str) -> str:
if cloud == 'GOOGLE':
cloud = 'GCP'
return cloud
@field_validator('version', mode='after')
@classmethod
def validate_version(cls, version: str | None) -> str | None:
if not version:
return version
_ = Version(version) # raise ValueError
return version
@model_validator(mode='after')
def validate_model(self) -> Self:
if self.git_ref and not self.git_project_id:
raise ValueError('git_project_id must be specified with git_ref')
if self.rule_source_id and (self.git_ref or self.git_project_id):
raise ValueError('Do not specify git_ref or git_project_id '
'if rule_source_id is specified')
return self
class RulesetPatchModel(BaseModel):
name: str
version: str = Field(
None,
description='A version of the ruleset you want to update. '
'If not specified, the latest previous ruleset will '
'be used as base to update'
)
rules_to_attach: set = Field(default_factory=set)
rules_to_detach: set = Field(default_factory=set)
force: bool = False
@field_validator('name', mode='after')
@classmethod
def validate_name(cls, name: str) -> str:
if ':' in name:
raise ValueError('colon in not allowed in ruleset name')
return name
@field_validator('version', mode='after')
@classmethod
def validate_version(cls, version: str | None) -> str | None:
if not version:
return version
_ = Version(version) # raise ValueError
return version
@model_validator(mode='after')
def at_least_one_given(self) -> Self:
if self.force:
return self
if not self.rules_to_attach and not self.rules_to_detach:
raise ValueError(
'At least one attribute to update must be provided'
)
return self
class RulesetDeleteModel(BaseModel):
name: str
version: str = Field(
description='Specific version to remove. * can be specified to '
'remove all the versions of a specific ruleset'
)
@field_validator('name', mode='after')
@classmethod
def validate_name(cls, name: str) -> str:
if ':' in name:
raise ValueError('colon in not allowed in ruleset name')
return name
@field_validator('version', mode='after')
@classmethod
def validate_version(cls, version: str) -> str:
version = version.strip()
if version != '*':
_ = Version(version)
return version
@property
def is_all_versions(self) -> bool:
return self.version == '*'
class RulesetGetModel(BaseModel):
"""
GET
"""
name: str = Field(None)
version: str = Field(None)
cloud: RuleDomain = Field(None)
get_rules: bool = False
licensed: bool = Field(None)
@field_validator('version', mode='after')
@classmethod
def validate_version(cls, version: str | None) -> str | None:
if not version:
return version
version = version.strip()
_ = Version(version)
return version
@model_validator(mode='after')
def validate_codependent_params(self) -> Self:
if self.version and not self.name:
raise ValueError('\'name\' is required if \'version\' is given')
if self.name and self.version and self.cloud:
raise ValueError(
'you don\'t have to specify \'cloud\' or \'active\' '
'if \'name\' and \'version\' are given')
return self
class RulesetReleasePostModel(BaseModel):
name: str
version: str = Field(
None,
description='Specific version to release to LM. * can be specified to '
'release all the versions of a specific ruleset. '
'If not specified, the latest version will be released'
)
description: str
display_name: str
@field_validator('version', mode='after')
@classmethod
def validate_version(cls, version: str | None) -> str | None:
if not version:
return version
version = version.strip()
if version != '*':
_ = Version(version)
return version
@property
def is_all_versions(self) -> bool:
return self.version == '*'
# other params
class RuleDeleteModel(BaseModel):
rule: str = Field(None)
cloud: RuleDomain = Field(None)
git_project_id: str = Field(None)
git_ref: str = Field(None)
@model_validator(mode='after')
def validate_root(self) -> Self:
if self.git_ref and not self.git_project_id:
raise ValueError('git_project_id must be specified with git_ref')
return self
class RuleGetModel(BasePaginationModel):
"""
GET
"""
rule: str = Field(None)
cloud: RuleDomain = Field(None)
git_project_id: str = Field(None)
git_ref: str = Field(None)
rule_source_id: str = Field(None)
@model_validator(mode='after')
def validate_root(self) -> Self:
if self.rule_source_id and (self.git_project_id or self.git_ref):
raise ValueError(
'Do not specify git_project_id or git_ref if rule_source_id '
'is given'
)
if self.git_ref and not self.git_project_id:
raise ValueError('git_project_id must be specified with git_ref')
return self
class RuleUpdateMetaPostModel(BaseModel):
rule_source_id: str = Field(None)
class RuleSourcePostModel(BaseModel):
git_project_id: str # "141234124" or "epam/ecc"
description: str
type: RuleSourceType = Field(
None,
description='If not specified will be inferred from git_project_id.'
)
git_url: HttpUrl = Field(
None,
description=f'If not specified will be inferred from git_project_id. '
f'"{GITHUB_API_URL_DEFAULT}" will be used for GitHub, '
f'"{GITLAB_API_URL_DEFAULT}" will be used for GitLab'
) # can be inferred
git_ref: str = Field(
'main',
description='Git branch to pull rules from. Not used for '
'GITHUB_RELEASE'
)
git_rules_prefix: str = '/'
git_access_secret: str = Field(None)
@property
def baseurl(self) -> str:
return self.git_url.scheme + '://' + self.git_url.host
@model_validator(mode='after')
def root(self) -> Self:
self.git_project_id = self.git_project_id.strip().strip('/')
is_github = self.git_project_id.count('/') == 1
is_gitlab = self.git_project_id.isdigit()
if not is_github and not is_gitlab:
raise ValueError(
'unknown git_project_id. '
'Specify Gitlab project id or Github owner/repo'
)
if not self.git_url:
if is_github:
self.git_url = HttpUrl(GITHUB_API_URL_DEFAULT)
elif is_gitlab:
self.git_url = HttpUrl(GITLAB_API_URL_DEFAULT)
if not self.type:
if is_github:
self.type = RuleSourceType.GITHUB
elif is_gitlab:
self.type = RuleSourceType.GITLAB
if self.type is RuleSourceType.GITHUB_RELEASE and not is_github:
raise ValueError(
'GITHUB_RELEASES is only available for GitHub projects'
)
return self
class RuleSourcePatchModel(BaseModel):
git_access_secret: str = Field(None)
description: str = Field(None)
@model_validator(mode='after')
def validate_any_to_update(self) -> Self:
if not self.git_access_secret and not self.description:
raise ValueError('Provide data to update')
return self
class RuleSourceDeleteModel(BaseModel):
delete_rules: bool = False
class RuleSourcesListModel(BasePaginationModel):
type: RuleSourceType = Field(None)
project_id: str = Field(
None,
description='Gitlab project id (12345) or Github project id (epam/ecc)'
)
has_secret: bool = Field(None)
class RolePostModel(BaseModel):
name: str
policies: set[str]
expiration: datetime = Field(None)
description: str
@field_validator('expiration')
@classmethod
def _(cls, expiration: datetime | None) -> datetime | None:
if not expiration:
return expiration
expiration.astimezone(timezone.utc)
if expiration < datetime.now(tz=timezone.utc):
raise ValueError('Expiration date has already passed')
return expiration
class RolePatchModel(BaseModel):
policies_to_attach: set[str] = Field(default_factory=set)
policies_to_detach: set[str] = Field(default_factory=set)
expiration: datetime = Field(None)
description: str = Field(None)
@model_validator(mode='after')
def to_attach_or_to_detach(self) -> Self:
if not self.policies_to_detach and not self.policies_to_attach and not self.expiration:
raise ValueError('Provide some parameter to update')
return self
class PolicyPostModel(BaseModel):
name: str
permissions: set[Permission] = Field(default_factory=set)
permissions_admin: bool = False
effect: PolicyEffect
tenants: set[str] = Field(default_factory=set)
description: str
# todo add effect and tenants
@field_validator('permissions', mode='after')
@classmethod
def validate_hidden(cls, permission: set[Permission]) -> set[Permission]:
if not_allowed := permission & set(Permission.iter_disabled()):
raise ValueError(f'Permissions: {", ".join(not_allowed)} are '
f'currently not allowed')
return permission
@model_validator(mode='after')
def _(self) -> Self:
if not self.permissions_admin and not self.permissions:
raise ValueError('Provide either permissions or permissions_admin')
if self.permissions_admin:
self.permissions = set(Permission.iter_enabled())
return self
class PolicyPatchModel(BaseModel):
permissions_to_attach: set[Permission] = Field(default_factory=set)
permissions_to_detach: set[Permission] = Field(default_factory=set)
effect: PolicyEffect = Field(None)
tenants_to_add: set[str] = Field(default_factory=set)
tenants_to_remove: set[str] = Field(default_factory=set)
description: str = Field(None)
@field_validator('permissions_to_attach', mode='after')
@classmethod
def validate_hidden(cls, permission: set[Permission]) -> set[Permission]:
if not_allowed := permission & set(Permission.iter_disabled()):
raise ValueError(f'Permissions: {", ".join(not_allowed)} are '
f'currently not allowed')
return permission
@model_validator(mode='after')
def _(self) -> Self:
if not any((self.permissions_to_attach, self.permissions_to_detach,
self.effect, self.tenants_to_add, self.tenants_to_remove,
self.description)):
raise ValueError('Provide some attribute to update')
return self
class JobGetModel(TimeRangedMixin, BasePaginationModel):
start_iso: datetime | date = Field(None, alias='from')
end_iso: datetime | date = Field(None, alias='to')
tenant_name: str = Field(None)
status: JobState = Field(None)
@classmethod
def max_range(cls) -> timedelta:
return timedelta(days=365)
class AWSCredentials(PydanticBaseModel):
AWS_ACCESS_KEY_ID: str
AWS_SECRET_ACCESS_KEY: str
AWS_SESSION_TOKEN: str = Field(None)
AWS_DEFAULT_REGION: str = 'us-east-1'
# TODO, add certificates & username-password creds
# https://learn.microsoft.com/en-us/dotnet/api/azure.identity.environmentcredential?view=azure-dotnet
class AZURECredentials(PydanticBaseModel):
AZURE_TENANT_ID: str
AZURE_CLIENT_ID: str
AZURE_CLIENT_SECRET: str
AZURE_SUBSCRIPTION_ID: str = Field(None)
class GOOGLECredentials1(PydanticBaseModel):
model_config = ConfigDict(extra='allow')
type: str
project_id: str
private_key_id: str
private_key: str
client_email: str
client_id: str
auth_uri: str
token_uri: str
auth_provider_x509_cert_url: str
client_x509_cert_url: str
class GOOGLECredentials2(PydanticBaseModel):
model_config = ConfigDict(extra='allow')
type: str
access_token: str
refresh_token: str
client_id: str
client_secret: str
project_id: str
class GOOGLECredentials3(PydanticBaseModel):
model_config = ConfigDict(extra='allow')
access_token: str
project_id: str
class JobPostModel(BaseModel):
credentials: AWSCredentials | AZURECredentials | GOOGLECredentials1 | GOOGLECredentials2 | GOOGLECredentials3 = Field(
None)
tenant_name: str
target_rulesets: set[str] = Field(
default_factory=set,
alias='rulesets',
)
target_regions: set[str] = Field(default_factory=set, alias='regions')
rules_to_scan: set[str] = Field(default_factory=set, alias='rules')
timeout_minutes: float = Field(
None,
description='Job timeout in minutes. This timeout is soft '
'meaning that when the desired number of minutes have '
'passed job termination will be triggered'
)
license_key: str = Field(
None,
description='License to exhaust for this job. Will be resolved '
'automatically unless an ambiguous occurs'
)
@field_validator('target_rulesets', mode='after')
@classmethod
def validate_rulesets(cls, value: set[str]) -> set[str]:
"""
Removes license keys and validates
:param value:
:return:
"""
name_to_items = {}
rulesets = set()
for item in value:
i = RulesetName(item) # raises ValueError
name_to_items.setdefault(i.name, []).append(i)
rulesets.add(RulesetName(i.name, i.version, None).to_str())
if any(len(items) > 1 for items in name_to_items.values()):
raise ValueError('Only one version of specific ruleset can be '
'used')
return rulesets
def iter_rulesets(self) -> Generator[RulesetName, None, None]:
yield from map(RulesetName, self.target_rulesets)
def sanitize_schedule(schedule: str) -> str:
"""
May raise ValueError
:param schedule:
:return:
"""
_rate_error_message = (
'Invalid rate expression. Use `rate(value, unit)` where '
'value is a positive number, '
'unit is one of: minute, minutes, hour, hours, day, days. '
'Valid examples are: rate(1 hour), rate(2 hours). '
'If the value is equal to 1, then the unit must be singular.'
)
if 'rate' in schedule:
# consider the value to be rate expression only if explicitly
# specified "rate"
try:
value, unit = schedule.replace('rate', '').strip(' ()').split()
value = int(value)
if unit not in ('minute', 'minutes', 'hour', 'hours', 'day',
'days', 'week', 'weeks', 'second', 'seconds',):
raise ValueError
if value < 1:
raise ValueError
if value == 1 and unit.endswith('s') or value > 1 and not unit.endswith('s'):
raise ValueError
except ValueError:
raise ValueError(_rate_error_message)
return schedule
# considering it to be a cron expression.
# Currently, on-prem and saas cron expressions differ. On-prem only
# accepts standard crontab that contains five fields without year
# (https://en.wikipedia.org/wiki/Cron),
# whereas saas accepts expressions that are valid for EventBridge
# rule (https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-cron-expressions.html).
# The validator below does not 100% ensure that the expression if
# valid, but it does some things to make the difference less visible
raw = schedule.replace('cron', '').strip(' ()').split()
if len(raw) not in (5, 6):
raise ValueError('Invalid cron expression. '
'Must contain 5 or 6 fields: '
'(minute hour day-of-month month day-of-week [year])')
if SERVICE_PROVIDER.environment_service.is_docker():
# on-prem supports only 5 fields and does not support "?"
raw = ['*' if i == '?' else i for i in raw]
if len(raw) == 6:
raw.pop()
else:
# saas supports only 6 fields
if len(raw) == 5:
raw.append('*')
return f'cron({" ".join(raw)})'
class ScheduledJobPostModel(BaseModel):
schedule: str
tenant_name: str = Field(None)
name: str = Field(None)
target_rulesets: set[str] = Field(default_factory=set, alias='rulesets')
target_regions: set[str] = Field(default_factory=set, alias='regions')
license_key: str = Field(
None,
description='License to exhaust for this job. Will be resolved '
'automatically unless an ambiguous occurs'
)
@field_validator('schedule')
@classmethod
def _(cls, schedule: str) -> str:
return sanitize_schedule(schedule)
@field_validator('target_rulesets', mode='after')
@classmethod
def validate_rulesets(cls, value: set[str]) -> set[str]:
"""
Removes license keys and validates
:param value:
:return:
"""
rulesets = set()
for item in value:
i = RulesetName(item) # raises ValueError
rulesets.add(RulesetName(i.name, i.version, None).to_str())
return rulesets
def iter_rulesets(self) -> Generator[RulesetName, None, None]:
yield from map(RulesetName, self.target_rulesets)
class ScheduledJobGetModel(BaseModel):
"""
GET
"""
tenant_name: str = Field(None)
class ScheduledJobPatchModel(BaseModel):
schedule: str = Field(None)
enabled: bool = Field(None)
@model_validator(mode='after')
def at_least_one_given(self) -> Self:
if not self.schedule and self.enabled is None:
raise ValueError('Provide attributes to update')
return self
@field_validator('schedule')
@classmethod
def _(cls, schedule: str) -> str:
return sanitize_schedule(schedule)
class EventPostModel(BaseModel):
version: Annotated[
Literal['1.0.0'],
WithJsonSchema({'type': 'string', 'title': 'Event type version',
'default': '1.0.0'})
] = '1.0.0'
vendor: Literal['AWS', 'MAESTRO']
events: list[dict]
def validate_password(password: str) -> list[str]:
errors = []
upper = any(char.isupper() for char in password)
lower = any(char.islower() for char in password)
numeric = any(char.isdigit() for char in password)
symbol = any(not char.isalnum() for char in password)
if not upper:
errors.append('must have uppercase characters')
if not numeric:
errors.append('must have numeric characters')
if not lower:
errors.append('must have lowercase characters')
if not symbol:
errors.append('must have at least one symbol')
if len(password) < 8:
errors.append('valid min length for password: 8')
return errors
class UserPasswordResetPostModel(BaseModel):
password: str
username: str = Field(None)
# validators
@field_validator('password')
@classmethod
def _(cls, password: str) -> str:
if errors := validate_password(password):
raise ValueError(', '.join(errors))
return password
class SignInPostModel(PydanticBaseModel):
username: str
password: str
class RefreshPostModel(PydanticBaseModel):
refresh_token: str
class MailSettingGetModel(BaseModel):
"""
GET
"""
disclose: bool = False
class MailSettingPostModel(BaseModel):
username: str
password: str
password_alias: str
port: int
host: str
max_emails: int = 1
default_sender: str
use_tls: bool = False
class ReportsSendingSettingPostModel(BaseModel):
enable: bool = True
class LicenseManagerConfigSettingPostModel(BaseModel):
host: str
port: int = Field(None)
protocol: Annotated[
Literal['HTTP', 'HTTPS', 'http', 'https'],
StringConstraints(to_upper=True)
] = Field(None)
stage: str = Field(None)
class LicenseManagerClientSettingPostModel(BaseModel):
key_id: str
algorithm: Annotated[
Literal['ECC:p521_DSS_SHA:256'],
WithJsonSchema({'type': 'string', 'title': 'LM algorithm',
'default': 'ECC:p521_DSS_SHA:256'})
] = 'ECC:p521_DSS_SHA:256'
private_key: str
b64_encoded: bool
@model_validator(mode='after')
def check_properly_encoded_key(self) -> Self:
if not self.b64_encoded:
return self
try:
self.private_key = standard_b64decode(self.private_key).decode()
except (TypeError, BaseException):
raise ValueError(
'\'private_key\' must be a safe to decode'
' base64-string.'
)
return self
class LicenseManagerClientSettingDeleteModel(BaseModel):
key_id: str
class BatchResultsQueryModel(BasePaginationModel):
tenant_name: str = Field(None)
start: datetime = Field(None)
end: datetime = Field(None)
# reports
class JobFindingsReportGetModel(BaseModel):
job_type: JobType = JobType.MANUAL
href: bool = False
obfuscated: bool = False
class TenantJobsFindingsReportGetModel(TimeRangedMixin, BaseModel):
start_iso: datetime | date = Field(None, alias='from')
end_iso: datetime | date = Field(None, alias='to')
job_type: JobType = Field(None)
href: bool = False
obfuscated: bool = False
class JobDetailsReportGetModel(BaseModel):
job_type: JobType = JobType.MANUAL
href: bool = False
obfuscated: bool = False
class TenantJobsDetailsReportGetModel(TimeRangedMixin, BaseModel):
start_iso: datetime | date = Field(None, alias='from')
end_iso: datetime | date = Field(None, alias='to')
job_type: JobType = Field(None)
href: bool = False
obfuscated: bool = False
class JobDigestReportGetModel(BaseModel):
job_type: JobType = JobType.MANUAL
class TenantJobsDigestsReportGetModel(TimeRangedMixin, BaseModel):
start_iso: datetime | date = Field(None, alias='from')
end_iso: datetime | date = Field(None, alias='to')
job_type: JobType = Field(None)
class JobComplianceReportGetModel(BaseModel):
job_type: JobType = JobType.MANUAL
format: ReportFormat = ReportFormat.JSON
href: bool = False
class TenantComplianceReportGetModel(BaseModel):
format: ReportFormat = ReportFormat.JSON
href: bool = False
class JobErrorReportGetModel(BaseModel):
job_type: JobType = JobType.MANUAL
href: bool = False
format: ReportFormat = ReportFormat.JSON
error_type: PolicyErrorType = Field(None)
class JobRuleReportGetModel(BaseModel):
job_type: JobType = JobType.MANUAL
href: bool = False
format: ReportFormat = ReportFormat.JSON
class TenantRuleReportGetModel(TimeRangedMixin, BaseModel):
start_iso: datetime | date = Field(None, alias='from')
end_iso: datetime | date = Field(None, alias='to')
job_type: JobType = Field(None)
class ReportPushByJobIdModel(BaseModel):
"""
/reports/push/dojo/{job_id}/
/reports/push/security-hub/{job_id}/
/reports/push/chronicle/{job_id}/
"""
type: JobType = JobType.MANUAL
class ReportPushMultipleModel(TimeRangedMixin, BaseModel):
"""
/reports/push/dojo
/reports/push/security-hub
"""
tenant_name: str
start_iso: datetime | date = Field(None, alias='from')
end_iso: datetime | date = Field(None, alias='to')
type: JobType = Field(None)
class EventDrivenRulesetGetModel(BaseModel):
cloud: RuleDomain = Field(None)
get_rules: bool = False
class EventDrivenRulesetPostModel(BaseModel):
# name: str
cloud: Literal['AWS', 'AZURE', 'GOOGLE', 'GCP', 'KUBERNETES']
version: str = Field(
None,
description='Ruleset version. If not specified, '
'will be generated automatically based on github '
'release of rules or based on the previous ruleset version'
)
rule_source_id: str = Field(
None,
description='Id of rule source object to get rules from. '
'If the type of that source is GITHUB_RELEASE, '
'the version from release tag will be used'
)
@field_validator('cloud', mode='after')
@classmethod
def validate_cloud(cls, cloud: str) -> str:
if cloud == 'GOOGLE':
cloud = 'GCP'
return cloud
@field_validator('version', mode='after')
@classmethod
def validate_version(cls, version: str | None) -> str | None:
if not version:
return version
_ = Version(version) # raise ValueError
return version
class EventDrivenRulesetDeleteModel(BaseModel):
# name: str
cloud: RuleDomain
version: str = Field(
description='Specific version to remove. * can be specified to '
'remove all the versions of a specific ruleset'
)
@field_validator('version', mode='after')
@classmethod
def validate_version(cls, version: str) -> str:
version = version.strip()
if version != '*':
_ = Version(version)
return version
@property
def is_all_versions(self) -> bool:
return self.version == '*'
class ProjectGetReportModel(BaseModel):
tenant_display_names: set[str]
types: set[Literal['OVERVIEW', 'RESOURCES', 'COMPLIANCE', 'ATTACK_VECTOR', 'FINOPS']] = Field(default_factory=set)
receivers: set[str] = Field(default_factory=set)
attempt: SkipJsonSchema[int] = 0
execution_job_id: SkipJsonSchema[str] = Field(None)
class OperationalGetReportModel(BaseModel):
tenant_names: set[str]
types: set[Literal['OVERVIEW', 'RESOURCES', 'COMPLIANCE', 'RULE', 'ATTACK_VECTOR', 'FINOPS', 'KUBERNETES']] = Field(default_factory=set)
receivers: set[str] = Field(default_factory=set)
attempt: SkipJsonSchema[int] = 0
execution_job_id: SkipJsonSchema[str] = Field(None)
@property
def new_types(self) -> tuple[ReportType, ...]:
"""
Converts to new types
"""
old_new = {
'OVERVIEW': ReportType.OPERATIONAL_OVERVIEW,
'RESOURCES': ReportType.OPERATIONAL_RESOURCES,
# 'COMPLIANCE': ReportType.OPERATIONAL_COMPLIANCE,
'RULE': ReportType.OPERATIONAL_RULES,
'FINOPS': ReportType.OPERATIONAL_FINOPS
}
if not self.types:
return tuple(old_new.values())
res = []
for t in self.types:
if t in old_new:
res.append(old_new[t])
return tuple(res)
class DepartmentGetReportModel(BaseModel):
types: set[Literal['TOP_RESOURCES_BY_CLOUD', 'TOP_TENANTS_RESOURCES', 'TOP_TENANTS_COMPLIANCE', 'TOP_COMPLIANCE_BY_CLOUD', 'TOP_TENANTS_ATTACKS', 'TOP_ATTACK_BY_CLOUD']] = Field(default_factory=set)
attempt: SkipJsonSchema[int] = 0
execution_job_id: SkipJsonSchema[str] = Field(None)
class CLevelGetReportModel(BaseModel):
receivers: set[str] = Field(default_factory=set)
types: set[Literal['OVERVIEW', 'COMPLIANCE', 'ATTACK_VECTOR']] = Field(default_factory=set)
attempt: SkipJsonSchema[int] = 0
execution_job_id: SkipJsonSchema[str] = Field(None)
@property
def new_types(self) -> tuple[ReportType, ...]:
"""
Converts to new types
"""
old_new = {
'OVERVIEW': ReportType.C_LEVEL_OVERVIEW,
# 'COMPLIANCE': ReportType.C_LEVEL_COMPLIANCE,
}
if not self.types:
return tuple(old_new.values())
res = []
for t in self.types:
if t in old_new:
res.append(old_new[t])
return tuple(res)
class HealthCheckQueryModel(BaseModel):
status: HealthCheckStatus = Field(None)
class RabbitMQPostModel(BaseModel):
maestro_user: str
rabbit_exchange: str = Field(None)
request_queue: str
response_queue: str
sdk_access_key: str
connection_url: AmqpDsn
sdk_secret_key: str
class RabbitMQGetModel(BaseModel):
pass
class RabbitMQDeleteModel(BaseModel):
pass
class RawReportGetModel(BaseModel):
obfuscated: bool = False
meta: bool = False
class ResourcesReportGetModel(BaseModel):
model_config = ConfigDict(extra='allow')
resource_type: Annotated[
str, StringConstraints(to_lower=True, strip_whitespace=True)] = Field(
None)
region: AllRegionsWithGlobal = Field(None)
full: bool = False
obfuscated: bool = False
exact_match: bool = True
search_by_all: bool = False
format: ReportFormat = ReportFormat.JSON
href: bool = False
@field_validator('region', mode='before')
def _(cls, v: str) -> str:
if v and isinstance(v, str):
v = v.lower()
return v
@property
def extras(self) -> dict:
"""
These attributes will be used to look for resources
"""
return self.__pydantic_extra__
@model_validator(mode='after')
def root(self) -> Self:
if self.search_by_all and not self.__pydantic_extra__:
raise ValueError('If search_by_all, an least one query to search '
'by must be provided')
if self.obfuscated and self.format == ReportFormat.JSON and not self.href:
raise ValueError('Obfuscation is currently not supported for '
'raw json report')
return self
# all the other attributes to search by can be provided as well.
# They are not declared
class PlatformK8sResourcesReportGetModel(BaseModel):
model_config = ConfigDict(extra='allow')
resource_type: Annotated[
str, StringConstraints(to_lower=True, strip_whitespace=True)] = Field(
None)
full: bool = False
obfuscated: bool = False
exact_match: bool = True
search_by_all: bool = False
format: ReportFormat = ReportFormat.JSON
href: bool = False
@property
def extras(self) -> dict:
"""
These attributes will be used to look for resources
"""
return self.__pydantic_extra__
@model_validator(mode='after')
def root(self) -> Self:
if self.search_by_all and not self.__pydantic_extra__:
raise ValueError('If search_by_all, an least one query to search '
'by must be provided')
if self.obfuscated and self.format == ReportFormat.JSON and not self.href:
raise ValueError('Obfuscation is currently not supported for '
'raw json report')
return self
# all the other attributes to search by can be provided as well.
# They are not declared
class ResourceReportJobsGetModel(TimeRangedMixin, BaseModel):
model_config = ConfigDict(extra='allow')
start_iso: datetime | date = Field(None, alias='from')
end_iso: datetime | date = Field(None, alias='to')
job_type: JobType = JobType.MANUAL
resource_type: Annotated[
str, StringConstraints(to_lower=True, strip_whitespace=True)] = None
region: AllRegionsWithGlobal = Field(None)
full: bool = False
exact_match: bool = True
search_by_all: bool = False
@field_validator('region', mode='before')
def _(cls, v: str) -> str:
if v and isinstance(v, str):
v = v.lower()
return v
@property
def extras(self) -> dict:
"""
These attributes will be used to look for resources
"""
return self.__pydantic_extra__
@model_validator(mode='after')
def root(self) -> Self:
if self.search_by_all and not self.__pydantic_extra__:
raise ValueError('If search_by_all, an least one query to search '
'by must be provided')
return self
class ResourceReportJobGetModel(BaseModel):
model_config = ConfigDict(extra='allow')
job_type: JobType = JobType.MANUAL
resource_type: Annotated[
str, StringConstraints(to_lower=True, strip_whitespace=True)] = Field(
None)
region: AllRegionsWithGlobal = Field(None)
full: bool = False
obfuscated: bool = False
exact_match: bool = True
search_by_all: bool = False
href: bool = False
@field_validator('region', mode='before')
def _(cls, v: str) -> str:
if v and isinstance(v, str):
v = v.lower()
return v
@property
def extras(self) -> dict:
"""
These attributes will be used to look for resources
"""
return self.__pydantic_extra__
@model_validator(mode='after')
def root(self) -> Self:
if self.search_by_all and not self.__pydantic_extra__:
raise ValueError('If search_by_all, an least one query to search '
'by must be provided')
if self.obfuscated and not self.href:
raise ValueError('Currently obfuscation is supported only if href '
'is true')
return self
class PlatformK8SPostModel(BaseModel):
tenant_name: Annotated[
str, StringConstraints(to_upper=True, strip_whitespace=True)]
name: str
region: AllRegions = Field(None)
type: PlatformType
description: str
endpoint: HttpUrl = Field(None)
certificate_authority: str = Field(None) # base64 encoded
token: str = Field(None)
@model_validator(mode='after')
def root(self) -> Self:
if (self.type != PlatformType.SELF_MANAGED
and not self.region):
raise ValueError('region is required if platform is cloud managed')
if self.type != PlatformType.EKS and not self.endpoint:
raise ValueError('endpoint must be '
'specified if type is not EKS')
return self
class PlatformK8sQueryModel(BaseModel):
tenant_name: str = Field(None)
class K8sJobPostModel(BaseModel):
"""
K8s platform job
"""
platform_id: str
target_rulesets: set[str] = Field(default_factory=set, alias='rulesets')
token: str = Field(None) # temp jwt token
timeout_minutes: float = Field(
None,
description='Job timeout in minutes. This timeout is soft '
'meaning that when the desired number of minutes have '
'passed job termination will be triggered'
)
license_key: str = Field(
None,
description='License to exhaust for this job. Will be resolved '
'automatically unless an ambiguous occurs'
)
@field_validator('target_rulesets', mode='after')
@classmethod
def validate_rulesets(cls, value: set[str]) -> set[str]:
"""
Removes license keys and validates
:param value:
:return:
"""
rulesets = set()
for item in value:
i = RulesetName(item) # raises ValueError
rulesets.add(RulesetName(i.name, i.version, None).to_str())
return rulesets
def iter_rulesets(self) -> Generator[RulesetName, None, None]:
yield from map(RulesetName, self.target_rulesets)
class ReportStatusGetModel(BaseModel):
"""
GET
"""
job_id: str
complete: bool = False
class MetricsStatusGetModel(TimeRangedMixin, BaseModel):
start_iso: datetime | date = Field(None, alias='from')
end_iso: datetime | date = Field(None, alias='to')
@classmethod
def max_range(cls) -> timedelta:
return timedelta(days=365)
@classmethod
def skip_validation_if_no_input(cls) -> bool:
return True
class LicensePostModel(BaseModel):
tenant_license_key: str = Field(alias='license_key')
class LicenseActivationPutModel(BaseModel):
tenant_names: set[str] = Field(default_factory=set)
all_tenants: bool = False
clouds: set[Literal['AWS', 'AZURE', 'GOOGLE', 'KUBERNETES']] = Field(default_factory=set)
exclude_tenants: set[str] = Field(default_factory=set)
@model_validator(mode='after')
def _(self) -> Self:
if self.tenant_names and any((self.all_tenants, self.clouds,
self.exclude_tenants)):
raise ValueError('do not provide all_tenants, clouds or '
'exclude_tenants if specific '
'tenant names are provided')
if not self.all_tenants and not self.tenant_names:
raise ValueError('either all_tenants or specific tenant names '
'must be given')
if (self.clouds or self.exclude_tenants) and not self.all_tenants:
raise ValueError('set all tenants to true if you provide clouds '
'or excluded')
return self
class LicenseActivationPatchModel(BaseModel):
add_tenants: set[str] = Field(default_factory=set)
remove_tenants: set[str] = Field(default_factory=set)
@model_validator(mode='after')
def _(self) -> Self:
if not self.add_tenants and not self.remove_tenants:
raise ValueError('provide either add_tenants or remove_tenants')
return self
class DefectDojoPostModel(BaseModel):
url: AnyUrl # = Field(examples=['http://127.0.0.1:8080/api/v2']) # api gw models does not support examples
api_key: str
description: str
class ChroniclePostModel(BaseModel):
endpoint: HttpUrl # https://malachiteingestion-pa.googleapis.com/v2
description: str
credentials_application_id: str # application with google creds
instance_customer_id: str
@property
def baseurl(self) -> str:
return self.endpoint.scheme + '://' + self.endpoint.host
class ChronicleActivationPutModel(BaseModel):
tenant_names: set[str] = Field(default_factory=set)
all_tenants: bool = False
clouds: set[Literal['AWS', 'AZURE', 'GOOGLE']] = Field(default_factory=set)
exclude_tenants: set[str] = Field(default_factory=set)
send_after_job: bool = Field(
False,
description='Whether to send the results to dojo after each scan'
)
convert_to: ChronicleConverterType = Field(
ChronicleConverterType.EVENTS,
description='How to convert Rule Engine data '
'before sending to Chronicle'
)
@model_validator(mode='after')
def _(self) -> Self:
if self.tenant_names and any((self.all_tenants, self.clouds,
self.exclude_tenants)):
raise ValueError('do not provide all_tenants, clouds or '
'exclude_tenants if specific '
'tenant names are provided')
if not self.all_tenants and not self.tenant_names:
raise ValueError('either all_tenants or specific tenant names '
'must be given')
if (self.clouds or self.exclude_tenants) and not self.all_tenants:
raise ValueError('set all tenants to true if you provide clouds '
'or excluded')
return self
class DefectDojoQueryModel(BaseModel):
pass
class DefectDojoActivationPutModel(BaseModel):
tenant_names: set[str] = Field(default_factory=set)
all_tenants: bool = False
clouds: set[Literal['AWS', 'AZURE', 'GOOGLE']] = Field(default_factory=set)
exclude_tenants: set[str] = Field(default_factory=set)
scan_type: Literal['Generic Findings Import', 'Cloud Custodian Scan'] = Field('Generic Findings Import', description='Defect dojo scan type')
product_type: str = Field('Rule Engine',
description='Defect dojo product type name')
product: str = Field('{tenant_name}',
description='Defect dojo product name')
engagement: str = Field('Rule-Engine Main',
description='Defect dojo engagement name')
test: str = Field('{job_id}', description='Defect dojo test')
send_after_job: bool = Field(
False,
description='Whether to send the results to dojo after each scan'
)
attachment: Literal['json', 'xlsx', 'csv'] = Field(None)
@model_validator(mode='after')
def _(self) -> Self:
if self.tenant_names and any((self.all_tenants, self.clouds,
self.exclude_tenants)):
raise ValueError('do not provide all_tenants, clouds or '
'exclude_tenants if specific '
'tenant names are provided')
if not self.all_tenants and not self.tenant_names:
raise ValueError('either all_tenants or specific tenant names '
'must be given')
if (self.clouds or self.exclude_tenants) and not self.all_tenants:
raise ValueError('set all tenants to true if you provide clouds '
'or excluded')
return self
class SelfIntegrationPutModel(BaseModel):
description: str = 'Custodian access application'
username: str
password: str
auto_resolve_access: bool = False
url: AnyUrl = Field(None) # full link: https://host.com:port/hello
results_storage: str = Field(None)
tenant_names: set[str] = Field(default_factory=set)
all_tenants: bool = False
clouds: set[Literal['AWS', 'AZURE', 'GOOGLE']] = Field(default_factory=set)
exclude_tenants: set[str] = Field(default_factory=set)
@model_validator(mode='after')
def _(self) -> Self:
if self.tenant_names and any((self.all_tenants, self.clouds,
self.exclude_tenants)):
raise ValueError('do not provide all_tenants, clouds or '
'exclude_tenants if specific '
'tenant names are provided')
if not self.all_tenants and not self.tenant_names:
raise ValueError('either all_tenants or specific tenant names '
'must be given')
if (self.clouds or self.exclude_tenants) and not self.all_tenants:
raise ValueError('set all tenants to true if you provide clouds '
'or excluded')
if not self.auto_resolve_access and not self.url:
raise ValueError('url must be given in case '
'auto_resolve_access is not True')
return self
class SelfIntegrationPatchModel(BaseModel):
add_tenants: set[str] = Field(default_factory=set)
remove_tenants: set[str] = Field(default_factory=set)
@model_validator(mode='after')
def _(self) -> Self:
if not self.add_tenants and not self.remove_tenants:
raise ValueError('provide either add_tenants or remove_tenants')
return self
class TenantExcludedRulesPutModel(BaseModel):
rules: set[str]
class CustomerExcludedRulesPutModel(BaseModel):
rules: set[str]
class CredentialsQueryModel(BasePaginationModel):
cloud: Literal['AWS', 'AZURE', 'GOOGLE']
class CredentialsBindModel(BaseModel):
tenant_names: set[str] = Field(default_factory=set)
all_tenants: bool = False
exclude_tenants: set[str] = Field(default_factory=set)
@model_validator(mode='after')
def _(self) -> Self:
if self.tenant_names and any((self.all_tenants, self.exclude_tenants)):
raise ValueError('do not provide all_tenants or '
'exclude_tenants if specific '
'tenant names are provided')
if not self.all_tenants and not self.tenant_names:
raise ValueError('either all_tenants or specific tenant names '
'must be given')
if self.exclude_tenants and not self.all_tenants:
raise ValueError('set all tenants to true if you provide '
'excluded')
return self
class UserPatchModel(BaseModel):
"""
System admin endpoint
"""
role_name: str = Field(None)
password: str = Field(None)
@model_validator(mode='after')
def at_least_one(self) -> Self:
if not any((self.role_name, self.password)):
raise ValueError('provide at least one attribute to update')
return self
@field_validator('password')
@classmethod
def _(cls, password: str) -> str:
if errors := validate_password(password):
raise ValueError(', '.join(errors))
return password
class UserPostModel(BaseModel):
username: str
role_name: str
password: str
@field_validator('username', mode='after')
@classmethod
def check_reserved(cls, username: str) -> str:
if username in ('whoami', 'reset-password'):
raise ValueError('Such username cannot be used.')
return username
@field_validator('password')
@classmethod
def _(cls, password: str) -> str:
if errors := validate_password(password):
raise ValueError(', '.join(errors))
return password
class UserResetPasswordModel(BaseModel):
new_password: str
@field_validator('new_password')
@classmethod
def _(cls, password: str) -> str:
if errors := validate_password(password):
raise ValueError(', '.join(errors))
return password
class SignUpModel(PydanticBaseModel):
username: str
password: str
customer_name: str
customer_display_name: str
customer_admins: set[str] = Field(default_factory=set)
@field_validator('password')
@classmethod
def _(cls, password: str) -> str:
if errors := validate_password(password):
raise ValueError(', '.join(errors))
return password