Skip to content

Commit bd722f1

Browse files
author
gituser
committed
Merge branch '1.10_release_4.0.x' into 1.10_release_4.1.x
2 parents d4ac5e1 + d688310 commit bd722f1

File tree

2 files changed

+46
-5
lines changed

2 files changed

+46
-5
lines changed

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@
4646

4747
import java.util.List;
4848
import java.util.Map;
49+
import java.util.Objects;
4950
import java.util.function.Consumer;
51+
import java.util.regex.Matcher;
52+
5053
/**
5154
* @author yanxi
5255
*/
@@ -98,11 +101,34 @@ private void buildRedisClient(RedisSideTableInfo tableInfo){
98101
async = connection.async();
99102
break;
100103
case SENTINEL:
101-
RedisURI redisSentinelURI = RedisURI.create("redis-sentinel://" + url);
102-
redisSentinelURI.setPassword(password);
103-
redisSentinelURI.setDatabase(Integer.valueOf(database));
104-
redisSentinelURI.setSentinelMasterId(redisSideTableInfo.getMasterName());
105-
redisClient = RedisClient.create(redisSentinelURI);
104+
String[] urlSplit = StringUtils.split(url, ",");
105+
RedisURI.Builder builder = null;
106+
for (String item : urlSplit) {
107+
Matcher mather = RedisSideReqRow.HOST_PORT_PATTERN.matcher(item);
108+
if (mather.find()) {
109+
builder = buildSentinelUri(
110+
mather.group("host"),
111+
mather.group("port"),
112+
builder
113+
);
114+
} else {
115+
throw new IllegalArgumentException(
116+
String.format("Illegal format with redis url [%s]", item)
117+
);
118+
}
119+
}
120+
121+
if (Objects.nonNull(builder)) {
122+
builder
123+
.withPassword(tableInfo.getPassword())
124+
.withDatabase(Integer.parseInt(tableInfo.getDatabase()))
125+
.withSentinelMasterId(tableInfo.getMasterName());
126+
} else {
127+
throw new NullPointerException("build redis uri error!");
128+
}
129+
130+
RedisURI uri = builder.build();
131+
redisClient = RedisClient.create(uri);
106132
connection = redisClient.connect();
107133
async = connection.async();
108134
break;
@@ -117,6 +143,18 @@ private void buildRedisClient(RedisSideTableInfo tableInfo){
117143
}
118144
}
119145

146+
private RedisURI.Builder buildSentinelUri(
147+
String host,
148+
String port,
149+
RedisURI.Builder builder) {
150+
if (Objects.nonNull(builder)) {
151+
builder.withSentinel(host, Integer.parseInt(port));
152+
} else {
153+
builder = RedisURI.Builder.sentinel(host, Integer.parseInt(port));
154+
}
155+
return builder;
156+
}
157+
120158
@Override
121159
public BaseRow fillData(BaseRow input, Object sideInput) {
122160
return redisSideReqRow.fillData(input, sideInput);

redis5/redis5-side/redis-side-core/src/main/java/com/dtstack/flink/sql/side/redis/table/RedisSideReqRow.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.time.LocalDateTime;
3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.regex.Pattern;
3536

3637
/**
3738
* redis fill row data
@@ -44,6 +45,8 @@ public class RedisSideReqRow implements ISideReqRow, Serializable {
4445

4546
private static final long serialVersionUID = 3751171828444748982L;
4647

48+
public static final Pattern HOST_PORT_PATTERN = Pattern.compile("(?<host>(.*)):(?<port>\\d+)*");
49+
4750
private BaseSideInfo sideInfo;
4851

4952
private RedisSideTableInfo sideTableInfo;

0 commit comments

Comments
 (0)