[SPARK-51340][ML][CONNECT] Model size estimation for linear classification & regression models #50106

Open · wants to merge 3 commits into base: master
mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -191,8 +191,7 @@ sealed trait Vector extends Serializable {
def compressed: Vector = compressedWithNNZ(numNonzeros)

private[ml] def compressedWithNNZ(nnz: Int): Vector = {
- // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs 12 * nnz + 20 bytes.
- if (1.5 * (nnz + 1.0) < size) {
+ if (Vectors.getSparseSize(nnz) < Vectors.getDenseSize(size)) {
toSparseWithSize(nnz)
} else {
toDense
@@ -230,6 +229,8 @@ sealed trait Vector extends Serializable {
*/
private[spark] def nonZeroIterator: Iterator[(Int, Double)] =
activeIterator.filter(_._2 != 0)

private[ml] def getSizeInBytes: Long
}

/**
@@ -504,6 +505,27 @@ object Vectors {

/** Max number of nonzero entries used in computing hash code. */
private[linalg] val MAX_HASH_NNZ = 128

private[ml] def getSparseSize(nnz: Long): Long = {
/*
A sparse vector stores one double array, one int array and one int:
8 * values.length + 4 * values.length + arrayHeader * 2 + 4
*/
val doubleBytes = java.lang.Double.BYTES
val intBytes = java.lang.Integer.BYTES
val arrayHeader = 12L
(doubleBytes + intBytes) * nnz + arrayHeader * 2L + 4L
}

private[ml] def getDenseSize(size: Long): Long = {
/*
A dense vector stores one double array:
8 * values.length + arrayHeader
*/
val doubleBytes = java.lang.Double.BYTES
val arrayHeader = 12L
doubleBytes * size + arrayHeader
}
}
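For intuition (a worked check, not part of the diff): the new helpers reduce to getSparseSize(nnz) = 12 * nnz + 28 and getDenseSize(size) = 8 * size + 12 bytes, so compressedWithNNZ now compares exact storage costs instead of the old 1.5 * (nnz + 1) < size heuristic. A minimal sketch, assuming it runs inside the org.apache.spark.ml package (both helpers are private[ml]):

import org.apache.spark.ml.linalg.Vectors

// a length-10 vector with 3 nonzeros: sparse storage wins, so compressedWithNNZ keeps it sparse
val sparseBytes = Vectors.getSparseSize(3)  // (8 + 4) * 3 + 12 * 2 + 4 = 64
val denseBytes = Vectors.getDenseSize(10)   // 8 * 10 + 12 = 92
assert(sparseBytes < denseBytes)            // toSparseWithSize(3) would be chosen over toDense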

/**
@@ -596,6 +618,8 @@ class DenseVector @Since("2.0.0") ( @Since("2.0.0") val values: Array[Double]) e

private[spark] override def activeIterator: Iterator[(Int, Double)] =
iterator

override private[ml] def getSizeInBytes: Long = Vectors.getDenseSize(values.length)
}

@Since("2.0.0")
@@ -845,6 +869,8 @@ class SparseVector @Since("2.0.0") (
val localValues = values
Iterator.tabulate(numActives)(j => (localIndices(j), localValues(j)))
}

override private[ml] def getSizeInBytes: Long = Vectors.getSparseSize(values.length)
}

@Since("2.0.0")
17 changes: 17 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/Estimator.scala
@@ -81,4 +81,21 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage {
}

override def copy(extra: ParamMap): Estimator[M]

/**
* For ml connect only.
* Estimate an upper-bound size of the model to be fitted in bytes, based on the
* parameters and the dataset, e.g., using $(k) and numFeatures to estimate a
* k-means model size.
* 1, Only driver side memory usage is counted, distributed objects (like DataFrame,
* RDD, Graph, Summary) are ignored.
* 2, Lazy vals are not counted, e.g., an auxiliary object used in prediction.
* 3, If there is not enough information to get an accurate size, try to estimate the
* upper-bound size, e.g.
* - Given a LogisticRegression estimator, assume the coefficients are dense, even
* though the actual fitted model might be sparse (by L1 penalty).
* - Given a tree model, assume all underlying trees are complete binary trees, even
* though some branches might be pruned or truncated.
*/
private[spark] def estimateModelSize(dataset: Dataset[_]): Long = throw new NotImplementedError
}
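A hypothetical caller-side sketch (not part of this diff, and not the actual Spark Connect plumbing): the estimate can gate fit() against a byte budget before any work is done. The helper name and budget are illustrative assumptions, and since estimateModelSize is private[spark] this assumes Spark-internal code:

import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.sql.Dataset

// Illustrative guard: refuse to fit a model whose estimated size exceeds a budget.
def fitWithinBudget[M <: Model[M]](
    estimator: Estimator[M],
    dataset: Dataset[_],
    maxBudgetBytes: Long): M = {
  val estimated = estimator.estimateModelSize(dataset) // upper-bound estimate in bytes
  require(estimated <= maxBudgetBytes,
    s"Estimated model size ($estimated bytes) exceeds budget ($maxBudgetBytes bytes)")
  estimator.fit(dataset)
}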
19 changes: 19 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/Model.scala
@@ -43,4 +43,23 @@ abstract class Model[M <: Model[M]] extends Transformer {
def hasParent: Boolean = parent != null

override def copy(extra: ParamMap): M

/**
* For ml connect only.
* Estimate the size of this model in bytes.
* This is an approximation; the real size might be different.
* 1, Only driver side memory usage is counted, distributed objects (like DataFrame,
* RDD, Graph, Summary) are ignored.
* 2, Lazy vals are not counted, e.g., an auxiliary object used in prediction.
* 3, Using SizeEstimator to estimate the driver memory usage of distributed objects
* is not accurate, because the size of SparkSession/SparkContext is also included, e.g.
* val df = spark.range(1)
* SizeEstimator.estimate(df) -> 3310984
* SizeEstimator.estimate(df.rdd) -> 3331352
* SizeEstimator.estimate(df.sparkSession) -> 3249464
* SizeEstimator.estimate(df.rdd.sparkContext) -> 3249744
* 4, For third-party extensions, if external languages are used, it is recommended to override
* this method and return a proper size.
*/
private[spark] def estimatedSize: Long = throw new NotImplementedError
}
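Read together with Estimator.estimateModelSize, the intent is: the estimator reports an upper bound before fitting, while the fitted model's estimatedSize reflects what was actually produced (for example compressed sparse coefficients under an L1 penalty). A rough illustration under those assumptions (Spark-internal access; the training DataFrame is assumed):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.DataFrame

def compareSizes(train: DataFrame): (Long, Long) = {
  val lr = new LogisticRegression().setRegParam(0.1).setElasticNetParam(1.0) // L1 penalty
  val preFit = lr.estimateModelSize(train) // upper bound, assumes dense coefficients
  val model = lr.fit(train)
  val postFit = model.estimatedSize        // actual size; smaller if coefficients compress to sparse
  (preFit, postFit)                        // postFit is typically <= preFit
}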
mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
@@ -237,6 +237,16 @@ class FMClassifier @Since("3.0.0") (

@Since("3.0.0")
override def copy(extra: ParamMap): FMClassifier = defaultCopy(extra)

override def estimateModelSize(dataset: Dataset[_]): Long = {
val numFeatures = DatasetUtils.getNumFeatures(dataset, $(featuresCol))

var size = this.estimateMatadataSize
size += java.lang.Double.BYTES // intercept
size += Vectors.getDenseSize(numFeatures) // linear
size += Matrices.getDenseSize(numFeatures, $(factorSize)) // factors
size
}
}

@Since("3.0.0")
@@ -312,6 +322,18 @@ class FMClassificationModel private[classification] (
copyValues(new FMClassificationModel(uid, intercept, linear, factors), extra)
}

override def estimatedSize: Long = {
var size = this.estimateMatadataSize
size += java.lang.Double.BYTES // intercept
if (this.linear != null) {
size += this.linear.getSizeInBytes
}
if (this.factors != null) {
size += this.factors.getSizeInBytes
}
size
}

@Since("3.0.0")
override def write: MLWriter =
new FMClassificationModel.FMClassificationModelWriter(this)
mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -168,6 +168,14 @@ class LinearSVC @Since("2.2.0") (
@Since("3.1.0")
def setMaxBlockSizeInMB(value: Double): this.type = set(maxBlockSizeInMB, value)

private[spark] override def estimateModelSize(dataset: Dataset[_]): Long = {
val numFeatures = DatasetUtils.getNumFeatures(dataset, $(featuresCol))
var size = this.estimateMatadataSize
size += Vectors.getDenseSize(numFeatures) // coefficients
size += java.lang.Double.BYTES // intercept
size
}

@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)

@@ -259,7 +267,7 @@ class LinearSVC @Since("2.2.0") (
if (featuresStd(i) != 0.0) rawCoefficients(i) / featuresStd(i) else 0.0
}
val intercept = if ($(fitIntercept)) rawCoefficients.last else 0.0
- createModel(dataset, Vectors.dense(coefficientArray), intercept, objectiveHistory)
+ createModel(dataset, Vectors.dense(coefficientArray).compressed, intercept, objectiveHistory)
}

private def createModel(
@@ -421,6 +429,15 @@ class LinearSVCModel private[classification] (
copyValues(new LinearSVCModel(uid, coefficients, intercept), extra).setParent(parent)
}

private[spark] override def estimatedSize: Long = {
var size = this.estimateMatadataSize
if (this.coefficients != null) {
size += this.coefficients.getSizeInBytes
}
size += java.lang.Double.BYTES // intercept
size
}

@Since("2.2.0")
override def write: MLWriter = new LinearSVCModel.LinearSVCWriter(this)

mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -45,7 +45,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.storage.StorageLevel
- import org.apache.spark.util.VersionUtils
+ import org.apache.spark.util._

/**
* Params for logistic regression.
@@ -1041,6 +1041,24 @@ class LogisticRegression @Since("1.2.0") (
(solution, arrayBuilder.result())
}

private[spark] override def estimateModelSize(dataset: Dataset[_]): Long = {
// TODO: get numClasses and numFeatures together from dataset
val numClasses = DatasetUtils.getNumClasses(dataset, $(labelCol))
val numFeatures = DatasetUtils.getNumFeatures(dataset, $(featuresCol))

var size = this.estimateMatadataSize
if (checkMultinomial(numClasses)) {
size += Matrices.getDenseSize(numFeatures, numClasses) // coefficientMatrix
size += Vectors.getDenseSize(numClasses) // interceptVector
} else {
size += Matrices.getDenseSize(numFeatures, 1) // coefficientMatrix
size += Vectors.getDenseSize(1) // interceptVector
}
size += java.lang.Integer.BYTES // numClasses
size += 1 // isMultinomial
size
}
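For scale (illustrative numbers, not in the diff): with numFeatures = 100, the multinomial branch budgets a dense 100 x 3 coefficient matrix plus a length-3 intercept vector for a 3-class problem, while the binomial branch budgets 100 x 1 plus a length-1 vector; both branches then add 4 bytes for numClasses and 1 byte for the isMultinomial flag. A sketch using the same helpers the method calls:

// hypothetical 3-class vs. binary accounting for numFeatures = 100
val multinomialBytes = Matrices.getDenseSize(100, 3) + Vectors.getDenseSize(3)
val binomialBytes = Matrices.getDenseSize(100, 1) + Vectors.getDenseSize(1)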

@Since("1.4.0")
override def copy(extra: ParamMap): LogisticRegression = defaultCopy(extra)
}
@@ -1248,6 +1266,19 @@ class LogisticRegressionModel private[spark] (
}
}

private[spark] override def estimatedSize: Long = {
var size = this.estimateMatadataSize
if (this.coefficientMatrix != null) {
size += this.coefficientMatrix.getSizeInBytes
}
if (this.interceptVector != null) {
size += this.interceptVector.getSizeInBytes
}
size += java.lang.Integer.BYTES // numClasses
size += 1 // isMultinomial
size
}

@Since("1.4.0")
override def copy(extra: ParamMap): LogisticRegressionModel = {
val newModel = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector,
mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -173,6 +173,15 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
@Since("1.5.0")
override def copy(extra: ParamMap): MultilayerPerceptronClassifier = defaultCopy(extra)

private[spark] override def estimateModelSize(dataset: Dataset[_]): Long = {
val topology = FeedForwardTopology.multiLayerPerceptron($(layers), softmaxOnTop = true)
val expectedWeightSize = topology.layers.map(_.weightSize).sum

var size = this.estimateMatadataSize
size += Vectors.getDenseSize(expectedWeightSize) // weights
size
}
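A worked example (not part of the diff, assuming the usual (in + 1) * out weight count per affine layer and zero-weight activation layers): for $(layers) = Array(4, 5, 3) the topology holds (4 + 1) * 5 + (5 + 1) * 3 = 43 weights, so the estimate is the metadata size plus Vectors.getDenseSize(43).

// illustrative recomputation of expectedWeightSize for layers = Array(4, 5, 3)
val layers = Array(4, 5, 3)
// each affine layer stores in * out weights plus out biases; activation layers store none
val expectedWeightSize = layers.sliding(2).map { case Array(in, out) => (in + 1) * out }.sum
// expectedWeightSize == 43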

/**
* Train a model using the given dataset and parameters.
* Developers can implement this instead of `fit()` to avoid dealing with schema validation
@@ -328,6 +337,14 @@ class MultilayerPerceptronClassificationModel private[ml] (
copyValues(copied, extra)
}

private[spark] override def estimatedSize: Long = {
var size = this.estimateMatadataSize
if (this.weights != null) {
size += this.weights.getSizeInBytes
}
size
}

@Since("2.0.0")
override def write: MLWriter =
new MultilayerPerceptronClassificationModel.MultilayerPerceptronClassificationModelWriter(this)
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -344,6 +344,22 @@ class NaiveBayes @Since("1.5.0") (
new NaiveBayesModel(uid, pi.compressed, theta.compressed, sigma.compressed)
}

private[spark] override def estimateModelSize(dataset: Dataset[_]): Long = {
val numClasses = DatasetUtils.getNumClasses(dataset, $(labelCol))
val numFeatures = DatasetUtils.getNumFeatures(dataset, $(featuresCol))

var size = this.estimateMatadataSize
size += Vectors.getDenseSize(numClasses) // pi
size += Matrices.getDenseSize(numClasses, numFeatures) // theta
$(modelType) match {
case Multinomial | Bernoulli | Complement =>
size += Matrices.getDenseSize(0, 0) // sigma
case _ =>
size += Matrices.getDenseSize(numClasses, numFeatures) // sigma
}
size
}
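The modelType match above sizes sigma as an empty matrix for the discrete variants; the fall-through case corresponds to gaussian naive Bayes, which keeps one variance per (class, feature) pair. A minimal sketch of the distinction (illustrative; estimateModelSize is private[spark], so this assumes Spark-internal code and an assumed training DataFrame df):

// discrete variants: sigma budgeted as an empty matrix
new NaiveBayes().setModelType("multinomial").estimateModelSize(df)
// gaussian variant: sigma budgeted as a numClasses x numFeatures dense matrix
new NaiveBayes().setModelType("gaussian").estimateModelSize(df)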

@Since("1.5.0")
override def copy(extra: ParamMap): NaiveBayes = defaultCopy(extra)
}
@@ -551,6 +567,20 @@ class NaiveBayesModel private[ml] (
}
}

private[spark] override def estimatedSize: Long = {
var size = this.estimateMatadataSize
if (this.pi != null) {
size += this.pi.getSizeInBytes
}
if (this.theta != null) {
size += this.theta.getSizeInBytes
}
if (this.sigma != null) {
size += this.sigma.getSizeInBytes
}
size
}

@Since("1.5.0")
override def copy(extra: ParamMap): NaiveBayesModel = {
copyValues(new NaiveBayesModel(uid, pi, theta, sigma).setParent(this.parent), extra)
5 changes: 5 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -33,6 +33,7 @@ import org.apache.spark.annotation.Since
import org.apache.spark.ml.linalg.{JsonMatrixConverter, JsonVectorConverter, Matrix, Vector}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.SizeEstimator

/**
* A param with self-contained documentation and optionally default value. Primitive-typed param
@@ -647,6 +648,10 @@ case class ParamPair[T] @Since("1.2.0") (
*/
trait Params extends Identifiable with Serializable {

private[ml] def estimateMatadataSize: Long = {
SizeEstimator.estimate((this.paramMap, this.defaultParamMap, this.uid))
}
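The metadata estimate covers only what every Params instance carries on the driver: the explicitly set paramMap, the defaultParamMap, and the uid string, measured with SizeEstimator. A small illustration (values are JVM-dependent; estimateMatadataSize is private[ml], so this assumes ml-internal code):

import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression().setMaxIter(5).setRegParam(0.1)
val metadataBytes = lr.estimateMatadataSize // SizeEstimator.estimate((paramMap, defaultParamMap, uid))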

/**
* Returns all params sorted by their names. The default implementation uses Java reflection to
* list all public methods that have no arguments and return [[Param]].
mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -350,6 +350,15 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S

@Since("1.6.0")
override def copy(extra: ParamMap): AFTSurvivalRegression = defaultCopy(extra)

private[spark] override def estimateModelSize(dataset: Dataset[_]): Long = {
val numFeatures = DatasetUtils.getNumFeatures(dataset, $(featuresCol))

var size = this.estimateMatadataSize
size += Vectors.getDenseSize(numFeatures) // coefficients
size += java.lang.Double.BYTES * 2 // intercept, scale
size
}
}

@Since("1.6.0")
@@ -469,6 +478,15 @@ class AFTSurvivalRegressionModel private[ml] (
.setParent(parent)
}

private[spark] override def estimatedSize: Long = {
var size = this.estimateMatadataSize
if (this.coefficients != null) {
size += this.coefficients.getSizeInBytes
}
size += java.lang.Double.BYTES * 2 // intercept, scale
size
}

@Since("1.6.0")
override def write: MLWriter =
new AFTSurvivalRegressionModel.AFTSurvivalRegressionModelWriter(this)