in src/handlers/event_assembler_handler.py [0:0]
def handler(self, event):
    """Assemble collected events into licensed batch results and submit a job.

    Steps, in order:

    1. Read the saved cursor (if any) and obtain the events since it.
    2. Save a new cursor equal to the timestamp of the last obtained event.
    3. Build a per-vendor mapping from the events and turn it into
       ``(Tenant, BatchResults)`` pairs via the vendor-specific handlers.
    4. Keep only the batch results whose tenant has an event-driven
       license, narrowing their rules to the rules of the licensed
       rule-sets that match the tenant's cloud.
    5. Save the remaining batch results and submit one batch job for them.

    :param event: incoming event; it is not read by this method.
    :return: ``self.response``. ``self._code`` is set to 404 when there
        is nothing to process (no events, no batch results derived, or
        none left after the license check) and to 202 once the job has
        been submitted.
    """
    self._log_cache()

    # Collect all events since the last execution.
    _LOG.info('Going to obtain cursor value of the event assembler.')
    event_cursor = None
    config = self._settings_service.get_event_assembler_configuration()
    if config and EVENT_CURSOR_TIMESTAMP_ATTR in config:
        event_cursor = float(config[EVENT_CURSOR_TIMESTAMP_ATTR])
        _LOG.info(f'Cursor was obtained: {event_cursor}')

    # Events are expected in order: List[$oldest, ... $nearest]
    events = self._obtain_events(since=event_cursor)
    if not events:
        _LOG.info('No events have been collected.')
        self._code = HTTPStatus.NOT_FOUND
        return self.response

    start_event = events[0]
    end_event = events[-1]

    # NOTE(review): the cursor is advanced here, before any job is
    # submitted; presumably events of this window are not obtained again
    # if a later step fails - confirm this is intended.
    config = self._settings_service.create_event_assembler_configuration(
        cursor=end_event.timestamp
    )
    self._settings_service.save(setting=config)
    _LOG.info('Cursor value of the event assembler has been updated '
              f'to - {end_event.timestamp}')

    vendor_maps = self.vendor_rule_map(events)
    tenant_batch_result: list[tuple[Tenant, BatchResults]] = []
    for vendor, mapping in vendor_maps.items():
        if not mapping:
            _LOG.warning(f'{vendor}`s mapping is empty. Skipping')
            continue
        # Each vendor handler must yield tuples (Tenant, BatchResult).
        # Vendors other than the two below are ignored.
        if vendor == MAESTRO_VENDOR:
            tenant_batch_result.extend(self.handle_maestro_vendor(mapping))
        elif vendor == AWS_VENDOR:
            tenant_batch_result.extend(self.handle_aws_vendor(mapping))

    if not tenant_batch_result:
        self._code = 404
        self._content = 'Could derive no BatchResults. Skipping'
        return self.response

    # Here we leave only batch results of tenants for which event-driven
    # is enabled by license, and only the rules available by that license.
    allowed_batch_results = []
    for tenant, br in tenant_batch_result:
        _license = self.get_allowed_event_driven_license(tenant)
        if not _license:
            continue

        # The license allows event-driven for the current tenant. Now
        # restrict the rules to those of the licensed rule-sets that
        # belong to the tenant's cloud. The cloud is resolved once per
        # tenant instead of once per rule-set.
        tenant_cloud = adjust_cloud(tenant.cloud)
        rule_sets = (
            self.get_ruleset(_id)
            for _id in set(_license.ruleset_ids or ())
        )
        # Rule ids are taken exactly as listed in the rule-sets.
        # NOTE(review): a rule-set id that cannot be resolved is presumed
        # to come back as None and is skipped instead of raising
        # AttributeError on `.cloud` - confirm against get_ruleset.
        allowed_rules = set(chain.from_iterable(
            rule_set.rules for rule_set in rule_sets
            if rule_set is not None and rule_set.cloud == tenant_cloud
        ))
        _LOG.debug(f'Tenant {br.tenant_name} is allowed to use '
                   f'such rules: {allowed_rules}')

        # br.rules is either a plain dict already or an object that
        # exposes as_dict().
        if isinstance(br.rules, dict):
            region_rules_map = br.rules
        else:
            region_rules_map = br.rules.as_dict()
        _LOG.debug(f'Restricting {region_rules_map} from event to '
                   f'the scope of allowed rules')
        restricted_map = self.restrict_region_rule_map(region_rules_map,
                                                       allowed_rules)
        if not restricted_map:
            _LOG.info('No rules after restricting left. Skipping')
            continue

        _LOG.debug(f'Optimizing region rules map {restricted_map} size')
        br.rules = self._optimize_region_rule_map_size(restricted_map)
        allowed_batch_results.append(br)

    if not allowed_batch_results:
        self._code = 404
        self._content = 'No batch results left after checking licenses'
        return self.response

    # Here we already have allowed batch results. Just start a job that
    # receives their ids as a comma-separated environment value.
    common_envs = self._build_common_envs()
    common_envs[BatchJobEnv.BATCH_RESULTS_IDS.value] = ','.join(
        item.id for item in allowed_batch_results)

    # Stamp every batch result with the time window of processed events.
    for br in allowed_batch_results:
        br.registration_start = str(start_event.timestamp)
        br.registration_end = str(end_event.timestamp)
    self._batch_results_service.batch_save(allowed_batch_results)

    job_id = self._submit_batch_job(common_envs)
    self._code = 202
    self._content = f'AWS Batch job were submitted: {job_id}'
    return self.response