All Projects → cerndb → Dist Keras

cerndb / Dist Keras

Licence: gpl-3.0
Distributed Deep Learning, with a focus on distributed training, using Keras and Apache Spark.

Programming Languages

python
139335 projects - #7 most used programming language

Projects that are alternatives of or similar to Dist Keras

Griffon Vm
Griffon Data Science Virtual Machine
Stars: ✭ 128 (-79.12%)
Mutual labels:  data-science, hadoop, apache-spark
Pysparkling
A pure Python implementation of Apache Spark's RDD and DStream interfaces.
Stars: ✭ 231 (-62.32%)
Mutual labels:  data-science, apache-spark
Scalable Data Science
Scalable Data Science, course sets in big data Using Apache Spark over databricks and their mathematical, statistical and computational foundations using SageMath.
Stars: ✭ 142 (-76.84%)
Mutual labels:  data-science, apache-spark
sparkucx
A high-performance, scalable and efficient ShuffleManager plugin for Apache Spark, utilizing UCX communication layer
Stars: ✭ 32 (-94.78%)
Mutual labels:  apache-spark, hadoop
Pulsar Spark
When Apache Pulsar meets Apache Spark
Stars: ✭ 55 (-91.03%)
Mutual labels:  data-science, apache-spark
Learn machine learning
Road to Machine Learning
Stars: ✭ 81 (-86.79%)
Mutual labels:  data-science, hadoop
datalake-etl-pipeline
Simplified ETL process in Hadoop using Apache Spark. Has complete ETL pipeline for datalake. SparkSession extensions, DataFrame validation, Column extensions, SQL functions, and DataFrame transformations
Stars: ✭ 39 (-93.64%)
Mutual labels:  apache-spark, hadoop
Spark With Python
Fundamentals of Spark with Python (using PySpark), code examples
Stars: ✭ 150 (-75.53%)
Mutual labels:  hadoop, apache-spark
leaflet heatmap
简单的可视化湖州通话数据 假设数据量很大,没法用浏览器直接绘制热力图,把绘制热力图这一步骤放到线下计算分析。使用Apache Spark并行计算数据之后,再使用Apache Spark绘制热力图,然后用leafletjs加载OpenStreetMap图层和热力图图层,以达到良好的交互效果。现在使用Apache Spark实现绘制,可能是Apache Spark不擅长这方面的计算或者是我没有设计好算法,并行计算的速度比不上单机计算。Apache Spark绘制热力图和计算代码在这 https://github.com/yuanzhaokang/ParallelizeHeatmap.git .
Stars: ✭ 13 (-97.88%)
Mutual labels:  apache-spark, hadoop
aut
The Archives Unleashed Toolkit is an open-source toolkit for analyzing web archives.
Stars: ✭ 111 (-81.89%)
Mutual labels:  apache-spark, hadoop
Trino
Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Stars: ✭ 4,581 (+647.31%)
Mutual labels:  data-science, hadoop
H2o 3
H2O is an Open Source, Distributed, Fast & Scalable Machine Learning Platform: Deep Learning, Gradient Boosting (GBM) & XGBoost, Random Forest, Generalized Linear Modeling (GLM with Elastic Net), K-Means, PCA, Generalized Additive Models (GAM), RuleFit, Support Vector Machine (SVM), Stacked Ensembles, Automatic Machine Learning (AutoML), etc.
Stars: ✭ 5,656 (+822.68%)
Mutual labels:  data-science, hadoop
Sparkrdma
RDMA accelerated, high-performance, scalable and efficient ShuffleManager plugin for Apache Spark
Stars: ✭ 215 (-64.93%)
Mutual labels:  hadoop, apache-spark
Agile data code 2
Code for Agile Data Science 2.0, O'Reilly 2017, Second Edition
Stars: ✭ 413 (-32.63%)
Mutual labels:  data-science, apache-spark
Bigdata Playground
A complete example of a big data application using : Kubernetes (kops/aws), Apache Spark SQL/Streaming/MLib, Apache Flink, Scala, Python, Apache Kafka, Apache Hbase, Apache Parquet, Apache Avro, Apache Storm, Twitter Api, MongoDB, NodeJS, Angular, GraphQL
Stars: ✭ 177 (-71.13%)
Mutual labels:  hadoop, apache-spark
learning-hadoop-and-spark
Companion to Learning Hadoop and Learning Spark courses on Linked In Learning
Stars: ✭ 146 (-76.18%)
Mutual labels:  apache-spark, hadoop
DaFlow
Apache-Spark based Data Flow(ETL) Framework which supports multiple read, write destinations of different types and also support multiple categories of transformation rules.
Stars: ✭ 24 (-96.08%)
Mutual labels:  apache-spark, hadoop
Spark Notebook
Interactive and Reactive Data Science using Scala and Spark.
Stars: ✭ 3,081 (+402.61%)
Mutual labels:  data-science, apache-spark
Data Science Ipython Notebooks
Data science Python notebooks: Deep learning (TensorFlow, Theano, Caffe, Keras), scikit-learn, Kaggle, big data (Spark, Hadoop MapReduce, HDFS), matplotlib, pandas, NumPy, SciPy, Python essentials, AWS, and various command lines.
Stars: ✭ 22,048 (+3496.74%)
Mutual labels:  data-science, hadoop
Datasets For Recommender Systems
This is a repository of a topic-centric public data sources in high quality for Recommender Systems (RS)
Stars: ✭ 564 (-7.99%)
Mutual labels:  data-science

