in metrics/heron/influxdb/client.py [0:0]
def __init__(self, config: dict) -> None:
super().__init__(config)
try:
self.host: str = config["influx.host"]
self.port: int = int(config["influx.port"])
self.database_prefix: str = config["influx.database.prefix"]
self.tracker_url: str = config["heron.tracker.url"]
except KeyError as kerr:
msg: str = f"Required configuration keys were missing: {kerr.args}"
LOG.error(msg)
raise KeyError(msg)
else:
self.username: Optional[str] = None
self.password: Optional[str] = None
if ("influx.user" in config) and ("influx.password" in config):
self.username = config["influx.user"]
self.password = config["influx.password"]
LOG.info("Creating InfluxDB client for user: %s, on host: %s",
config["influx.user"], self.host)
self.client: InfluxDBClient = InfluxDBClient(
host=self.host, port=self.port, username=self.username,
password=self.password)
elif "influx.user" in config and "influx.password" not in config:
pw_msg: str = (f"Password for InfluxDB user: "
f"{config['influx.user']} was not provided")
LOG.error(pw_msg)
raise KeyError(pw_msg)
elif not ("influx.user" in config) and ("influx.password" in config):
user_msg: str = "InfluxDB user information was not provided"
LOG.error(user_msg)
raise KeyError(user_msg)
else:
LOG.info("Creating InfluxDB client for sever on host: %s",
self.host)
self.client: InfluxDBClient = InfluxDBClient(host=self.host,
port=self.port)
self.metric_name_cache: DefaultDict[str,
DefaultDict[str, List[str]]] = \
defaultdict(lambda: defaultdict(list))