CityOfPhiladelphia / Phila Airflow

Programming Languages

python

Projects that are alternatives of or similar to Phila Airflow

naas
⚙️ Schedule notebooks, run them like APIs, expose securely your assets: Jupyter as a viable ⚡️ Production environment
Stars: ✭ 219 (+1268.75%)
Mutual labels:  pipeline, etl
lineage
Generate beautiful documentation for your data pipelines in markdown format
Stars: ✭ 16 (+0%)
Mutual labels:  pipeline, etl
polygon-etl
ETL (extract, transform and load) tools for ingesting Polygon blockchain data to Google BigQuery and Pub/Sub
Stars: ✭ 53 (+231.25%)
Mutual labels:  airflow, etl
Linq2db
Linq to database provider.
Stars: ✭ 2,211 (+13718.75%)
Mutual labels:  oracle, etl
AirflowDataPipeline
Example of an ETL Pipeline using Airflow
Stars: ✭ 24 (+50%)
Mutual labels:  airflow, etl
AirflowETL
Blog post on ETL pipelines with Airflow
Stars: ✭ 20 (+25%)
Mutual labels:  airflow, etl
sparklanes
A lightweight data processing framework for Apache Spark
Stars: ✭ 17 (+6.25%)
Mutual labels:  pipeline, etl
Example Airflow Dags
Example DAGs using hooks and operators from Airflow Plugins
Stars: ✭ 243 (+1418.75%)
Mutual labels:  airflow, etl
Addax
Addax is an open source universal ETL tool that supports most of those RDBMS and NoSQLs on the planet, helping you transfer data from any one place to another.
Stars: ✭ 615 (+3743.75%)
Mutual labels:  etl, oracle
etl
M-Lab ingestion pipeline
Stars: ✭ 15 (-6.25%)
Mutual labels:  pipeline, etl
Datax
DataX is an open source universal ETL tool that supports Cassandra, ClickHouse, DBF, Hive, InfluxDB, Kudu, MySQL, Oracle, Presto(Trino), PostgreSQL, and SQL Server.
Stars: ✭ 116 (+625%)
Mutual labels:  oracle, etl
Datavec
ETL Library for Machine Learning - data pipelines, data munging and wrangling
Stars: ✭ 272 (+1600%)
Mutual labels:  pipeline, etl
Csv2db
The CSV to database command line loader
Stars: ✭ 102 (+537.5%)
Mutual labels:  oracle, etl
DataX-src
DataX is a widely used offline data synchronization tool/platform for heterogeneous data, providing efficient data synchronization between heterogeneous data sources including MySQL, Oracle, SqlServer, Postgre, HDFS, Hive, ADS, HBase, OTS, and ODPS.
Stars: ✭ 21 (+31.25%)
Mutual labels:  etl, oracle
Pyetl
python ETL framework
Stars: ✭ 33 (+106.25%)
Mutual labels:  oracle, etl
mydataharbor
🇨🇳 MyDataHarbor is a distributed, highly extensible, high-performance, transaction-level data synchronization middleware for moving data from any data source to any other. It helps users perform reliable, fast, and stable near-real-time incremental synchronization or scheduled full synchronization of massive data sets. It is primarily intended for real-time transaction systems, but can also be used for big-data synchronization (the ETL domain).
Stars: ✭ 28 (+75%)
Mutual labels:  pipeline, etl
Udacity Data Engineering
Udacity Data Engineering Nano Degree (DEND)
Stars: ✭ 89 (+456.25%)
Mutual labels:  airflow, etl
Aws Ecs Airflow
Run Airflow in AWS ECS(Elastic Container Service) using Fargate tasks
Stars: ✭ 107 (+568.75%)
Mutual labels:  airflow, etl
astro
Astro allows rapid and clean development of {Extract, Load, Transform} workflows using Python and SQL, powered by Apache Airflow.
Stars: ✭ 79 (+393.75%)
Mutual labels:  airflow, etl
basin
Basin is a visual programming editor for building Spark and PySpark pipelines. Easily build, debug, and deploy complex ETL pipelines from your browser
Stars: ✭ 25 (+56.25%)
Mutual labels:  pipeline, etl

phila-airflow

Airflow Configuration for The City of Philadelphia

Local setup

# clone the repository
git clone https://github.com/CityOfPhiladelphia/phila-airflow.git
cd phila-airflow

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=`pwd`

# install the project requirements
pip install -r requirements.txt

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080

Troubleshooting

Several DAGs (pipelines) depend on Python packages that in turn depend on system packages. For instance, datum depends on Oracle. If these system packages are not available, pip install -r requirements.txt will likely fail. These issues are DAG-specific, but they are documented in this section to make troubleshooting easier.

Installing Oracle on OSX

Follow these steps:

  1. Install the Oracle Instant Client. Download the 64-bit versions of the basic and sdk zip files from Oracle.
  2. Create a global oracle directory in a location such as ~/.local/share/oracle and copy the two .zip files into it.
  3. Unzip the .zip files into that directory. When finished, the oracle directory should contain the extracted files directly (rather than a single subdirectory of files).
  4. Inside the oracle directory, create symbolic links using:
ln -s libclntsh.dylib.11.1 libclntsh.dylib
ln -s libocci.dylib.11.1 libocci.dylib

Finally, add the following environment variables to your ~/.bash_profile, replacing the value of ORACLE_HOME with the absolute path to your new oracle directory.

export ORACLE_HOME="/path/to/oracle"
export DYLD_LIBRARY_PATH=$ORACLE_HOME
export LD_LIBRARY_PATH=$ORACLE_HOME

Note: Alternatively, you can install the Oracle Instant Client inside your virtual environment directory and set the environment variables contextually for the specific project.
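
To confirm that the Instant Client libraries are being found, a quick check from a Python shell can help. This sketch assumes the cx_Oracle driver is what links against the client libraries, which is a common setup; this README does not name the specific driver its DAGs use.

import cx_Oracle

# cx_Oracle loads the Oracle client libraries when it is imported, so a clean
# import means ORACLE_HOME / DYLD_LIBRARY_PATH are set correctly;
# clientversion() reports which Instant Client was found
print(cx_Oracle.clientversion())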

rtree

Command "python setup.py egg_info" failed with error code 1 in /private/.../rtree

The taxi trips workflow uses rtree, which depends on system packages that do not ship with OS X natively. To install them, use:

brew install spatialindex
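
Once spatialindex is installed, a quick import is enough to confirm that rtree can find it; this is just a sanity check, since rtree raises an error at import time when the libspatialindex shared library is missing.

# rtree loads the libspatialindex shared library at import time, so a
# clean import means the Homebrew package was found
import rtree
print('rtree OK, version', rtree.__version__)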

cryptography

Command "/.../cryptography" failed with error code 1 in /.../cryptography

Make sure you're using the latest version of pip:

pip install --upgrade pip

Encryption

You can store connection credentials and other sensitive information in the Airflow database, but you'll want to store them encrypted. Open your airflow.cfg file and find the fernet_key line. In a terminal, run:

scripts/generate_enc_key

Use the output of that command as the fernet_key value.
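
If the scripts/generate_enc_key helper is not available, an equivalent key can be generated directly with the cryptography package that Airflow's encryption support relies on. This is a minimal sketch, assuming cryptography is installed in your environment:

# a Fernet key is a 32-byte, URL-safe, base64-encoded string; Airflow
# uses it to encrypt connection credentials stored in its database
from cryptography.fernet import Fernet
print(Fernet.generate_key().decode())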

DAG Construction

Creating your transformation script

Portions of a DAG can exist in code outside of the phila-airflow repository. For example, you may want to create transformation scripts that can be plugged in to Airflow that get installed when the server is deployed. Creating these as separate scripts can make them easier to test.

Two handy libraries for writing transformation scripts are click (for building command-line interfaces) and petl (for extracting and transforming tabular data).

Also, to gain insight into any exceptions as they occur, install and list the raven library (the Sentry client) as a dependency for your script.

import click
import petl
import raven


# NOTE: The following function is just a sample command. Neither the name of
# the function nor the contents are special. They are simply for demonstration.
@click.command()
def cmd():
    t = petl.fromcsv('...')\
        .addfield('foo', 1)\
        .addfield('bar', lambda r: r['foo'] + 1)\
        .cutout('baz')\
        .tocsv()


if __name__ == '__main__':
    client = raven.Client()
    try:
        cmd()  # Call the function defined above.
    except:
        client.captureException()
        raise
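
Because the transformation is plain Python, it can be exercised outside of Airflow. Below is a minimal sketch of such a test using petl's in-memory tables; the function and field names are illustrative and not part of this repository.

import petl


def add_bar(table):
    # same shape of transformation as the script above: derive 'bar' from 'foo'
    return table.addfield('bar', lambda r: r['foo'] + 1)


def test_add_bar():
    source = petl.wrap([['foo'], [1], [2]])
    result = add_bar(source)
    assert list(result.values('bar')) == [2, 3]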

You can reuse your own python modules in pipelines. In order to use a module, it must be installable -- e.g., it should have a setup.py file. A minimal setup.py file looks something like:

from distutils.core import setup

setup(
    name='phl-data-transformation',
    version='1.0.0',
    scripts=['transform.py'],
)

Defining the DAG

You should think of your pipeline as a series of stages.

# constructing a new DAG requires importing airflow's DAG class, along
# with the operators and helpers used further below:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.slack_operator import SlackAPIPostOperator

# use the DAG class to instantiate a new DAG, providing a name and default
# settings. The default settings are a dictionary that will be applied to
# all of your operators:

# a start date in the past, e.g. seven days ago
seven_days_ago = datetime.combine(datetime.today() - timedelta(days=7),
                                  datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': seven_days_ago,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag_name = 'my_pipeline'  # any unique name for this DAG
dag = DAG(dag_name, default_args=default_args)

# create the individual tasks you would like to run:
# run ChangeProjections.py

task1 = BashOperator(
    task_id='batch_projections',
    bash_command='python ChangeProjections.py',
    dag=dag,
)

# run Compare_privileges.py

task2 = BashOperator(
    task_id='compare_priv',
    bash_command='python Compare_privileges.py',
    dag=dag,
)

# run knack.py

task3 = BashOperator(
    task_id='back_up_knack',
    bash_command='python knack.py',
    dag=dag,
)


success = 'Your Task finished without errors'

# notify Slack when the upstream tasks finish successfully
task4 = SlackAPIPostOperator(
    task_id='notify_success',
    token=Variable.get('slack_token'),
    channel='yourchannel',
    text=success,
    dag=dag,
)

# note: there is no need to call task.execute() in a DAG file; the Airflow
# scheduler runs each task once its dependencies are met

# to set the dependency of one task on another use the convention:
# set task4 downstream of task1
task1 >> task4

# set task4 downstream of task2
task2 >> task4

# set task4 downstream of task3
task3 >> task4
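
The three dependency statements above can also be written more compactly; this form assumes an Airflow version whose bitshift operators accept lists (a sketch, not taken from this repository's DAGs):

# fan-in: task4 runs only after task1, task2, and task3 have all finished
[task1, task2, task3] >> task4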

Troubleshooting Plugins

Sometimes Airflow will fail to import components that you've made available via plugins. In order to troubleshoot your plugin, follow these instructions:

  1. Open a Python REPL and import Airflow

    >>> from airflow import *
    

    If Python gives you any exceptions, you may have a syntax error in one of your modules.

  2. Import the components that the plugin provides. For example, if your plugin exposes a hook, import it from airflow.hooks; if it exposes an operator, import it from airflow.operators, as in the sketch below.
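
    For example, for a hypothetical plugin that provides a hook and an operator (the names below are placeholders, not components of this repository):

    >>> from airflow.hooks import MyPluginHook          # hypothetical hook
    >>> from airflow.operators import MyPluginOperator  # hypothetical operator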
