Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADBDEV-6159: Add support pushdown for timestamp with time zone data type #142

Open
wants to merge 2 commits into
base: pxf-6.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/jdbc_pxf.html.md.erb
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ You include JDBC connector custom options in the `LOCATION` URI, prefacing each
| FETCH_SIZE | Read | Integer that identifies the number of rows to buffer when reading from an external SQL database. Read row batching is activated by default. The default read fetch size for MySQL is `-2147483648` (`Integer.MIN_VALUE`). The default read fetch size for all other databases is 1000. |
| QUERY_TIMEOUT | Read/Write | Integer that identifies the amount of time (in seconds) that the JDBC driver waits for a statement to run. The default wait time is infinite. |
| DATE_WIDE_RANGE | Read/Write | Boolean that enables special parsing of dates when the year contains more than four alphanumeric characters. The default value is `false`. When set to `true`, PXF uses extended classes to parse dates, and recognizes years that specify `BC` or `AD`. |
| UNKNOWN_DBMS_AS_POSTGRESQL | Read | Boolean that allows using PostgreSQL syntax when interacting with an external data source whose syntax is unknown. The default value is `true`. When set to `false`, PXF uses default syntaxes. This parameter might be important for pushdown filter. |
| POOL_SIZE | Write | Activate thread pooling on `INSERT` operations and identify the number of threads in the pool. Thread pooling is deactivated by default. |
| PARTITION_BY | Read | Activates read partitioning. The partition column, \<column-name\>:\<column-type\>. You may specify only one partition column. The JDBC connector supports `date`, `int`, and `enum` \<column-type\> values, where `int` represents any JDBC integral type. If you do not identify a `PARTITION_BY` column, a single PXF instance services the read request. |
| RANGE | Read | Required when `PARTITION_BY` is specified. The query range; used as a hint to aid the creation of partitions. The `RANGE` format is dependent upon the data type of the partition column. When the partition column is an `enum` type, `RANGE` must specify a list of values, \<value\>:\<value\>[:\<value\>[...]], each of which forms its own fragment. If the partition column is an `int` or `date` type, `RANGE` must specify \<start-value\>:\<end-value\> and represents the interval from \<start-value\> through \<end-value\>, inclusive. The `RANGE` for an `int` partition column may span any 64-bit signed integer values. If the partition column is a `date` type, use the `yyyy-MM-dd` date format. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public boolean openForWrite() throws SQLException {
}
}
// Get database product name
DbProduct dbProduct = DbProduct.getDbProduct(connection.getMetaData().getDatabaseProductName());
DbProduct dbProduct = DbProduct.getDbProduct(connection.getMetaData().getDatabaseProductName(), treatUnknownDbmsAsPostgreSql);

