
mikeghen / airflow-tutorial

License: Apache-2.0
Use Airflow to move data from multiple MySQL databases to BigQuery

Programming Languages

PLpgSQL
1095 projects
python
139335 projects - #7 most used programming language

Projects that are alternatives of or similar to airflow-tutorial

astro
Astro allows rapid and clean development of {Extract, Load, Transform} workflows using Python and SQL, powered by Apache Airflow.
Stars: ✭ 79 (-17.71%)
Mutual labels:  bigquery, airflow
polygon-etl
ETL (extract, transform and load) tools for ingesting Polygon blockchain data to Google BigQuery and Pub/Sub
Stars: ✭ 53 (-44.79%)
Mutual labels:  bigquery, airflow
viewflow
Viewflow is an Airflow-based framework that allows data scientists to create data models without writing Airflow code.
Stars: ✭ 110 (+14.58%)
Mutual labels:  airflow
Data-Manage-Application
A desktop App that allow users to import Excel data into MySQL database and manage data.
Stars: ✭ 41 (-57.29%)
Mutual labels:  mysql-database
jobAnalytics and search
JobAnalytics system consumes data from multiple sources and provides valuable information to both job hunters and recruiters.
Stars: ✭ 25 (-73.96%)
Mutual labels:  airflow
creek
Implements authentication and authorization with Spring Security + JWT Token + RBAC, using Mybatis plus as the persistence layer. Avoids rewriting authentication/authorization, role management, exception handling, and parameter validation code for every project, so you can get straight to business code without worrying about project scaffolding and style consistency.
Stars: ✭ 21 (-78.12%)
Mutual labels:  mysql-database
Data-Engineering-Projects
Personal Data Engineering Projects
Stars: ✭ 167 (+73.96%)
Mutual labels:  airflow
argon
Campaign Manager 360 and Display & Video 360 Reports to BigQuery connector
Stars: ✭ 31 (-67.71%)
Mutual labels:  bigquery
sqlfuzz
Simple SQL table fuzzing
Stars: ✭ 144 (+50%)
Mutual labels:  mysql-database
naive-bayes-classifier
Implementing Naive Bayes Classification algorithm into PHP to classify given text as ham or spam. This application uses MySql as database.
Stars: ✭ 21 (-78.12%)
Mutual labels:  mysql-database
kuromoji-for-bigquery
Tokenize Japanese text on BigQuery with Kuromoji in Apache Beam/Google Dataflow at scale
Stars: ✭ 11 (-88.54%)
Mutual labels:  bigquery
dbt-ml-preprocessing
A SQL port of python's scikit-learn preprocessing module, provided as cross-database dbt macros.
Stars: ✭ 128 (+33.33%)
Mutual labels:  bigquery
mysql-cluster
Scalable MySQL Cluster with ProxySQL Load Balancer and Orchestrator
Stars: ✭ 42 (-56.25%)
Mutual labels:  mysql-database
Musical-World
DBMS Mini Project that basically designed for online music player
Stars: ✭ 59 (-38.54%)
Mutual labels:  mysql-database
airflow-prometheus-exporter
Export Airflow metrics (from mysql) in prometheus format
Stars: ✭ 25 (-73.96%)
Mutual labels:  airflow
go-bqloader
bqloader is a simple ETL framework to load data from Cloud Storage into BigQuery.
Stars: ✭ 16 (-83.33%)
Mutual labels:  bigquery
migrate-Java-EE-app-to-azure
Migrate an existing Java EE workload to Azure
Stars: ✭ 12 (-87.5%)
Mutual labels:  mysql-database
team-timesheets
Time tracking web app built as a replacement for old school timesheets.
Stars: ✭ 25 (-73.96%)
Mutual labels:  bigquery
CodeSignal-Solutions
CodeSignal solutions
Stars: ✭ 112 (+16.67%)
Mutual labels:  mysql-database
AirDataComputer
Air Data Computer
Stars: ✭ 29 (-69.79%)
Mutual labels:  airflow

Airflow Tutorial

This documents some of the work I did getting started with Airflow on Google Cloud Platform.

⚠️ Work in progress 📝

About this Tutorial

I found the tutorial in the Airflow Documentation to be sparse, and to achieve what I was trying to do I would have had to read all of the documentation anyway. The purpose of this tutorial is to help others get started with Airflow without reading all of the documentation first. I'd still recommend reading all of the documentation at some point, but if all you're trying to do is use Airflow to move data out of an RDBMS like MySQL or Postgres, this is a great place to start.

In this tutorial, I will walk you through setting up Airflow on Google Cloud Platform. I will cover creating a data flow that moves data from MySQL to BigQuery. My goal is to make this tutorial comprehensive enough so that it can be used to configure a production Airflow deployment.

Setup

I'm using Google Cloud Platform for hosting. The end goal here is to take data from 5 MySQL databases and load it into Google BigQuery.

Installation

I installed Airflow on a Compute Engine instance (running Ubuntu 16). The installation was pretty trivial; simply run:

export AIRFLOW_HOME=~/airflow
pip install airflow
airflow initdb
airflow webserver -p 8080

and I was up and running. You can find more on the installation in the Airflow Documentation.

About the Installation

Airflow is installed as a Python package, and all of its configuration files are stored in ~/airflow.

The primary file you need to know about is ~/airflow/airflow.cfg, which stores the configuration information for Airflow. I will edit it in a later section to set up security.

Airflow is, by the way, a Flask application.

Database Setup

According to the Airflow Documentation:

If you want to take a real test drive of Airflow, you should consider setting up a real database backend and switching to the LocalExecutor.

I decided I would just install Postgres on my Airflow instance (Ubuntu 16):

sudo apt-get install postgresql postgresql-contrib
pip install psycopg2

Then create a user for Airflow:

$ sudo -u postgres createuser --interactive
Enter name of role to add: airflow
Shall the new role be a superuser? (y/n) n
Shall the new role be allowed to create databases? (y/n) n
Shall the new role be allowed to create more new roles? (y/n) n

Then set the user's password and create the database:

sudo -u postgres psql
psql (9.5.7)
Type "help" for help.

postgres=# ALTER USER airflow WITH PASSWORD 'airflow_password';
ALTER ROLE
postgres=# CREATE DATABASE airflow;
CREATE DATABASE

Next, edit airflow.cfg to use Postgres by adding, under the [core] section:

# The Postgres connection string
sql_alchemy_conn = postgresql://airflow:airflow_password@localhost/airflow

and comment out the SQLite config.

Finally, reinitialize the database:

airflow initdb

Restarting Airflow

I had to restart Airflow, which wasn't as simple as I expected. I tried other solutions posted on Stack Overflow, but eventually just killed all the Airflow processes with kill -9 and restarted the webserver using:

airflow webserver -p 8080 -D

⚠️ You should really configure systemd

Starting the Scheduler

The scheduler needs to be running in order for jobs and tasks to be executed. To start the scheduler, run:

airflow scheduler

Integration with systemd

📝 http://pythonhosted.org/airflow/configuration.html#integration-with-systemd

Sponsorship Message

This Airflow Tutorial is being sponsored by the following tool; please help support us by taking a look and signing up for a free trial.

GitAds

Security

User Access Control

One of my concerns was user access control, so after the install I jumped down to the Security portion of the Airflow Documentation. Per the docs:

By default, all gates are opened. An easy way to restrict access to the web application is to do it at the network level, or by using SSH tunnels.

I plan on setting up SSH tunneling in production (until we have a VPN in place), but I still want my users to have to authenticate.

To set up Airflow to require usernames and passwords, I edited my airflow.cfg file under the [webserver] section per the documentation:

authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth

Next, you'll need to install flask_bcrypt. Since Airflow is a Flask application, it needs this package to hash passwords for authentication:

pip install flask_bcrypt

Next, I needed to create the initial user by starting a Python interpreter from the command line:

cd ~/airflow
python

Then in the Python interpreter:

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

# PasswordUser wraps the user model and hashes the password with flask_bcrypt
user = PasswordUser(models.User())
user.username = 'admin'
user.email = '[email protected]'
user.password = 'admin_password'

# Persist the new user to the Airflow metadata database
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

Finally, restart Airflow.

📝 I will probably come back later and setup GitHub Enterprise Auth

Encryption

When clicking around, I ran into this nasty message on /admin/connections:

Warning: Connection passwords are stored in plaintext until you install the Python "cryptography" library. You can find installation instructions here: https://cryptography.io/en/latest/installation/. Once installed, instructions for creating an encryption key will be displayed the next time you import Airflow.

So I did a simple install of two encryption packages, both recommended in the Airflow Documentation:

pip install cryptography
pip install crypto

and the message went away. I will revisit this when I start setting up connections.

📝 Still figuring this out...
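For reference, connection encryption in Airflow hinges on a Fernet key generated with the cryptography package. A minimal sketch of generating one looks like this (the printed value then goes into the fernet_key setting in airflow.cfg):

from cryptography.fernet import Fernet

# Generate a new Fernet key; paste the printed value into the fernet_key
# setting under [core] in ~/airflow/airflow.cfg, then restart Airflow so
# connection passwords are stored encrypted.
print(Fernet.generate_key().decode())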

Clearing out the Default DAGs and Connections

Clearing out the default connections was easy. I just selected them all and used With Selected > Delete.

I added back a single connection to the local Airflow Postgres database.

I made the mistake of trying to delete the default DAGs from the Postgres CLI, ⚠️ bad idea... Just edit airflow.cfg and set:

load_examples = False

📝 I had a problem where connections showed back up after I restarted Airflow; still figuring this out...

Final Notes on Setup

There are probably a few other setup-related things to do, like workers and backups, but I'm all set for now. Next, I move on to setting up my data flows.

Working with DAGs

The Airflow Documentation talks a lot about "DAGs", but I found that documentation spread all over the place. This section will walk you through configuring Airflow to move data from MySQL databases into BigQuery.

Operators and Hooks

Before I get into coding up some things, I think it's important to understand what Operators and Hooks are within Airflow.

Operators allow for generation of certain types of tasks that become nodes in the DAG when instantiated.

There are 3 main types of operators:

  • Operators that perform an action, or tell another system to perform an action
    • e.g. BashOperator
  • Transfer operators move data from one system to another
    • e.g. MySqlToGoogleCloudStorageOperator
  • Sensors are a certain type of operator that will keep running until a certain criterion is met.
    • e.g. GoogleCloudStorageObjectSensor

Hooks manage the interaction between systems (e.g. BigQuery) and Airflow. Hooks provide methods like get_conn and get_pandas_df which interact with systems to access data.
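For example, here is a minimal sketch of using the MySQL hook directly (assuming the sakila_test connection that gets created later in this tutorial):

from airflow.hooks.mysql_hook import MySqlHook

# mysql_conn_id refers to a connection defined under Admin > Connections
hook = MySqlHook(mysql_conn_id='sakila_test')

# get_conn returns a plain DB-API connection object...
conn = hook.get_conn()

# ...while get_pandas_df runs a query and returns a pandas DataFrame
df = hook.get_pandas_df('SELECT * FROM actor LIMIT 5')
print(df.head())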

Creating and Testing the First DAGs

DAGs are created using Python code. To make a DAG, you create a Python script and save it into the dags_folder specified in airflow.cfg (defaults to ~/airflow/dags). I'm going to create a simple DAG to test that Airflow is finding DAGs correctly, so I'm creating a file called tutorial.py in ~/airflow/dags.
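A minimal tutorial.py along these lines is enough to confirm DAG discovery (a sketch loosely based on the standard Airflow tutorial; the task names and dates are just placeholders):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 8, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# The first argument is the dag_id that shows up in `airflow list_dags`
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

print_date = BashOperator(task_id='print_date', bash_command='date', dag=dag)
sleep = BashOperator(task_id='sleep', bash_command='sleep 5', dag=dag)
sleep.set_upstream(print_date)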

To make sure Airflow finds the DAG, I ran:

airflow list_dags

from the command line.

Hands On Exercises 🏋️‍♂️

The first real DAG I want to make is one that exports data from a MySQL Database and dumps it into Google Cloud Storage.

Creating a MySQL to Google Cloud Storage DAG

Setup

To set up for this exercise, I first needed to create some infrastructure to simulate an operational system I could do some ETL on. My setup looked like this:

  1. A MySQL database using GCP's Cloud SQL, into which I loaded MySQL's "sakila" sample database (see sakila-database.sql for the dump file I imported into my instance)
  2. A Google Cloud Storage bucket I could dump some data into

Instructions

The first task was to demonstrate that I could use the MySqlToGoogleCloudStorageOperator to export data from MySQL to a GCS bucket. I crafted this simple DAG, mysql_to_gcs.py.
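Roughly, the DAG looks like the sketch below (the real code is in ./dags; the bucket name and file paths here are placeholders, and the connection IDs are the ones set up in the instructions that follow):

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

default_args = {'owner': 'airflow', 'start_date': datetime(2017, 8, 11)}

dag = DAG('mysql_to_gcs', default_args=default_args, schedule_interval='@daily')

extract_actor = MySqlToGoogleCloudStorageOperator(
    task_id='extract_actor',
    mysql_conn_id='sakila_test',              # MySQL connection created below
    google_cloud_storage_conn_id='gcp_test',  # GCP connection created below
    sql='SELECT * FROM actor',
    bucket='my-etl-bucket',                   # placeholder bucket name
    filename='sakila/actor/actor_{}.json',    # {} is replaced with a file counter
    schema_filename='sakila/schemas/actor.json',
    dag=dag)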

There is some backend configuration work to do before this DAG will run.

First, set up the connections in Airflow:

  1. Create a MySQL connection:

     Conn Id: sakila_test
     Conn Type: MySQL
     Host: 10.10.10.10
     Schema: sakila
     Login: airflow
     Password: airflow_password

  2. Create a GCP connection:
     • Create a Service Account, download the credentials you need, and save them somewhere on the Airflow instance. I put mine in /etc/gcp/creds.json.
     • Set up the connection:

       Conn Id: gcp_test
       Conn Type: Google Cloud Platform
       Project Id: my-gcp-project-id-00000
       Keyfile Path: /etc/gcp/creds.json
       Scopes (comma separated): https://www.googleapis.com/auth/cloud-platform

  3. Install the MySQL dependencies on the Airflow instance:

     sudo apt-get install python-mysqldb
     pip install pymysql

  4. Create the mysql_to_gcs.py DAG in ~/airflow/dags (find the code in ./dags; a sketch appears above).

  5. Test for Python compilation to make sure you don't have any syntax errors:

     cd ~/airflow/dags
     python mysql_to_gcs.py

  6. Now test run the task using Airflow. This will actually execute the DAG task as if it were running in Airflow, so expect to see a file created in the bucket you're using:

     airflow test mysql_to_gcs extract_actor 08-11-2017

  7. Once you've tested it, you're all set!

Creating a DAG to Extract Multiple Tables from Multiple MySQL Databases to BigQuery

In this exercise, we'll pull data from two MySQL databases, dump it to GCS, and then load it from GCS into BigQuery.

Setup

To set up for this exercise, I first needed to create some infrastructure to simulate an operational system I could do some ETL on. My setup looked like this:

  1. Create your first MySQL database (sakila_1) using GCP's Cloud SQL and load MySQL's "sakila" database into it (see sakila-database.sql for the dump file I imported into my instance)
  2. Create your second MySQL database (sakila_2) using GCP's Cloud SQL and load MySQL's "sakila" database into it as well
  3. Create a Google Cloud Storage bucket you can dump some data into
  4. Create a BigQuery dataset for each of sakila_1 and sakila_2 (could probably make one dataset now that I think about it)

Instructions

  1. Create two MySQL connections:

     Conn Id: sakila_1
     Conn Type: MySQL
     Host: 10.10.10.10
     Schema: sakila
     Login: airflow
     Password: airflow_password

     Conn Id: sakila_2
     Conn Type: MySQL
     Host: 10.10.10.11
     Schema: sakila
     Login: airflow
     Password: airflow_password

  2. Create a GCP connection:
     • Create a Service Account, download the credentials you need, and save them somewhere on the Airflow instance. I put mine in /etc/gcp/creds.json.
     • Set up the connection:

       Conn Id: gcp_test
       Conn Type: Google Cloud Platform
       Project Id: my-gcp-project-id-00000
       Keyfile Path: /etc/gcp/creds.json
       Scopes (comma separated): https://www.googleapis.com/auth/cloud-platform

  3. Create the sakila_main_tables.py DAG in ~/airflow/dags (find the code in ./dags; a rough sketch follows after this list).

  4. Test for Python compilation to make sure you don't have any syntax errors:

     cd ~/airflow/dags
     python sakila_main_tables.py

  5. Now test run the two main tasks using Airflow. This will actually execute the DAG tasks as if they were running in Airflow, so expect to see a file created in the bucket and a table created in BigQuery when you get a success:

     airflow test sakila_main_tables extract_mysql_sakila_1_actor 08-11-2017
     airflow test sakila_main_tables load_bq_sakila_1_actor 08-11-2017
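As promised, here is a rough sketch of the pattern sakila_main_tables.py follows; the table list and bucket name are placeholders, and the real code lives in ./dags:

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

default_args = {'owner': 'airflow', 'start_date': datetime(2017, 8, 11)}

dag = DAG('sakila_main_tables', default_args=default_args, schedule_interval='@daily')

DATABASES = ['sakila_1', 'sakila_2']  # MySQL connection IDs created above
TABLES = ['actor', 'film']            # placeholder list of "main" tables

for db in DATABASES:
    for table in TABLES:
        # Dump the table from MySQL to GCS as newline-delimited JSON,
        # writing a BigQuery schema file alongside the data.
        extract = MySqlToGoogleCloudStorageOperator(
            task_id='extract_mysql_%s_%s' % (db, table),
            mysql_conn_id=db,
            google_cloud_storage_conn_id='gcp_test',
            sql='SELECT * FROM %s' % table,
            bucket='my-etl-bucket',  # placeholder bucket name
            filename='%s/%s/%s_{}.json' % (db, table, table),
            schema_filename='%s/schemas/%s.json' % (db, table),
            dag=dag)

        # Load the exported files from GCS into the matching BigQuery dataset.
        load = GoogleCloudStorageToBigQueryOperator(
            task_id='load_bq_%s_%s' % (db, table),
            google_cloud_storage_conn_id='gcp_test',
            bigquery_conn_id='gcp_test',
            bucket='my-etl-bucket',
            source_objects=['%s/%s/%s_*.json' % (db, table, table)],
            destination_project_dataset_table='%s.%s' % (db, table),
            schema_object='%s/schemas/%s.json' % (db, table),
            source_format='NEWLINE_DELIMITED_JSON',
            write_disposition='WRITE_TRUNCATE',
            dag=dag)

        load.set_upstream(extract)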

Future Work and To-Do Items

  • Add eth_rates.py exercise with an example showing how to use plugins
  • Figure out Encryption for connections
  • Document setting up for CeleryExecutor
  • Include instructions for setting up systemd

Sponsorship Message

Future works coming soon thanks to my sponsor: GitAds
