
Commit c28a6fa

srowen authored and dongjoon-hyun committed
[SPARK-29292][SQL][ML] Update rest of default modules (Hive, ML, etc) for Scala 2.13 compilation
### What changes were proposed in this pull request?

Same as apache#29078 and apache#28971. This makes the rest of the default modules (i.e. those you get without specifying `-Pyarn` etc.) compile under Scala 2.13. As a result, it does not close the JIRA, and it also, of course, does not demonstrate that tests pass yet in 2.13. Note that this does not fix the `repl` module; that's separate.

### Why are the changes needed?

Eventually, we need to support a Scala 2.13 build, perhaps in Spark 3.1.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests. (2.13 was not tested; this is about getting it to compile without breaking 2.12.)

Closes apache#29111 from srowen/SPARK-29292.3.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent b05f309 commit c28a6fa


36 files changed: +106 -102 lines changed


examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java

+2-2
@@ -23,7 +23,7 @@
 import java.util.Arrays;
 import java.util.List;

-import scala.collection.mutable.WrappedArray;
+import scala.collection.mutable.Seq;

 import org.apache.spark.ml.feature.RegexTokenizer;
 import org.apache.spark.ml.feature.Tokenizer;
@@ -69,7 +69,7 @@ public static void main(String[] args) {
       .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);

     spark.udf().register(
-        "countTokens", (WrappedArray<?> words) -> words.size(), DataTypes.IntegerType);
+        "countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);

     Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
     tokenized.select("sentence", "words")
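A note on the change above (my own illustration, not part of the commit): the example used to type the UDF parameter as `WrappedArray<?>`, and `WrappedArray` is superseded by `mutable.ArraySeq` on Scala 2.13. Both are subtypes of `scala.collection.mutable.Seq`, which is presumably why typing the parameter as `Seq<?>` compiles on either version. A minimal Scala sketch of that relationship:

```scala
import scala.collection.mutable

// Wrapping an Array yields mutable.WrappedArray on 2.12 and mutable.ArraySeq on
// 2.13; both conform to mutable.Seq, the type the example now uses.
val words: mutable.Seq[String] = Array("a", "b", "c") // implicit array wrapper
println(words.size) // 3
```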

examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala

+7-1
@@ -82,7 +82,7 @@ object SparkKMeans {
     while(tempDist > convergeDist) {
       val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))

-      val pointStats = closest.reduceByKey{case ((p1, c1), (p2, c2)) => (p1 + p2, c1 + c2)}
+      val pointStats = closest.reduceByKey(mergeResults)

       val newPoints = pointStats.map {pair =>
         (pair._1, pair._2._1 * (1.0 / pair._2._2))}.collectAsMap()
@@ -102,5 +102,11 @@ object SparkKMeans {
     kPoints.foreach(println)
     spark.stop()
   }
+
+  private def mergeResults(
+      a: (Vector[Double], Int),
+      b: (Vector[Double], Int)): (Vector[Double], Int) = {
+    (a._1 + b._1, a._2 + b._2)
+  }
 }
 // scalastyle:on println
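The extracted `mergeResults` above just sums the per-cluster point vectors and counts; presumably a named, fully annotated method sidesteps differences in how 2.12 and 2.13 type-check the inline pattern-matching function previously passed to `reduceByKey`. A standalone sketch of the same merge logic (my own example, assuming breeze on the classpath as the example itself does):

```scala
import breeze.linalg.{DenseVector, Vector}

// Same merge logic as the method added above: element-wise vector sum plus count sum.
def mergeResults(
    a: (Vector[Double], Int),
    b: (Vector[Double], Int)): (Vector[Double], Int) =
  (a._1 + b._1, a._2 + b._2)

val merged = mergeResults((DenseVector(1.0, 2.0), 1), (DenseVector(3.0, 4.0), 1))
println(merged) // (DenseVector(4.0, 6.0),2)
```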

external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala

+2-2
@@ -85,7 +85,7 @@ object SchemaConverters {
          StructField(f.name, schemaType.dataType, schemaType.nullable)
        }

-        SchemaType(StructType(fields), nullable = false)
+        SchemaType(StructType(fields.toSeq), nullable = false)

      case ARRAY =>
        val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames)
@@ -126,7 +126,7 @@ object SchemaConverters {
            StructField(s"member$i", schemaType.dataType, nullable = true)
        }

-        SchemaType(StructType(fields), nullable = false)
+        SchemaType(StructType(fields.toSeq), nullable = false)
      }

    case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
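The `.toSeq` calls here (and in several files below) follow from a 2.13 change worth summarizing as background: `scala.Seq` is now an alias for `scala.collection.immutable.Seq`, so a mutable buffer (e.g. the result of `asScala` or of mapping over one) no longer satisfies a `Seq` parameter without an explicit conversion. A minimal sketch, my own example:

```scala
import scala.collection.mutable.ArrayBuffer

def needsSeq(xs: Seq[Int]): Int = xs.sum

val buf = ArrayBuffer(1, 2, 3)
// needsSeq(buf)     // compiles on 2.12 (Seq = collection.Seq) but not on 2.13
needsSeq(buf.toSeq)  // compiles on both; on 2.13 this copies into an immutable Seq
```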

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

+1-1
@@ -336,7 +336,7 @@ private[kafka010] class KafkaOffsetReader(
         }
       })
     }
-    incorrectOffsets
+    incorrectOffsets.toSeq
   }

   // Retry to fetch latest offsets when detecting incorrect offsets. We don't use

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

+2-2
@@ -1540,8 +1540,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       makeSureGetOffsetCalled,
       Execute { q =>
         // wait to reach the last offset in every partition
-        q.awaitOffset(
-          0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)), streamingTimeout.toMillis)
+        q.awaitOffset(0,
+          KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L).toMap), streamingTimeout.toMillis)
       },
       CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
       StopStream,
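The added `.toMap` reflects another 2.13 difference: `Map#mapValues` now returns a lazy `MapView` rather than a `Map` (and on 2.12 it returned a non-serializable view, the SI-7005 issue noted elsewhere in this commit). A small sketch, my own example:

```scala
val partitionOffsets = Map(0 -> 10L, 1 -> 20L)

// Strict Map on both 2.12 and 2.13; without .toMap the result is a MapView on
// 2.13 and would not satisfy call sites that expect a Map.
val capped: Map[Int, Long] = partitionOffsets.mapValues(_ => 3L).toMap
println(capped) // Map(0 -> 3, 1 -> 3)
```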

mllib/src/main/scala/org/apache/spark/ml/Estimator.scala

+1-1
@@ -76,7 +76,7 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage {
    * @return fitted models, matching the input parameter maps
    */
   @Since("2.0.0")
-  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
+  def fit(dataset: Dataset[_], paramMaps: Seq[ParamMap]): Seq[M] = {
     paramMaps.map(fit(dataset, _))
   }
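Changing the parameter from `Array[ParamMap]` to `Seq[ParamMap]` keeps the body's result type lining up on 2.13 (this is my assumption about the motivation, not stated in the diff): mapping a `Seq` yields a `Seq` on both versions, whereas mapping an `Array` yields an `Array`, which on 2.13 only conforms to the now-immutable `Seq` return type through a deprecated copying conversion. Sketch with hypothetical helpers:

```scala
// Hypothetical helpers for illustration only.
def fitAllFromSeq(params: Seq[String]): Seq[String] =
  params.map(_.toUpperCase) // a Seq maps to a Seq on 2.12 and 2.13 alike

def fitAllFromArray(params: Array[String]): Seq[String] =
  params.map(_.toUpperCase).toSeq // explicit conversion keeps 2.13 warning-free
```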

mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala

+14-14
@@ -492,12 +492,7 @@ class GaussianMixture @Since("2.0.0") (
           (i, (agg.means(i), agg.covs(i), agg.weights(i), ws))
         }
       } else Iterator.empty
-    }.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) =>
-      // update the weights, means and covariances for i-th distributions
-      BLAS.axpy(1.0, mean2, mean1)
-      BLAS.axpy(1.0, cov2, cov1)
-      (mean1, cov1, w1 + w2, ws1 + ws2)
-    }.mapValues { case (mean, cov, w, ws) =>
+    }.reduceByKey(GaussianMixture.mergeWeightsMeans).mapValues { case (mean, cov, w, ws) =>
       // Create new distributions based on the partial assignments
       // (often referred to as the "M" step in literature)
       GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws)
@@ -560,12 +555,7 @@ class GaussianMixture @Since("2.0.0") (
         agg.meanIter.zip(agg.covIter).zipWithIndex
           .map { case ((mean, cov), i) => (i, (mean, cov, agg.weights(i), ws)) }
       } else Iterator.empty
-    }.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) =>
-      // update the weights, means and covariances for i-th distributions
-      BLAS.axpy(1.0, mean2, mean1)
-      BLAS.axpy(1.0, cov2, cov1)
-      (mean1, cov1, w1 + w2, ws1 + ws2)
-    }.mapValues { case (mean, cov, w, ws) =>
+    }.reduceByKey(GaussianMixture.mergeWeightsMeans).mapValues { case (mean, cov, w, ws) =>
       // Create new distributions based on the partial assignments
       // (often referred to as the "M" step in literature)
       GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws)
@@ -624,8 +614,8 @@ class GaussianMixture @Since("2.0.0") (
     val gaussians = Array.tabulate(numClusters) { i =>
       val start = i * numSamples
       val end = start + numSamples
-      val sampleSlice = samples.view(start, end)
-      val weightSlice = sampleWeights.view(start, end)
+      val sampleSlice = samples.view.slice(start, end)
+      val weightSlice = sampleWeights.view.slice(start, end)
       val localWeightSum = weightSlice.sum
       weights(i) = localWeightSum / weightSum

@@ -691,6 +681,16 @@ object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
     new DenseMatrix(n, n, symmetricValues)
   }

+  private def mergeWeightsMeans(
+      a: (DenseVector, DenseVector, Double, Double),
+      b: (DenseVector, DenseVector, Double, Double)): (DenseVector, DenseVector, Double, Double) =
+  {
+    // update the weights, means and covariances for i-th distributions
+    BLAS.axpy(1.0, b._1, a._1)
+    BLAS.axpy(1.0, b._2, a._2)
+    (a._1, a._2, a._3 + b._3, a._4 + b._4)
+  }
+
   /**
    * Update the weight, mean and covariance of gaussian distribution.
    *
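The `view(start, end)` to `view.slice(start, end)` rewrites here (and in the impurity classes below) track a collection-API change: the two-argument `view(from, until)` form is deprecated or gone in 2.13, while taking the full `view` and then `slice` works on both versions. A sketch, my own example:

```scala
val samples = Array(1.0, 2.0, 3.0, 4.0, 5.0)

// Lazy window over elements 1..3 without copying the array; valid on 2.12 and 2.13.
val window = samples.view.slice(1, 4)
println(window.sum) // 9.0
```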

mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala

+2-2
@@ -201,7 +201,7 @@ object RobustScaler extends DefaultParamsReadable[RobustScaler] {
           }
           Iterator.tabulate(numFeatures)(i => (i, summaries(i).compress))
         } else Iterator.empty
-      }.reduceByKey { case (s1, s2) => s1.merge(s2) }
+      }.reduceByKey { (s1, s2) => s1.merge(s2) }
     } else {
       val scale = math.max(math.ceil(math.sqrt(vectors.getNumPartitions)).toInt, 2)
       vectors.mapPartitionsWithIndex { case (pid, iter) =>
@@ -214,7 +214,7 @@ object RobustScaler extends DefaultParamsReadable[RobustScaler] {
         seqOp = (s, v) => s.insert(v),
         combOp = (s1, s2) => s1.compress.merge(s2.compress)
       ).map { case ((_, i), s) => (i, s)
-      }.reduceByKey { case (s1, s2) => s1.compress.merge(s2.compress) }
+      }.reduceByKey { (s1, s2) => s1.compress.merge(s2.compress) }
     }
   }

mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala

+1-1
@@ -291,7 +291,7 @@ class Word2VecModel private[ml] (
     val outputSchema = transformSchema(dataset.schema, logging = true)
     val vectors = wordVectors.getVectors
       .mapValues(vv => Vectors.dense(vv.map(_.toDouble)))
-      .map(identity) // mapValues doesn't return a serializable map (SI-7005)
+      .map(identity).toMap // mapValues doesn't return a serializable map (SI-7005)
     val bVectors = dataset.sparkSession.sparkContext.broadcast(vectors)
     val d = $(vectorSize)
     val emptyVec = Vectors.sparse(d, Array.emptyIntArray, Array.emptyDoubleArray)

mllib/src/main/scala/org/apache/spark/ml/param/params.scala

+1-1
@@ -937,7 +937,7 @@ final class ParamMap private[ml] (private val map: mutable.Map[Param[Any], Any])

   /** Put param pairs with a `java.util.List` of values for Python. */
   private[ml] def put(paramPairs: JList[ParamPair[_]]): this.type = {
-    put(paramPairs.asScala: _*)
+    put(paramPairs.asScala.toSeq: _*)
   }

   /**
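The `asScala.toSeq: _*` pattern (also used in PythonMLLibAPI below) is the varargs flavor of the same `Seq` change: `asScala` on a `java.util.List` yields a mutable `Buffer`, and on 2.13 expanding varargs with `: _*` expects an immutable `Seq`. A sketch, my own example:

```scala
import java.util.Arrays
import scala.collection.JavaConverters._ // still compiles on 2.13, though deprecated there

def register(names: String*): Int = names.length

val jList: java.util.List[String] = Arrays.asList("a", "b", "c")
println(register(jList.asScala.toSeq: _*)) // 3; without .toSeq this does not compile on 2.13
```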

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

+4-4
@@ -1223,28 +1223,28 @@ private[python] class PythonMLLibAPI extends Serializable {
    * Python-friendly version of [[MLUtils.convertVectorColumnsToML()]].
    */
   def convertVectorColumnsToML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
-    MLUtils.convertVectorColumnsToML(dataset, cols.asScala: _*)
+    MLUtils.convertVectorColumnsToML(dataset, cols.asScala.toSeq: _*)
   }

   /**
    * Python-friendly version of [[MLUtils.convertVectorColumnsFromML()]]
    */
   def convertVectorColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
-    MLUtils.convertVectorColumnsFromML(dataset, cols.asScala: _*)
+    MLUtils.convertVectorColumnsFromML(dataset, cols.asScala.toSeq: _*)
   }

   /**
    * Python-friendly version of [[MLUtils.convertMatrixColumnsToML()]].
    */
   def convertMatrixColumnsToML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
-    MLUtils.convertMatrixColumnsToML(dataset, cols.asScala: _*)
+    MLUtils.convertMatrixColumnsToML(dataset, cols.asScala.toSeq: _*)
   }

   /**
    * Python-friendly version of [[MLUtils.convertMatrixColumnsFromML()]]
    */
   def convertMatrixColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
-    MLUtils.convertMatrixColumnsFromML(dataset, cols.asScala: _*)
+    MLUtils.convertMatrixColumnsFromML(dataset, cols.asScala.toSeq: _*)
   }
 }

mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala

+1-1
@@ -225,7 +225,7 @@ class BisectingKMeans private (
           divisibleIndices.contains(parentIndex(index))
         }
         newClusters = summarize(d, newAssignments, dMeasure)
-        newClusterCenters = newClusters.mapValues(_.center).map(identity)
+        newClusterCenters = newClusters.mapValues(_.center).map(identity).toMap
       }
       if (preIndices != null) {
         preIndices.unpersist()

mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala

+4-6
@@ -17,8 +17,6 @@

 package org.apache.spark.mllib.clustering

-import scala.collection.mutable.IndexedSeq
-
 import breeze.linalg.{diag, DenseMatrix => BreezeMatrix, DenseVector => BDV, Vector => BV}

 import org.apache.spark.annotation.Since
@@ -189,8 +187,8 @@ class GaussianMixture private (
       case None =>
         val samples = breezeData.takeSample(withReplacement = true, k * nSamples, seed)
         (Array.fill(k)(1.0 / k), Array.tabulate(k) { i =>
-          val slice = samples.view(i * nSamples, (i + 1) * nSamples)
-          new MultivariateGaussian(vectorMean(slice), initCovariance(slice))
+          val slice = samples.view.slice(i * nSamples, (i + 1) * nSamples)
+          new MultivariateGaussian(vectorMean(slice.toSeq), initCovariance(slice.toSeq))
         })
     }

@@ -259,7 +257,7 @@ class GaussianMixture private (
   }

   /** Average of dense breeze vectors */
-  private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = {
+  private def vectorMean(x: Seq[BV[Double]]): BDV[Double] = {
     val v = BDV.zeros[Double](x(0).length)
     x.foreach(xi => v += xi)
     v / x.length.toDouble
@@ -269,7 +267,7 @@ class GaussianMixture private (
    * Construct matrix where diagonal entries are element-wise
    * variance of input vectors (computes biased variance)
    */
-  private def initCovariance(x: IndexedSeq[BV[Double]]): BreezeMatrix[Double] = {
+  private def initCovariance(x: Seq[BV[Double]]): BreezeMatrix[Double] = {
     val mu = vectorMean(x)
     val ss = BDV.zeros[Double](x(0).length)
     x.foreach { xi =>

mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala

+1-1
@@ -335,7 +335,7 @@ object PrefixSpan extends Logging {
       largePrefixes = newLargePrefixes
     }

-    var freqPatterns = sc.parallelize(localFreqPatterns, 1)
+    var freqPatterns = sc.parallelize(localFreqPatterns.toSeq, 1)

     val numSmallPrefixes = smallPrefixes.size
     logInfo(s"number of small prefixes for local processing: $numSmallPrefixes")

mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala

+1-1
@@ -97,7 +97,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
       }
       if (sizes(i) + tail.length >= offset + windowSize) {
         partitions +=
-          new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail, offset)
+          new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toSeq, offset)
         partitionIndex += 1
       }
     }

mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Entropy.scala

+1-1
@@ -112,7 +112,7 @@ private[spark] class EntropyAggregator(numClasses: Int)
    * @param offset Start index of stats for this (node, feature, bin).
    */
   def getCalculator(allStats: Array[Double], offset: Int): EntropyCalculator = {
-    new EntropyCalculator(allStats.view(offset, offset + statsSize - 1).toArray,
+    new EntropyCalculator(allStats.view.slice(offset, offset + statsSize - 1).toArray,
       allStats(offset + statsSize - 1).toLong)
   }
 }

mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Gini.scala

+1-1
@@ -107,7 +107,7 @@ private[spark] class GiniAggregator(numClasses: Int)
    * @param offset Start index of stats for this (node, feature, bin).
    */
   def getCalculator(allStats: Array[Double], offset: Int): GiniCalculator = {
-    new GiniCalculator(allStats.view(offset, offset + statsSize - 1).toArray,
+    new GiniCalculator(allStats.view.slice(offset, offset + statsSize - 1).toArray,
       allStats(offset + statsSize - 1).toLong)
   }
 }

mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Variance.scala

+1-1
@@ -95,7 +95,7 @@ private[spark] class VarianceAggregator()
    * @param offset Start index of stats for this (node, feature, bin).
    */
   def getCalculator(allStats: Array[Double], offset: Int): VarianceCalculator = {
-    new VarianceCalculator(allStats.view(offset, offset + statsSize - 1).toArray,
+    new VarianceCalculator(allStats.view.slice(offset, offset + statsSize - 1).toArray,
       allStats(offset + statsSize - 1).toLong)
   }
 }

mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala

+4-4
@@ -85,10 +85,10 @@ private[mllib] object NumericParser {
     while (parsing && tokenizer.hasMoreTokens()) {
       token = tokenizer.nextToken()
       if (token == "(") {
-        items.append(parseTuple(tokenizer))
+        items += parseTuple(tokenizer)
         allowComma = true
       } else if (token == "[") {
-        items.append(parseArray(tokenizer))
+        items += parseArray(tokenizer)
         allowComma = true
       } else if (token == ",") {
         if (allowComma) {
@@ -102,14 +102,14 @@ private[mllib] object NumericParser {
         // ignore whitespaces between delim chars, e.g. ", ["
       } else {
         // expecting a number
-        items.append(parseDouble(token))
+        items += parseDouble(token)
         allowComma = true
       }
     }
     if (parsing) {
       throw new SparkException(s"A tuple must end with ')'.")
     }
-    items
+    items.toSeq
   }

   private def parseDouble(s: String): Double = {

mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala

+2-2
@@ -219,7 +219,7 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {

     model1.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)

-    val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Array(
+    val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(
       (Vectors.dense(1.0, 1.0), 2.0), (Vectors.dense(10.0, 10.0), 2.0),
       (Vectors.dense(1.0, 0.5), 2.0), (Vectors.dense(10.0, 4.4), 2.0),
       (Vectors.dense(-1.0, 1.0), 2.0), (Vectors.dense(-100.0, 90.0), 2.0))))
@@ -286,7 +286,7 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {

     model1.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)

-    val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Array(
+    val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(
       (Vectors.dense(1.0, 1.0), 1.0), (Vectors.dense(10.0, 10.0), 2.0),
       (Vectors.dense(1.0, 0.5), 2.0), (Vectors.dense(10.0, 4.4), 3.0),
       (Vectors.dense(-1.0, 1.0), 3.0), (Vectors.dense(-100.0, 90.0), 4.0))))
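Switching the literals from `Array(...)` to `Seq(...)` is the test-side face of the same issue: `SparkContext.parallelize` takes a `Seq`, and on 2.13 passing an `Array` there leans on a deprecated copying conversion to an immutable `Seq`. A sketch, assuming a local `SparkSession` named `spark` as in the test:

```scala
// Passing a Seq directly avoids the deprecated Array-to-immutable-Seq implicit on 2.13.
val rdd = spark.sparkContext.parallelize(Seq(
  (1.0, "a"), (2.0, "b"), (3.0, "c")))
println(rdd.count()) // 3
```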