writer = JdbcWriter.fromProps(
JdbcWriterProperties.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class JdbcBasePlugin extends BasePlugin implements Reloader {
private static final String MYSQL_DRIVER_PREFIX = "com.mysql.";
private static final String JDBC_DATE_WIDE_RANGE = "jdbc.date.wideRange";
private static final String JDBC_DATE_WIDE_RANGE_LEGACY = "jdbc.date.wide-range";
private static final String JDBC_UNKNOWN_DBMS_AS_POSTGRESQL = "jdbc.unknownDbmsAsPostgreSql";

private enum TransactionIsolation {
READ_UNCOMMITTED(1),
Expand Down Expand Up @@ -179,6 +180,10 @@ public static TransactionIsolation typeOf(String str) {

// Flag which is used when the year might contain more than 4 digits in `date` or 'timestamp'
protected boolean isDateWideRange;
// Flag which is used when the external database will use PostgreSql syntaxes to wrap date, timestamp and timestamp with time zone
// while using pushdown filter.
protected boolean treatUnknownDbmsAsPostgreSql;


static {
// Deprecated as of Oct 22, 2019 in version 5.9.2+
Expand Down Expand Up @@ -402,6 +407,8 @@ public void afterPropertiesSet() {
}

isDateWideRange = getIsDateWideRange(context);

treatUnknownDbmsAsPostgreSql = treatUnknownDbmsAsPostgreSql(context);
}

/**
Expand Down Expand Up @@ -432,6 +439,19 @@ public static boolean getIsDateWideRange(RequestContext context) {
return configuration != null && configuration.getBoolean(JDBC_DATE_WIDE_RANGE, false);
}

/**
* Determine if the external database will use PostgreSql syntaxes to wrap date, timestamp and timestamp with time zone
* while using pushdown filter.
* Optional parameter. The default value is 'true' for backward compatability.
*
* @param context request context
* @return false if we don't want to use PostgreSql syntaxes to form pushdown filter.
*/
public static boolean treatUnknownDbmsAsPostgreSql(RequestContext context) {
Configuration configuration = context.getConfiguration();
return configuration != null && configuration.getBoolean(JDBC_UNKNOWN_DBMS_AS_POSTGRESQL, true);
}

/**
* Open a new JDBC connection
*
Expand Down Expand Up @@ -627,7 +647,7 @@ private void prepareConnection(Connection connection) throws SQLException {

// Prepare session (process sessionConfiguration)
if (!sessionConfiguration.isEmpty()) {
DbProduct dbProduct = DbProduct.getDbProduct(metadata.getDatabaseProductName());
DbProduct dbProduct = DbProduct.getDbProduct(metadata.getDatabaseProductName(), treatUnknownDbmsAsPostgreSql);

try (Statement statement = connection.createStatement()) {
for (Map.Entry<String, String> e : sessionConfiguration.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class JdbcPredicateBuilder extends ColumnPredicateBuilder {

private final DbProduct dbProduct;
private boolean wrapDateWithTime = false;
private boolean isDateWideRange;

public JdbcPredicateBuilder(DbProduct dbProduct,
List<ColumnDescriptor> tupleDescription) {
Expand All @@ -56,10 +57,12 @@ public JdbcPredicateBuilder(DbProduct dbProduct,
public JdbcPredicateBuilder(DbProduct dbProduct,
String quoteString,
List<ColumnDescriptor> tupleDescription,
boolean wrapDateWithTime) {
boolean wrapDateWithTime,
boolean isDateWideRange) {
super(quoteString, tupleDescription);
this.dbProduct = dbProduct;
this.wrapDateWithTime = wrapDateWithTime;
this.isDateWideRange = isDateWideRange;
}

@Override
Expand Down Expand Up @@ -98,6 +101,14 @@ protected String serializeValue(DataType type, String value) {
} else {
return dbProduct.wrapTimestamp(value);
}
case TIMESTAMP_WITH_TIME_ZONE:
// We support TIMESTAMP_WITH_TIME_ZONE only when isDateWideRange=true
if (isDateWideRange) {
return dbProduct.wrapTimestampWithTZ(value);
} else {
throw new UnsupportedOperationException(String.format(
"'%s' is not supported fo filtering without additional property. Try to use the property DATE_WIDE_RANGE=true", type));
}
default:
throw new UnsupportedOperationException(String.format(
"Unsupported column type for filtering '%s' ", type.getOID()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public List<OneField> getFields(OneRow row) throws SQLException {
value = offsetDateTime != null ? offsetDateTime.format(OFFSET_DATE_TIME_FORMATTER) : null;
} else {
throw new UnsupportedOperationException(
String.format("Field type '%s' (column '%s') is not supported",
String.format("Field type '%s' (column '%s') is not supported. Try to use the property DATE_WIDE_RANGE=true",
DataType.get(oneField.type),
column));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public class SQLQueryBuilder {
DataType.VARCHAR,
DataType.BPCHAR,
DataType.DATE,
DataType.TIMESTAMP
DataType.TIMESTAMP,
DataType.TIMESTAMP_WITH_TIME_ZONE
);
private static final TreeVisitor PRUNER = new SupportedOperatorPruner(SUPPORTED_OPERATORS);
private static final TreeTraverser TRAVERSER = new TreeTraverser();
Expand Down Expand Up @@ -122,7 +123,8 @@ public SQLQueryBuilder(RequestContext context, DatabaseMetaData metaData, String
}
databaseMetaData = metaData;

dbProduct = DbProduct.getDbProduct(databaseMetaData.getDatabaseProductName());
dbProduct = DbProduct.getDbProduct(databaseMetaData.getDatabaseProductName(),
JdbcBasePlugin.treatUnknownDbmsAsPostgreSql(context));
columns = context.getTupleDescription();

// pick the source as either requested table name or a wrapped subquery with an alias
Expand Down Expand Up @@ -285,7 +287,8 @@ protected JdbcPredicateBuilder getPredicateBuilder() {
dbProduct,
quoteString,
context.getTupleDescription(),
wrapDateWithTime);
wrapDateWithTime,
JdbcBasePlugin.getIsDateWideRange(context));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ public class DateTimeEraFormatters {
.appendLiteral(" ")
.appendText(ChronoField.ERA, TextStyle.SHORT)
.toFormatter();

/**
* Signifies the Time Zone
* +H:mm:ss - hour, with minute if non-zero or with minute and second if non-zero, with colon
* Examples: "-5", "+03", "-03:30", "+04:15"
*/
public final static DateTimeFormatter TIME_ZONE_FORMATTER = new DateTimeFormatterBuilder()
.appendOffset("+H:mm","+00:00")
.toFormatter();

/**
* Used to parse String to LocalDateTime.
* Examples: "1980-08-10 17:10:20" -> 1980-08-10T17:10:20; "123456-10-19 11:12:13" -> +123456-10-19T11:12:13;
Expand Down Expand Up @@ -78,6 +88,20 @@ public class DateTimeEraFormatters {
.appendOptional(ERA_FORMATTER)
.toFormatter()
.withLocale(Locale.ROOT);
/**
* Used to format String to OffsetDateTime.
* Examples: "2024-11-13 21:01:02.95+3" -> "2024-11-13T21:01:02.95+03:00"; "2015-10-11 15:00:00.9+05" -> "2015-10-11T15:00:00.9+05:00";
* "2015-10-11 15:00:00.9-03:30" -> "2015-10-11T15:00:00.9-03:30"; "2018-04-03 18:10:23.956789+00" -> "2018-04-03T18:10:23.956789Z"
*/
public static final DateTimeFormatter OFFSET_DATE_TIME_WITH_TIME_ZONE_FORMATTER = new DateTimeFormatterBuilder()
.appendValue(ChronoField.YEAR_OF_ERA, 4, 9, SignStyle.NORMAL).appendLiteral("-")
.appendValue(ChronoField.MONTH_OF_YEAR, 2).appendLiteral('-')
.appendValue(ChronoField.DAY_OF_MONTH, 2).appendLiteral(" ")
.append(ISO_LOCAL_TIME)
.append(TIME_ZONE_FORMATTER)
.appendOptional(ERA_FORMATTER)
.toFormatter()
.withLocale(Locale.ROOT);
/**
* Used to format LocalDateTime to String.
* Examples: 2018-10-19T10:11 -> "2018-10-19 10:11:00 AD"; +123456-10-19T11:12:13 -> "123456-10-19 11:12:13 AD";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME;
import static org.greenplum.pxf.plugins.jdbc.utils.DateTimeEraFormatters.OFFSET_DATE_TIME_WITH_TIME_ZONE_FORMATTER;

/**
* A tool class to change PXF-JDBC plugin behaviour for certain external databases
Expand All @@ -40,6 +42,24 @@ public enum DbProduct {
public String buildSessionQuery(String key, String value) {
return String.format("SET %s %s", key, value);
}

/**
* Convert Postgres timestamp with time zone string to the appropriate Microsoft SQL Server DATETIMEOFFSET string format.
* The Microsoft SQL Server DATETIMEOFFSET type supports only the `+|-hh:mm` format or the literal `Z` for time zones.
* Greenplum may send a timestamp with a time zone that contains only hours, for example, +03.
* We use the OffsetDateTime#toString method to convert time zones without minutes to those with minutes or to Z.
* In case of a parsing error, we will avoid pushdown and send the query without the WHERE clause.
*/
@Override
public String wrapTimestampWithTZ(String val) {
try {
String valStr = OffsetDateTime.parse(val, OFFSET_DATE_TIME_WITH_TIME_ZONE_FORMATTER).toString();
return "CONVERT(DATETIMEOFFSET, '" + valStr + "')";
} catch (Exception e) {
throw new IllegalArgumentException(
String.format("The value '%s' cannot be converted to the Microsoft SQL Server 'DATETIMEOFFSET' type", val));
}
}
},

MYSQL {
Expand Down Expand Up @@ -70,6 +90,14 @@ public String wrapTimestamp(String val) {
return "to_timestamp('" + val + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
}

/**
* Convert Postgres timestamp with time zone string to the appropriate Oracle timestamp with time zone string format.
*/
@Override
public String wrapTimestampWithTZ(String val) {
return "to_timestamp_tz('" + val + "', 'YYYY-MM-DD HH24:MI:SS.FFTZH:TZM')";
}

@Override
public String buildSessionQuery(String key, String value) {
return OracleJdbcUtils.buildSessionQuery(key, value);
Expand All @@ -91,6 +119,11 @@ public String wrapDate(@NonNull LocalDate val, boolean isDateWideRange) {
public String wrapTimestamp(@NonNull LocalDateTime val, boolean isDateWideRange) {
return wrapTimestamp(isDateWideRange ? val.format(DateTimeEraFormatters.LOCAL_DATE_TIME_FORMATTER) : val.toString());
}

@Override
public String wrapTimestampWithTZ(String val) {
return "'" + val + "'";
}
},

S3_SELECT {
Expand All @@ -115,7 +148,15 @@ public String wrapTimestamp(String val) {
public String buildSessionQuery(String key, String value) {
return String.format("SET %s %s", key, value);
}
};

@Override
public String wrapTimestampWithTZ(String val) {
throw new UnsupportedOperationException(
String.format("The database %s doesn't support the TIMESTAMP WITH TIME ZONE data type", this));
}
},

OTHER;

/**
* Wraps a given date value the way required by target database
Expand Down Expand Up @@ -160,11 +201,21 @@ public String wrapTimestamp(String val) {
}

/**
* Wraps a given timestamp value the way required by target database
* Wraps a given timestamp with time zone value the way required by target database
*
* @param val {@link java.sql.Timestamp} object to wrap
* @param val {@link java.sql.Types.TIME_WITH_TIMEZONE} object to wrap
* @return a string with a properly wrapped timestamp object
*/
public String wrapTimestampWithTZ(String val) {
throw new UnsupportedOperationException("The database doesn't support pushdown of the `TIMESTAMP WITH TIME ZONE` data type");
}

/**
* Wraps a given timestamp value the way required by target database
*
* @param val {@link java.sql.Timestamp} object to wrap
* @param isDateWideRange flag which is used when the year might contain more than 4 digits
* @return a string with a properly wrapped timestamp object
*/
public String wrapTimestamp(@NonNull LocalDateTime val, boolean isDateWideRange) {
return wrapTimestamp(isDateWideRange ? val.format(ISO_LOCAL_DATE_TIME) :
Expand All @@ -188,7 +239,7 @@ public String buildSessionQuery(String key, String value) {
* @param dbName database name
* @return a DbProduct of the required class
*/
public static DbProduct getDbProduct(String dbName) {
public static DbProduct getDbProduct(String dbName, boolean treatUnknownDbmsAsPostgreSql) {
if (LOG.isDebugEnabled()) {
LOG.debug("Database product name is '{}'", dbName);
}
Expand All @@ -205,8 +256,14 @@ public static DbProduct getDbProduct(String dbName) {
result = DbProduct.S3_SELECT;
} else if (dbName.contains("ADAPTIVE SERVER ENTERPRISE")) {
result = DbProduct.SYBASE;
} else {
} else if (dbName.contains("POSTGRESQL")) {
result = DbProduct.POSTGRES;
} else {
if (treatUnknownDbmsAsPostgreSql) {
result = DbProduct.POSTGRES;
} else {
result = DbProduct.OTHER;
}
}

if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@
import java.util.Map;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.anyInt;
Expand Down Expand Up @@ -526,6 +523,23 @@ public void testInvalidBatchTimeout() {
() -> getPlugin(mockConnectionManager, mockSecureLogin, context));
}

@Test
public void testUnknownDbmsAsPostgreSqlFromConfiguration() {
configuration.set("jdbc.driver", "org.greenplum.pxf.plugins.jdbc.FakeJdbcDriver");
configuration.set("jdbc.url", "test-url");
configuration.set("jdbc.unknownDbmsAsPostgreSql", "false");
JdbcBasePlugin plugin = getPlugin(mockConnectionManager, mockSecureLogin, context);
assertFalse(plugin.treatUnknownDbmsAsPostgreSql);
}

@Test
public void testUnknownDbmsAsPostgreSqlDefaultValueFromConfiguration() {
configuration.set("jdbc.driver", "org.greenplum.pxf.plugins.jdbc.FakeJdbcDriver");
configuration.set("jdbc.url", "test-url");
JdbcBasePlugin plugin = getPlugin(mockConnectionManager, mockSecureLogin, context);
assertTrue(plugin.treatUnknownDbmsAsPostgreSql);
}

private JdbcBasePlugin getPlugin(ConnectionManager mockConnectionManager, SecureLogin mockSecureLogin, RequestContext context) {
JdbcBasePlugin plugin = new JdbcBasePlugin(mockConnectionManager, mockSecureLogin, mockDecryptClient);
plugin.setRequestContext(context);
Expand Down
Loading