paramTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ paramTreeMap.putAll(parameters);
+
+ HpccOptions options = new HpccOptions(paramTreeMap);
+ return options;
+ }
+
+}
diff --git a/spark-hpcc/src/main/java/org/hpccsystems/spark/datasource/package-info.java b/spark-hpcc/src/main/java/org/hpccsystems/spark/datasource/package-info.java
new file mode 100644
index 000000000..9bf54e0f5
--- /dev/null
+++ b/spark-hpcc/src/main/java/org/hpccsystems/spark/datasource/package-info.java
@@ -0,0 +1,6 @@
+/**
+ *
+ * Provides a mechanism to stream HPCC Systems data via a Spark Relation.
+ *
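+ * A minimal read sketch, given an active SparkSession {@code spark} (the "hpcc" format name
+ * and option keys below are taken from the integration tests in this changeset; the host,
+ * credentials and dataset path are placeholders):
+ *
+ * <pre>{@code
+ * Dataset<Row> df = spark.read()
+ *     .format("hpcc")
+ *     .option("cluster", "thor")
+ *     .option("host", "https://eclwatch.default:8010")
+ *     .option("username", "user")
+ *     .option("password", "pass")
+ *     .load("example::dataset::path");
+ * }</pre>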
+ */
+package org.hpccsystems.spark.datasource;
diff --git a/spark-hpcc/src/main/java/org/hpccsystems/spark/package-info.java b/spark-hpcc/src/main/java/org/hpccsystems/spark/package-info.java
new file mode 100644
index 000000000..5f612d7f1
--- /dev/null
+++ b/spark-hpcc/src/main/java/org/hpccsystems/spark/package-info.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+/**
+ * Provides access to data residing in HPCC Systems or Spark environments.
+ *
+ * The DFSClient from HPCC Systems is used to access a data file's metadata,
+ * including the location and layout of the file, and to request data file
+ * access privileges.
+ * An RDD is provided to read the file in parallel by file part.
+ *
+ * The main classes are:
+ * <ul>
+ * <li>Content is the abstract class defining field content. There are concrete
+ * classes for each of the different content types.</li>
+ * <li>FieldType is an enumeration type listing the types of content.</li>
+ * <li>HpccPart implements the Spark Partition interface.</li>
+ * <li>HpccFile is the metadata for a file on an HPCC THOR cluster.</li>
+ * <li>HpccFileException is the general exception class.</li>
+ * <li>HpccRDD extends the Spark RDD(Record) class.</li>
+ * <li>HpccRemoteFileReader is the facade for the type of file reader.</li>
+ * <li>Record is the container class holding the data for a record from THOR.</li>
+ * </ul>
+ *
+ *
+ */
+package org.hpccsystems.spark;
diff --git a/spark-hpcc/src/main/javadoc/overview.html b/spark-hpcc/src/main/javadoc/overview.html
new file mode 100644
index 000000000..2d17f3c7e
--- /dev/null
+++ b/spark-hpcc/src/main/javadoc/overview.html
@@ -0,0 +1,7 @@
+
+
+This project enables HPCC Systems / Spark interoperability.
+
+This project contains the classes that expose distributed streaming of HPCC-based data via Spark constructs. In addition, the HPCC data is exposed as a DataFrame for the convenience of the Spark developer.
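+
+A minimal write sketch for a DataFrame <code>df</code> (the "hpcc" format name and option keys
+are taken from the integration tests in this repository; the host, credentials and dataset path
+are placeholders):
+
+<pre>
+df.write()
+    .format("hpcc")
+    .mode("overwrite")
+    .option("cluster", "thor")
+    .option("host", "https://eclwatch.default:8010")
+    .option("username", "user")
+    .option("password", "pass")
+    .save("example::dataset::path");
+</pre>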
+
+
\ No newline at end of file
diff --git a/spark-hpcc/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark-hpcc/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 000000000..570936da3
--- /dev/null
+++ b/spark-hpcc/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.hpccsystems.spark.datasource.HpccRelationProvider
diff --git a/spark-hpcc/src/main/resources/log4j.properties b/spark-hpcc/src/main/resources/log4j.properties
new file mode 100644
index 000000000..b8747f7c2
--- /dev/null
+++ b/spark-hpcc/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Set the default spark-shell log level to WARN. When running the spark-shell, the
+# log level for this class is used to overwrite the root logger's log level, so that
+# the user can have different defaults for the shell and regular Spark apps.
+log4j.logger.org.apache.spark.repl.Main=WARN
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark_project.jetty=WARN
+log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
+log4j.logger.org.apache.axis2.enterprise=FATAL
+log4j.logger.de.hunsicker.jalopy.io=FATAL
+log4j.logger.httpclient.wire.header=FATAL
+log4j.logger.org.apache.commons.httpclient=FATAL
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
\ No newline at end of file
diff --git a/spark-hpcc/src/test/java/org/hpccsystems/spark/BaseIntegrationTest.java b/spark-hpcc/src/test/java/org/hpccsystems/spark/BaseIntegrationTest.java
new file mode 100644
index 000000000..6c1749fa7
--- /dev/null
+++ b/spark-hpcc/src/test/java/org/hpccsystems/spark/BaseIntegrationTest.java
@@ -0,0 +1,167 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2023 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.sql.SparkSession;
+
+class BaseIntegrationTest
+{
+ static SparkContext sparkContext = null;
+
+ public File findRecentlyBuiltSparkJar()
+ {
+ try
+ {
+ URL url = BaseIntegrationTest.class.getProtectionDomain().getCodeSource().getLocation();
+ Path parentPath = Paths.get(url.toURI()).getParent();
+
+ FilenameFilter filter = new FilenameFilter()
+ {
+ @Override
+ public boolean accept(File dir, String name)
+ {
+ return name.matches("spark-hpcc.*-jar-with-dependencies\\.jar");
+ }
+ };
+
+ File[] files = parentPath.toFile().listFiles(filter);
+ if (files != null && files.length > 0)
+ {
+ // Return the most recently modified Spark jar. This should always be the correct jar
+ // as the integration tests will run right after the build step is complete.
+ File mostRecentlyModifiedFile = null;
+ long lastModifiedTime = Long.MIN_VALUE;
+
+ for (File file : files)
+ {
+ long modifiedTime = file.lastModified();
+ if (modifiedTime > lastModifiedTime)
+ {
+ mostRecentlyModifiedFile = file;
+ lastModifiedTime = modifiedTime;
+ }
+ }
+
+ return mostRecentlyModifiedFile;
+ }
+ } catch (Exception e)
+ {
+ System.out.println("Error finding spark jar file with exception: " + e.getMessage());
+ }
+
+ return null;
+ }
+
+ public SparkConf getDefaultSparkConf()
+ {
+ File sparkJar = findRecentlyBuiltSparkJar();
+
+ String sparkJarPath = "";
+ if (sparkJar != null)
+ {
+ sparkJarPath = sparkJar.getAbsolutePath();
+ System.out.println("Spark jar: " + sparkJarPath);
+ }
+ else
+ {
+ System.out.println("Unable to find spark jar matching pattern: spark-hpcc.*-jar-with-dependencies.jar, "
+ + "in directory [PROJECT_ROOT]/spark-hpcc/target/, check maven package / verify output for errors.");
+ }
+
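+ // Ship the freshly built connector jar to the Spark executors so the tests run against the code that was just packaged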
+ String[] jars = {
+ sparkJarPath
+ };
+
+ return new SparkConf()
+ .setMaster("local")
+ .setAppName("Spark-HPCC-Connector-Test")
+ .set("spark.driver.allowMultipleContexts", "false")
+ .set("spark.sql.allowMultipleContexts", "false")
+ .setJars(jars);
+ }
+
+ public SparkContext getOrCreateSparkContext()
+ {
+ if (sparkContext != null)
+ {
+ return sparkContext;
+ }
+
+ return getOrCreateSparkContext(getDefaultSparkConf());
+ }
+
+ public SparkContext getOrCreateSparkContext(SparkConf conf)
+ {
+ // Stop and clear any existing context before creating a new one with the supplied configuration
+ if (sparkContext != null)
+ {
+ sparkContext.stop();
+ SparkSession.clearActiveSession();
+ SparkSession.clearDefaultSession();
+ }
+
+ sparkContext = new SparkContext(conf);
+
+ return sparkContext;
+ }
+
+ public SparkSession getOrCreateSparkSession()
+ {
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("Spark-HPCC-Connector-Test")
+ .config(getDefaultSparkConf())
+ .getOrCreate();
+ return spark;
+ }
+
+ public SparkSession getOrCreateSparkSession(SparkConf conf)
+ {
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("Spark-HPCC-Connector-Test")
+ .config(conf)
+ .getOrCreate();
+ return spark;
+ }
+
+ public String getHPCCClusterURL()
+ {
+ return System.getProperty("hpccconn", "https://eclwatch.default:8010");
+ }
+
+ public String getHPCCClusterUser()
+ {
+ return System.getProperty("hpccuser", "");
+ }
+
+ public String getHPCCClusterPass()
+ {
+ return System.getProperty("hpccpass", "");
+ }
+
+ public String getThorCluster()
+ {
+ return System.getProperty("thorclustername", "data");
+ }
+}
diff --git a/spark-hpcc/src/test/java/org/hpccsystems/spark/DataframeIntegrationTest.java b/spark-hpcc/src/test/java/org/hpccsystems/spark/DataframeIntegrationTest.java
new file mode 100644
index 000000000..46123424a
--- /dev/null
+++ b/spark-hpcc/src/test/java/org/hpccsystems/spark/DataframeIntegrationTest.java
@@ -0,0 +1,179 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2023 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+import java.util.List;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DataframeIntegrationTest extends BaseIntegrationTest
+{
+
+ @Test
+ public void integerKeyValueWriteReadTest()
+ {
+ SparkSession spark = getOrCreateSparkSession();
+
+ // Create the schema
+ StructType schema = DataTypes.createStructType(new StructField[] {
+ DataTypes.createStructField("key", DataTypes.LongType, false),
+ DataTypes.createStructField("value", DataTypes.LongType, false)
+ });
+
+ // Write dataset to HPCC
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ Object[] fields = new Object[2];
+ fields[0] = Long.valueOf(i);
+ fields[1] = Long.valueOf(i);
+ rows.add(new GenericRowWithSchema(fields, schema));
+ }
+
+ Dataset<Row> writtenDataSet = spark.createDataFrame(rows, schema);
+
+ String datasetPath = "spark::test::integer_kv";
+ writtenDataSet.write()
+ .format("hpcc")
+ .mode("overwrite")
+ .option("cluster", getThorCluster())
+ .option("host", getHPCCClusterURL())
+ .option("username", getHPCCClusterUser())
+ .option("password", getHPCCClusterPass())
+ .save(datasetPath);
+
+ // Read dataset from HPCC
+ Dataset<Row> readDataSet = spark.read()
+ .format("hpcc")
+ .option("cluster", getThorCluster())
+ .option("host", getHPCCClusterURL())
+ .option("username", getHPCCClusterUser())
+ .option("password", getHPCCClusterPass())
+ .load(datasetPath);
+
+ StructType readSchema = readDataSet.schema();
+ System.out.println(readSchema);
+
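+ // exceptAll returns the rows of writtenDataSet that do not appear in readDataSet (respecting duplicates), so an empty result means every row survived the round trip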
+ Dataset<Row> diff = writtenDataSet.exceptAll(readDataSet);
+ Assert.assertTrue("Difference found between written and read datasets", diff.isEmpty());
+ }
+
+ @Test
+ public void allTypesWriteReadTest()
+ {
+ SparkSession spark = getOrCreateSparkSession();
+
+ StructType inlineSchema = DataTypes.createStructType(new StructField[] {
+ DataTypes.createStructField("key", DataTypes.IntegerType, false),
+ DataTypes.createStructField("val", DataTypes.IntegerType, false)
+ });
+
+ StructType childSchema = DataTypes.createStructType(new StructField[] {
+ DataTypes.createStructField("test", DataTypes.IntegerType, false),
+ DataTypes.createStructField("test2", DataTypes.IntegerType, false)
+ });
+
+ // Create the schema
+ StructType schema = DataTypes.createStructType(new StructField[] {
+ DataTypes.createStructField("byteVal", DataTypes.ByteType, false),
+ DataTypes.createStructField("shortVal", DataTypes.ShortType, false),
+ DataTypes.createStructField("intVal", DataTypes.IntegerType, false),
+ DataTypes.createStructField("longVal", DataTypes.LongType, false),
+ DataTypes.createStructField("floatVal", DataTypes.FloatType, false),
+ DataTypes.createStructField("doubleVal", DataTypes.DoubleType, false),
+ DataTypes.createStructField("decimalVal", DataTypes.createDecimalType(16, 8), false),
+ DataTypes.createStructField("stringVal", DataTypes.StringType, false),
+ DataTypes.createStructField("binaryVal", DataTypes.BinaryType, false),
+ DataTypes.createStructField("setVal", DataTypes.createArrayType(DataTypes.IntegerType), false),
+ DataTypes.createStructField("inlineRec", inlineSchema, false),
+ DataTypes.createStructField("childDataset", DataTypes.createArrayType(childSchema), false),
+ });
+
+ // Write dataset to HPCC
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 1000; i++)
+ {
+ Object[] fields = new Object[12];
+ fields[0] = Byte.valueOf((byte) i);
+ fields[1] = Short.valueOf((short) i);
+ fields[2] = Integer.valueOf((int) i);
+ fields[3] = Long.valueOf((long) i);
+ fields[4] = Float.valueOf(0);
+ fields[5] = Double.valueOf(10.42);
+ fields[6] = new BigDecimal("10.42");
+ fields[7] = "TestString";
+ fields[8] = "BinaryVal".getBytes();
+
+ Integer[] set = new Integer[2];
+ set[0] = Integer.valueOf(i);
+ set[1] = Integer.valueOf(i);
+ fields[9] = set;
+
+ Object[] inlineRec = new Object[2];
+ inlineRec[0] = Integer.valueOf(i);
+ inlineRec[1] = Integer.valueOf(i);
+ fields[10] = new GenericRowWithSchema(inlineRec, inlineSchema);
+
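+ // An array of nested rows models an HPCC child dataset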
+ int numChildRows = 10;
+ List<Row> childDataset = new ArrayList<>();
+ for (int j = 0; j < numChildRows; j++)
+ {
+ Object[] childRec = new Object[2];
+ childRec[0] = Integer.valueOf(j);
+ childRec[1] = Integer.valueOf(j);
+
+ childDataset.add(new GenericRowWithSchema(childRec, childSchema));
+ }
+ fields[11] = childDataset.toArray();
+
+ rows.add(new GenericRowWithSchema(fields, schema));
+ }
+
+ Dataset<Row> writtenDataSet = spark.createDataFrame(rows, schema);
+
+ String datasetPath = "spark::test::all_types";
+ writtenDataSet.write()
+ .format("hpcc")
+ .mode("overwrite")
+ .option("cluster", getThorCluster())
+ .option("host", getHPCCClusterURL())
+ .option("username", getHPCCClusterUser())
+ .option("password", getHPCCClusterPass())
+ .save(datasetPath);
+
+ // Read dataset from HPCC
+ Dataset<Row> readDataSet = spark.read()
+ .format("hpcc")
+ .option("cluster", getThorCluster())
+ .option("host", getHPCCClusterURL())
+ .option("username", getHPCCClusterUser())
+ .option("password", getHPCCClusterPass())
+ .load(datasetPath);
+
+ Dataset<Row> diff = writtenDataSet.exceptAll(readDataSet);
+ Assert.assertTrue("Difference found between written and read datasets", diff.isEmpty());
+ }
+}
diff --git a/spark-hpcc/src/test/java/org/hpccsystems/spark/FileFilterTests.java b/spark-hpcc/src/test/java/org/hpccsystems/spark/FileFilterTests.java
new file mode 100644
index 000000000..bedb8ab6a
--- /dev/null
+++ b/spark-hpcc/src/test/java/org/hpccsystems/spark/FileFilterTests.java
@@ -0,0 +1,111 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+import org.junit.Assert;
+import org.apache.spark.sql.sources.EqualTo;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.GreaterThanOrEqual;
+import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.sources.LessThan;
+import org.apache.spark.sql.sources.Not;
+import org.apache.spark.sql.sources.Or;
+import org.apache.spark.sql.sources.StringStartsWith;
+import org.hpccsystems.commons.ecl.FileFilter;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.junit.experimental.categories.Category;
+
+@Category(org.hpccsystems.commons.annotations.BaseTests.class)
+public class FileFilterTests
+{
+
+ @Before
+ public void setUp() throws Exception
+ {
+ }
+
+ @Test
+ public void testNotSparkFilterstoHPCCFilters()
+ {
+ System.out.println("\n----------Spark 'Not' filter to HPCC Tests----------");
+
+ try
+ {
+ Filter child = new LessThan("field1", 8);
+ Not notlessthan = new Not(child);
+ FileFilter hpccnotlessthan = FileFilterConverter.ConvertToHPCCFileFilterString(notlessthan);
+ Assert.assertNotNull(hpccnotlessthan);
+
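+ // NOT (field1 < 8) is logically equivalent to field1 >= 8, so both conversions should produce the same HPCC filter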
+ GreaterThanOrEqual gte = new GreaterThanOrEqual("field1", 8);
+ FileFilter hpccgte = FileFilterConverter.ConvertToHPCCFileFilterString(gte);
+ Assert.assertNotNull(hpccgte);
+
+ Assert.assertEquals(hpccnotlessthan.toJson(), hpccgte.toJson());
+ }
+ catch (Exception e)
+ {
+ Assert.fail("Filter conversion failed with exception: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSparkFilterstoHPCCFilters()
+ {
+
+ System.out.println("\n----------Spark to HPCC filter Tests----------");
+
+ org.apache.spark.sql.sources.Filter [] sparkfilters = new org.apache.spark.sql.sources.Filter[8];
+ StringStartsWith ssw = new StringStartsWith("Fname", "Rod");
+ LessThan lt = new LessThan("field1", 12);
+ GreaterThan gt = new GreaterThan("field1", 8);
+ Or or = new Or(lt, gt);
+ sparkfilters[0] = ssw;
+ sparkfilters[1] = or;
+
+ In in = new In("field1", new Object [] { "str", "values", "etc"});
+ sparkfilters[2] = in;
+
+ In innumber = new In("field1", new Object [] { 1, 2, 3, 4, 5.6});
+ sparkfilters[3] = innumber;
+
+ LessThan lta = new LessThan("alphafield", "XYZ");
+ sparkfilters[4] = lta;
+
+ Filter child = new EqualTo("field1", "true");
+ org.apache.spark.sql.sources.Not n = new org.apache.spark.sql.sources.Not(child );
+ sparkfilters[5] = n;
+
+ Filter eq5 = new EqualTo("field1", 5);
+ sparkfilters[6] = eq5;
+
+ child = new LessThan("field1", -3.2);
+ n = new Not(child);
+ sparkfilters[7] = n;
+
+ try
+ {
+ FileFilter hpccfilters = FileFilterConverter.CovertToHPCCFileFilter(sparkfilters);
+ System.out.println("\n----------Converting Spark to HPCC filter output----------");
+ System.out.println(hpccfilters.toJson());
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ Assert.fail("Filter conversion failed with exception: " + e.getMessage());
+ }
+ }
+}
diff --git a/spark-hpcc/src/test/java/org/hpccsystems/spark/HpccRelationIntegrationTest.java b/spark-hpcc/src/test/java/org/hpccsystems/spark/HpccRelationIntegrationTest.java
new file mode 100644
index 000000000..6d1590f15
--- /dev/null
+++ b/spark-hpcc/src/test/java/org/hpccsystems/spark/HpccRelationIntegrationTest.java
@@ -0,0 +1,192 @@
+package org.hpccsystems.spark;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.sources.EqualTo;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.sources.IsNull;
+import org.apache.spark.sql.sources.LessThan;
+import org.apache.spark.sql.sources.Not;
+import org.apache.spark.sql.sources.Or;
+import org.apache.spark.sql.sources.StringContains;
+import org.apache.spark.sql.sources.StringEndsWith;
+import org.apache.spark.sql.sources.StringStartsWith;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.hpccsystems.dfs.client.CompressionAlgorithm;
+import org.hpccsystems.spark.datasource.HpccOptions;
+import org.hpccsystems.spark.datasource.HpccRelation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+@Category(org.hpccsystems.commons.annotations.BaseTests.class)
+public class HpccRelationIntegrationTest extends BaseIntegrationTest
+{
+ @Test
+ public void testbuildScanAllValid() throws Exception
+ {
+ SparkSession spark = getOrCreateSparkSession();
+ SQLContext sqlcontext = new SQLContext(spark);
+
+ // Create the schema
+ StructType schema = DataTypes.createStructType(new StructField[] {
+ DataTypes.createStructField("key", DataTypes.LongType, false),
+ DataTypes.createStructField("value", DataTypes.LongType, false)
+ });
+
+ // Write dataset to HPCC
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ Object[] fields = new Object[2];
+ fields[0] = Long.valueOf(i);
+ fields[1] = Long.valueOf(i);
+ rows.add(new GenericRowWithSchema(fields, schema));
+ }
+
+ Dataset<Row> writtenDataSet = spark.createDataFrame(rows, schema);
+
+ String testDataset = "spark::test::integer_kv";
+ writtenDataSet.write()
+ .format("hpcc")
+ .mode("overwrite")
+ .option("cluster", getThorCluster())
+ .option("host", getHPCCClusterURL())
+ .option("username", getHPCCClusterUser())
+ .option("password", getHPCCClusterPass())
+ .save(testDataset);
+
+ TreeMap<String, String> paramTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ paramTreeMap.put("host", getHPCCClusterURL());
+ paramTreeMap.put("path", testDataset);
+ paramTreeMap.put("cluster", getThorCluster());
+ paramTreeMap.put("username", getHPCCClusterUser());
+ paramTreeMap.put("password", getHPCCClusterPass());
+
+ HpccOptions hpccopts = new HpccOptions(paramTreeMap);
+ HpccRelation hpccRelation = new HpccRelation(sqlcontext, hpccopts);
+
+ Filter[] supportedSparkFilters = {
+ new Or(new LessThan("key", 12), new GreaterThan("key", 8)),
+ new In("key", new Object [] { 1, 2, 3, 4, 5}),
+ new EqualTo("key", 5),
+ new Not(new LessThan("key", 3)),
+ };
+
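+ // Spark ANDs pushed-down filters together; of the 1000 generated rows only the row with key == 5 satisfies all of them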
+ RDD<Row> rdd = hpccRelation.buildScan(new String[]{"key"}, supportedSparkFilters);
+ Assert.assertTrue("Unexpected filter result count", rdd.count() == 1);
+ }
+
+ @Test
+ public void testOptionsPassThrough() throws Exception
+ {
+ SparkSession spark = getOrCreateSparkSession();
+ SQLContext sqlcontext = new SQLContext(spark);
+
+ TreeMap<String, String> paramTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+ String url = getHPCCClusterURL();
+ String user = "user";
+ String pass = "pass";
+ paramTreeMap.put("host", url);
+ paramTreeMap.put("username", user);
+ paramTreeMap.put("password", pass);
+
+ String path = "spark::test::integer_kv";
+ paramTreeMap.put("path", path);
+
+ paramTreeMap.put("cluster", getThorCluster());
+ paramTreeMap.put("useTLK", "True"); // Defaults to false, also should be case insensitive
+ paramTreeMap.put("fileAccessTimeout", "120000");
+ paramTreeMap.put("limitPerFilePart", "100");
+
+ String projectList = "key, value";
+ paramTreeMap.put("projectList", projectList);
+
+ String filterStr = "key > 5";
+ paramTreeMap.put("filter", filterStr);
+
+ paramTreeMap.put("compression", "LZ4");
+
+ HpccOptions hpccopts = new HpccOptions(paramTreeMap);
+
+ // These options don't currently have accessors in HPCCFile
+ Assert.assertEquals(url, hpccopts.connectionInfo.getUrl());
+ Assert.assertEquals(user, hpccopts.connectionInfo.getUserName());
+ Assert.assertEquals(pass, hpccopts.connectionInfo.getPassword());
+ Assert.assertEquals(filterStr, hpccopts.filterString);
+ Assert.assertEquals(CompressionAlgorithm.LZ4, hpccopts.compression);
+
+ HpccRelation hpccRelation = new HpccRelation(sqlcontext, hpccopts);
+
+ Assert.assertEquals(true, hpccRelation.getFile().getUseTLK());
+ Assert.assertEquals(getThorCluster(), hpccRelation.getFile().getTargetfilecluster());
+ Assert.assertEquals(path, hpccRelation.getFile().getFileName());
+ Assert.assertEquals(120000, hpccRelation.getFile().getFileAccessExpirySecs());
+ Assert.assertEquals(100, hpccRelation.getFile().getFilePartRecordLimit());
+ Assert.assertEquals(projectList, hpccRelation.getFile().getProjectList());
+ }
+
+ @Test
+ public void testUnhandledFiltersAllValid() throws Exception
+ {
+ HpccRelation hpccRelation = new HpccRelation(null, null);
+
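+ // Every filter shape below is expected to be pushed down to HPCC, so unhandledFilters should return an empty array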
+ Filter[] supportedSparkFilters = {
+ new StringStartsWith("fixstr8", "Rod"),
+ new Or(new LessThan("int8", 12), new GreaterThan("int8", 8)),
+ new In("int8", new Object [] { "str", "values", "etc"}),
+ new In("int8", new Object [] { 1, 2, 3, 4, 5.6}),
+ new LessThan("fixstr8", "XYZ"),
+ new Not(new EqualTo("fixstr8", "true")),
+ new EqualTo("int8", 5),
+ new Not(new LessThan("int8", 3))
+ };
+
+ Filter [] unhandledsparkfilters = hpccRelation.unhandledFilters(supportedSparkFilters);
+
+ Assert.assertTrue("Unexpected unhandled filters detected" , unhandledsparkfilters.length == 0);
+ }
+
+ @Test
+ public void testUnhandledFiltersNoneValid() throws Exception
+ {
+ HpccRelation hpccRelation = new HpccRelation(null, null);
+
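+ // None of these filters can be pushed down (IsNull, an Or spanning two different fields, negated Or/In filters, string contains / ends-with), so all should be reported as unhandled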
+ Filter[] unsupportedSparkFilters = {
+ new IsNull("something"),
+ new Or(new LessThan("int8", 12), new GreaterThan("int4", 8)),
+ new Not(new Or(new LessThan("int8", 12), new GreaterThan("int8", 8))),
+ new Not(new In("int8", new Object [] { 1, 2, 3, 4, 5.6})),
+ new StringContains("somestring", "some"),
+ new StringEndsWith("somestring", "ing")
+ };
+
+ Filter[] unhandledsparkfilters = hpccRelation.unhandledFilters(unsupportedSparkFilters);
+
+ Assert.assertTrue("Unexpected unhandled filters detected" , unhandledsparkfilters.length == unsupportedSparkFilters.length);
+ }
+}