dusty/processors/false_positive/processor.py (64 lines of code) (raw):
#!/usr/bin/python3
# coding=utf-8
# Copyright 2019 getcarrier.io
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor: false_positive
"""
from json import dumps
from requests import get
from dusty.tools import log
from dusty.models.module import DependentModuleModel
from dusty.models.processor import ProcessorModel
from . import constants
class Processor(DependentModuleModel, ProcessorModel):
""" Process findings: filter false-positives """
def __init__(self, context):
""" Initialize processor instance """
super().__init__()
self.context = context
self.config = \
self.context.config["processing"][__name__.split(".")[-2]]
def galloper_connector(self):
auth = None
if self.config.get("user") and self.config.get("password"):
auth = (self.config.get("user"), self.config.get("password"))
if self.config.get("project_id"):
galloper_url = constants.GALLOPER_API_PATH.format(project_id=self.config.get("project_id"))
else:
galloper_url = constants.LEGACY_GALLOPER_API_PATH
data = {
"project_name": self.context.get_meta('project_name'),
"scan_type": self.context.get_meta("testing_type"),
"app_name": self.context.get_meta("project_description")
}
fp_list = get(f'{self.config.get("galloper")}{galloper_url}',
headers={"content-type": "application/json"}, auth=auth,
params=data).json()
with open(constants.GALLOPER_FPA_PATH, "w") as f:
f.write("\n".join(fp_list).strip())
return constants.GALLOPER_FPA_PATH
def execute(self):
""" Run the processor """
log.info("Processing false-positives")
if self.config.get("galloper"):
fp_config_path = self.galloper_connector()
else:
fp_config_path = self.config.get("file", constants.DEFAULT_FP_CONFIG_PATH)
try:
false_positives = list()
# Load false positives
with open(fp_config_path, "r") as file:
for line in file.readlines():
if line.strip():
false_positives.append(line.strip())
# Process findings
for item in self.context.findings:
issue_hash = item.get_meta("issue_hash", "<no_hash>")
if issue_hash in false_positives:
item.set_meta("false_positive_finding", True)
except: # pylint: disable=W0702
log.exception("Failed to process false-positives")
@staticmethod
def fill_config(data_obj):
""" Make sample config """
data_obj.insert(
len(data_obj), "file", "/path/to/false_positive.config",
comment="File with issue hashes"
)
@staticmethod
def depends_on():
""" Return required depencies """
return ["issue_hash"]
@staticmethod
def get_name():
""" Module name """
return "False-positive"
@staticmethod
def get_description():
""" Module description """
return "False-positive processor"