11package ru .yandex .clickhouse ;
22
3- import java .io .ByteArrayInputStream ;
4- import java .io .IOException ;
5- import java .io .InputStream ;
6- import java .net .HttpURLConnection ;
7- import java .net .URI ;
8- import java .net .URISyntaxException ;
9- import java .sql .ResultSet ;
10- import java .sql .SQLException ;
11- import java .sql .SQLWarning ;
12- import java .util .ArrayList ;
13- import java .util .EnumMap ;
14- import java .util .List ;
15- import java .util .Map ;
16- import java .util .TimeZone ;
17- import java .util .UUID ;
18-
3+ import com .google .common .base .Strings ;
194import org .apache .http .HttpEntity ;
205import org .apache .http .HttpResponse ;
216import org .apache .http .NameValuePair ;
227import org .apache .http .client .methods .HttpGet ;
238import org .apache .http .client .methods .HttpPost ;
249import org .apache .http .client .utils .URIBuilder ;
2510import org .apache .http .entity .ContentType ;
26- import org .apache .http .entity .InputStreamEntity ;
2711import org .apache .http .entity .StringEntity ;
2812import org .apache .http .entity .mime .MultipartEntityBuilder ;
2913import org .apache .http .impl .client .CloseableHttpClient ;
3014import org .apache .http .message .BasicNameValuePair ;
3115import org .apache .http .util .EntityUtils ;
3216import org .slf4j .Logger ;
3317import org .slf4j .LoggerFactory ;
34-
35- import com .google .common .base .Strings ;
36-
3718import ru .yandex .clickhouse .domain .ClickHouseFormat ;
3819import ru .yandex .clickhouse .except .ClickHouseException ;
3920import ru .yandex .clickhouse .except .ClickHouseExceptionSpecifier ;
40- import ru .yandex .clickhouse .response .ClickHouseLZ4Stream ;
41- import ru .yandex .clickhouse .response .ClickHouseResponse ;
42- import ru .yandex .clickhouse .response .ClickHouseResultSet ;
43- import ru .yandex .clickhouse .response .ClickHouseScrollableResultSet ;
44- import ru .yandex .clickhouse .response .FastByteArrayOutputStream ;
21+ import ru .yandex .clickhouse .response .*;
4522import ru .yandex .clickhouse .settings .ClickHouseProperties ;
4623import ru .yandex .clickhouse .settings .ClickHouseQueryParam ;
4724import ru .yandex .clickhouse .util .ClickHouseRowBinaryInputStream ;
4825import ru .yandex .clickhouse .util .ClickHouseStreamCallback ;
49- import ru .yandex .clickhouse .util .ClickHouseStreamHttpEntity ;
5026import ru .yandex .clickhouse .util .Utils ;
5127import ru .yandex .clickhouse .util .guava .StreamUtils ;
5228
29+ import java .io .ByteArrayInputStream ;
30+ import java .io .IOException ;
31+ import java .io .InputStream ;
32+ import java .net .HttpURLConnection ;
33+ import java .net .URI ;
34+ import java .net .URISyntaxException ;
35+ import java .sql .ResultSet ;
36+ import java .sql .SQLException ;
37+ import java .sql .SQLWarning ;
38+ import java .util .*;
39+
5340
5441public class ClickHouseStatementImpl implements ClickHouseStatement {
5542
@@ -616,9 +603,7 @@ private InputStream getInputStream(
616603 requestEntity = entityBuilder .build ();
617604 }
618605
619- if (properties .isDecompress ()) {
620- requestEntity = new LZ4EntityWrapper (requestEntity , properties .getMaxCompressBufferSize ());
621- }
606+ requestEntity = applyRequestBodyCompression (requestEntity );
622607
623608 HttpEntity entity = null ;
624609 try {
@@ -775,9 +760,8 @@ public void sendRowBinaryStream(String sql, ClickHouseStreamCallback callback) t
775760
776761 @ Override
777762 public void sendRowBinaryStream (String sql , Map <ClickHouseQueryParam , String > additionalDBParams , ClickHouseStreamCallback callback ) throws SQLException {
778- sendStream (
779- new ClickHouseStreamHttpEntity (callback , getConnection ().getTimeZone (), properties ), sql , ClickHouseFormat .RowBinary , additionalDBParams
780- );
763+ write ().withDbParams (additionalDBParams )
764+ .send (sql , callback , ClickHouseFormat .RowBinary );
781765 }
782766
783767 @ Override
@@ -787,73 +771,81 @@ public void sendNativeStream(String sql, ClickHouseStreamCallback callback) thro
787771
788772 @ Override
789773 public void sendNativeStream (String sql , Map <ClickHouseQueryParam , String > additionalDBParams , ClickHouseStreamCallback callback ) throws SQLException {
790- sendStream (
791- new ClickHouseStreamHttpEntity (callback , getConnection ().getTimeZone (), properties ), sql , ClickHouseFormat .Native , additionalDBParams
792- );
774+ write ().withDbParams (additionalDBParams )
775+ .send (sql , callback , ClickHouseFormat .Native );
793776 }
794777
795778 @ Override
796- public void sendCSVStream (InputStream content , String table , Map <ClickHouseQueryParam , String > additionalDBParams ) throws ClickHouseException {
797- String query = "INSERT INTO " + table ;
798- sendStream (new InputStreamEntity (content , -1 ), query , ClickHouseFormat .CSV , additionalDBParams );
779+ public void sendCSVStream (InputStream content , String table , Map <ClickHouseQueryParam , String > additionalDBParams ) throws SQLException {
780+ write ()
781+ .table (table )
782+ .withDbParams (additionalDBParams )
783+ .data (content )
784+ .format (ClickHouseFormat .CSV )
785+ .send ();
799786 }
800787
801788 @ Override
802- public void sendCSVStream (InputStream content , String table ) throws ClickHouseException {
789+ public void sendCSVStream (InputStream content , String table ) throws SQLException {
803790 sendCSVStream (content , table , null );
804791 }
805792
806793 @ Override
807- public void sendStream (InputStream content , String table ) throws ClickHouseException {
794+ public void sendStream (InputStream content , String table ) throws SQLException {
808795 sendStream (content , table , null );
809796 }
810797
811798 @ Override
812799 public void sendStream (InputStream content , String table ,
813- Map <ClickHouseQueryParam , String > additionalDBParams ) throws ClickHouseException {
814- String query = "INSERT INTO " + table ;
815- sendStream (new InputStreamEntity (content , -1 ), query , ClickHouseFormat .TabSeparated , additionalDBParams );
800+ Map <ClickHouseQueryParam , String > additionalDBParams ) throws SQLException {
801+ write ()
802+ .table (table )
803+ .data (content )
804+ .withDbParams (additionalDBParams )
805+ .format (ClickHouseFormat .TabSeparated )
806+ .send ();
816807 }
817808
809+ @ Deprecated
818810 public void sendStream (HttpEntity content , String sql ) throws ClickHouseException {
819811 sendStream (content , sql , ClickHouseFormat .TabSeparated , null );
820812 }
821813
814+ @ Deprecated
822815 public void sendStream (HttpEntity content , String sql ,
823816 Map <ClickHouseQueryParam , String > additionalDBParams ) throws ClickHouseException {
824817 sendStream (content , sql , ClickHouseFormat .TabSeparated , additionalDBParams );
825818 }
826819
827820 private void sendStream (HttpEntity content , String sql , ClickHouseFormat format ,
828821 Map <ClickHouseQueryParam , String > additionalDBParams ) throws ClickHouseException {
829- sendStreamSQL (content , sql + " FORMAT " + format .name (), additionalDBParams );
822+
823+ Writer writer = write ().format (format ).withDbParams (additionalDBParams ).sql (sql );
824+ sendStream (writer , content );
830825 }
831826
832827 @ Override
833828 public void sendStreamSQL (InputStream content , String sql ,
834- Map <ClickHouseQueryParam , String > additionalDBParams ) throws ClickHouseException {
835- sendStreamSQL ( new InputStreamEntity (content , - 1 ), sql , additionalDBParams );
829+ Map <ClickHouseQueryParam , String > additionalDBParams ) throws SQLException {
830+ write (). data (content ). sql ( sql ). withDbParams ( additionalDBParams ). send ( );
836831 }
837832
838833 @ Override
839- public void sendStreamSQL (InputStream content , String sql ) throws ClickHouseException {
840- sendStreamSQL ( new InputStreamEntity ( content , - 1 ), sql , null );
834+ public void sendStreamSQL (InputStream content , String sql ) throws SQLException {
835+ write (). sql ( sql ). data ( content ). send ( );
841836 }
842837
843- private void sendStreamSQL (HttpEntity content , String sql ,
844- Map <ClickHouseQueryParam , String > additionalDBParams ) throws ClickHouseException {
845- // echo -ne '10\n11\n12\n' | POST 'http://localhost:8123/?query=INSERT INTO t FORMAT TabSeparated'
838+ void sendStream (Writer writer , HttpEntity content ) throws ClickHouseException {
846839 HttpEntity entity = null ;
847840 try {
848- URI uri = buildRequestUri (null , null , additionalDBParams , null , false );
841+
842+ URI uri = buildRequestUri (writer .getSql (), null , writer .getAdditionalDBParams (), writer .getRequestParams (), false );
849843 uri = followRedirects (uri );
850- HttpEntity requestEntity = new BodyEntityWrapper (sql , content );
844+
845+ content = applyRequestBodyCompression (content );
851846
852847 HttpPost httpPost = new HttpPost (uri );
853- if (properties .isDecompress ()) {
854- requestEntity = new LZ4EntityWrapper (requestEntity , properties .getMaxCompressBufferSize ());
855- }
856- httpPost .setEntity (requestEntity );
848+ httpPost .setEntity (content );
857849 HttpResponse response = client .execute (httpPost );
858850 entity = response .getEntity ();
859851 checkForErrorAndThrow (entity , response );
@@ -892,6 +884,13 @@ public boolean isCloseOnCompletion() throws SQLException {
892884 return closeOnCompletion ;
893885 }
894886
887+ private HttpEntity applyRequestBodyCompression (final HttpEntity entity ) {
888+ if (properties .isDecompress ()) {
889+ return new LZ4EntityWrapper (entity , properties .getMaxCompressBufferSize ());
890+ }
891+ return entity ;
892+ }
893+
895894 private ClickHouseResultSet createResultSet (InputStream is , int bufferSize , String db , String table , boolean usesWithTotals ,
896895 ClickHouseStatement statement , TimeZone timezone , ClickHouseProperties properties ) throws IOException {
897896 if (isResultSetScrollable ) {
@@ -916,4 +915,9 @@ private Map<ClickHouseQueryParam, String> addQueryIdTo(Map<ClickHouseQueryParam,
916915
917916 return parameters ;
918917 }
918+
919+ @ Override
920+ public Writer write () {
921+ return new Writer (this );
922+ }
919923}
0 commit comments