couler-proj / Couler

Licence: apache-2.0
Unified Interface for Constructing and Managing Workflows on different workflow engines, such as Argo Workflows, Tekton Pipelines, and Apache Airflow.

Programming Languages

python
139335 projects - #7 most used programming language

Projects that are alternatives of or similar to Couler

Cuneiform
Cuneiform distributed programming language
Stars: ✭ 175 (-56.79%)
Mutual labels:  workflow-engine, distributed-computing
Cylc Flow
Cylc: a workflow engine for cycling systems. Repository master branch: core meta-scheduler component of cylc-8 (in development); Repository 7.8.x branch: full cylc-7 system.
Stars: ✭ 154 (-61.98%)
Mutual labels:  scheduler, workflow-engine
Openmole
Workflow engine for exploration of simulation models using high throughput computing
Stars: ✭ 120 (-70.37%)
Mutual labels:  workflow-engine, distributed-computing
Argo Workflows
Workflow engine for Kubernetes
Stars: ✭ 10,024 (+2375.06%)
Mutual labels:  workflow-engine, cloud-native
Kogito Runtimes
Kogito Runtimes - Kogito is a cloud-native business automation technology for building cloud-ready business applications.
Stars: ✭ 188 (-53.58%)
Mutual labels:  cloud-native, workflow-engine
Aiida Core
The official repository for the AiiDA code
Stars: ✭ 238 (-41.23%)
Mutual labels:  scheduler, workflow-engine
Scheduling
Multi-platform Scheduling and Workflows Engine
Stars: ✭ 44 (-89.14%)
Mutual labels:  scheduler, distributed-computing
zenaton-node
⚡ Node.js library to run and orchestrate background jobs with Zenaton Workflow Engine
Stars: ✭ 50 (-87.65%)
Mutual labels:  workflow-engine, scheduler
Kogito Examples
Kogito examples - Kogito is a cloud-native business automation technology for building cloud-ready business applications.
Stars: ✭ 96 (-76.3%)
Mutual labels:  cloud-native, workflow-engine
Workflows
Run Cloud Native workflows on any environment using Dapr
Stars: ✭ 84 (-79.26%)
Mutual labels:  cloud-native, workflow-engine
rce
Distributed, workflow-driven integration environment
Stars: ✭ 42 (-89.63%)
Mutual labels:  workflow-engine, distributed-computing
kar
KAR: A Runtime for the Hybrid Cloud
Stars: ✭ 18 (-95.56%)
Mutual labels:  distributed-computing, cloud-native
Piper
piper - a distributed workflow engine
Stars: ✭ 374 (-7.65%)
Mutual labels:  workflow-engine
Howtheyaws
A curated collection of publicly available resources on how technology and tech-savvy organizations around the world use Amazon Web Services (AWS)
Stars: ✭ 389 (-3.95%)
Mutual labels:  cloud-native
Wedatasphere
WeDataSphere is a financial level one-stop open-source suitcase for big data platforms. Currently the source code of Scriptis and Linkis has already been released to the open-source community. WeDataSphere, Big Data Made Easy!
Stars: ✭ 372 (-8.15%)
Mutual labels:  scheduler
Eventmesh
EventMesh is a dynamic cloud-native eventing infrastructure used to decouple the application and backend middleware layer, which supports a wide range of use cases that encompass complex multi-cloud, widely distributed topologies using diverse technology stacks.
Stars: ✭ 356 (-12.1%)
Mutual labels:  cloud-native
Taskscheduler
Cross-platform, fiber-based, multi-threaded task scheduler designed for video games.
Stars: ✭ 402 (-0.74%)
Mutual labels:  scheduler
Backburner.js
A rewrite of the Ember.js run loop as a generic microlibrary
Stars: ✭ 391 (-3.46%)
Mutual labels:  scheduler
Azkaban
Azkaban workflow manager.
Stars: ✭ 3,914 (+866.42%)
Mutual labels:  workflow-engine
Buildkit
concurrent, cache-efficient, and Dockerfile-agnostic builder toolkit
Stars: ✭ 4,537 (+1020.25%)
Mutual labels:  cloud-native

Couler

What is Couler?

Couler aims to provide a unified interface for constructing and managing workflows on different workflow engines, such as Argo Workflows, Tekton Pipelines, and Apache Airflow.

Couler is included in the CNCF Cloud Native Landscape and the LF AI Landscape.

Who uses Couler?

You can find a list of organizations that use Couler in ADOPTERS.md. If you'd like to add your organization to the list, please send us a pull request.

Why use Couler?

Many workflow engines exist today, e.g. Argo Workflows, Tekton Pipelines, and Apache Airflow. However, their programming experiences vary, and they expose different levels of abstraction that are often obscure and complex. The code snippets below show how workflows are constructed with Apache Airflow and Kubeflow Pipelines.

Apache Airflow (first snippet) and Kubeflow Pipelines (second snippet):

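# Apache Airflow 1.10-style imports
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def create_dag(dag_id,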
               schedule,
               dag_number,
               default_args):
    def hello_world_py(*args):
        print('Hello World')

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)
    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py,
            dag_number=dag_number)
    return dag

for n in range(1, 10):
    # Build the DAG id first so it can be used as the globals() key
    dag_id = 'hello_world_{}'.format(str(n))
    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
                    }
    globals()[dag_id] = create_dag(
        dag_id,
        '@daily',
        n,
        default_args)

# Kubeflow Pipelines SDK v1 imports
import kfp.dsl as dsl
from kfp.dsl import graph_component


class FlipCoinOp(dsl.ContainerOp):
    """Flip a coin and output heads or tails randomly."""
    def __init__(self):
        super(FlipCoinOp, self).__init__(
            name='Flip',
            image='python:alpine3.6',
            command=['sh', '-c'],
            arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 '
                       'else \'tails\'; print(result)" | tee /tmp/output'],
            file_outputs={'output': '/tmp/output'})

class PrintOp(dsl.ContainerOp):
    """Print a message."""
    def __init__(self, msg):
        super(PrintOp, self).__init__(
            name='Print',
            image='alpine:3.6',
            command=['echo', msg],
        )

# define the recursive operation
@graph_component
def flip_component(flip_result):
    print_flip = PrintOp(flip_result)
    flipA = FlipCoinOp().after(print_flip)
    with dsl.Condition(flipA.output == 'heads'):
        flip_component(flipA.output)

@dsl.pipeline(
    name='pipeline flip coin',
    description='shows how to use graph_component.'
)
def recursive():
    flipA = FlipCoinOp()
    flipB = FlipCoinOp()
    flip_loop = flip_component(flipA.output)
    flip_loop.after(flipB)
    PrintOp('cool, it is over. %s' % flipA.output).after(flip_loop)

Couler provides a unified interface for constructing and managing workflows, with the following benefits:

  • Simplicity: Unified interface and imperative programming style for defining workflows with automatic construction of directed acyclic graph (DAG).
  • Extensibility: Extensible to support various workflow engines.
  • Reusability: Reusable steps for tasks such as distributed training of machine learning models.
  • Efficiency: Automatic workflow and resource optimizations under the hood.
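
To make the "imperative style with automatic DAG construction" idea concrete, here is a minimal sketch of how step calls made in plain program order could be recorded as a dependency graph. The `WorkflowRecorder` class and its `run_step` method are hypothetical names invented for this illustration; they are not part of Couler and do not describe its actual implementation.

```python
from collections import OrderedDict


class WorkflowRecorder:
    """Hypothetical recorder for illustration only; not Couler's implementation."""

    def __init__(self):
        # Maps each step name to the list of steps it depends on.
        self.dependencies = OrderedDict()
        self._last_step = None

    def run_step(self, name):
        """Record a step that implicitly depends on the previously recorded step."""
        upstream = [] if self._last_step is None else [self._last_step]
        self.dependencies[name] = upstream
        self._last_step = name
        return name


# The caller writes ordinary sequential code...
recorder = WorkflowRecorder()
recorder.run_step("fetch-data")
recorder.run_step("train-model")
recorder.run_step("evaluate")

# ...and the dependency graph is derived from the call order.
for step, upstream in recorder.dependencies.items():
    print(step, "<-", upstream)
```

The point of the sketch is only that the user never writes edges explicitly; a real engine would also need to handle branching, conditions, and parallel steps.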

Please see the following sections for an installation guide and examples.

Installation

  • Couler currently supports only Argo Workflows. Please follow the Argo Workflows installation instructions to install it on your Kubernetes cluster.
  • Install Python 3.6+
  • Install Couler Python SDK via the following pip command:
pip install git+https://github.com/couler-proj/couler

Alternatively, you can clone this repository and then run the following to install:

python setup.py install
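
After installing, a quick check along the following lines can confirm that the interpreter meets the Python 3.6+ requirement and that the `couler` package is importable. This is a small sketch using only the standard library; the helper names `python_is_supported` and `package_is_installed` are made up for this example.

```python
import importlib.util
import sys


def python_is_supported(minimum=(3, 6)):
    """Return True if the running interpreter is at least the given version."""
    return sys.version_info >= minimum


def package_is_installed(name):
    """Return True if a top-level package with this name can be found."""
    return importlib.util.find_spec(name) is not None


print("Python 3.6+:", python_is_supported())
print("couler installed:", package_is_installed("couler"))
```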

Examples

Coin Flip

This example combines the use of a Python function result, along with conditionals, to take a dynamic path in the workflow. In this example, depending on the result of the first step defined in flip_coin(), the template will either run the heads() step or the tails() step.

Steps can be defined via either couler.run_script() for Python functions or couler.run_container() for containers. In addition, the conditional logic that decides which step runs after the coin flip is defined via the combined use of couler.when() and couler.equal().

import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter


def random_code():
    import random

    res = "heads" if random.randint(0, 1) == 0 else "tails"
    print(res)


def flip_coin():
    return couler.run_script(image="python:alpine3.6", source=random_code)


def heads():
    return couler.run_container(
        image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"']
    )


def tails():
    return couler.run_container(
        image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"']
    )


result = flip_coin()
couler.when(couler.equal(result, "heads"), lambda: heads())
couler.when(couler.equal(result, "tails"), lambda: tails())

submitter = ArgoSubmitter()
couler.run(submitter=submitter)
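
For intuition about what the conditional above expresses, Argo Workflows guards a step with a `when` expression that compares a previous step's output against a value. The sketch below shows one way such an expression string could be assembled. The `equal` and `render_when` helpers and the step name `flip-coin` are assumptions made for this illustration; the YAML that Couler actually generates may differ.

```python
def equal(step_name, expected):
    """Describe an equality check on a script step's captured output (hypothetical helper)."""
    return {
        # Argo template tag that refers to the output of a script step.
        "left": "{{steps.%s.outputs.result}}" % step_name,
        "op": "==",
        "right": expected,
    }


def render_when(condition):
    """Render the condition as a string usable in an Argo `when` field."""
    return "{left} {op} {right}".format(**condition)


print(render_when(equal("flip-coin", "heads")))
print(render_when(equal("flip-coin", "tails")))
```

Each guarded step carries one such expression, so exactly one of heads() or tails() runs for a given flip result.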

DAG

This example demonstrates different ways to define a workflow as a directed acyclic graph (DAG) by specifying the dependencies of each task via couler.set_dependencies() and couler.dag(). Please see the code comments for the specific DAG shapes defined in linear() and diamond().

import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter


def job_a(message):
    couler.run_container(
        image="docker/whalesay:latest",
        command=["cowsay"],
        args=[message],
        step_name="A",
    )


def job_b(message):
    couler.run_container(
        image="docker/whalesay:latest",
        command=["cowsay"],
        args=[message],
        step_name="B",
    )


def job_c(message):
    couler.run_container(
        image="docker/whalesay:latest",
        command=["cowsay"],
        args=[message],
        step_name="C",
    )


def job_d(message):
    couler.run_container(
        image="docker/whalesay:latest",
        command=["cowsay"],
        args=[message],
        step_name="D",
    )

#     A
#    / \
#   B   C
#  /
# D
def linear():
    couler.set_dependencies(lambda: job_a(message="A"), dependencies=None)
    couler.set_dependencies(lambda: job_b(message="B"), dependencies=["A"])
    couler.set_dependencies(lambda: job_c(message="C"), dependencies=["A"])
    couler.set_dependencies(lambda: job_d(message="D"), dependencies=["B"])


#   A
#  / \
# B   C
#  \ /
#   D
def diamond():
    couler.dag(
        [
            [lambda: job_a(message="A")],
            [lambda: job_a(message="A"), lambda: job_b(message="B")],  # A -> B
            [lambda: job_a(message="A"), lambda: job_c(message="C")],  # A -> C
            [lambda: job_b(message="B"), lambda: job_d(message="D")],  # B -> D
            [lambda: job_c(message="C"), lambda: job_d(message="D")],  # C -> D
        ]
    )


linear()
submitter = ArgoSubmitter()
couler.run(submitter=submitter)
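
The chain-style lists passed to couler.dag() in diamond() can be read as edges of a graph. The sketch below mirrors that shape with plain step names instead of lambdas, derives each step's upstream dependencies, and computes a valid execution order with Kahn's algorithm. The function names are hypothetical and this is not how Couler processes the list internally; it only illustrates the structure being described.

```python
from collections import deque


def dependencies_from_chains(chains):
    """Turn chains such as ["A", "B"] (meaning A -> B) into {step: [upstream, ...]}."""
    deps = {}
    for chain in chains:
        for step in chain:
            deps.setdefault(step, [])
        for upstream, downstream in zip(chain, chain[1:]):
            if upstream not in deps[downstream]:
                deps[downstream].append(upstream)
    return deps


def topological_order(deps):
    """Kahn's algorithm: emit a step once all of its upstream steps have been emitted."""
    remaining = {step: set(upstream) for step, upstream in deps.items()}
    ready = deque(sorted(step for step, ups in remaining.items() if not ups))
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for other in sorted(remaining):
            if step in remaining[other]:
                remaining[other].remove(step)
                if not remaining[other]:
                    ready.append(other)
    if len(order) != len(deps):
        raise ValueError("cycle detected; the steps do not form a DAG")
    return order


# Same shape as diamond(): A -> B, A -> C, B -> D, C -> D
diamond_chains = [["A"], ["A", "B"], ["A", "C"], ["B", "D"], ["C", "D"]]
deps = dependencies_from_chains(diamond_chains)
print(deps)
print(topological_order(deps))
```

Step D appears only after both B and C, which is the property that distinguishes the diamond shape from the one built in linear().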

Note that the current version works only with Argo Workflows, but we are actively working on the design of a unified interface that is extensible to additional workflow engines. Please stay tuned for more updates; we welcome any feedback and contributions from the community.
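
As a rough illustration of what "extensible to additional workflow engines" can mean in code, the sketch below separates the workflow definition from the component that submits it, so that a new backend only has to provide its own submitter. All names here (`Submitter`, `DryRunSubmitter`, `run`) are hypothetical and chosen for this example; they are not Couler's actual interfaces, whose design the note above describes as still in progress.

```python
import json
from abc import ABC, abstractmethod


class Submitter(ABC):
    """Hypothetical backend interface: one implementation per workflow engine."""

    @abstractmethod
    def submit(self, workflow):
        """Send an engine-neutral workflow description to a backend."""


class DryRunSubmitter(Submitter):
    """Example backend that renders the workflow as JSON instead of running it."""

    def submit(self, workflow):
        return json.dumps(workflow, indent=2, sort_keys=True)


def run(workflow, submitter):
    """Hand the same workflow description to whichever backend was chosen."""
    return submitter.submit(workflow)


workflow = {"name": "hello", "steps": [{"name": "A", "image": "alpine:3.6"}]}
print(run(workflow, DryRunSubmitter()))
```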

Community Blogs and Presentations