Skip to content

Commit 0e612c3

Browse files
committed
Merge branch 'hotfix_3.9.4_23229' into '1.8_release_3.9.x'
时间处理 See merge request !290
2 parents 3df78d7 + 8fb5b4f commit 0e612c3

File tree

3 files changed

+122
-21
lines changed

3 files changed

+122
-21
lines changed

core/src/main/java/com/dtstack/flink/sql/util/DateUtil.java

+62
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,18 @@
2323
import java.sql.Timestamp;
2424
import java.text.ParseException;
2525
import java.text.SimpleDateFormat;
26+
import java.time.Instant;
27+
import java.time.LocalDate;
28+
import java.time.LocalTime;
29+
import java.time.ZoneOffset;
2630
import java.util.Calendar;
2731
import java.util.Date;
2832
import java.util.Locale;
2933
import java.util.SimpleTimeZone;
34+
import java.util.TimeZone;
35+
import java.util.regex.Pattern;
36+
37+
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
3038

3139

3240
/**
@@ -47,6 +55,12 @@ public class DateUtil {
4755
static final SimpleDateFormat dateFormatter = new SimpleDateFormat(dateFormat);
4856
static final SimpleDateFormat timeFormatter = new SimpleDateFormat(timeFormat);
4957

58+
private static final Pattern DATETIME = Pattern.compile("^\\d{4}-(?:0[0-9]|1[0-2])-[0-9]{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d{3,9})?Z$");
59+
private static final Pattern DATE = Pattern.compile("^\\d{4}-(?:0[0-9]|1[0-2])-[0-9]{2}$");
60+
61+
private static final int MILLIS_PER_SECOND = 1000;
62+
63+
5064
public static java.sql.Date columnToDate(Object column) {
5165
if(column instanceof String) {
5266
return new java.sql.Date(stringToDate((String)column).getTime());
@@ -770,4 +784,52 @@ public static String timestampToString(Date date) {
770784
return datetimeFormatter.format(date);
771785
}
772786

787+
788+
public static Timestamp getTimestampFromStr(String timeStr) {
789+
if (DATETIME.matcher(timeStr).matches()) {
790+
Instant instant = Instant.from(ISO_INSTANT.parse(timeStr));
791+
return new Timestamp(instant.getEpochSecond() * MILLIS_PER_SECOND);
792+
} else {
793+
java.sql.Date date = null;
794+
try {
795+
date = new java.sql.Date(datetimeFormatter.parse(timeStr).getTime());
796+
} catch (ParseException e) {
797+
throw new RuntimeException("getTimestampFromStr error data is " + timeStr);
798+
}
799+
return new Timestamp(date.getTime());
800+
}
801+
}
802+
803+
public static java.sql.Date getDateFromStr(String dateStr) {
804+
// 2020-01-01 format
805+
if (DATE.matcher(dateStr).matches()) {
806+
// convert from local date to instant
807+
Instant instant = LocalDate.parse(dateStr).atTime(LocalTime.of(0, 0, 0, 0)).toInstant(ZoneOffset.UTC);
808+
// calculate the timezone offset in millis
809+
int offset = TimeZone.getDefault().getOffset(instant.toEpochMilli());
810+
// need to remove the offset since time has no TZ component
811+
return new java.sql.Date(instant.toEpochMilli() - offset);
812+
} else if (DATETIME.matcher(dateStr).matches()) {
813+
// 2020-01-01T12:12:12Z format
814+
Instant instant = Instant.from(ISO_INSTANT.parse(dateStr));
815+
return new java.sql.Date(instant.toEpochMilli());
816+
} else {
817+
try {
818+
// 2020-01-01 12:12:12.0 format
819+
return new java.sql.Date(datetimeFormatter.parse(dateStr).getTime());
820+
} catch (ParseException e) {
821+
throw new RuntimeException("String convert to Date fail.");
822+
}
823+
}
824+
}
825+
826+
827+
public static String getStringFromTimestamp(Timestamp timestamp) {
828+
return datetimeFormatter.format(timestamp);
829+
}
830+
831+
public static String getStringFromDate(java.sql.Date date) {
832+
return dateFormatter.format(date);
833+
}
834+
773835
}

core/src/main/java/com/dtstack/flink/sql/util/MathUtil.java

+19-13
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.util;
2221

@@ -27,6 +26,16 @@
2726
import java.text.ParseException;
2827
import java.text.SimpleDateFormat;
2928

29+
30+
import java.time.Instant;
31+
import java.time.LocalDate;
32+
import java.time.LocalTime;
33+
import java.time.ZoneOffset;
34+
import java.util.TimeZone;
35+
import java.util.regex.Pattern;
36+
37+
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
38+
3039
/**
3140
* Convert val to specified numeric type
3241
* Date: 2017/4/21
@@ -35,7 +44,6 @@
3544
*/
3645

