diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 7825dbfa7..d14c356a5 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -439,6 +439,37 @@ public void write(final String database, final String retentionPolicy, */ public void query(Query query, int chunkSize, Consumer consumer); + /** + * Execute a streaming postQuery against a database. + * + * @param query + * the query to execute. + * @param chunkSize + * the number of QueryResults to process in one chunk. + * @param consumer + * the consumer to invoke for each received QueryResult + * @param timeUnit + * the time unit of the results. + */ + public void query(Query query, TimeUnit timeUnit, int chunkSize, Consumer consumer); + + /** + * Execute a streaming postQuery against a database. + * + * @param query + * the query to execute. + * @param timeUnit + * the time unit of the results. + * @param chunkSize + * the number of QueryResults to process in one chunk. + * @param onSuccess + * the consumer to invoke when result is received + * @param onFailure + * the consumer to invoke when error is thrown + */ + public void query(Query query, TimeUnit timeUnit, int chunkSize, + Consumer onSuccess, Consumer onFailure); + /** * Execute a query against a database. * diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index a2f1019b6..982d6f2b6 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -524,42 +524,81 @@ public void onFailure(final Call call, final Throwable throwable) { * {@inheritDoc} */ @Override - public void query(final Query query, final int chunkSize, final Consumer consumer) { - Call call = null; - if (query instanceof BoundParameterQuery) { - BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query; - call = this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize, - boundParameterQuery.getParameterJsonWithUrlEncoded()); - } else { - call = this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize); - } + public void query(final Query query, final int chunkSize, final Consumer consumer) { + query(query, null, chunkSize, consumer, null); + } + + /** + * {@inheritDoc} + */ + @Override + public void query(final Query query, final TimeUnit timeUnit, + final int chunkSize, final Consumer consumer) { + query(query, timeUnit, chunkSize, consumer, null); + } + /** + * {@inheritDoc} + */ + @Override + public void query(final Query query, final TimeUnit timeUnit, final int chunkSize, + final Consumer onSuccess, final Consumer onFailure) { + Call call = makeResponseBodyCall(query, timeUnit, chunkSize); call.enqueue(new Callback() { @Override public void onResponse(final Call call, final Response response) { try { if (response.isSuccessful()) { ResponseBody chunkedBody = response.body(); - chunkProccesor.process(chunkedBody, consumer); + chunkProccesor.process(chunkedBody, onSuccess); } else { // REVIEW: must be handled consistently with IOException. ResponseBody errorBody = response.errorBody(); if (errorBody != null) { - throw new InfluxDBException(errorBody.string()); + InfluxDBException errorBodyException = new InfluxDBException(errorBody.string()); + if (onFailure != null) { + onFailure.accept(errorBodyException); + } else { + throw errorBodyException; + } } } } catch (IOException e) { QueryResult queryResult = new QueryResult(); queryResult.setError(e.toString()); - consumer.accept(queryResult); + onSuccess.accept(queryResult); } } - @Override - public void onFailure(final Call call, final Throwable t) { - throw new InfluxDBException(t); - } - }); + @Override + public void onFailure(final Call call, final Throwable t) { + if (onFailure != null) { + onFailure.accept(t); + } else { + throw new InfluxDBException(t); + } + } + }); + } + + private Call makeResponseBodyCall(final Query query, final TimeUnit timeUnit, final int chunkSize) { + if (query instanceof BoundParameterQuery) { + BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query; + if (timeUnit == null) { + return this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize, + boundParameterQuery.getParameterJsonWithUrlEncoded()); + } + return this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), + TimeUtil.toTimePrecision(timeUnit), + chunkSize, boundParameterQuery.getParameterJsonWithUrlEncoded()); + } else { + if (timeUnit == null) { + return this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize); + } + + return this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), + TimeUtil.toTimePrecision(timeUnit), chunkSize); + } } /** diff --git a/src/main/java/org/influxdb/impl/InfluxDBService.java b/src/main/java/org/influxdb/impl/InfluxDBService.java index dfe897257..cf8fcae10 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBService.java +++ b/src/main/java/org/influxdb/impl/InfluxDBService.java @@ -75,8 +75,19 @@ public Call postQuery(@Query(DB) String db, public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(CHUNK_SIZE) int chunkSize); + @Streaming + @GET("query?chunked=true") + public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, + @Query(EPOCH) String epoch, @Query(CHUNK_SIZE) int chunkSize); + @Streaming @POST("query?chunked=true") public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params); + + @Streaming + @POST("query?chunked=true") + public Call query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, + @Query(EPOCH) String precision, @Query(CHUNK_SIZE) int chunkSize, + @Query(value = PARAMS, encoded = true) String params); } diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index d5af2bf2c..aeaeeea6c 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -1,13 +1,10 @@ package org.influxdb; +import okhttp3.OkHttpClient; import org.influxdb.InfluxDB.LogLevel; import org.influxdb.InfluxDB.ResponseFormat; -import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.*; import org.influxdb.dto.BoundParameterQuery.QueryBuilder; -import org.influxdb.dto.Point; -import org.influxdb.dto.Pong; -import org.influxdb.dto.Query; -import org.influxdb.dto.QueryResult; import org.influxdb.dto.QueryResult.Series; import org.influxdb.impl.InfluxDBImpl; import org.junit.jupiter.api.AfterEach; @@ -18,8 +15,6 @@ import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; -import okhttp3.OkHttpClient; - import java.io.IOException; import java.time.Instant; import java.time.ZoneId; @@ -28,14 +23,9 @@ import java.util.Arrays; import java.util.List; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; /** @@ -47,10 +37,13 @@ @RunWith(JUnitPlatform.class) public class InfluxDBTest { - InfluxDB influxDB; + private final static long DEFAULT_OPERATION_TIMEOUT = 5000L; + private final static int DEFAULT_QUERY_CHUNK_SIZE = 10; private final static int UDP_PORT = 8089; final static String UDP_DATABASE = "udp"; + InfluxDB influxDB; + /** * Create a influxDB connection before all tests start. * @@ -137,12 +130,13 @@ public void testBoundParameterQuery() throws InterruptedException { waitForTestresults.notifyAll(); } }; - this.influxDB.query(query, 10, check); + this.influxDB.query(query, DEFAULT_QUERY_CHUNK_SIZE, check); synchronized (waitForTestresults) { - waitForTestresults.wait(2000); + waitForTestresults.wait(DEFAULT_OPERATION_TIMEOUT); } } + /** * Tests for callback query. */ @@ -162,6 +156,97 @@ public void accept(QueryResult queryResult) { result.result(); } + /** + * Tests for the chunk stream query with specified time unit + */ + @Test + public void testChunkTimeUnitCallbackQuery() throws Throwable { + this.influxDB.setDatabase(UDP_DATABASE); + final int chunkCount = 40; + + for (int i = 0; i < chunkCount; i++) { + Point point = Point.measurement("cpu") + .tag("atag", "test") + .addField("idle", 90L + i) + .addField("usertime", 9L + i) + .addField("system", 1L + i) + .build(); + this.influxDB.write(point); + } + + // test + Query query = QueryBuilder.newQuery("SELECT * FROM cpu WHERE atag = $atag") + .forDatabase(UDP_DATABASE) + .bind("atag", "test") + .create(); + + Object waitForTestresults = new Object(); + AtomicInteger callBackCalled = new AtomicInteger(0); + final int expectedCallBackCalled = chunkCount / DEFAULT_QUERY_CHUNK_SIZE; + + Consumer check = (queryResult) -> { + if (queryResult.getResults() == null) { + return; + } + + callBackCalled.getAndIncrement(); + List> values = queryResult.getResults().get(0).getSeries().get(0).getValues(); + Assertions.assertEquals(DEFAULT_QUERY_CHUNK_SIZE, values.size()); + Assertions.assertTrue(values.get(0).get(0) instanceof Number); + + if (callBackCalled.get() == expectedCallBackCalled) { + synchronized (waitForTestresults) { + waitForTestresults.notifyAll(); + } + } + }; + + this.influxDB.query(query, TimeUnit.MILLISECONDS, DEFAULT_QUERY_CHUNK_SIZE, check); + + synchronized (waitForTestresults) { + waitForTestresults.wait(DEFAULT_OPERATION_TIMEOUT); + Assertions.assertEquals(expectedCallBackCalled, callBackCalled.get()); + } + } + + /** + * Test for the failure callback for a stream query + */ + @Test + public void testFailureCallbackQuery() throws Throwable { + this.influxDB.setDatabase(UDP_DATABASE); + Query query = QueryBuilder.newQuery("Invalid query") + .forDatabase(UDP_DATABASE) + .bind("atag", "test") + .create(); + + AtomicBoolean onSuccessCalled = new AtomicBoolean(false); + AtomicBoolean onFailureCalled = new AtomicBoolean(false); + Object waitForTestResults = new Object(); + + Consumer onSuccess = (queryResult) -> { + onSuccessCalled.set(true); + synchronized (waitForTestResults) { + waitForTestResults.notifyAll(); + } + }; + + Consumer onFailure = (throwable) -> { + onFailureCalled.set(true); + synchronized (waitForTestResults) { + waitForTestResults.notifyAll(); + } + }; + + this.influxDB.query(query, TimeUnit.MILLISECONDS, DEFAULT_QUERY_CHUNK_SIZE, onSuccess, onFailure); + + synchronized (waitForTestResults) { + waitForTestResults.wait(DEFAULT_OPERATION_TIMEOUT); + Assertions.assertFalse(onSuccessCalled.get()); + Assertions.assertTrue(onFailureCalled.get()); + } + } + /** * Test that describe Databases works. */ @@ -790,6 +875,35 @@ public void accept(QueryResult result) { Assertions.assertFalse(countDownLatch.await(10, TimeUnit.SECONDS)); } + /** + * Test chunking edge case. + * @throws InterruptedException + */ + @Test + public void testChunkingFailureWithCallback() throws InterruptedException { + if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) { + // do not test version 0.13 and 1.0 + return; + } + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.createDatabase(dbName); + final CountDownLatch countDownLatch = new CountDownLatch(1); + Query query = new Query("UNKNOWN_QUERY", dbName); + this.influxDB.query(query, TimeUnit.NANOSECONDS, 10, new Consumer() { + @Override + public void accept(QueryResult result) { + countDownLatch.countDown(); + } + }, new Consumer() { + @Override + public void accept(Throwable throwable) { + countDownLatch.countDown(); + } + }); + this.influxDB.deleteDatabase(dbName); + Assertions.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + /** * Test chunking on 0.13 and 1.0. * @throws InterruptedException