
michaelosthege / fairflow

License: MIT
Functional Airflow DAG definitions.

Programming Languages

python
139,335 projects - #7 most used programming language
shell
77,523 projects

Projects that are alternatives to or similar to fairflow

Airflow
Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
Stars: ✭ 24,101 (+63323.68%)
Mutual labels:  airflow, apache-airflow
airflow-boilerplate
A complete development environment setup for working with Airflow
Stars: ✭ 94 (+147.37%)
Mutual labels:  airflow, apache-airflow
airflow-client-python
Apache Airflow - OpenApi Client for Python
Stars: ✭ 172 (+352.63%)
Mutual labels:  airflow, apache-airflow
airflow-prometheus-exporter
Export Airflow metrics (from MySQL) in Prometheus format
Stars: ✭ 25 (-34.21%)
Mutual labels:  airflow, apache-airflow
airflow-code-editor
A plugin for Apache Airflow that allows you to edit DAGs in browser
Stars: ✭ 195 (+413.16%)
Mutual labels:  airflow, apache-airflow
openverse-catalog
Identifies and collects data on CC-licensed content across web crawl data and public APIs.
Stars: ✭ 27 (-28.95%)
Mutual labels:  airflow, apache-airflow
viewflow
Viewflow is an Airflow-based framework that allows data scientists to create data models without writing Airflow code.
Stars: ✭ 110 (+189.47%)
Mutual labels:  airflow, apache-airflow
airflow-user-management-plugin
A plugin for Apache Airflow that allows you to manage the users that can log in
Stars: ✭ 13 (-65.79%)
Mutual labels:  airflow, apache-airflow
Awesome Apache Airflow
Curated list of resources about Apache Airflow
Stars: ✭ 2,755 (+7150%)
Mutual labels:  airflow, apache-airflow
Soda Sql
Metric collection, data testing and monitoring for SQL accessible data
Stars: ✭ 173 (+355.26%)
Mutual labels:  airflow
AirflowETL
Blog post on ETL pipelines with Airflow
Stars: ✭ 20 (-47.37%)
Mutual labels:  airflow
Airflow Doc Zh
📖 [Translation] Airflow documentation in Chinese
Stars: ✭ 169 (+344.74%)
Mutual labels:  airflow
Airflow Scheduler Failover Controller
A process that runs in unison with Apache Airflow to control the Scheduler process to ensure High Availability
Stars: ✭ 204 (+436.84%)
Mutual labels:  airflow
kedro-airflow-k8s
Kedro Plugin to support running pipelines on Kubernetes using Airflow.
Stars: ✭ 22 (-42.11%)
Mutual labels:  airflow
Airflow Testing
Airflow Unit Tests and Integration Tests
Stars: ✭ 175 (+360.53%)
Mutual labels:  airflow
airflow-site
Apache Airflow Website
Stars: ✭ 95 (+150%)
Mutual labels:  airflow
Airflow Exporter
Airflow plugin to export dag and task based metrics to Prometheus.
Stars: ✭ 161 (+323.68%)
Mutual labels:  airflow
Data Science Stack Cookiecutter
🐳📊🤓 Cookiecutter template to launch an awesome dockerized Data Science toolstack (incl. Jupyter, Superset, Postgres, Minio, Airflow & API Star)
Stars: ✭ 153 (+302.63%)
Mutual labels:  airflow
incremental training
Repo that relates to the Medium blog 'Keeping your ML model in shape with Kafka, Airflow and MLFlow'
Stars: ✭ 110 (+189.47%)
Mutual labels:  airflow
Insight-GDELT-Feed
A way for home buyers to know about factors affecting a state
Stars: ✭ 43 (+13.16%)
Mutual labels:  airflow

fairflow

Pythonically functional DAG-definitions.

Why would you want to use fairflow?

DAGs are often made up of tasks that are functionally separated, for example: ETL jobs, data analysis, and reporting. If you are writing a new reporting task, should you really have to worry about the dependencies of the ETL jobs?

Also, DAGs are usually built from upstream to downstream, which makes it hard to share substructures. By building them functionally, you can start thinking downstream-to-upstream and share substructures across modules.

Related: Streamlined (Functional) Airflow in the Wiki

How does it work?

In pure Airflow, you construct a DAG by instantiating a bunch of Operators and then setting their relationships. However, every Operator requires a DAG instance at instantiation time! (It can only be inferred by one level.)
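
For reference, a minimal plain-Airflow definition follows the pattern below. This is just a sketch for contrast: the callables and the default_args dictionary are placeholders, not code from the fairflow repository.

import airflow
from airflow.operators import python_operator

def extract(**context):
    return "raw data"

def report(**context):
    # pull the return value of the upstream task
    print(context["ti"].xcom_pull("Extract"))

dag = airflow.DAG("classic-example",
    default_args=default_args,   # placeholder, as in any Airflow tutorial
    schedule_interval=None
)

# every Operator needs the DAG instance right away ...
t_extract = python_operator.PythonOperator(task_id="Extract",
    python_callable=extract, provide_context=True, dag=dag)
t_report = python_operator.PythonOperator(task_id="Report",
    python_callable=report, provide_context=True, dag=dag)
# ... and relationships are set from upstream to downstream
t_report.set_upstream(t_extract)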

In fairflow, you construct the DAG from a bunch of FOperators, which only instantiate the required Operators when you call the final FOperator on a DAG instance.

The resulting DAG definition is exactly the same as the one you had before, but now you can re-use its building blocks and import them from other packages.

Show me the code!

The core of fairflow is the tiny abstract base class FOperator, which can be inherited to make functional Airflow Operator definitions. It takes care of instantiating your Airflow Operators and setting their dependencies.
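
Conceptually, the base class boils down to something like the following sketch. This is a simplification for illustration, not the actual fairflow source; the task caching described in the FAQ below is omitted here.

import abc

class FOperator(abc.ABC):
    """Simplified sketch of the fairflow base class (not the real implementation)."""
    def __init__(self, id=None):
        # the task_id defaults to the name of the subclass
        self.task_id = id or self.__class__.__name__

    @abc.abstractmethod
    def call(self, dag):
        """Instantiate upstream tasks and this task on the given DAG. Returns: task"""
        raise NotImplementedError()

    def __call__(self, dag):
        # calling the FOperator on a DAG triggers the actual Operator instantiation
        return self.call(dag)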

The following Compare FOperator, for example, creates a PythonOperator that executes a callable and uses xcom_pull to fetch the return values of upstream tasks. To make it work with an arbitrary number of upstream model tasks, we feed a list of FOperator instances to its constructor.

import fairflow
import pandas
from airflow.operators import python_operator

class Compare(fairflow.FOperator):
    """A task that compares the LinearModel with the PolynomialModel. Returns: pandas.DataFrame"""
    def __init__(self, fops_models, id=None):
        self.fops_models = fops_models
        super().__init__(id)

    @staticmethod
    def compare(**context):
        """Accumulates the results of upstream tasks into a DataFrame"""
        task_ids = fairflow.utils.get_param("model_taskids", context)   # get the task ids of the upstream tasks
        comparison = pandas.DataFrame(columns=["modelname", "result"])
        for task_id in task_ids:
            modelresult = context["ti"].xcom_pull(task_id)              # pull the return value of the upstream task
            comparison.loc[len(comparison)] = [task_id, modelresult]    # append as a new row
        return comparison

    def call(self, dag):
        """Instantiate upstream tasks, this task and set dependencies. Returns: task"""
        model_tasks = [                 # instantiate tasks for running the different models
            f(dag)                      # by calling their FOperators on the current `dag`
            for f in self.fops_models   # note: we do not know about the models' upstream dependencies!
        ]
        t = python_operator.PythonOperator(
            task_id=self.__class__.__name__,
            python_callable=self.compare,
            provide_context=True,
            templates_dict={
                "model_taskids": [mt.task_id for mt in model_tasks]
            },
            dag=dag
        )
        t.set_upstream(model_tasks)
        return t

In your DAG definition file, you create an instance of the task you want to get done.

f_linear = LinearModel()
f_poly = PolynomialModel(degree=3)
f_compare = Compare([f_linear, f_poly])

Then you create a DAG like you usually would and call your task on the DAG:

dag = airflow.DAG('model-comparison',
    default_args=default_args,
    schedule_interval=None
)

t_compare = f_compare(dag)

And that's how you functionally define a workflow.

Did you notice that in our DAG definition, we did not explicitly instantiate the Dataset task? The LinearModel.call and PolynomialModel.call methods did that on their own. So we do not need to care about the models' dependencies and can focus on comparing them.
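
For illustration, LinearModel.call could look roughly like the sketch below. The Dataset FOperator and the dummy fit result are hypothetical stand-ins (imports as in the Compare example above):

class LinearModel(fairflow.FOperator):
    """Hypothetical model task that fits a line to the upstream dataset."""
    @staticmethod
    def fit(**context):
        data = context["ti"].xcom_pull("Dataset")   # return value of the Dataset task
        # ... fit a linear model to `data` ...
        return "linear-result"                      # dummy result for illustration

    def call(self, dag):
        # the model instantiates its own upstream dependency, so the
        # DAG-definition file never has to mention Dataset
        t_dataset = Dataset()(dag)
        t = python_operator.PythonOperator(
            task_id=self.__class__.__name__,
            python_callable=self.fit,
            provide_context=True,
            dag=dag
        )
        t.set_upstream(t_dataset)
        return t

Thanks to the task caching described in the FAQ below, PolynomialModel.call can instantiate Dataset in the same way without creating a duplicate task.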

Testing

The repository comes with an example DAG (example_models) and a test DAG (test_fairflow) that runs some tests.

The example-DAG: Both models depend on the same dataset and are compared.

The test DAG: the leftmost tasks return JSON-dumped integers (1, 2, 3) and the tasks in the middle xcom_pull those return values to apply some operations.
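
In spirit, the callable of such a middle task looks something like this sketch (the upstream task_id "Constant1" is a made-up stand-in):

import json

def add_one(**context):
    # pull the JSON-dumped return value of an upstream task, operate on it, pass it on
    value = json.loads(context["ti"].xcom_pull("Constant1"))
    return json.dumps(value + 1)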

After activating your virtual environment with airflow[mysql] installed, you can cd to the repository and run the following scripts:

bash run_airflow-setup.sh
bash run_webserver.sh
bash run_scheduler.sh

They will use the repository folder as AIRFLOW_HOME.

FAQ

What if two FOperator classes have the same upstream dependencies?

The task will only be instantiated once, because the FOperator.call method caches all tasks in a dictionary by their task_id. This means that Bob and Charlie can independently depend on Alice and Daniel can still merge Bob's and Charlie's work into the same DAG by simply importing their FOperator definitions.
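
In terms of the base-class sketch above, the memoization could work like this. This is an assumption about the mechanism, simplified for illustration; here the cache lives in __call__ and is keyed per DAG, while the real implementation may organize it differently.

class FOperator(abc.ABC):
    _tasks = {}   # cache: (dag_id, task_id) -> already instantiated task

    def __call__(self, dag):
        key = (dag.dag_id, self.task_id)
        if key not in FOperator._tasks:
            # first call for this task on this DAG: actually instantiate it
            FOperator._tasks[key] = self.call(dag)
        return FOperator._tasks[key]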

How is the resulting DAG different to the one I have right now?

fairflow only matters at DAG-definition time and the resulting DAG is identical to the one you get by instantiating all tasks in the same file.

How do I get it?

pip install fairflow

What if ...?

Open an issue and let's have a discussion about it!
