cli/srecli/service/constants.py (232 lines of code) (raw):

import operator import os from enum import Enum from typing import TypedDict # from http import HTTPMethod # python3.11+ class TenantModel(TypedDict): # aws account_id azure subscription_id or google project_id account_id: str activation_date: str cloud: str customer_name: str display_name: str is_active: bool name: str region: list[str] class HTTPMethod(str, Enum): HEAD = 'HEAD' GET = 'GET' POST = 'POST' PATCH = 'PATCH' DELETE = 'DELETE' PUT = 'PUT' class CustodianEndpoint(str, Enum): """ Should correspond to Api gateway models """ DOC = '/doc' JOBS = '/jobs' ROLES = '/roles' RULES = '/rules' USERS = '/users' EVENT = '/event' SIGNIN = '/signin' SIGNUP = '/signup' HEALTH = '/health' REFRESH = '/refresh' TENANTS = '/tenants' RULESETS = '/rulesets' LICENSES = '/licenses' POLICIES = '/policies' JOBS_K8S = '/jobs/k8s' CUSTOMERS = '/customers' HEALTH_ID = '/health/{id}' JOBS_JOB = '/jobs/{job_id}' DOC_PROXY = '/doc/{proxy+}' ROLES_NAME = '/roles/{name}' CREDENTIALS = '/credentials' RULE_SOURCES = '/rule-sources' USERS_WHOAMI = '/users/whoami' SCHEDULED_JOB = '/scheduled-job' PLATFORMS_K8S = '/platforms/k8s' SETTINGS_MAIL = '/settings/mail' JOBS_STANDARD = '/jobs/standard' BATCH_RESULTS = '/batch-results' REPORTS_RETRY = '/reports/retry' POLICIES_NAME = '/policies/{name}' METRICS_STATUS = '/metrics/status' REPORTS_CLEVEL = '/reports/clevel' METRICS_UPDATE = '/metrics/update' REPORTS_STATUS = '/reports/status' REPORTS_PROJECT = '/reports/project' USERS_USERNAME = '/users/{username}' CREDENTIALS_ID = '/credentials/{id}' RULE_SOURCES_ID = '/rule-sources/{id}' RULESETS_RELEASE = '/rulesets/release' ED_RULESETS = '/rulesets/event-driven' DOC_SWAGGER_JSON = '/doc/swagger.json' RULE_META_UPDATER = '/rules/update-meta' REPORTS_PUSH_DOJO = '/reports/push/dojo' CUSTOMERS_RABBITMQ = '/customers/rabbitmq' REPORTS_DIAGNOSTIC = '/reports/diagnostic' REPORTS_DEPARTMENT = '/reports/department' INTEGRATIONS_SELF = '/integrations/temp/sre' SCHEDULED_JOB_NAME = '/scheduled-job/{name}' REPORTS_OPERATIONAL = '/reports/operational' TENANTS_TENANT_NAME = '/tenants/{tenant_name}' USERS_RESET_PASSWORD = '/users/reset-password' REPORTS_EVENT_DRIVEN = '/reports/event_driven' RULE_SOURCES_ID_SYNC = '/rule-sources/{id}/sync' LICENSES_LICENSE_KEY = '/licenses/{license_key}' SETTINGS_SEND_REPORTS = '/settings/send_reports' PLATFORMS_K8S_ID = '/platforms/k8s/{platform_id}' INTEGRATIONS_CHRONICLE = '/integrations/chronicle' CREDENTIALS_ID_BINDING = '/credentials/{id}/binding' CUSTOMERS_EXCLUDED_RULES = '/customers/excluded-rules' INTEGRATIONS_DEFECT_DOJO = '/integrations/defect-dojo' REPORTS_PUSH_DOJO_JOB_ID = '/reports/push/dojo/{job_id}' INTEGRATIONS_CHRONICLE_ID = '/integrations/chronicle/{id}' REPORTS_RULES_JOBS_JOB_ID = '/reports/rules/jobs/{job_id}' BATCH_RESULTS_JOB_ID = '/batch-results/{batch_results_id}' LICENSES_LICENSE_KEY_SYNC = '/licenses/{license_key}/sync' REPORTS_ERRORS_JOBS_JOB_ID = '/reports/errors/jobs/{job_id}' INTEGRATIONS_DEFECT_DOJO_ID = '/integrations/defect-dojo/{id}' REPORTS_DIGESTS_JOBS_JOB_ID = '/reports/digests/jobs/{job_id}' REPORTS_DETAILS_JOBS_JOB_ID = '/reports/details/jobs/{job_id}' TENANTS_TENANT_NAME_REGIONS = '/tenants/{tenant_name}/regions' REPORTS_FINDINGS_JOBS_JOB_ID = '/reports/findings/jobs/{job_id}' REPORTS_PUSH_CHRONICLE_JOB_ID = '/reports/push/chronicle/{job_id}' REPORTS_RESOURCES_JOBS_JOB_ID = '/reports/resources/jobs/{job_id}' REPORTS_COMPLIANCE_JOBS_JOB_ID = '/reports/compliance/jobs/{job_id}' SETTINGS_LICENSE_MANAGER_CLIENT = '/settings/license-manager/client' SETTINGS_LICENSE_MANAGER_CONFIG = '/settings/license-manager/config' LICENSE_LICENSE_KEY_ACTIVATION = '/licenses/{license_key}/activation' REPORTS_RULES_TENANTS_TENANT_NAME = '/reports/rules/tenants/{tenant_name}' TENANTS_TENANT_NAME_EXCLUDED_RULES = '/tenants/{tenant_name}/excluded-rules' TENANTS_TENANT_NAME_ACTIVE_LICENSES = '/tenants/{tenant_name}/active-licenses' INTEGRATIONS_CHRONICLE_ID_ACTIVATION = '/integrations/chronicle/{id}/activation' REPORTS_COMPLIANCE_TENANTS_TENANT_NAME = '/reports/compliance/tenants/{tenant_name}' INTEGRATIONS_DEFECT_DOJO_ID_ACTIVATION = '/integrations/defect-dojo/{id}/activation' REPORTS_DETAILS_TENANTS_TENANT_NAME_JOBS = '/reports/details/tenants/{tenant_name}/jobs' REPORTS_DIGESTS_TENANTS_TENANT_NAME_JOBS = '/reports/digests/tenants/{tenant_name}/jobs' REPORTS_FINDINGS_TENANTS_TENANT_NAME_JOBS = '/reports/findings/tenants/{tenant_name}/jobs' REPORTS_PUSH_CHRONICLE_TENANTS_TENANT_NAME = '/reports/push/chronicle/tenants/{tenant_name}' REPORTS_RESOURCES_TENANTS_TENANT_NAME_JOBS = '/reports/resources/tenants/{tenant_name}/jobs' REPORTS_RAW_TENANTS_TENANT_NAME_STATE_LATEST = '/reports/raw/tenants/{tenant_name}/state/latest' REPORTS_RESOURCES_TENANTS_TENANT_NAME_LATEST = '/reports/resources/tenants/{tenant_name}/state/latest' REPORTS_RESOURCES_PLATFORMS_K8S_PLATFORM_ID_LATEST = '/reports/resources/platforms/k8s/{platform_id}/state/latest' LAMBDA_INVOCATION_TRACE_ID_HEADER = 'Lambda-Invocation-Trace-Id' SERVER_VERSION_HEADER = 'Accept-Version' class ParentScope(str, Enum): ALL = 'ALL' DISABLED = 'DISABLED' SPECIFIC = 'SPECIFIC' @classmethod def iter(cls): return map(operator.attrgetter('value'), cls) AWS, AZURE, GCP, GOOGLE = 'AWS', 'AZURE', 'GCP', 'GOOGLE' KUBERNETES = 'KUBERNETES' # This tuple represent clouds types of rules/rulesets, not tenants or jobs RULE_CLOUDS = (AWS, AZURE, GCP, KUBERNETES) DATA_ATTR = 'data' ITEMS_ATTR = 'items' ERRORS_ATTR = 'errors' MESSAGE_ATTR = 'message' NEXT_TOKEN_ATTR = 'next_token' _SENTINEL = object() class Env(str, Enum): default: str | None def __new__(cls, value: str, default: str | None = None): """ All environment variables and optionally their default values. Since envs always have string type the default value also should be of string type and then converted to the necessary type in code. There is no default value if not specified (default equal to None) """ obj = str.__new__(cls, value) obj._value_ = value obj.default = default return obj def get(self, default=_SENTINEL) -> str | None: if default is _SENTINEL: default = self.default if default is not None: default = str(default) return os.environ.get(self.value, default) def set(self, val: str | None): if val is None: os.environ.pop(self.value, None) else: os.environ[self.value] = str(val) LOG_LEVEL = 'SRE_CLI_LOG_LEVEL', 'INFO' LOGS_FOLDER = 'SRE_CLI_LOGS_FOLDER' # if not specified, logs are not written to file DEVELOPER_MODE = 'SRE_CLI_DEVELOPER_MODE' RESPONSE_FORMAT = 'SRE_CLI_RESPONSE_FORMAT', 'table' VERBOSE = 'SRE_CLI_VERBOSE' NO_PROMPT = 'SRE_CLI_NO_PROMPT' # aws AWS_ACCESS_KEY_ID = 'AWS_ACCESS_KEY_ID' AWS_SECRET_ACCESS_KEY = 'AWS_SECRET_ACCESS_KEY' AWS_SESSION_TOKEN = 'AWS_SESSION_TOKEN' AWS_DEFAULT_REGION = 'AWS_DEFAULT_REGION' AWS_REGION = 'AWS_REGION' AWS_DEFAULT_PROFILE = 'AWS_DEFAULT_PROFILE' AWS_PROFILE = 'AWS_PROFILE' # azure AZURE_TENANT_ID = 'AZURE_TENANT_ID' AZURE_SUBSCRIPTION_ID = 'AZURE_SUBSCRIPTION_ID' AZURE_CLIENT_ID = 'AZURE_CLIENT_ID' AZURE_CLIENT_SECRET = 'AZURE_CLIENT_SECRET' AZURE_KEYVAULT_CLIENT_ID = 'AZURE_KEYVAULT_CLIENT_ID' AZURE_KEYVAULT_SECRET = 'AZURE_KEYVAULT_SECRET' AZURE_CLIENT_CERTIFICATE_PATH = 'AZURE_CLIENT_CERTIFICATE_PATH' AZURE_CLIENT_CERTIFICATE_PASSWORD = 'AZURE_CLIENT_CERTIFICATE_PASSWORD' AZURE_ACCESS_TOKEN = 'AZURE_ACCESS_TOKEN' # google GOOGLE_APPLICATION_CREDENTIALS = 'GOOGLE_APPLICATION_CREDENTIALS' class JobType(str, Enum): MANUAL = 'manual' REACTIVE = 'reactive' DEFAULT_AWS_REGION = 'us-east-1' # responses NO_ITEMS_TO_DISPLAY_RESPONSE_MESSAGE = 'No items to display' NO_CONTENT_RESPONSE_MESSAGE = 'Request is successful. No content returned' # 204 CONFIG_FOLDER = '.c7n' CONTEXT_MODULAR_ADMIN_USERNAME = 'modular_admin_username' CONF_ACCESS_TOKEN = 'access_token' CONF_REFRESH_TOKEN = 'refresh_token' CONF_API_LINK = 'api_link' CONF_ITEMS_PER_COLUMN = 'items_per_column' MODULE_NAME = 'c7n' # for modular admin class JobState(str, Enum): """ https://docs.aws.amazon.com/batch/latest/userguide/job_states.html """ SUBMITTED = 'SUBMITTED' PENDING = 'PENDING' RUNNABLE = 'RUNNABLE' STARTING = 'STARTING' RUNNING = 'RUNNING' FAILED = 'FAILED' SUCCEEDED = 'SUCCEEDED' @classmethod def iter(cls): return map(operator.attrgetter('value'), cls) class PolicyErrorType(str, Enum): """ For statistics """ SKIPPED = 'SKIPPED' ACCESS = 'ACCESS' # not enough permissions CREDENTIALS = 'CREDENTIALS' # invalid credentials CLIENT = 'CLIENT' # some other client error INTERNAL = 'INTERNAL' # unexpected error @classmethod def iter(cls): return map(operator.attrgetter('value'), cls) class ModularCloud(str, Enum): AZURE = 'AZURE' YANDEX = 'YANDEX' GOOGLE = 'GOOGLE' AWS = 'AWS' OPENSTACK = 'OPEN_STACK' CSA = 'CSA' HWU = 'HARDWARE' ENTERPRISE = 'ENTERPRISE' EXOSCALE = 'EXOSCALE' WORKSPACE = 'WORKSPACE' AOS = 'AOS' VSPHERE = 'VSPHERE' VMWARE = 'VMWARE' # VCloudDirector group NUTANIX = 'NUTANIX' @classmethod def iter(cls): return map(operator.attrgetter('value'), cls) # modular cli MODULAR_ADMIN = 'modules' SUCCESS_STATUS = 'SUCCESS' ERROR_STATUS = 'FAILED' STATUS_ATTR = 'status' CODE_ATTR = 'code' TABLE_TITLE_ATTR = 'table_title' # ----------- REVERT_TO_JSON_MESSAGE = 'The command`s response is pretty huge and the ' \ 'result table structure can be broken.\nDo you want ' \ 'to show the response in the JSON format?' COLUMN_OVERFLOW = 'Column has overflown, within the table representation.'