Skip to content

Commit 50bed1d

Browse files
committed
Initial support for Mongo's aggregation framework.
1 parent d4b5d78 commit 50bed1d

1 file changed

Lines changed: 99 additions & 13 deletions

File tree

redash/data/query_runner_mongodb.py

Lines changed: 99 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
try:
1010
import pymongo
1111
from bson.objectid import ObjectId
12+
from bson.son import SON
1213
except ImportError:
1314
print "Missing dependencies. Please install pymongo."
1415
print "You can use pip: pip install pymongo"
16+
raise
1517

1618
TYPES_MAP = {
1719
ObjectId : "string",
@@ -26,6 +28,68 @@
2628

2729
date_regex = re.compile("ISODate\(\"(.*)\"\)", re.IGNORECASE)
2830

31+
# Simple query example:
32+
#
33+
# {
34+
# "collection" : "my_collection",
35+
# "query" : {
36+
# "date" : {
37+
# "$gt" : "ISODate(\"2015-01-15 11:41\")"
38+
# },
39+
# "type" : 1
40+
# },
41+
# "fields" : {
42+
# "_id" : 1,
43+
# "name" : 2
44+
# },
45+
# "sort" : [
46+
# {
47+
# "name" : "date",
48+
# "direction" : -1
49+
# }
50+
# ]
51+
#
52+
# }
53+
#
54+
#
55+
# Aggregation
56+
# ===========
57+
# Uses a syntax similar to the one used in PyMongo, however to support the
58+
# correct order of sorting, it uses a regular list for the "$sort" operation
59+
# that is converted into a SON (ordered dictionary) object before execution.
60+
#
61+
# Aggregation query example:
62+
#
63+
# {
64+
# "collection" : "things",
65+
# "aggregate" : [
66+
# {
67+
# "$unwind" : "$tags"
68+
# },
69+
# {
70+
# "$group" : {
71+
# {
72+
# "_id" : "$tags",
73+
# "count" : { "$sum" : 1 }
74+
# }
75+
# }
76+
# },
77+
# {
78+
# "$sort" : [
79+
# {
80+
# "name" : "count",
81+
# "direction" : -1
82+
# },
83+
# {
84+
# "name" : "_id",
85+
# "direction" : -1
86+
# }
87+
# ]
88+
# }
89+
# ]
90+
# }
91+
#
92+
#
2993
def mongodb(connection_string):
3094
def _get_column_by_name(columns, column_name):
3195
for c in columns:
@@ -56,7 +120,7 @@ def query_runner(query):
56120
if is_replica_set:
57121
if not connection_string["replicaSetName"]:
58122
return None, "replicaSetName is set in the connection string JSON but is empty"
59-
123+
60124
db_connection = pymongo.MongoReplicaSetClient(connection_string["connectionString"], replicaSet=connection_string["replicaSetName"])
61125
else:
62126
db_connection = pymongo.MongoClient(connection_string["connectionString"])
@@ -74,9 +138,12 @@ def query_runner(query):
74138
except:
75139
return None, "Invalid query format. The query is not a valid JSON."
76140

141+
if "query" in query_data and "aggregate" in query_data:
142+
return None, "'query' and 'aggregate' sections cannot be used at the same time"
143+
77144
collection = None
78145
if not "collection" in query_data:
79-
return None, "'collection' must have a value to run a query"
146+
return None, "'collection' must be set"
80147
else:
81148
collection = query_data["collection"]
82149

@@ -93,26 +160,46 @@ def query_runner(query):
93160
_convert_date(q[k], k2)
94161

95162
f = None
163+
164+
aggregate = None
165+
if "aggregate" in query_data:
166+
aggregate = query_data["aggregate"]
167+
for step in aggregate:
168+
if "$sort" in step:
169+
sort_list = []
170+
for sort_item in step["$sort"]:
171+
sort_list.append((sort_item["name"], sort_item["direction"]))
172+
173+
step["$sort"] = SON(sort_list)
174+
175+
if aggregate:
176+
pass
177+
else:
178+
s = None
179+
if "sort" in query_data and query_data["sort"]:
180+
s = []
181+
for field in query_data["sort"]:
182+
for k in field:
183+
s.append((k, field[k]))
184+
96185
if "fields" in query_data:
97186
f = query_data["fields"]
98187

99-
s = None
100-
if "sort" in query_data and query_data["sort"]:
101-
s = []
102-
for field_name in query_data["sort"]:
103-
s.append((field_name, query_data["sort"][field_name]))
104-
105188
columns = []
106189
rows = []
107190

108191
error = None
109192
json_data = None
110193

111194
cursor = None
112-
if s:
113-
cursor = db[collection].find(q, f).sort(s)
114-
else:
115-
cursor = db[collection].find(q, f)
195+
if q:
196+
if s:
197+
cursor = db[collection].find(q, f).sort(s)
198+
else:
199+
cursor = db[collection].find(q, f)
200+
elif aggregate:
201+
r = db[collection].aggregate(aggregate)
202+
cursor = r["result"]
116203

117204
for r in cursor:
118205
for k in r:
@@ -127,7 +214,6 @@ def query_runner(query):
127214
if type(r[k]) == ObjectId:
128215
r[k] = str(r[k])
129216

130-
131217
rows.append(r)
132218

133219
if f:

0 commit comments

Comments
 (0)