Skip to content

Commit 2fd03e6

Browse files
authored
[FLINK-36865][cdc] Provide UNIX_TIMESTAMP series functions in YAML pipeline
This closes #3819.
1 parent 39fd00c commit 2fd03e6

File tree

8 files changed

+477
-3
lines changed

8 files changed

+477
-3
lines changed

Diff for: docs/content.zh/docs/core-concept/transform.md

+3
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
160160
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
161161
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
162162
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
163+
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
164+
| UNIX_TIMESTAMP() | unixTimestamp() | Gets the current Unix timestamp in seconds. This function is not deterministic, which means the value is recalculated for each record. |
165+
| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to a Unix timestamp (in seconds), using the time zone specified in the table config.<br/>If the date time string itself carries a time zone that is parsed by a UTC+X style pattern such as “yyyy-MM-dd HH:mm:ss.SSS X”, that time zone is used instead of the one in the table config. If the date time string cannot be parsed, the default value Long.MIN_VALUE (-9223372036854775808) is returned. |
163166
164167
## Conditional Functions
165168

Diff for: docs/content/docs/core-concept/transform.md

+3
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
160160
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
161161
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
162162
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
163+
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
164+
| UNIX_TIMESTAMP() | unixTimestamp() | Gets the current Unix timestamp in seconds. This function is not deterministic, which means the value is recalculated for each record. |
165+
| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to a Unix timestamp (in seconds), using the time zone specified in the table config.<br/>If the date time string itself carries a time zone that is parsed by a UTC+X style pattern such as “yyyy-MM-dd HH:mm:ss.SSS X”, that time zone is used instead of the one in the table config. If the date time string cannot be parsed, the default value Long.MIN_VALUE (-9223372036854775808) is returned. |
163166
164167
## Conditional Functions
165168

Diff for: flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java

