Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: PPL command - WMA Trendline #3293

Open
wants to merge 37 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
927fbfa
WMA
andy-k-improving Jan 31, 2025
3836f31
Update switch
andy-k-improving Feb 1, 2025
0fcd688
Unit-test
andy-k-improving Feb 3, 2025
898d3a6
Integ-test
andy-k-improving Feb 3, 2025
ab58370
Doc test
andy-k-improving Feb 3, 2025
51d8395
Spotless
andy-k-improving Feb 3, 2025
e4c25b3
Update test cases
andy-k-improving Feb 3, 2025
f6c82ae
Update test coverage
andy-k-improving Feb 3, 2025
d0cf898
Update docs/user/ppl/cmd/trendline.rst
andy-k-improving Feb 6, 2025
ce94ca9
Update docs/user/ppl/cmd/trendline.rst
andy-k-improving Feb 6, 2025
f32e5ad
Remove debug
andy-k-improving Feb 6, 2025
4d95529
Address code comments
andy-k-improving Feb 6, 2025
df3c666
Add support to all numeric types
andy-k-improving Feb 8, 2025
3ed8fa8
Update generic
andy-k-improving Feb 8, 2025
7fc4075
Replace evaluator with functionalInterface
andy-k-improving Feb 10, 2025
3b7902f
Apply suggestions from code review
andy-k-improving Feb 10, 2025
47f9cec
Update wma doc
andy-k-improving Feb 10, 2025
b8cf496
Fix style
andy-k-improving Feb 10, 2025
4a56b57
Update docs/user/ppl/cmd/trendline.rst
andy-k-improving Feb 11, 2025
0b50637
Fix code commentse
andy-k-improving Feb 11, 2025
b8e3983
update default name
andy-k-improving Feb 11, 2025
8dc96c9
Doc
andy-k-improving Feb 12, 2025
c197ecd
Doc
andy-k-improving Feb 12, 2025
5ea47ae
Refactor test-cases
andy-k-improving Feb 12, 2025
8d6ac31
DataPoints test-cases
andy-k-improving Feb 12, 2025
9ce1d99
Update test-cases
andy-k-improving Feb 12, 2025
bdb0f28
Update test-cases
andy-k-improving Feb 12, 2025
fa685bd
Update test-cases
andy-k-improving Feb 12, 2025
b87db9f
Address code comments
andy-k-improving Feb 13, 2025
39bf113
Update test-cases
andy-k-improving Feb 13, 2025
ba9bfb6
Update test-cases
andy-k-improving Feb 13, 2025
9910b22
Update IT magic number
andy-k-improving Feb 14, 2025
4be0400
Update IT test
andy-k-improving Feb 14, 2025
8348950
Update test coverage
andy-k-improving Feb 14, 2025
6037742
Update unit test
andy-k-improving Feb 14, 2025
e590fb9
Code style
andy-k-improving Feb 14, 2025
eccbf1d
Update doc test
andy-k-improving Feb 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public <R, C> R accept(AbstractNodeVisitor<R, C> nodeVisitor, C context) {
}

