groupon / Luigi Warehouse

Licence: other
A Luigi-powered analytics / warehouse stack

Luigi-Warehouse

A boilerplate implementation of Luigi at Groupon

  • Luigi is a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, failure handling, command line integration, and much more (a minimal task sketch follows the list below)

  • Luigi-Warehouse adds:

    • example workflows (e.g. replicating PostgreSQL tables to Redshift)
    • more data sources
    • variable data sources that do not rely on default Luigi behavior/configs (e.g. VariableS3Client)
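  • For readers new to Luigi, here is a minimal sketch of a two-step pipeline; the task and file names are hypothetical, but the real workflows in this repo follow the same requires/output/run pattern

import luigi

class Extract(luigi.Task):
  """Hypothetical first step: write raw rows to a local file."""
  date = luigi.DateParameter()

  def output(self):
    return luigi.LocalTarget('data/extract_{}.csv'.format(self.date))

  def run(self):
    with self.output().open('w') as f:
      f.write('id,value\n1,foo\n')

class Load(luigi.Task):
  """Hypothetical second step: declares its dependency via requires()."""
  date = luigi.DateParameter()

  def requires(self):
    return Extract(date=self.date)

  def output(self):
    return luigi.LocalTarget('data/load_{}.done'.format(self.date))

  def run(self):
    with self.input().open('r') as f, self.output().open('w') as out:
      out.write('loaded {} rows'.format(len(f.read().splitlines()) - 1))

if __name__ == '__main__':
  luigi.run()  # e.g. python pipeline.py Load --date 2016-10-06 --local-scheduler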

Install / Setup

  • Install Python 3 - this repo has been tested against Python 3.4+
Simple

python setup.py install

Developers - if you want to modify or use the workflows with your custom logic:
  • Clone this repo
  • pip3 install -r requirements.txt if you want full functionality of all data sources
Post-Install
  • mkdir your-path-to/data
  • Put your credentials and settings in luigi.cfg. luigi.cfg-example shows some possible options (an illustrative sketch follows this list). You can also $ export LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg && python...
  • You're ready to replicate or move data around...
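  • As an illustration only, a luigi.cfg might look something like the following. The [core] options are standard Luigi scheduler settings and [slackbots]/BOWSER_SLACK_API_KEY is referenced in the notification example below, but the [postgres] section and its keys are purely hypothetical - luigi.cfg-example lists the options this repo actually reads

[core]
default-scheduler-host=localhost
default-scheduler-port=8082

[postgres]
host=your-postgres-host
database=your-database
user=your-user
password=your-password

[slackbots]
BOWSER_SLACK_API_KEY=your-slack-token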

Getting Started

  • Some example workflows are included. Assumptions, args, and comments are documented in each file
File | Description | Main Class(es)
gsheet_to_redshift.py | replicates all data from a google sheet to a redshift table (full copy/replace) | Run
gsheet_to_hadoop.py | replicates all data from a google sheet to a hadoop hive table via spark (full copy/replace) | main
postgres_to_redshift.py | replicates postgres tables to redshift (incrementally or full copy/replace) | Run - PerformIncrementalImport, PerformFullImport
postgres_to_hadoop.py | spark app that replicates postgres tables to hadoop (hive) (incrementally or copy/replace) | Run - RunIncremental, RunFromScratch
salesforce_to_redshift.py | replicates a salesforce report or SOQL to a redshift table (full copy/replace) | SOQLtoRedshift, ReporttoRedshift
teradata_to_redshift.py | replicates given teradata SQL to a redshift table (incrementally or full copy/replace) | Run
typeform_to_redshift.py | replicates all data from typeform responses to a redshift table (full copy/replace) | Run
zendesk_to_redshift.py | extracts users, orgs, tickets, ticket_events from zendesk to redshift (partially incremental) | Run
zendesk_to_hadoop.py | generic class to extract from the zendesk API and load to hadoop hive via spark (incrementally or full copy/replace) | ZendeskSpark
  • Example to start the luigi scheduler daemon
$ ./start_luigi_server.bash
  • Example to run a workflow with multiple workers in parallel (or programmatically, as sketched below)
$ LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg python3 luigi_warehouse/postgres_to_redshift.py Run --params here --workers 50
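  • The same workflow can also be kicked off from Python with luigi.build; this is only a sketch - pass the workflow's real parameters just as in the command above

import luigi
from luigi_warehouse.postgres_to_redshift import Run

# sketch: programmatic equivalent of the CLI example above
luigi.build([Run()],                # fill in the workflow's parameters here
            workers=50,             # matches --workers 50
            local_scheduler=False)  # talk to the scheduler daemon started above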

Data Sources

Dependent Python packages required & API references (a sketch of wiring one of these clients to luigi.cfg credentials follows the list)

Luigi - Spotify/Luigi
Postgres / Redshift - psycopg2
MySQL - pymysql
Adwords - googleads : API Reference
Googlesheets - gspread : API Reference
Slack - slackclient : API Reference
Five9 - suds : API Reference
Twilio - twilio : API Reference
Livechat - API Reference
Zendesk - zdesk : API Reference
Shiftplanning - API Reference
Kochava - API Reference
Teradata - teradata
  • requires some configuration to install. We typically have to run
$ mv ~/.odbc.ini ~/.odbc.ini.orig 
$ cp /opt/teradata/client/15.10/odbc_64/odbcinst.ini ~/.odbcinst.ini 
$ cp /opt/teradata/client/15.10/odbc_64/odbc.ini ~/.odbc.ini
OnboardIQ - API Reference
AppBoy - API Reference
Salesforce - simple-salesforce : API Reference
  • Props to cghall for the capability to query Salesforce reports directly using the Analytics API

  • Also available are the SalesforceBulk and SalesforceBulkJob classes, which use the Salesforce Bulk API

Braintree - braintree : API Reference
Typeform - API Reference
Checkr - API Reference
AWS - boto : boto3
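  • As a rough sketch of how one of these clients typically gets wired to luigi.cfg credentials - the [postgres] section and key names below are hypothetical, so check luigi.cfg-example for the real ones

import luigi.configuration
import psycopg2

# sketch: build a psycopg2 connection from luigi.cfg values
# (the [postgres] section/key names are hypothetical)
config = luigi.configuration.get_config()
conn = psycopg2.connect(host=config.get('postgres', 'host'),
                        dbname=config.get('postgres', 'database'),
                        user=config.get('postgres', 'user'),
                        password=config.get('postgres', 'password'))
with conn.cursor() as cur:
  cur.execute('SELECT 1')
  print(cur.fetchone())
conn.close()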

Notifications

  • We currently use Slack or email for job status notifications, which can easily be added

  • luigi-slack

from luigi_slack import SlackBot, notify
slack_channel = 'luigi-status-messages'
...
...
...

if __name__ == '__main__':
  slack_channel = 'luigi-status-messages'
  slacker = SlackBot(token=luigi.configuration.get_config().get('slackbots', 'BOWSER_SLACK_API_KEY'),
                   channels=[slack_channel])
  with notify(slacker):
    luigi.run() 

  • Email via Amazon SES (boto3)

import boto3

class Email:
  def __init__(self, region, aws_access_key_id, aws_secret_access_key):
    self.client = boto3.client('ses',
                               region_name=region,
                               aws_access_key_id=aws_access_key_id,
                               aws_secret_access_key=aws_secret_access_key)

  def send(self, from_, to_list, subject, body):
    return self.client.send_email(Source=from_,
                                  Destination={'ToAddresses': to_list},
                                  Message={'Subject': {'Data': subject},
                                           'Body': {'Text': {'Data': body},
                                                    'Html': {'Data': ' '}}})
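  • For example, sending a job-status notice with the class above might look like this (the region, keys, and addresses are placeholders)

mailer = Email(region='us-east-1',
               aws_access_key_id='XXXX',
               aws_secret_access_key='XXXX')
mailer.send(from_='luigi@example.com',
            to_list=['data-team@example.com'],
            subject='postgres_to_redshift finished',
            body='See the luigi scheduler UI for details.')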

Data Validation

  • Targeted towards ensuring successful replication of data to Redshift (see modules/validation.py)
Structure
  • checks whether the CSV has the same number of columns as the target table
  • checks whether the columns have the same datatypes in the same order (VARCHAR is acceptable for any Python datatype)
    • uses python_redshift_dtypes to convert
LoadError
  • Checks for load errors for the provided target:schema:table since the provided load_start timestamp
RunAnywayTarget
  • Use the wrapper class RunAnywayTarget if you want a simpler interface that will pick up improvements as each validation scheme gets better

  • Pass in the task object (self) with the following attributes:

    • type = ['LoadError', 'Structure']
    • target = Redshift
    • table = the target table
    • schema = the target schema
    • local_file = local CSV file path
    • load_start = when you started to copy the records from S3
  • Calling RunAnywayTarget(self).done() will not do validation

  • Calling RunAnywayTarget(self).validation() will do the validation and, if successful, also mark the task as done (a usage sketch follows below)
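  • A rough sketch of what this looks like inside a task; the attribute values are placeholders and the import assumes modules/validation.py is importable as validation, as in the examples below

import datetime
import luigi
from validation import RunAnywayTarget

class CopyCSVToRedshift(luigi.Task):
  # attributes read by the validation schemes (placeholder values)
  type = ['LoadError', 'Structure']
  target = 'Redshift'
  table = 'test'
  schema = 'public'
  local_file = '/path/to/data/test.csv'

  def run(self):
    self.load_start = datetime.datetime.now()  # when the COPY from S3 begins
    # ... issue the COPY into schema.table here ...
    RunAnywayTarget(self).validation()  # validate; marks the task done only on success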

OrderedDF
  • Takes the following args:
  1. target_cols : a list of columns in the order you want your dataframe structured
  2. df : the dataframe you want restructured
  • Example: I want my dataframe to have columns in this order ['one','two','three','four','five','six']
>>> from validation import OrderedDF
>>> import pandas as pd
>>> test = [[None,'',1,7,8],[None,'',2,5,6]]
>>> test = pd.DataFrame(test,columns=['one','two','four','five','three'])
>>> test
    one two  four  five  three
0  None         1     7      8
1  None         2     5      6
>>> result = OrderedDF(['one','two','three','four','five','six'],test)
>>> result.df
    one two  three  four  five   six
0  None          8     1     7  None
1  None          6     2     5  None
StructureDynamic
  • This class will attempt to fix your target table for you:
  1. Check for copy errors
  2. Handle the copy errors
     • add column(s) if needed
     • change dtype(s) if needed
  3. Get the original table's schema
  4. Craft the new table's schema with the changes derived from the errors
  5. Make the change, retry the copy, and remove duplicate * records
  6. While there are still copy errors:
     • handle the errors
     • attempt to fix
     • retry the copy
     • remove duplicate * records

  • To run, use:

StructureDynamic(target_schema= ,       # redshift schema your table is in
                 target_table=          # your table
                 ).run(add_cols= ,      # True or False: whether to add columns in attempting to fix
                       change_dtypes= , # True or False: whether to change column data types in attempting to fix
                       copy= ,          # the COPY command you attempted
                       load_start=      # when you started the COPY command, '%Y-%m-%d %H:%M:%S'
                       )
  • Example usage:
    • sql prep: create the table
CREATE TABLE public.test(id INT, col VARCHAR);
INSERT INTO test VALUES (1,'2');
INSERT INTO test VALUES (2, 'two');
  • test.csv: create the csv you want to attempt to copy
1,2
two,2
3,4
5,6
ab,test
  • we attempt to copy normally, but we get load errors because some values in the csv don't match the target column datatypes
COPY public.test FROM 's3://luigi-godata/test.csv' 
CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
CSV DELIMITER ',' COMPUPDATE ON MAXERROR 0;
  • we run StructureDynamic
from validation import StructureDynamic
copy = '''COPY public.test FROM 's3://luigi-godata/test.csv' 
          CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
          CSV DELIMITER ',' COMPUPDATE ON MAXERROR 0;'''
StructureDynamic(target_schema='public',target_table='test').run(add_cols=True,change_dtypes=True,copy=copy,load_start='2016-10-6 10:15:00')
  • our table is fixed and called public.test
  • our original table is kept as public.test_orig_backup
  • stdout lists the stl_load_errors
  • the changes made to the table's DDL are printed to stdout