src/validators/swagger_response_models.py (473 lines of code) (raw):
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
from typing_extensions import NotRequired, TypedDict
from helpers.constants import (
HealthCheckStatus,
JobState,
JobType,
PlatformType,
PolicyErrorType,
ReportFormat,
RuleSourceType,
RuleDomain,
RuleSourceSyncingStatus,
PolicyEffect
)
class BaseActivation(TypedDict):
activated_for_all: bool
within_clouds: NotRequired[list[str]]
excluding: list[str]
activated_for: NotRequired[list[str]]
class Customer(TypedDict):
name: str
display_name: str
admins: list[str]
class Tenant(TypedDict):
account_id: str
activation_date: datetime
customer_name: str
is_active: bool
name: str
regions: list[str]
class Rule(TypedDict):
description: str
name: str
cloud: RuleDomain
branch: str
customer: str
project: str
resource: str
rule_source_id: str
class Job(TypedDict):
created_at: NotRequired[datetime]
customer_name: str
id: str
regions: list[str]
rulesets: list[str]
started_at: NotRequired[datetime]
status: JobState
stopped_at: NotRequired[datetime]
submitted_at: str
tenant_name: str
scheduled_rule_name: NotRequired[str]
class Policy(TypedDict):
customer: str
name: str
permissions: list[str]
effect: PolicyEffect
description: str | None
tenants: list[str]
class Role(TypedDict):
customer: str
name: str
expiration: datetime
policies: list[str]
description: str | None
class Ruleset(TypedDict):
active: bool
cloud: RuleDomain
customer: str
last_update_time: datetime
license_keys: list[str]
license_manager_id: NotRequired[str]
licensed: bool
name: str
rules_number: int
rules: NotRequired[list[str]]
version: str
class RuleSourceLatestSync(TypedDict):
current_status: RuleSourceSyncingStatus
sync_date: datetime
release_tag: NotRequired[str]
class RuleSource(TypedDict):
id: str
customer: str
git_project_id: str
git_url: str
git_ref: str
git_rules_prefix: str
description: str
has_secret: bool
latest_sync: NotRequired[RuleSourceLatestSync]
type: RuleSourceType
class RuleMetaUpdate(TypedDict):
customer: str
git_project_id: str
rule_source_id: str
status: str
class ScheduledJob(TypedDict):
name: str
customer_name: str
tenant_name: str
creation_date: datetime
enabled: bool
last_execution_time: NotRequired[datetime]
schedule: str
scan_regions: list[str]
scan_rulesets: list[str]
class HealthCheck(TypedDict):
id: str
status: HealthCheckStatus
details: dict
remediation: NotRequired[str]
impact: NotRequired[str]
class LicenseManagerConfig(TypedDict):
host: str
port: int
protocol: Literal['HTTP', 'HTTPS']
stage: str
class LicenseManagerClient(TypedDict):
algorithm: str
b64_encoded: bool
format: str
key_id: str
public_key: str
class RabbitMQ(TypedDict):
maestro_user: str
rabbit_exchange: NotRequired[str]
request_queue: str
response_queue: str
sdk_access_key: str
customer: str
class ReportStatus(TypedDict):
id: str
triggered_at: str
attempt: int
customer_name: str
level: str
status: str
reason: NotRequired[str]
tenant: NotRequired[str]
type: str
user: NotRequired[str]
class K8sPlatform(TypedDict):
customer: str
description: str
id: str
name: str
region: NotRequired[str]
tenant_name: str
type: PlatformType
class BatchResult(TypedDict):
cloud_identifier: str
customer_name: str
id: str
status: JobState
stopped_at: str
submitted_at: str
tenant_name: str
class Event(TypedDict):
"""
202 POST /event
"""
received: int
saved: int
class MetricsStatus(TypedDict):
started_at: datetime
state: str
class LicenseAllowance(TypedDict):
balance_exhaustion_model: Literal['collective', 'independent']
job_balance: int
time_range: Literal['DAY', 'WEEK', 'MONTH']
class LicenseEventDriven(TypedDict):
active: bool
class License(TypedDict):
allowance: LicenseAllowance
event_driven: LicenseEventDriven
expiration: datetime
latest_sync: datetime
license_key: str
ruleset_ids: list[str]
class MailSetting(TypedDict):
username: str
password: str
default_sender: str
host: str
port: str
max_emails: int
use_tls: bool
class Credentials(TypedDict):
id: str
type: Literal[
'AWS_CREDENTIALS',
'AWS_ROLE',
'AZURE_CREDENTIALS',
'AZURE_CERTIFICATE',
'GCP_SERVICE_ACCOUNT',
'GCP_COMPUTE_ACCOUNT'
]
description: str
has_secret: bool
credentials: dict
# reports
class BaseReportJob(TypedDict):
format: ReportFormat
url: NotRequired[str]
dictionary_url: NotRequired[str]
obfuscated: bool
content: NotRequired[dict]
job_id: NotRequired[str]
job_type: NotRequired[JobType]
tenant_name: str
customer_name: str
class BaseReportEntity(TypedDict):
format: ReportFormat
url: NotRequired[str]
dictionary_url: NotRequired[str]
obfuscated: bool
content: NotRequired[dict]
platform_id: NotRequired[str]
tenant_name: NotRequired[str]
customer_name: str
class ErrorReportItem(TypedDict):
error_type: PolicyErrorType
policy: str
reason: str
region: str
class RulesReportItem(TypedDict):
api_calls: dict
execution_time: float
failed_resources: int
policy: str
region: str
scanned_resources: int
succeeded: bool
class AverageRulesReportItem(TypedDict):
average_exec: float
average_resources_failed: int
average_resources_scanned: int
failed_invocations: int
invocations: int
max_exec: float
min_exec: float
policy: str
region: str
resources_failed: int
resources_scanned: int
succeeded_invocations: int
total_api_calls: dict
total_exec: float
class ViolatedRule(TypedDict):
name: str
description: str
severity: str
remediation: NotRequired[str]
article: NotRequired[str]
impact: NotRequired[str]
class ResourcesReportItem(TypedDict):
account_id: NotRequired[str]
platform_id: NotRequired[str]
job_id: NotRequired[str]
type: NotRequired[JobType]
data: dict
last_found: float
matched_by: dict
region: str
resource_type: str
violated_rules: ViolatedRule
class DefectDojo(TypedDict):
id: str
description: str
host: str
port: int
stage: str
protocol: Literal['HTTP', 'HTTPS']
class Chronicle(TypedDict):
id: str
description: str
endpoint: str
credentials_application_id: str
instance_customer_id: str
customer: str
class DefectDojoActivation(BaseActivation):
scan_type: Literal['Generic Findings Import', 'Cloud Custodian Scan']
product_type: str
product: str
engagement: str
test: str
send_after_job: bool
attachment: Literal['json', 'xlsx', 'csv'] | None
class ChronicleActivation(BaseActivation):
send_after_job: bool
class DojoPushResult(TypedDict):
job_id: str
scan_type: Literal['Generic Findings Import', 'Cloud Custodian Scan']
product_type_name: str
product_name: str
engagement_name: str
test_title: str
tenant_name: str
dojo_integration_id: str
success: bool
attachment: Literal['json', 'xlsx', 'csv'] | None
platform_id: NotRequired[str]
error: NotRequired[str]
class ChroniclePushResult(TypedDict):
job_id: str
tenant_name: str
chronicle_integration_id: str
success: bool
platform_id: NotRequired[str]
error: NotRequired[str]
class SelfIntegration(BaseActivation):
customer_name: str
description: str
username: str
host: str
stage: str
port: int
protocol: Literal['HTTP', 'HTTPS']
results_storage: NotRequired[str]
class TenantExcludedRules(TypedDict):
tenant_name: str
rules: list[str]
class CustomerExcludedRules(TypedDict):
customer_name: str
rules: list[str]
class User(TypedDict):
username: str
customer: str | None
role: str | None
latest_login: datetime | None
created_at: datetime | None
class RawReportItem(TypedDict):
customer_name: str
tenant_name: str
obfuscated: bool
url: str
dictionary_url: NotRequired[str]
meta_url: NotRequired[str]
# Here real response models
class RawReportModel(BaseModel):
data: RawReportItem
class MessageModel(BaseModel):
message: str
class ErrorData(TypedDict):
location: list[str]
message: str
class ErrorsModel(BaseModel):
"""
400 Validation error
"""
errors: list[ErrorData]
class MultipleJobsModel(BaseModel):
"""
200 GET /jobs
"""
items: list[Job]
next_token: str | None
class SingleJobModel(BaseModel):
"""
201 POST /jobs
201 POST /jobs/k8s
201 POST /jobs/standard
200 GET /jobs/{job_id}
"""
data: Job
class MultipleScheduledJobsModel(BaseModel):
"""
200 GET /jobs
"""
items: list[ScheduledJob]
class SingleScheduledJobModel(BaseModel):
"""
201 POST /jobs
201 POST /jobs/k8s
201 POST /jobs/standard
200 GET /jobs/{job_id}
"""
data: ScheduledJob
class MultipleK8SPlatformsModel(BaseModel):
items: list[K8sPlatform]
class SingleK8SPlatformModel(BaseModel):
data: K8sPlatform
class MultipleBatchResultsModel(BaseModel):
items: list[BatchResult]
class SingleBatchResultModel(BaseModel):
data: BatchResult
class SignInModel(BaseModel):
access_token: str # actually it's Congito's id_token
refresh_token: str
expires_in: int
class MultipleCustomersModel(BaseModel):
items: list[Customer]
class MultipleTenantsModel(BaseModel):
items: list[Tenant]
class SingleTenantsModel(BaseModel):
data: Tenant
class EventModel(BaseModel):
data: Event
class MultipleHealthChecksModel(BaseModel):
items: list[HealthCheck]
class SingleHealthCheckModel(BaseModel):
data: HealthCheck
class SingleRabbitMQModel(BaseModel):
data: RabbitMQ
class MultipleRulesModel(BaseModel):
items: list[Rule]
next_token: str | None
class MultipleRuleMetaUpdateModel(BaseModel):
items: list[RuleMetaUpdate]
class MultipleMetricsStatusesModel(BaseModel):
items: list[MetricsStatus]
class SingleRulesetModel(BaseModel):
data: Ruleset
class MultipleRulesetsModel(BaseModel):
items: list[Ruleset]
class SingleRuleSourceModel(BaseModel):
data: RuleSource
class MultipleRuleSourceModel(BaseModel):
items: list[RuleSource]
class SinglePolicyModel(BaseModel):
data: Policy
class MultiplePoliciesModel(BaseModel):
items: list[Policy]
class SingleRoleModel(BaseModel):
data: Role
class MultipleRoleModel(BaseModel):
items: list[Role]
class SingleLicenseModel(BaseModel):
data: License
class MultipleLicensesModel(BaseModel):
items: list[License]
class SingleMailSettingModel(BaseModel):
data: MailSetting
class SingleLMClientModel(BaseModel):
data: LicenseManagerClient
class SingleLMConfigModel(BaseModel):
data: LicenseManagerConfig
class SingleJobReportModel(BaseModel):
data: BaseReportJob
class MultipleJobReportModel(BaseModel):
items: list[BaseReportJob]
class SingleEntityReportModel(BaseModel):
data: BaseReportEntity
class ErrorsReportModel(BaseModel):
items: list[ErrorReportItem] | None
data: BaseReportJob | None # if href=true
class RulesReportModel(BaseModel):
items: list[RulesReportItem] | None
data: BaseReportJob | None # if href=true
class EntityRulesReportModel(BaseModel):
items: list[AverageRulesReportItem]
class MultipleReportStatusModel(BaseModel):
items: list[ReportStatus]
class EntityResourcesReportModel(BaseModel):
items: list[ResourcesReportItem] | None
data: BaseReportEntity | None
class JobResourcesReportModel(BaseModel):
items: list[ResourcesReportItem]
data: BaseReportJob | None
class SingleLicenseActivationModel(BaseModel):
data: BaseActivation
class SingleDefeDojoModel(BaseModel):
data: DefectDojo
class MultipleDefectDojoModel(BaseModel):
items: DefectDojo
class SingleChronicleModel(BaseModel):
data: Chronicle
class SingleChronicleActivationModel(BaseModel):
data: ChronicleActivation
class MultipleChronicleModel(BaseModel):
items: list[Chronicle]
class SingleDefectDojoActivation(BaseModel):
data: DefectDojoActivation
class SingleDefectDojoPushResult(BaseModel):
data: DojoPushResult
class MultipleDefectDojoPushResult(BaseModel):
items: list[DojoPushResult]
class SingleChroniclePushResult(BaseModel):
data: ChroniclePushResult
class SingleSelfIntegration(BaseModel):
data: SelfIntegration
class SingleTenantExcludedRules(BaseModel):
data: TenantExcludedRules
class SingleCustomerExcludedRules(BaseModel):
data: CustomerExcludedRules
class MultipleCredentialsModel(BaseModel):
items: list[Credentials]
next_token: str | None
class SingleCredentialsModel(BaseModel):
data: Credentials
class CredentialsActivationModel(BaseModel):
data: BaseActivation
class SingleUserModel(BaseModel):
data: User
class MultipleUsersModel(BaseModel):
items: list[User]
next_token: str | None