postgresql_metrics/postgres_queries.py (204 lines of code) (raw):
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains code for statistics extraction that is based
on having a connection to a Postgres database, and running queries through it.
"""
import psycopg2
import re
from postgresql_metrics.common import get_logger
# module-level logger shared by every function in this file,
# obtained from postgresql_metrics.common.get_logger
LOG = get_logger()
# metric key -> (last_timestamp, last_value)
# remembers the previous sample of each metric so that the next sample
# can be turned into a diff against it
DERIVE_DICT = {}
# regex used to extract host from conninfo string
# "host=" has to sit at the very start of the string or right after whitespace,
# and the captured value runs up to the next whitespace or the end of the string
CONNINFO_HOST_RE = re.compile(r'(^|\s)host=(?P<host>.*?)($|\s)')
def get_db_connection(database, username, password, host='127.0.0.1', port=5432,
                      connect_timeout=10):
    """Open a psycopg2 connection to the given database and enable autocommit on it.

    The port is passed through int() before it is handed to psycopg2,
    so a numeric string is accepted as well as an integer.
    """
    connect_kwargs = {
        'user': username,
        'password': password,
        'host': host,
        'port': int(port),
        'database': database,
        'connect_timeout': connect_timeout,
    }
    db_connection = psycopg2.connect(**connect_kwargs)
    db_connection.autocommit = True
    return db_connection
def get_db_name_from_connection(connection):
    """Return the value of the dbname key in the connection's dsn, or None if absent.

    example dsn: dbname=varjodb user=varjo password=xxxxxxxx host=127.0.0.1
    This works also for closed connection.

    The dsn is split on whitespace and every token on its first '=' only, so
    values that themselves contain '=' (for example a password) do not break
    the parsing. Tokens without any '=' are ignored. Quoted values that contain
    whitespace are not reassembled.
    """
    for dsn_part in connection.dsn.split():
        key, separator, value = dsn_part.partition('=')
        # an empty separator means the token was not a key=value pair at all
        if separator and key == 'dbname':
            return value
    return None
def get_metric_diff(db_name, metric_name, current_time, current_value):
    """Return the per-second change of a metric since its previous sample.

    The previous (time, value) pair is looked up in DERIVE_DICT under the key
    db_name + "_" + metric_name and is then replaced by the current pair.
    Returns None when no previous sample exists, and 0 when less than one
    whole second has passed since the previous sample.
    """
    sample_key = db_name + "_" + metric_name
    rate = None
    previous_sample = DERIVE_DICT.get(sample_key)
    if previous_sample is not None:
        previous_time, previous_value = previous_sample
        # elapsed time is truncated to whole seconds
        elapsed_seconds = int((current_time - previous_time).total_seconds())
        if elapsed_seconds != 0:
            rate = float(current_value - previous_value) / elapsed_seconds
        else:
            rate = 0
    DERIVE_DICT[sample_key] = (current_time, current_value)
    return rate
def query(cursor, sql, params=None):
    """Run a statement and return all fetched rows.

    accepts a database connection or cursor

    When a connection (or any subclass of the psycopg2 connection class) is
    given, a cursor is opened on it for this call and closed again afterwards.
    A cursor passed in by the caller is left open.

    Any exception raised while executing or fetching is logged and swallowed,
    in which case an empty list is returned.
    """
    # isinstance on the public class also recognises connection subclasses
    owns_cursor = isinstance(cursor, psycopg2.extensions.connection)
    if owns_cursor:
        cursor = cursor.cursor()
    LOG.debug('QUERY "{}" {}', sql, params)
    try:
        if params:
            cursor.execute(sql, params)
        else:
            cursor.execute(sql)
        results = cursor.fetchall()
    except Exception:
        LOG.exception("failed calling the database")
        results = []
    finally:
        # only release what this function created itself
        if owns_cursor:
            cursor.close()
    LOG.debug('QUERY RESULT: {}', results)
    return results
def get_tables_with_oids_for_current_db(conn):
    """Return a list of (oid, table_name) tuples for the ordinary tables in pg_class.

    Tables whose name starts with the literal prefix 'pg_' or 'sql_' are left
    out. The underscore is escaped in the LIKE patterns because an unescaped
    '_' matches any single character, which would also drop tables such as
    'pgbench_accounts'.
    """
    rows = query(conn,
                 "SELECT oid, relname FROM pg_class WHERE relkind = 'r' "
                 "AND relname NOT LIKE 'pg!_%' ESCAPE '!' "
                 "AND relname NOT LIKE 'sql!_%' ESCAPE '!'")
    return [(table_oid, table_name) for table_oid, table_name in rows]
def get_client_connections_amount(conn):
    """Return the row count of pg_stat_activity, or None when the query gave no rows."""
    rows = query(conn, 'SELECT count(*) FROM pg_stat_activity')
    if not rows:
        return None
    first_row = rows[0]
    return first_row[0]
def get_disk_usage_for_database(conn):
    """Return the (datname, pg_database_size) row of the current database.

    Returns None when the query gave no rows.
    """
    rows = query(conn,
                 "SELECT datname, pg_database_size(datname) FROM pg_database "
                 "WHERE datname = current_database()")
    return rows[0] if rows else None
def get_major_version(conn):
    """Get the major version of the PostgreSQL server as a string.

    For servers before version 10 the major version is the first two numbers
    (e.g. '9.6'); from version 10 on it is the first number alone (e.g. '10').
    Returns None when the version could not be queried.

    The version is derived from the numeric server_version_num setting
    (e.g. 90603 for 9.6.3, 100005 for 10.5) rather than by pattern matching
    on the free-form version() text.
    """
    results = query(conn, "SELECT current_setting('server_version_num')")
    if not results or results[0][0] is None:
        return None
    version_num = int(results[0][0])
    major = version_num // 10000
    if major >= 10:
        return str(major)
    # pre-10 numbering: two digits each for major, minor and patch level
    return "{}.{}".format(major, (version_num // 100) % 100)
def get_transaction_rate_for_database(conn):
    """Return (datname, transactions_per_second, rollbacks_per_second) for the current database.

    The rates come from get_metric_diff, so they are None on the first call.
    Returns (None, None, None) when the query gives no rows or when any
    column of the returned row is None.
    """
    sql = ("SELECT current_database(), datname, now(), xact_commit + xact_rollback, xact_rollback "
           "FROM pg_stat_database WHERE datname = current_database()")
    results = query(conn, sql)
    # query() returns an empty list when the statement failed
    if not results:
        LOG.error("Fetching transactions got empty results")
        return None, None, None
    if None in results[0]:
        LOG.error("Fetching transactions got 'None' in result set")
        return None, None, None
    db_name, dat_name, time_now, transactions_now, rollbacks_now = results[0]
    recent_transactions = get_metric_diff(db_name, 'transactions', time_now, transactions_now)
    recent_rollbacks = get_metric_diff(db_name, 'rollbacks', time_now, rollbacks_now)
    return dat_name, recent_transactions, recent_rollbacks
def get_seconds_since_last_vacuum_per_table(conn):
    """Returns a list of tuples: (db_name, table_name, seconds_since_last_vacuum)

    The more recent of last_vacuum and last_autovacuum is used for each table.
    seconds_since_last_vacuum is 0 for a table with neither timestamp set.
    """
    rows = query(conn,
                 "SELECT current_database(), relname, now(), last_vacuum, last_autovacuum "
                 "FROM pg_stat_user_tables")
    seconds_per_table = []
    for db_name, table_name, time_now, last_vacuum, last_autovacuum in rows:
        vacuum_times = [stamp for stamp in (last_vacuum, last_autovacuum) if stamp]
        # without any recorded vacuum the reference point is "now", giving 0 seconds
        reference_time = max(vacuum_times) if vacuum_times else time_now
        elapsed = time_now - reference_time
        seconds_per_table.append((db_name, table_name, int(elapsed.total_seconds())))
    return seconds_per_table
def get_heap_hit_statistics(conn):
    """Return (db_name, heap_read_rate, heap_hit_rate, heap_hit_ratio) over all user tables.

    The rates come from get_metric_diff. The ratio is None while no read rate
    is available yet, 0 when the hit rate is 0, and hit / (hit + read) otherwise.
    Returns four Nones when the query gives no rows or a row containing None.
    """
    rows = query(conn,
                 "SELECT current_database(), now(), sum(heap_blks_read), sum(heap_blks_hit) "
                 "FROM pg_statio_user_tables")
    if not rows or None in rows[0]:
        LOG.error("fetching heap_hit_statistics got empty results: {}", str(rows))
        return None, None, None, None
    db_name, time_now, heap_read_now, heap_hit_now = rows[0]
    read_rate = get_metric_diff(db_name, 'heap_read', time_now, heap_read_now)
    hit_rate = get_metric_diff(db_name, 'heap_hit', time_now, heap_hit_now)
    if read_rate is None:
        hit_ratio = None
    elif hit_rate == 0:
        hit_ratio = 0
    else:
        hit_ratio = hit_rate / float(hit_rate + read_rate)
    return db_name, read_rate, hit_rate, hit_ratio
def get_lock_statistics(conn):
    """Return [per_lock_type_counts, total_counts] built from pg_locks.

    Every counts entry is a two-element list indexed by the granted flag:
    position 0 counts locks that are not granted, position 1 granted ones.
    """
    rows = query(conn, "SELECT locktype, granted, count(*) FROM pg_locks GROUP BY locktype, granted")
    totals = [0, 0]
    counts_by_type = {}
    for lock_type, granted, count in rows:
        type_counts = counts_by_type.setdefault(lock_type, [0, 0])
        # the boolean granted flag doubles as the list index
        type_counts[granted] = count
        totals[granted] += count
    return [counts_by_type, totals]
def get_oldest_transaction_timestamp(conn):
    """Return (db_name, seconds since the oldest open transaction of the current database started).

    Returns (None, None) when the query gives no rows.
    """
    rows = query(conn,
                 "SELECT datname, now(), xact_start FROM pg_stat_activity "
                 "WHERE xact_start IS NOT NULL AND datname=current_database() "
                 "ORDER BY xact_start ASC LIMIT 1")
    if not rows:
        return None, None
    db_name, time_now, xact_start = rows[0]
    started_at = xact_start or time_now
    transaction_age = time_now - started_at
    return db_name, int(transaction_age.total_seconds())
def get_max_mxid_age(conn):
    """Return the largest mxid_age(relminmxid) over pg_class as an int.

    Returns None when the server is older than 9.5, when the query gives no
    rows, or when the aggregate is NULL.
    """
    # `mxid_age` is only available on postgres 9.5 and newer
    # server_version packs each version component into two digits: 9.5.0 -> 90500
    if conn.server_version < 90500:
        LOG.error("Unable to check mxid_age on versions of postgres below 9.5")
        return None
    sql = "SELECT max(mxid_age(relminmxid)) FROM pg_class WHERE relminmxid <> '0'"
    results = query(conn, sql)
    if not results:
        return None
    mxid_age, = results[0]
    # max() yields NULL when no row passes the WHERE filter
    if mxid_age is None:
        return None
    return int(mxid_age)
def get_max_xid_age(conn):
    """Return the largest age(datfrozenxid) over pg_database as an int.

    Returns None when the query gives no rows.
    """
    rows = query(conn, "SELECT max(age(datfrozenxid)) FROM pg_database")
    if rows:
        (xid_age,) = rows[0]
        return int(xid_age)
    return None
def get_replication_delays(conn):
    """Return a list of (client_addr, bytes_diff) tuples read from public.pg_stat_repl.

    bytes_diff is the distance between this server's WAL position and the
    replay position recorded for each client. Rows whose distance is NULL are
    skipped, as there is no number to report for them.
    """
    sql = ("SELECT client_addr, "
           "pg_xlog_location_diff(pg_current_xlog_location(), replay_location) AS bytes_diff "
           "FROM public.pg_stat_repl")
    if is_in_recovery(conn):
        # pg_current_xlog_location cannot be called in a replica
        # use pg_last_xlog_receive_location for monitoring cascade replication
        sql = sql.replace("pg_current_xlog_location", "pg_last_xlog_receive_location")
    if conn.server_version >= 100000:  # PostgreSQL 10 and higher
        # version 10 renamed the functions and columns: xlog -> wal, location -> lsn
        sql = sql.replace('_xlog', '_wal')
        sql = sql.replace('_location', '_lsn')
    all_delays = []
    for result_row in query(conn, sql):
        client_addr = result_row[0]
        bytes_diff = result_row[1]
        # NOTE(review): presumably NULL when a client has not reported a replay
        # position; int(None) would otherwise abort the whole collection
        if bytes_diff is None:
            continue
        all_delays.append((client_addr, int(bytes_diff)))
    return all_delays
def get_table_bloat(conn, table_oid):
    """Return (db_name, dead tuple share as a fraction of 1) for the table with the given oid.

    Based on extension pgstattuple, so you need to call CREATE EXTENSION before using this.
    Check the function get_tables_with_oids_for_current_db to see how to get table oids.
    Returns (None, None) when the query gives no rows.
    """
    rows = query(conn,
                 "SELECT current_database, dead_tuple_percent "
                 "FROM pgstattuple_for_table_oid(%s)",
                 [table_oid])
    if not rows:
        return None, None
    db_name, dead_tuple_percent = rows[0]
    # the query reports a percentage, callers get a fraction
    dead_tuple_fraction = dead_tuple_percent / 100.0
    return db_name, dead_tuple_fraction
def get_index_hit_rates(conn):
    """Return a list of (db_name, table_name, ratio) tuples, one per row of pg_stat_user_tables.

    idx_scan is treated as the hit count and seq_scan as the miss count.
    The ratio is None when either count is None, 0 when there are no hits,
    and hits / (hits + misses) otherwise.
    """
    sql = ("SELECT current_database() as db_name, relname as table_name, "
           "idx_scan as index_hit, seq_scan as index_miss "
           "FROM pg_stat_user_tables")
    rows = query(conn, sql)
    LOG.debug(rows)
    hit_rates = []
    for db_name, table_name, index_hit, index_miss in rows:
        if index_hit is None or index_miss is None:
            ratio = None
        elif index_hit == 0:
            ratio = 0
        else:
            ratio = index_hit / float(index_miss + index_hit)
        hit_rates.append((db_name, table_name, ratio))
    return hit_rates
def get_wal_receiver_status(conn):
    """Return a list of (host, status) tuples read from public.stat_incoming_replication.

    status is 1 when the row's status column equals 'streaming' and 0
    otherwise. host is taken from the row's conninfo string via
    CONNINFO_HOST_RE, and is 'UNKNOWN' when conninfo is empty or no host
    entry is found in it.
    """
    sql = ("SELECT conninfo, CASE WHEN status = 'streaming' THEN 1 ELSE 0 END "
           "FROM public.stat_incoming_replication")
    results = query(conn, sql)
    host_replication_status = []
    for conn_info, status in results:
        # search() returns None on no match, so the fallback has to be applied here
        match = CONNINFO_HOST_RE.search(conn_info or '')
        host = match.group('host') if match else 'UNKNOWN'
        host_replication_status.append((host, status))
    return host_replication_status
def is_in_recovery(conn):
    """Return the value of pg_is_in_recovery() for the server behind conn."""
    rows = query(conn, "SELECT pg_is_in_recovery()")
    first_row = rows[0]
    return first_row[0]