Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the behavior of in-memory tables to support multiple '@index' annotations #1491

Merged
merged 1 commit into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public PartitionRuntimeImpl(ConcurrentMap<String, AbstractDefinition> streamDefi
Partition partition, int partitionIndex, SiddhiAppContext siddhiAppContext) {
this.siddhiAppContext = siddhiAppContext;
if (partition.getPartitionTypeMap().isEmpty()) {
throw new SiddhiAppCreationException("Partition must have at least one executor. But found none.");
throw new SiddhiAppCreationException("Partition must have at least one partition executor. " +
"But found none.");
}
try {
Element element = AnnotationHelper.getAnnotationElement("info", "name",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import io.siddhi.core.event.stream.converter.ZeroStreamEventConverter;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.exception.OperationNotSupportedException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.table.holder.*;
import io.siddhi.core.table.holder.EventHolder;
import io.siddhi.core.table.holder.IndexEventHolder;
import io.siddhi.core.table.holder.IndexEventHolderForCache;
import io.siddhi.core.table.holder.ListEventHolder;
import io.siddhi.core.table.holder.PrimaryKeyReferenceHolder;
import io.siddhi.core.util.SiddhiConstants;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
Expand All @@ -37,9 +40,7 @@
import org.apache.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

/**
* Class to parse {@link EventHolder}
Expand Down Expand Up @@ -70,19 +71,28 @@ public static EventHolder parse(AbstractDefinition tableDefinition, StreamEventF
.toArray(PrimaryKeyReferenceHolder[]::new);
}

// indexes.
Annotation indexAnnotation = AnnotationHelper.getAnnotation(SiddhiConstants.ANNOTATION_INDEX,
tableDefinition.getAnnotations());
if (indexAnnotation != null) {
for (Annotation indexAnnotation : AnnotationHelper.getAnnotations(SiddhiConstants.ANNOTATION_INDEX,
tableDefinition.getAnnotations())) {
if (indexAnnotation.getElements().size() == 0) {
throw new SiddhiAppValidationException(SiddhiConstants.ANNOTATION_INDEX + " annotation contains "
+ indexAnnotation.getElements().size() + " element");
throw new SiddhiAppValidationException(SiddhiConstants.ANNOTATION_INDEX + " annotation of " +
"in-memory table should contain only one index element, but found "
+ indexAnnotation.getElements().size() + " element",
indexAnnotation.getQueryContextStartIndex(),
indexAnnotation.getQueryContextEndIndex());
} else if (indexAnnotation.getElements().size() > 1) {
throw new SiddhiAppValidationException(SiddhiConstants.ANNOTATION_INDEX + " annotation of the " +
"in-memory table should only contain one index element but found "
+ indexAnnotation.getElements().size() + " elements. To use multiple indexes, " +
"define multiple '@index(<index key>)' annotations with one index element " +
"per each index key",
indexAnnotation.getQueryContextStartIndex(),
indexAnnotation.getQueryContextEndIndex());
}
for (Element element : indexAnnotation.getElements()) {
Integer previousValue = indexMetaData.put(element.getValue().trim(), tableDefinition
.getAttributePosition(element.getValue().trim()));
if (previousValue != null) {
throw new SiddhiAppCreationException("Multiple " + SiddhiConstants.ANNOTATION_INDEX + " " +
throw new SiddhiAppValidationException("Multiple " + SiddhiConstants.ANNOTATION_INDEX + " " +
"annotations defined with same attribute '" + element.getValue().trim() + "', at '" +
tableDefinition.getId() + "'", indexAnnotation.getQueryContextStartIndex(),
indexAnnotation.getQueryContextEndIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public static PartitionRuntimeImpl parse(SiddhiAppRuntimeBuilder siddhiAppRuntim
siddhiAppRuntimeBuilder.getStreamDefinitionMap();
ConcurrentMap<String, AbstractDefinition> windowDefinitionMap =
siddhiAppRuntimeBuilder.getWindowDefinitionMap();
validateStreamPartitions(partition.getPartitionTypeMap(), streamDefinitionMap, windowDefinitionMap);
PartitionRuntimeImpl partitionRuntime = new PartitionRuntimeImpl(streamDefinitionMap, windowDefinitionMap,
siddhiAppRuntimeBuilder.getStreamJunctions(), partition, partitionIndex, siddhiAppContext);
validateStreamPartitions(partition.getPartitionTypeMap(), streamDefinitionMap, windowDefinitionMap);
for (Query query : partition.getQueryList()) {
List<VariableExpressionExecutor> executors = new ArrayList<VariableExpressionExecutor>();
ConcurrentMap<String, AbstractDefinition> combinedStreamMap =
Expand Down Expand Up @@ -93,7 +93,7 @@ private static void validateStreamPartitions(Map<String, PartitionType> partitio
if ((!streamDefinitionMap.containsKey(entry.getKey())) &&
(!windowDefinitionMap.containsKey(entry.getKey()))) {
throw new SiddhiAppCreationException("Stream/window with name '" + entry.getKey() +
"' does not defined!",
"' is not defined!",
entry.getValue().getQueryContextStartIndex(),
entry.getValue().getQueryContextEndIndex());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1114,4 +1114,92 @@ public void receive(Event[] events) {
siddhiAppRuntime.shutdown();
}

@Test
public void testPartitionQuery50() throws InterruptedException {
log.info("Partition test50");
SiddhiManager siddhiManager = new SiddhiManager();

String siddhiApp = "" +
"@app:name('PartitionTest50') " +
"" +
"" +
"define stream cseEventStream (symbol string, price float, volume int); " +
"" +
"define stream cseEventStreamOne (symbol string, price float,volume int);" +
"define stream cseEventStreamTwo (symbol string, price float,volume int);" +
"" +
"@info(name = 'query')" +
"from cseEventStreamOne " +
"select symbol, price, volume " +
"insert into cseEventStream;" +
" " +
"partition with (price>=100 as 'large' or " +
" price<100 and price>=50 as 'medium' or " +
" price<50 as 'small' of cseEventStream, " +
" symbol of cseEventStreamTwo ) " +
" begin" +
" @info(name = 'query1') " +
" from cseEventStream " +
" insert into #OutStockStream1 ; " +
"" +
" @info(name = 'query2') " +
" from cseEventStreamTwo " +
" insert into #OutStockStream1 ; " +
" " +
" @info(name = 'query3') " +
" from #OutStockStream1 " +
" select symbol, sum(price) as price " +
" insert into #OutStockStream2 ;" +
" " +
" @info(name = 'query4') " +
" from #OutStockStream2 " +
" insert into OutStockStream ;" +
" " +
" end ; ";


SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);


siddhiAppRuntime.addCallback("OutStockStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
for (Event event : events) {
count.incrementAndGet();
eventArrived = true;
if (count.get() == 1) {
AssertJUnit.assertEquals(25.0, event.getData()[1]);
} else if (count.get() == 2) {
AssertJUnit.assertEquals(30.0, event.getData()[1]);
} else if (count.get() == 3) {
AssertJUnit.assertEquals(7005.60009765625, event.getData()[1]);
} else if (count.get() == 4) {
AssertJUnit.assertEquals(50.0, event.getData()[1]);
} else if (count.get() == 5) {
AssertJUnit.assertEquals(55.0, event.getData()[1]);
} else if (count.get() == 6) {
AssertJUnit.assertEquals(7000.000097751617, event.getData()[1]);
}
}
}
});

InputHandler inputHandlerOne = siddhiAppRuntime.getInputHandler("cseEventStreamOne");
InputHandler inputHandlerTwo = siddhiAppRuntime.getInputHandler("cseEventStreamTwo");
siddhiAppRuntime.start();

inputHandlerOne.send(new Object[]{"IBM", 25f, 100});
inputHandlerTwo.send(new Object[]{"small", 5f, 100});
inputHandlerOne.send(new Object[]{"WSO2", 7005.6f, 100});
inputHandlerOne.send(new Object[]{"IBM", 50f, 100});
inputHandlerOne.send(new Object[]{"ORACLE", 25f, 100});
inputHandlerTwo.send(new Object[]{"large", -5.6f, 100});

SiddhiTestHelper.waitForEvents(100, 5, count, 60000);
AssertJUnit.assertTrue(6 >= count.get());
siddhiAppRuntime.shutdown();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.query.output.callback.QueryCallback;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.util.EventPrinter;
import io.siddhi.core.util.SiddhiTestHelper;
import io.siddhi.query.api.exception.AttributeNotExistException;
import io.siddhi.query.api.exception.DuplicateAnnotationException;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import org.apache.log4j.Logger;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -2006,7 +2005,8 @@ public void indexTableTest28() throws InterruptedException {
"define stream CheckStockStream (symbol string, volume long); " +
"define stream UpdateStockStream (symbol string, price float, volume long);" +
"@PrimaryKey('symbol') " +
"@Index('price','volume') " +
"@Index('price') " +
"@Index('volume') " +
"define table StockTable (symbol string, price float, volume long); ";
String query = "" +
"@info(name = 'query1') " +
Expand Down Expand Up @@ -2072,7 +2072,8 @@ public void indexTableTest29() throws InterruptedException {
"define stream StockStream (symbol string, price float, volume long); " +
"define stream CheckStockStream (symbol string, volume long); " +
"define stream UpdateStockStream (symbol string, price float, volume long);" +
"@Index('symbol', 'volume') " +
"@Index('symbol') " +
"@Index('volume') " +
"define table StockTable (symbol string, price float, volume long); ";
String query = "" +
"@info(name = 'query1') " +
Expand Down Expand Up @@ -2154,14 +2155,14 @@ public void indexTableTest30() throws InterruptedException {
}
}

@Test(expectedExceptions = SiddhiAppCreationException.class)
@Test(expectedExceptions = SiddhiAppValidationException.class)
public void indexTableTest31() throws InterruptedException {
log.info("indexTableTest31");

SiddhiManager siddhiManager = new SiddhiManager();
String streams = "" +
"define stream StockStream (symbol string, price float, volume long); " +
"@Index('symbol', 'symbol') " +
"@Index('symbol', 'volume') " +
"define table StockTable (symbol string, price float, volume long); ";
String query = "" +
"@info(name = 'query1') " +
Expand All @@ -2180,15 +2181,15 @@ public void indexTableTest31() throws InterruptedException {
}
}

@Test(expectedExceptions = DuplicateAnnotationException.class)
@Test(expectedExceptions = SiddhiAppValidationException.class)
public void indexTableTest32() throws InterruptedException {
log.info("indexTableTest32");

SiddhiManager siddhiManager = new SiddhiManager();
String streams = "" +
"define stream StockStream (symbol string, price float, volume long); " +
"@Index('symbol') " +
"@Index('volume') " +
"@Index('symbol') " +
"define table StockTable (symbol string, price float, volume long); ";
String query = "" +
"@info(name = 'query1') " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.siddhi.query.api.exception.DuplicateAnnotationException;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

/**
Expand Down Expand Up @@ -55,6 +56,16 @@ public static Annotation getAnnotation(String annotationName, List<Annotation> a
return annotation;
}

public static List<Annotation> getAnnotations(String annotationName, List<Annotation> annotationList) {
List<Annotation> annotations = new LinkedList<>();
for (Annotation aAnnotation : annotationList) {
if (annotationName.equalsIgnoreCase(aAnnotation.getName())) {
annotations.add(aAnnotation);
}
}
return annotations;
}

// TODO: 1/28/17 update helper methods to work with nested annotations.
public static Element getAnnotationElement(String annotationName, String elementName,
List<Annotation> annotationList) {
Expand Down