radix-ai / graphchain

License: MIT
⚡️ An efficient cache for the execution of dask graphs.

Programming Languages

python
139335 projects - #7 most used programming language
Dockerfile
14818 projects

Projects that are alternatives of or similar to graphchain

Git S3 Push
Deploy your git repo to an S3 bucket
Stars: ✭ 182 (+188.89%)
Mutual labels:  s3
Bucketstore
A simple library for interacting with Amazon S3.
Stars: ✭ 209 (+231.75%)
Mutual labels:  s3
Nextjs Aws S3
Example Next.js app to upload photos to an S3 bucket.
Stars: ✭ 229 (+263.49%)
Mutual labels:  s3
Gatsby Plugin S3
Deploy your gatsby site to a S3 bucket.
Stars: ✭ 186 (+195.24%)
Mutual labels:  s3
Aws Mobile React Native Starter
AWS Mobile React Native Starter App https://aws.amazon.com/mobile
Stars: ✭ 2,247 (+3466.67%)
Mutual labels:  s3
Image Upload Example
Demonstration of how to upload images from the ImagePicker, using a node backend to upload to S3
Stars: ✭ 214 (+239.68%)
Mutual labels:  s3
S3contents
An S3-backed ContentsManager implementation for Jupyter
Stars: ✭ 175 (+177.78%)
Mutual labels:  s3
Litestream
Streaming replication for SQLite.
Stars: ✭ 3,795 (+5923.81%)
Mutual labels:  s3
Cakephp File Storage
Abstract file storage and upload plugin for CakePHP. Write to local disk, FTP, S3, Dropbox and more through a single interface. It's not just yet another uploader but a complete storage solution.
Stars: ✭ 202 (+220.63%)
Mutual labels:  s3
S3sync
Really fast sync tool for S3
Stars: ✭ 224 (+255.56%)
Mutual labels:  s3
Chubaofs
ChubaoFS (abbrev. CBFS) is a cloud native distributed file system and object store.
Stars: ✭ 2,482 (+3839.68%)
Mutual labels:  s3
Walrus
🔥 Fast, Secure and Reliable System Backup, Set up in Minutes.
Stars: ✭ 197 (+212.7%)
Mutual labels:  s3
Rocket
Automated software delivery as fast and easy as possible 🚀
Stars: ✭ 217 (+244.44%)
Mutual labels:  s3
Slurp
Evaluate the security of S3 buckets
Stars: ✭ 183 (+190.48%)
Mutual labels:  s3
Storagetapper
StorageTapper is a scalable realtime MySQL change data streaming, logical backup and logical replication service
Stars: ✭ 232 (+268.25%)
Mutual labels:  s3
Rust S3
Rust library for interfacing with AWS S3 and other API compatible services
Stars: ✭ 177 (+180.95%)
Mutual labels:  s3
Duplicacy Autobackup
💾 Painless automated backups to multiple storage providers with Docker and duplicacy.
Stars: ✭ 214 (+239.68%)
Mutual labels:  s3
amazon-sns-java-extended-client-lib
This AWS SNS client library allows publishing messages to SNS that exceed the 256 KB message size limit.
Stars: ✭ 23 (-63.49%)
Mutual labels:  s3
Node S3 Uploader
Flexible and efficient resize, rename, and upload of images to Amazon S3 disk storage. Uses the official AWS Node SDK for transfer, and ImageMagick for image processing. Supports multiple image version targets.
Stars: ✭ 237 (+276.19%)
Mutual labels:  s3
Sftpgo
Fully featured and highly configurable SFTP server with optional HTTP, FTP/S and WebDAV support - S3, Google Cloud Storage, Azure Blob
Stars: ✭ 3,534 (+5509.52%)
Mutual labels:  s3

Graphchain

What is graphchain?

Graphchain is like joblib.Memory for dask graphs. Dask graph computations are cached to a local or remote location of your choice, specified by a PyFilesystem FS URL.

When you change your dask graph (by changing a computation's implementation or its inputs), graphchain will take care to only recompute the minimum number of computations necessary to fetch the result. This allows you to iterate quickly over your graph without spending time on recomputing previously computed keys.


Source: xkcd.com/1205/

The main difference between graphchain and joblib.Memory is that in graphchain a computation's materialised inputs are not serialised and hashed (which can be very expensive when the inputs are large objects such as pandas DataFrames). Instead, a chain of hashes (hence the name graphchain) of the computation object and its dependencies (which are also computation objects) is used to identify the cache file.
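
To make the hash chaining concrete, here is a minimal sketch of the idea (purely illustrative; the functions, hash algorithm, and layout below are assumptions, not graphchain's actual implementation):

import hashlib
import inspect

def increment(x):
    return x + 1

def double(x):
    return 2 * x

def task_hash(func, dependency_hashes):
    """Hash a task from its function's source code and its dependencies' hashes."""
    digest = hashlib.sha256(inspect.getsource(func).encode())
    for dependency_hash in dependency_hashes:
        digest.update(dependency_hash.encode())
    return digest.hexdigest()

# "b" depends on "a", so b's hash chains a's hash with double's source code:
# editing increment therefore changes the cache keys of both "a" and "b".
hash_a = task_hash(increment, [])
hash_b = task_hash(double, [hash_a])

Because only source code and dependency hashes are hashed, the cost of identifying a cache entry does not grow with the size of the materialised inputs.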

Additionally, the result of a computation is only cached if it is estimated that loading it from cache will save time compared to simply running the computation. Whether to cache depends on the characteristics of the cache location, which differ between, for example, the local filesystem and S3.
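
A back-of-the-envelope version of that trade-off might look as follows (illustrative only; graphchain's actual estimator and the throughput figures below are assumptions):

def should_cache(compute_time_s, result_size_bytes, cache_throughput_bytes_per_s):
    """Cache a result only if loading it back is expected to beat recomputing it."""
    estimated_load_time_s = result_size_bytes / cache_throughput_bytes_per_s
    return estimated_load_time_s < compute_time_s

# A 200 MB result that takes 2 s to compute is not worth caching over a ~25 MB/s link
# to S3, but is worth caching on a local SSD at ~500 MB/s.
should_cache(2.0, 200_000_000, 25_000_000)   # False
should_cache(2.0, 200_000_000, 500_000_000)  # True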

Usage by example

Basic usage

Install graphchain with pip to get started:

pip install graphchain

To demonstrate how graphchain can save you time, let's first create a simple dask graph that (1) creates a few pandas DataFrames, (2) runs a relatively heavy operation on these DataFrames, and (3) summarises the results.

import dask
import graphchain
import pandas as pd

def create_dataframe(num_rows, num_cols):
    print("Creating DataFrame...")
    return pd.DataFrame(data=[range(num_cols)]*num_rows)

def expensive_computation(df, num_quantiles):
    print("Running expensive computation on DataFrame...")
    return df.quantile(q=[i / num_quantiles for i in range(num_quantiles)])

def summarize_dataframes(*dfs):
    print("Summing DataFrames...")
    return sum(df.sum().sum() for df in dfs)

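# A dask graph is a dict that maps keys to task tuples of the form (function, *args);
# string arguments that match other keys ("df_a", "df_b", ...) are treated as dependencies.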
dsk = {
    "df_a": (create_dataframe, 10_000, 1000),
    "df_b": (create_dataframe, 10_000, 1000),
    "df_c": (expensive_computation, "df_a", 2048),
    "df_d": (expensive_computation, "df_b", 2048),
    "result": (summarize_dataframes, "df_c", "df_d")
}

Using dask.get to fetch the "result" key takes about 6 seconds:

>>> %time dask.get(dsk, "result")

Creating DataFrame...
Running expensive computation on DataFrame...
Creating DataFrame...
Running expensive computation on DataFrame...
Summing DataFrames...

CPU times: user 7.39 s, sys: 686 ms, total: 8.08 s
Wall time: 6.19 s

On the other hand, using graphchain.get for the first time to fetch "result" takes only 4 seconds:

>>> %time graphchain.get(dsk, "result")

Creating DataFrame...
Running expensive computation on DataFrame...
Summing DataFrames...

CPU times: user 4.7 s, sys: 519 ms, total: 5.22 s
Wall time: 4.04 s

graphchain.get is faster than dask.get because it can load df_b and df_d from cache after df_a and df_c have been computed and cached: df_b and df_d are defined by the same functions and inputs as df_a and df_c, so they share the same hashes. Note that graphchain will only cache the result of a computation if loading that computation from cache is estimated to be faster than simply running the computation.

Running graphchain.get a second time to fetch "result" will be almost instant since this time the result itself is also available from cache:

>>> %time graphchain.get(dsk, "result")

CPU times: user 4.79 ms, sys: 1.79 ms, total: 6.58 ms
Wall time: 5.34 ms

Now let's say we want to change how the result is summarised from a sum to an average:

def summarize_dataframes(*dfs):
    print("Averaging DataFrames...")
    return sum(df.mean().mean() for df in dfs) / len(dfs)

If we then ask graphchain to fetch "result", it will detect that only summarize_dataframes has changed and therefore only recompute this function with inputs loaded from cache:

>>> %time graphchain.get(dsk, "result")

Averaging DataFrames...

CPU times: user 123 ms, sys: 37.2 ms, total: 160 ms
Wall time: 86.6 ms

Storing the graphchain cache remotely

Graphchain's cache is by default ./__graphchain_cache__, but you can ask graphchain to use a cache at any PyFilesystem FS URL such as s3://mybucket/__graphchain_cache__:

graphchain.get(dsk, "result", location="s3://mybucket/__graphchain_cache__")
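
Any cache location that PyFilesystem can open from an FS URL should work the same way; for example, an in-memory cache can be handy in tests (illustrative; this assumes the standard mem:// FS URL behaves like any other PyFilesystem location here):

graphchain.get(dsk, "result", location="mem://")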

Excluding keys from being cached

In some cases you may not want a key to be cached. To avoid writing certain keys to the graphchain cache, you can use the skip_keys argument:

graphchain.get(dsk, "result", skip_keys=["result"])

Using graphchain with dask.delayed

Alternatively, you can use graphchain together with dask.delayed for easier dask graph creation:

import dask
import pandas as pd

@dask.delayed
def create_dataframe(num_rows, num_cols):
    print("Creating DataFrame...")
    return pd.DataFrame(data=[range(num_cols)]*num_rows)

@dask.delayed
def expensive_computation(df, num_quantiles):
    print("Running expensive computation on DataFrame...")
    return df.quantile(q=[i / num_quantiles for i in range(num_quantiles)])

@dask.delayed
def summarize_dataframes(*dfs):
    print("Summing DataFrames...")
    return sum(df.sum().sum() for df in dfs)

df_a = create_dataframe(num_rows=10_000, num_cols=1000)
df_b = create_dataframe(num_rows=10_000, num_cols=1000)
df_c = expensive_computation(df_a, num_quantiles=2048)
df_d = expensive_computation(df_b, num_quantiles=2048)
result = summarize_dataframes(df_c, df_d)

You can then compute result by setting dask's delayed_optimize option to graphchain.optimize:

import graphchain
from functools import partial

optimize_s3 = partial(graphchain.optimize, location="s3://mybucket/__graphchain_cache__/")

with dask.config.set(scheduler="sync", delayed_optimize=optimize_s3):
    print(result.compute())

Using a custom serializer/deserializer

By default graphchain will cache dask computations with joblib.dump and LZ4 compression. However, you may also supply a custom serialize and deserialize function that writes and reads computations to and from a PyFilesystem filesystem, respectively. For example, the following snippet shows how to serialize dask DataFrames with dask.dataframe.to_parquet, while other objects are serialized with joblib:

import dask.dataframe
import graphchain
import fs.osfs
import joblib
import os
from functools import partial
from typing import Any

def custom_serialize(obj: Any, fs: fs.osfs.OSFS, key: str) -> None:
    """Serialize dask DataFrames with to_parquet, and other objects with joblib.dump."""
    if isinstance(obj, dask.dataframe.DataFrame):
        obj.to_parquet(os.path.join(fs.root_path, "parquet", key))
    else:
        with fs.open(f"{key}.joblib", "wb") as fid:
            joblib.dump(obj, fid)

def custom_deserialize(fs: fs.osfs.OSFS, key: str) -> Any:
    """Deserialize dask DataFrames with read_parquet, and other objects with joblib.load."""
    if fs.exists(f"{key}.joblib"):
        with fs.open(f"{key}.joblib", "rb") as fid:
            return joblib.load(fid)
    else:
        return dask.dataframe.read_parquet(os.path.join(fs.root_path, "parquet", key))

optimize_parquet = partial(
    graphchain.optimize,
    location="./__graphchain_cache__/custom/",
    serialize=custom_serialize,
    deserialize=custom_deserialize
)

with dask.config.set(scheduler="sync", delayed_optimize=optimize_parquet):
    print(result.compute())

Contributing

Setup: once per device
  1. Generate an SSH key and add the SSH key to your GitHub account.
  2. Configure SSH to automatically load your SSH keys:
    cat << EOF >> ~/.ssh/config
    Host *
      AddKeysToAgent yes
      IgnoreUnknown UseKeychain
      UseKeychain yes
    EOF
  3. Install Docker Desktop.
  4. Install VS Code and VS Code's Remote-Containers extension. Alternatively, install PyCharm.
Setup: once per project
  1. Clone this repository.
  2. Start a Dev Container in your preferred development environment:
    • VS Code: open the cloned repository and run Ctrl/⌘ + ⇧ + P → Remote-Containers: Reopen in Container.
    • PyCharm: open the cloned repository and configure Docker Compose as a remote interpreter.
    • Terminal: open the cloned repository and run docker compose run --rm dev to start an interactive Dev Container.
Developing
  • This project follows the Conventional Commits standard to automate Semantic Versioning and Keep A Changelog with Commitizen.
  • Run poe from within the development environment to print a list of Poe the Poet tasks available to run on this project.
  • Run poetry add {package} from within the development environment to install a runtime dependency and add it to pyproject.toml and poetry.lock.
  • Run poetry remove {package} from within the development environment to uninstall a runtime dependency and remove it from pyproject.toml and poetry.lock.
  • Run poetry update from within the development environment to upgrade all dependencies to the latest versions allowed by pyproject.toml.
  • Run cz bump to bump the package's version, update the CHANGELOG.md, and create a git tag.
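
cz bump determines the next version from your commit messages, so they need to follow the Conventional Commits pattern type(scope): description. An illustrative example (the change it describes is hypothetical):

docs(readme): clarify how to configure a remote cache location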

Developed by Radix

Radix is a Belgium-based Machine Learning company.

Our vision is to make technology work for and with us. We believe that if technology is used in a creative way, jobs become more fulfilling, people become the best version of themselves, and companies grow.
