vincentclaes / datajob

License: Apache-2.0
Build and deploy a serverless data pipeline on AWS with no effort.

Programming Languages

python
139335 projects - #7 most used programming language
Dockerfile
14818 projects
Makefile
30231 projects

Projects that are alternatives of or similar to datajob

saisoku
Saisoku is a Python module that helps you build complex pipelines of batch file/directory transfer/sync jobs.
Stars: ✭ 40 (-60.4%)
Mutual labels:  pipeline, data-pipeline
instance-watcher
Get notified for Instances mistakenly left running across all AWS regions for specific AWS Account
Stars: ✭ 90 (-10.89%)
Mutual labels:  glue, sagemaker
aws-pdf-textract-pipeline
🔍 Data pipeline for crawling PDFs from the Web and transforming their contents into structured data using AWS textract. Built with AWS CDK + TypeScript
Stars: ✭ 141 (+39.6%)
Mutual labels:  data-pipeline, aws-cdk
sagemaker-sparkml-serving-container
This code is used to build & run a Docker container for performing predictions against a Spark ML Pipeline.
Stars: ✭ 44 (-56.44%)
Mutual labels:  pipeline, sagemaker
transtats
Track translations and automate workflow.
Stars: ✭ 31 (-69.31%)
Mutual labels:  pipeline
targets-tutorial
Short course on the targets R package
Stars: ✭ 87 (-13.86%)
Mutual labels:  pipeline
GLUE-bert4keras
GLUE benchmark code based on bert4keras
Stars: ✭ 59 (-41.58%)
Mutual labels:  glue
gofast
High performance transport protocol for distributed applications.
Stars: ✭ 19 (-81.19%)
Mutual labels:  pipeline
nemesyst
Generalised and highly customisable, hybrid-parallelism, database based, deep learning framework.
Stars: ✭ 17 (-83.17%)
Mutual labels:  pipeline
bodywork-ml-pipeline-project
Deployment template for a continuous training pipeline.
Stars: ✭ 22 (-78.22%)
Mutual labels:  pipeline
makepipe
Tools for constructing simple make-like pipelines in R.
Stars: ✭ 23 (-77.23%)
Mutual labels:  pipeline
scATAC-pro
A comprehensive tool for processing, analyzing and visualizing single cell chromatin accessibility sequencing data
Stars: ✭ 63 (-37.62%)
Mutual labels:  pipeline
lncpipe
UNDER DEVELOPMENT--- Analysis of long non-coding RNAs from RNA-seq datasets
Stars: ✭ 24 (-76.24%)
Mutual labels:  pipeline
scicloj.ml
A Clojure machine learning library
Stars: ✭ 152 (+50.5%)
Mutual labels:  data-pipeline
AnimationDNA
Maya > Arnold > Nuke pipeline
Stars: ✭ 101 (+0%)
Mutual labels:  pipeline
assume-role-arn
🤖🎩assume-role-arn allows you to easily assume an AWS IAM role in your CI/CD pipelines, without worrying about external dependencies.
Stars: ✭ 54 (-46.53%)
Mutual labels:  pipeline
AirflowETL
Blog post on ETL pipelines with Airflow
Stars: ✭ 20 (-80.2%)
Mutual labels:  data-pipeline
pipe
Functional Pipeline in Go
Stars: ✭ 30 (-70.3%)
Mutual labels:  pipeline
pd3f
🏭 PDF text extraction pipeline: self-hosted, local-first, Docker-based
Stars: ✭ 132 (+30.69%)
Mutual labels:  pipeline
managed ml systems and iot
Managed Machine Learning Systems and Internet of Things Live Lesson
Stars: ✭ 35 (-65.35%)
Mutual labels:  sagemaker

Awesome logo

Build and deploy a serverless data pipeline on AWS with no effort.
Our goal is to let developers think about the business logic; datajob does the rest...



  • Deploy your code to Python shell / PySpark AWS Glue jobs.
  • Use AWS SageMaker to create ML models.
  • Orchestrate the above jobs using AWS Step Functions, as simply as task1 >> task2.
  • Let us know what you want to see next.


Installation

Datajob can be installed using pip.
Beware that we depend on the AWS CDK CLI!

pip install datajob
npm install -g aws-cdk@<version> # the latest version of datajob is pinned to a specific aws-cdk version

Quickstart

You can find the full example in examples/data_pipeline_simple.

We have a simple data pipeline composed of 2 Glue jobs, orchestrated sequentially using Step Functions.

from aws_cdk import core

from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

app = core.App()

# The datajob_stack is the instance that will result in a cloudformation stack.
# We inject the datajob_stack object through all the resources that we want to add.
with DataJobStack(scope=app, id="data-pipeline-simple") as datajob_stack:
    # We define 2 glue jobs with the relative path to the source code.
    task1 = GlueJob(
        datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task.py"
    )
    task2 = GlueJob(
        datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py"
    )

    # We instantiate a step functions workflow and orchestrate the glue jobs.
    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
        task1 >> task2

app.synth()

We add the above code in a file called datajob_stack.py in the root of the project.
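
The Glue job scripts referenced above (glue_jobs/task.py and glue_jobs/task2.py) are regular Python scripts; datajob only needs their relative path. A minimal, hypothetical task.py could look like this:

# glue_jobs/task.py
# A minimal, hypothetical python shell Glue job: datajob deploys this file
# as-is, so its contents are entirely up to you.


def main():
    print("running task1 ...")


if __name__ == "__main__":
    main()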

Configure CDK

Follow the steps here to configure your credentials.

export AWS_PROFILE=default
# use the aws cli to get your account number
export AWS_ACCOUNT=$(aws sts get-caller-identity --query Account --output text --profile $AWS_PROFILE)
export AWS_DEFAULT_REGION=eu-west-1

# init cdk
cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION

Deploy

Deploy the pipeline using CDK.

cd examples/data_pipeline_simple
cdk deploy --app  "python datajob_stack.py" --require-approval never

Execute

datajob execute --state-machine data-pipeline-simple-workflow

The terminal will show a link to the Step Functions console page so you can follow up on your pipeline run.

sfn
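
If you prefer to follow up programmatically instead of via the console, a minimal sketch using boto3 (not part of datajob; the state machine name is assumed to match the workflow deployed above) could look like this:

import boto3

sfn_client = boto3.client("stepfunctions")

# Look up the state machine deployed for our workflow by name
# (assumed to be "data-pipeline-simple-workflow", as in the execute command above).
state_machines = sfn_client.list_state_machines()["stateMachines"]
workflow = next(
    sm for sm in state_machines if sm["name"] == "data-pipeline-simple-workflow"
)

# Show the status of the most recent executions.
executions = sfn_client.list_executions(
    stateMachineArn=workflow["stateMachineArn"], maxResults=5
)["executions"]
for execution in executions:
    print(execution["name"], execution["status"])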

Destroy

cdk destroy --app  "python datajob_stack.py"

Examples

All our examples are in ./examples

Functionality

Deploy to a stage

Specify a stage to deploy an isolated pipeline.

Typical examples would be dev, prod, ...

cdk deploy --app "python datajob_stack.py" --context stage=my-stage

Using datajob's S3 data bucket

Dynamically pass the datajob_stack data bucket name into the arguments of your GlueJob via datajob_stack.context.data_bucket_name.

import pathlib

from aws_cdk import core
from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

current_dir = str(pathlib.Path(__file__).parent.absolute())

app = core.App()

with DataJobStack(
        scope=app, id="datajob-python-pyspark", project_root=current_dir
) as datajob_stack:
    pyspark_job = GlueJob(
        datajob_stack=datajob_stack,
        name="pyspark-job",
        job_path="glue_job/glue_pyspark_example.py",
        job_type="glueetl",
        glue_version="2.0",  # we only support glue 2.0
        python_version="3",
        worker_type="Standard",  # options are Standard / G.1X / G.2X
        number_of_workers=1,
        arguments={
            "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
            "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
        },
    )

    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
        pyspark_job >> ...

You can find this example here.

Deploy files to the datajob's deployment bucket

Specify the path to the folder you would like to include in the deployment bucket.

from aws_cdk import core
from datajob.datajob_stack import DataJobStack

app = core.App()

with DataJobStack(
    scope=app, id="some-stack-name", include_folder="path/to/folder/"
) as datajob_stack:

    ...

Package your project as a wheel and ship it to AWS

You can find the example here

# We add the path to the project root in the constructor of DataJobStack.
# By specifying project_root, datajob will look for a .whl in
# the dist/ folder in your project_root.
with DataJobStack(
    scope=app, id="data-pipeline-pkg", project_root=current_dir
) as datajob_stack:

Package your project using poetry

poetry build
cdk deploy --app "python datajob_stack.py"

Package your project using setup.py

python setup.py bdist_wheel
cdk deploy --app "python datajob_stack.py"

You can also use the datajob CLI to run the two commands at once:

# for poetry
datajob deploy --config datajob_stack.py --package poetry

# for setup.py
datajob deploy --config datajob_stack.py --package setuppy

Processing big data using a Glue PySpark job

import pathlib

from aws_cdk import core
from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob

current_dir = str(pathlib.Path(__file__).parent.absolute())

app = core.App()

with DataJobStack(
        scope=app, id="datajob-python-pyspark", project_root=current_dir
) as datajob_stack:
    pyspark_job = GlueJob(
        datajob_stack=datajob_stack,
        name="pyspark-job",
        job_path="glue_job/glue_pyspark_example.py",
        job_type="glueetl",
        glue_version="2.0",  # we only support glue 2.0
        python_version="3",
        worker_type="Standard",  # options are Standard / G.1X / G.2X
        number_of_workers=1,
        arguments={
            "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
            "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
        },
    )

The full example can be found in examples/data_pipeline_pyspark.

Orchestrate Step Functions tasks in parallel

# Task2 comes after task1. Task4 comes after task3.
# Task5 depends on both task2 and task4 being finished.
# Therefore task1 and task3 can run in parallel,
# as well as task2 and task4.
with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
    task1 >> task2
    task3 >> task4
    task2 >> task5
    task4 >> task5

More can be found in examples/data_pipeline_parallel

Orchestrate a single Step Functions task

Use the Ellipsis object (...) to orchestrate a single job via Step Functions.

some_task >> ...
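
Here is the same pattern in context, assuming the datajob_stack and a single Glue job (some_task) defined as in the examples above:

with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
    # Pair the only task with the Ellipsis object to create a one-step workflow.
    some_task >> ...
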
Notify in case of error/success

Provide the notification parameter in the constructor of a StepfunctionsWorkflow object. This will create an SNS topic that is triggered on failure or success. The email address is subscribed to the topic and receives the notification in its inbox.

with StepfunctionsWorkflow(datajob_stack=datajob_stack,
                           name="workflow",
                           notification="your-email@domain.com") as sfn:
    task1 >> task2

You can provide a single email address or a list of addresses, e.g. ["first-email@domain.com", "second-email@domain.com"].
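
For multiple recipients, pass a list to the same parameter (the addresses below are placeholders):

with StepfunctionsWorkflow(
    datajob_stack=datajob_stack,
    name="workflow",
    notification=["first-email@domain.com", "second-email@domain.com"],  # placeholder addresses
) as sfn:
    task1 >> task2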

Datajob in depth

The datajob_stack is the instance that will result in a CloudFormation stack. The path in project_root helps datajob_stack locate the root of the project, where the setup.py or poetry pyproject.toml file can be found, as well as the dist/ folder with the wheel of your project.

import pathlib
from aws_cdk import core

from datajob.datajob_stack import DataJobStack

current_dir = pathlib.Path(__file__).parent.absolute()
app = core.App()

with DataJobStack(
    scope=app, id="data-pipeline-pkg", project_root=current_dir
) as datajob_stack:

    ...

When entering the context manager of DataJobStack:

A DataJobContext is initialized to deploy and run a data pipeline on AWS. The following resources are created (illustrated in the sketch after the list):

  1. "data bucket"
    • an S3 bucket that you can use to dump ingested data, intermediate results and the final output.
    • you can access the data bucket as a Bucket object via datajob_stack.context.data_bucket
    • you can access the data bucket name via datajob_stack.context.data_bucket_name
  2. "deployment bucket"
    • an S3 bucket to deploy code, artifacts, scripts, config, files, ...
    • you can access the deployment bucket as a Bucket object via datajob_stack.context.deployment_bucket
    • you can access the deployment bucket name via datajob_stack.context.deployment_bucket_name
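
A minimal sketch that touches all four accessors, reusing the stack id from the example above:

import pathlib

from aws_cdk import core
from datajob.datajob_stack import DataJobStack

current_dir = str(pathlib.Path(__file__).parent.absolute())
app = core.App()

with DataJobStack(
    scope=app, id="data-pipeline-pkg", project_root=current_dir
) as datajob_stack:
    # The "data bucket" and "deployment bucket" described above.
    data_bucket = datajob_stack.context.data_bucket              # Bucket object
    deployment_bucket = datajob_stack.context.deployment_bucket  # Bucket object

    # The bucket names are handy for building s3:// paths passed as job arguments.
    source = f"s3://{datajob_stack.context.data_bucket_name}/raw/"
    print(source, datajob_stack.context.deployment_bucket_name)

app.synth()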

When exiting the context manager, all the resources of our DataJobStack object are created.

We can write the above example more explicitly...
import pathlib
from aws_cdk import core

from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

current_dir = pathlib.Path(__file__).parent.absolute()

app = core.App()

datajob_stack = DataJobStack(scope=app, id="data-pipeline-pkg", project_root=current_dir)
datajob_stack.init_datajob_context()

task1 = GlueJob(datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task.py")
task2 = GlueJob(datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py")

with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as step_functions_workflow:
    task1 >> task2

datajob_stack.create_resources()
app.synth()

Ideas

Any suggestions can be shared by starting a discussion

These are the ideas we find interesting to implement:

  • add a time based trigger to the step functions workflow.
  • add an s3 event trigger to the step functions workflow.
  • add a lambda that copies data from one s3 location to another.
  • version your data pipeline.
  • cli command to view the logs / glue jobs / s3 bucket
  • implement sagemaker services
    • processing jobs
    • hyperparameter tuning jobs
    • training jobs
  • implement lambda
  • implement ECS Fargate
  • create a serverless UI that follows up on the different pipelines deployed on possibly different AWS accounts using Datajob

Feedback is much appreciated!
