From 51da1719b8b32800f578c1f4a6bef2dcb7b45ce7 Mon Sep 17 00:00:00 2001
From: wenmo <32723967+aiwenmo@users.noreply.github.com>
Date: Thu, 26 Dec 2024 20:47:51 +0800
Subject: [PATCH 1/4] [FLINK-36865][cdc] Provide UNIX_TIMESTAMP series
functions in YAML pipeline
---
.../content.zh/docs/core-concept/transform.md | 3 +
docs/content/docs/core-concept/transform.md | 3 +
.../flink/cdc/common/utils/DateTimeUtils.java | 67 ++++-
.../runtime/functions/BuiltInSqlFunction.java | 239 ++++++++++++++++++
.../functions/SystemFunctionUtils.java | 21 ++
.../cdc/runtime/parser/JaninoCompiler.java | 9 +-
.../metadata/TransformSqlOperatorTable.java | 32 +++
.../transform/PostTransformOperatorTest.java | 85 +++++++
.../runtime/parser/TransformParserTest.java | 17 +-
9 files changed, 470 insertions(+), 6 deletions(-)
create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInSqlFunction.java
diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 892243628ff..20694dc321b 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
## Conditional Functions
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index b4979beca51..e4ff1439102 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
## Conditional Functions
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
index b923107e2c9..b9f4b1b5014 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
@@ -43,6 +43,16 @@ public class DateTimeUtils {
*/
public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000
+ /** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */
+ private static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
+
+ /** The SimpleDateFormat string for ISO times, "HH:mm:ss". */
+ private static final String TIME_FORMAT_STRING = "HH:mm:ss";
+
+ /** The SimpleDateFormat string for ISO timestamps, "yyyy-MM-dd HH:mm:ss". */
+ private static final String TIMESTAMP_FORMAT_STRING =
+ DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING;
+
/**
* A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe.
* (string_format) => formatter
@@ -109,7 +119,7 @@ private static long internalParseTimestampMillis(String dateStr, String format,
} catch (ParseException e) {
LOG.error(
String.format(
- "Exception when parsing datetime string '%s' in format '%s'",
+ "Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",
dateStr, format),
e);
return Long.MIN_VALUE;
@@ -128,6 +138,61 @@ private static int ymdToJulian(int year, int month, int day) {
return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
}
+ // --------------------------------------------------------------------------------------------
+ // UNIX TIME
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+ * "yyyy-MM-dd HH:mm:ss" format.
+ */
+ public static String formatUnixTimestamp(long unixtime, TimeZone tz) {
+ return formatUnixTimestamp(unixtime, TIMESTAMP_FORMAT_STRING, tz);
+ }
+
+ /**
+ * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+ * given format.
+ */
+ public static String formatUnixTimestamp(long unixtime, String format, TimeZone tz) {
+ SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
+ formatter.setTimeZone(tz);
+ Date date = new Date(unixtime * 1000);
+ try {
+ return formatter.format(date);
+ } catch (Exception e) {
+ LOG.error("Exception when formatting.", e);
+ return null;
+ }
+ }
+
+ /** Returns the value of the timestamp to seconds since '1970-01-01 00:00:00' UTC. */
+ public static long unixTimestamp(long ts) {
+ return ts / 1000;
+ }
+
+ /**
+ * Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
+ * 00:00:00' UTC.
+ */
+ public static long unixTimestamp(String dateStr, TimeZone tz) {
+ return unixTimestamp(dateStr, TIMESTAMP_FORMAT_STRING, tz);
+ }
+
+ /**
+ * Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
+ * 00:00:00' UTC.
+ */
+ public static long unixTimestamp(String dateStr, String format, TimeZone tz) {
+ long ts = internalParseTimestampMillis(dateStr, format, tz);
+ if (ts == Long.MIN_VALUE) {
+ return Long.MIN_VALUE;
+ } else {
+ // return the seconds
+ return ts / 1000;
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Format
// --------------------------------------------------------------------------------------------
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInSqlFunction.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInSqlFunction.java
new file mode 100644
index 00000000000..2bea19514f2
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInSqlFunction.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlOperandTypeInference;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinition.qualifyFunctionName;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinition.validateFunction;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * SQL version of {@link BuiltInFunctionDefinition}. This is the case when the operator has a
+ * special parsing syntax or uses other Calcite-specific features that are not exposed via {@link
+ * BuiltInFunctionDefinition} yet.
+ *
+ *
Note: Try to keep usages of this class to a minimum and use Flink's {@link
+ * BuiltInFunctionDefinition} stack instead.
+ *
+ *
For simple functions, use the provided builder. Otherwise, this class can also be extended.
+ */
+@Internal
+public class BuiltInSqlFunction extends SqlFunction {
+
+ private final @Nullable Integer version;
+
+ private final boolean isDeterministic;
+
+ private final boolean isInternal;
+
+ private final Function monotonicity;
+
+ protected BuiltInSqlFunction(
+ String name,
+ int version,
+ SqlKind kind,
+ @Nullable SqlReturnTypeInference returnTypeInference,
+ @Nullable SqlOperandTypeInference operandTypeInference,
+ @Nullable SqlOperandTypeChecker operandTypeChecker,
+ SqlFunctionCategory category,
+ boolean isDeterministic,
+ boolean isInternal,
+ Function monotonicity) {
+ super(
+ checkNotNull(name),
+ checkNotNull(kind),
+ returnTypeInference,
+ operandTypeInference,
+ operandTypeChecker,
+ checkNotNull(category));
+ this.version = isInternal ? null : version;
+ this.isDeterministic = isDeterministic;
+ this.isInternal = isInternal;
+ this.monotonicity = monotonicity;
+ validateFunction(name, version, isInternal);
+ }
+
+ protected BuiltInSqlFunction(
+ String name,
+ SqlKind kind,
+ SqlReturnTypeInference returnTypeInference,
+ SqlOperandTypeInference operandTypeInference,
+ @Nullable SqlOperandTypeChecker operandTypeChecker,
+ SqlFunctionCategory category) {
+ this(
+ name,
+ DEFAULT_VERSION,
+ kind,
+ returnTypeInference,
+ operandTypeInference,
+ operandTypeChecker,
+ category,
+ true,
+ false,
+ call -> SqlMonotonicity.NOT_MONOTONIC);
+ }
+
+ /** Builder for configuring and creating instances of {@link BuiltInSqlFunction}. */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public final Optional getVersion() {
+ return Optional.ofNullable(version);
+ }
+
+ public String getQualifiedName() {
+ if (isInternal) {
+ return getName();
+ }
+ assert version != null;
+ return qualifyFunctionName(getName(), version);
+ }
+
+ @Override
+ public boolean isDeterministic() {
+ return isDeterministic;
+ }
+
+ public final boolean isInternal() {
+ return isInternal;
+ }
+
+ @Override
+ public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
+ return monotonicity.apply(call);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Builder
+ // --------------------------------------------------------------------------------------------
+
+ /** Builder for fluent definition of built-in functions. */
+ public static class Builder {
+
+ private String name;
+
+ private int version = DEFAULT_VERSION;
+
+ private SqlKind kind = SqlKind.OTHER_FUNCTION;
+
+ private SqlReturnTypeInference returnTypeInference;
+
+ private SqlOperandTypeInference operandTypeInference;
+
+ private SqlOperandTypeChecker operandTypeChecker;
+
+ private SqlFunctionCategory category = SqlFunctionCategory.SYSTEM;
+
+ private boolean isInternal = false;
+
+ private boolean isDeterministic = true;
+
+ private Function monotonicity =
+ call -> SqlMonotonicity.NOT_MONOTONIC;
+
+ /** @see BuiltInFunctionDefinition.Builder#name(String) */
+ public Builder name(String name) {
+ this.name = name;
+ return this;
+ }
+
+ /** @see BuiltInFunctionDefinition.Builder#version(int) */
+ public Builder version(int version) {
+ this.version = version;
+ return this;
+ }
+
+ public Builder kind(SqlKind kind) {
+ this.kind = kind;
+ return this;
+ }
+
+ public Builder returnType(SqlReturnTypeInference returnTypeInference) {
+ this.returnTypeInference = returnTypeInference;
+ return this;
+ }
+
+ public Builder operandTypeInference(SqlOperandTypeInference operandTypeInference) {
+ this.operandTypeInference = operandTypeInference;
+ return this;
+ }
+
+ public Builder operandTypeChecker(SqlOperandTypeChecker operandTypeChecker) {
+ this.operandTypeChecker = operandTypeChecker;
+ return this;
+ }
+
+ public Builder category(SqlFunctionCategory category) {
+ this.category = category;
+ return this;
+ }
+
+ public Builder notDeterministic() {
+ this.isDeterministic = false;
+ return this;
+ }
+
+ /** @see BuiltInFunctionDefinition.Builder#internal() */
+ public Builder internal() {
+ this.isInternal = true;
+ return this;
+ }
+
+ public Builder monotonicity(SqlMonotonicity staticMonotonicity) {
+ this.monotonicity = call -> staticMonotonicity;
+ return this;
+ }
+
+ public Builder monotonicity(Function monotonicity) {
+ this.monotonicity = monotonicity;
+ return this;
+ }
+
+ public BuiltInSqlFunction build() {
+ return new BuiltInSqlFunction(
+ name,
+ version,
+ kind,
+ returnTypeInference,
+ operandTypeInference,
+ operandTypeChecker,
+ category,
+ isDeterministic,
+ isInternal,
+ monotonicity);
+ }
+ }
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index 6f6d52a3b9d..90f4ca0d6ef 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -80,6 +80,27 @@ public static int currentDate(long epochTime, String timezone) {
return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
}
+ public static String fromUnixtime(long seconds, String timezone) {
+ return DateTimeUtils.formatUnixTimestamp(seconds, TimeZone.getTimeZone(timezone));
+ }
+
+ public static String fromUnixtime(long seconds, String format, String timezone) {
+ return DateTimeUtils.formatUnixTimestamp(seconds, format, TimeZone.getTimeZone(timezone));
+ }
+
+ public static long unixTimestamp(long epochTime, String timezone) {
+ return DateTimeUtils.unixTimestamp(epochTime);
+ }
+
+ public static long unixTimestamp(String dateTimeStr, long epochTime, String timezone) {
+ return DateTimeUtils.unixTimestamp(dateTimeStr, TimeZone.getTimeZone(timezone));
+ }
+
+ public static long unixTimestamp(
+ String dateTimeStr, String format, long epochTime, String timezone) {
+ return DateTimeUtils.unixTimestamp(dateTimeStr, format, TimeZone.getTimeZone(timezone));
+ }
+
public static String dateFormat(TimestampData timestamp, String format) {
return DateTimeUtils.formatTimestampMillis(
timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC"));
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 6f5b2612514..f60bf968ca3 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -57,13 +57,18 @@ public class JaninoCompiler {
Arrays.asList("CURRENT_TIMESTAMP", "NOW");
private static final List TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
- Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE");
+ Arrays.asList(
+ "LOCALTIME",
+ "LOCALTIMESTAMP",
+ "CURRENT_TIME",
+ "CURRENT_DATE",
+ "UNIX_TIMESTAMP");
private static final List TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
Arrays.asList("DATE_FORMAT");
private static final List TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
- Arrays.asList("TO_DATE", "TO_TIMESTAMP");
+ Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME");
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index bb2c3503d51..b6f83d95c16 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.runtime.parser.metadata;
import org.apache.flink.cdc.runtime.functions.BuiltInScalarFunction;
+import org.apache.flink.cdc.runtime.functions.BuiltInSqlFunction;
import org.apache.flink.cdc.runtime.functions.BuiltInTimestampFunction;
import org.apache.calcite.sql.SqlBinaryOperator;
@@ -41,6 +42,7 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeTransforms;
import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
import org.apache.calcite.sql.validate.SqlNameMatcher;
import org.apache.calcite.sql.validate.SqlNameMatchers;
@@ -212,6 +214,36 @@ public SqlSyntax getSyntax() {
return SqlSyntax.FUNCTION;
}
};
+ public static final SqlFunction UNIX_TIMESTAMP =
+ BuiltInSqlFunction.newBuilder()
+ .name("UNIX_TIMESTAMP")
+ .returnType(ReturnTypes.BIGINT_NULLABLE)
+ .operandTypeChecker(
+ OperandTypes.or(
+ OperandTypes.NILADIC,
+ OperandTypes.family(SqlTypeFamily.CHARACTER),
+ OperandTypes.family(
+ SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)))
+ .notDeterministic()
+ .monotonicity(
+ call -> {
+ if (call.getOperandCount() == 0) {
+ return SqlMonotonicity.INCREASING;
+ } else {
+ return SqlMonotonicity.NOT_MONOTONIC;
+ }
+ })
+ .build();
+ public static final SqlFunction FROM_UNIXTIME =
+ new SqlFunction(
+ "FROM_UNIXTIME",
+ SqlKind.OTHER_FUNCTION,
+ TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
+ null,
+ OperandTypes.or(
+ OperandTypes.family(SqlTypeFamily.INTEGER),
+ OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER)),
+ SqlFunctionCategory.TIMEDATE);
public static final SqlFunction DATE_FORMAT =
new SqlFunction(
"DATE_FORMAT",
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index a7dc61ac4b6..0e89a0e9cd3 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -109,6 +109,21 @@ public class PostTransformOperatorTest {
.primaryKey("col1")
.build();
+ private static final TableId UNIX_TIMESTAMP_TABLEID =
+ TableId.tableId("my_company", "my_branch", "unix_timestamp_table");
+ private static final Schema UNIX_TIMESTAMP_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("from_unixtime", DataTypes.STRING())
+ .physicalColumn("from_unixtime_format", DataTypes.STRING())
+ .physicalColumn("current_unix_timestamp", DataTypes.INT())
+ .physicalColumn("unix_timestamp", DataTypes.BIGINT())
+ .physicalColumn("unix_timestamp_format_tz", DataTypes.BIGINT())
+ .physicalColumn("unix_timestamp_format", DataTypes.BIGINT())
+ .physicalColumn("unix_timestamp_format_error", DataTypes.BIGINT())
+ .primaryKey("col1")
+ .build();
+
private static final TableId TIMESTAMPDIFF_TABLEID =
TableId.tableId("my_company", "my_branch", "timestampdiff_table");
private static final Schema TIMESTAMPDIFF_SCHEMA =
@@ -780,6 +795,76 @@ void testTimestampTransform() throws Exception {
transformFunctionEventEventOperatorTestHarness.close();
}
+ @Test
+ void testUnixTimestampTransform() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ UNIX_TIMESTAMP_TABLEID.identifier(),
+ "col1, FROM_UNIXTIME(44) as from_unixtime,"
+ + " FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss') as from_unixtime_format,"
+ + " IF(UNIX_TIMESTAMP() = UNIX_TIMESTAMP(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss')), 1, 0) as current_unix_timestamp,"
+ + " UNIX_TIMESTAMP('1970-01-01 08:00:01') as unix_timestamp,"
+ + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_tz,"
+ + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS') as unix_timestamp_format,"
+ + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_error",
+ null)
+ // .addTimezone("UTC")
+ .addTimezone("Europe/Berlin")
+ .build();
+ RegularEventOperatorTestHarness
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+ // Insert
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ }));
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("1970-01-01 01:00:44"),
+ new BinaryStringData("1970/01/01 01:00:44"),
+ 1,
+ 25201L,
+ 1L,
+ 25201L,
+ -9223372036854775808L
+ }));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA)));
+ transform.processElement(new StreamRecord<>(insertEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
+
@Test
void testTimestampDiffTransform() throws Exception {
PostTransformOperator transform =
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index bc5d1200524..f0033e5e93a 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -243,6 +243,17 @@ public void testTranslateFilterToJaninoExpression() {
testFilterExpression(
"id = CURRENT_TIMESTAMP", "valueEquals(id, currentTimestamp(__epoch_time__))");
testFilterExpression("NOW()", "now(__epoch_time__)");
+ testFilterExpression("FROM_UNIXTIME(44)", "fromUnixtime(44, __time_zone__)");
+ testFilterExpression(
+ "FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss')",
+ "fromUnixtime(44, \"yyyy/MM/dd HH:mm:ss\", __time_zone__)");
+ testFilterExpression("UNIX_TIMESTAMP()", "unixTimestamp(__epoch_time__, __time_zone__)");
+ testFilterExpression(
+ "UNIX_TIMESTAMP('1970-01-01 08:00:01')",
+ "unixTimestamp(\"1970-01-01 08:00:01\", __epoch_time__, __time_zone__)");
+ testFilterExpression(
+ "UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X')",
+ "unixTimestamp(\"1970-01-01 08:00:01.001 +0800\", \"yyyy-MM-dd HH:mm:ss.SSS X\", __epoch_time__, __time_zone__)");
testFilterExpression("YEAR(dt)", "year(dt)");
testFilterExpression("QUARTER(dt)", "quarter(dt)");
testFilterExpression("MONTH(dt)", "month(dt)");
@@ -252,9 +263,9 @@ public void testTranslateFilterToJaninoExpression() {
"TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\", __time_zone__)");
testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, __time_zone__)");
testFilterExpression("TIMESTAMP_DIFF('DAY', dt1, dt2)", "timestampDiff(\"DAY\", dt1, dt2)");
- testFilterExpression("IF(a>b,a,b)", "a > b ? a : b");
- testFilterExpression("NULLIF(a,b)", "nullif(a, b)");
- testFilterExpression("COALESCE(a,b,c)", "coalesce(a, b, c)");
+ testFilterExpression("IF(a>b, a, b)", "a > b ? a : b");
+ testFilterExpression("NULLIF(a, b)", "nullif(a, b)");
+ testFilterExpression("COALESCE(a, b, c)", "coalesce(a, b, c)");
testFilterExpression("id + 2", "id + 2");
testFilterExpression("id - 2", "id - 2");
testFilterExpression("id * 2", "id * 2");
From 09842dfdcca506aa6ee46912c9de83d89ad087b1 Mon Sep 17 00:00:00 2001
From: wenmo <32723967+aiwenmo@users.noreply.github.com>
Date: Sat, 28 Dec 2024 01:23:28 +0800
Subject: [PATCH 2/4] Remove BuiltInSqlFunction
---
.../runtime/functions/BuiltInSqlFunction.java | 239 ------------------
.../metadata/TransformSqlOperatorTable.java | 31 +--
2 files changed, 10 insertions(+), 260 deletions(-)
delete mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInSqlFunction.java
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInSqlFunction.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInSqlFunction.java
deleted file mode 100644
index 2bea19514f2..00000000000
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInSqlFunction.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.functions;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.functions.BuiltInFunctionDefinition;
-
-import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlOperatorBinding;
-import org.apache.calcite.sql.type.SqlOperandTypeChecker;
-import org.apache.calcite.sql.type.SqlOperandTypeInference;
-import org.apache.calcite.sql.type.SqlReturnTypeInference;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-
-import javax.annotation.Nullable;
-
-import java.util.Optional;
-import java.util.function.Function;
-
-import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION;
-import static org.apache.flink.table.functions.BuiltInFunctionDefinition.qualifyFunctionName;
-import static org.apache.flink.table.functions.BuiltInFunctionDefinition.validateFunction;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * SQL version of {@link BuiltInFunctionDefinition}. This is the case when the operator has a
- * special parsing syntax or uses other Calcite-specific features that are not exposed via {@link
- * BuiltInFunctionDefinition} yet.
- *
- * Note: Try to keep usages of this class to a minimum and use Flink's {@link
- * BuiltInFunctionDefinition} stack instead.
- *
- *
For simple functions, use the provided builder. Otherwise, this class can also be extended.
- */
-@Internal
-public class BuiltInSqlFunction extends SqlFunction {
-
- private final @Nullable Integer version;
-
- private final boolean isDeterministic;
-
- private final boolean isInternal;
-
- private final Function monotonicity;
-
- protected BuiltInSqlFunction(
- String name,
- int version,
- SqlKind kind,
- @Nullable SqlReturnTypeInference returnTypeInference,
- @Nullable SqlOperandTypeInference operandTypeInference,
- @Nullable SqlOperandTypeChecker operandTypeChecker,
- SqlFunctionCategory category,
- boolean isDeterministic,
- boolean isInternal,
- Function monotonicity) {
- super(
- checkNotNull(name),
- checkNotNull(kind),
- returnTypeInference,
- operandTypeInference,
- operandTypeChecker,
- checkNotNull(category));
- this.version = isInternal ? null : version;
- this.isDeterministic = isDeterministic;
- this.isInternal = isInternal;
- this.monotonicity = monotonicity;
- validateFunction(name, version, isInternal);
- }
-
- protected BuiltInSqlFunction(
- String name,
- SqlKind kind,
- SqlReturnTypeInference returnTypeInference,
- SqlOperandTypeInference operandTypeInference,
- @Nullable SqlOperandTypeChecker operandTypeChecker,
- SqlFunctionCategory category) {
- this(
- name,
- DEFAULT_VERSION,
- kind,
- returnTypeInference,
- operandTypeInference,
- operandTypeChecker,
- category,
- true,
- false,
- call -> SqlMonotonicity.NOT_MONOTONIC);
- }
-
- /** Builder for configuring and creating instances of {@link BuiltInSqlFunction}. */
- public static Builder newBuilder() {
- return new Builder();
- }
-
- public final Optional getVersion() {
- return Optional.ofNullable(version);
- }
-
- public String getQualifiedName() {
- if (isInternal) {
- return getName();
- }
- assert version != null;
- return qualifyFunctionName(getName(), version);
- }
-
- @Override
- public boolean isDeterministic() {
- return isDeterministic;
- }
-
- public final boolean isInternal() {
- return isInternal;
- }
-
- @Override
- public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
- return monotonicity.apply(call);
- }
-
- // --------------------------------------------------------------------------------------------
- // Builder
- // --------------------------------------------------------------------------------------------
-
- /** Builder for fluent definition of built-in functions. */
- public static class Builder {
-
- private String name;
-
- private int version = DEFAULT_VERSION;
-
- private SqlKind kind = SqlKind.OTHER_FUNCTION;
-
- private SqlReturnTypeInference returnTypeInference;
-
- private SqlOperandTypeInference operandTypeInference;
-
- private SqlOperandTypeChecker operandTypeChecker;
-
- private SqlFunctionCategory category = SqlFunctionCategory.SYSTEM;
-
- private boolean isInternal = false;
-
- private boolean isDeterministic = true;
-
- private Function monotonicity =
- call -> SqlMonotonicity.NOT_MONOTONIC;
-
- /** @see BuiltInFunctionDefinition.Builder#name(String) */
- public Builder name(String name) {
- this.name = name;
- return this;
- }
-
- /** @see BuiltInFunctionDefinition.Builder#version(int) */
- public Builder version(int version) {
- this.version = version;
- return this;
- }
-
- public Builder kind(SqlKind kind) {
- this.kind = kind;
- return this;
- }
-
- public Builder returnType(SqlReturnTypeInference returnTypeInference) {
- this.returnTypeInference = returnTypeInference;
- return this;
- }
-
- public Builder operandTypeInference(SqlOperandTypeInference operandTypeInference) {
- this.operandTypeInference = operandTypeInference;
- return this;
- }
-
- public Builder operandTypeChecker(SqlOperandTypeChecker operandTypeChecker) {
- this.operandTypeChecker = operandTypeChecker;
- return this;
- }
-
- public Builder category(SqlFunctionCategory category) {
- this.category = category;
- return this;
- }
-
- public Builder notDeterministic() {
- this.isDeterministic = false;
- return this;
- }
-
- /** @see BuiltInFunctionDefinition.Builder#internal() */
- public Builder internal() {
- this.isInternal = true;
- return this;
- }
-
- public Builder monotonicity(SqlMonotonicity staticMonotonicity) {
- this.monotonicity = call -> staticMonotonicity;
- return this;
- }
-
- public Builder monotonicity(Function monotonicity) {
- this.monotonicity = monotonicity;
- return this;
- }
-
- public BuiltInSqlFunction build() {
- return new BuiltInSqlFunction(
- name,
- version,
- kind,
- returnTypeInference,
- operandTypeInference,
- operandTypeChecker,
- category,
- isDeterministic,
- isInternal,
- monotonicity);
- }
- }
-}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index b6f83d95c16..d47db49f8e7 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -18,7 +18,6 @@
package org.apache.flink.cdc.runtime.parser.metadata;
import org.apache.flink.cdc.runtime.functions.BuiltInScalarFunction;
-import org.apache.flink.cdc.runtime.functions.BuiltInSqlFunction;
import org.apache.flink.cdc.runtime.functions.BuiltInTimestampFunction;
import org.apache.calcite.sql.SqlBinaryOperator;
@@ -42,7 +41,6 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeTransforms;
import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
import org.apache.calcite.sql.validate.SqlNameMatcher;
import org.apache.calcite.sql.validate.SqlNameMatchers;
@@ -215,25 +213,16 @@ public SqlSyntax getSyntax() {
}
};
public static final SqlFunction UNIX_TIMESTAMP =
- BuiltInSqlFunction.newBuilder()
- .name("UNIX_TIMESTAMP")
- .returnType(ReturnTypes.BIGINT_NULLABLE)
- .operandTypeChecker(
- OperandTypes.or(
- OperandTypes.NILADIC,
- OperandTypes.family(SqlTypeFamily.CHARACTER),
- OperandTypes.family(
- SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)))
- .notDeterministic()
- .monotonicity(
- call -> {
- if (call.getOperandCount() == 0) {
- return SqlMonotonicity.INCREASING;
- } else {
- return SqlMonotonicity.NOT_MONOTONIC;
- }
- })
- .build();
+ new SqlFunction(
+ "UNIX_TIMESTAMP",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.BIGINT_NULLABLE,
+ null,
+ OperandTypes.or(
+ OperandTypes.NILADIC,
+ OperandTypes.family(SqlTypeFamily.CHARACTER),
+ OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
+ SqlFunctionCategory.TIMEDATE);
public static final SqlFunction FROM_UNIXTIME =
new SqlFunction(
"FROM_UNIXTIME",
From 108edc5855600209ab0ec164e574e86571e1f31e Mon Sep 17 00:00:00 2001
From: wenmo <32723967+aiwenmo@users.noreply.github.com>
Date: Wed, 8 Jan 2025 20:44:58 +0800
Subject: [PATCH 3/4] Optimize code and add test
---
.../content.zh/docs/core-concept/transform.md | 2 +-
docs/content/docs/core-concept/transform.md | 2 +-
.../flink/cdc/common/utils/DateTimeUtils.java | 23 +-
.../functions/SystemFunctionUtils.java | 2 +-
.../transform/PostTransformOperatorTest.java | 333 ++++++++++++++++--
.../runtime/parser/TransformParserTest.java | 6 +-
6 files changed, 313 insertions(+), 55 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 20694dc321b..dfa90a72802 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -162,7 +162,7 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
-| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.
If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
## Conditional Functions
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index e4ff1439102..67eeaf2d819 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -162,7 +162,7 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but returns ‘1970-01-01 09:00:44’ if in ‘Asia/Tokyo’ time zone. |
| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
-| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.
If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, the default value Long.MIN_VALUE(-9223372036854775808) will be returned.|
## Conditional Functions
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
index b9f4b1b5014..1fb080de99f 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
@@ -146,18 +146,18 @@ private static int ymdToJulian(int year, int month, int day) {
* Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
* "yyyy-MM-dd HH:mm:ss" format.
*/
- public static String formatUnixTimestamp(long unixtime, TimeZone tz) {
- return formatUnixTimestamp(unixtime, TIMESTAMP_FORMAT_STRING, tz);
+ public static String formatUnixTimestamp(long unixTime, TimeZone timeZone) {
+ return formatUnixTimestamp(unixTime, TIMESTAMP_FORMAT_STRING, timeZone);
}
/**
* Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
* given format.
*/
- public static String formatUnixTimestamp(long unixtime, String format, TimeZone tz) {
+ public static String formatUnixTimestamp(long unixTime, String format, TimeZone timeZone) {
SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
- formatter.setTimeZone(tz);
- Date date = new Date(unixtime * 1000);
+ formatter.setTimeZone(timeZone);
+ Date date = new Date(unixTime * 1000);
try {
return formatter.format(date);
} catch (Exception e) {
@@ -166,25 +166,20 @@ public static String formatUnixTimestamp(long unixtime, String format, TimeZone
}
}
- /** Returns the value of the timestamp to seconds since '1970-01-01 00:00:00' UTC. */
- public static long unixTimestamp(long ts) {
- return ts / 1000;
- }
-
/**
* Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
* 00:00:00' UTC.
*/
- public static long unixTimestamp(String dateStr, TimeZone tz) {
- return unixTimestamp(dateStr, TIMESTAMP_FORMAT_STRING, tz);
+ public static long unixTimestamp(String dateStr, TimeZone timeZone) {
+ return unixTimestamp(dateStr, TIMESTAMP_FORMAT_STRING, timeZone);
}
/**
* Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
* 00:00:00' UTC.
*/
- public static long unixTimestamp(String dateStr, String format, TimeZone tz) {
- long ts = internalParseTimestampMillis(dateStr, format, tz);
+ public static long unixTimestamp(String dateStr, String format, TimeZone timeZone) {
+ long ts = internalParseTimestampMillis(dateStr, format, timeZone);
if (ts == Long.MIN_VALUE) {
return Long.MIN_VALUE;
} else {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index 90f4ca0d6ef..a7555203c72 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -89,7 +89,7 @@ public static String fromUnixtime(long seconds, String format, String timezone)
}
public static long unixTimestamp(long epochTime, String timezone) {
- return DateTimeUtils.unixTimestamp(epochTime);
+ return epochTime / 1000;
}
public static long unixTimestamp(String dateTimeStr, long epochTime, String timezone) {
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 0e89a0e9cd3..b2e82efe28b 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -109,18 +109,37 @@ public class PostTransformOperatorTest {
.primaryKey("col1")
.build();
+ private static final TableId FROM_UNIX_TIME_TABLEID =
+ TableId.tableId("my_company", "my_branch", "from_unix_time_table");
+ private static final Schema FROM_UNIX_TIME_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("seconds", DataTypes.BIGINT())
+ .physicalColumn("format_str", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ private static final Schema EXPECTED_FROM_UNIX_TIME_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("from_unix_time", DataTypes.STRING())
+ .physicalColumn("from_unix_time_format", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+
private static final TableId UNIX_TIMESTAMP_TABLEID =
TableId.tableId("my_company", "my_branch", "unix_timestamp_table");
private static final Schema UNIX_TIMESTAMP_SCHEMA =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
- .physicalColumn("from_unixtime", DataTypes.STRING())
- .physicalColumn("from_unixtime_format", DataTypes.STRING())
- .physicalColumn("current_unix_timestamp", DataTypes.INT())
+ .physicalColumn("date_time_str", DataTypes.STRING())
+ .physicalColumn("unix_timestamp_format", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ private static final Schema EXPECTED_UNIX_TIMESTAMP_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
.physicalColumn("unix_timestamp", DataTypes.BIGINT())
- .physicalColumn("unix_timestamp_format_tz", DataTypes.BIGINT())
.physicalColumn("unix_timestamp_format", DataTypes.BIGINT())
- .physicalColumn("unix_timestamp_format_error", DataTypes.BIGINT())
.primaryKey("col1")
.build();
@@ -796,20 +815,91 @@ void testTimestampTransform() throws Exception {
}
@Test
- void testUnixTimestampTransform() throws Exception {
+ void testFromUnixTimeTransform() throws Exception {
+ // In UTC, from_unix_time(0s) ==> 1970-01-01 00:00:00
+ testFromUnixTimeTransformWithTimeZone("UTC", 0L, "1970-01-01 00:00:00");
+ // In UTC, from_unix_time(44s) ==> 1970-01-01 00:00:44
+ testFromUnixTimeTransformWithTimeZone("UTC", 44L, "1970-01-01 00:00:44");
+ // In Berlin, the time zone is +1:00, from_unix_time(44s) ==> 1970-01-01 01:00:44
+ testFromUnixTimeTransformWithTimeZone("Europe/Berlin", 44L, "1970-01-01 01:00:44");
+ // In Shanghai, the time zone is +8:00, from_unix_time(44s) ==> 1970-01-01 08:00:44
+ testFromUnixTimeTransformWithTimeZone("Asia/Shanghai", 44L, "1970-01-01 08:00:44");
+ }
+
+ private void testFromUnixTimeTransformWithTimeZone(
+ String timeZone, Long seconds, String unixTimeStr) throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ FROM_UNIX_TIME_TABLEID.identifier(),
+ "col1, FROM_UNIXTIME(seconds) as from_unix_time,"
+ + " FROM_UNIXTIME(seconds, format_str) as from_unix_time_format",
+ null)
+ .addTimezone(timeZone)
+ .build();
+ RegularEventOperatorTestHarness
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(FROM_UNIX_TIME_TABLEID, FROM_UNIX_TIME_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) FROM_UNIX_TIME_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) EXPECTED_FROM_UNIX_TIME_SCHEMA.toRowDataType()));
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ FROM_UNIX_TIME_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ seconds,
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss")
+ }));
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ FROM_UNIX_TIME_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData(unixTimeStr),
+ new BinaryStringData(unixTimeStr)
+ }));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ FROM_UNIX_TIME_TABLEID, EXPECTED_FROM_UNIX_TIME_SCHEMA)));
+ transform.processElement(new StreamRecord<>(insertEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
+
+ /*
+ Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds),
+ using the specified timezone in table config.
+
+ If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”,
+ this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed,
+ the default value Long.MIN_VALUE(-9223372036854775808) will be returned.
+ */
+ @Test
+ void testUnixTimestampTransformInBerlin() throws Exception {
PostTransformOperator transform =
PostTransformOperator.newBuilder()
.addTransform(
UNIX_TIMESTAMP_TABLEID.identifier(),
- "col1, FROM_UNIXTIME(44) as from_unixtime,"
- + " FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss') as from_unixtime_format,"
- + " IF(UNIX_TIMESTAMP() = UNIX_TIMESTAMP(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss')), 1, 0) as current_unix_timestamp,"
- + " UNIX_TIMESTAMP('1970-01-01 08:00:01') as unix_timestamp,"
- + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_tz,"
- + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS') as unix_timestamp_format,"
- + " UNIX_TIMESTAMP('1970-01-01 08:00:01.001', 'yyyy-MM-dd HH:mm:ss.SSS X') as unix_timestamp_format_error",
+ "col1,"
+ + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp,"
+ + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format",
null)
- // .addTimezone("UTC")
.addTimezone("Europe/Berlin")
.build();
RegularEventOperatorTestHarness
@@ -822,46 +912,219 @@ void testUnixTimestampTransform() throws Exception {
new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
- // Insert
- DataChangeEvent insertEvent =
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
+
+ // In Berlin, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==> 25201L
+ DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("1"),
- null,
- null,
- null,
- null,
- null,
- null,
- null
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
}));
- DataChangeEvent insertEventExpect =
+ DataChangeEvent insertEventExpect1 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("1"), 25201L, 25201L}));
+ transform.processElement(new StreamRecord<>(insertEvent1));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect1));
+
+ // In Berlin, "1970-01-01 08:00:01.001 +0800" formated by "yyyy-MM-dd HH:mm:ss.SSS X" ==> 1L
+ DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
recordDataGenerator.generate(
new Object[] {
- new BinaryStringData("1"),
- new BinaryStringData("1970-01-01 01:00:44"),
- new BinaryStringData("1970/01/01 01:00:44"),
- 1,
- 25201L,
- 1L,
- 25201L,
- -9223372036854775808L
+ new BinaryStringData("2"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect2 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("2"), 25201L, 1L}));
+ transform.processElement(new StreamRecord<>(insertEvent2));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect2));
+
+ // In Berlin, "1970-01-01 08:00:01.001 +0800" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==>
+ // 25201L
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("3"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+ }));
+ DataChangeEvent insertEventExpect3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("3"), 25201L, 25201L}));
+ transform.processElement(new StreamRecord<>(insertEvent3));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect3));
+
+ // In Berlin, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // -9223372036854775808L
+ DataChangeEvent insertEvent4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"),
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"), 25201L, -9223372036854775808L
}));
+ transform.processElement(new StreamRecord<>(insertEvent4));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect4));
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
+
+ @Test
+ void testUnixTimestampTransformInShanghai() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ UNIX_TIMESTAMP_TABLEID.identifier(),
+ "col1,"
+ + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp,"
+ + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format",
+ null)
+ .addTimezone("Asia/Shanghai")
+ .build();
+ RegularEventOperatorTestHarness
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(
+ ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(
new StreamRecord<>(
new CreateTableEvent(
- UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA)));
- transform.processElement(new StreamRecord<>(insertEvent));
+ UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
+
+ // In Shanghai, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==> 1L
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+ }));
+ DataChangeEvent insertEventExpect1 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("1"), 1L, 1L}));
+ transform.processElement(new StreamRecord<>(insertEvent1));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
- .isEqualTo(new StreamRecord<>(insertEventExpect));
+ .isEqualTo(new StreamRecord<>(insertEventExpect1));
+
+ // In Shanghai, "1970-01-01 08:00:01.001 +0800" formated by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // 1L
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("2"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect2 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("2"), 1L, 1L}));
+ transform.processElement(new StreamRecord<>(insertEvent2));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect2));
+
+ // In Shanghai, "1970-01-01 08:00:01.001 +0800" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==> 1L
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("3"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+ }));
+ DataChangeEvent insertEventExpect3 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {new BinaryStringData("3"), 1L, 1L}));
+ transform.processElement(new StreamRecord<>(insertEvent3));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect3));
+
+ // In Shanghai, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // -9223372036854775808L
+ DataChangeEvent insertEvent4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"),
+ new BinaryStringData("1970-01-01 08:00:01.001"),
+ new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+ }));
+ DataChangeEvent insertEventExpect4 =
+ DataChangeEvent.insertEvent(
+ UNIX_TIMESTAMP_TABLEID,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"), 1L, -9223372036854775808L
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent4));
+ Assertions.assertThat(
+ transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect4));
transformFunctionEventEventOperatorTestHarness.close();
}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index f0033e5e93a..478d4e92ee8 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -263,9 +263,9 @@ public void testTranslateFilterToJaninoExpression() {
"TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\", __time_zone__)");
testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, __time_zone__)");
testFilterExpression("TIMESTAMP_DIFF('DAY', dt1, dt2)", "timestampDiff(\"DAY\", dt1, dt2)");
- testFilterExpression("IF(a>b, a, b)", "a > b ? a : b");
- testFilterExpression("NULLIF(a, b)", "nullif(a, b)");
- testFilterExpression("COALESCE(a, b, c)", "coalesce(a, b, c)");
+ testFilterExpression("IF(a>b,a,b)", "a > b ? a : b");
+ testFilterExpression("NULLIF(a,b)", "nullif(a, b)");
+ testFilterExpression("COALESCE(a,b,c)", "coalesce(a, b, c)");
testFilterExpression("id + 2", "id + 2");
testFilterExpression("id - 2", "id - 2");
testFilterExpression("id * 2", "id * 2");
From 8e5777dbbe214a699ed6acbdbbce06db490598f6 Mon Sep 17 00:00:00 2001
From: wenmo <32723967+aiwenmo@users.noreply.github.com>
Date: Mon, 13 Jan 2025 00:52:41 +0800
Subject: [PATCH 4/4] Fix the wrongly spelled words
---
.../transform/PostTransformOperatorTest.java | 32 ++++++++++---------
1 file changed, 17 insertions(+), 15 deletions(-)
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index b2e82efe28b..944527b9008 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -113,14 +113,14 @@ public class PostTransformOperatorTest {
TableId.tableId("my_company", "my_branch", "from_unix_time_table");
private static final Schema FROM_UNIX_TIME_SCHEMA =
Schema.newBuilder()
- .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col1", DataTypes.STRING().notNull())
.physicalColumn("seconds", DataTypes.BIGINT())
.physicalColumn("format_str", DataTypes.STRING())
.primaryKey("col1")
.build();
private static final Schema EXPECTED_FROM_UNIX_TIME_SCHEMA =
Schema.newBuilder()
- .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col1", DataTypes.STRING().notNull())
.physicalColumn("from_unix_time", DataTypes.STRING())
.physicalColumn("from_unix_time_format", DataTypes.STRING())
.primaryKey("col1")
@@ -130,14 +130,14 @@ public class PostTransformOperatorTest {
TableId.tableId("my_company", "my_branch", "unix_timestamp_table");
private static final Schema UNIX_TIMESTAMP_SCHEMA =
Schema.newBuilder()
- .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col1", DataTypes.STRING().notNull())
.physicalColumn("date_time_str", DataTypes.STRING())
.physicalColumn("unix_timestamp_format", DataTypes.STRING())
.primaryKey("col1")
.build();
private static final Schema EXPECTED_UNIX_TIMESTAMP_SCHEMA =
Schema.newBuilder()
- .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col1", DataTypes.STRING().notNull())
.physicalColumn("unix_timestamp", DataTypes.BIGINT())
.physicalColumn("unix_timestamp_format", DataTypes.BIGINT())
.primaryKey("col1")
@@ -923,7 +923,7 @@ void testUnixTimestampTransformInBerlin() throws Exception {
new CreateTableEvent(
UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
- // In Berlin, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==> 25201L
+ // In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 25201L
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
@@ -943,7 +943,8 @@ void testUnixTimestampTransformInBerlin() throws Exception {
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect1));
- // In Berlin, "1970-01-01 08:00:01.001 +0800" formated by "yyyy-MM-dd HH:mm:ss.SSS X" ==> 1L
+ // In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // 1L
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
@@ -963,7 +964,7 @@ void testUnixTimestampTransformInBerlin() throws Exception {
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect2));
- // In Berlin, "1970-01-01 08:00:01.001 +0800" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==>
+ // In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==>
// 25201L
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
@@ -984,7 +985,7 @@ void testUnixTimestampTransformInBerlin() throws Exception {
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect3));
- // In Berlin, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
// -9223372036854775808L
DataChangeEvent insertEvent4 =
DataChangeEvent.insertEvent(
@@ -1042,7 +1043,7 @@ void testUnixTimestampTransformInShanghai() throws Exception {
new CreateTableEvent(
UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
- // In Shanghai, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==> 1L
+ // In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 1L
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
@@ -1062,7 +1063,7 @@ void testUnixTimestampTransformInShanghai() throws Exception {
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect1));
- // In Shanghai, "1970-01-01 08:00:01.001 +0800" formated by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // In Shanghai, "1970-01-01 08:00:01.001 +0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
// 1L
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
@@ -1070,27 +1071,28 @@ void testUnixTimestampTransformInShanghai() throws Exception {
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("2"),
- new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0100"),
new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
}));
DataChangeEvent insertEventExpect2 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
expectedRecordDataGenerator.generate(
- new Object[] {new BinaryStringData("2"), 1L, 1L}));
+ new Object[] {new BinaryStringData("2"), 1L, 25201L}));
transform.processElement(new StreamRecord<>(insertEvent2));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect2));
- // In Shanghai, "1970-01-01 08:00:01.001 +0800" formated by "yyyy-MM-dd HH:mm:ss.SSS" ==> 1L
+ // In Shanghai, "1970-01-01 08:00:01.001 +0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==>
+ // 1L
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
UNIX_TIMESTAMP_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("3"),
- new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+ new BinaryStringData("1970-01-01 08:00:01.001 +0100"),
new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
}));
DataChangeEvent insertEventExpect3 =
@@ -1103,7 +1105,7 @@ void testUnixTimestampTransformInShanghai() throws Exception {
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect3));
- // In Shanghai, "1970-01-01 08:00:01.001" formated by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+ // In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
// -9223372036854775808L
DataChangeEvent insertEvent4 =
DataChangeEvent.insertEvent(