
Fix timestamp with timezone mapping in iceberg type converter #23534

Conversation

auden-woolfson
Contributor

@auden-woolfson auden-woolfson commented Aug 27, 2024

Description

Fixes bug described in #23529

== RELEASE NOTES ==

Iceberg Connector Changes
* Add logic to the Iceberg type converter for ``timestamp with time zone`` :pr:`23534`

@auden-woolfson auden-woolfson added bug iceberg Apache Iceberg related labels Aug 27, 2024
@auden-woolfson auden-woolfson self-assigned this Aug 27, 2024
@auden-woolfson auden-woolfson force-pushed the add_timestamptz_mapping_to_iceberg_connector branch from 8e1716e to 38919d7 Compare August 27, 2024 22:01
Member

@agrawalreetika agrawalreetika left a comment

Can we add some tests with columns of type timestamp with time zone?

@tdcmeehan
Contributor

+1. Let's add some end to end tests. Additionally, we may want to remove the validation added here, since I believe we should support this properly now: #22926

@tdcmeehan tdcmeehan self-assigned this Aug 28, 2024
@hantangwangd
Member

Could we also add test cases involving timestamp with time zone in filter conditions and partition columns? I'm a little concerned about the behavior in those scenarios.

@auden-woolfson
Contributor Author

Could we also add test cases involving timestamp with time zone in filter conditions and partition columns? I'm a little concerned about the behavior in those scenarios.

Just to clarify, do you want the timestamp with time zone to be part of the table that is being partitioned, or the type of the partition column itself? Currently it is not supported as one of the types for partition columns.

@hantangwangd
Member

Just to clarify, do you want the timestamp with time zone to be part of the table that is being partitioned, or the type of the partition column itself? Currently it is not supported as one of the types for partition columns.

Yes, that's right. But I think it's better for us to first figure out how to handle it in these cases when we start to support it.

A very important question is what long-value format we plan to actually store in data files for timestamp with time zone. Presto has a special encoding for timestamp with time zone values, which mixes the time zone information with the UTC value in milliseconds. Meanwhile, the Iceberg spec stores timestamptz data as a UTC value in microseconds and does not retain the source time zone.

If we store the data following the Iceberg spec, we lose the time zone information; if we store it following Presto's format, we may run into cross-engine compatibility problems.

cc: @tdcmeehan @agrawalreetika @ZacBlanco

@tdcmeehan
Contributor

@hantangwangd I don't see this as a choice: we must store the data according to the Iceberg spec, which means we'll lose the embedded time zone information. This is fine; semantically, it's the same thing. The only confusing part is that a user retrieving stored Iceberg timestamps will see that the time zones have been adjusted to UTC. But the point-in-time values will remain the same, and this is merely a limitation of the Iceberg table format.
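The trade-off discussed above can be sketched in plain Java. This is an illustration, not Presto or Iceberg source: the 12-bit zone-key layout mirrors Presto's packed encoding, but the class, method names, and zone-key values here are assumptions made for the example.

```java
// Sketch (illustrative only): how a Presto-style packed timestamp-with-timezone
// value maps onto Iceberg's timestamptz storage (UTC micros, no zone retained).
public final class TimestampEncodingSketch
{
    private static final int MILLIS_SHIFT = 12;  // low 12 bits hold the zone key
    private static final short UTC_KEY = 0;      // assumed UTC zone key

    // Presto-style packing: UTC millis in the high bits, zone key in the low 12.
    static long pack(long millisUtc, short zoneKey)
    {
        return (millisUtc << MILLIS_SHIFT) | (zoneKey & 0xFFF);
    }

    static long unpackMillisUtc(long packed)
    {
        return packed >> MILLIS_SHIFT;
    }

    // Writing to Iceberg: keep only the UTC instant, widened to micros.
    static long toIcebergMicros(long packed)
    {
        return unpackMillisUtc(packed) * 1000;
    }

    // Reading back: the instant survives, but the zone is now UTC.
    static long fromIcebergMicros(long microsUtc)
    {
        return pack(microsUtc / 1000, UTC_KEY);
    }

    public static void main(String[] args)
    {
        short losAngeles = 42;  // hypothetical zone key for the example
        long packed = pack(471341400000L, losAngeles);
        long roundTripped = fromIcebergMicros(toIcebergMicros(packed));
        // Same point in time, but the source zone key is gone.
        System.out.println(unpackMillisUtc(packed) == unpackMillisUtc(roundTripped)); // true
        System.out.println((roundTripped & 0xFFF) == UTC_KEY);                        // true
    }
}
```

The round trip shows exactly the behavior described: the instant is preserved, while the source zone collapses to UTC.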

@hantangwangd
Member

@tdcmeehan Completely agree with your viewpoint.

That means that, besides doing the type conversion, we need transformation logic for timestamp with time zone data when writing/reading, parsing filter conditions, and handling partition values. That's not to say we must complete all of this at once; it can be divided into a series of PRs.

Contributor

@ZacBlanco ZacBlanco left a comment

Minor nits. One question about removing the verifyTypeSupported method.

@auden-woolfson auden-woolfson force-pushed the add_timestamptz_mapping_to_iceberg_connector branch 2 times, most recently from a562f45 to d877bcc Compare September 13, 2024 21:50
Member

@agrawalreetika agrawalreetika left a comment

Overall looks good to me.

  1. Please add a document entry in https://prestodb.io/docs/current/connector/iceberg.html#type-mapping
  2. Squash all the commits into 1 "Fix timestamp with timezone mapping in iceberg type converter"

@@ -117,6 +117,10 @@ public static Type toPrestoType(org.apache.iceberg.types.Type type, TypeManager
case TIME:
return TimeType.TIME;
case TIMESTAMP:
Types.TimestampType timestampType = (Types.TimestampType) type.asPrimitiveType();
if (timestampType.shouldAdjustToUTC()) {
return TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
Member

Add static import for this

@auden-woolfson auden-woolfson force-pushed the add_timestamptz_mapping_to_iceberg_connector branch from 69a4878 to 361d23a Compare September 24, 2024 19:54
@@ -60,6 +60,9 @@ interface Int64TimeAndTimestampMicrosValuesDecoder
void readNext(long[] values, int offset, int length)
throws IOException;

void readNextWithTimezone(long[] values, int offset, int length)
throws IOException;

Contributor Author

This is necessary because it is the decoder, not the batch reader, that accesses the individual raw values from Parquet. The batch reader has the column descriptor (metadata), which should tell it whether or not to pack the value with a time zone, as is done in the regular column reader. The actual packing should be done within the decoder, so there needs to be a way for decoders to dynamically switch between with- and without-timezone modes.

The approach I took here is just to copy the readNext method from each implementation and add packDateTimeWithZone. A stateful approach (an instance variable like boolean withTimezone) would probably be better for future development and code execution, but for now I am seeing if this implementation works.

@auden-woolfson auden-woolfson force-pushed the add_timestamptz_mapping_to_iceberg_connector branch from 79d53ce to 607edf0 Compare September 25, 2024 17:59
@auden-woolfson auden-woolfson force-pushed the add_timestamptz_mapping_to_iceberg_connector branch from a381447 to c7deb43 Compare October 18, 2024 16:08
public Object[][] createTestTimestampWithTimezoneData()
{
return new Object[][] {
{getQueryRunner()},
Contributor

Wondering if we need a whole separate test for this. Can't we just create a data provider which passes in true/false values and lets us construct a valid session at the beginning of the test method? Then you can pass the session to all of the execute/assertQuery methods.

Contributor Author

Can you elaborate on this please? I'm not sure what you are referring to as the separate test. I can have the data provider pass in one true and one false value and add a condition inside the test function itself. Is that what you are asking for here? If so, what purpose would that serve? Thanks

Contributor

Creating an entirely new query runner is expensive. We should be able to test with the batch read optimization by just setting a session property.

In this data provider you can create two sessions based on the default session, one with batch reads enabled and one with them disabled, and then run the test with each. It will take far less time to run. You will need to update all of the query statements to pass the session variable, though.

public Object[][] batchReadSessions()
    {
        return new Object[][] {
                {Session.builder(getSession())
                        .setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "true")
                        .build()},
                {Session.builder(getSession())
                        .setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "false")
                        .build()}
        };
    }

This would also allow you to remove the additional logic inside of createQueryRunner

@@ -69,7 +91,13 @@ public void readNext(long[] values, int offset, int length)
final LongDictionary localDictionary = dictionary;
for (int srcIndex = currentBuffer.length - currentCount; destinationIndex < endIndex; srcIndex++) {
long dictionaryValue = localDictionary.decodeToLong(localBuffer[srcIndex]);
values[destinationIndex++] = MICROSECONDS.toMillis(dictionaryValue);
long millisValue = MICROSECONDS.toMillis(dictionaryValue);
if (isWithTimezone()) {
Contributor

Rather than having a branch statement inside the hot loop for a lot of these readers, I think we should set the "reading function" in the constructor of the reader, since the reading behavior shouldn't change. Do you see a performance impact when this conditional is introduced?
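The suggestion above can be sketched as follows. The class and field names here are illustrative, not the PR's actual code, and the shifted-millis transform is a hypothetical stand-in for packDateTimeWithZone with the UTC key; the point is simply that the transform is chosen once in the constructor so the per-value loop stays branch-free.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongUnaryOperator;

// Sketch: select the reading behavior at construction time instead of
// branching on withTimezone inside the hot decode loop.
public final class DecoderSketch
{
    private final LongUnaryOperator transform;

    DecoderSketch(boolean withTimezone)
    {
        // Stand-in for packDateTimeWithZone(millis, UTC_KEY): millis shifted
        // left 12 bits with a zone key of 0 in the low bits (an assumption).
        this.transform = withTimezone
                ? micros -> TimeUnit.MICROSECONDS.toMillis(micros) << 12
                : micros -> TimeUnit.MICROSECONDS.toMillis(micros);
    }

    void readNext(long[] source, long[] values, int offset, int length)
    {
        for (int i = 0; i < length; i++) {
            // No per-value conditional; the JIT can inline one stable target.
            values[offset + i] = transform.applyAsLong(source[i]);
        }
    }

    public static void main(String[] args)
    {
        long[] raw = {1_000_000L, 2_000_000L};  // raw micros from the page
        long[] out = new long[2];
        new DecoderSketch(false).readNext(raw, out, 0, 2);
        System.out.println(out[0]);  // 1000 (millis, no zone packing)
    }
}
```

Whether this measurably beats a branch in the loop depends on the JIT (a well-predicted branch is often free), which is presumably why the reviewer asks about the observed performance impact.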

@prestodb-ci prestodb-ci added the from:IBM PR from IBM label Jan 14, 2025
@prestodb-ci prestodb-ci requested review from a team, sh-shamsan and pdabre12 and removed request for a team January 14, 2025 23:56
@auden-woolfson auden-woolfson force-pushed the add_timestamptz_mapping_to_iceberg_connector branch 2 times, most recently from be489f1 to 8add1c4 Compare January 16, 2025 21:40
@auden-woolfson auden-woolfson force-pushed the add_timestamptz_mapping_to_iceberg_connector branch from 8add1c4 to 67895c0 Compare January 16, 2025 23:08
Contributor

@ZacBlanco ZacBlanco left a comment

Core logic is sound. Just some comments on the test cases.

Comment on lines 92 to 93
assertQuerySucceeds("CREATE TABLE test_timestamp_with_timezone (x timestamp with time zone)");
dropTable(getSession(), "test_timestamp_with_timezone");
Contributor

Why create and then drop? Can't we just remove L92+93 altogether?

String timestamptz = "TIMESTAMP '1984-12-08 00:10:00 America/Los_Angeles'";
String timestamp = "TIMESTAMP '1984-12-08 00:10:00'";

runner.execute("CREATE TABLE test_timestamptz(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE)");
Contributor

let's do assertQuerySucceeds here

runner.execute("CREATE TABLE test_timestamptz(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE)");
String row = "(" + timestamptz + ", " + timestamp + ", " + timestamptz + ")";
for (int i = 0; i < 10; i++) {
runner.execute("INSERT INTO test_timestamptz values " + row);
Contributor

use assertUpdate

assertTrue(types.get(0) instanceof TimestampWithTimeZoneType);
assertTrue(types.get(1) instanceof TimestampType);

runner.execute("CREATE TABLE test_timestamptz_partition(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE) " +
Contributor

assertQuerySucceeds

runner.execute("INSERT INTO test_timestamptz values " + row);
}

MaterializedResult initialRows = runner.execute("SELECT * FROM test_timestamptz");
Contributor

let's assert on the content of the result here too

"WITH (PARTITIONING = ARRAY['b'])");
runner.execute("INSERT INTO test_timestamptz_partition (a, b, c) SELECT a, b, c FROM test_timestamptz");

MaterializedResult partitionRows = runner.execute("SELECT * FROM test_timestamptz");
Contributor

assert on content of the query as well

runner.execute("CREATE TABLE test_timestamptz_filter(a TIMESTAMP WITH TIME ZONE)");

for (int i = 0; i < 5; i++) {
runner.execute("INSERT INTO test_timestamptz_filter VALUES (" + earlyTimestamptz + ")");
Contributor

assertUpdate

assertTrue(partitionTypes.get(1) instanceof TimestampType);

String earlyTimestamptz = "TIMESTAMP '1980-12-08 00:10:00 America/Los_Angeles'";
runner.execute("CREATE TABLE test_timestamptz_filter(a TIMESTAMP WITH TIME ZONE)");
Contributor

assertQuerySucceeds

runner.execute("INSERT INTO test_timestamptz_filter VALUES (" + earlyTimestamptz + ")");
}
for (int i = 0; i < 5; i++) {
runner.execute("INSERT INTO test_timestamptz_filter VALUES (" + timestamptz + ")");
Contributor

assertUpdate

@auden-woolfson auden-woolfson force-pushed the add_timestamptz_mapping_to_iceberg_connector branch from d888615 to 3badf53 Compare January 27, 2025 21:43
@auden-woolfson
Contributor Author

@ZacBlanco thanks, just pushed the changes

@auden-woolfson auden-woolfson force-pushed the add_timestamptz_mapping_to_iceberg_connector branch from e1b5581 to fa8be89 Compare January 28, 2025 17:29
.build()},
{Session.builder(getSession())
.setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "false")
.build()}
Member

Shouldn't this be different time zone data for the test, as the name suggests?

Contributor Author

This is just to test with the batch reader enabled and disabled. We are not changing the data used.

Member

Then I think naming the data provider and its method according to the session properties would be better?

@@ -179,10 +180,14 @@ public static DistributedQueryRunner createIcebergQueryRunner(
{
setupLogging();

Session session = testSessionBuilder()
Session.SessionBuilder sessionBuilder = testSessionBuilder()
Member

Nit: import SessionBuilder

for (int i = 0; i < block.getPositionCount(); i++) {
if (!block.isNull(i)) {
long value = unpackMillisUtc(type.getLong(block, i));
long scaledValue = scaleValueFunction.scaleValue(value);
Member

I think we can directly do this here, and then we don't need the ScaleValueFunction interface and the scaleValueFunction variable above:

long scaledValue = writeMicroseconds ? MILLISECONDS.toMicros(value) : value;

Comment on lines 51 to 56
if (withTimezone) {
packFunction = millis -> packDateTimeWithZone(millis, UTC_KEY);
}
else {
packFunction = millis -> millis;
}
Member

Suggested change
if (withTimezone) {
packFunction = millis -> packDateTimeWithZone(millis, UTC_KEY);
}
else {
packFunction = millis -> millis;
}
this.packFunction = withTimezone ? millis -> packDateTimeWithZone(millis, UTC_KEY) : millis -> millis;

Member

@hantangwangd hantangwangd left a comment

Thanks for the change; overall it looks good to me. Some small nits.

Contributor

@ZacBlanco ZacBlanco left a comment

Just some minor issues, otherwise LGTM.

@auden-woolfson auden-woolfson force-pushed the add_timestamptz_mapping_to_iceberg_connector branch from f5c9447 to 0ce3cc8 Compare January 29, 2025 22:56
@auden-woolfson auden-woolfson merged commit ec9e904 into prestodb:master Jan 30, 2025
55 checks passed
@tdcmeehan
Contributor

Thank you for the reviews @ZacBlanco @agrawalreetika @hantangwangd, and thanks @auden-woolfson for the fix.

Successfully merging this pull request may close these issues.

Iceberg timestamptz should map to Presto TIMESTAMP WITH TIME ZONE type