isarn / isarn-sketches-spark

License: Apache-2.0
Routines and data structures for using isarn-sketches idiomatically in Apache Spark

Programming Languages

scala, python

Projects that are alternatives to or similar to isarn-sketches-spark

Spark With Python
Fundamentals of Spark with Python (using PySpark), code examples
Stars: ✭ 150 (+435.71%)
Mutual labels:  apache-spark, pyspark, dataframe
aut
The Archives Unleashed Toolkit is an open-source toolkit for analyzing web archives.
Stars: ✭ 111 (+296.43%)
Mutual labels:  apache-spark, pyspark, dataframe
Mmlspark
Simple and Distributed Machine Learning
Stars: ✭ 2,899 (+10253.57%)
Mutual labels:  apache-spark, pyspark
Sparkflow
Easy-to-use library to bring TensorFlow to Apache Spark
Stars: ✭ 282 (+907.14%)
Mutual labels:  apache-spark, dataframe
Mobius
C# and F# language binding and extensions to Apache Spark
Stars: ✭ 929 (+3217.86%)
Mutual labels:  apache-spark, dataframe
pyspark-cheatsheet
PySpark Cheat Sheet - example code to help you learn PySpark and develop apps faster
Stars: ✭ 115 (+310.71%)
Mutual labels:  apache-spark, pyspark
pyspark-asyncactions
Asynchronous actions for PySpark
Stars: ✭ 30 (+7.14%)
Mutual labels:  apache-spark, pyspark
Pyspark Boilerplate
A boilerplate for writing PySpark Jobs
Stars: ✭ 318 (+1035.71%)
Mutual labels:  apache-spark, pyspark
jupyterlab-sparkmonitor
JupyterLab extension that enables monitoring launched Apache Spark jobs from within a notebook
Stars: ✭ 78 (+178.57%)
Mutual labels:  apache-spark, pyspark
Awesome Spark
A curated list of awesome Apache Spark packages and resources.
Stars: ✭ 1,061 (+3689.29%)
Mutual labels:  apache-spark, pyspark
Pyspark Stubs
Apache (Py)Spark type annotations (stub files).
Stars: ✭ 98 (+250%)
Mutual labels:  apache-spark, pyspark
Spark-for-data-engineers
Apache Spark for data engineers
Stars: ✭ 22 (-21.43%)
Mutual labels:  apache-spark, pyspark
Quinn
pyspark methods to enhance developer productivity 📣 👯 🎉
Stars: ✭ 217 (+675%)
Mutual labels:  apache-spark, pyspark
mmtf-workshop-2018
Structural Bioinformatics Training Workshop & Hackathon 2018
Stars: ✭ 50 (+78.57%)
Mutual labels:  apache-spark, pyspark
SynapseML
Simple and Distributed Machine Learning
Stars: ✭ 3,355 (+11882.14%)
Mutual labels:  apache-spark, pyspark
Spark Gotchas
Spark Gotchas. A subjective compilation of Apache Spark tips and tricks
Stars: ✭ 308 (+1000%)
Mutual labels:  apache-spark, pyspark
datalake-etl-pipeline
Simplified ETL process in Hadoop using Apache Spark. Includes a complete ETL pipeline for a data lake: SparkSession extensions, DataFrame validation, Column extensions, SQL functions, and DataFrame transformations
Stars: ✭ 39 (+39.29%)
Mutual labels:  apache-spark, pyspark
Sparkora
Powerful rapid automatic EDA and feature engineering library with a very easy to use API 🌟
Stars: ✭ 51 (+82.14%)
Mutual labels:  apache-spark, pyspark
Live log analyzer spark
Spark application for analyzing Apache access logs and detecting anomalies, along with a Medium article.
Stars: ✭ 14 (-50%)
Mutual labels:  apache-spark, pyspark
Azure Cosmosdb Spark
Apache Spark Connector for Azure Cosmos DB
Stars: ✭ 165 (+489.29%)
Mutual labels:  apache-spark, pyspark

isarn-sketches-spark

Routines and data structures for using isarn-sketches idiomatically in Apache Spark

API documentation

https://isarn.github.io/isarn-sketches-spark/latest/api/#org.isarnproject.sketches.spark

How to use in your project

// Note that the version of spark is part of the release name.
// This example is for spark 3.2:
libraryDependencies += "org.isarnproject" %% "isarn-sketches-spark" % "0.6.0-sp3.2"

// This package builds against some `% Provided` Apache Spark dependencies:
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion

Python code supporting pyspark is also packaged with all of the artifacts above. Spark will automatically extract and compile the Python components for use with PySpark, as illustrated in the pyspark examples below. Python 2 is no longer supported, although it may still work.

Beginning with isarn-sketches-spark 0.5.0, only spark >= 3.0 is supported, due to substantial changes to Spark's API for user-defined aggregation. Some context on these changes is available in this Spark Summit talk.

Versions of Spark and Scala that are currently supported:

  • v0.6.0: spark 3.2, scala 2.12
  • v0.5.2: spark 3.0, scala 2.12

If you are interested in a spark/scala build that is not listed above, please contact me and/or file an issue!

How to use from the Spark CLI

Several Spark CLI tools accept the --packages argument, including spark-shell and pyspark. Following is an example using spark-shell:

$ spark-shell --packages "org.isarnproject:isarn-sketches-spark_2.12:0.6.0-sp3.2"

Note that you need to explicitly include the scala version as part of the package name.
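
The pyspark CLI accepts the same package coordinates; for example:

$ pyspark --packages "org.isarnproject:isarn-sketches-spark_2.12:0.6.0-sp3.2"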

Examples

Sketch a numeric column (scala)

scala> import org.isarnproject.sketches.java.TDigest, org.isarnproject.sketches.spark.tdigest._, scala.util.Random._

scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), nextGaussian)})
data: org.apache.spark.sql.DataFrame = [_1: int, _2: double]

scala> val udf = TDigestAggregator.udf[Double](compression = 0.2, maxDiscrete = 25)
udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedAggregator( ...

scala> val agg = data.agg(udf($"_1"), udf($"_2")).first
agg: org.apache.spark.sql.Row = [TDigest(0.0 -> (105.0, 105.0), ...

scala> val (td1, td2) = (agg.getAs[TDigest](0), agg.getAs[TDigest](1))
td1: org.isarnproject.sketches.java.TDigest = TDigest(0.0 -> (105.0, 105.0), ...

scala> (td1.cdf(2), td2.cdf(2))
res0: (Double, Double) = (0.2365,0.9682691795524728)

scala> (td1.samplePMF, td2.samplePDF)
res1: (Double, Double) = (8.0,-0.6712314520185372)
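
The aggregated sketches also support quantile queries via cdfInverse, which later examples use to compute medians. A minimal follow-on sketch (REPL output omitted):

scala> // median and 90th percentile of the sketched gaussian column _2
scala> val median = td2.cdfInverse(0.5)
scala> val p90 = td2.cdfInverse(0.9)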

Sketch a numeric array column (scala)

scala> import org.isarnproject.sketches.java.TDigest, org.isarnproject.sketches.spark.tdigest._, scala.util.Random._

scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vector.fill(5){nextGaussian})})
data: org.apache.spark.sql.DataFrame = [_1: int, _2: array<double>]

scala> val udf1 = TDigestAggregator.udf[Int](maxDiscrete = 25)
udf1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedAggregator( ...

scala> val udf2 = TDigestArrayAggregator.udf[Double](compression = 0.5)
udf2: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedAggregator( ...

scala> val agg = data.agg(udf1($"_1"), udf2($"_2")).first
agg: org.apache.spark.sql.Row = [TDigest(0.0 -> (104.0, 104.0), ...

scala> agg.getAs[TDigest](0).samplePMF
res0: Double = 9.0

scala> agg.getAs[Seq[TDigest]](1).map(_.samplePDF)
res1: Seq[Double] = ArrayBuffer(-0.40804832001013713, -0.5952280168647848, -0.4973297020191356, -0.9404857531406351, 2.347194542873795)

Sketch a column of ML Vector (scala)

scala> import org.isarnproject.sketches.java.TDigest, org.isarnproject.sketches.spark.tdigest._, scala.util.Random._, org.apache.spark.ml.linalg.Vectors

scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vectors.dense(nextGaussian,nextGaussian,nextGaussian))})
data: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]

scala> val udf = TDigestMLVecAggregator.udf(compression = 0.5, maxDiscrete = 0)
udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedAggregator( ...

scala> val agg = data.agg(udf($"_2")).first
agg: org.apache.spark.sql.Row = [WrappedArray(TDigest(-3.881918499979969 -> (1.0, 1.0), ...

scala> val samples = agg.getAs[Seq[TDigest]](0).map(_.samplePDF)
samples: Seq[Double] = ArrayBuffer(0.28973374164214244, 0.4981749043377094, -0.8945453848202787)

scala> val medians = agg.getAs[Seq[TDigest]](0).map(_.cdfInverse(0.5))
medians: Seq[Double] = ArrayBuffer(0.03123637037282659, -0.07172360154570709, -0.04260955558310061)

Sketch a column of MLLib Vector (scala)

scala> import org.isarnproject.sketches.java.TDigest, org.isarnproject.sketches.spark.tdigest._, scala.util.Random._, org.apache.spark.mllib.linalg.Vectors

scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vectors.dense(nextGaussian,nextGaussian,nextGaussian))})
data: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]

scala> val udf = TDigestMLLibVecAggregator.udf(compression = 0.5, maxDiscrete = 0)
udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedAggregator( ...

scala> val agg = data.agg(udf($"_2")).first
agg: org.apache.spark.sql.Row = [WrappedArray(TDigest(-3.7272857909819344 -> (1.0, 1.0), ...

scala> val samples = agg.getAs[Seq[TDigest]](0).map(_.samplePDF)
samples: Seq[Double] = ArrayBuffer(0.8780228679691738, -0.7636457587390327, 2.213341479782084)

scala> val medians = agg.getAs[Seq[TDigest]](0).map(_.cdfInverse(0.5))
medians: Seq[Double] = ArrayBuffer(-0.01676307618586101, 0.03846529110807051, -0.029124197911563777)

Reduce a column (or grouping) of T-Digests (scala)

scala> import org.isarnproject.sketches.java.TDigest, org.isarnproject.sketches.spark.tdigest._, scala.util.Random._

scala> val data = spark.createDataFrame(Vector.fill(5000){(nextInt(5), nextGaussian)}).toDF("g", "x")
data: org.apache.spark.sql.DataFrame = [g: int, x: double]

scala> val udf = TDigestAggregator.udf[Double]()
udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedAggregator( ...

scala> val grp = data.groupBy("g").agg(udf($"x").alias("tdigests"))
grp: org.apache.spark.sql.DataFrame = [g: int, tdigests: tdigest]

scala> grp.show()
+---+--------------------+                                                      
|  g|            tdigests|
+---+--------------------+
|  1|TDigest(-3.054140...|
|  3|TDigest(-3.368392...|
|  4|TDigest(-3.439268...|
|  2|TDigest(-3.927057...|
|  0|TDigest(-3.169235...|
+---+--------------------+

scala> val udfred = TDigestReduceAggregator.udf(compression = 0.7)
udfred: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedAggregator( ...

scala> val agg = grp.agg(udfred($"tdigests")).first
agg: org.apache.spark.sql.Row = [TDigest(-3.9270575018252663 -> (1.0, 1.0), ...

scala> val sample = agg.getAs[TDigest](0).sample
sample: Double = 0.6633542575218054
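
The reduced digest answers the same queries as any per-group sketch; for example, a minimal follow-on (output omitted) that estimates the median of x across all groups:

scala> // approximate median of x over the entire DataFrame, via the merged sketch
scala> val globalMedian = agg.getAs[TDigest](0).cdfInverse(0.5)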

Reduce a column (or grouping) of T-Digest Arrays (scala)

scala> import org.isarnproject.sketches.java.TDigest, org.isarnproject.sketches.spark.tdigest._, scala.util.Random._

scala> val data = spark.createDataFrame(Vector.fill(5000){(nextInt(5), Vector.fill(3) { nextGaussian })}).toDF("g", "x")
data: org.apache.spark.sql.DataFrame = [g: int, x: array<double>]

scala> val udf = TDigestArrayAggregator.udf[Double]()
udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedAggregator( ...

scala> val grp = data.groupBy("g").agg(udf($"x").alias("tdigests"))
grp: org.apache.spark.sql.DataFrame = [g: int, tdigests: array<tdigest>]

scala> grp.show()
+---+--------------------+                                                      
|  g|            tdigests|
+---+--------------------+
|  1|[TDigest(-3.25266...|
|  3|[TDigest(-3.05690...|
|  4|[TDigest(-3.66651...|
|  2|[TDigest(-3.46231...|
|  0|[TDigest(-2.94039...|
+---+--------------------+

scala> val udfred = TDigestArrayReduceAggregator.udf(compression = 0.7)
udfred: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedAggregator( ...

scala> val agg = grp.agg(udfred($"tdigests")).first
agg: org.apache.spark.sql.Row = [WrappedArray(TDigest(-3.6665115390677423 -> (1.0, 1.0), ...

scala> val samples = agg.getAs[Seq[TDigest]](0).map(_.sample)
samples: Seq[Double] = ArrayBuffer(-0.741335878221013, 0.981730493526761, -0.6359834079354106)

Sketch a numeric column (python)

>>> from random import gauss, randint
>>> from isarnproject.sketches.spark.tdigest import *
>>> data = spark.createDataFrame([[randint(1,10),gauss(0,1)] for x in range(1000)])
>>> udf1 = tdigestIntUDF("_1", maxDiscrete = 25)
>>> udf2 = tdigestDoubleUDF("_2", compression = 0.5)
>>> agg = data.agg(udf1, udf2).first()
>>> agg[0].samplePMF()
2.0
>>> agg[1].samplePDF()
-0.8707704090068431

Sketch a numeric array column (python)

>>> from random import gauss, randint
>>> from isarnproject.sketches.spark.tdigest import *
>>> data = spark.createDataFrame([[[gauss(0,1),gauss(0,1),gauss(0,1)]] for x in range(1000)])
>>> udf = tdigestDoubleArrayUDF("_1", compression = 0.7)
>>> agg = data.agg(udf).first()
>>> [td.samplePDF() for td in agg[0]]
[0.6802628605487977, -0.649936837383734, -0.84228662547744]

Sketch a column of ML Vectors (python)

>>> from random import gauss, randint
>>> from isarnproject.sketches.spark.tdigest import *
>>> from pyspark.ml.linalg import Vectors
>>> data = spark.createDataFrame([[Vectors.dense([gauss(0,1),gauss(0,1),gauss(0,1)])] for x in range(1000)])
>>> udf = tdigestMLVecUDF("_1", compression = 0.7)
>>> agg = data.agg(udf).first()
>>> [td.cdfInverse(0.5) for td in agg[0]]
[-0.03088430803668949, -0.002903353148573491, 0.01640559766046329]

Sketch a column of MLLib Vectors (python)

>>> from random import gauss, randint
>>> from isarnproject.sketches.spark.tdigest import *
>>> from pyspark.mllib.linalg import Vectors
>>> data = spark.createDataFrame([[Vectors.dense([gauss(0,1),gauss(0,1),gauss(0,1)])] for x in range(1000)])
>>> udf = tdigestMLLibVecUDF("_1", compression = 0.7)
>>> agg = data.agg(udf).first()
>>> [td.cdfInverse(0.5) for td in agg[0]]                                       
[-0.03390700592837903, -0.0479047778031452, -0.02260427238692181]

Reduce a column (or grouping) of T-Digests (python)

>>> from random import gauss, randint
>>> from isarnproject.sketches.spark.tdigest import *
>>> data = spark.createDataFrame([[randint(1,5), gauss(0,1)] for x in range(5000)]).toDF("g","x")
>>> grp = data.groupBy("g").agg(tdigestDoubleUDF("x").alias("tdigests"))
>>> grp.show()
+---+--------------------+
|  g|            tdigests|
+---+--------------------+
|  5|TDigest(-2.907724...|
|  1|TDigest(-2.914628...|
|  3|TDigest(-3.288239...|
|  2|TDigest(-3.389084...|
|  4|TDigest(-3.507597...|
+---+--------------------+

>>> udf = tdigestReduceUDF("tdigests")
>>> agg = grp.agg(udf).first()
>>> agg[0].sample()                                                             
-0.14793866496592997

Reduce a column (or grouping) of T-Digest Arrays (python)

>>> from random import gauss, randint
>>> from isarnproject.sketches.spark.tdigest import *
>>> data = spark.createDataFrame([[randint(1,5), [gauss(0,1),gauss(0,1),gauss(0,1)]] for x in range(5000)]).toDF("g","x")
>>> grp = data.groupBy("g").agg(tdigestDoubleArrayUDF("x").alias("tdigests"))
>>> grp.show()
+---+--------------------+                                                      
|  g|            tdigests|
+---+--------------------+
|  5|[TDigest(-3.38098...|
|  1|[TDigest(-2.88380...|
|  3|[TDigest(-3.40987...|
|  2|[TDigest(-3.75224...|
|  4|[TDigest(-2.66571...|
+---+--------------------+

>>> udf = tdigestArrayReduceUDF("tdigests")
>>> agg = grp.agg(udf).first()
>>> [td.cdfInverse(0.5) for td in agg[0]]                                       
[-0.04635615835441749, -0.025723034166600753, -0.025168480174964893]

Compute feature importance with respect to a predictive model

scala> import org.isarnproject.pipelines.spark.fi.{TDigestFI,TDigestFIModel}, org.apache.spark.ml.regression.LinearRegression

scala> val training = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
training: org.apache.spark.sql.DataFrame = [label: double, features: vector]    

scala> val lr = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
lr: org.apache.spark.ml.regression.LinearRegression = linReg_5d7a1cf3dafa

scala> val lrModel = lr.fit(training)
lrModel: org.apache.spark.ml.regression.LinearRegressionModel = LinearRegressionModel: uid=linReg_5d7a1cf3dafa, numFeatures=10

scala> val fi = new TDigestFI().setCompression(0.3).setMaxDiscrete(10)
fi: org.isarnproject.pipelines.spark.fi.TDigestFI = TDigestFI_6837561844f2

scala> val fiMod = fi.fit(training).setTargetModel(lrModel).setDeviationMeasure("rms-dev").setFeatureNames(Array.tabulate(10){j=>s"x$j"})
fiMod: org.isarnproject.pipelines.spark.fi.TDigestFIModel = TDigestFI_6837561844f2

scala> val imp = fiMod.transform(training)
imp: org.apache.spark.sql.DataFrame = [name: string, importance: double]

scala> imp.show
+----+-------------------+
|name|         importance|
+----+-------------------+
|  x0|                0.0|
|  x1| 0.2642731504552658|
|  x2| 0.2775267570310568|
|  x3|   1.48027354456237|
|  x4| 0.0442095774509019|
|  x5|  0.620636336433091|
|  x6|                0.0|
|  x7|0.12650113005096197|
|  x8| 0.1644528333598182|
|  x9| 0.1883875750326046|
+----+-------------------+
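
Since the importances are returned as an ordinary DataFrame, they can be ranked with standard Spark operations; for example:

scala> imp.orderBy($"importance".desc).show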

Compute feature importance with respect to a predictive model (python)

>>> from isarnproject.pipelines.spark.fi import *
>>> from pyspark.ml.regression import LinearRegression
>>> training = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
>>> lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)        
>>> lrModel = lr.fit(training)
>>> fi = TDigestFI(compression = 0.3, maxDiscrete = 10)
>>> fiMod = fi.fit(training).setTargetModel(lrModel).setDeviationMeasure("rms-dev").setFeatureNames(["x%d" % (j) for j in range(10)])
>>> imp = fiMod.transform(training)
>>> imp.show()
+----+--------------------+
|name|          importance|
+----+--------------------+
|  x0|                 0.0|
|  x1|  0.2617304778862077|
|  x2| 0.26451433792352613|
|  x3|  1.5244246022297059|
|  x4|0.043227915487816015|
|  x5|  0.6195605571925815|
|  x6|                 0.0|
|  x7| 0.11735009989902982|
|  x8| 0.17250227692634765|
|  x9| 0.18251143533748138|
+----+--------------------+