Distributed Keras

Distributed Deep Learning with Apache Spark and Keras.

Introduction

Distributed Keras is a distributed deep learning framework built op top of Apache Spark and Keras, with a focus on "state-of-the-art" distributed optimization algorithms. We designed the framework in such a way that a new distributed optimizer could be implemented with ease, thus enabling a person to focus on research. Several distributed methods are supported, such as, but not restricted to, the training of ensembles and models using data parallel methods.

Most of the distributed optimizers we provide, are based on data parallel methods. A data parallel method, as described in [1], is a learning paradigm where multiple replicas of a single model are used to optimize a single objective. Using this approach, we are able to dignificantly reduce the training time of a model. Depending on the parametrization, we also observed that it is possible to achieve better statistical model performance compared to a more traditional approach (e.g., like the SingleTrainer implementation), and yet, spending less wallclock time on the training of the model. However, this is subject to further research.

Attention: A rather complete introduction to the problem of Distributed Deep Learning is presented in my Master Thesis http://github.com/JoeriHermans/master-thesis. Furthermore, the thesis describes includes several novel insights, such as a redefinition of parameter staleness, and several new distributed optimizers such as AGN and ADAG.

Installation

We will guide you how to install Distributed Keras. However, we will assume that an Apache Spark installation is available. In the following subsections, we describe two approaches to achieve this.

pip

When you only require the framework for development purposes, just use pip to install dist-keras.

pip install --upgrade dist-keras

# OR

pip install --upgrade git+https://github.com/JoeriHermans/dist-keras.git

git & pip

However, if you would like to contribute, or run some of the examples. It is probably best to clone the repository directly from GitHub and install it afterwards using pip. This will also resolve possible missing dependencies.

git clone https://github.com/JoeriHermans/dist-keras
cd dist-keras
pip install -e .

General notes

.bashrc

Make sure the following variables are set in your .bashrc. It is possible, depending on your system configuration, that the following configuration doesn't have to be applied.

# Example of a .bashrc configuration.
export SPARK_HOME=/usr/lib/spark
export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"

Running an example

We would like to refer the reader to the workflow.ipynb notebook in the examples folder. This will give you a complete introduction to the problem of distributed deep learning, and will guide you through the steps that have to be executed.

Furthermore, we would also like to show how you exactly should process "big" datasets. This is shown in the examples starting with the prefix example_. Please execute them in the provided sequence.

Spark 2.0

If you want to run the examples using Apache Spark 2.0.0 and higher. You will need to remove the line containing sqlContext = SQLContext(sc). We need to do this because in Spark 2.0+, the SQLContext, and Hive context are now merged in the Spark session.

Optimization Algorithms

Sequential Trainer

This optimizer follows the traditional scheme of training a model, i.e., it uses sequential gradient updates to optimize the parameters. It does this by executing the training procedure on a single Spark executor.

SingleTrainer(model, features_col, label_col, batch_size, optimizer, loss, metrics=["accuracy"])

ADAG (Currently Recommended)

DOWNPOUR variant which is able to achieve significantly better statistical performance while being less sensitive to hyperparameters. This optimizer was developed using insights gained while developing this framework. More research regarding parameter staleness is still being conducted to further improve this optimizer.

ADAG(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
     features_col="features", label_col="label", num_epoch=1, communication_window=12)

Dynamic SGD

Dynamic SGD, dynamically maintains a learning rate for every worker by incorperating parameter staleness. This optimization scheme is introduced in "Heterogeneity-aware Distributed Parameter Servers" at the SIGMOD 2017 conference [5].

DynSGD(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers=2, batch_size=32,
       features_col="features", label_col="label", num_epoch=1, communication_window=10)

Asynchronous Elastic Averaging SGD (AEASGD)

The distinctive idea of EASGD is to allow the local workers to perform more exploration (small rho) and the master to perform exploitation. This approach differs from other settings explored in the literature, and focus on how fast the center variable converges [2] .

In this section we show the asynchronous version of EASGD. Instead of waiting on the synchronization of other trainers, this method communicates the elastic difference (as described in the paper), with the parameter server. The only synchronization mechanism that has been implemented, is to ensure no race-conditions occur when updating the center variable.

AEASGD(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers, batch_size, features_col,
       label_col, num_epoch, communication_window, rho, learning_rate)

Asynchronous Elastic Averaging Momentum SGD (AEAMSGD)

