-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy path5minutely_aggregation.sql
34 lines (31 loc) · 1.66 KB
/
5minutely_aggregation.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/*
 * five_minutely_aggregation
 *
 * Incrementally folds rows of "events" into the rollup_events_5min table.
 *
 * OUT start_id, end_id:
 *   The inclusive event_id range obtained from
 *   incremental_rollup_window('rollup_events_5min') for this invocation.
 *   They are returned as-is even when nothing was aggregated.
 *
 * Every event in the range is grouped by
 * (customer_id, event_type, country, browser, 5-minute bucket). A group that
 * already exists in the rollup is merged: counts are summed, HLL sketches are
 * unioned, and TopN summaries are unioned.
 *
 * NOTE(review): the ON CONFLICT target requires a unique index or constraint
 * on (customer_id, event_type, country, browser, minute) - confirm it exists
 * in the table definition.
 */
CREATE OR REPLACE FUNCTION five_minutely_aggregation(OUT start_id bigint, OUT end_id bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
BEGIN
    /* determine which events we can safely aggregate */
    SELECT window_start, window_end
      INTO start_id, end_id
      FROM incremental_rollup_window('rollup_events_5min');

    /*
     * Exit early if there are no new events to aggregate. NULL bounds (which
     * is what SELECT INTO leaves behind when the helper yields no row) are
     * treated the same way: "NULL > NULL" is not true, so without this guard
     * we would fall through and run an INSERT that can never match a row.
     */
    IF start_id IS NULL OR end_id IS NULL OR start_id > end_id THEN
        RETURN;
    END IF;

    /* aggregate the events, merge results if the entry already exists */
    INSERT INTO rollup_events_5min (
        customer_id,
        event_type,
        country,
        browser,
        minute,
        event_count,
        device_distinct_count,
        session_distinct_count,
        top_devices_1000
    )
    SELECT
        customer_id,
        event_type,
        country,
        browser,
        /*
         * Start of the 5-minute bucket (300 = seconds per bucket): take the
         * offset from the Unix epoch, divide by 300, drop the sub-second
         * remainder, then scale back up and re-anchor on the epoch.
         */
        date_trunc('seconds', (event_time - TIMESTAMP 'epoch') / 300) * 300 + TIMESTAMP 'epoch' AS minute,
        count(*) AS event_count,
        hll_add_agg(hll_hash_bigint(device_id)) AS device_distinct_count,
        hll_add_agg(hll_hash_bigint(session_id)) AS session_distinct_count,
        topn_add_agg(device_id::text) AS top_devices_1000
    FROM events
    WHERE event_id BETWEEN start_id AND end_id
    /*
     * "minute" here is the SELECT output alias above. PostgreSQL would prefer
     * an input column of the same name, so this assumes "events" has no
     * column called "minute".
     */
    GROUP BY customer_id, event_type, country, browser, minute
    ON CONFLICT (customer_id, event_type, country, browser, minute)
    DO UPDATE
    SET event_count            = rollup_events_5min.event_count + excluded.event_count,
        device_distinct_count  = hll_union(rollup_events_5min.device_distinct_count, excluded.device_distinct_count),
        session_distinct_count = hll_union(rollup_events_5min.session_distinct_count, excluded.session_distinct_count),
        top_devices_1000       = topn_union(rollup_events_5min.top_devices_1000, excluded.top_devices_1000);
END;
$function$;