diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 892243628ff..dfa90a72802 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.
If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
## Conditional Functions
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index b4979beca51..67eeaf2d819 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.
If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
## Conditional Functions
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
index b923107e2c9..1fb080de99f 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
@@ -43,6 +43,16 @@ public class DateTimeUtils {
*/
public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000
+ /** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */
+ private static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
+
+ /** The SimpleDateFormat string for ISO times, "HH:mm:ss". */
+ private static final String TIME_FORMAT_STRING = "HH:mm:ss";
+
+ /** The SimpleDateFormat string for ISO timestamps, "yyyy-MM-dd HH:mm:ss". */
+ private static final String TIMESTAMP_FORMAT_STRING =
+ DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING;
+
/**
* A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe.
* (string_format) => formatter
@@ -109,7 +119,7 @@ private static long internalParseTimestampMillis(String dateStr, String format,
} catch (ParseException e) {
LOG.error(
String.format(
- "Exception when parsing datetime string '%s' in format '%s'",
+ "Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",
dateStr, format),
e);
return Long.MIN_VALUE;
@@ -128,6 +138,56 @@ private static int ymdToJulian(int year, int month, int day) {
return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
}
+ // --------------------------------------------------------------------------------------------
+ // UNIX TIME
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+ * "yyyy-MM-dd HH:mm:ss" format.
+ */
+ public static String formatUnixTimestamp(long unixTime, TimeZone timeZone) {
+ return formatUnixTimestamp(unixTime, TIMESTAMP_FORMAT_STRING, timeZone);
+ }
+
+ /**
+ * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+ * given format.
+ */
+ public static String formatUnixTimestamp(long unixTime, String format, TimeZone timeZone) {
+ SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
+ formatter.setTimeZone(timeZone);
+ Date date = new Date(unixTime * 1000);
+ try {
+ return formatter.format(date);
+ } catch (Exception e) {
+ LOG.error("Exception when formatting.", e);
+ return null;
+ }
+ }
+
+ /**
+ * Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
+ * 00:00:00' UTC.
+ */
+ public static long unixTimestamp(String dateStr, TimeZone timeZone) {
+ return unixTimestamp(dateStr, TIMESTAMP_FORMAT_STRING, timeZone);
+ }
+
+ /**
+ * Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
+ * 00:00:00' UTC.
+ */
+ public static long unixTimestamp(String dateStr, String format, TimeZone timeZone) {
+ long ts = internalParseTimestampMillis(dateStr, format, timeZone);
+ if (ts == Long.MIN_VALUE) {
+ return Long.MIN_VALUE;
+ } else {
+ // return the seconds
+ return ts / 1000;
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Format
// --------------------------------------------------------------------------------------------
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index 6f6d52a3b9d..a7555203c72 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -80,6 +80,27 @@ public static int currentDate(long epochTime, String timezone) {
return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
}
+ public static String fromUnixtime(long seconds, String timezone) {
+ return DateTimeUtils.formatUnixTimestamp(seconds, TimeZone.getTimeZone(timezone));
+ }
+
+ public static String fromUnixtime(long seconds, String format, String timezone) {
+ return DateTimeUtils.formatUnixTimestamp(seconds, format, TimeZone.getTimeZone(timezone));
+ }
+
+ public static long unixTimestamp(long epochTime, String timezone) {
+ return epochTime / 1000;
+ }
+
+ public static long unixTimestamp(String dateTimeStr, long epochTime, String timezone) {
+ return DateTimeUtils.unixTimestamp(dateTimeStr, TimeZone.getTimeZone(timezone));
+ }
+
+ public static long unixTimestamp(
+ String dateTimeStr, String format, long epochTime, String timezone) {
+ return DateTimeUtils.unixTimestamp(dateTimeStr, format, TimeZone.getTimeZone(timezone));
+ }
+
public static String dateFormat(TimestampData timestamp, String format) {
return DateTimeUtils.formatTimestampMillis(
timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC"));
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 6f5b2612514..f60bf968ca3 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -57,13 +57,18 @@ public class JaninoCompiler {
Arrays.asList("CURRENT_TIMESTAMP", "NOW");
private static final List TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
- Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE");
+ Arrays.asList(
+ "LOCALTIME",
+ "LOCALTIMESTAMP",
+ "CURRENT_TIME",
+ "CURRENT_DATE",
+ "UNIX_TIMESTAMP");
private static final List TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
Arrays.asList("DATE_FORMAT");
private static final List TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
- Arrays.asList("TO_DATE", "TO_TIMESTAMP");
+ Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME");
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index bb2c3503d51..d47db49f8e7 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -212,6 +212,27 @@ public SqlSyntax getSyntax() {
return SqlSyntax.FUNCTION;
}
};
+ public static final SqlFunction UNIX_TIMESTAMP =
+ new SqlFunction(
+ "UNIX_TIMESTAMP",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.BIGINT_NULLABLE,
+ null,
+ OperandTypes.or(
+ OperandTypes.NILADIC,
+ OperandTypes.family(SqlTypeFamily.CHARACTER),
+ OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
+ SqlFunctionCategory.TIMEDATE);
+ public static final SqlFunction FROM_UNIXTIME =
+ new SqlFunction(
+ "FROM_UNIXTIME",
+ SqlKind.OTHER_FUNCTION,
+ TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
+ null,
+ OperandTypes.or(
+ OperandTypes.family(SqlTypeFamily.INTEGER),
+ OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER)),
+ SqlFunctionCategory.TIMEDATE);
public static final SqlFunction DATE_FORMAT =
new SqlFunction(
"DATE_FORMAT",
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index a7dc61ac4b6..944527b9008 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -109,6 +109,40 @@ public class PostTransformOperatorTest {
.primaryKey("col1")
.build();
+ private static final TableId FROM_UNIX_TIME_TABLEID =
+ TableId.tableId("my_company", "my_branch", "from_unix_time_table");
+ private static final Schema FROM_UNIX_TIME_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING().notNull())
+ .physicalColumn("seconds", DataTypes.BIGINT())
+ .physicalColumn("format_str", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ private static final Schema EXPECTED_FROM_UNIX_TIME_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING().notNull())
+ .physicalColumn("from_unix_time", DataTypes.STRING())
+ .physicalColumn("from_unix_time_format", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+
+ private static final TableId UNIX_TIMESTAMP_TABLEID =
+ TableId.tableId("my_company", "my_branch", "unix_timestamp_table");
+ private static final Schema UNIX_TIMESTAMP_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING().notNull())
+ .physicalColumn("date_time_str", DataTypes.STRING())
+ .physicalColumn("unix_timestamp_format", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ private static final Schema EXPECTED_UNIX_TIMESTAMP_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING().notNull())
+ .physicalColumn("unix_timestamp", DataTypes.BIGINT())
+ .physicalColumn("unix_timestamp_format", DataTypes.BIGINT())
+ .primaryKey("col1")
+ .build();
+
private static final TableId TIMESTAMPDIFF_TABLEID =
TableId.tableId("my_company", "my_branch", "timestampdiff_table");
private static final Schema TIMESTAMPDIFF_SCHEMA =
@@ -780,6 +814,322 @@ void testTimestampTransform() throws Exception {
transformFunctionEventEventOperatorTestHarness.close();
}
+ @Test
+ void testFromUnixTimeTransform() throws Exception {
+ // In UTC, from_unix_time(0s) ==> 1970-01-01 00:00:00
+ testFromUnixTimeTransformWithTimeZone("UTC", 0L, "1970-01-01 00:00:00");
+ // In UTC, from_unix_time(44s) ==> 1970-01-01 00:00:44
+ testFromUnixTimeTransformWithTimeZone("UTC", 44L, "1970-01-01 00:00:44");
+ // In Berlin, the time zone is +1:00, from_unix_time(44s) ==> 1970-01-01 01:00:44
+ testFromUnixTimeTransformWithTimeZone("Europe/Berlin", 44L, "1970-01-01 01:00:44");
+ // In Shanghai, the time zone is +8:00, from_unix_time(44s) ==> 1970-01-01 08:00:44
+ testFromUnixTimeTransformWithTimeZone("Asia/Shanghai", 44L, "1970-01-01 08:00:44");
+ }
+
+ private void testFromUnixTimeTransformWithTimeZone(
+ String timeZone, Long seconds, String unixTimeStr) throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ FROM_UNIX_TIME_TABLEID.identifier(),
+ "col1, FROM_UNIXTIME(seconds) as from_unix_time,"
+ + " FROM_UNIXTIME(seconds, format_str) as from_unix_time_format",
+ null)
+ .addTimezone(timeZone)
+ .build();
+ RegularEventOperatorTestHarness
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(FROM_UNIX_TIME_TABLEID, FROM_UNIX_TIME_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) FROM_UNIX_TIME_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) EXPECTED_FROM_UNIX_TIME_SCHEMA.toRowDataType()));
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ FROM_UNIX_TIME_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ seconds,
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss")
+ }));
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ FROM_UNIX_TIME_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData(unixTimeStr),
+ new BinaryStringData(unixTimeStr)
+ }));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ FROM_UNIX_TIME_TABLEID, EXPECTED_FROM_UNIX_TIME_SCHEMA)));
+ transform.processElement(new StreamRecord<>(insertEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
+
+ /*
+ Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds),
+ using the specified timezone in table config.
+
+ If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”,
+ this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed,
+ the default value Long.MIN_VALUE(-9223372036854775808) will be returned.
+ */
+ @Test
+ void testUnixTimestampTransformInBerlin() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ UNIX_TIMESTAMP_TABLEID.identifier(),
+ "col1,"
+ + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp,"
+ + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format",
+ null)
+ .addTimezone("Europe/Berlin")
+ .build();
+ RegularEventOperatorTestHarness
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
+
+ // In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 25201L
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+ }));
+ DataChangeEvent insertEventExpect1 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("1"), 25201L, 25201L}));
+ transform.processElement(new StreamRecord<>(insertEvent1));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect1));
+
+ // In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // 1L
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("2"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect2 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("2"), 25201L, 1L}));
+ transform.processElement(new StreamRecord<>(insertEvent2));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect2));
+
+ // In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==>
+ // 25201L
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("3"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+ }));
+ DataChangeEvent insertEventExpect3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("3"), 25201L, 25201L}));
+ transform.processElement(new StreamRecord<>(insertEvent3));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect3));
+
+ // In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // -9223372036854775808L
+ DataChangeEvent insertEvent4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"),
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"), 25201L, -9223372036854775808L
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent4));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect4));
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
+
+ @Test
+ void testUnixTimestampTransformInShanghai() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ UNIX_TIMESTAMP_TABLEID.identifier(),
+ "col1,"
+ + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp,"
+ + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format",
+ null)
+ .addTimezone("Asia/Shanghai")
+ .build();
+ RegularEventOperatorTestHarness
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
+
+ // In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 1L
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+ }));
+ DataChangeEvent insertEventExpect1 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("1"), 1L, 1L}));
+ transform.processElement(new StreamRecord<>(insertEvent1));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect1));
+
+ // In Shanghai, "1970-01-01 08:00:01.001 +0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // 1L
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("2"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0100"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect2 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("2"), 1L, 25201L}));
+ transform.processElement(new StreamRecord<>(insertEvent2));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect2));
+
+ // In Shanghai, "1970-01-01 08:00:01.001 +0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==>
+ // 1L
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("3"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0100"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+ }));
+ DataChangeEvent insertEventExpect3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("3"), 1L, 1L}));
+ transform.processElement(new StreamRecord<>(insertEvent3));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect3));
+
+ // In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // -9223372036854775808L
+ DataChangeEvent insertEvent4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"),
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"), 1L, -9223372036854775808L
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent4));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect4));
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
+
@Test
void testTimestampDiffTransform() throws Exception {
PostTransformOperator transform =
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index bc5d1200524..478d4e92ee8 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -243,6 +243,17 @@ public void testTranslateFilterToJaninoExpression() {
testFilterExpression(
"id = CURRENT_TIMESTAMP", "valueEquals(id, currentTimestamp(__epoch_time__))");
testFilterExpression("NOW()", "now(__epoch_time__)");
+ testFilterExpression("FROM_UNIXTIME(44)", "fromUnixtime(44, __time_zone__)");
+ testFilterExpression(
+ "FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss')",
+ "fromUnixtime(44, \"yyyy/MM/dd HH:mm:ss\", __time_zone__)");
+ testFilterExpression("UNIX_TIMESTAMP()", "unixTimestamp(__epoch_time__, __time_zone__)");
+ testFilterExpression(
+ "UNIX_TIMESTAMP('1970-01-01 08:00:01')",
+ "unixTimestamp(\"1970-01-01 08:00:01\", __epoch_time__, __time_zone__)");
+ testFilterExpression(
+ "UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X')",
+ "unixTimestamp(\"1970-01-01 08:00:01.001 +0800\", \"yyyy-MM-dd HH:mm:ss.SSS X\", __epoch_time__, __time_zone__)");
testFilterExpression("YEAR(dt)", "year(dt)");
testFilterExpression("QUARTER(dt)", "quarter(dt)");
testFilterExpression("MONTH(dt)", "month(dt)");