|  | 
|  | 1 | +CREATE DATABASE IF NOT EXISTS analytics ON CLUSTER cluster_1; | 
|  | 2 | + | 
|  | 3 | +CREATE TABLE IF NOT EXISTS analytics.driver_ratings ON CLUSTER cluster_1( | 
|  | 4 | +    rate UInt8, | 
|  | 5 | +    userID Int64, | 
|  | 6 | +    driverID String, | 
|  | 7 | +    orderID String, | 
|  | 8 | +    inserted_time DateTime DEFAULT now() | 
|  | 9 | +) ENGINE = ReplicatedMergeTree | 
|  | 10 | +PARTITION BY driverID | 
|  | 11 | +ORDER BY (inserted_time); | 
|  | 12 | + | 
|  | 13 | +CREATE TABLE analytics.driver_ratings_queue ON CLUSTER cluster_1( | 
|  | 14 | +    rate UInt8, | 
|  | 15 | +    userID Int64, | 
|  | 16 | +    driverID String, | 
|  | 17 | +    orderID String | 
|  | 18 | +) ENGINE = Kafka | 
|  | 19 | +SETTINGS kafka_broker_list = 'broker:9092', | 
|  | 20 | +    kafka_topic_list = 'driver-ratings', | 
|  | 21 | +    kafka_group_name = 'rating_readers', | 
|  | 22 | +    kafka_format = 'Avro', | 
|  | 23 | +    kafka_max_block_size = 1048576; | 
|  | 24 | + | 
|  | 25 | +CREATE MATERIALIZED VIEW analytics.driver_ratings_queue_mv ON CLUSTER cluster_1 TO analytics.driver_ratings AS | 
|  | 26 | +SELECT rate, userID, driverID, orderID | 
|  | 27 | +FROM analytics.driver_ratings_queue; | 
|  | 28 | + | 
|  | 29 | +CREATE TABLE IF NOT EXISTS analytics.user_ratings ON CLUSTER cluster_1( | 
|  | 30 | +    rate UInt8, | 
|  | 31 | +    userID Int64, | 
|  | 32 | +    driverID String, | 
|  | 33 | +    orderID String, | 
|  | 34 | +    inserted_time DateTime DEFAULT now() | 
|  | 35 | +) ENGINE = ReplicatedMergeTree | 
|  | 36 | +    PARTITION BY userID | 
|  | 37 | +    ORDER BY (inserted_time); | 
|  | 38 | + | 
|  | 39 | +CREATE TABLE analytics.user_ratings_queue ON CLUSTER cluster_1( | 
|  | 40 | +    rate UInt8, | 
|  | 41 | +    userID Int64, | 
|  | 42 | +    driverID String, | 
|  | 43 | +    orderID String | 
|  | 44 | +) ENGINE = Kafka | 
|  | 45 | +SETTINGS kafka_broker_list = 'broker:9092', | 
|  | 46 | +    kafka_topic_list = 'user-ratings', | 
|  | 47 | +    kafka_group_name = 'rating_readers', | 
|  | 48 | +    kafka_format = 'JSON', | 
|  | 49 | +    kafka_max_block_size = 1048576; | 
|  | 50 | + | 
|  | 51 | +CREATE MATERIALIZED VIEW analytics.user_ratings_queue_mv ON CLUSTER cluster_1 TO analytics.user_ratings AS | 
|  | 52 | +SELECT rate, userID, driverID, orderID | 
|  | 53 | +FROM analytics.user_ratings_queue; | 
|  | 54 | + | 
|  | 55 | +CREATE TABLE IF NOT EXISTS analytics.orders ON CLUSTER cluster_1( | 
|  | 56 | +    from_place String, | 
|  | 57 | +    to_place String, | 
|  | 58 | +    userID Int64, | 
|  | 59 | +    driverID String, | 
|  | 60 | +    orderID String, | 
|  | 61 | +    inserted_time DateTime DEFAULT now() | 
|  | 62 | +) ENGINE = ReplicatedMergeTree | 
|  | 63 | +    PARTITION BY driverID | 
|  | 64 | +    ORDER BY (inserted_time); | 
|  | 65 | + | 
|  | 66 | +CREATE TABLE analytics.orders_queue ON CLUSTER cluster_1( | 
|  | 67 | +    from_place String, | 
|  | 68 | +    to_place String, | 
|  | 69 | +    userID Int64, | 
|  | 70 | +    driverID String, | 
|  | 71 | +    orderID String | 
|  | 72 | +) ENGINE = Kafka | 
|  | 73 | +SETTINGS kafka_broker_list = 'broker:9092', | 
|  | 74 | +    kafka_topic_list = 'orders', | 
|  | 75 | +    kafka_group_name = 'order_readers', | 
|  | 76 | +    kafka_format = 'Avro', | 
|  | 77 | +    kafka_max_block_size = 1048576; | 
|  | 78 | + | 
|  | 79 | +CREATE MATERIALIZED VIEW analytics.orders_queue_mv ON CLUSTER cluster_1 TO orders AS | 
|  | 80 | +SELECT from_place, to_place, userID, driverID, orderID | 
|  | 81 | +FROM analytics.orders_queue; | 
0 commit comments