Skip to content

Commit 0beef49

Browse files
committed
Merge branch '1.8_release_3.9.x' into hotfix_1.8_3.9.x_21763
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
2 parents e568264 + 08ecc68 commit 0beef49

File tree

6 files changed

+18
-13
lines changed

6 files changed

+18
-13
lines changed

clickhouse/clickhouse-side/clickhouse-all-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<configuration>
4242
<artifactSet>
4343
<excludes>
44-
44+
<exclude>org.slf4j</exclude>
4545
</excludes>
4646
</artifactSet>
4747
<filters>

clickhouse/clickhouse-side/clickhouse-async-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<configuration>
4242
<artifactSet>
4343
<excludes>
44-
44+
<exclude>org.slf4j</exclude>
4545
</excludes>
4646
</artifactSet>
4747
<filters>

clickhouse/clickhouse-sink/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
<configuration>
4141
<artifactSet>
4242
<excludes>
43-
43+
<exclude>org.slf4j</exclude>
4444
</excludes>
4545
</artifactSet>
4646
<filters>

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,8 @@ private static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironmen
244244

245245
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
246246
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
247-
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
247+
.filter((Tuple2<Boolean, Row> f0) -> f0.f0)
248+
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
248249
.returns(typeInfo);
249250

250251
String fields = String.join(",", typeInfo.getFieldNames());

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements
5151
private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
5252
private static final long serialVersionUID = 2098635244857937717L;
5353

54+
private static int TIMEOUT_LOG_FLUSH_NUM = 10;
55+
private int timeOutNum = 0;
56+
5457
protected SideInfo sideInfo;
5558
protected transient Counter parseErrorRecords;
5659

@@ -119,14 +122,14 @@ protected void dealCacheData(String key, CacheObj missKeyObj) {
119122

120123
@Override
121124
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
122-
StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
123-
try {
124-
if (null == future.get()) {
125-
resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
126-
}
127-
} catch (Exception e) {
128-
resultFuture.completeExceptionally(new Exception(e));
125+
126+
//TODO 需要添加数据指标
127+
if(timeOutNum % TIMEOUT_LOG_FLUSH_NUM == 0){
128+
LOG.info("Async function call has timed out. input:" + input.toString());
129129
}
130+
131+
timeOutNum++;
132+
resultFuture.complete(null);
130133
}
131134

132135

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -803,8 +803,9 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
803803
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
804804

805805
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
806-
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
807-
.returns(Row.class);
806+
.filter((Tuple2<Boolean, Row> f0) -> f0.f0)
807+
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
808+
.returns(Row.class);
808809

809810

810811
//join side table before keyby ===> Reducing the size of each dimension table cache of async

0 commit comments

Comments
 (0)