|
1 | 1 |
|
2 | 2 | ## 1.格式:
|
| 3 | + |
| 4 | + 通过建表语句中的` PERIOD FOR SYSTEM_TIME`将表标识为维表,其中`PRIMARY KEY(keyInfo)`中的keyInfo,表示用来和源表进行关联的字段, |
| 5 | + 维表JOIN的条件必须与`keyInfo`字段一致。 |
3 | 6 | ```
|
4 | 7 | CREATE TABLE tableName(
|
5 | 8 | colName cloType,
|
|
22 | 25 | ```
|
23 | 26 |
|
24 | 27 | # 2.支持版本
|
25 |
| - cassandra-3.6.x |
| 28 | + cassandra-3.x |
26 | 29 |
|
27 | 30 | ## 3.表结构定义
|
28 | 31 |
|
29 | 32 | |参数名称|含义|
|
30 | 33 | |----|---|
|
31 |
| - | tableName | 注册到flink的表名称(可选填;不填默认和hbase对应的表名称相同)| |
| 34 | + | tableName | 注册到flink的表名称(可选填;不填默认和cassandra对应的表名称相同)| |
32 | 35 | | colName | 列名称|
|
33 |
| - | colType | 列类型 [colType支持的类型](docs/colType.md)| |
| 36 | + | colType | 列类型| |
34 | 37 | | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
|
35 | 38 | | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
|
36 | 39 |
|
|
46 | 49 | | database | cassandra表名称|是||
|
47 | 50 | | cache | 维表缓存策略(NONE/LRU)|否|NONE|
|
48 | 51 | | partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量)|否|false|
|
49 |
| - | maxRequestsPerConnection | 每个连接最多允许64个并发请求|否|NONE| |
50 |
| - | coreConnectionsPerHost | 和Cassandra集群里的每个机器都至少有2个连接|否|NONE| |
51 |
| - | maxConnectionsPerHost | 和Cassandra集群里的每个机器都最多有6个连接|否|NONE| |
52 |
| - | maxQueueSize | Cassandra队列大小|否|NONE| |
53 |
| - | readTimeoutMillis | Cassandra读超时|否|NONE| |
54 |
| - | connectTimeoutMillis | Cassandra连接超时|否|NONE| |
55 |
| - | poolTimeoutMillis | Cassandra线程池超时|否|NONE| |
| 52 | + | maxRequestsPerConnection | 每个连接允许的并发请求数|否|1| |
| 53 | + | coreConnectionsPerHost | 每台主机连接的核心数|否|8| |
| 54 | + | maxConnectionsPerHost | Cassandra集群里的每个机器都最多连接数|否|32768| |
| 55 | + | maxQueueSize | Cassandra队列大小|否|100000| |
| 56 | + | readTimeoutMillis | Cassandra读超时|否|60000| |
| 57 | + | connectTimeoutMillis | Cassandra连接超时|否|60000| |
| 58 | + | poolTimeoutMillis | Cassandra线程池超时|否|60000| |
56 | 59 |
|
57 | 60 | ----------
|
58 | 61 | > 缓存策略
|
59 |
| - * NONE: 不做内存缓存 |
60 |
| - * LRU: |
61 |
| - * cacheSize: 缓存的条目数量 |
62 |
| - * cacheTTLMs:缓存的过期时间(ms) |
| 62 | +- NONE:不做内存缓存。每条流数据触发一次维表查询操作。 |
| 63 | +- ALL: 任务启动时,一次性加载所有数据到内存,并进行缓存。适用于维表数据量较小的情况。 |
| 64 | +- LRU: 任务执行时,根据维表关联条件使用异步算子加载维表数据,并进行缓存。 |
63 | 65 |
|
64 | 66 |
|
65 |
| -## 5.样例 |
| 67 | +## 5.维表定义样例 |
| 68 | + |
| 69 | +### ALL全量维表定义 |
66 | 70 | ```
|
67 |
| -create table sideTable( |
68 |
| - CHANNEL varchar, |
69 |
| - XCCOUNT int, |
70 |
| - PRIMARY KEY(channel), |
| 71 | +CREATE TABLE sideTable( |
| 72 | + id bigint, |
| 73 | + school varchar, |
| 74 | + home varchar, |
| 75 | + PRIMARY KEY(id), |
71 | 76 | PERIOD FOR SYSTEM_TIME
|
72 |
| - )WITH( |
| 77 | +)WITH( |
| 78 | + type='mysql', |
| 79 | + url='jdbc:mysql://172.16.8.109:3306/tiezhu', |
| 80 | + userName='dtstack', |
| 81 | + password='abc123', |
| 82 | + tableName='stressTest', |
| 83 | + cache='ALL', |
| 84 | + parallelism='1' |
| 85 | +); |
| 86 | +``` |
| 87 | +### LRU异步维表定义 |
| 88 | +``` |
| 89 | +CREATE TABLE sideTable( |
| 90 | + id bigint, |
| 91 | + message varchar, |
| 92 | + PRIMARY KEY(id), |
| 93 | + PERIOD FOR SYSTEM_TIME |
| 94 | +)WITH( |
73 | 95 | type ='cassandra',
|
74 |
| - address ='172.21.32.1:9042,172.21.32.1:9042', |
75 |
| - database ='test', |
76 |
| - tableName ='sidetest', |
| 96 | + address ='192.168.80.106:9042, 192.168.80.107:9042', |
| 97 | + database ='tiezhu', |
| 98 | + tableName ='stu', |
| 99 | + userName='cassandra', |
| 100 | + password='cassandra', |
77 | 101 | cache ='LRU',
|
78 | 102 | parallelism ='1',
|
79 | 103 | partitionedJoin='false'
|
80 |
| - ); |
| 104 | +); |
| 105 | +``` |
| 106 | +## 6.完整样例 |
| 107 | +``` |
| 108 | +CREATE TABLE MyTable( |
| 109 | + id bigint, |
| 110 | + name varchar, |
| 111 | + address varchar |
| 112 | +)WITH( |
| 113 | + type = 'kafka10', |
| 114 | + bootstrapServers = '172.16.101.224:9092', |
| 115 | + zookeeperQuorm = '172.16.100.188:2181/kafka', |
| 116 | + offsetReset = 'latest', |
| 117 | + topic = 'tiezhu_test_in2', |
| 118 | + timezone = 'Asia/Shanghai', |
| 119 | + topicIsPattern = 'false', |
| 120 | + parallelism = '1' |
| 121 | +); |
81 | 122 |
|
| 123 | +CREATE TABLE sideTable( |
| 124 | + id bigint, |
| 125 | + message varchar, |
| 126 | + PRIMARY KEY(id), |
| 127 | + PERIOD FOR SYSTEM_TIME |
| 128 | +)WITH( |
| 129 | + type ='cassandra', |
| 130 | + address ='192.168.80.106:9042, 192.168.80.107:9042', |
| 131 | + database ='tiezhu', |
| 132 | + tableName ='stu', |
| 133 | + userName='cassandra', |
| 134 | + password='cassandra', |
| 135 | + cache ='LRU', |
| 136 | + parallelism ='1', |
| 137 | + partitionedJoin='false' |
| 138 | +); |
| 139 | +
|
| 140 | +CREATE TABLE MyResult( |
| 141 | + id bigint, |
| 142 | + name varchar, |
| 143 | + address varchar, |
| 144 | + message varchar |
| 145 | + )WITH( |
| 146 | + type ='cassandra', |
| 147 | + address ='192.168.80.106:9042,192.168.80.107:9042', |
| 148 | + userName='cassandra', |
| 149 | + password='cassandra', |
| 150 | + database ='tiezhu', |
| 151 | + tableName ='stu_out', |
| 152 | + parallelism ='1' |
| 153 | +); |
82 | 154 |
|
| 155 | +insert |
| 156 | +into |
| 157 | + MyResult |
| 158 | + select |
| 159 | + t1.id AS id, |
| 160 | + t1.name AS name, |
| 161 | + t1.address AS address, |
| 162 | + t2.message AS message |
| 163 | + from |
| 164 | + ( |
| 165 | + select |
| 166 | + id, |
| 167 | + name, |
| 168 | + address |
| 169 | + from |
| 170 | + MyTable |
| 171 | + ) t1 |
| 172 | + join sideTable t2 |
| 173 | + on t1.id = t2.id; |
83 | 174 | ```
|
84 | 175 |
|
85 | 176 |
|
0 commit comments