Asynchronous EAMSGD is a variant of asynchronous EASGD. It is based on the Nesterov's momentum scheme, where the update of the local worker is modified to incorepare a momentum term [2] .

EAMSGD(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers, batch_size,
       features_col, label_col, num_epoch, communication_window, rho,
       learning_rate, momentum)

DOWNPOUR

An asynchronous stochastic gradient descent procedure introduced by Dean et al., supporting a large number of model replicas and leverages adaptive learning rates. This implementation is based on the pseudocode provided by [1] .

DOWNPOUR(keras_model, worker_optimizer, loss, metrics=["accuracy"], num_workers, batch_size,
         features_col, label_col, num_epoch, learning_rate, communication_window)

Ensemble Training

In ensemble training, we train n models in parallel on the same dataset. All models are trained in parallel, but the training of a single model is done in a sequential manner using Keras optimizers. After the training process, one can combine and, for example, average the output of the models.

EnsembleTrainer(keras_model, worker_optimizer, loss, metrics=["accuracy"], features_col,
                label_col, batch_size, num_ensembles)

Model Averaging

Model averaging is a data parallel technique which will average the trainable parameters of model replicas after every epoch.

AveragingTrainer(keras_model, worker_optimizer, loss, metrics=["accuracy"], features_col,
                 label_col, num_epoch, batch_size, num_workers)

Job deployment

We also support remote job deployment. For example, imagine you are developing your model on a local notebook using a small development set. However, in order to submit your job on a remote cluster, you first need to develop a cluster job, and run the job there. In order to simplify this process, we have developed a simplified interface for a large scale machine learning job.

In order to submit a job to a remote cluster, you simply run the following code:

# Define the distributed optimization procedure, and its parameters.
trainer = ADAG(keras_model=mlp, worker_optimizer=optimizer_mlp, loss=loss_mlp, metrics=["accuracy"], num_workers=20,
               batch_size=32, communication_window=15, num_epoch=1,
               features_col="features_normalized_dense", label_col="label_encoded")

# Define the job parameters.
job = Job(secret, job_name, data_path, num_executors, num_processes, trainer)
job.send('http://yourcluster:[port]')
job.wait_completion()
# Fetch the trained model, and history for training evaluation.
trained_model = job.get_trained_model()
history = job.get_history()

Punchcard Server

Job scheduling, and execution is handled by our Punchcard server. This server will accept requests from a remote location given a specific secret, which is basically a long identification string of a specific user. However, a user can have multiple secrets. At the moment, a job is only executed if there are no other jobs running for the specified secret.

In order to submit jobs to Punchcard we need to specify a secrets file. This file is basically a JSON structure, it will have the following structure:

[
    {
        "secret": "secret_of_user_1",
        "identity": "user1"
    },
    {
        "secret": "secret_of_user_2",
        "identity": "user2"
    }
]

After the secrets file has been constructed, the Punchcard server can be started by issueing the following command.

python scripts/punchcard.py --secrets /path/to/secrets.json

Secret Generation

In order to simplify secret generation, we have added a costum script which will generate a unique key for the specified identity. The structure can be generated by running the following command.

python scripts/generate_secret.py --identity userX

Optimization Schemes

TODO

General note

It is known that adding more asynchronous workers deteriorates the statistical performance of the model. There have been some studies which examinate this particular effect. However, some of them conclude that actually adding more asynchronous workers contributes to something what they call implicit momentum [3]. However, this is subject to further investigation.

Known issues

  • Python 3 compatibility.

TODO's

List of possible future additions.

  • Save Keras model to HDFS.
  • Load Keras model from HDFS.
  • Compression / decompression of network transmissions.
  • Stop on target loss.
  • Multiple parameter servers for large Deep Networks.
  • Python 3 compatibility.
  • For every worker, spawn an additional thread which is responsible for sending updates to the parameter server. The actual worker thread will just submit tasks to this queue.

Citing

If you use this framework in any academic work, please use the following BibTex code.

@misc{dist_keras_joerihermans,
  author = {Joeri R. Hermans, CERN IT-DB},
  title = {Distributed Keras: Distributed Deep Learning with Apache Spark and Keras},
  year = {2016},
  publisher = {GitHub},
  journal = {GitHub Repository},
  howpublished = {\url{https://github.com/JoeriHermans/dist-keras/}},
}

References

  • Dean, J., Corrado, G., Monga, R., Chen, K., Devin, M., Mao, M., ... & Ng, A. Y. (2012). Large scale distributed deep networks. In Advances in neural information processing systems (pp. 1223-1231). [1]

  • Zhang, S., Choromanska, A. E., & LeCun, Y. (2015). Deep learning with elastic averaging SGD. In Advances in Neural Information Processing Systems (pp. 685-693). [2]

  • Mitliagkas, Ioannis, et al. "Asynchrony begets Momentum, with an Application to Deep Learning." arXiv preprint arXiv:1605.09774 (2016). [3]

Licensing

GPLv3 CERN

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].