
Commit ab2749f
Merged version 2.2.6
vruusmann committed Feb 10, 2025
2 parents 3b0dc06 + c88667d
Showing 22 changed files with 67 additions and 149 deletions.
@@ -25,6 +25,7 @@
import java.util.Map;

import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import ml.dmlc.xgboost4j.scala.Booster;
import ml.dmlc.xgboost4j.scala.spark.params.GeneralParams;
import org.apache.spark.ml.Model;
@@ -56,7 +57,7 @@ public <M extends Model<M> & HasPredictionCol & GeneralParams, C extends ModelCo
learner = XGBoostUtil.loadLearner(is);
}

MoreFiles.deleteRecursively(tmpBoosterFile.toPath());
MoreFiles.deleteRecursively(tmpBoosterFile.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);
} catch(Exception e){
throw new RuntimeException(e);
}
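
Note on the ALLOW_INSECURE option, which recurs throughout this commit: Guava's MoreFiles.deleteRecursively throws InsecureRecursiveDeleteException on file systems that lack SecureDirectoryStream support (for example, the default file system on Windows) unless the caller opts in explicitly. A minimal standalone sketch:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;

public class TmpCleanupDemo {

	public static void main(String... args) throws IOException {
		Path tmpDir = Files.createTempDirectory("booster");

		Files.createFile(tmpDir.resolve("booster.bin"));

		// Without ALLOW_INSECURE, this call throws InsecureRecursiveDeleteException
		// on file systems that do not support SecureDirectoryStream
		MoreFiles.deleteRecursively(tmpDir, RecursiveDeleteOption.ALLOW_INSECURE);
	}
}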
@@ -18,24 +18,13 @@
*/
package org.jpmml.sparkml.xgboost.testing;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

import com.google.common.base.Equivalence;
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel;
import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressionModel;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PredictionModel;
import org.apache.spark.ml.util.MLReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.dmg.pmml.Model;
import org.dmg.pmml.PMML;
import org.dmg.pmml.VerificationField;
@@ -47,8 +36,6 @@
import org.jpmml.evaluator.ResultField;
import org.jpmml.evaluator.testing.FloatEquivalence;
import org.jpmml.model.visitors.AbstractVisitor;
import org.jpmml.sparkml.ArchiveUtil;
import org.jpmml.sparkml.PipelineModelUtil;
import org.jpmml.sparkml.testing.SparkMLEncoderBatch;
import org.jpmml.sparkml.testing.SparkMLEncoderBatchTest;
import org.jpmml.xgboost.HasXGBoostOptions;
@@ -95,7 +82,7 @@ public PMML getPMML() throws Exception {
@Override
public VisitorAction visit(Model model){

if(Objects.equals(dataset, AUDIT)){
if(Objects.equals(dataset, AUDIT) || Objects.equals(dataset, AUDIT_NA)){
model.setModelVerification(null);
}

@@ -115,86 +102,6 @@ public VisitorAction visit(VerificationField verificationField){

return pmml;
}

@Override
protected PipelineModel loadPipelineModel(SparkSession sparkSession, List<File> tmpResources) throws IOException {
String dataset = getDataset();

if(Objects.equals(dataset, AUDIT_NA)){
return loadPipelineModel(sparkSession, "Transformers", "XGBoostClassificationModel", tmpResources);
} else

if(Objects.equals(dataset, AUTO_NA)){
return loadPipelineModel(sparkSession, "Transformers", "XGBoostRegressionModel", tmpResources);
} else

{
return super.loadPipelineModel(sparkSession, tmpResources);
}
}

private PipelineModel loadPipelineModel(SparkSession sparkSession, String pipelineModelName, String modelName, List<File> tmpResources) throws IOException {
String dataset = getDataset();

PipelineModel pipelineModel;

try(InputStream is = open("/pipeline/" + pipelineModelName + dataset + ".zip")){
File tmpZipFile = toTmpFile(is, pipelineModelName + dataset, ".zip");

tmpResources.add(tmpZipFile);

File tmpPipelineModelDir = ArchiveUtil.uncompress(tmpZipFile);

tmpResources.add(tmpPipelineModelDir);

pipelineModel = PipelineModelUtil.load(sparkSession, tmpPipelineModelDir);
}

PredictionModel<?, ?> model;

try(InputStream is = open("/pipeline/" + modelName + dataset + ".zip")){
File tmpZipFile = toTmpFile(is, modelName + dataset, ".zip");

tmpResources.add(tmpZipFile);

File tmpModelDir = ArchiveUtil.uncompress(tmpZipFile);

tmpResources.add(tmpModelDir);

MLReader<?> mlReader;

if(modelName.endsWith("ClassificationModel")){
mlReader = XGBoostClassificationModel.read();
} else

if(modelName.endsWith("RegressionModel")){
mlReader = XGBoostRegressionModel.read();
} else

{
throw new IllegalArgumentException();
}

mlReader.session(sparkSession);

model = (PredictionModel<?, ?>)mlReader.load(tmpModelDir.getAbsolutePath());
}

PipelineModelUtil.addStage(pipelineModel, (pipelineModel.stages()).length, model);

return pipelineModel;
}

@Override
public Dataset<Row> getVerificationDataset(Dataset<Row> inputDataset){
String dataset = getDataset();

if(Objects.equals(dataset, AUDIT_NA) || Objects.equals(dataset, AUTO_NA)){
return null;
}

return super.getVerificationDataset(inputDataset);
}
};

return result;
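
The loadPipelineModel and getVerificationDataset overrides removed above existed to re-assemble the AUDIT_NA and AUTO_NA pipelines from a separately stored transformer pipeline plus a standalone XGBoost model. The Scala scripts further down in this diff now store the complete fitted PipelineModel as a single ZIP archive, so the default loading path suffices. A hypothetical round-trip using the JPMML-SparkML utilities touched by this commit (assuming both methods are static, as elsewhere in the codebase):

import java.io.File;
import java.io.IOException;

import org.apache.spark.ml.PipelineModel;
import org.apache.spark.sql.SparkSession;
import org.jpmml.sparkml.PipelineModelUtil;

public class PipelineRoundTripDemo {

	static
	public PipelineModel roundTrip(SparkSession sparkSession, PipelineModel pipelineModel) throws IOException {
		File zipFile = new File("pipeline/XGBoostAuditNA.zip");

		// Store the complete fitted pipeline (transformers plus the XGBoost model stage) as one archive
		PipelineModelUtil.storeZip(pipelineModel, zipFile);

		// Load it back; no manual re-attachment of the model stage is needed anymore
		return PipelineModelUtil.loadZip(sparkSession, zipFile);
	}
}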
2 changes: 1 addition & 1 deletion pmml-sparkml-xgboost/src/test/resources/XGBoostAudit.scala
@@ -20,7 +20,7 @@ val cont_cols = Array("Age", "Hours", "Income")
val labelIndexer = new StringIndexer().setInputCol("Adjusted").setOutputCol("idx_Adjusted")

val indexer = new StringIndexer().setInputCols(cat_cols).setOutputCols(cat_cols.map(cat_col => "idx_" + cat_col))
val ohe = new OneHotEncoder().setDropLast(false).setInputCols(indexer.getOutputCols).setOutputCols(cat_cols.map(cat_col => "ohe_" + cat_col))
val ohe = new OneHotEncoder().setHandleInvalid("keep").setDropLast(true).setInputCols(indexer.getOutputCols).setOutputCols(cat_cols.map(cat_col => "ohe_" + cat_col))
val assembler = new VectorAssembler().setInputCols(ohe.getOutputCols ++ cont_cols).setOutputCol("featureVector")

val sparse2dense = new SparseToDenseTransformer().setInputCol(assembler.getOutputCol).setOutputCol("denseFeatureVec")
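
The Audit script now uses handleInvalid("keep") together with dropLast(true). Per Spark ML semantics, "keep" appends one extra bucket for unseen or invalid levels as the last category, and dropLast = true then removes that trailing bucket, so an invalid level encodes as the all-zeros vector. A standalone sketch of the resulting layout for a hypothetical three-level column:

import java.util.Arrays;
import java.util.List;

public class OneHotLayoutDemo {

	public static void main(String... args) {
		// Hypothetical levels after string indexing: a -> 0, b -> 1, c -> 2
		List<String> levels = Arrays.asList("a", "b", "c");

		// handleInvalid = "keep" appends an invalid bucket, dropLast = true removes
		// the trailing bucket again, so vector length equals the level count and
		// an unseen level maps to all zeros
		for(String level : Arrays.asList("a", "b", "c", "<unseen>")){
			int index = levels.indexOf(level);

			int[] vector = new int[levels.size()];
			if(index >= 0){
				vector[index] = 1;
			}

			System.out.println(level + " -> " + Arrays.toString(vector));
		}
	}
}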
10 changes: 1 addition & 9 deletions pmml-sparkml-xgboost/src/test/resources/XGBoostAuditNA.scala
@@ -33,15 +33,7 @@ val classifier = new XGBoostClassifier(Map("objective" -> "binary:logistic", "nu
val pipeline = new Pipeline().setStages(Array(labelIndexer, indexer, indexTransformer, assembler, sparse2dense, classifier))
val pipelineModel = pipeline.fit(df)

//PipelineModelUtil.storeZip(pipelineModel, new File("pipeline/XGBoostAuditNA.zip"))

val transformers = pipelineModel.copy(new ParamMap())
val classificationModel = PipelineModelUtil.removeStage(transformers, 5)

PipelineModelUtil.storeZip(transformers, new File("pipeline/TransformersAuditNA.zip"))

val mlWriter = classificationModel.asInstanceOf[MLWritable].write.option("format", "json")
ArchiveUtil.storeZip(mlWriter, new File("pipeline/XGBoostClassificationModelAuditNA.zip"))
PipelineModelUtil.storeZip(pipelineModel, new File("pipeline/XGBoostAuditNA.zip"))

val predLabel = udf{ (value: Float) => value.toInt.toString }
val vectorToColumn = udf{ (vec: Vector, index: Int) => vec(index).toFloat }
2 changes: 1 addition & 1 deletion pmml-sparkml-xgboost/src/test/resources/XGBoostAuto.scala
@@ -16,7 +16,7 @@ val cat_cols = Array("cylinders", "model_year", "origin")
val cont_cols = Array("acceleration", "displacement", "horsepower", "weight")

val indexer = new StringIndexer().setInputCols(cat_cols).setOutputCols(cat_cols.map(cat_col => "idx_" + cat_col))
val ohe = new OneHotEncoder().setDropLast(false).setInputCols(indexer.getOutputCols).setOutputCols(cat_cols.map(cat_col => "ohe_" + cat_col))
val ohe = new OneHotEncoder().setHandleInvalid("keep").setDropLast(false).setInputCols(indexer.getOutputCols).setOutputCols(cat_cols.map(cat_col => "ohe_" + cat_col))
val assembler = new VectorAssembler().setInputCols(ohe.getOutputCols ++ cont_cols).setOutputCol("featureVector")

val sparse2dense = new SparseToDenseTransformer().setInputCol(assembler.getOutputCol).setOutputCol("denseFeatureVec")
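
The Auto script, by contrast, keeps dropLast(false): with handleInvalid("keep"), the encoded vector retains an explicit trailing bucket for unseen levels, so its length is the level count plus one. This corresponds to the !dropLast && keepInvalid branch of the OneHotEncoderModelConverter change further down in this diff.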
10 changes: 1 addition & 9 deletions pmml-sparkml-xgboost/src/test/resources/XGBoostAutoNA.scala
@@ -26,15 +26,7 @@ val regressor = new XGBoostRegressor(Map("objective" -> "reg:squarederror", "num
val pipeline = new Pipeline().setStages(Array(indexer, indexTransformer, assembler, regressor))
val pipelineModel = pipeline.fit(df)

//PipelineModelUtil.storeZip(pipelineModel, new File("pipeline/XGBoostAutoNA.zip"))

val transformers = pipelineModel.copy(new ParamMap())
val regressionModel = PipelineModelUtil.removeStage(transformers, 3)

PipelineModelUtil.storeZip(transformers, new File("pipeline/TransformersAutoNA.zip"))

val mlWriter = regressionModel.asInstanceOf[MLWritable].write.option("format", "json")
ArchiveUtil.storeZip(mlWriter, new File("pipeline/XGBoostRegressionModelAutoNA.zip"))
PipelineModelUtil.storeZip(pipelineModel, new File("pipeline/XGBoostAutoNA.zip"))

var xgbDf = pipelineModel.transform(df)
xgbDf = xgbDf.selectExpr("prediction as mpg")
8 binary files changed (not shown).
5 changes: 3 additions & 2 deletions pmml-sparkml/src/main/java/org/jpmml/sparkml/ArchiveUtil.java
@@ -38,6 +38,7 @@

import com.google.common.io.ByteStreams;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.util.MLReadable;
import org.apache.spark.ml.util.MLReader;
Expand All @@ -55,7 +56,7 @@ public <E extends PipelineStage & MLReadable<E>> E loadZip(MLReader<E> mlReader,

E stage = mlReader.load(tmpDir.getAbsolutePath());

MoreFiles.deleteRecursively(tmpDir.toPath());
MoreFiles.deleteRecursively(tmpDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);

return stage;
}
@@ -76,7 +77,7 @@ public void storeZip(MLWriter mlWriter, File file) throws IOException {

ArchiveUtil.compress(tmpDir, file);

MoreFiles.deleteRecursively(tmpDir.toPath());
MoreFiles.deleteRecursively(tmpDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);
}

static
@@ -33,6 +33,7 @@
import com.google.common.io.CharStreams;
import com.google.common.io.Files;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
@@ -125,7 +126,7 @@ public boolean accept(File file){

Files.copy(csvFiles[0], file);

MoreFiles.deleteRecursively(tmpDir.toPath());
MoreFiles.deleteRecursively(tmpDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);
}

static
@@ -26,6 +26,7 @@
import java.util.List;

import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.Transformer;
import org.apache.spark.ml.util.MLReader;
@@ -72,7 +73,7 @@ public PipelineModel loadZip(SparkSession sparkSession, File file) throws IOExce

PipelineModel pipelineModel = load(sparkSession, tmpDir);

MoreFiles.deleteRecursively(tmpDir.toPath());
MoreFiles.deleteRecursively(tmpDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);

return pipelineModel;
}
@@ -95,7 +96,7 @@ public void storeZip(PipelineModel pipelineModel, File file) throws IOException

ArchiveUtil.compress(tmpDir, file);

MoreFiles.deleteRecursively(tmpDir.toPath());
MoreFiles.deleteRecursively(tmpDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);
}

private static final Field FIELD_STAGES = ReflectionUtil.getField(PipelineModel.class, "stages");
@@ -74,7 +74,7 @@ public List<Feature> encodeFeatures(SparkMLEncoder encoder){
// XXX
inputCol = name;

features = (List)OneHotEncoderModelConverter.encodeFeature(encoder, categoricalFeature, categoricalFeature.getValues(), false);
features = (List)OneHotEncoderModelConverter.encodeFeature(encoder, categoricalFeature, categoricalFeature.getValues());
}
} // End if

@@ -20,9 +20,11 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import com.google.common.collect.Iterables;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.dmg.pmml.DataType;
import org.jpmml.converter.BinaryFeature;
import org.jpmml.converter.CategoricalFeature;
import org.jpmml.converter.Feature;
@@ -43,6 +45,9 @@ public List<Feature> encodeFeatures(SparkMLEncoder encoder){
OneHotEncoderModel transformer = getTransformer();

boolean dropLast = transformer.getDropLast();
String handleInvalid = transformer.getHandleInvalid();

boolean keepInvalid = Objects.equals("keep", handleInvalid);

InOutMode inputMode = getInputMode();

@@ -52,9 +57,28 @@ public List<Feature> encodeFeatures(SparkMLEncoder encoder){
for(String inputCol : inputCols){
CategoricalFeature categoricalFeature = (CategoricalFeature)encoder.getOnlyFeature(inputCol);

DataType dataType = categoricalFeature.getDataType();

// XXX
String invalidCategory = StringIndexerModelConverter.getInvalidCategory(dataType);

List<?> values = categoricalFeature.getValues();

List<BinaryFeature> binaryFeatures = OneHotEncoderModelConverter.encodeFeature(encoder, categoricalFeature, values, dropLast);
List<BinaryFeature> binaryFeatures = OneHotEncoderModelConverter.encodeFeature(encoder, categoricalFeature, values);

if(!dropLast && keepInvalid){
BinaryFeature invalidCategoryFeature = new BinaryFeature(encoder, categoricalFeature, invalidCategory);

binaryFeatures.add(invalidCategoryFeature);
} else

if(dropLast && !keepInvalid){
binaryFeatures = binaryFeatures.subList(0, binaryFeatures.size() - 1);
} else

{
// Ignored: No-op
}

result.add(new BinarizedCategoricalFeature(encoder, categoricalFeature, binaryFeatures));
}
@@ -95,13 +119,9 @@ public void registerFeatures(SparkMLEncoder encoder){
}

static
public List<BinaryFeature> encodeFeature(PMMLEncoder encoder, Feature feature, List<?> values, boolean dropLast){
public List<BinaryFeature> encodeFeature(PMMLEncoder encoder, Feature feature, List<?> values){
List<BinaryFeature> result = new ArrayList<>();

if(dropLast){
values = values.subList(0, values.size() - 1);
}

for(Object value : values){
result.add(new BinaryFeature(encoder, feature, value));
}
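
The rewritten encodeFeatures above handles the four dropLast/keepInvalid combinations explicitly. A standalone sketch of the resulting feature lists for a hypothetical column with levels a, b, c and invalid category __unknown, mirroring the converter logic:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OneHotCombinationsDemo {

	public static void main(String... args) {
		boolean[] flags = {false, true};

		for(boolean dropLast : flags){

			for(boolean keepInvalid : flags){
				List<String> features = new ArrayList<>(Arrays.asList("a", "b", "c"));

				if(!dropLast && keepInvalid){
					// Explicit trailing bucket for unseen or invalid levels
					features.add("__unknown");
				} else

				if(dropLast && !keepInvalid){
					// The last category becomes the implicit reference level
					features = features.subList(0, features.size() - 1);
				} else

				{
					// dropLast && keepInvalid: the invalid bucket is appended and dropped
					// again, so invalid levels encode as all zeros.
					// !dropLast && !keepInvalid: plain one-hot over all categories.
				}

				System.out.println("dropLast=" + dropLast + ", keepInvalid=" + keepInvalid + " -> " + features);
			}
		}
	}
}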
@@ -50,6 +50,7 @@ public StringIndexerModelConverter(StringIndexerModel transformer){
public List<Feature> encodeFeatures(SparkMLEncoder encoder){
StringIndexerModel transformer = getTransformer();

String handleInvalid = transformer.getHandleInvalid();
String[][] labelsArray = transformer.labelsArray();

InOutMode inputMode = getInputMode();
@@ -63,24 +64,12 @@ public List<Feature> encodeFeatures(SparkMLEncoder encoder){

Feature feature = encoder.getOnlyFeature(inputCol);

DataType dataType = feature.getDataType();

List<String> categories = new ArrayList<>();
categories.addAll(Arrays.asList(labels));

String invalidCategory;

DataType dataType = feature.getDataType();
switch(dataType){
case INTEGER:
case FLOAT:
case DOUBLE:
invalidCategory = "-999";
break;
default:
invalidCategory = "__unknown";
break;
}

String handleInvalid = transformer.getHandleInvalid();
String invalidCategory = getInvalidCategory(dataType);

Field<?> field = encoder.toCategorical(feature.getName(), categories);

@@ -146,4 +135,17 @@

return result;
}

static
public String getInvalidCategory(DataType dataType){

switch(dataType){
case INTEGER:
case FLOAT:
case DOUBLE:
return "-999";
default:
return "__unknown";
}
}
}
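
Extracting getInvalidCategory into a static helper lets OneHotEncoderModelConverter reuse the same sentinel values that StringIndexerModelConverter already emitted for handleInvalid = "keep" columns. A usage sketch (the import path of the converter class is assumed here for illustration):

import org.dmg.pmml.DataType;

import org.jpmml.sparkml.feature.StringIndexerModelConverter;

public class InvalidCategoryDemo {

	public static void main(String... args) {
		// Numeric columns get a numeric sentinel, all other columns a string sentinel
		System.out.println(StringIndexerModelConverter.getInvalidCategory(DataType.DOUBLE)); // prints -999
		System.out.println(StringIndexerModelConverter.getInvalidCategory(DataType.STRING)); // prints __unknown
	}
}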