23
23
import org .apache .flink .table .api .TableSchema ;
24
24
import org .apache .flink .table .api .java .StreamTableEnvironment ;
25
25
import org .apache .flink .table .sinks .TableSink ;
26
+ import org .junit .Before ;
26
27
import org .junit .Test ;
27
28
import org .junit .runner .RunWith ;
28
29
import org .powermock .api .mockito .PowerMockito ;
31
32
32
33
import java .io .IOException ;
33
34
import java .net .URL ;
35
+ import java .util .HashMap ;
34
36
import java .util .Map ;
35
37
import java .util .Properties ;
36
38
import java .util .Set ;
44
46
@ RunWith (PowerMockRunner .class )
45
47
@ PrepareForTest ({SqlParser .class , PluginUtil .class , StreamSourceFactory .class , StreamSinkFactory .class })
46
48
public class ExecuteProcessHelperTest {
49
+
50
+ private Map <String , Object > dirtyMap ;
51
+
52
+ @ Before
53
+ public void setUp () {
54
+ dirtyMap = new HashMap <>();
55
+ dirtyMap .put ("type" , "console" );
56
+ // 多少条数据打印一次
57
+ dirtyMap .put ("printLimit" , "100" );
58
+ dirtyMap .put ("url" , "jdbc:mysql://localhost:3306/tiezhu" );
59
+ dirtyMap .put ("userName" , "root" );
60
+ dirtyMap .put ("password" , "abc123" );
61
+ dirtyMap .put ("isCreateTable" , "false" );
62
+ // 多少条数据写入一次
63
+ dirtyMap .put ("batchSize" , "1" );
64
+ dirtyMap .put ("tableName" , "dirtyData" );
65
+ }
47
66
48
67
@ Test
49
68
public void parseParams () throws Exception {
50
69
String [] sql = new String []{"-mode" , "yarnPer" , "-sql" , "/Users/maqi/tmp/json/group_tmp4.txt" , "-name" , "PluginLoadModeTest" ,
51
- "-localSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
52
- "-remoteSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
53
- "-flinkconf" , "/Users/maqi/tmp/flink-1.8.1/conf" ,
54
- "-confProp" , "{\" sql.checkpoint.cleanup.mode\" :\" false\" ,\" sql.checkpoint.interval\" :10000,\" time.characteristic\" :\" EventTime\" }" ,
55
- "-yarnconf" , "/Users/maqi/tmp/hadoop" , "-flinkJarPath" , "/Users/maqi/tmp/flink-1.8.1/lib" , "-queue" , "c" , "-pluginLoadMode" , "shipfile" };
70
+ "-localSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
71
+ "-remoteSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
72
+ "-flinkconf" , "/Users/maqi/tmp/flink-1.8.1/conf" ,
73
+ "-confProp" , "{\" sql.checkpoint.cleanup.mode\" :\" false\" ,\" sql.checkpoint.interval\" :10000,\" time.characteristic\" :\" EventTime\" }" ,
74
+ "-yarnconf" , "/Users/maqi/tmp/hadoop" , "-flinkJarPath" , "/Users/maqi/tmp/flink-1.8.1/lib" , "-queue" , "c" , "-pluginLoadMode" , "shipfile" };
56
75
57
76
ExecuteProcessHelper .parseParams (sql );
58
77
}
59
78
60
79
@ Test
61
- public void checkRemoteSqlPluginPath (){
62
- ExecuteProcessHelper .checkRemoteSqlPluginPath (null , EPluginLoadMode .SHIPFILE .name (), ClusterMode .local .name ());
80
+ public void checkRemoteSqlPluginPath () {
81
+ ExecuteProcessHelper .checkRemoteSqlPluginPath (null , EPluginLoadMode .SHIPFILE .name (), ClusterMode .local .name ());
63
82
64
83
}
65
84
66
85
// @Test
67
86
public void getStreamExecution () throws Exception {
68
87
String [] sql = new String []{"-mode" , "yarnPer" , "-sql" , "/Users/maqi/tmp/json/group_tmp4.txt" , "-name" , "PluginLoadModeTest" ,
69
- "-localSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
70
- "-remoteSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
71
- "-flinkconf" , "/Users/maqi/tmp/flink-1.8.1/conf" ,
72
- "-confProp" , "{\" sql.checkpoint.cleanup.mode\" :\" false\" ,\" sql.checkpoint.interval\" :10000,\" time.characteristic\" :\" EventTime\" }" ,
73
- "-yarnconf" , "/Users/maqi/tmp/hadoop" , "-flinkJarPath" , "/Users/maqi/tmp/flink-1.8.1/lib" , "-queue" , "c" , "-pluginLoadMode" , "shipfile" };
88
+ "-localSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
89
+ "-remoteSqlPluginPath" , "/Users/maqi/code/dtstack/dt-center-flinkStreamSQL/plugins" ,
90
+ "-flinkconf" , "/Users/maqi/tmp/flink-1.8.1/conf" ,
91
+ "-confProp" , "{\" sql.checkpoint.cleanup.mode\" :\" false\" ,\" sql.checkpoint.interval\" :10000,\" time.characteristic\" :\" EventTime\" }" ,
92
+ "-yarnconf" , "/Users/maqi/tmp/hadoop" , "-flinkJarPath" , "/Users/maqi/tmp/flink-1.8.1/lib" , "-queue" , "c" , "-pluginLoadMode" , "shipfile" };
74
93
ParamsInfo paramsInfo = ExecuteProcessHelper .parseParams (sql );
75
94
PowerMockito .mockStatic (SqlParser .class );
76
95
SqlTree sqlTree = mock (SqlTree .class );
@@ -113,7 +132,7 @@ public void registerTable() throws Exception {
113
132
PowerMockito .mockStatic (PluginUtil .class );
114
133
115
134
PowerMockito .mockStatic (StreamSourceFactory .class );
116
- when (StreamSourceFactory .getStreamSource (anyObject (), anyObject (), anyObject (), anyString (),anyString ())).thenReturn (table );
135
+ when (StreamSourceFactory .getStreamSource (anyObject (), anyObject (), anyObject (), anyString (), anyString ())).thenReturn (table );
117
136
118
137
TableSink tableSink = mock (TableSink .class );
119
138
PowerMockito .mockStatic (StreamSinkFactory .class );
@@ -133,7 +152,7 @@ public void registerTable() throws Exception {
133
152
when (sideTableInfo .getCacheType ()).thenReturn ("all" );
134
153
when (sideTableInfo .getName ()).thenReturn ("sideTable" );
135
154
when (sideTableInfo .getType ()).thenReturn ("redis" );
136
- when (PluginUtil .buildSidePathByLoadMode (anyString (), anyString (), anyString (), anyString (), anyString (),anyString ())).thenReturn (new URL ("file://a" ));
155
+ when (PluginUtil .buildSidePathByLoadMode (anyString (), anyString (), anyString (), anyString (), anyString (), anyString ())).thenReturn (new URL ("file://a" ));
137
156
138
157
AbstractTargetTableInfo targetTableInfo = mock (AbstractTargetTableInfo .class );
139
158
when (targetTableInfo .getName ()).thenReturn ("sinkTable" );
@@ -147,12 +166,21 @@ public void registerTable() throws Exception {
147
166
tableMap .put ("target" , targetTableInfo );
148
167
when (sqlTree .getTableInfoMap ()).thenReturn (tableMap );
149
168
150
- ExecuteProcessHelper .registerTable (sqlTree , env , tableEnv , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode , sideTableMap , registerTableCache );
169
+ // SqlTree sqlTree
170
+ // , StreamExecutionEnvironment env
171
+ // , StreamTableEnvironment tableEnv
172
+ // , String localSqlPluginPath
173
+ // , String remoteSqlPluginPath
174
+ // , String pluginLoadMode
175
+ // , Map<String, Object> dirtyProperties
176
+ // , Map<String, AbstractSideTableInfo> sideTableMap
177
+ // , Map<String, Table> registerTableCache
178
+ ExecuteProcessHelper .registerTable (sqlTree , env , tableEnv , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode , dirtyMap , sideTableMap , registerTableCache );
151
179
}
152
180
153
181
@ Test
154
182
public void registerPluginUrlToCachedFile () throws Exception {
155
- StreamExecutionEnvironment executionEnvironment = ExecuteProcessHelper .getStreamExeEnv (new Properties (), "local" );
183
+ StreamExecutionEnvironment executionEnvironment = ExecuteProcessHelper .getStreamExeEnv (new Properties (), "local" );
156
184
Set <URL > classPathSet = Sets .newHashSet ();
157
185
classPathSet .add (new URL ("file://" ));
158
186
ExecuteProcessHelper .registerPluginUrlToCachedFile (executionEnvironment , classPathSet );
@@ -164,5 +192,4 @@ public void getStreamExeEnv() throws Exception {
164
192
}
165
193
166
194
167
-
168
195
}
0 commit comments