in src/run.py [0:0]
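# Note: this excerpt omits the module-level imports. The function below
# assumes (at minimum) sys, operator, tempfile, multiprocessing, msgspec and
# itertools.chain are imported, along with the project names it uses
# (Job, Tenant, Cloud, Platform, Parent, BSP, SP, _LOG, _XRAY, etc.).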
def standard_job(job: Job, tenant: Tenant, work_dir: Path):
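    """
    Entry point for a standard scanning job.

    Resolves the scanning domain (the tenant's cloud, or Kubernetes for
    platform jobs), assembles the rulesets to execute, runs Cloud Custodian
    per region in separate processes, and uploads the resulting reports,
    latest state and statistics.
    """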
    cloud: Cloud  # named 'cloud', but really the scanning domain (Cloud.KUBERNETES is not a cloud provider)
    platform: Platform | None = None
    if pid := BSP.env.platform_id():
        parent = cast(
            Parent,
            SP.modular_client.parent_service().get_parent_by_id(pid)
        )
        platform = Platform(parent)
        cloud = Cloud.KUBERNETES
    else:
        cloud = Cloud[tenant.cloud.upper()]
    _LOG.info(f'{BSP.env.job_type().capitalize()} job \'{job.id}\' '
              f'has started: {cloud=}, {tenant.name=}, {platform=}')
    _LOG.debug(f'Entire sys.argv: {sys.argv}')
    _LOG.debug(f'Environment: {BSP.env}')
    _XRAY.put_annotation('job_id', job.id)
    _XRAY.put_annotation('tenant_name', tenant.name)
    _XRAY.put_metadata('cloud', cloud.value)
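    # Platform (Kubernetes) jobs and tenant (cloud) jobs resolve their
    # credentials from different sources.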
    if platform:
        credentials = get_platform_credentials(platform)
    else:
        credentials = get_credentials(tenant)
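    # Build the full set of policies to execute: licensed and standard
    # rulesets are fetched by URL, then narrowed to the rules requested for
    # this job minus the tenant's exclusions.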
    licensed_urls = map(operator.itemgetter('s3_path'),
                        get_licensed_ruleset_dto_list(tenant, job))
    standard_urls = map(SP.ruleset_service.download_url,
                        BSP.policies_service.get_standard_rulesets(job))
    policies = BSP.policies_service.get_policies(
        urls=chain(licensed_urls, standard_urls),
        keep=set(job.rules_to_scan),
        exclude=get_rules_to_exclude(tenant)
    )
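    # Dump the assembled policies to a temporary file; delete=False keeps it
    # on disk after the context manager closes so worker processes can open
    # it by name.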
    with tempfile.NamedTemporaryFile(delete=False) as file:
        file.write(msgspec.json.encode(policies))
    failed = {}
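    # Export the resolved credentials as environment variables only for the
    # duration of the scan. Each region (GLOBAL_REGION first) is handled by
    # Cloud Custodian in its own process, one at a time: the parent blocks on
    # the queue for the worker's mapping of failures before joining it.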
    with EnvironmentContext(credentials, reset_all=False):
        q = multiprocessing.Queue()
        for region in [GLOBAL_REGION] + sorted(BSP.env.target_regions()):
            p = multiprocessing.Process(
                target=process_job,
                args=(file.name, work_dir, cloud, q, region)
            )
            p.start()
            _LOG.info(f'Started Cloud Custodian process for {region} with pid {p.pid}')
            _LOG.info('Waiting for item from queue')
            failed.update(q.get())
            p.join()
            p.close()
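    # Collect the scan output from the working directory and build a shards
    # collection, keyed for either the platform or the tenant reports layout.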
    result = JobResult(work_dir, cloud)
    if platform:
        keys_builder = PlatformReportsBucketKeysBuilder(platform)
    else:
        keys_builder = TenantReportsBucketKeysBuilder(tenant)
    collection = ShardsCollectionFactory.from_cloud(cloud)
    collection.put_parts(result.iter_shard_parts())
    meta = result.rules_meta()
    collection.meta = meta
    _LOG.info('Going to upload to SIEM')
    upload_to_siem(tenant=tenant, collection=collection,
                   job=AmbiguousJob(job), platform=platform)
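    # Persist the job-scoped collection to the reports bucket under this
    # job's key.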
    collection.io = ShardsS3IO(
        bucket=SP.environment_service.default_reports_bucket_name(),
        key=keys_builder.job_result(job),
        client=SP.s3
    )
    _LOG.debug('Writing job report')
    collection.write_all()  # writes the job report
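    # Merge this job's results into the "latest" state: only the shards this
    # job touched are fetched, updated and written back.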
    latest = ShardsCollectionFactory.from_cloud(cloud)
    latest.io = ShardsS3IO(
        bucket=SP.environment_service.default_reports_bucket_name(),
        key=keys_builder.latest_key(),
        client=SP.s3
    )
    _LOG.debug('Pulling latest state')
    latest.fetch_by_indexes(collection.shards.keys())
    latest.fetch_meta()
    _LOG.info('Self-healing regions')  # TODO: remove after a couple of releases
    fix_s3_regions(latest)
    _LOG.debug('Writing latest state')
    latest.update(collection)
    latest.update_meta(meta)
    latest.write_all()
    latest.write_meta()
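    # Finally, store per-job statistics (including the failures gathered from
    # the workers) to the statistics bucket.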
    _LOG.info('Writing statistics')
    SP.s3.gz_put_json(
        bucket=SP.environment_service.get_statistics_bucket_name(),
        key=StatisticsBucketKeysBuilder.job_statistics(job),
        obj=result.statistics(tenant, failed)
    )
    _LOG.info(f'Job \'{job.id}\' has ended')