-
Notifications
You must be signed in to change notification settings - Fork 2.6k
/
Copy pathquery_agg.py
103 lines (93 loc) · 3.3 KB
/
query_agg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# EXAMPLE: query_agg
# HIDE_START
import json

import redis
from redis.commands.json.path import Path
from redis.commands.search import Search
from redis.commands.search.aggregation import AggregateRequest
from redis.commands.search.field import NumericField, TagField
from redis.commands.search.index_definition import IndexDefinition, IndexType
import redis.commands.search.reducers as reducers

# Connect to a Redis server with default connection settings.
# decode_responses=True makes replies come back as str rather than bytes,
# which is the form shown in the expected-output comments below.
r = redis.Redis(decode_responses=True)

# Drop an index left behind by an earlier, interrupted run (the teardown at
# the end of this script would not have executed). Without this,
# create_index() below raises because "idx:bicycle" already exists.
try:
    r.ft("idx:bicycle").dropindex(delete_documents=True)
except redis.ResponseError:
    pass  # no previous index: nothing to clean up

# create index
# Index JSON documents whose keys start with "bicycle:", exposing
# $.condition as a TAG field and $.price as a NUMERIC field.
schema = (
    TagField("$.condition", as_name="condition"),
    NumericField("$.price", as_name="price"),
)
index = r.ft("idx:bicycle")
index.create_index(
    schema,
    definition=IndexDefinition(prefix=["bicycle:"], index_type=IndexType.JSON),
)

# load data
# Presumably a JSON array of bicycle objects (it is enumerated below).
with open("data/query_em.json", encoding="utf-8") as f:
    bicycles = json.load(f)

# Store each bicycle as a JSON document keyed "bicycle:<position>". The
# non-transactional pipeline queues the writes and sends them in execute().
pipeline = r.pipeline(transaction=False)
for bid, bicycle in enumerate(bicycles):
    pipeline.json().set(f"bicycle:{bid}", Path.root_path(), bicycle)
pipeline.execute()
# HIDE_END
# STEP_START agg1
search = Search(r, index_name="idx:bicycle")
# For every bicycle in "new" condition, load its key and price, then derive
# a "discounted" value that is the price minus 10%.
aggregate_request = (
    AggregateRequest(query="@condition:{new}")
    .load("__key", "price")
    .apply(discounted="@price - (@price * 0.1)")
)
res = search.aggregate(aggregate_request)
print(len(res.rows))  # >>> 5
print(res.rows)  # >>> [['__key', 'bicycle:0', ...
#[['__key', 'bicycle:0', 'price', '270', 'discounted', '243'],
# ['__key', 'bicycle:5', 'price', '810', 'discounted', '729'],
# ['__key', 'bicycle:6', 'price', '2300', 'discounted', '2070'],
# ['__key', 'bicycle:7', 'price', '430', 'discounted', '387'],
# ['__key', 'bicycle:8', 'price', '1200', 'discounted', '1080']]
# REMOVE_START
assert len(res.rows) == 5
# REMOVE_END
# STEP_END
# STEP_START agg2
search = Search(r, index_name="idx:bicycle")
# Flag each bicycle as affordable (price below 1000), then group by
# condition and add up the flags to count affordable bicycles per group.
aggregate_request = (
    AggregateRequest(query="*")
    .load("price")
    .apply(price_category="@price<1000")
    .group_by(
        "@condition",
        reducers.sum("@price_category").alias("num_affordable"),
    )
)
res = search.aggregate(aggregate_request)
print(len(res.rows))  # >>> 3
print(res.rows)  # >>>
#[['condition', 'refurbished', 'num_affordable', '1'],
# ['condition', 'used', 'num_affordable', '1'],
# ['condition', 'new', 'num_affordable', '3']]
# REMOVE_START
assert len(res.rows) == 3
# REMOVE_END
# STEP_END
# STEP_START agg3
search = Search(r, index_name="idx:bicycle")
# Attach the constant string 'bicycle' as a "type" property to every
# result, then group on it so a single COUNT covers the whole index.
aggregate_request = (
    AggregateRequest(query="*")
    .apply(type="'bicycle'")
    .group_by("@type", reducers.count().alias("num_total"))
)
res = search.aggregate(aggregate_request)
print(len(res.rows))  # >>> 1
print(res.rows)  # >>> [['type', 'bicycle', 'num_total', '10']]
# REMOVE_START
assert len(res.rows) == 1
# REMOVE_END
# STEP_END
# STEP_START agg4
search = Search(r, index_name="idx:bicycle")
# Load each document's key, then group by condition and collect the keys
# of each group into a list named "bicycles".
aggregate_request = (
    AggregateRequest(query="*")
    .load("__key")
    .group_by("@condition", reducers.tolist("__key").alias("bicycles"))
)
res = search.aggregate(aggregate_request)
print(len(res.rows))  # >>> 3
print(res.rows)  # >>>
#[['condition', 'refurbished', 'bicycles', ['bicycle:9']],
# ['condition', 'used', 'bicycles', ['bicycle:1', 'bicycle:2', 'bicycle:3', 'bicycle:4']],
# ['condition', 'new', 'bicycles', ['bicycle:5', 'bicycle:6', 'bicycle:7', 'bicycle:0', 'bicycle:8']]]
# REMOVE_START
assert len(res.rows) == 3
# REMOVE_END
# STEP_END
# REMOVE_START
# destroy index and data
# Dropping with delete_documents=True also removes the indexed
# bicycle:* documents written during setup.
bicycle_index = r.ft("idx:bicycle")
bicycle_index.dropindex(delete_documents=True)
# REMOVE_END