mastak / airflow_multi_dagrun

Licence: Apache-2.0
Triggering a DAG run multiple times


Projects that are alternatives to, or similar to, airflow_multi_dagrun

airflow-user-management-plugin
A plugin for Apache Airflow that allows you to manage the users that can login
Stars: ✭ 13 (-82.43%)
Mutual labels:  airflow, airflow-plugin
airflow-code-editor
A plugin for Apache Airflow that allows you to edit DAGs in browser
Stars: ✭ 195 (+163.51%)
Mutual labels:  airflow, airflow-plugin
dbt-cloud-plugin
DBT Cloud Plugin for Airflow
Stars: ✭ 35 (-52.7%)
Mutual labels:  airflow, airflow-plugin
kedro-airflow
Kedro-Airflow makes it easy to deploy Kedro projects to Airflow.
Stars: ✭ 121 (+63.51%)
Mutual labels:  airflow, airflow-plugin
viewflow
Viewflow is an Airflow-based framework that allows data scientists to create data models without writing Airflow code.
Stars: ✭ 110 (+48.65%)
Mutual labels:  airflow
opentrials-airflow
Configuration and definitions of Airflow for OpenTrials
Stars: ✭ 18 (-75.68%)
Mutual labels:  airflow
FastETL
Airflow plugins for implementing data pipelines
Stars: ✭ 31 (-58.11%)
Mutual labels:  airflow
dbt-airflow-docker-compose
Execution of DBT models using Apache Airflow through Docker Compose
Stars: ✭ 76 (+2.7%)
Mutual labels:  airflow
airflow-dbt
Apache Airflow integration for dbt
Stars: ✭ 233 (+214.86%)
Mutual labels:  airflow
AirDataComputer
Air Data Computer
Stars: ✭ 29 (-60.81%)
Mutual labels:  airflow
ml-ops
Get your MLOps (Level 1) platform started and going fast.
Stars: ✭ 81 (+9.46%)
Mutual labels:  airflow
airflow-ci
Apache Airflow CI pipeline
Stars: ✭ 18 (-75.68%)
Mutual labels:  airflow
jobAnalytics and search
JobAnalytics system consumes data from multiple sources and provides valuable information to both job hunters and recruiters.
Stars: ✭ 25 (-66.22%)
Mutual labels:  airflow
torchx
TorchX is a universal job launcher for PyTorch applications. TorchX is designed to have fast iteration time for training/research and support for E2E production ML pipelines when you're ready.
Stars: ✭ 165 (+122.97%)
Mutual labels:  airflow
astro
Astro allows rapid and clean development of {Extract, Load, Transform} workflows using Python and SQL, powered by Apache Airflow.
Stars: ✭ 79 (+6.76%)
Mutual labels:  airflow
airflow-boilerplate
A complete development environment setup for working with Airflow
Stars: ✭ 94 (+27.03%)
Mutual labels:  airflow
airflow-prometheus-exporter
Export Airflow metrics (from mysql) in prometheus format
Stars: ✭ 25 (-66.22%)
Mutual labels:  airflow
dataops-platform-airflow-dbt
Build DataOps platform with Apache Airflow and dbt on AWS
Stars: ✭ 33 (-55.41%)
Mutual labels:  airflow
aircal
Visualize Airflow's schedule by exporting future DAG runs as events to Google Calendar.
Stars: ✭ 66 (-10.81%)
Mutual labels:  airflow
observatory-platform
Observatory Platform Package
Stars: ✭ 15 (-79.73%)
Mutual labels:  airflow


Multi dag run

This plugin provides operators for triggering a DAG run multiple times, letting you dynamically specify how many DAG run instances to create.

It is useful when you need to process a large dataset: split it into chunks and run multiple instances of the same DAG in parallel, one per chunk.

If you see many launched target DAG runs, you can simply add more workers, so scaling out is straightforward.
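The chunking idea can be sketched in plain Python: build one conf payload per chunk and hand each to a separate DAG run. The make_chunk_payloads helper below is a hypothetical illustration, not part of the plugin:

```python
def make_chunk_payloads(items, chunk_size):
    # Split a list of work items into conf payloads, one per DAG run.
    for start in range(0, len(items), chunk_size):
        yield {'items': items[start:start + chunk_size]}


# 10 items with a chunk size of 4 -> 3 DAG runs.
payloads = list(make_chunk_payloads(list(range(10)), 4))
```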

Install

pip install airflow_multi_dagrun

Example

Code for scheduling dags

import datetime as dt
from airflow import DAG

from airflow_multi_dagrun.operators import TriggerMultiDagRunOperator


def generate_dag_run():
    # Each yielded dict becomes the conf payload of one triggered DAG run.
    for i in range(100):
        yield {'index': i}


default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2015, 6, 1),
}


dag = DAG('reindex_scheduler', schedule_interval=None, default_args=default_args)


ran_dags = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='example_target_dag',
    python_callable=generate_dag_run,
)

This code will trigger the DAG with id example_target_dag 100 times, passing the yielded payload to each run.
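Because python_callable is evaluated each time the scheduler DAG runs, the number of triggered runs can be data-driven rather than fixed. A hedged sketch, where pending_jobs is a made-up stand-in for a real work source such as a database query:

```python
def generate_dag_run_dynamic():
    # Hypothetical work list; in practice this might come from a
    # database query or a listing of files in object storage.
    pending_jobs = ['job-a', 'job-b', 'job-c']
    for i, job in enumerate(pending_jobs):
        yield {'index': i, 'job': job}


# One conf dict per pending job, so three DAG runs would be triggered.
confs = list(generate_dag_run_dynamic())
```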

Example of triggered dag:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='example_target_dag',
    schedule_interval=None,
    default_args={'start_date': datetime.utcnow(), 'owner': 'airflow'},
)


def run_this_func(dag_run, **kwargs):
    # The payload passed by TriggerMultiDagRunOperator is available
    # via dag_run.conf.
    print("Chunk received: {}".format(dag_run.conf['index']))


chunk_handler = PythonOperator(
    task_id='chunk_handler',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)
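In a real handler, the received index would typically map to a slice of the underlying data. A minimal sketch of that mapping, where DATA, CHUNK_SIZE, and handle_chunk are all hypothetical names, not part of the plugin:

```python
DATA = list(range(100))   # placeholder dataset
CHUNK_SIZE = 10           # placeholder chunk width


def handle_chunk(index):
    # Use the index received via dag_run.conf to select this run's slice.
    chunk = DATA[index * CHUNK_SIZE:(index + 1) * CHUNK_SIZE]
    return sum(chunk)  # stand-in for real per-chunk processing
```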

Run example

A docker-compose configuration is included, so Docker and docker-compose must be installed.

  1. make init - create the database
  2. make add-admin - create an admin user (it will prompt for a password)
  3. make web - start docker containers and run the airflow webserver
  4. make scheduler - start docker containers and run the airflow scheduler

make down will stop and remove the docker containers

Contributions

If you have found a bug or have an idea for an improvement, feel free to create an issue or pull request.

License

Apache 2.0
