Skip to content

Commit a4382f7

Browse files
LinhongLiu authored and cloud-fan committed
[SPARK-29486][SQL] CalendarInterval should have 3 fields: months, days and microseconds
### What changes were proposed in this pull request?
Currently, CalendarInterval has 2 fields: months and microseconds. This PR changes it to 3 fields: months, days and microseconds, because one logical day interval may span a different number of microseconds (daylight saving).

### Why are the changes needed?
One logical day interval may span a different number of microseconds (daylight saving). For example, in the PST time zone, there are 25 hours from 2019-11-2 12:00:00 to 2019-11-3 12:00:00.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Unit tests and newly added test cases.

Closes apache#26134 from LinhongLiu/calendarinterval.

Authored-by: Liu,Linhong <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 8a4378c commit a4382f7

File tree

42 files changed

+337
-228
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

42 files changed

+337
-228
lines changed

common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.unsafe.types;
1919

2020
import java.io.Serializable;
21+
import java.util.Objects;
2122

2223
/**
2324
* The internal representation of interval type.
@@ -31,45 +32,50 @@ public final class CalendarInterval implements Serializable {
3132
public static final long MICROS_PER_WEEK = MICROS_PER_DAY * 7;
3233

3334
public final int months;
35+
public final int days;
3436
public final long microseconds;
3537

3638
public long milliseconds() {
3739
return this.microseconds / MICROS_PER_MILLI;
3840
}
3941

40-
public CalendarInterval(int months, long microseconds) {
42+
public CalendarInterval(int months, int days, long microseconds) {
4143
this.months = months;
44+
this.days = days;
4245
this.microseconds = microseconds;
4346
}
4447

4548
public CalendarInterval add(CalendarInterval that) {
4649
int months = this.months + that.months;
50+
int days = this.days + that.days;
4751
long microseconds = this.microseconds + that.microseconds;
48-
return new CalendarInterval(months, microseconds);
52+
return new CalendarInterval(months, days, microseconds);
4953
}
5054

5155
public CalendarInterval subtract(CalendarInterval that) {
5256
int months = this.months - that.months;
57+
int days = this.days - that.days;
5358
long microseconds = this.microseconds - that.microseconds;
54-
return new CalendarInterval(months, microseconds);
59+
return new CalendarInterval(months, days, microseconds);
5560
}
5661

5762
public CalendarInterval negate() {
58-
return new CalendarInterval(-this.months, -this.microseconds);
63+
return new CalendarInterval(-this.months, -this.days, -this.microseconds);
5964
}
6065

6166
@Override
62-
public boolean equals(Object other) {
63-
if (this == other) return true;
64-
if (other == null || !(other instanceof CalendarInterval)) return false;
65-
66-
CalendarInterval o = (CalendarInterval) other;
67-
return this.months == o.months && this.microseconds == o.microseconds;
67+
public boolean equals(Object o) {
68+
if (this == o) return true;
69+
if (o == null || getClass() != o.getClass()) return false;
70+
CalendarInterval that = (CalendarInterval) o;
71+
return months == that.months &&
72+
days == that.days &&
73+
microseconds == that.microseconds;
6874
}
6975

7076
@Override
7177
public int hashCode() {
72-
return 31 * months + (int) microseconds;
78+
return Objects.hash(months, days, microseconds);
7379
}
7480

7581
@Override
@@ -81,12 +87,13 @@ public String toString() {
8187
appendUnit(sb, months % 12, "month");
8288
}
8389

90+
if (days != 0) {
91+
appendUnit(sb, days / 7, "week");
92+
appendUnit(sb, days % 7, "day");
93+
}
94+
8495
if (microseconds != 0) {
8596
long rest = microseconds;
86-
appendUnit(sb, rest / MICROS_PER_WEEK, "week");
87-
rest %= MICROS_PER_WEEK;
88-
appendUnit(sb, rest / MICROS_PER_DAY, "day");
89-
rest %= MICROS_PER_DAY;
9097
appendUnit(sb, rest / MICROS_PER_HOUR, "hour");
9198
rest %= MICROS_PER_HOUR;
9299
appendUnit(sb, rest / MICROS_PER_MINUTE, "minute");
@@ -96,7 +103,7 @@ public String toString() {
96103
appendUnit(sb, rest / MICROS_PER_MILLI, "millisecond");
97104
rest %= MICROS_PER_MILLI;
98105
appendUnit(sb, rest, "microsecond");
99-
} else if (months == 0) {
106+
} else if (months == 0 && days == 0) {
100107
sb.append(" 0 microseconds");
101108
}
102109

common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,59 +26,72 @@ public class CalendarIntervalSuite {
2626

2727
@Test
2828
public void equalsTest() {
29-
CalendarInterval i1 = new CalendarInterval(3, 123);
30-
CalendarInterval i2 = new CalendarInterval(3, 321);
31-
CalendarInterval i3 = new CalendarInterval(1, 123);
32-
CalendarInterval i4 = new CalendarInterval(3, 123);
29+
CalendarInterval i1 = new CalendarInterval(3, 2, 123);
30+
CalendarInterval i2 = new CalendarInterval(3, 2,321);
31+
CalendarInterval i3 = new CalendarInterval(3, 4,123);
32+
CalendarInterval i4 = new CalendarInterval(1, 2, 123);
33+
CalendarInterval i5 = new CalendarInterval(1, 4, 321);
34+
CalendarInterval i6 = new CalendarInterval(3, 2, 123);
3335

3436
assertNotSame(i1, i2);
3537
assertNotSame(i1, i3);
38+
assertNotSame(i1, i4);
3639
assertNotSame(i2, i3);
37-
assertEquals(i1, i4);
40+
assertNotSame(i2, i4);
41+
assertNotSame(i3, i4);
42+
assertNotSame(i1, i5);
43+
assertEquals(i1, i6);
3844
}
3945

4046
@Test
4147
public void toStringTest() {
4248
CalendarInterval i;
4349

44-
i = new CalendarInterval(0, 0);
50+
i = new CalendarInterval(0, 0, 0);
4551
assertEquals("interval 0 microseconds", i.toString());
4652

47-
i = new CalendarInterval(34, 0);
53+
i = new CalendarInterval(34, 0, 0);
4854
assertEquals("interval 2 years 10 months", i.toString());
4955

50-
i = new CalendarInterval(-34, 0);
56+
i = new CalendarInterval(-34, 0, 0);
5157
assertEquals("interval -2 years -10 months", i.toString());
5258

53-
i = new CalendarInterval(0, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123);
54-
assertEquals("interval 3 weeks 13 hours 123 microseconds", i.toString());
59+
i = new CalendarInterval(0, 31, 0);
60+
assertEquals("interval 4 weeks 3 days", i.toString());
5561

56-
i = new CalendarInterval(0, -3 * MICROS_PER_WEEK - 13 * MICROS_PER_HOUR - 123);
57-
assertEquals("interval -3 weeks -13 hours -123 microseconds", i.toString());
62+
i = new CalendarInterval(0, -31, 0);
63+
assertEquals("interval -4 weeks -3 days", i.toString());
5864

59-
i = new CalendarInterval(34, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123);
60-
assertEquals("interval 2 years 10 months 3 weeks 13 hours 123 microseconds", i.toString());
65+
i = new CalendarInterval(0, 0, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
66+
assertEquals("interval 3 hours 13 minutes 123 microseconds", i.toString());
67+
68+
i = new CalendarInterval(0, 0, -3 * MICROS_PER_HOUR - 13 * MICROS_PER_MINUTE - 123);
69+
assertEquals("interval -3 hours -13 minutes -123 microseconds", i.toString());
70+
71+
i = new CalendarInterval(34, 31, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
72+
assertEquals("interval 2 years 10 months 4 weeks 3 days 3 hours 13 minutes 123 microseconds",
73+
i.toString());
6174
}
6275

6376
@Test
6477
public void addTest() {
65-
CalendarInterval input1 = new CalendarInterval(3, 1 * MICROS_PER_HOUR);
66-
CalendarInterval input2 = new CalendarInterval(2, 100 * MICROS_PER_HOUR);
67-
assertEquals(input1.add(input2), new CalendarInterval(5, 101 * MICROS_PER_HOUR));
78+
CalendarInterval input1 = new CalendarInterval(3, 1, 1 * MICROS_PER_HOUR);
79+
CalendarInterval input2 = new CalendarInterval(2, 4, 100 * MICROS_PER_HOUR);
80+
assertEquals(input1.add(input2), new CalendarInterval(5, 5, 101 * MICROS_PER_HOUR));
6881

69-
input1 = new CalendarInterval(-10, -81 * MICROS_PER_HOUR);
70-
input2 = new CalendarInterval(75, 200 * MICROS_PER_HOUR);
71-
assertEquals(input1.add(input2), new CalendarInterval(65, 119 * MICROS_PER_HOUR));
82+
input1 = new CalendarInterval(-10, -30, -81 * MICROS_PER_HOUR);
83+
input2 = new CalendarInterval(75, 150, 200 * MICROS_PER_HOUR);
84+
assertEquals(input1.add(input2), new CalendarInterval(65, 120, 119 * MICROS_PER_HOUR));
7285
}
7386

7487
@Test
7588
public void subtractTest() {
76-
CalendarInterval input1 = new CalendarInterval(3, 1 * MICROS_PER_HOUR);
77-
CalendarInterval input2 = new CalendarInterval(2, 100 * MICROS_PER_HOUR);
78-
assertEquals(input1.subtract(input2), new CalendarInterval(1, -99 * MICROS_PER_HOUR));
89+
CalendarInterval input1 = new CalendarInterval(3, 1, 1 * MICROS_PER_HOUR);
90+
CalendarInterval input2 = new CalendarInterval(2, 4, 100 * MICROS_PER_HOUR);
91+
assertEquals(input1.subtract(input2), new CalendarInterval(1, -3, -99 * MICROS_PER_HOUR));
7992

80-
input1 = new CalendarInterval(-10, -81 * MICROS_PER_HOUR);
81-
input2 = new CalendarInterval(75, 200 * MICROS_PER_HOUR);
82-
assertEquals(input1.subtract(input2), new CalendarInterval(-85, -281 * MICROS_PER_HOUR));
93+
input1 = new CalendarInterval(-10, -30, -81 * MICROS_PER_HOUR);
94+
input2 = new CalendarInterval(75, 150, 200 * MICROS_PER_HOUR);
95+
assertEquals(input1.subtract(input2), new CalendarInterval(-85, -180, -281 * MICROS_PER_HOUR));
8396
}
8497
}

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,10 @@ public CalendarInterval getInterval(int ordinal) {
230230
if (isNullAt(ordinal)) return null;
231231
final long offsetAndSize = getLong(ordinal);
232232
final int offset = (int) (offsetAndSize >> 32);
233-
final int months = (int) Platform.getLong(baseObject, baseOffset + offset);
233+
final int months = Platform.getInt(baseObject, baseOffset + offset);
234+
final int days = Platform.getInt(baseObject, baseOffset + offset + 4);
234235
final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
235-
return new CalendarInterval(months, microseconds);
236+
return new CalendarInterval(months, days, microseconds);
236237
}
237238

238239
@Override

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,9 +401,10 @@ public CalendarInterval getInterval(int ordinal) {
401401
} else {
402402
final long offsetAndSize = getLong(ordinal);
403403
final int offset = (int) (offsetAndSize >> 32);
404-
final int months = (int) Platform.getLong(baseObject, baseOffset + offset);
404+
final int months = Platform.getInt(baseObject, baseOffset + offset);
405+
final int days = Platform.getInt(baseObject, baseOffset + offset + 4);
405406
final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
406-
return new CalendarInterval(months, microseconds);
407+
return new CalendarInterval(months, days, microseconds);
407408
}
408409
}
409410

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,9 @@ public final void write(int ordinal, CalendarInterval input) {
134134
// grow the global buffer before writing data.
135135
grow(16);
136136

137-
// Write the months and microseconds fields of Interval to the variable length portion.
138-
Platform.putLong(getBuffer(), cursor(), input.months);
137+
// Write the months, days and microseconds fields of Interval to the variable length portion.
138+
Platform.putInt(getBuffer(), cursor(), input.months);
139+
Platform.putInt(getBuffer(), cursor() + 4, input.days);
139140
Platform.putLong(getBuffer(), cursor() + 8, input.microseconds);
140141

141142
setOffsetAndSize(ordinal, 16);

sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -267,21 +267,24 @@ public final ColumnarRow getStruct(int rowId) {
267267
* Returns the calendar interval type value for rowId. If the slot for rowId is null, it should
268268
* return null.
269269
*
270-
* In Spark, calendar interval type value is basically an integer value representing the number of
271-
* months in this interval, and a long value representing the number of microseconds in this
272-
* interval. An interval type vector is the same as a struct type vector with 2 fields: `months`
273-
* and `microseconds`.
270+
* In Spark, calendar interval type value is basically two integer values representing the number
271+
* of months and days in this interval, and a long value representing the number of microseconds
272+
* in this interval. An interval type vector is the same as a struct type vector with 3 fields:
273+
* `months`, `days` and `microseconds`.
274274
*
275-
* To support interval type, implementations must implement {@link #getChild(int)} and define 2
275+
* To support interval type, implementations must implement {@link #getChild(int)} and define 3
276276
* child vectors: the first child vector is an int type vector, containing all the month values of
277-
* all the interval values in this vector. The second child vector is a long type vector,
278-
* containing all the microsecond values of all the interval values in this vector.
277+
* all the interval values in this vector. The second child vector is an int type vector,
278+
* containing all the day values of all the interval values in this vector. The third child vector
279+
* is a long type vector, containing all the microsecond values of all the interval values in this
280+
* vector.
279281
*/
280282
public final CalendarInterval getInterval(int rowId) {
281283
if (isNullAt(rowId)) return null;
282284
final int months = getChild(0).getInt(rowId);
283-
final long microseconds = getChild(1).getLong(rowId);
284-
return new CalendarInterval(months, microseconds);
285+
final int days = getChild(1).getInt(rowId);
286+
final long microseconds = getChild(2).getLong(rowId);
287+
return new CalendarInterval(months, days, microseconds);
285288
}
286289

