diff --git a/examples/linear-regression/src/main/scala/org/apache/spark/examples/ml/LinearRegressionExample.scala b/examples/linear-regression/src/main/scala/org/apache/spark/examples/ml/LinearRegressionExample.scala
index 2c72f7702..0912103d3 100644
--- a/examples/linear-regression/src/main/scala/org/apache/spark/examples/ml/LinearRegressionExample.scala
+++ b/examples/linear-regression/src/main/scala/org/apache/spark/examples/ml/LinearRegressionExample.scala
@@ -108,7 +108,7 @@ object LinearRegressionExample {
     println(s"LinearRegressionExample with parameters:\n$params")

     val training = spark.read.format("libsvm")
-      .load(params.input)
+      .load(params.input).toDF("label", "features")

     val lir = new LinearRegression()
       .setFeaturesCol("features")
diff --git a/examples/naive-bayes/src/main/scala/org/apache/spark/examples/ml/NaiveBayesExample.scala b/examples/naive-bayes/src/main/scala/org/apache/spark/examples/ml/NaiveBayesExample.scala
index 4dc6c82bd..202930996 100644
--- a/examples/naive-bayes/src/main/scala/org/apache/spark/examples/ml/NaiveBayesExample.scala
+++ b/examples/naive-bayes/src/main/scala/org/apache/spark/examples/ml/NaiveBayesExample.scala
@@ -38,14 +38,14 @@ object NaiveBayesExample {
     // $example on$
     // Load the data stored in LIBSVM format as a DataFrame.
-    val data = spark.read.format("libsvm").load(args(0))
-
-    // Split the data into training and test sets (30% held out for testing)
+    val data = spark.read.format("libsvm").load(args(0)).toDF("label", "features")
     val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)

     // Train a NaiveBayes model.
     val model = new NaiveBayes()
-      .fit(trainingData)
+      .setLabelCol("label")
+      .setFeaturesCol("features")
+      .fit(trainingData)

     // Select example rows to display.
     val predictions = model.transform(testData)

@@ -59,7 +59,6 @@ object NaiveBayesExample {
     val accuracy = evaluator.evaluate(predictions)
     println(s"Test set accuracy = $accuracy")
     // $example off$
-
     spark.stop()
   }
 }
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
index 0af24207c..0558e0092 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
@@ -20,14 +20,14 @@ import com.intel.daal.data_management.data.{CSRNumericTable, HomogenNumericTable
 import com.intel.daal.services.DaalContext
 import org.apache.spark.SparkContext
 import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector, Matrix, SparseVector, Vector, Vectors}
-import org.apache.spark.mllib.linalg.{Vector => OldVector, Matrix => OldMatrix, DenseMatrix => OldDenseMatrix}
+import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, Matrix => OldMatrix, Vector => OldVector}
 import org.apache.spark.rdd.{ExecutorInProcessCoalescePartitioner, RDD}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
-
 import java.lang
 import java.nio.DoubleBuffer
 import java.util.logging.{Level, Logger}
+
 import scala.collection.mutable.ArrayBuffer

 object OneDAL {
@@ -73,8 +73,8 @@ object OneDAL {
     OldMatrix
   }

-  def isDenseDataset(ds: Dataset[_]): Boolean = {
-    val row = ds.select("features").head()
+  def isDenseDataset(ds: Dataset[_], featuresCol: String): Boolean = {
+    val row = ds.select(featuresCol).head()
     row.get(0).isInstanceOf[DenseVector]
   }

@@ -176,6 +176,8 @@ object OneDAL {

   def rddLabeledPointToSparseTables(labeledPoints: Dataset[_],
+                                    labelCol: String,
+                                    featuresCol: String,
                                     executorNum: Int): RDD[(Long, Long)] = {
     require(executorNum > 0)

@@ -193,7 +195,7 @@ object OneDAL {

     dataForConversion.cache().count()

-    val labeledPointsRDD = dataForConversion.toDF().map {
+    val labeledPointsRDD = dataForConversion.select(labelCol, featuresCol).toDF().map {
       case Row(label: Double, features: Vector) => (features, label)
     }.rdd

@@ -283,6 +285,8 @@ object OneDAL {
   }

   def rddLabeledPointToSparseTables_shuffle(labeledPoints: Dataset[_],
+                                            labelCol: String,
+                                            featuresCol: String,
                                             executorNum: Int): RDD[(Long, Long)] = {
     require(executorNum > 0)

@@ -290,7 +294,7 @@ object OneDAL {

     val spark = SparkSession.active

-    val labeledPointsRDD = labeledPoints.rdd.map {
+    val labeledPointsRDD = labeledPoints.select(labelCol, featuresCol).rdd.map {
       case Row(label: Double, features: Vector) => (features, label)
     }

@@ -321,6 +325,8 @@ object OneDAL {
   }

   def rddLabeledPointToMergedTables(labeledPoints: Dataset[_],
+                                    labelCol: String,
+                                    featuresCol: String,
                                     executorNum: Int): RDD[(Long, Long)] = {
     require(executorNum > 0)

@@ -336,7 +342,8 @@ object OneDAL {
       labeledPoints
     }

-    val tables = dataForConversion.toDF().mapPartitions { it: Iterator[Row] =>
+    val tables = dataForConversion.select(labelCol, featuresCol)
+      .toDF().mapPartitions { it: Iterator[Row] =>
       val rows = it.toArray

       val features = rows.map {
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala
index 88029fc96..8bb39b2e9 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/NaiveBayesDALImpl.scala
@@ -37,14 +37,16 @@ class NaiveBayesDALImpl(val uid: String,
                         val executorNum: Int,
                         val executorCores: Int
                        ) extends Serializable with Logging {
-  def train(labeledPoints: Dataset[_]): NaiveBayesDALModel = {
+  def train(labeledPoints: Dataset[_],
+            labelCol: String,
+            featuresCol: String): NaiveBayesDALModel = {

     val kvsIPPort = getOneCCLIPPort(labeledPoints.rdd)

-    val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints)) {
-      OneDAL.rddLabeledPointToMergedTables(labeledPoints, executorNum)
+    val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
+      OneDAL.rddLabeledPointToMergedTables(labeledPoints, labelCol, featuresCol, executorNum)
     } else {
-      OneDAL.rddLabeledPointToSparseTables(labeledPoints, executorNum)
+      OneDAL.rddLabeledPointToSparseTables(labeledPoints, labelCol, featuresCol, executorNum)
     }

     val results = labeledPointsTables.mapPartitionsWithIndex {
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala
index b673094cf..14ed4853d 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala
@@ -55,16 +55,18 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
     s"elasticNetParam must be in [0, 1]: $elasticNetParam")

   /**
-   * Creates a [[LinearRegressionDALModel]] from an RDD of [[Vector]]s.
-   */
-  def train(labeledPoints: Dataset[_]): LinearRegressionDALModel = {
+   * Creates a [[LinearRegressionDALModel]] from a [[Dataset]] of labeled points.
+   */
+  def train(labeledPoints: Dataset[_],
+            labelCol: String,
+            featuresCol: String): LinearRegressionDALModel = {

     val kvsIPPort = getOneCCLIPPort(labeledPoints.rdd)

-    val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints)) {
-      OneDAL.rddLabeledPointToMergedTables(labeledPoints, executorNum)
+    val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
+      OneDAL.rddLabeledPointToMergedTables(labeledPoints, labelCol, featuresCol, executorNum)
     } else {
-      OneDAL.rddLabeledPointToSparseTables(labeledPoints, executorNum)
+      OneDAL.rddLabeledPointToSparseTables(labeledPoints, labelCol, featuresCol, executorNum)
     }

     val results = labeledPointsTables.mapPartitionsWithIndex {
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/classification/spark320/NaiveBayes.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/classification/spark320/NaiveBayes.scala
index 79340fa4e..5b4d27b52 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/classification/spark320/NaiveBayes.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/classification/spark320/NaiveBayes.scala
@@ -153,7 +153,7 @@ class NaiveBayes @Since("1.5.0") (
       .select(col(getLabelCol), DatasetUtils.columnToVector(dataset, getFeaturesCol))

     val dalModel = new NaiveBayesDALImpl(uid, numClasses,
-      executor_num, executor_cores).train(labeledPointsDS)
+      executor_num, executor_cores).train(labeledPointsDS, $(labelCol), $(featuresCol))

     val model = copyValues(new NaiveBayesModel(
       dalModel.uid, dalModel.pi, dalModel.theta, dalModel.sigma))
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/regression/spark312/LinearRegression.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/regression/spark312/LinearRegression.scala
index aed92a156..2065a5433 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/regression/spark312/LinearRegression.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/regression/spark312/LinearRegression.scala
@@ -459,7 +459,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String)
       executor_num, executor_cores)

     // Return same model as WeightedLeastSquaresModel
-    val model = optimizer.train(dataset)
+    val model = optimizer.train(dataset, $(labelCol), $(featuresCol))

     val lrModel = copyValues(
       new LinearRegressionModel(uid, model.coefficients, model.intercept))
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/regression/spark320/LinearRegression.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/regression/spark320/LinearRegression.scala
index 203a332ca..67cc05061 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/regression/spark320/LinearRegression.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/regression/spark320/LinearRegression.scala
@@ -458,7 +458,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
       executor_num, executor_cores)

     // Return same model as WeightedLeastSquaresModel
-    val model = optimizer.train(dataset)
+    val model = optimizer.train(dataset, $(labelCol), $(featuresCol))

     val lrModel = copyValues(
       new LinearRegressionModel(uid, model.coefficients, model.intercept))
diff --git a/mllib-dal/src/test/scala/org/apache/spark/ml/classification/MLlibNaiveBayesSuite.scala b/mllib-dal/src/test/scala/org/apache/spark/ml/classification/MLlibNaiveBayesSuite.scala
index 2aa741911..cc09e0fbc 100644
--- a/mllib-dal/src/test/scala/org/apache/spark/ml/classification/MLlibNaiveBayesSuite.scala
+++ b/mllib-dal/src/test/scala/org/apache/spark/ml/classification/MLlibNaiveBayesSuite.scala
@@ -536,6 +536,35 @@ class MLlibNaiveBayesSuite extends MLTest with DefaultReadWriteTest {
       assert(expected.sigma === Matrices.zeros(0, 0) && actual.sigma === Matrices.zeros(0, 0))
     }
   }
+
+  test("oap-mllib-163: should support any labelCol and featuresCol name") {
+    val df_1 = complementDataset.toDF("label_alias", "features_alias")
+    val nb_1 = new NaiveBayes().setLabelCol("label_alias")
+    val e_1 = intercept[IllegalArgumentException](nb_1.fit(df_1)).getMessage
+    assert(e_1.contains("features does not exist. Available: label_alias, features_alias"))
+
+    val df_2 = complementDataset.toDF("label_alias", "features_alias")
+    val nb_2 = new NaiveBayes().setFeaturesCol("features_alias")
+    val e_2 = intercept[IllegalArgumentException](nb_2.fit(df_2)).getMessage
+    assert(e_2.contains("label does not exist. Available: label_alias, features_alias"))
+
+    val df_3 = complementDataset.toDF("label_alias", "features_alias")
+    val nb_3 = new NaiveBayes().setLabelCol("label_alias").setFeaturesCol("features_alias")
+    val model = nb_3.fit(df_3)
+    assert(model.hasParent)
+  }
+
+  test("oap-mllib-163: should support columns in any order") {
+    val df_1 = complementDataset.select("label", "features").toDF()
+    val nb_1 = new NaiveBayes().setLabelCol("label").setFeaturesCol("features")
+    val model_1 = nb_1.fit(df_1)
+    assert(model_1.hasParent)
+
+    val df_2 = complementDataset.select("features", "label").toDF()
+    val nb_2 = new NaiveBayes().setLabelCol("label").setFeaturesCol("features")
+    val model_2 = nb_2.fit(df_2)
+    assert(model_2.hasParent)
+  }
 }

 object NaiveBayesSuite {
diff --git a/mllib-dal/src/test/scala/org/apache/spark/ml/regression/MLlibLinearRegressionSuite.scala b/mllib-dal/src/test/scala/org/apache/spark/ml/regression/MLlibLinearRegressionSuite.scala
index 66352e1d3..dfa174155 100755
--- a/mllib-dal/src/test/scala/org/apache/spark/ml/regression/MLlibLinearRegressionSuite.scala
+++ b/mllib-dal/src/test/scala/org/apache/spark/ml/regression/MLlibLinearRegressionSuite.scala
@@ -1292,6 +1292,38 @@ class MLlibLinearRegressionSuite extends MLTest with DefaultReadWriteTest with P
     assert(model1.coefficients ~== model2.coefficients relTol 1E-3)
     assert(model1.intercept ~== model2.intercept relTol 1E-3)
   }
+
+  test("oap-mllib-163: should support any labelCol and featuresCol name") {
+    val df_1 = datasetWithDenseFeature.toDF("label_alias", "features_alias")
+    val lr_1 = new LinearRegression().setLabelCol("label_alias")
+    val e_1 = intercept[IllegalArgumentException](lr_1.fit(df_1)).getMessage
+    assert(e_1.contains("features does not exist. Available: label_alias, features_alias"))
+
+    val df_2 = datasetWithDenseFeature.toDF("label_alias", "features_alias")
+    val lr_2 = new LinearRegression().setFeaturesCol("features_alias")
+    val e_2 = intercept[IllegalArgumentException](lr_2.fit(df_2)).getMessage
+    assert(e_2.contains("label does not exist. Available: label_alias, features_alias"))
+
+    val df_3 = datasetWithDenseFeature.toDF("label_alias", "features_alias")
+    val lr_3 = new LinearRegression().setLabelCol("label_alias").setFeaturesCol("features_alias")
+    val model = lr_3.fit(df_3)
+    assert(model.hasParent)
+    assert(model.hasSummary)
+  }
+
+  test("oap-mllib-163: should support columns in any order") {
+    val df_1 = datasetWithDenseFeature.select("label", "features").toDF()
+    val lr_1 = new LinearRegression().setLabelCol("label").setFeaturesCol("features")
+    val model_1 = lr_1.fit(df_1)
+    assert(model_1.hasParent)
+    assert(model_1.hasSummary)
+
+    val df_2 = datasetWithDenseFeature.select("features", "label").toDF()
+    val lr_2 = new LinearRegression().setLabelCol("label").setFeaturesCol("features")
+    val model_2 = lr_2.fit(df_2)
+    assert(model_2.hasParent)
+    assert(model_2.hasSummary)
+  }
 }

 object LinearRegressionSuite {
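
Reviewer note (not part of the patch): below is a minimal, hypothetical driver sketching the behavior this change enables. It assumes the oap-mllib jar is on the Spark classpath so the DAL-backed NaiveBayes shim is picked up; the object name and column aliases are illustrative only, mirroring the new tests. Before this change, OneDAL.isDenseDataset and the rddLabeledPointTo*Tables converters hard-coded the "label" and "features" column names, so the renamed columns below would fail during table conversion even though the Spark ML params accept them.

import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.sql.SparkSession

object ColumnAliasSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ColumnAliasSmokeTest").getOrCreate()

    // Rename the default libsvm columns ("label", "features") to aliases.
    val data = spark.read.format("libsvm").load(args(0))
      .toDF("label_alias", "features_alias")

    // With this patch, the alias names are threaded from the estimator params
    // through NaiveBayesDALImpl.train() down to the OneDAL table converters.
    val model = new NaiveBayes()
      .setLabelCol("label_alias")
      .setFeaturesCol("features_alias")
      .fit(data)

    model.transform(data).select("features_alias", "prediction").show(5)
    spark.stop()
  }
}

The LinearRegression path is exercised the same way, via setLabelCol/setFeaturesCol before fit().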