public enum TrendlineType {
SMA
SMA,
WMA
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.common.collect.ImmutableMap.Builder;
import java.time.Instant;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -21,6 +22,7 @@
import lombok.ToString;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.data.model.ExprDoubleValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
Expand Down Expand Up @@ -106,9 +108,11 @@ private Map<String, ExprValue> consumeInputTuple(ExprValue inputValue) {

private static TrendlineAccumulator createAccumulator(
Pair<Trendline.TrendlineComputation, ExprCoreType> computation) {
// Add a switch statement based on computation type to choose the accumulator when more
// types of computations are supported.
return new SimpleMovingAverageAccumulator(computation.getKey(), computation.getValue());
return switch (computation.getKey().getComputationType()) {
case SMA -> new SimpleMovingAverageAccumulator(computation.getKey(), computation.getValue());
case WMA -> new WeightedMovingAverageAccumulator(
computation.getKey(), computation.getValue());
};
}

/** Maintains stateful information for calculating the trendline. */
Expand Down Expand Up @@ -187,6 +191,103 @@ public ExprValue calculate() {
}
}

private static class WeightedMovingAverageAccumulator implements TrendlineAccumulator {
private final LiteralExpression dataPointsNeeded;
private final ArrayList<ExprValue> receivedValues;
private final WmaTrendlineEvaluator evaluator;

public WeightedMovingAverageAccumulator(
Trendline.TrendlineComputation computation, ExprCoreType type) {
this.dataPointsNeeded = DSL.literal(computation.getNumberOfDataPoints().doubleValue());
this.receivedValues = new ArrayList<>(computation.getNumberOfDataPoints());
this.evaluator = getEvaluator(type);
}

static WmaTrendlineEvaluator getEvaluator(ExprCoreType type) {
return switch (type) {
case DOUBLE -> NumericWmaEvaluator.INSTANCE;
case DATE, TIMESTAMP -> TimeStampWmaEvaluator.INSTANCE;
case TIME -> TimeWmaEvaluator.INSTANCE;
default -> throw new IllegalArgumentException(
String.format("Invalid type %s used for weighted moving average.", type.typeName()));
};
}

@Override
public void accumulate(ExprValue value) {
receivedValues.add(value);
if (receivedValues.size() > dataPointsNeeded.valueOf().integerValue()) {
receivedValues.removeFirst();
}
}

@Override
public ExprValue calculate() {
if (receivedValues.size() < dataPointsNeeded.valueOf().integerValue()) {
return null;
} else if (dataPointsNeeded.valueOf().integerValue() == 1) {
return receivedValues.getFirst();
}
return evaluator.evaluate(receivedValues);
}

private static class NumericWmaEvaluator implements WmaTrendlineEvaluator {

private static final NumericWmaEvaluator INSTANCE = new NumericWmaEvaluator();

@Override
public ExprValue evaluate(ArrayList<ExprValue> receivedValues) {
double sum = 0D;
int totalWeight = (receivedValues.size() * (receivedValues.size() + 1)) / 2;
for (int i = 0; i < receivedValues.size(); i++) {
sum += receivedValues.get(i).doubleValue() * ((i + 1D) / totalWeight);
}
return new ExprDoubleValue(sum);
}
}
Copy link
Contributor

@currantw currantw Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to have been quite an explosion of classes here! Some ideas:

