diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 92d1a4b61..2dbf97a6c 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -1,6 +1,7 @@ package com.clickhouse.client; import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientException; import com.clickhouse.client.api.ClientFaultCause; import com.clickhouse.client.api.ConnectionInitiationException; import com.clickhouse.client.api.ConnectionReuseStrategy; @@ -21,6 +22,7 @@ import org.apache.hc.core5.http.ConnectionRequestTimeoutException; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.net.URIBuilder; +import org.testcontainers.utility.ThrowingFunction; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -33,13 +35,18 @@ import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; public class HttpTransportTests extends BaseIntegrationTest{ + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + } @Test(groups = {"integration"},dataProvider = "testConnectionTTLProvider") @SuppressWarnings("java:S2925") @@ -219,18 +226,16 @@ public void testSecureConnection() { } } - @Test(groups = { "integration" }, enabled = true) - public void testNoHttpResponseFailure() { + @Test(groups = { "integration" }, dataProvider = "NoResponseFailureProvider") + public void testInsertAndNoHttpResponseFailure(String body, int maxRetries, ThrowingFunction function, + boolean shouldFail) { WireMockServer faultyServer = new WireMockServer( WireMockConfiguration .options().port(9090).notifier(new ConsoleNotifier(false))); faultyServer.start(); - byte[] requestBody = ("INSERT INTO table01 FORMAT " + - ClickHouseFormat.TSV.name() + " \n1\t2\t3\n").getBytes(); - // First request gets no response faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) + .withRequestBody(WireMock.equalTo(body)) .inScenario("Retry") .whenScenarioStateIs(STARTED) .willSetStateTo("Failed") @@ -238,7 +243,7 @@ public void testNoHttpResponseFailure() { // Second request gets a response (retry) faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) + .withRequestBody(WireMock.equalTo(body)) .inScenario("Retry") .whenScenarioStateIs("Failed") .willSetStateTo("Done") @@ -250,20 +255,53 @@ public void testNoHttpResponseFailure() { .addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false) .setUsername("default") .setPassword("") - .useNewImplementation(true) -// .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) + .useNewImplementation(true) // because of the internal differences .compressClientRequest(false) - .setOption(ClickHouseClientOption.RETRY.getKey(), "2") + .setMaxRetries(maxRetries) .build(); try { - InsertResponse insertResponse = mockServerClient.insert("table01", - new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV).get(30, TimeUnit.SECONDS); - insertResponse.close(); + function.apply(mockServerClient); + } catch (ClientException e) { + e.printStackTrace(); + if (!shouldFail) { + Assert.fail("Unexpected exception", e); + } + return; } catch (Exception e) { Assert.fail("Unexpected exception", e); } finally { faultyServer.stop(); } + + if (shouldFail) { + Assert.fail("Expected exception"); + } + } + + @DataProvider(name = "NoResponseFailureProvider") + public static Object[][] noResponseFailureProvider() { + + String insertBody = "INSERT INTO table01 FORMAT " + ClickHouseFormat.TSV.name() + " \n1\t2\t3\n"; + ThrowingFunction insertFunction = (client) -> { + InsertResponse insertResponse = client.insert("table01", + new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV).get(30, TimeUnit.SECONDS); + insertResponse.close(); + return null; + }; + + String selectBody = "select timezone()"; + ThrowingFunction queryFunction = (client) -> { + QueryResponse response = client.query("select timezone()").get(30, TimeUnit.SECONDS); + response.close(); + return null; + }; + + return new Object[][]{ + {insertBody, 1, insertFunction, false}, + {selectBody, 1, queryFunction, false}, + {insertBody, 0, insertFunction, true}, + {selectBody, 0, queryFunction, true} + }; } } diff --git a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java index 8f3cabf92..eac611bf6 100644 --- a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java @@ -169,51 +169,4 @@ public void insertRawDataSimple(int numberOfRecords) throws Exception { OperationMetrics metrics = response.getMetrics(); assertEquals((int)response.getWrittenRows(), numberOfRecords ); } - - @Test(groups = { "integration" }, enabled = true) - public void testNoHttpResponseFailure() { - WireMockServer faultyServer = new WireMockServer( WireMockConfiguration - .options().port(9090).notifier(new ConsoleNotifier(false))); - faultyServer.start(); - - byte[] requestBody = ("INSERT INTO table01 FORMAT " + - ClickHouseFormat.TSV.name() + " \n1\t2\t3\n").getBytes(); - - // First request gets no response - faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) - .inScenario("Retry") - .whenScenarioStateIs(STARTED) - .willSetStateTo("Failed") - .willReturn(WireMock.aResponse().withFault(Fault.EMPTY_RESPONSE)).build()); - - // Second request gets a response (retry) - faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) - .inScenario("Retry") - .whenScenarioStateIs("Failed") - .willSetStateTo("Done") - .willReturn(WireMock.aResponse() - .withHeader("X-ClickHouse-Summary", - "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build()); - - Client mockServerClient = new Client.Builder() - .addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false) - .setUsername("default") - .setPassword("") - .useNewImplementation(true) -// .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) - .compressClientRequest(false) - .setOption(ClickHouseClientOption.RETRY.getKey(), "2") - .build(); - try { - InsertResponse insertResponse = mockServerClient.insert("table01", - new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV, settings).get(30, TimeUnit.SECONDS); - insertResponse.close(); - } catch (Exception e) { - Assert.fail("Unexpected exception", e); - } finally { - faultyServer.stop(); - } - } }