
HCADatalab / Powderkeg

Licence: epl-1.0
Live-coding the cluster!

Programming Languages

clojure

Projects that are alternatives to or similar to Powderkeg

Quicksql
A Flexible, Fast, Federated(3F) SQL Analysis Middleware for Multiple Data Sources
Stars: ✭ 1,821 (+1098.03%)
Mutual labels:  spark
Nd4j
Fast, Scientific and Numerical Computing for the JVM (NDArrays)
Stars: ✭ 1,742 (+1046.05%)
Mutual labels:  spark
Spark With Python
Fundamentals of Spark with Python (using PySpark), code examples
Stars: ✭ 150 (-1.32%)
Mutual labels:  spark
Sparkling Graph
SparklingGraph provides an easy-to-use set of features that give you the ability to process large-scale graphs using Spark and GraphX.
Stars: ✭ 139 (-8.55%)
Mutual labels:  spark
Rasterframes
Geospatial Raster support for Spark DataFrames
Stars: ✭ 142 (-6.58%)
Mutual labels:  spark
Spark Cassandra Connector
DataStax Spark Cassandra Connector
Stars: ✭ 1,816 (+1094.74%)
Mutual labels:  spark
Apache Spark Node
Node.js bindings for Apache Spark DataFrame APIs
Stars: ✭ 136 (-10.53%)
Mutual labels:  spark
Spark Ml Source Analysis
An analysis of Spark ML algorithm principles and their concrete source-code implementations
Stars: ✭ 1,873 (+1132.24%)
Mutual labels:  spark
Spark Authorizer
A Spark SQL extension which provides SQL Standard Authorization for Apache Spark
Stars: ✭ 141 (-7.24%)
Mutual labels:  spark
Cc Pyspark
Process Common Crawl data with Python and Spark
Stars: ✭ 147 (-3.29%)
Mutual labels:  spark
Ecommercerecommendsystem
Real-time big-data product recommendation system. Frontend: Vue + TypeScript + ElementUI; backend: Spring + Spark
Stars: ✭ 139 (-8.55%)
Mutual labels:  spark
Data science blogs
A repository to keep track of all the code that I end up writing for my blog posts.
Stars: ✭ 139 (-8.55%)
Mutual labels:  spark
Datacompy
Pandas and Spark DataFrame comparison for humans
Stars: ✭ 147 (-3.29%)
Mutual labels:  spark
Isolation Forest
A Spark/Scala implementation of the isolation forest unsupervised outlier detection algorithm.
Stars: ✭ 139 (-8.55%)
Mutual labels:  spark
Aztk
AZTK powered by Azure Batch: On-demand, Dockerized, Spark Jobs on Azure
Stars: ✭ 152 (+0%)
Mutual labels:  spark
Spark On Lambda
Apache Spark on AWS Lambda
Stars: ✭ 137 (-9.87%)
Mutual labels:  spark
Technology Talk
A roundup of the Java ecosystem: common frameworks, open-source middleware, system architecture, databases, architecture case studies from large companies, common third-party libraries, project management, production troubleshooting, personal growth, reflections, and more
Stars: ✭ 12,136 (+7884.21%)
Mutual labels:  spark
Spark Tsne
Distributed t-SNE via Apache Spark
Stars: ✭ 151 (-0.66%)
Mutual labels:  spark
Benchm Ml
A minimal benchmark for scalability, speed and accuracy of commonly used open source implementations (R packages, Python scikit-learn, H2O, xgboost, Spark MLlib etc.) of the top machine learning algorithms for binary classification (random forests, gradient boosted trees, deep neural networks etc.).
Stars: ✭ 1,835 (+1107.24%)
Mutual labels:  spark
Pyspark Learning
Updated repository
Stars: ✭ 147 (-3.29%)
Mutual labels:  spark

powderkeg

Live-coding the cluster!


Usage

First, add Powderkeg and Spark (1.5 to 2.1 are supported, any Scala version) to your dependencies:

E.g.

:dependencies [[hcadatalab/powderkeg "0.5.1"]
               [com.esotericsoftware/kryo-shaded "4.0.0"]  ;; For Spark 2.x support
               [org.apache.spark/spark-core_2.11 "2.1.0"]
               [org.apache.spark/spark-streaming_2.11 "2.1.0"]]

or

:dependencies [[hcadatalab/powderkeg "0.5.1"]
               [org.apache.spark/spark-core_2.10 "1.5.2"]
               [org.apache.spark/spark-streaming_2.10 "1.5.2"]]

Then launch a repl (e.g. lein repl):

(require '[powderkeg.core :as keg])
(require '[net.cgrand.xforms :as x])

(keg/connect! "spark://macbook-pro.home:7077") ; change the uri; "local[2]" works but that's no fun...

; sample lifted from sparkling
(into [] ; no collect, plain Clojure
  (keg/rdd ["This is a firest line"  ; here we provide data from a clojure collection.
            "Testing spark"
            "and powderkeg"
            "Happy hacking!"]
   (filter #(.contains % "spark")))) ; plain standard transducer, no new API
;=> ["Testing spark"]

Goals

  • Good REPL even (and especially) against a real Spark cluster (as opposed to a local same-JVM node).
  • No AOT.
  • Leveraging Clojure.
  • Not reimplementing every Spark method.

Documentation

Joining a cluster

Powderkeg has been tested against standalone and YARN clusters.

(keg/connect! url) and (keg/disconnect!) are meant only to be used at the repl.

You can also use ./spark-submit --master spark:... --class powderkeg.repl your.jar.

For a socket repl, use: ./spark-submit --conf 'spark.driver.extraJavaOptions=-Dclojure.server.repl="{:port 5555 :accept clojure.core.server/repl}"' --master spark:... --class powderkeg.repl your.jar

Applications should use: ./spark-submit --master spark:... --class powderkeg.repl your.jar your.ns/main-fn app-arg0 ....

Where your.ns/main-fn is a vararg function (receiving its arguments as strings) that serves as the entry point of the application; *sc* will be bound.
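
For instance, here is a minimal sketch of such an entry point (the my.app namespace and its argument-uppercasing behaviour are made up for illustration; only *sc*, keg/rdd and the string-arguments contract come from the above):

(ns my.app
  (:require [clojure.string :as str]
            [powderkeg.core :as keg]))

(defn main-fn [& args]  ; args arrive as strings
  ;; *sc* is already bound by powderkeg.repl when main-fn is called
  (println (into [] (keg/rdd args (map str/upper-case)))))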

Creating an RDD

Currently one can only create an RDD from a collection (including nil) or from another RDD (either returned by a keg function or obtained through good old interop).

To add a new kind of source for RDDs, one has to extend the RDDable protocol.

The main function of powderkeg is rdd, which takes a source as its first argument, followed by zero or more transducers, and then options.

(keg/rdd (range 100)     ; source
  (filter odd?)          ; 1st transducer to apply
  (map inc)              ; 2nd transducer
  :partitions 2)         ; and options

This returns a plain org.apache.spark.api.java.JavaRDD instance. No wrapper. At some point it may return a Scala RDD. Bypassing the Java API layer is under consideration, but it wouldn't change the overall design.
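
Since it's an ordinary JavaRDD, plain Java interop works as expected, for example:

(def odds (keg/rdd (range 100) (filter odd?) :partitions 2))
(.count odds) ;=> 50
(.first odds) ;=> 1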

Getting results

RDDs can be reduced, so all you have to do is call into on them.

(into [] (keg/rdd (range 10)))
;=> [0 1 2 3 4 5 6 7 8 9]

You may encounter erroneous results with some stateful transducers. However, the issue is limited to a very narrow class: stateful transducers that call the underlying reducing function on completion (e.g. partition-by). Furthermore, you usually don't want to perform expensive computation (and data transfer) on the driver, so most of the time the only transducer you want to use with into is take; the others tend to be best pushed into the RDD.
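
For example, keep the filtering in the RDD (it runs on the workers) and use only a cheap take driver-side; the output shown assumes a parallelized range, whose element order is preserved:

(into [] (take 5) (keg/rdd (range 1000) (filter odd?)))
;=> [1 3 5 7 9]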

This being said, if you really have to use a "flushing" transducer, you can work around the issue either by using scomp instead of comp, which creates a transducer whose reducing-function state is correctly passed around cluster nodes, or by using keg/into, which is really just into with an implicit scomp.

;; Both are safe
(into [] (keg/scomp (take 5)) (range 10))
(keg/into [] (take 5) (range 10))
;; in truth, take is not a problematic transducer and doesn't need to be protected.

;; partition-by is problematic.
(into [] (partition-by #(quot % 6)) (keg/rdd (range 20)))
=> [[0 1 2 3 4 5] [6 7 8 9 10 11] [12 13 14 15 16 17]]
(keg/into [] (partition-by #(quot % 6)) (keg/rdd (range 20)))
=> [[0 1 2 3 4 5] [6 7 8 9 10 11] [12 13 14 15 16 17] [18 19]]
(into [] (keg/scomp (partition-by #(quot % 6))) (keg/rdd (range 20)))
=> [[0 1 2 3 4 5] [6 7 8 9 10 11] [12 13 14 15 16 17] [18 19]]

scomp is still useful if you use reduce, eduction or transduce instead of keg/into but, again, think twice before doing that: can't you push the computation into the RDD?
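
For instance, the same protection carries over to transduce (a sketch, relying on RDDs being reducible exactly as into does above):

(transduce (keg/scomp (partition-by #(quot % 6))) conj [] (keg/rdd (range 20)))
;=> [[0 1 2 3 4 5] [6 7 8 9 10 11] [12 13 14 15 16 17] [18 19]]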

Pair RDDs

Pair RDDs are just RDDs that happen to contain pairs (scala.Tuple2 to be exact).

Powderkeg automatically converts map entries and 2-item vectors to and from Tuple2. Furthermore, this conversion may be optimized away when using xforms transducers that support (deconstructed) pairs.

So (keg/rdd {:a 1 :b 2}) is a pair RDD.

(keg/rdd (range 100) (map (fn [n] [(mod n 7) n]))) and (keg/rdd (range 100) (x/for [n %] [(mod n 7) n])) define the same pair RDD, but the second one doesn't allocate intermediate vectors.
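
Since the conversion goes both ways, a pair RDD pours straight back into a map:

(into {} (keg/rdd {:a 1 :b 2}))
;=> {:a 1, :b 2}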

However, RDDs created with rdd are not partitioned. To partition an RDD you have to use by-key instead.

=> (keg/rdd (range 100) (x/for [n %] [(mod n 7) n]))
#object[org.apache.spark.api.java.JavaRDD 0x16a8a7f3 "MapPartitionsRDD[164] at mapPartitions at form-init6760256745287702501.clj:7"]
=> (-> *1 .partitioner .orNull)
nil
=> (keg/by-key *1)
#object[org.apache.spark.api.java.JavaRDD 0x29925883 "ShuffledRDD[167] at partitionBy at core.clj:232"]
=> (-> *1 .partitioner .orNull)
#object[org.apache.spark.HashPartitioner 0x637140f3 "org.apache.spark.HashPartitioner@637140f3"]

When the input of by-key is not partitioned yet, it is automatically partitioned on the first item of each pair. Here, partitioning means that data is shuffled across the workers so that, for any given key, all its pairs end up in the same partition (and thus on the same worker).

When the input is not already keyed, or the existing key is not the right one, the :key option lets you specify a keying function:

=> (keg/by-key (range 100) :key #(mod % 7))
#object[org.apache.spark.api.java.JavaRDD 0x145f61ff "ShuffledRDD[174] at partitionBy at core.clj:232"]
=> (-> *1 .partitioner .orNull)
#object[org.apache.spark.HashPartitioner 0x6dc001ee "org.apache.spark.HashPartitioner@6dc001ee"]

by-key can also be used to transform values (the pairs' second items):

=> (into {}
     (keg/by-key (range 100) :key odd?
       (x/reduce +)))
{false 2450, true 2500}

However, when by-key shuffles data around, transformations are applied twice. In the above example that's the case and it's a good thing: values are locally summed before being shuffled, then summed again. This works nicely because summing is associative. But more often than not the computation to perform is not associative, and having it run twice will lead either to failure or, worse, to erroneous results.

=> (into {}
     (keg/by-key {:a 1 :b 2}
       (map inc)))
{:b 4, :a 3} ; oops
;; this doesn't happen when the input is already partitioned:
=> (into {}
     (keg/by-key (keg/by-key {:a 1 :b 2})
       (map inc)
       :shuffle nil))
{:b 3, :a 2}
;; or when we disable partitioning (*)
=> (into {}
     (keg/by-key {:a 1 :b 2}
       (map inc)
       :shuffle nil))
{:a 2, :b 3}
;; or when we perform the computation before shuffling (*)
=> (into {}
     (keg/by-key {:a 1 :b 2}
       :pre (map inc)))
{:b 3, :a 2}
;; or after
=> (into {}
     (keg/by-key {:a 1 :b 2}
       :post (map inc)))
{:b 3, :a 2}
;; (*) if the transformation is an aggregation you may get erroneous results.

All these options may be combined:

=> (into {}
     (keg/by-key (range 100) :key odd?
       :pre (map inc)
       (x/reduce +)
       :post (map str)))
{false "2500", true "2550"}

The by-key options syntax is deliberately relaxed: anything preceded by a keyword is an option; everything else is a transformation. That's why, in the above example, the transformation occurs in the middle of the named options.

TODO

Streams (in progress), broadcast helpers, dataframes...

License

Copyright © 2015-2017 HCA

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.