3746
public class MathUtil {
38-
3947
public static Long getLongVal(Object obj) {
4048
if (obj == null) {
4149
return null;
@@ -126,12 +134,12 @@ public static Double getDoubleVal(Object obj) {
126134
return Double.valueOf((String) obj);
127135
} else if (obj instanceof Float) {
128136
return ((Float) obj).doubleValue();
129-
} else if (obj instanceof Double){
137+
} else if (obj instanceof Double) {
130138
return (Double) obj;
131-
}else if (obj instanceof BigDecimal) {
139+
} else if (obj instanceof BigDecimal) {
132140
return ((BigDecimal) obj).doubleValue();
133-
}else if (obj instanceof Integer){
134-
return ((Integer)obj).doubleValue();
141+
} else if (obj instanceof Integer) {
142+
return ((Integer) obj).doubleValue();
135143
}
136144

137145
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Double.");
@@ -229,12 +237,7 @@ public static Date getDate(Object obj) {
229237
return null;
230238
}
231239
if (obj instanceof String) {
232-
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
233-
try {
234-
return new Date(format.parse((String) obj).getTime());
235-
} catch (ParseException e) {
236-
throw new RuntimeException("String convert to Date fail.");
237-
}
240+
return DateUtil.getDateFromStr((String) obj);
238241
} else if (obj instanceof Timestamp) {
239242
return new Date(((Timestamp) obj).getTime());
240243
} else if (obj instanceof Date) {
@@ -243,6 +246,8 @@ public static Date getDate(Object obj) {
243246
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Date.");
244247
}
245248

249+
250+
246251
public static Timestamp getTimestamp(Object obj) {
247252
if (obj == null) {
248253
return null;
@@ -252,8 +257,9 @@ public static Timestamp getTimestamp(Object obj) {
252257
} else if (obj instanceof Date) {
253258
return new Timestamp(((Date) obj).getTime());
254259
} else if (obj instanceof String) {
255-
return new Timestamp(getDate(obj).getTime());
260+
return DateUtil.getTimestampFromStr(obj.toString());
256261
}
257262
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Date.");
258263
}
264+
259265
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

+41-8
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,9 @@
2323
import com.dtstack.flink.sql.side.*;
2424
import com.dtstack.flink.sql.side.cache.CacheObj;
2525
import com.dtstack.flink.sql.side.rdb.util.SwitchUtil;
26-
import io.vertx.core.AsyncResult;
27-
import io.vertx.core.Handler;
28-
import io.vertx.core.Vertx;
29-
import io.vertx.core.VertxOptions;
26+
import com.dtstack.flink.sql.util.DateUtil;
3027
import io.vertx.core.json.JsonArray;
3128
import io.vertx.core.json.JsonObject;
32-
import io.vertx.ext.jdbc.JDBCClient;
3329
import io.vertx.ext.sql.SQLClient;
3430
import io.vertx.ext.sql.SQLConnection;
3531
import com.google.common.collect.Lists;
@@ -39,9 +35,9 @@
3935
import org.slf4j.Logger;
4036
import org.slf4j.LoggerFactory;
4137

38+
import java.math.BigDecimal;
4239
import java.sql.Timestamp;
43-
import java.util.Collection;
44-
import java.util.Collections;
40+
import java.time.Instant;
4541
import java.util.List;
4642
import java.util.Map;
4743

@@ -88,7 +84,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
8884
dealMissKey(inputRow, resultFuture);
8985
return;
9086
}
91-
inputParams.add(equalObj);
87+
inputParams.add(convertDataType(equalObj));
9288
}
9389

9490
String key = buildCacheKey(inputParams);
@@ -152,6 +148,43 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
152148
});
153149
}
154150

151+
152+
private Object convertDataType(Object val) {
153+
if (val == null) {
154+
// OK
155+
} else if (val instanceof Number && !(val instanceof BigDecimal)) {
156+
// OK
157+
} else if (val instanceof Boolean) {
158+
// OK
159+
} else if (val instanceof String) {
160+
// OK
161+
} else if (val instanceof Character) {
162+
// OK
163+
} else if (val instanceof CharSequence) {
164+
165+
} else if (val instanceof JsonObject) {
166+
167+
} else if (val instanceof JsonArray) {
168+
169+
} else if (val instanceof Map) {
170+
171+
} else if (val instanceof List) {
172+
173+
} else if (val instanceof byte[]) {
174+
175+
} else if (val instanceof Instant) {
176+
177+
} else if (val instanceof Timestamp) {
178+
val = DateUtil.getStringFromTimestamp((Timestamp) val);
179+
} else if (val instanceof java.util.Date) {
180+
val = DateUtil.getStringFromDate((java.sql.Date) val);
181+
} else {
182+
val = val.toString();
183+
}
184+
return val;
185+
}
186+
187+
155188
protected List<Row> getRows(Row inputRow, List<JsonArray> cacheContent, List<JsonArray> results) {
156189
List<Row> rowList = Lists.newArrayList();
157190
for (JsonArray line : results) {

0 commit comments

Comments
 (0)