in redash/query_runner/mongodb.py [0:0]
def run_query(self, query, user):
db = self._get_db()
logger.debug(
"mongodb connection string: %s", self.configuration["connectionString"]
)
logger.debug("mongodb got query: %s", query)
try:
query_data = parse_query_json(query)
except ValueError:
return None, "Invalid query format. The query is not a valid JSON."
if "collection" not in query_data:
return None, "'collection' must have a value to run a query"
else:
collection = query_data["collection"]
q = query_data.get("query", None)
f = None
aggregate = query_data.get("aggregate", None)
if aggregate:
for step in aggregate:
if "$sort" in step:
sort_list = []
for sort_item in step["$sort"]:
sort_list.append((sort_item["name"], sort_item["direction"]))
step["$sort"] = SON(sort_list)
if "fields" in query_data:
f = query_data["fields"]
s = None
if "sort" in query_data and query_data["sort"]:
s = []
for field_data in query_data["sort"]:
s.append((field_data["name"], field_data["direction"]))
columns = []
rows = []
cursor = None
if q or (not q and not aggregate):
if s:
cursor = db[collection].find(q, f).sort(s)
else:
cursor = db[collection].find(q, f)
if "skip" in query_data:
cursor = cursor.skip(query_data["skip"])
if "limit" in query_data:
cursor = cursor.limit(query_data["limit"])
if "count" in query_data:
cursor = cursor.count()
elif aggregate:
allow_disk_use = query_data.get("allowDiskUse", False)
r = db[collection].aggregate(aggregate, allowDiskUse=allow_disk_use)
# Backwards compatibility with older pymongo versions.
#
# Older pymongo version would return a dictionary from an aggregate command.
# The dict would contain a "result" key which would hold the cursor.
# Newer ones return pymongo.command_cursor.CommandCursor.
if isinstance(r, dict):
cursor = r["result"]
else:
cursor = r
if "count" in query_data:
columns.append(
{"name": "count", "friendly_name": "count", "type": TYPE_INTEGER}
)
rows.append({"count": cursor})
else:
rows, columns = parse_results(cursor)
if f:
ordered_columns = []
for k in sorted(f, key=f.get):
column = _get_column_by_name(columns, k)
if column:
ordered_columns.append(column)
columns = ordered_columns
if query_data.get("sortColumns"):
reverse = query_data["sortColumns"] == "desc"
columns = sorted(columns, key=lambda col: col["name"], reverse=reverse)
data = {"columns": columns, "rows": rows}
error = None
json_data = json_dumps(data, cls=MongoDBJSONEncoder)
return json_data, error