287290
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,8 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
264264
s"watermark calculation. Use interval in terms of day instead.")
265265
Literal(0.0)
266266
} else {
267-
Literal(calendarInterval.microseconds.toDouble)
267+
Literal(calendarInterval.days * CalendarInterval.MICROS_PER_DAY.toDouble +
268+
calendarInterval.microseconds.toDouble)
268269
}
269270
case DoubleType =>
270271
Multiply(lit, Literal(1000000.0))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
2424
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
2525
import org.apache.spark.sql.catalyst.util.IntervalUtils
2626
import org.apache.spark.sql.types._
27+
import org.apache.spark.unsafe.types.CalendarInterval
2728

2829
case class TimeWindow(
2930
timeColumn: Expression,
@@ -107,7 +108,7 @@ object TimeWindow {
107108
throw new IllegalArgumentException(
108109
s"Intervals greater than a month is not supported ($interval).")
109110
}
110-
cal.microseconds
111+
cal.days * CalendarInterval.MICROS_PER_DAY + cal.microseconds
111112
}
112113

113114
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2610,25 +2610,33 @@ object Sequence {
26102610
override val defaultStep: DefaultStep = new DefaultStep(
26112611
(dt.ordering.lteq _).asInstanceOf[LessThanOrEqualFn],
26122612
CalendarIntervalType,
2613-
new CalendarInterval(0, MICROS_PER_DAY))
2613+
new CalendarInterval(0, 1, 0))
26142614

