Skip to content

Commit

Permalink
[ML-163] use feature column instead of "features" (#164)
Browse files Browse the repository at this point in the history
* use feature column instead of "features"

Signed-off-by: minmingzhu <[email protected]>

* update

Signed-off-by: minmingzhu <[email protected]>

* use feature column instead of "features"

Signed-off-by: minmingzhu <[email protected]>

* update NaiveBayesExample

Signed-off-by: minmingzhu <[email protected]>

* update code with comments

Signed-off-by: minmingzhu <[email protected]>

* 1. add labelCol parameter on train function
2. add a select on (labelCol, featuresCol) so the columns follow the order of "label" then "features"

Signed-off-by: minmingzhu <[email protected]>

* add unit test

Signed-off-by: minmingzhu <[email protected]>
  • Loading branch information
minmingzhu authored Jan 9, 2022
1 parent 5fcc42e commit 471718c
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ object LinearRegressionExample {
println(s"LinearRegressionExample with parameters:\n$params")

val training = spark.read.format("libsvm")
.load(params.input)
.load(params.input).toDF("label", "features")

val lir = new LinearRegression()
.setFeaturesCol("features")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ object NaiveBayesExample {

// $example on$
// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load(args(0))

// Split the data into training and test sets (30% held out for testing)
val data = spark.read.format("libsvm").load(args(0)).toDF("label", "features")
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)

// Train a NaiveBayes model.
val model = new NaiveBayes()
.fit(trainingData)
.setLabelCol("label")
.setFeaturesCol("features")
.fit(data)

// Select example rows to display.
val predictions = model.transform(testData)
Expand All @@ -59,7 +59,6 @@ object NaiveBayesExample {
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")
// $example off$

spark.stop()
}
}
Expand Down
21 changes: 14 additions & 7 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import com.intel.daal.data_management.data.{CSRNumericTable, HomogenNumericTable
import com.intel.daal.services.DaalContext
import org.apache.spark.SparkContext
import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector, Matrix, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.linalg.{Vector => OldVector, Matrix => OldMatrix, DenseMatrix => OldDenseMatrix}
import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, Matrix => OldMatrix, Vector => OldVector}
import org.apache.spark.rdd.{ExecutorInProcessCoalescePartitioner, RDD}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.lang
import java.nio.DoubleBuffer
import java.util.logging.{Level, Logger}

import scala.collection.mutable.ArrayBuffer

object OneDAL {
Expand Down Expand Up @@ -73,8 +73,8 @@ object OneDAL {
OldMatrix
}

def isDenseDataset(ds: Dataset[_]): Boolean = {
val row = ds.select("features").head()
/** Returns true when the feature vector in the dataset's first row is dense.
 *
 * Only the head row is inspected; the dataset is assumed to be homogeneous
 * (all rows dense or all rows sparse) — TODO confirm with callers.
 */
def isDenseDataset(ds: Dataset[_], featuresCol: String): Boolean =
  ds.select(featuresCol).head().get(0).isInstanceOf[DenseVector]
Expand Down Expand Up @@ -176,6 +176,8 @@ object OneDAL {


def rddLabeledPointToSparseTables(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int): RDD[(Long, Long)] = {
require(executorNum > 0)

Expand All @@ -193,7 +195,7 @@ object OneDAL {

dataForConversion.cache().count()

val labeledPointsRDD = dataForConversion.toDF().map {
val labeledPointsRDD = dataForConversion.select(labelCol, featuresCol).toDF().map {
case Row(label: Double, features: Vector) => (features, label)
}.rdd

Expand Down Expand Up @@ -283,14 +285,16 @@ object OneDAL {
}

def rddLabeledPointToSparseTables_shuffle(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int): RDD[(Long, Long)] = {
require(executorNum > 0)

logger.info(s"Processing partitions with $executorNum executors")

val spark = SparkSession.active

val labeledPointsRDD = labeledPoints.rdd.map {
val labeledPointsRDD = labeledPoints.select(labelCol, featuresCol).rdd.map {
case Row(label: Double, features: Vector) => (features, label)
}

Expand Down Expand Up @@ -321,6 +325,8 @@ object OneDAL {
}

def rddLabeledPointToMergedTables(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int): RDD[(Long, Long)] = {
require(executorNum > 0)

Expand All @@ -336,7 +342,8 @@ object OneDAL {
labeledPoints
}

val tables = dataForConversion.toDF().mapPartitions { it: Iterator[Row] =>
val tables = dataForConversion.select(labelCol, featuresCol)
.toDF().mapPartitions { it: Iterator[Row] =>
val rows = it.toArray

val features = rows.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ class NaiveBayesDALImpl(val uid: String,
val executorNum: Int,
val executorCores: Int
) extends Serializable with Logging {
def train(labeledPoints: Dataset[_]): NaiveBayesDALModel = {
def train(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String): NaiveBayesDALModel = {

val kvsIPPort = getOneCCLIPPort(labeledPoints.rdd)

val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints)) {
OneDAL.rddLabeledPointToMergedTables(labeledPoints, executorNum)
val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
OneDAL.rddLabeledPointToMergedTables(labeledPoints, labelCol, featuresCol, executorNum)
} else {
OneDAL.rddLabeledPointToSparseTables(labeledPoints, executorNum)
OneDAL.rddLabeledPointToSparseTables(labeledPoints, labelCol, featuresCol, executorNum)
}

val results = labeledPointsTables.mapPartitionsWithIndex {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,18 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
s"elasticNetParam must be in [0, 1]: $elasticNetParam")

/**
* Creates a [[LinearRegressionDALModel]] from an RDD of [[Vector]]s.
*/
def train(labeledPoints: Dataset[_]): LinearRegressionDALModel = {
* Creates a [[LinearRegressionDALModel]] from an RDD of [[Vector]]s.
*/
def train(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String): LinearRegressionDALModel = {

val kvsIPPort = getOneCCLIPPort(labeledPoints.rdd)

val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints)) {
OneDAL.rddLabeledPointToMergedTables(labeledPoints, executorNum)
val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
OneDAL.rddLabeledPointToMergedTables(labeledPoints, labelCol, featuresCol, executorNum)
} else {
OneDAL.rddLabeledPointToSparseTables(labeledPoints, executorNum)
OneDAL.rddLabeledPointToSparseTables(labeledPoints, labelCol, featuresCol, executorNum)
}

val results = labeledPointsTables.mapPartitionsWithIndex {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class NaiveBayes @Since("1.5.0") (
.select(col(getLabelCol), DatasetUtils.columnToVector(dataset, getFeaturesCol))

val dalModel = new NaiveBayesDALImpl(uid, numClasses,
executor_num, executor_cores).train(labeledPointsDS)
executor_num, executor_cores).train(labeledPointsDS, ${labelCol}, ${featuresCol})

val model = copyValues(new NaiveBayesModel(
dalModel.uid, dalModel.pi, dalModel.theta, dalModel.sigma))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ class LinearRegression @Since("1.3") (@Since("1.3.0") override val uid: String)
executor_num, executor_cores)

// Return same model as WeightedLeastSquaresModel
val model = optimizer.train(dataset)
val model = optimizer.train(dataset, $(labelCol), $(featuresCol))

val lrModel = copyValues(
new LinearRegressionModel(uid, model.coefficients, model.intercept))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
executor_num, executor_cores)

// Return same model as WeightedLeastSquaresModel
val model = optimizer.train(dataset)
val model = optimizer.train(dataset, $(labelCol), $(featuresCol))

val lrModel = copyValues(
new LinearRegressionModel(uid, model.coefficients, model.intercept))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,35 @@ class MLlibNaiveBayesSuite extends MLTest with DefaultReadWriteTest {
assert(expected.sigma === Matrices.zeros(0, 0) && actual.sigma === Matrices.zeros(0, 0))
}
}

// Regression test for ML-163: NaiveBayes must honor user-supplied label/features column names.
test("oap-mllib-163: should support all kind of labelCol name and featuresCol name") {
  def renamed = complementDataset.toDF("label_alias", "features_alias")

  // Only labelCol overridden: featuresCol still defaults to "features", which is absent.
  val missingFeatures = intercept[IllegalArgumentException] {
    new NaiveBayes().setLabelCol("label_alias").fit(renamed)
  }.getMessage
  assert(missingFeatures.contains("features does not exist. Available: label_alias, features_alias"))

  // Only featuresCol overridden: labelCol still defaults to "label", which is absent.
  val missingLabel = intercept[IllegalArgumentException] {
    new NaiveBayes().setFeaturesCol("features_alias").fit(renamed)
  }.getMessage
  assert(missingLabel.contains("label does not exist. Available: label_alias, features_alias"))

  // Both columns overridden: fit succeeds and produces a model attached to its parent estimator.
  val fitted = new NaiveBayes()
    .setLabelCol("label_alias")
    .setFeaturesCol("features_alias")
    .fit(renamed)
  assert(fitted.hasParent)
}

// Regression test for ML-163: fitting must not depend on the physical order of the
// label and features columns in the DataFrame.
test("oap-mllib-163: should support column in any order") {
  // label column first, features column second
  val labelFirst = complementDataset.select("label", "features").toDF()
  val modelFromLabelFirst =
    new NaiveBayes().setLabelCol("label").setFeaturesCol("features").fit(labelFirst)
  assert(modelFromLabelFirst.hasParent)

  // features column first, label column second
  val featuresFirst = complementDataset.select("features", "label").toDF()
  val modelFromFeaturesFirst =
    new NaiveBayes().setLabelCol("label").setFeaturesCol("features").fit(featuresFirst)
  assert(modelFromFeaturesFirst.hasParent)
}
}

object NaiveBayesSuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,38 @@ class MLlibLinearRegressionSuite extends MLTest with DefaultReadWriteTest with P
assert(model1.coefficients ~== model2.coefficients relTol 1E-3)
assert(model1.intercept ~== model2.intercept relTol 1E-3)
}

// Regression test for ML-163: LinearRegression must honor user-supplied label/features column names.
test("oap-mllib-163: should support all kind of labelCol name and featuresCol name") {
  def renamed = datasetWithDenseFeature.toDF("label_alias", "features_alias")

  // Only labelCol overridden: featuresCol still defaults to "features", which is absent.
  val missingFeatures = intercept[IllegalArgumentException] {
    new LinearRegression().setLabelCol("label_alias").fit(renamed)
  }.getMessage
  assert(missingFeatures.contains("features does not exist. Available: label_alias, features_alias"))

  // Only featuresCol overridden: labelCol still defaults to "label", which is absent.
  val missingLabel = intercept[IllegalArgumentException] {
    new LinearRegression().setFeaturesCol("features_alias").fit(renamed)
  }.getMessage
  assert(missingLabel.contains("label does not exist. Available: label_alias, features_alias"))

  // Both columns overridden: fit succeeds with a parented model and a training summary.
  val fitted = new LinearRegression()
    .setLabelCol("label_alias")
    .setFeaturesCol("features_alias")
    .fit(renamed)
  assert(fitted.hasParent)
  assert(fitted.hasSummary)
}

// Regression test for ML-163: fitting must not depend on the physical order of the
// label and features columns in the DataFrame.
test("oap-mllib-163: should support column in any order") {
  // label column first, features column second
  val labelFirst = datasetWithDenseFeature.select("label", "features").toDF()
  val modelFromLabelFirst =
    new LinearRegression().setLabelCol("label").setFeaturesCol("features").fit(labelFirst)
  assert(modelFromLabelFirst.hasParent)
  assert(modelFromLabelFirst.hasSummary)

  // features column first, label column second
  val featuresFirst = datasetWithDenseFeature.select("features", "label").toDF()
  val modelFromFeaturesFirst =
    new LinearRegression().setLabelCol("label").setFeaturesCol("features").fit(featuresFirst)
  assert(modelFromFeaturesFirst.hasParent)
  assert(modelFromFeaturesFirst.hasSummary)
}
}

object LinearRegressionSuite {
Expand Down

0 comments on commit 471718c

Please sign in to comment.