  • Do we need to do this with singleton classes? Rather than an Evaluator class with a single evaluate method, would it make sense to instead store a reference to a evaluate method that takes a List of ExprValues and returns the resulting ExprValue? Don't know what is more common, but it seems like a lot to have seven separate Evaluator classes; perhaps seven Evaluator method would be more manageable?
  • As alluded to above, could this take List<ExprValue> instead of an ArrayList?
  • As mentioned in a previous comment, I think LinkedList might be more efficient for this purpose -- but not if we iterator over it like this! Are we able to use an iterator to do so? Then we would get O(1) access for both a LinkedList or an ArrayList. See below for how I think this could look.
  • As mentioned in a previous comment, SMA and WMA are pretty much the same ... only the weights change. Would it be possible to change the evaluate signature so that it takes a List<ExprValue> and a List<Double> of weights? i.e. evaluate(List<ExprValue> values, List<Double> weights)? This would have the added advantage that, in the case of WMA, we wouldn't need to re-calculate the weights every time.
  public ExprValue evaluate(List<ExprValue> values) {

    // Calculate weights
    int n = values.size();
    double denominator = (double) (n * (n + 1)) / 2.0;
    List<Double> weights = IntStream.range(n, 0).mapToDouble(i -> (double) i / denominator).boxed().toList().reversed();

    // Calculate weighted average.
    double average = 0.0;
    
    Iterator<ExprValue> valuesIterator = values.iterator();
    Iterator<Double> weightsIterator = weights.iterator();
    
    while(valuesIterator.hasNext() && weightsIterator.hasNext()) {
      average += valuesIterator.next().doubleValue() * weightsIterator.next();
    }

    return new ExprDoubleValue(average);
  }

Copy link
Contributor Author

@andy-k-improving andy-k-improving Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the point for too many Evaluator, however indeed these are two distinct types of Evaluator, in the case of SMA, Evaluator not just take the new item, but also read the running-total in order to avoid re-computation, and all evaluators under the SMA umbrella take this into consideration especially on the API signature level, ex:

public ExprValue evaluate(Expression runningTotal, LiteralExpression numberOfDataPoints) {
      return DSL.divide(runningTotal, numberOfDataPoints).valueOf();
    }

Also the method calculateFirstTotal is unique to SMA:

@Override
    public Expression calculateFirstTotal(List<ExprValue> dataPoints) {
      Expression total = DSL.literal(0.0D);
      for (ExprValue dataPoint : dataPoints) {
        total = DSL.add(total, DSL.literal(dataPoint.doubleValue()));
      }
      return DSL.literal(total.valueOf().doubleValue());
    }

However in contrast, WMA don't share the same characteristic, which re-computation is required upon every update, also, the concept of running-total is not applicable here.

Regarding the concern of too many evaluators, I have updated to move all WMA related evaludator inside of class WeightedMovingAverageAccumulator in order to further narrow down the scope.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Good explanation for why SMA and WMA accumulators should be separate - thanks for that!

For WMA, I still think there is opportunity to combine some common logic. The weights should only need to be calculated once - can we pass them directly to the Evaluator from the Accumulator? Moreover, as I (tried to) describe in this comment, can we extract the common logic from all the WMA accumulators (related applying the weights to the values), and only have the different parts (mapping each value to a number, mapping the result back to the right ExprValue) split out into the different implementations.

As mentioned elsewhere, I think we should also use an iterator for these loops, so that we can get O(1) access for a linked list or queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above make sense, and I have now using BiFunction to replace original usage of custom interface of wmaEvalulator along with the static class creation.
Also I have moved out the logic of totalWeight calculation out from respective function call, as that is common to all calculation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above make sense, and I have now using BiFunction to replace original usage of custom interface of wmaEvalulator along with the static class creation.

Thanks. This looks good to me.

Also I have moved out the logic of totalWeight calculation out from respective function call, as that is common to all calculation.

I like that you have moved the totalWeight (the denominator) so that it doesn't need to be re-calculated each time. However, I think it is possible to move all the weight calculations out of these functions, and just pass a list containing all the weights to the function (i.e. to store n "complete" weights values as a WeightedMovingAverageAccumulator member).

I also think it is possible to extract a the common logic for determining sum into another helper function: as mentioned in a previous comment, that logic is the same, except for mapping the ExprValue list to longs.

Let me know if you want to discuss either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline the methods are not abstracted.


private static class TimeStampWmaEvaluator implements WmaTrendlineEvaluator {

private static final TimeStampWmaEvaluator INSTANCE = new TimeStampWmaEvaluator();

@Override
public ExprValue evaluate(ArrayList<ExprValue> receivedValues) {
long sum = 0L;
int totalWeight = (receivedValues.size() * (receivedValues.size() + 1)) / 2;
for (int i = 0; i < receivedValues.size(); i++) {
sum +=
(long)
(receivedValues.get(i).timestampValue().toEpochMilli()
* ((i + 1D) / totalWeight));
}

return ExprValueUtils.timestampValue(Instant.ofEpochMilli((sum)));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At it's heart, most of the logic between NumericWmaEvaluator, TimeWmaEvaluator, and TimeStampWmaEvaluator seems to be common: all the logic for determining the weights and calculating the weights average is the same, the only differences are (a) the type (double vs. long), and (b) how we get the numerical value from the ExprValue, and (c) how we convert the resulting weighted average back into an ExprValue.

Would it be possible to extract all of this common logic to WmaTrendlineEvaluator in a helper method like Long evaluateHelper(Collection<Long> values), and then have each sub-class only (i) convert a collection of ExprValue to a collection of Long, (ii) call evaluateHelper (pick a better name, obviously) to get the weighted average as a Long, and then (iii) convert that Long back into an ExprValue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in another comment, still think there is common logic that can be extracted. Please let me know if you want to discuss.

}

private static class TimeWmaEvaluator implements WmaTrendlineEvaluator {

private static final TimeWmaEvaluator INSTANCE = new TimeWmaEvaluator();

@Override
public ExprValue evaluate(ArrayList<ExprValue> receivedValues) {
long sum = 0L;
int totalWeight = (receivedValues.size() * (receivedValues.size() + 1)) / 2;
for (int i = 0; i < receivedValues.size(); i++) {
sum +=
(long)
(MILLIS.between(LocalTime.MIN, receivedValues.get(i).timeValue())
* ((i + 1D) / totalWeight));
}
return ExprValueUtils.timeValue(LocalTime.MIN.plus(sum, MILLIS));
}
}

private interface WmaTrendlineEvaluator {
ExprValue evaluate(ArrayList<ExprValue> receivedValues);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As alluded to in other comments, I'm not sure we need separate Evaluators for simple and weights moving averages, if we design the interface right.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explanation of this in another comments. Please resolve. ✅

}

private interface ArithmeticEvaluator {
Expression calculateFirstTotal(List<ExprValue> dataPoints);

Expand Down
Loading
Loading