client/app/services/query-result.js (406 lines of code) (raw):

import debug from "debug"; import moment from "moment"; import { axios } from "@/services/axios"; import { QueryResultError } from "@/services/query"; import { Auth } from "@/services/auth"; import { isString, uniqBy, each, isNumber, includes, extend, forOwn, get } from "lodash"; const logger = debug("redash:services:QueryResult"); const filterTypes = ["filter", "multi-filter", "multiFilter"]; function defer() { const result = { onStatusChange: status => {} }; result.promise = new Promise((resolve, reject) => { result.resolve = resolve; result.reject = reject; }); return result; } function getColumnNameWithoutType(column) { let typeSplit; if (column.indexOf("::") !== -1) { typeSplit = "::"; } else if (column.indexOf("__") !== -1) { typeSplit = "__"; } else { return column; } const parts = column.split(typeSplit); if (parts[0] === "" && parts.length === 2) { return parts[1]; } if (!includes(filterTypes, parts[1])) { return column; } return parts[0]; } function getColumnFriendlyName(column) { return getColumnNameWithoutType(column).replace(/(?:^|\s)\S/g, a => a.toUpperCase()); } const createOrSaveUrl = data => (data.id ? `api/query_results/${data.id}` : "api/query_results"); const QueryResultResource = { get: ({ id }) => axios.get(`api/query_results/${id}`), post: data => axios.post(createOrSaveUrl(data), data), }; export const ExecutionStatus = { WAITING: "waiting", PROCESSING: "processing", DONE: "done", FAILED: "failed", LOADING_RESULT: "loading-result", }; const statuses = { 1: ExecutionStatus.WAITING, 2: ExecutionStatus.PROCESSING, 3: ExecutionStatus.DONE, 4: ExecutionStatus.FAILED, }; function handleErrorResponse(queryResult, error) { const status = get(error, "response.status"); switch (status) { case 403: queryResult.update(error.response.data); return; case 400: if ("job" in error.response.data) { queryResult.update(error.response.data); return; } break; case 404: queryResult.update({ job: { error: "cached query result unavailable, please execute again.", status: 4, }, }); return; // no default } logger("Unknown error", error); queryResult.update({ job: { error: get(error, "response.data.message", "Unknown error occurred. Please try again later."), status: 4, }, }); } function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); } export function fetchDataFromJob(jobId, interval = 1000) { return axios.get(`api/jobs/${jobId}`).then(data => { const status = statuses[data.job.status]; if (status === ExecutionStatus.WAITING || status === ExecutionStatus.PROCESSING) { return sleep(interval).then(() => fetchDataFromJob(data.job.id)); } else if (status === ExecutionStatus.DONE) { return data.job.result; } else if (status === ExecutionStatus.FAILED) { return Promise.reject(data.job.error); } }); } class QueryResult { constructor(props) { this.deferred = defer(); this.job = {}; this.query_result = {}; this.status = "waiting"; this.updatedAt = moment(); // extended status flags this.isLoadingResult = false; if (props) { this.update(props); } } update(props) { extend(this, props); if ("query_result" in props) { this.status = ExecutionStatus.DONE; this.deferred.onStatusChange(ExecutionStatus.DONE); const columnTypes = {}; // TODO: we should stop manipulating incoming data, and switch to relaying // on the column type set by the backend. This logic is prone to errors, // and better be removed. Kept for now, for backward compatability. each(this.query_result.data.rows, row => { forOwn(row, (v, k) => { let newType = null; if (isNumber(v)) { newType = "float"; } else if (isString(v) && v.match(/^\d{4}-\d{2}-\d{2}T/)) { row[k] = moment.utc(v); newType = "datetime"; } else if (isString(v) && v.match(/^\d{4}-\d{2}-\d{2}$/)) { row[k] = moment.utc(v); newType = "date"; } else if (typeof v === "object" && v !== null) { row[k] = JSON.stringify(v); } else { newType = "string"; } if (newType !== null) { if (columnTypes[k] !== undefined && columnTypes[k] !== newType) { columnTypes[k] = "string"; } else { columnTypes[k] = newType; } } }); }); each(this.query_result.data.columns, column => { column.name = "" + column.name; if (columnTypes[column.name]) { if (column.type == null || column.type === "string") { column.type = columnTypes[column.name]; } } }); this.deferred.resolve(this); } else if (this.job.status === 3 || this.job.status === 2) { this.deferred.onStatusChange(ExecutionStatus.PROCESSING); this.status = "processing"; } else if (this.job.status === 4) { this.status = statuses[this.job.status]; this.deferred.reject(new QueryResultError(this.job.error)); } else { this.deferred.onStatusChange(undefined); this.status = undefined; } } getId() { let id = null; if ("query_result" in this) { id = this.query_result.id; } return id; } cancelExecution() { axios.delete(`api/jobs/${this.job.id}`); } getStatus() { if (this.isLoadingResult) { return ExecutionStatus.LOADING_RESULT; } return this.status || statuses[this.job.status]; } getError() { // TODO: move this logic to the server... if (this.job.error === "None") { return undefined; } return this.job.error; } getLog() { if (!this.query_result.data || !this.query_result.data.log || this.query_result.data.log.length === 0) { return null; } return this.query_result.data.log; } getUpdatedAt() { return this.query_result.retrieved_at || this.job.updated_at * 1000.0 || this.updatedAt; } getRuntime() { return this.query_result.runtime; } getRawData() { if (!this.query_result.data) { return null; } return this.query_result.data.rows; } getData() { return this.query_result.data ? this.query_result.data.rows : null; } isEmpty() { return this.getData() === null || this.getData().length === 0; } getColumns() { if (this.columns === undefined && this.query_result.data) { this.columns = this.query_result.data.columns; } return this.columns; } getColumnNames() { if (this.columnNames === undefined && this.query_result.data) { this.columnNames = this.query_result.data.columns.map(v => v.name); } return this.columnNames; } getColumnFriendlyNames() { return this.getColumnNames().map(col => getColumnFriendlyName(col)); } getTruncated() { return this.query_result.data ? this.query_result.data.truncated : null; } getFilters() { if (!this.getColumns()) { return []; } const filters = []; this.getColumns().forEach(col => { const name = col.name; const type = name.split("::")[1] || name.split("__")[1]; if (includes(filterTypes, type)) { // filter found const filter = { name, friendlyName: getColumnFriendlyName(name), column: col, values: [], multiple: type === "multiFilter" || type === "multi-filter", }; filters.push(filter); } }, this); this.getRawData().forEach(row => { filters.forEach(filter => { filter.values.push(row[filter.name]); if (filter.values.length === 1) { if (filter.multiple) { filter.current = [row[filter.name]]; } else { filter.current = row[filter.name]; } } }); }); filters.forEach(filter => { filter.values = uniqBy(filter.values, v => { if (moment.isMoment(v)) { return v.unix(); } return v; }); }); return filters; } toPromise(statusCallback) { if (statusCallback) { this.deferred.onStatusChange = statusCallback; } return this.deferred.promise; } static getById(queryId, id) { const queryResult = new QueryResult(); queryResult.isLoadingResult = true; queryResult.deferred.onStatusChange(ExecutionStatus.LOADING_RESULT); axios .get(`api/queries/${queryId}/results/${id}.json`) .then(response => { // Success handler queryResult.isLoadingResult = false; queryResult.update(response); }) .catch(error => { // Error handler queryResult.isLoadingResult = false; handleErrorResponse(queryResult, error); }); return queryResult; } loadLatestCachedResult(queryId, parameters) { axios .post(`api/queries/${queryId}/results`, { queryId, parameters }) .then(response => { this.update(response); }) .catch(error => { handleErrorResponse(this, error); }); } loadResult(tryCount) { this.isLoadingResult = true; this.deferred.onStatusChange(ExecutionStatus.LOADING_RESULT); QueryResultResource.get({ id: this.job.query_result_id }) .then(response => { this.update(response); this.isLoadingResult = false; }) .catch(error => { if (tryCount === undefined) { tryCount = 0; } if (tryCount > 3) { logger("Connection error while trying to load result", error); this.update({ job: { error: "failed communicating with server. Please check your Internet connection and try again.", status: 4, }, }); this.isLoadingResult = false; } else { setTimeout(() => { this.loadResult(tryCount + 1); }, 1000 * Math.pow(2, tryCount)); } }); } refreshStatus(query, parameters, tryNumber = 1) { const loadResult = () => Auth.isAuthenticated() ? this.loadResult() : this.loadLatestCachedResult(query, parameters); const request = Auth.isAuthenticated() ? axios.get(`api/jobs/${this.job.id}`) : axios.get(`api/queries/${query}/jobs/${this.job.id}`); request .then(jobResponse => { this.update(jobResponse); if (this.getStatus() === "processing" && this.job.query_result_id && this.job.query_result_id !== "None") { loadResult(); } else if (this.getStatus() !== "failed") { const waitTime = tryNumber > 10 ? 3000 : 500; setTimeout(() => { this.refreshStatus(query, parameters, tryNumber + 1); }, waitTime); } }) .catch(error => { logger("Connection error", error); // TODO: use QueryResultError, or better yet: exception/reject of promise. this.update({ job: { error: "failed communicating with server. Please check your Internet connection and try again.", status: 4, }, }); }); } getLink(queryId, fileType, apiKey) { let link = `api/queries/${queryId}/results/${this.getId()}.${fileType}`; if (apiKey) { link = `${link}?api_key=${apiKey}`; } return link; } getName(queryName, fileType) { return `${queryName.replace(/ /g, "_") + moment(this.getUpdatedAt()).format("_YYYY_MM_DD")}.${fileType}`; } static getByQueryId(id, parameters, applyAutoLimit, maxAge) { const queryResult = new QueryResult(); axios .post(`api/queries/${id}/results`, { id, parameters, apply_auto_limit: applyAutoLimit, max_age: maxAge }) .then(response => { queryResult.update(response); if ("job" in response) { queryResult.refreshStatus(id, parameters); } }) .catch(error => { handleErrorResponse(queryResult, error); }); return queryResult; } static get(dataSourceId, query, parameters, applyAutoLimit, maxAge, queryId) { const queryResult = new QueryResult(); const params = { data_source_id: dataSourceId, parameters, query, apply_auto_limit: applyAutoLimit, max_age: maxAge, }; if (queryId !== undefined) { params.query_id = queryId; } QueryResultResource.post(params) .then(response => { queryResult.update(response); if ("job" in response) { queryResult.refreshStatus(query, parameters); } }) .catch(error => { handleErrorResponse(queryResult, error); }); return queryResult; } } export default QueryResult;