redash/query_runner/google_spreadsheets.py (162 lines of code) (raw):

import logging from base64 import b64decode from dateutil import parser from requests import Session from xlsxwriter.utility import xl_col_to_name from redash.query_runner import * from redash.utils import json_dumps, json_loads logger = logging.getLogger(__name__) try: import gspread from gspread.exceptions import APIError from oauth2client.service_account import ServiceAccountCredentials enabled = True except ImportError: enabled = False def _load_key(filename): with open(filename, "rb") as f: return json_loads(f.read()) def _get_columns_and_column_names(row): column_names = [] columns = [] duplicate_counter = 1 for i, column_name in enumerate(row): if not column_name: column_name = "column_{}".format(xl_col_to_name(i)) if column_name in column_names: column_name = "{}{}".format(column_name, duplicate_counter) duplicate_counter += 1 column_names.append(column_name) columns.append( {"name": column_name, "friendly_name": column_name, "type": TYPE_STRING} ) return columns, column_names def _value_eval_list(row_values, col_types): value_list = [] raw_values = zip(col_types, row_values) for typ, rval in raw_values: try: if rval is None or rval == "": val = None elif typ == TYPE_BOOLEAN: val = True if str(rval).lower() == "true" else False elif typ == TYPE_DATETIME: val = parser.parse(rval) elif typ == TYPE_FLOAT: val = float(rval) elif typ == TYPE_INTEGER: val = int(rval) else: # for TYPE_STRING and default val = str(rval) value_list.append(val) except (ValueError, OverflowError): value_list.append(rval) return value_list HEADER_INDEX = 0 class WorksheetNotFoundError(Exception): def __init__(self, worksheet_num, worksheet_count): message = "Worksheet number {} not found. Spreadsheet has {} worksheets. Note that the worksheet count is zero based.".format( worksheet_num, worksheet_count ) super(WorksheetNotFoundError, self).__init__(message) def parse_query(query): values = query.split("|") key = values[0] # key of the spreadsheet worksheet_num = ( 0 if len(values) != 2 else int(values[1]) ) # if spreadsheet contains more than one worksheet - this is the number of it return key, worksheet_num def parse_worksheet(worksheet): if not worksheet: return {"columns": [], "rows": []} columns, column_names = _get_columns_and_column_names(worksheet[HEADER_INDEX]) if len(worksheet) > 1: for j, value in enumerate(worksheet[HEADER_INDEX + 1]): columns[j]["type"] = guess_type(value) column_types = [c["type"] for c in columns] rows = [ dict(zip(column_names, _value_eval_list(row, column_types))) for row in worksheet[HEADER_INDEX + 1 :] ] data = {"columns": columns, "rows": rows} return data def parse_spreadsheet(spreadsheet, worksheet_num): worksheets = spreadsheet.worksheets() worksheet_count = len(worksheets) if worksheet_num >= worksheet_count: raise WorksheetNotFoundError(worksheet_num, worksheet_count) worksheet = worksheets[worksheet_num].get_all_values() return parse_worksheet(worksheet) def is_url_key(key): return key.startswith("https://") def parse_api_error(error): error_data = error.response.json() if "error" in error_data and "message" in error_data["error"]: message = error_data["error"]["message"] else: message = str(error) return message class TimeoutSession(Session): def request(self, *args, **kwargs): kwargs.setdefault("timeout", 300) return super(TimeoutSession, self).request(*args, **kwargs) class GoogleSpreadsheet(BaseQueryRunner): should_annotate_query = False def __init__(self, configuration): super(GoogleSpreadsheet, self).__init__(configuration) self.syntax = "custom" @classmethod def name(cls): return "Google Sheets" @classmethod def type(cls): return "google_spreadsheets" @classmethod def enabled(cls): return enabled @classmethod def configuration_schema(cls): return { "type": "object", "properties": {"jsonKeyFile": {"type": "string", "title": "JSON Key File"}}, "required": ["jsonKeyFile"], "secret": ["jsonKeyFile"], } def _get_spreadsheet_service(self): scope = ["https://spreadsheets.google.com/feeds"] key = json_loads(b64decode(self.configuration["jsonKeyFile"])) creds = ServiceAccountCredentials.from_json_keyfile_dict(key, scope) timeout_session = Session() timeout_session.requests_session = TimeoutSession() spreadsheetservice = gspread.Client(auth=creds, session=timeout_session) spreadsheetservice.login() return spreadsheetservice def test_connection(self): service = self._get_spreadsheet_service() test_spreadsheet_key = "1S0mld7LMbUad8LYlo13Os9f7eNjw57MqVC0YiCd1Jis" try: service.open_by_key(test_spreadsheet_key).worksheets() except APIError as e: message = parse_api_error(e) raise Exception(message) def run_query(self, query, user): logger.debug("Spreadsheet is about to execute query: %s", query) key, worksheet_num = parse_query(query) try: spreadsheet_service = self._get_spreadsheet_service() if is_url_key(key): spreadsheet = spreadsheet_service.open_by_url(key) else: spreadsheet = spreadsheet_service.open_by_key(key) data = parse_spreadsheet(spreadsheet, worksheet_num) return json_dumps(data), None except gspread.SpreadsheetNotFound: return ( None, "Spreadsheet ({}) not found. Make sure you used correct id.".format( key ), ) except APIError as e: return None, parse_api_error(e) register(GoogleSpreadsheet)