26152615
private val backedSequenceImpl = new IntegralSequenceImpl[T](dt)
2616-
private val microsPerMonth = 28 * CalendarInterval.MICROS_PER_DAY
2616+
private val microsPerDay = 24 * CalendarInterval.MICROS_PER_HOUR
2617+
// We choose a minimum days(28) in one month to calculate the `intervalStepInMicros`
2618+
// in order to make sure the estimated array length is long enough
2619+
private val microsPerMonth = 28 * microsPerDay
26172620

26182621
override def eval(input1: Any, input2: Any, input3: Any): Array[T] = {
26192622
val start = input1.asInstanceOf[T]
26202623
val stop = input2.asInstanceOf[T]
26212624
val step = input3.asInstanceOf[CalendarInterval]
26222625
val stepMonths = step.months
2626+
val stepDays = step.days
26232627
val stepMicros = step.microseconds
26242628

2625-
if (stepMonths == 0) {
2626-
backedSequenceImpl.eval(start, stop, fromLong(stepMicros / scale))
2629+
if (stepMonths == 0 && stepMicros == 0 && scale == MICROS_PER_DAY) {
2630+
backedSequenceImpl.eval(start, stop, fromLong(stepDays))
2631+
2632+
} else if (stepMonths == 0 && stepDays == 0 && scale == 1) {
2633+
backedSequenceImpl.eval(start, stop, fromLong(stepMicros))
26272634

26282635
} else {
26292636
// To estimate the resulted array length we need to make assumptions
2630-
// about a month length in microseconds
2631-
val intervalStepInMicros = stepMicros + stepMonths * microsPerMonth
2637+
// about a month length in days and a day length in microseconds
2638+
val intervalStepInMicros =
2639+
stepMicros + stepMonths * microsPerMonth + stepDays * microsPerDay
26322640
val startMicros: Long = num.toLong(start) * scale
26332641
val stopMicros: Long = num.toLong(stop) * scale
26342642
val maxEstimatedArrayLength =
@@ -2643,7 +2651,8 @@ object Sequence {
26432651
while (t < exclusiveItem ^ stepSign < 0) {
26442652
arr(i) = fromLong(t / scale)
26452653
i += 1
2646-
t = timestampAddInterval(startMicros, i * stepMonths, i * stepMicros, zoneId)
2654+
t = timestampAddInterval(
2655+
startMicros, i * stepMonths, i * stepDays, i * stepMicros, zoneId)
26472656
}
26482657

26492658
// truncate array to the correct length
@@ -2659,6 +2668,7 @@ object Sequence {
26592668
arr: String,
26602669
elemType: String): String = {
26612670
val stepMonths = ctx.freshName("stepMonths")
2671+
val stepDays = ctx.freshName("stepDays")
26622672
val stepMicros = ctx.freshName("stepMicros")
26632673
val stepScaled = ctx.freshName("stepScaled")
26642674
val intervalInMicros = ctx.freshName("intervalInMicros")
@@ -2673,18 +2683,21 @@ object Sequence {
26732683

26742684
val sequenceLengthCode =
26752685
s"""
2676-
|final long $intervalInMicros = $stepMicros + $stepMonths * ${microsPerMonth}L;
2686+
|final long $intervalInMicros =
2687+
| $stepMicros + $stepMonths * ${microsPerMonth}L + $stepDays * ${microsPerDay}L;
26772688
|${genSequenceLengthCode(ctx, startMicros, stopMicros, intervalInMicros, arrLength)}
26782689
""".stripMargin
26792690

26802691
s"""
26812692
|final int $stepMonths = $step.months;
2693+
|final int $stepDays = $step.days;
26822694
|final long $stepMicros = $step.microseconds;
26832695
|
2684-
|if ($stepMonths == 0) {
2685-
| final $elemType $stepScaled = ($elemType) ($stepMicros / ${scale}L);
2686-
| ${backedSequenceImpl.genCode(ctx, start, stop, stepScaled, arr, elemType)};
2696+
|if ($stepMonths == 0 && $stepMicros == 0 && ${scale}L == ${MICROS_PER_DAY}L) {
2697+
| ${backedSequenceImpl.genCode(ctx, start, stop, stepDays, arr, elemType)};
26872698
|
2699+
|} else if ($stepMonths == 0 && $stepDays == 0 && ${scale}L == 1) {
2700+
| ${backedSequenceImpl.genCode(ctx, start, stop, stepMicros, arr, elemType)};
26882701
|} else {
26892702
| final long $startMicros = $start * ${scale}L;
26902703
| final long $stopMicros = $stop * ${scale}L;
@@ -2702,7 +2715,7 @@ object Sequence {
27022715
| $arr[$i] = ($elemType) ($t / ${scale}L);
27032716
| $i += 1;
27042717
| $t = org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampAddInterval(
2705-
| $startMicros, $i * $stepMonths, $i * $stepMicros, $zid);
2718+
| $startMicros, $i * $stepMonths, $i * $stepDays, $i * $stepMicros, $zid);
27062719
| }
27072720
|
27082721
| if ($arr.length > $i) {

0 commit comments

Comments
 (0)