[FLINK-36865][cdc] Provide UNIX_TIMESTAMP series functions in YAML pipeline #3819

Merged: 4 commits, Jan 13, 2025
3 changes: 3 additions & 0 deletions docs/content.zh/docs/core-concept/transform.md
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns the numeric argument formatted as a string (default format: 'yyyy-MM-dd HH:mm:ss'). numeric is a timestamp value in seconds since '1970-01-01 00:00:00' UTC, such as the one produced by UNIX_TIMESTAMP(). The return value is expressed in the session time zone (specified in TableConfig). For example, FROM_UNIXTIME(44) returns '1970-01-01 00:00:44' in the UTC time zone, but '1970-01-01 09:00:44' in the 'Asia/Tokyo' time zone. |
| UNIX_TIMESTAMP() | unixTimestamp() | Returns the current Unix timestamp in seconds. This function is not deterministic: the value is recalculated for each record. |
| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts the date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a Unix timestamp in seconds, using the time zone specified in the table config.<br/>If the date time string itself carries a time zone that the format parses as a UTC offset, such as "yyyy-MM-dd HH:mm:ss.SSS X", that time zone is used instead of the one in the table config. If the date time string cannot be parsed, the default value Long.MIN_VALUE (-9223372036854775808) is returned. |

## Conditional Functions

3 changes: 3 additions & 0 deletions docs/content/docs/core-concept/transform.md
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns the numeric argument formatted as a string (default format: 'yyyy-MM-dd HH:mm:ss'). numeric is a timestamp value in seconds since '1970-01-01 00:00:00' UTC, such as the one produced by UNIX_TIMESTAMP(). The return value is expressed in the session time zone (specified in TableConfig). For example, FROM_UNIXTIME(44) returns '1970-01-01 00:00:44' in the UTC time zone, but '1970-01-01 09:00:44' in the 'Asia/Tokyo' time zone. |
| UNIX_TIMESTAMP() | unixTimestamp() | Returns the current Unix timestamp in seconds. This function is not deterministic: the value is recalculated for each record. |
| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts the date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a Unix timestamp in seconds, using the time zone specified in the table config.<br/>If the date time string itself carries a time zone that the format parses as a UTC offset, such as "yyyy-MM-dd HH:mm:ss.SSS X", that time zone is used instead of the one in the table config. If the date time string cannot be parsed, the default value Long.MIN_VALUE (-9223372036854775808) is returned. |
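
The time zone behaviour described for FROM_UNIXTIME can be checked with plain JDK classes. The following is a minimal sketch, not the Flink CDC implementation: the class name `FromUnixtimeSketch` and its `fromUnixtime` helper are hypothetical, and the session time zone is passed in explicitly as a zone ID.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical stand-alone sketch; it only mirrors the documented behaviour. */
class FromUnixtimeSketch {

    /** Formats seconds since '1970-01-01 00:00:00' UTC in the given time zone. */
    static String fromUnixtime(long seconds, String format, String timeZoneId) {
        // A new formatter per call avoids sharing the non-thread-safe SimpleDateFormat.
        SimpleDateFormat formatter = new SimpleDateFormat(format, Locale.ROOT);
        formatter.setTimeZone(TimeZone.getTimeZone(timeZoneId));
        // java.util.Date expects milliseconds, so the seconds value is scaled by 1000.
        return formatter.format(new Date(seconds * 1000L));
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd HH:mm:ss";
        System.out.println(fromUnixtime(44L, pattern, "UTC"));        // 1970-01-01 00:00:44
        System.out.println(fromUnixtime(44L, pattern, "Asia/Tokyo")); // 1970-01-01 09:00:44
    }
}
```

The same instant yields two different strings because only the rendering depends on the time zone; the numeric input is always relative to UTC.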

## Conditional Functions

@@ -43,6 +43,16 @@ public class DateTimeUtils {
*/
public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000

/** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */
private static final String DATE_FORMAT_STRING = "yyyy-MM-dd";

/** The SimpleDateFormat string for ISO times, "HH:mm:ss". */
private static final String TIME_FORMAT_STRING = "HH:mm:ss";

/** The SimpleDateFormat string for ISO timestamps, "yyyy-MM-dd HH:mm:ss". */
private static final String TIMESTAMP_FORMAT_STRING =
DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING;

/**
* A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe.
* (string_format) => formatter
@@ -109,7 +119,7 @@ private static long internalParseTimestampMillis(String dateStr, String format,
} catch (ParseException e) {
LOG.error(
String.format(
- "Exception when parsing datetime string '%s' in format '%s'",
+ "Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",
dateStr, format),
e);
return Long.MIN_VALUE;
@@ -128,6 +138,56 @@ private static int ymdToJulian(int year, int month, int day) {
return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
}

// --------------------------------------------------------------------------------------------
// UNIX TIME
// --------------------------------------------------------------------------------------------

/**
* Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
* "yyyy-MM-dd HH:mm:ss" format.
*/
public static String formatUnixTimestamp(long unixTime, TimeZone timeZone) {
return formatUnixTimestamp(unixTime, TIMESTAMP_FORMAT_STRING, timeZone);
}

/**
* Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
* given format.
*/
public static String formatUnixTimestamp(long unixTime, String format, TimeZone timeZone) {
SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
formatter.setTimeZone(timeZone);
Date date = new Date(unixTime * 1000);
try {
return formatter.format(date);
} catch (Exception e) {
LOG.error("Exception when formatting.", e);
return null;
}
}

/**
* Parses a datetime string in the "yyyy-MM-dd HH:mm:ss" format and returns the number of
* seconds since '1970-01-01 00:00:00' UTC, or Long.MIN_VALUE if the string cannot be parsed.
*/
public static long unixTimestamp(String dateStr, TimeZone timeZone) {
return unixTimestamp(dateStr, TIMESTAMP_FORMAT_STRING, timeZone);
}

/**
* Parses a datetime string in the given format and returns the number of seconds since
* '1970-01-01 00:00:00' UTC, or Long.MIN_VALUE if the string cannot be parsed.
*/
public static long unixTimestamp(String dateStr, String format, TimeZone timeZone) {
long ts = internalParseTimestampMillis(dateStr, format, timeZone);
if (ts == Long.MIN_VALUE) {
return Long.MIN_VALUE;
} else {
// return the seconds
return ts / 1000;
}
}

// --------------------------------------------------------------------------------------------
// Format
// --------------------------------------------------------------------------------------------
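
The parsing path added in this hunk returns `Long.MIN_VALUE` as a sentinel instead of throwing. As a rough illustration of that contract, here is a minimal sketch using only JDK classes. It does not reproduce `internalParseTimestampMillis`, whose body is not shown in this diff; the class name `UnixTimestampSketch` is hypothetical, and the formatter cache is replaced by a fresh `SimpleDateFormat` per call.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical stand-alone sketch of the parse-or-sentinel contract. */
class UnixTimestampSketch {

    /** Returns seconds since the epoch, or Long.MIN_VALUE if dateStr cannot be parsed. */
    static long unixTimestamp(String dateStr, String format, String timeZoneId) {
        SimpleDateFormat formatter = new SimpleDateFormat(format, Locale.ROOT);
        // The configured zone applies only when the string itself carries no zone.
        formatter.setTimeZone(TimeZone.getTimeZone(timeZoneId));
        try {
            // getTime() is in milliseconds; integer division converts to seconds.
            return formatter.parse(dateStr).getTime() / 1000;
        } catch (ParseException e) {
            return Long.MIN_VALUE;
        }
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd HH:mm:ss";
        System.out.println(unixTimestamp("1970-01-01 00:00:44", pattern, "UTC"));        // 44
        System.out.println(unixTimestamp("1970-01-01 09:00:44", pattern, "Asia/Tokyo")); // 44
        // A zone embedded in the string (pattern letter X) overrides the configured zone.
        System.out.println(
                unixTimestamp("1970-01-01 09:00:44.000 +09", "yyyy-MM-dd HH:mm:ss.SSS X", "UTC")); // 44
        System.out.println(unixTimestamp("not a date", pattern, "UTC") == Long.MIN_VALUE); // true
    }
}
```

Callers that consume the result need to compare against `Long.MIN_VALUE` explicitly, since the sentinel is an ordinary `long` and would otherwise flow into later arithmetic unnoticed.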
@@ -80,6 +80,27 @@ public static int currentDate(long epochTime, String timezone) {
return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
}

public static String fromUnixtime(long seconds, String timezone) {
return DateTimeUtils.formatUnixTimestamp(seconds, TimeZone.getTimeZone(timezone));
}

public static String fromUnixtime(long seconds, String format, String timezone) {
return DateTimeUtils.formatUnixTimestamp(seconds, format, TimeZone.getTimeZone(timezone));
}

public static long unixTimestamp(long epochTime, String timezone) {
return epochTime / 1000;
}

public static long unixTimestamp(String dateTimeStr, long epochTime, String timezone) {
return DateTimeUtils.unixTimestamp(dateTimeStr, TimeZone.getTimeZone(timezone));
}

public static long unixTimestamp(
String dateTimeStr, String format, long epochTime, String timezone) {
return DateTimeUtils.unixTimestamp(dateTimeStr, format, TimeZone.getTimeZone(timezone));
}

public static String dateFormat(TimestampData timestamp, String format) {
return DateTimeUtils.formatTimestampMillis(
timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC"));
@@ -57,13 +57,18 @@ public class JaninoCompiler {
Arrays.asList("CURRENT_TIMESTAMP", "NOW");

private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
- Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE");
+ Arrays.asList(
+ "LOCALTIME",
+ "LOCALTIMESTAMP",
+ "CURRENT_TIME",
+ "CURRENT_DATE",
+ "UNIX_TIMESTAMP");

private static final List<String> TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
Arrays.asList("DATE_FORMAT");

private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
- Arrays.asList("TO_DATE", "TO_TIMESTAMP");
+ Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME");

public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
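
The wrapper signatures in the previous file suggest how these lists are used: `unixTimestamp` takes trailing `epochTime` and `timezone` parameters, while `fromUnixtime` takes only a trailing `timezone`. The sketch below illustrates that inferred convention. It is an assumption, not the actual `JaninoCompiler` logic, which this diff does not show; the class `TemporalArgumentSketch` and the method `withImplicitArguments` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Hypothetical sketch of appending implicit arguments to a function call. */
class TemporalArgumentSketch {

    static final List<String> TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
            Arrays.asList(
                    "LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE", "UNIX_TIMESTAMP");

    static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
            Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME");

    static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
    static final String DEFAULT_TIME_ZONE = "__time_zone__";

    /** Returns the user arguments followed by any implicit ones (assumed convention). */
    static List<String> withImplicitArguments(String functionName, List<String> userArguments) {
        List<String> arguments = new ArrayList<>(userArguments);
        String name = functionName.toUpperCase(Locale.ROOT);
        if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(name)) {
            // Assumed: these functions receive both the epoch time and the time zone.
            arguments.add(DEFAULT_EPOCH_TIME);
            arguments.add(DEFAULT_TIME_ZONE);
        } else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(name)) {
            // Assumed: conversion functions receive only the time zone.
            arguments.add(DEFAULT_TIME_ZONE);
        }
        return arguments;
    }

    public static void main(String[] args) {
        // [dt, __epoch_time__, __time_zone__]
        System.out.println(withImplicitArguments("UNIX_TIMESTAMP", Arrays.asList("dt")));
        // [ts, __time_zone__]
        System.out.println(withImplicitArguments("FROM_UNIXTIME", Arrays.asList("ts")));
    }
}
```

Under this reading, registering `UNIX_TIMESTAMP` in the first list is what lets the zero-argument form resolve to `unixTimestamp(long epochTime, String timezone)`.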
@@ -212,6 +212,27 @@ public SqlSyntax getSyntax() {
return SqlSyntax.FUNCTION;
}
};
public static final SqlFunction UNIX_TIMESTAMP =
new SqlFunction(
"UNIX_TIMESTAMP",
SqlKind.OTHER_FUNCTION,
ReturnTypes.BIGINT_NULLABLE,
null,
OperandTypes.or(
OperandTypes.NILADIC,
OperandTypes.family(SqlTypeFamily.CHARACTER),
OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
SqlFunctionCategory.TIMEDATE);
public static final SqlFunction FROM_UNIXTIME =
new SqlFunction(
"FROM_UNIXTIME",
SqlKind.OTHER_FUNCTION,
TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
null,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.INTEGER),
OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER)),
SqlFunctionCategory.TIMEDATE);
public static final SqlFunction DATE_FORMAT =
new SqlFunction(
"DATE_FORMAT",