+61-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ public class DateTimeUtils {
4343
*/
4444
public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000
4545

46+
/** The SimpleDateFormat pattern string for ISO dates, "yyyy-MM-dd". */
47+
private static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
48+
49+
/** The SimpleDateFormat pattern string for ISO times, "HH:mm:ss". */
50+
private static final String TIME_FORMAT_STRING = "HH:mm:ss";
51+
52+
/**
 * The SimpleDateFormat pattern string for ISO timestamps, "yyyy-MM-dd HH:mm:ss". Built by
 * joining the date and time patterns with a single space.
 */
53+
private static final String TIMESTAMP_FORMAT_STRING =
54+
DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING;
55+
4656
/**
4757
* A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe.
4858
* (string_format) => formatter
@@ -109,7 +119,7 @@ private static long internalParseTimestampMillis(String dateStr, String format,
109119
} catch (ParseException e) {
110120
LOG.error(
111121
String.format(
112-
"Exception when parsing datetime string '%s' in format '%s'",
122+
"Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",
113123
dateStr, format),
114124
e);
115125
return Long.MIN_VALUE;
@@ -128,6 +138,56 @@ private static int ymdToJulian(int year, int month, int day) {
128138
return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
129139
}
130140

141+
// --------------------------------------------------------------------------------------------
142+
// UNIX TIME
143+
// --------------------------------------------------------------------------------------------
144+
145+
/**
146+
* Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
147+
* "yyyy-MM-dd HH:mm:ss" format.
148+
*/
149+
public static String formatUnixTimestamp(long unixTime, TimeZone timeZone) {
150+
return formatUnixTimestamp(unixTime, TIMESTAMP_FORMAT_STRING, timeZone);
151+
}
152+
153+
/**
154+
* Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
155+
* given format.
156+
*/
157+
public static String formatUnixTimestamp(long unixTime, String format, TimeZone timeZone) {
158+
SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
159+
formatter.setTimeZone(timeZone);
160+
Date date = new Date(unixTime * 1000);
161+
try {
162+
return formatter.format(date);
163+
} catch (Exception e) {
164+
LOG.error("Exception when formatting.", e);
165+
return null;
166+
}
167+
}
168+
169+
/**
170+
* Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
171+
* 00:00:00' UTC.
172+
*/
173+
public static long unixTimestamp(String dateStr, TimeZone timeZone) {
174+
return unixTimestamp(dateStr, TIMESTAMP_FORMAT_STRING, timeZone);
175+
}
176+
177+
/**
178+
* Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
179+
* 00:00:00' UTC.
180+
*/
181+
public static long unixTimestamp(String dateStr, String format, TimeZone timeZone) {
182+
long ts = internalParseTimestampMillis(dateStr, format, timeZone);
183+
if (ts == Long.MIN_VALUE) {
184+
return Long.MIN_VALUE;
185+
} else {
186+
// return the seconds
187+
return ts / 1000;
188+
}
189+
}
190+
131191
// --------------------------------------------------------------------------------------------
132192
// Format
133193
// --------------------------------------------------------------------------------------------

Diff for: flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java

+21
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,27 @@ public static int currentDate(long epochTime, String timezone) {
8080
return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
8181
}
8282

83+
public static String fromUnixtime(long seconds, String timezone) {
84+
return DateTimeUtils.formatUnixTimestamp(seconds, TimeZone.getTimeZone(timezone));
85+
}
86+
87+
public static String fromUnixtime(long seconds, String format, String timezone) {
88+
return DateTimeUtils.formatUnixTimestamp(seconds, format, TimeZone.getTimeZone(timezone));
89+
}
90+
91+
public static long unixTimestamp(long epochTime, String timezone) {
92+
return epochTime / 1000;
93+
}
94+
95+
public static long unixTimestamp(String dateTimeStr, long epochTime, String timezone) {
96+
return DateTimeUtils.unixTimestamp(dateTimeStr, TimeZone.getTimeZone(timezone));
97+
}
98+
99+
public static long unixTimestamp(
100+
String dateTimeStr, String format, long epochTime, String timezone) {
101+
return DateTimeUtils.unixTimestamp(dateTimeStr, format, TimeZone.getTimeZone(timezone));
102+
}
103+
83104
public static String dateFormat(TimestampData timestamp, String format) {
84105
return DateTimeUtils.formatTimestampMillis(
85106
timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC"));

Diff for: flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,18 @@ public class JaninoCompiler {
5757
Arrays.asList("CURRENT_TIMESTAMP", "NOW");
5858

5959
private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
60-
Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE");
60+
Arrays.asList(
61+
"LOCALTIME",
62+
"LOCALTIMESTAMP",
63+
"CURRENT_TIME",
64+
"CURRENT_DATE",
65+
"UNIX_TIMESTAMP");
6166

6267
private static final List<String> TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
6368
Arrays.asList("DATE_FORMAT");
6469

6570
private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
66-
Arrays.asList("TO_DATE", "TO_TIMESTAMP");
71+
Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME");
6772

6873
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
6974
public static final String DEFAULT_TIME_ZONE = "__time_zone__";

Diff for: flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java

+21
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,27 @@ public SqlSyntax getSyntax() {
212212
return SqlSyntax.FUNCTION;
213213
}
214214
};
215+
/**
 * Operator-table entry for {@code UNIX_TIMESTAMP} in the TIMEDATE category. It accepts no
 * arguments, one character-family argument, or two character-family arguments, and is
 * declared with the {@code ReturnTypes.BIGINT_NULLABLE} return type.
 */
public static final SqlFunction UNIX_TIMESTAMP =
216+
new SqlFunction(
217+
"UNIX_TIMESTAMP",
218+
SqlKind.OTHER_FUNCTION,
219+
ReturnTypes.BIGINT_NULLABLE,
220+
null,
221+
OperandTypes.or(
222+
OperandTypes.NILADIC,
223+
OperandTypes.family(SqlTypeFamily.CHARACTER),
224+
OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
225+
SqlFunctionCategory.TIMEDATE);
226+
/**
 * Operator-table entry for {@code FROM_UNIXTIME} in the TIMEDATE category. It accepts one
 * integer-family argument, optionally followed by a character-family argument, and is
 * declared with the {@code TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE} return type.
 */
public static final SqlFunction FROM_UNIXTIME =
227+
new SqlFunction(
228+
"FROM_UNIXTIME",
229+
SqlKind.OTHER_FUNCTION,
230+
TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
231+
null,
232+
OperandTypes.or(
233+
OperandTypes.family(SqlTypeFamily.INTEGER),
234+
OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER)),
235+
SqlFunctionCategory.TIMEDATE);
215236
public static final SqlFunction DATE_FORMAT =
216237
new SqlFunction(
217238
"DATE_FORMAT",

0 commit comments

Comments
 (0)