@@ -94,12 +94,12 @@ protected void initCache() throws SQLException {
94
94
protected void reloadCache () {
95
95
//reload cacheRef and replace to old cacheRef
96
96
Map <String , List <Map <String , Object >>> newCache = Maps .newConcurrentMap ();
97
- cacheRef .set (newCache );
98
97
try {
99
98
loadData (newCache );
100
99
} catch (SQLException e ) {
101
100
throw new RuntimeException (e );
102
101
}
102
+ cacheRef .set (newCache );
103
103
LOG .info ("----- rdb all cacheRef reload end:{}" , Calendar .getInstance ());
104
104
}
105
105
@@ -123,9 +123,10 @@ public void flatMap(Tuple2<Boolean,Row> value, Collector<Tuple2<Boolean,Row>> ou
123
123
List <Map <String , Object >> cacheList = cacheRef .get ().get (cacheKey );
124
124
if (CollectionUtils .isEmpty (cacheList ) && sideInfo .getJoinType () == JoinType .LEFT ) {
125
125
out .collect (Tuple2 .of (value .f0 , fillData (value .f1 , null )));
126
+ } else if (!CollectionUtils .isEmpty (cacheList )) {
127
+ cacheList .stream ().forEach (one -> out .collect (Tuple2 .of (value .f0 , fillData (value .f1 , one ))));
126
128
}
127
129
128
- cacheList .stream ().forEach (one -> out .collect (Tuple2 .of (value .f0 , fillData (value .f1 , null ))));
129
130
}
130
131
131
132
@ Override
@@ -219,7 +220,7 @@ private void queryAndFillData(Map<String, List<Map<String, Object>>> tmpCache, C
219
220
}
220
221
221
222
String cacheKey = sideInfo .getEqualFieldList ().stream ()
222
- .map (equalField -> oneRow . get ( equalField ) )
223
+ .map (oneRow :: get )
223
224
.map (Object ::toString )
224
225
.collect (Collectors .joining ("_" ));
225
226
@@ -233,7 +234,8 @@ public int getFetchSize() {
233
234
}
234
235
235
236
/**
236
- * get jdbc connection
237
+ * get jdbc connection
238
+ *
237
239
* @param dbURL
238
240
* @param userName
239
241
* @param password
0 commit comments