diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionRuntimeImpl.java b/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionRuntimeImpl.java index 8333264b8a..bdd2891513 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionRuntimeImpl.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/partition/PartitionRuntimeImpl.java @@ -99,7 +99,8 @@ public PartitionRuntimeImpl(ConcurrentMap streamDefi Partition partition, int partitionIndex, SiddhiAppContext siddhiAppContext) { this.siddhiAppContext = siddhiAppContext; if (partition.getPartitionTypeMap().isEmpty()) { - throw new SiddhiAppCreationException("Partition must have at least one executor. But found none."); + throw new SiddhiAppCreationException("Partition must have at least one partition executor. " + + "But found none."); } try { Element element = AnnotationHelper.getAnnotationElement("info", "name", diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/EventHolderPasser.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/EventHolderPasser.java index d43e06a141..b83dc0e315 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/EventHolderPasser.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/EventHolderPasser.java @@ -25,8 +25,11 @@ import io.siddhi.core.event.stream.converter.ZeroStreamEventConverter; import io.siddhi.core.event.stream.holder.StreamEventClonerHolder; import io.siddhi.core.exception.OperationNotSupportedException; -import io.siddhi.core.exception.SiddhiAppCreationException; -import io.siddhi.core.table.holder.*; +import io.siddhi.core.table.holder.EventHolder; +import io.siddhi.core.table.holder.IndexEventHolder; +import io.siddhi.core.table.holder.IndexEventHolderForCache; +import io.siddhi.core.table.holder.ListEventHolder; +import io.siddhi.core.table.holder.PrimaryKeyReferenceHolder; import io.siddhi.core.util.SiddhiConstants; import io.siddhi.query.api.annotation.Annotation; import io.siddhi.query.api.annotation.Element; @@ -37,9 +40,7 @@ import org.apache.log4j.Logger; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Stream; /** * Class to parse {@link EventHolder} @@ -70,19 +71,28 @@ public static EventHolder parse(AbstractDefinition tableDefinition, StreamEventF .toArray(PrimaryKeyReferenceHolder[]::new); } - // indexes. - Annotation indexAnnotation = AnnotationHelper.getAnnotation(SiddhiConstants.ANNOTATION_INDEX, - tableDefinition.getAnnotations()); - if (indexAnnotation != null) { + for (Annotation indexAnnotation : AnnotationHelper.getAnnotations(SiddhiConstants.ANNOTATION_INDEX, + tableDefinition.getAnnotations())) { if (indexAnnotation.getElements().size() == 0) { - throw new SiddhiAppValidationException(SiddhiConstants.ANNOTATION_INDEX + " annotation contains " - + indexAnnotation.getElements().size() + " element"); + throw new SiddhiAppValidationException(SiddhiConstants.ANNOTATION_INDEX + " annotation of " + + "in-memory table should contain only one index element, but found " + + indexAnnotation.getElements().size() + " element", + indexAnnotation.getQueryContextStartIndex(), + indexAnnotation.getQueryContextEndIndex()); + } else if (indexAnnotation.getElements().size() > 1) { + throw new SiddhiAppValidationException(SiddhiConstants.ANNOTATION_INDEX + " annotation of the " + + "in-memory table should only contain one index element but found " + + indexAnnotation.getElements().size() + " elements. To use multiple indexes, " + + "define multiple '@index()' annotations with one index element " + + "per each index key", + indexAnnotation.getQueryContextStartIndex(), + indexAnnotation.getQueryContextEndIndex()); } for (Element element : indexAnnotation.getElements()) { Integer previousValue = indexMetaData.put(element.getValue().trim(), tableDefinition .getAttributePosition(element.getValue().trim())); if (previousValue != null) { - throw new SiddhiAppCreationException("Multiple " + SiddhiConstants.ANNOTATION_INDEX + " " + + throw new SiddhiAppValidationException("Multiple " + SiddhiConstants.ANNOTATION_INDEX + " " + "annotations defined with same attribute '" + element.getValue().trim() + "', at '" + tableDefinition.getId() + "'", indexAnnotation.getQueryContextStartIndex(), indexAnnotation.getQueryContextEndIndex()); diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/PartitionParser.java b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/PartitionParser.java index e2eb06248b..c76c4c2542 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/PartitionParser.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/util/parser/PartitionParser.java @@ -51,9 +51,9 @@ public static PartitionRuntimeImpl parse(SiddhiAppRuntimeBuilder siddhiAppRuntim siddhiAppRuntimeBuilder.getStreamDefinitionMap(); ConcurrentMap windowDefinitionMap = siddhiAppRuntimeBuilder.getWindowDefinitionMap(); + validateStreamPartitions(partition.getPartitionTypeMap(), streamDefinitionMap, windowDefinitionMap); PartitionRuntimeImpl partitionRuntime = new PartitionRuntimeImpl(streamDefinitionMap, windowDefinitionMap, siddhiAppRuntimeBuilder.getStreamJunctions(), partition, partitionIndex, siddhiAppContext); - validateStreamPartitions(partition.getPartitionTypeMap(), streamDefinitionMap, windowDefinitionMap); for (Query query : partition.getQueryList()) { List executors = new ArrayList(); ConcurrentMap combinedStreamMap = @@ -93,7 +93,7 @@ private static void validateStreamPartitions(Map partitio if ((!streamDefinitionMap.containsKey(entry.getKey())) && (!windowDefinitionMap.containsKey(entry.getKey()))) { throw new SiddhiAppCreationException("Stream/window with name '" + entry.getKey() + - "' does not defined!", + "' is not defined!", entry.getValue().getQueryContextStartIndex(), entry.getValue().getQueryContextEndIndex()); } diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/query/partition/PartitionTestCase2.java b/modules/siddhi-core/src/test/java/io/siddhi/core/query/partition/PartitionTestCase2.java index d462050753..fad06e36bc 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/query/partition/PartitionTestCase2.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/query/partition/PartitionTestCase2.java @@ -1114,4 +1114,92 @@ public void receive(Event[] events) { siddhiAppRuntime.shutdown(); } + @Test + public void testPartitionQuery50() throws InterruptedException { + log.info("Partition test50"); + SiddhiManager siddhiManager = new SiddhiManager(); + + String siddhiApp = "" + + "@app:name('PartitionTest50') " + + "" + + "" + + "define stream cseEventStream (symbol string, price float, volume int); " + + "" + + "define stream cseEventStreamOne (symbol string, price float,volume int);" + + "define stream cseEventStreamTwo (symbol string, price float,volume int);" + + "" + + "@info(name = 'query')" + + "from cseEventStreamOne " + + "select symbol, price, volume " + + "insert into cseEventStream;" + + " " + + "partition with (price>=100 as 'large' or " + + " price<100 and price>=50 as 'medium' or " + + " price<50 as 'small' of cseEventStream, " + + " symbol of cseEventStreamTwo ) " + + " begin" + + " @info(name = 'query1') " + + " from cseEventStream " + + " insert into #OutStockStream1 ; " + + "" + + " @info(name = 'query2') " + + " from cseEventStreamTwo " + + " insert into #OutStockStream1 ; " + + " " + + " @info(name = 'query3') " + + " from #OutStockStream1 " + + " select symbol, sum(price) as price " + + " insert into #OutStockStream2 ;" + + " " + + " @info(name = 'query4') " + + " from #OutStockStream2 " + + " insert into OutStockStream ;" + + " " + + " end ; "; + + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp); + + + siddhiAppRuntime.addCallback("OutStockStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + for (Event event : events) { + count.incrementAndGet(); + eventArrived = true; + if (count.get() == 1) { + AssertJUnit.assertEquals(25.0, event.getData()[1]); + } else if (count.get() == 2) { + AssertJUnit.assertEquals(30.0, event.getData()[1]); + } else if (count.get() == 3) { + AssertJUnit.assertEquals(7005.60009765625, event.getData()[1]); + } else if (count.get() == 4) { + AssertJUnit.assertEquals(50.0, event.getData()[1]); + } else if (count.get() == 5) { + AssertJUnit.assertEquals(55.0, event.getData()[1]); + } else if (count.get() == 6) { + AssertJUnit.assertEquals(7000.000097751617, event.getData()[1]); + } + } + } + }); + + InputHandler inputHandlerOne = siddhiAppRuntime.getInputHandler("cseEventStreamOne"); + InputHandler inputHandlerTwo = siddhiAppRuntime.getInputHandler("cseEventStreamTwo"); + siddhiAppRuntime.start(); + + inputHandlerOne.send(new Object[]{"IBM", 25f, 100}); + inputHandlerTwo.send(new Object[]{"small", 5f, 100}); + inputHandlerOne.send(new Object[]{"WSO2", 7005.6f, 100}); + inputHandlerOne.send(new Object[]{"IBM", 50f, 100}); + inputHandlerOne.send(new Object[]{"ORACLE", 25f, 100}); + inputHandlerTwo.send(new Object[]{"large", -5.6f, 100}); + + SiddhiTestHelper.waitForEvents(100, 5, count, 60000); + AssertJUnit.assertTrue(6 >= count.get()); + siddhiAppRuntime.shutdown(); + + } + } diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/IndexTableTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/IndexTableTestCase.java index c095e54605..b4679806d0 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/IndexTableTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/IndexTableTestCase.java @@ -21,13 +21,12 @@ import io.siddhi.core.SiddhiAppRuntime; import io.siddhi.core.SiddhiManager; import io.siddhi.core.event.Event; -import io.siddhi.core.exception.SiddhiAppCreationException; import io.siddhi.core.query.output.callback.QueryCallback; import io.siddhi.core.stream.input.InputHandler; import io.siddhi.core.util.EventPrinter; import io.siddhi.core.util.SiddhiTestHelper; import io.siddhi.query.api.exception.AttributeNotExistException; -import io.siddhi.query.api.exception.DuplicateAnnotationException; +import io.siddhi.query.api.exception.SiddhiAppValidationException; import org.apache.log4j.Logger; import org.testng.AssertJUnit; import org.testng.annotations.BeforeMethod; @@ -2006,7 +2005,8 @@ public void indexTableTest28() throws InterruptedException { "define stream CheckStockStream (symbol string, volume long); " + "define stream UpdateStockStream (symbol string, price float, volume long);" + "@PrimaryKey('symbol') " + - "@Index('price','volume') " + + "@Index('price') " + + "@Index('volume') " + "define table StockTable (symbol string, price float, volume long); "; String query = "" + "@info(name = 'query1') " + @@ -2072,7 +2072,8 @@ public void indexTableTest29() throws InterruptedException { "define stream StockStream (symbol string, price float, volume long); " + "define stream CheckStockStream (symbol string, volume long); " + "define stream UpdateStockStream (symbol string, price float, volume long);" + - "@Index('symbol', 'volume') " + + "@Index('symbol') " + + "@Index('volume') " + "define table StockTable (symbol string, price float, volume long); "; String query = "" + "@info(name = 'query1') " + @@ -2154,14 +2155,14 @@ public void indexTableTest30() throws InterruptedException { } } - @Test(expectedExceptions = SiddhiAppCreationException.class) + @Test(expectedExceptions = SiddhiAppValidationException.class) public void indexTableTest31() throws InterruptedException { log.info("indexTableTest31"); SiddhiManager siddhiManager = new SiddhiManager(); String streams = "" + "define stream StockStream (symbol string, price float, volume long); " + - "@Index('symbol', 'symbol') " + + "@Index('symbol', 'volume') " + "define table StockTable (symbol string, price float, volume long); "; String query = "" + "@info(name = 'query1') " + @@ -2180,7 +2181,7 @@ public void indexTableTest31() throws InterruptedException { } } - @Test(expectedExceptions = DuplicateAnnotationException.class) + @Test(expectedExceptions = SiddhiAppValidationException.class) public void indexTableTest32() throws InterruptedException { log.info("indexTableTest32"); @@ -2188,7 +2189,7 @@ public void indexTableTest32() throws InterruptedException { String streams = "" + "define stream StockStream (symbol string, price float, volume long); " + "@Index('symbol') " + - "@Index('volume') " + + "@Index('symbol') " + "define table StockTable (symbol string, price float, volume long); "; String query = "" + "@info(name = 'query1') " + diff --git a/modules/siddhi-query-api/src/main/java/io/siddhi/query/api/util/AnnotationHelper.java b/modules/siddhi-query-api/src/main/java/io/siddhi/query/api/util/AnnotationHelper.java index 9350df06f8..bffd51f06b 100644 --- a/modules/siddhi-query-api/src/main/java/io/siddhi/query/api/util/AnnotationHelper.java +++ b/modules/siddhi-query-api/src/main/java/io/siddhi/query/api/util/AnnotationHelper.java @@ -23,6 +23,7 @@ import io.siddhi.query.api.exception.DuplicateAnnotationException; import java.util.Arrays; +import java.util.LinkedList; import java.util.List; /** @@ -55,6 +56,16 @@ public static Annotation getAnnotation(String annotationName, List a return annotation; } + public static List getAnnotations(String annotationName, List annotationList) { + List annotations = new LinkedList<>(); + for (Annotation aAnnotation : annotationList) { + if (annotationName.equalsIgnoreCase(aAnnotation.getName())) { + annotations.add(aAnnotation); + } + } + return annotations; + } + // TODO: 1/28/17 update helper methods to work with nested annotations. public static Element getAnnotationElement(String annotationName, String elementName, List annotationList) {