
zero323 / pyspark-asyncactions

License: Apache-2.0
Asynchronous actions for PySpark

Programming Languages

python
139335 projects - #7 most used programming language

Projects that are alternatives of or similar to pyspark-asyncactions

Quinn
pyspark methods to enhance developer productivity 📣 👯 🎉
Stars: ✭ 217 (+623.33%)
Mutual labels:  apache-spark, pyspark
spark-twitter-sentiment-analysis
Sentiment Analysis of a Twitter Topic with Spark Structured Streaming
Stars: ✭ 55 (+83.33%)
Mutual labels:  apache-spark, pyspark
spark3D
Spark extension for processing large-scale 3D data sets: Astrophysics, High Energy Physics, Meteorology, …
Stars: ✭ 23 (-23.33%)
Mutual labels:  apache-spark, pyspark
Pyspark Stubs
Apache (Py)Spark type annotations (stub files).
Stars: ✭ 98 (+226.67%)
Mutual labels:  apache-spark, pyspark
pyspark-cheatsheet
PySpark Cheat Sheet - example code to help you learn PySpark and develop apps faster
Stars: ✭ 115 (+283.33%)
Mutual labels:  apache-spark, pyspark
Spark With Python
Fundamentals of Spark with Python (using PySpark), code examples
Stars: ✭ 150 (+400%)
Mutual labels:  apache-spark, pyspark
learn-by-examples
Real-world Spark pipelines examples
Stars: ✭ 84 (+180%)
Mutual labels:  apache-spark, pyspark
Spark Gotchas
Spark Gotchas. A subjective compilation of the Apache Spark tips and tricks
Stars: ✭ 308 (+926.67%)
Mutual labels:  apache-spark, pyspark
jupyterlab-sparkmonitor
JupyterLab extension that enables monitoring launched Apache Spark jobs from within a notebook
Stars: ✭ 78 (+160%)
Mutual labels:  apache-spark, pyspark
Sparkora
Powerful rapid automatic EDA and feature engineering library with a very easy to use API 🌟
Stars: ✭ 51 (+70%)
Mutual labels:  apache-spark, pyspark
Awesome Spark
A curated list of awesome Apache Spark packages and resources.
Stars: ✭ 1,061 (+3436.67%)
Mutual labels:  apache-spark, pyspark
Spark-for-data-engineers
Apache Spark for data engineers
Stars: ✭ 22 (-26.67%)
Mutual labels:  apache-spark, pyspark
Live log analyzer spark
Spark application for analysis of Apache access logs and detection of anomalies, along with a Medium article.
Stars: ✭ 14 (-53.33%)
Mutual labels:  apache-spark, pyspark
Azure Cosmosdb Spark
Apache Spark Connector for Azure Cosmos DB
Stars: ✭ 165 (+450%)
Mutual labels:  apache-spark, pyspark
Pyspark Boilerplate
A boilerplate for writing PySpark Jobs
Stars: ✭ 318 (+960%)
Mutual labels:  apache-spark, pyspark
isarn-sketches-spark
Routines and data structures for using isarn-sketches idiomatically in Apache Spark
Stars: ✭ 28 (-6.67%)
Mutual labels:  apache-spark, pyspark
mmtf-workshop-2018
Structural Bioinformatics Training Workshop & Hackathon 2018
Stars: ✭ 50 (+66.67%)
Mutual labels:  apache-spark, pyspark
Mmlspark
Simple and Distributed Machine Learning
Stars: ✭ 2,899 (+9563.33%)
Mutual labels:  apache-spark, pyspark
datalake-etl-pipeline
Simplified ETL process in Hadoop using Apache Spark. Has complete ETL pipeline for datalake. SparkSession extensions, DataFrame validation, Column extensions, SQL functions, and DataFrame transformations
Stars: ✭ 39 (+30%)
Mutual labels:  apache-spark, pyspark
SynapseML
Simple and Distributed Machine Learning
Stars: ✭ 3,355 (+11083.33%)
Mutual labels:  apache-spark, pyspark

pyspark-asyncactions


Proof-of-concept asynchronous actions for PySpark, built on top of concurrent.futures. Originally developed as a proof-of-concept solution for SPARK-20347.

How does it work?

The package patches the RDD, Estimator, DataFrame, and DataFrameWriter classes by adding thin wrappers around the commonly used action methods.

Methods are patched by retrieving a shared ThreadPoolExecutor (attached to the SparkContext) and applying its submit method:

def async_action(f):
    def async_action_(self, *args, **kwargs):
        # Retrieve the ThreadPoolExecutor shared through the SparkContext
        executor = get_context(self)._get_executor()
        # Submit the original blocking action and immediately return a Future
        return executor.submit(f, self, *args, **kwargs)
    return async_action_
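
Conceptually, patching then amounts to attaching the wrapped method to the class under a new name. The line below is only an illustrative sketch of that idea, not the package's exact code:

from pyspark import RDD

# Illustrative sketch: expose a wrapped, non-blocking variant of RDD.count
# under the *Async naming convention described below.
RDD.countAsync = async_action(RDD.count)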

The naming convention for the patched methods is methodNameAsync, for example:

  • RDD.count ⇒ RDD.countAsync
  • DataFrame.take ⇒ DataFrame.takeAsync
  • DataFrameWriter.save ⇒ DataFrameWriter.saveAsync

The number of threads is determined as follows:

  • spark.driver.cores if it is set.
  • 2 otherwise.
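
For example (a sketch; in practice spark.driver.cores is often passed through spark-submit instead), the pool size can be influenced when the session is built:

>>> from pyspark.sql import SparkSession
>>> spark = (
...     SparkSession.builder
...     .config("spark.driver.cores", "4")  # with the rule above, the shared pool gets 4 threads
...     .getOrCreate()
... )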

Usage

To patch existing classes just import the package:

>>> import asyncactions
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.getOrCreate()

All *Async methods return a concurrent.futures.Future:

>>> rdd = spark.sparkContext.range(100)
>>> f = rdd.countAsync()
>>> f
<Future at ... state=running>
>>> type(f)
concurrent.futures._base.Future
>>> f.add_done_callback(lambda f: print(f.result()))
100

and the result can be used wherever a Future is expected.
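
Because these are plain concurrent.futures.Future objects, they also compose with the standard library helpers. A small sketch (the second RDD is hypothetical, added for illustration):

>>> from concurrent.futures import as_completed
>>> rdd2 = spark.sparkContext.range(200)
>>> futures = [rdd.countAsync(), rdd2.countAsync()]
>>> for done in as_completed(futures):
...     print(done.result())  # prints 100 and 200 in completion order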

Installation

The package is available on PyPI:

pip install pyspark-asyncactions

and conda-forge:

conda install -c conda-forge pyspark-asyncactions

Installation is required only on the driver node.

Dependencies

The package supports Python 3.6 or later and requires no external dependencies.

Do it yourself

Define an actions dictionary, which maps method names to docstrings:

>>> actions = {"evaluate": """Asynchronously evaluates the output with optional parameters.
...         :param dataset: a dataset that contains labels/observations and
...                         predictions
...         :param params: an optional param map that overrides embedded
...                        params
...         :return: :py:class:`concurrent.futures.Future` of metric
...         """}

Call the asyncactions.utils.patch_all method with the class and actions as arguments:

>>> import asyncactions.utils
>>> from pyspark.ml.evaluation import Evaluator, RegressionEvaluator
>>> asyncactions.utils.patch_all(Evaluator, actions)

Enjoy your new asynchronous method:

>>> import asyncactions
>>> df = spark.createDataFrame([(1.0, 1.0), (1.0, -1.0), (0.0, 1.0)], ("label", "prediction"))
>>> metrics = RegressionEvaluator().evaluateAsync(df)
>>> metrics.result()  # Note that result is blocking
1.2909944487358058

FAQ

  • Why would I need that? Processing in Spark is already distributed.

    As explained in the Job Scheduling documentation

    (...) within each Spark application, multiple “jobs” (Spark actions) may be running concurrently if they were submitted by different threads.

    However, all PySpark actions are blocking. This means that, even if there are free resources on the cluster, each job will be executed sequentially (paraphrasing XKCD, I am not slacking off, just fitting a Pipeline).

    It is perfectly possible to achieve the same result using threads or concurrent.futures directly, but the resulting code can be quite verbose, especially when used in an interactive environment. The goal of this package is to make this process as streamlined as possible by hiding all the details (creating and stopping the thread pool, job submission).
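
    For illustration, a rough manual equivalent (a sketch reusing the rdd from the usage example above) could look like this:

    >>> from concurrent.futures import ThreadPoolExecutor
    >>> executor = ThreadPoolExecutor(max_workers=2)
    >>> f = executor.submit(rdd.count)  # the verbose counterpart of rdd.countAsync()
    >>> f.result()
    100
    >>> executor.shutdown()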

  • What about GIL?

    The goal of the package is to enable non-blocking submission of jobs (see above), while the actual processing is handled by the Spark cluster. Since the heavy lifting is performed by the JVM or by Python workers running as standalone processes, the interpreter lock is of lesser concern.

    Because the final merging step is applied on the driver, the GIL might affect jobs that depend heavily on computationally expensive Accumulators, or reduce-like jobs (reduce, fold, aggregate) with computationally expensive functions. The latter problem can be partially addressed using treeReduce.
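
    For instance (a sketch reusing the rdd defined above), treeReduce merges partial results on the executors over several levels, so less merging work reaches the single, GIL-bound driver thread:

    >>> rdd.reduce(lambda x, y: x + y)  # per-partition results are merged on the driver
    4950
    >>> rdd.treeReduce(lambda x, y: x + y, depth=2)  # partial merging happens on executors
    4950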

  • Why not merge this into PySpark?

    TL;DR There was not enough consensus on whether the feature is essential, and, if it is, which implementation should be used (piggybacking onto the JVM AsyncRDDActions vs. a native Python implementation). For details see the corresponding PR.

    Keeping a separate package gives more freedom (we can add a number of methods not present in the Scala API) and better integration with plain Python code, at the expense of some more advanced features (most notably support for canceling running Spark jobs).

  • When not to use this package?

    This package is intended primarily for small-scale concurrent job execution in interactive environments. While in theory it should be possible to use it to submit hundreds of independent jobs, doing so is likely to stress the driver process and the Py4j gateway and crash the application.

    Therefore I strongly recommend against using it as a substitute for workflow management software.

  • Is this package actively maintained?

    In general, it has been designed to be as lean as possible, to piggyback on top of the standard library, and to let users add the wrappers they need. Hence, it is pretty much maintenance-free and has seen almost no activity since its initial release.

    Nonetheless, I consider it actively maintained; please open an issue if you experience any problems or have a feature request (such as new built-in wrappers).

Disclaimer

Apache Spark, Spark, PySpark, Apache, and the Spark logo are trademarks of The Apache Software Foundation. This project is not owned, endorsed, or sponsored by The Apache Software Foundation.
