diff --git a/.gitignore b/.gitignore index d5188c1..b84ea99 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,18 @@ hs_err_pid* replay_pid* /cmake-build-debug/ *.lst + +############################## +## Maven +############################## +target/ + +############################## +## IntelliJ +############################## +out/ +.idea/ +.idea_modules/ +*.iml +*.ipr +*.iws \ No newline at end of file diff --git a/pom.xml b/pom.xml index 36203c4..83e5999 100644 --- a/pom.xml +++ b/pom.xml @@ -12,6 +12,7 @@ <maven.compiler.source>22</maven.compiler.source> <maven.compiler.target>22</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <arrow.version>18.1.0</arrow.version> </properties> <dependencies> @@ -23,6 +24,23 @@ <version>5.8.1</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-unsafe</artifactId> + <version>${arrow.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-compression</artifactId> + <version>${arrow.version}</version> + <scope>runtime</scope> + </dependency> </dependencies> <build> @@ -38,6 +56,7 @@ <includes> <include>**/*Test.java</include> </includes> + <argLine>--add-opens=java.base/java.nio=ALL-UNNAMED</argLine> </configuration> </plugin> </plugins> diff --git a/src/jni/chdb_jni.cpp b/src/jni/chdb_jni.cpp index c10bb53..bcd5726 100644 --- a/src/jni/chdb_jni.cpp +++ b/src/jni/chdb_jni.cpp @@ -48,19 +48,21 @@ local_result_v2 * queryToBuffer( } -JNIEXPORT jobject JNICALL Java_org_chdb_jdbc_ChdbJniUtil_executeQuery(JNIEnv *env, jclass clazz, jstring query) { +JNIEXPORT jobject JNICALL Java_org_chdb_jdbc_ChdbJniUtil_executeQuery(JNIEnv *env, jclass clazz, jstring query, jstring format) { // 1. Convert Java String to C++ string std::cout << "call func: ChdbJniUtil_executeQuery!" << std::endl; const char *queryStr = env->GetStringUTFChars(query, nullptr); + const char *formatStr = env->GetStringUTFChars(format, nullptr); + if (queryStr == nullptr) { std::cerr << "Error: Failed to convert Java string to C++ string" << std::endl; return nullptr; } // 2. Call the native query function - local_result_v2 *result = queryToBuffer(queryStr); + local_result_v2 *result = queryToBuffer(queryStr, formatStr); // 3. Release the Java string resources env->ReleaseStringUTFChars(query, queryStr); diff --git a/src/jni/chdb_jni.h b/src/jni/chdb_jni.h index c2ff837..5fb1126 100644 --- a/src/jni/chdb_jni.h +++ b/src/jni/chdb_jni.h @@ -7,8 +7,7 @@ extern "C" { #endif -JNIEXPORT jobject JNICALL Java_org_chdb_jdbc_ChdbJniUtil_executeQuery(JNIEnv *, jclass, jstring); -//JNIEXPORT jstring JNICALL Java_org_chdb_jdbc_ChdbJniUtil_executeQuery(JNIEnv *, jclass, jstring); +JNIEXPORT jobject JNICALL Java_org_chdb_jdbc_ChdbJniUtil_executeQuery(JNIEnv *, jclass, jstring, jstring); #ifdef __cplusplus } diff --git a/src/main/java/org/chdb/jdbc/ChdbJniUtil.java b/src/main/java/org/chdb/jdbc/ChdbJniUtil.java index b0b2211..2c6c7e3 100644 --- a/src/main/java/org/chdb/jdbc/ChdbJniUtil.java +++ b/src/main/java/org/chdb/jdbc/ChdbJniUtil.java @@ -5,8 +5,11 @@ public class ChdbJniUtil { System.loadLibrary("chdbjni"); } - public static native LocalResultV2 executeQuery(String query); -// public static native String executeQuery(String query); + public static LocalResultV2 executeQuery(String query) { + return executeQuery(query, "CSV"); + } + + public static native LocalResultV2 executeQuery(String query, String format); // public static void main(String[] args) { // String query = "SELECT 1"; diff --git a/src/main/java/org/chdb/jdbc/ChdbResultSet.java b/src/main/java/org/chdb/jdbc/ChdbResultSet.java index 32057b5..56d0cb9 100644 --- a/src/main/java/org/chdb/jdbc/ChdbResultSet.java +++ b/src/main/java/org/chdb/jdbc/ChdbResultSet.java @@ -1,33 +1,38 @@ package org.chdb.jdbc; -import java.io.*; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.chdb.jdbc.memory.ArrowMemoryManger; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; import java.math.BigDecimal; import java.net.URL; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.sql.*; -import java.util.ArrayList; import java.util.Calendar; -import java.util.List; import java.util.Map; public class ChdbResultSet implements ResultSet { - private LocalResultV2 result; private int cursor = -1; - private List<String> records; + private int loadedRows = 0; + private ArrowStreamReader arrowStreamReader; + private VectorSchemaRoot batch; + private int batchCursor; public ChdbResultSet(LocalResultV2 result) throws IOException { - this.result = result; - this.records = new ArrayList<>(); - parseData(result.getBuf()); + parseData(result.getBuf()); } private void parseData(ByteBuffer buffer) throws IOException { - BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(toByteArray(buffer)), StandardCharsets.UTF_8)); - String line; - while ((line = reader.readLine()) != null) { - records.add(line); - } + ByteArrayInputStream inputStream = new ByteArrayInputStream(toByteArray(buffer)); + this.arrowStreamReader = new ArrowStreamReader(inputStream, ArrowMemoryManger.ROOT_ALLOCATOR); } private byte[] toByteArray(ByteBuffer buffer) { @@ -38,24 +43,54 @@ private byte[] toByteArray(ByteBuffer buffer) { @Override public boolean next() throws SQLException { - if (cursor < records.size() - 1) { + if (cursor < loadedRows - 1) { cursor++; + batchCursor++; return true; } - return false; + try { + if (arrowStreamReader.loadNextBatch()) { + AutoCloseables.close(batch); + batch = arrowStreamReader.getVectorSchemaRoot(); + loadedRows += batch.getRowCount(); + cursor++; + batchCursor = 0; + return true; + } else { + return false; + } + } catch (Exception e) { + throw new SQLException(e); + } } - private String getValue(int columnIndex) throws SQLException { - if (cursor < 0 || cursor >= records.size()) { - throw new SQLException("Cursor out of bounds"); + /** + * Checks to see whether the given index is a valid column number and throws + * an <code>SQLException</code> if it is not. The index is out of bounds + * if it is less than <code>1</code> or greater than the number of + * columns in this rowset. + * <p> + * This method is called internally by the <code>getXXX</code> and + * <code>updateXXX</code> methods. + * + * @param idx the number of a column, must be between <code>1</code> + * and the number of rows in this rowset + * @throws SQLException if the given index is out of bounds + */ + private void checkColumnIndex(int idx) throws SQLException { + if (idx < 1 || idx > getColumnCount()) { + throw new SQLException("Column index " + idx + " is out of bound[1, " + getColumnCount() +"]"); } + } - return records.get(cursor); + private int getColumnCount() { + return batch.getSchema().getFields().size(); } @Override public String getString(int columnIndex) throws SQLException { - return getValue(columnIndex); + checkColumnIndex(columnIndex); + return batch.getVector(columnIndex - 1).getObject(batchCursor).toString(); } @Override @@ -75,12 +110,22 @@ public short getShort(int i) throws SQLException { @Override public int getInt(int columnIndex) throws SQLException { - return Integer.parseInt(getValue(columnIndex)); + checkColumnIndex(columnIndex); + FieldVector vector = batch.getVector(columnIndex - 1); + if (vector instanceof IntVector) { + return ((IntVector) vector).get(batchCursor); + } + throw new SQLException("Column [" + vector.getField() + "] is not int"); } @Override - public long getLong(int i) throws SQLException { - throw new SQLException("This method has not been implemented yet."); + public long getLong(int columnIndex) throws SQLException { + checkColumnIndex(columnIndex); + FieldVector vector = batch.getVector(columnIndex); + if (vector instanceof BigIntVector) { + return ((BigIntVector) vector).get(batchCursor); + } + throw new SQLException("Column [" + vector.getField() + "] is not long"); } @Override @@ -135,7 +180,11 @@ public InputStream getBinaryStream(int i) throws SQLException { @Override public void close() throws SQLException { - // No resources to close + try { + AutoCloseables.close(batch, arrowStreamReader); + } catch (Exception e) { + throw new SQLException(e); + } } @Override diff --git a/src/main/java/org/chdb/jdbc/ChdbStatement.java b/src/main/java/org/chdb/jdbc/ChdbStatement.java index 9cff3a0..f0388b5 100644 --- a/src/main/java/org/chdb/jdbc/ChdbStatement.java +++ b/src/main/java/org/chdb/jdbc/ChdbStatement.java @@ -12,7 +12,7 @@ public ChdbStatement(ChdbConnection connection) { @Override public ResultSet executeQuery(String sql) throws SQLException { - LocalResultV2 result = ChdbJniUtil.executeQuery(sql); + LocalResultV2 result = ChdbJniUtil.executeQuery(sql, "ArrowStream"); System.out.println("sql: " + sql); try { return new ChdbResultSet(result); diff --git a/src/main/java/org/chdb/jdbc/memory/ArrowMemoryManger.java b/src/main/java/org/chdb/jdbc/memory/ArrowMemoryManger.java new file mode 100644 index 0000000..43af7b3 --- /dev/null +++ b/src/main/java/org/chdb/jdbc/memory/ArrowMemoryManger.java @@ -0,0 +1,7 @@ +package org.chdb.jdbc.memory; + +import org.apache.arrow.memory.RootAllocator; + +public class ArrowMemoryManger { + public static RootAllocator ROOT_ALLOCATOR = new RootAllocator(); +} diff --git a/src/test/java/ChdbJdbcTest.java b/src/test/java/ChdbJdbcTest.java index 48f3f86..658d985 100644 --- a/src/test/java/ChdbJdbcTest.java +++ b/src/test/java/ChdbJdbcTest.java @@ -1,4 +1,4 @@ -import org.junit.jupiter.api.Test; +import org.chdb.jdbc.memory.ArrowMemoryManger; import java.sql.Connection; import java.sql.DriverManager; @@ -7,8 +7,10 @@ import static org.junit.jupiter.api.Assertions.*; -public class ChdbJdbcTest { +public class ChdbJdbcTest { + // TODO failed to run by junit, it will crash + // Note: adding --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED public static void main(String[] args) throws Exception { // Load the JDBC driver Class.forName("org.chdb.jdbc.ChdbDriver"); @@ -31,5 +33,6 @@ public static void main(String[] args) throws Exception { rs.close(); conn.close(); + assertEquals(0, ArrowMemoryManger.ROOT_ALLOCATOR.getAllocatedMemory()); } }