Skip to content

Commit 1e8948e

Browse files
committed
Merge branch '1.10_release_4.1.x' into tmp_1.10_release_4.1x
2 parents a04ef86 + d8a2b0a commit 1e8948e

File tree

399 files changed

+20341
-922
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

399 files changed

+20341
-922
lines changed

.gitlab-ci.yml

+15-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
1-
build:
1+
stages:
2+
- validate
3+
- test
4+
5+
test-job:
26
stage: test
37
script:
48
- mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
9+
only:
10+
- v1.10.0_dev
11+
tags:
12+
- dt-insight-engine
13+
14+
validate-job:
15+
stage: validate
16+
script:
517
- mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
618
- sh ci/sonar_notify.sh
719
only:
8-
- v1.8.0_dev
20+
- v1.10.0_dev
921
tags:
10-
- dt-insight-engine
22+
- dt-insight-engine
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.cassandra;
20+
21+
import com.dtstack.flink.sql.side.BaseAllReqRow;
22+
import com.dtstack.flink.sql.side.BaseSideInfo;
23+
import com.dtstack.flink.sql.side.FieldInfo;
24+
import com.dtstack.flink.sql.side.JoinInfo;
25+
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
26+
import com.google.common.collect.Lists;
27+
import com.google.common.collect.Maps;
28+
import org.apache.calcite.sql.JoinType;
29+
import org.apache.flink.api.common.typeinfo.TypeInformation;
30+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
31+
import org.apache.flink.table.dataformat.BaseRow;
32+
import org.apache.flink.table.dataformat.GenericRow;
33+
import org.apache.flink.types.Row;
34+
import org.apache.flink.util.Collector;
35+
import org.junit.Before;
36+
import org.junit.Test;
37+
import org.junit.runner.RunWith;
38+
import org.powermock.api.mockito.PowerMockito;
39+
import org.powermock.core.classloader.annotations.PowerMockIgnore;
40+
import org.powermock.core.classloader.annotations.PrepareForTest;
41+
import org.powermock.modules.junit4.PowerMockRunner;
42+
import org.powermock.reflect.Whitebox;
43+
44+
import java.sql.SQLException;
45+
import java.sql.Timestamp;
46+
import java.util.ArrayList;
47+
import java.util.List;
48+
import java.util.Map;
49+
import java.util.concurrent.atomic.AtomicReference;
50+
51+
import static org.mockito.Mockito.mock;
52+
import static org.mockito.Mockito.when;
53+
import static org.powermock.api.support.membermodification.MemberMatcher.constructor;
54+
import static org.powermock.api.support.membermodification.MemberModifier.suppress;
55+
56+
/**
57+
* @author: chuixue
58+
* @create: 2020-07-28 10:58
59+
* @description:
60+
**/
61+
@RunWith(PowerMockRunner.class)
62+
@PrepareForTest({CassandraAllReqRow.class,
63+
BaseAllReqRow.class,
64+
CassandraAllSideInfo.class})//要跳过的写在后面
65+
@PowerMockIgnore({"javax.*"})
66+
public class CassandraAllReqRowTest {
67+
68+
private CassandraAllReqRow cassandraAllReqRow;
69+
private RowTypeInfo rowTypeInfo = new RowTypeInfo(new TypeInformation[]{TypeInformation.of(Integer.class), TypeInformation.of(String.class), TypeInformation.of(Integer.class)}, new String[]{"id", "bb", "PROCTIME"});
70+
private JoinInfo joinInfo;
71+
private List<FieldInfo> outFieldInfoList = new ArrayList<>();
72+
private CassandraSideTableInfo sideTableInfo;
73+
private BaseSideInfo sideInfo;
74+
private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
75+
76+
@Before
77+
public void setUp() {
78+
joinInfo = mock(JoinInfo.class);
79+
sideTableInfo = mock(CassandraSideTableInfo.class);
80+
sideInfo = PowerMockito.mock(CassandraAllSideInfo.class);
81+
82+
Map<String, List<Map<String, Object>>> map = Maps.newHashMap();
83+
cacheRef.set(map);
84+
85+
suppress(constructor(CassandraAllSideInfo.class));
86+
suppress(constructor(BaseAllReqRow.class));
87+
cassandraAllReqRow = new CassandraAllReqRow(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
88+
Whitebox.setInternalState(cassandraAllReqRow, "sideInfo", sideInfo);
89+
Whitebox.setInternalState(cassandraAllReqRow, "cacheRef", cacheRef);
90+
}
91+
92+
@Test
93+
public void testReloadCache() throws SQLException {
94+
when(sideInfo.getSideTableInfo()).thenReturn(sideTableInfo);
95+
when(sideTableInfo.getMaxRequestsPerConnection()).thenReturn(1);
96+
when(sideTableInfo.getCoreConnectionsPerHost()).thenReturn(1);
97+
when(sideTableInfo.getMaxConnectionsPerHost()).thenReturn(1);
98+
when(sideTableInfo.getMaxQueueSize()).thenReturn(1);
99+
when(sideTableInfo.getReadTimeoutMillis()).thenReturn(1);
100+
when(sideTableInfo.getConnectTimeoutMillis()).thenReturn(1);
101+
when(sideTableInfo.getPoolTimeoutMillis()).thenReturn(1);
102+
when(sideTableInfo.getAddress()).thenReturn("12.12.12.12:9042,10.10.10.10:9042");
103+
when(sideTableInfo.getUserName()).thenReturn("userName");
104+
when(sideTableInfo.getPassword()).thenReturn("password");
105+
when(sideTableInfo.getDatabase()).thenReturn("getDatabase");
106+
107+
cassandraAllReqRow.initCache();
108+
cassandraAllReqRow.reloadCache();
109+
}
110+
111+
@Test
112+
public void testFlatmap() throws Exception {
113+
GenericRow row = new GenericRow(3);
114+
row.setField(0, 1);
115+
row.setField(1, "bbbbbb");
116+
row.setField(2, "2020-07-14 01:27:43.969");
117+
Collector<BaseRow> out = mock(Collector.class);
118+
119+
List<String> equalFieldList = Lists.newArrayList();
120+
equalFieldList.add("rowkey");
121+
List<Integer> equalValIndex = Lists.newArrayList();
122+
equalValIndex.add(0);
123+
124+
List<FieldInfo> outFieldInfoList = Lists.newArrayList();
125+
FieldInfo fieldInfo = new FieldInfo();
126+
fieldInfo.setTable("m");
127+
fieldInfo.setFieldName("id");
128+
fieldInfo.setTypeInformation(TypeInformation.of(Integer.class));
129+
outFieldInfoList.add(fieldInfo);
130+
outFieldInfoList.add(fieldInfo);
131+
outFieldInfoList.add(fieldInfo);
132+
outFieldInfoList.add(fieldInfo);
133+
134+
Map<Integer, Integer> inFieldIndex = Maps.newHashMap();
135+
inFieldIndex.put(0, 0);
136+
inFieldIndex.put(1, 1);
137+
138+
139+
Map<Integer, Integer> sideFieldIndex = Maps.newHashMap();
140+
sideFieldIndex.put(2, 0);
141+
sideFieldIndex.put(3, 1);
142+
143+
Map<Integer, String> sideFieldNameIndex = Maps.newHashMap();
144+
sideFieldNameIndex.put(2, "rowkey");
145+
sideFieldNameIndex.put(3, "channel");
146+
147+
RowTypeInfo rowTypeInfo = new RowTypeInfo(new TypeInformation[]{TypeInformation.of(Integer.class), TypeInformation.of(String.class), TypeInformation.of(Timestamp.class)}, new String[]{"id", "bb", "PROCTIME"});
148+
149+
when(sideInfo.getEqualValIndex()).thenReturn(equalValIndex);
150+
when(sideInfo.getJoinType()).thenReturn(JoinType.LEFT);
151+
when(sideInfo.getOutFieldInfoList()).thenReturn(outFieldInfoList);
152+
when(sideInfo.getInFieldIndex()).thenReturn(inFieldIndex);
153+
when(sideInfo.getRowTypeInfo()).thenReturn(rowTypeInfo);
154+
when(sideInfo.getSideFieldNameIndex()).thenReturn(sideFieldNameIndex);
155+
156+
cassandraAllReqRow.flatMap(row, out);
157+
}
158+
159+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side.cassandra;
20+
21+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
22+
import com.dtstack.flink.sql.side.BaseSideInfo;
23+
import com.dtstack.flink.sql.side.FieldInfo;
24+
import com.dtstack.flink.sql.side.JoinInfo;
25+
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
26+
import com.google.common.collect.Lists;
27+
import com.google.common.collect.Maps;
28+
import org.apache.calcite.sql.SqlBasicCall;
29+
import org.apache.calcite.sql.SqlNode;
30+
import org.apache.flink.api.common.typeinfo.TypeInformation;
31+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
32+
import org.junit.Before;
33+
import org.junit.Test;
34+
import org.junit.runner.RunWith;
35+
import org.powermock.core.classloader.annotations.PrepareForTest;
36+
import org.powermock.modules.junit4.PowerMockRunner;
37+
import org.powermock.reflect.Whitebox;
38+
39+
import java.util.List;
40+
import java.util.Map;
41+
42+
import static org.mockito.Mockito.mock;
43+
import static org.mockito.Mockito.when;
44+
import static org.powermock.api.support.membermodification.MemberMatcher.constructor;
45+
import static org.powermock.api.support.membermodification.MemberModifier.suppress;
46+
47+
/**
48+
* @author: chuixue
49+
* @create: 2020-07-28 11:00
50+
* @description:
51+
**/
52+
@RunWith(PowerMockRunner.class)
53+
@PrepareForTest({CassandraAllSideInfo.class,
54+
BaseSideInfo.class})//要跳过的写在后面
55+
public class CassandraAllSideInfoTest {
56+
private CassandraAllSideInfo cassandraAllSideInfo;
57+
private JoinInfo joinInfo;
58+
private AbstractSideTableInfo sideTableInfo;
59+
60+
@Before
61+
public void setUp() {
62+
joinInfo = mock(JoinInfo.class);
63+
sideTableInfo = mock(AbstractSideTableInfo.class);
64+
65+
List<FieldInfo> outFieldInfoList = Lists.newArrayList();
66+
FieldInfo fieldInfo = new FieldInfo();
67+
fieldInfo.setTable("m");
68+
fieldInfo.setFieldName("_id");
69+
fieldInfo.setTypeInformation(TypeInformation.of(String.class));
70+
outFieldInfoList.add(fieldInfo);
71+
72+
FieldInfo fieldInfo2 = new FieldInfo();
73+
fieldInfo2.setTable("s");
74+
fieldInfo2.setFieldName("name");
75+
fieldInfo2.setTypeInformation(TypeInformation.of(String.class));
76+
outFieldInfoList.add(fieldInfo2);
77+
78+
Map<Integer, Integer> sideFieldIndex = Maps.newHashMap();
79+
Map<Integer, String> sideFieldNameIndex = Maps.newHashMap();
80+
RowTypeInfo rowTypeInfo = new RowTypeInfo(new TypeInformation[]{TypeInformation.of(Integer.class), TypeInformation.of(String.class), TypeInformation.of(Integer.class)}, new String[]{"id", "bb", "PROCTIME"});
81+
Map<Integer, Integer> inFieldIndex = Maps.newHashMap();
82+
List<String> equalFieldList = Lists.newArrayList();
83+
equalFieldList.add("_id");
84+
85+
suppress(constructor(BaseSideInfo.class));
86+
cassandraAllSideInfo = new CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
87+
Whitebox.setInternalState(cassandraAllSideInfo, "outFieldInfoList", outFieldInfoList);
88+
Whitebox.setInternalState(cassandraAllSideInfo, "sideFieldIndex", sideFieldIndex);
89+
Whitebox.setInternalState(cassandraAllSideInfo, "sideFieldNameIndex", sideFieldNameIndex);
90+
Whitebox.setInternalState(cassandraAllSideInfo, "rowTypeInfo", rowTypeInfo);
91+
Whitebox.setInternalState(cassandraAllSideInfo, "inFieldIndex", inFieldIndex);
92+
Whitebox.setInternalState(cassandraAllSideInfo, "equalFieldList", equalFieldList);
93+
}
94+
95+
@Test
96+
public void testBuildEqualInfo() {
97+
CassandraSideTableInfo sideTableInfo = mock(CassandraSideTableInfo.class);
98+
when(sideTableInfo.getDatabase()).thenReturn("dd");
99+
when(sideTableInfo.getTableName()).thenReturn("d");
100+
Whitebox.setInternalState(cassandraAllSideInfo, "sideSelectFields", "cassandraAllSideInfo");
101+
cassandraAllSideInfo.buildEqualInfo(joinInfo, sideTableInfo);
102+
}
103+
104+
@Test
105+
public void testParseSelectFields() throws NoSuchMethodException {
106+
SqlBasicCall conditionNode = mock(SqlBasicCall.class);
107+
when(joinInfo.getSideTableName()).thenReturn("s");
108+
when(joinInfo.getNonSideTable()).thenReturn("m");
109+
when(joinInfo.getCondition()).thenReturn(conditionNode);
110+
suppress(BaseSideInfo.class.getMethod("dealOneEqualCon", SqlNode.class, String.class));
111+
112+
cassandraAllSideInfo.parseSelectFields(joinInfo);
113+
}
114+
}

0 commit comments

Comments
 (0)