in cartography/intel/gcp/compute.py [0:0]
def _transform_fw_entry(rule, fw_partial_uri, is_allow_rule):
    """
    Expand one entry of a GCP firewall's allow or deny list into a flat list of rule dicts, one per port or
    port range named by the entry. The flattened form is easier to load into Neo4j.

    Each output dict is built by `_parse_port_string_to_rule`; the examples below show the expected shape.

    Example 1 - single port range:
    Input: `{'IPProtocol': 'tcp', 'ports': ['0-65535']}, fw_id, is_allow_rule=True`
    Output: `[ {fromport: 0, toport: 65535, protocol: tcp, ruleid: fw_id/allow/0to65535tcp} ]`

    Example 2 - multiple ports with a range:
    Input: `{'IPProtocol': 'tcp', 'ports': ['80', '443', '12345-12349']}, fw_id, is_allow_rule=False`
    Output: `[ {fromport: 80, toport: 80, protocol: tcp, ruleid: fw_id/deny/80tcp},
               {fromport: 443, toport: 443, protocol: tcp, ruleid: fw_id/deny/443tcp},
               {fromport: 12345, toport: 12349, protocol: tcp, ruleid: fw_id/deny/12345to12349tcp} ]`

    Example 3 - ICMP (no ports):
    Input: `{'IPProtocol': 'icmp'}, fw_id, is_allow_rule=True`
    Output: `[ {fromport: None, toport: None, protocol: icmp, ruleid: fw_id/allow/icmp} ]`

    :param rule: A rule entry object; must contain `IPProtocol` and may contain `ports`
    :param fw_partial_uri: The parent GCPFirewall's unique identifier
    :param is_allow_rule: Whether the rule is an `allow` rule. If false it is a `deny` rule.
    :return: A list of transformed rules, one per port string handled
    """
    protocol = rule['IPProtocol']

    # Ports only matter for TCP and UDP. Everything else (ICMP, ESP, AH, IPIP, SCTP, protocol numbers)
    # becomes a single rule built with no port string.
    if protocol not in ('tcp', 'udp'):
        return [_parse_port_string_to_rule(None, protocol, fw_partial_uri, is_allow_rule)]

    # A TCP/UDP entry without a `ports` key applies to every port, so treat it as the full range.
    port_specs = rule['ports'] if 'ports' in rule else ['0-65535']

    # One transformed rule per listed port or port range, in the order given.
    return [
        _parse_port_string_to_rule(port_spec, protocol, fw_partial_uri, is_allow_rule)
        for port_spec in port_specs
    ]