mohaseeb / beam-nuggets

Licence: MIT License
Collection of transforms for the Apache Beam Python SDK.

About

A collection of assorted transforms for the Apache Beam Python SDK. Many are simple; the most useful are those for reading from and writing to relational databases.

Installation

  • Using pip
pip install beam-nuggets
  • From source
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
pip install .

Supported transforms

IO

Others

Documentation

See here.

Usage

Write data to an SQLite table using beam-nuggets' relational_db.Write transform.

# write_sqlite.py contents
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

records = [
    {'name': 'Jan', 'num': 1},
    {'name': 'Feb', 'num': 2}
]

source_config = relational_db.SourceConfiguration(
    drivername='sqlite',
    database='/tmp/months_db.sqlite',
    create_if_missing=True  # create the database if not there 
)

table_config = relational_db.TableConfiguration(
    name='months',
    create_if_missing=True,  # automatically create the table if not there
    primary_key_columns=['num']  # and use 'num' column as primary key
)
    
with beam.Pipeline(options=PipelineOptions()) as p:  # Will use local runner
    months = p | "Reading month records" >> beam.Create(records)
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )

Execute the pipeline

python write_sqlite.py 

Examine the contents

sqlite3 /tmp/months_db.sqlite 'select * from months'
# output:
# 1.0|Jan
# 2.0|Feb
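
The same check can be scripted with Python's standard-library sqlite3 module. This is a minimal sketch, assuming the table and column names from the example above (months, num, name); the helper name fetch_months is made up for illustration and is not part of beam-nuggets.

```python
import sqlite3


def fetch_months(db_path):
    """Return (num, name) rows from the 'months' table, ordered by num."""
    conn = sqlite3.connect(db_path)
    try:
        # Table and column names are taken from the write example above.
        return conn.execute(
            'SELECT num, name FROM months ORDER BY num'
        ).fetchall()
    finally:
        # Always release the file handle, even if the query fails.
        conn.close()
```

Calling fetch_months('/tmp/months_db.sqlite') after running write_sqlite.py should return one tuple per month record written by the pipeline.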

To write the same data to a PostgreSQL table instead, just create a suitable relational_db.SourceConfiguration as follows.

source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host='localhost',
    port=5432,
    username='postgres',
    password='password',
    database='calendar',
    create_if_missing=True  # create the database if not there 
)
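
The drivername value 'postgresql+pg8000' uses SQLAlchemy's dialect+driver notation, so it can help to picture the configuration fields as the parts of a SQLAlchemy-style database URL. The sketch below shows that correspondence with a hypothetical helper, as_db_url, written with the standard library only. It is not part of beam-nuggets, and the source does not say how the library builds its connection internally.

```python
from urllib.parse import quote_plus


def as_db_url(drivername, host=None, port=None, username=None,
              password=None, database=None):
    """Compose a SQLAlchemy-style URL string from connection fields.

    Illustrative helper only; it is not part of beam-nuggets.
    """
    netloc = ''
    if username is not None:
        netloc += quote_plus(username)
        if password is not None:
            # Percent-encode so characters like '@' or ':' stay unambiguous.
            netloc += ':' + quote_plus(password)
        netloc += '@'
    if host is not None:
        netloc += host
        if port is not None:
            netloc += ':' + str(port)
    url = drivername + '://' + netloc
    if database is not None:
        url += '/' + database
    return url


print(as_db_url('postgresql+pg8000', host='localhost', port=5432,
                username='postgres', password='password',
                database='calendar'))
# postgresql+pg8000://postgres:password@localhost:5432/calendar
print(as_db_url('sqlite', database='/tmp/months_db.sqlite'))
# sqlite:////tmp/months_db.sqlite
```

Seen this way, switching databases means changing only the connection fields, which matches the point above that the pipeline code itself stays the same.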

Click here for more examples, including writing to PostgreSQL on Google Cloud Platform using the DataflowRunner.

Read from a PostgreSQL database table using beam-nuggets' relational_db.ReadFromDB transform.

from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    records = p | "Reading records from db" >> relational_db.ReadFromDB(
        source_config=source_config,
        table_name='months',
        query='select num, name from months'  # optional. When omitted, all table records are returned. 
    )
    records | 'Writing to stdout' >> beam.Map(print)

See here for more examples.

Development

  • Install
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
export BEAM_NUGGETS_ROOT=`pwd`
pip install -e ".[dev]"
  • Make changes on dedicated dev branches
  • Run tests
cd $BEAM_NUGGETS_ROOT
python -m unittest discover -v
  • Generate docs
cd $BEAM_NUGGETS_ROOT
docs/generate_docs.sh
  • Create a PR against master.
  • After merging the accepted PR and updating the local master, upload a new build to PyPI.
cd $BEAM_NUGGETS_ROOT
scripts/build_test_deploy.sh
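
For the "Run tests" step above: python -m unittest discover collects unittest.TestCase subclasses from files matching test*.py by default. The following is a rough sketch of what such a module could look like. The helper has_unique_keys and the test class are hypothetical and do not exist in the repository; they only reuse the month records from the Usage section.

```python
import unittest


def has_unique_keys(records, key):
    """Hypothetical helper: True if no two records share the same `key` value."""
    values = [record[key] for record in records]
    return len(values) == len(set(values))


class HasUniqueKeysTest(unittest.TestCase):
    def test_distinct_keys(self):
        # Same shape as the records written in the Usage example.
        records = [{'name': 'Jan', 'num': 1}, {'name': 'Feb', 'num': 2}]
        self.assertTrue(has_unique_keys(records, 'num'))

    def test_duplicate_keys(self):
        # Two records with the same 'num' would clash on a primary key.
        records = [{'name': 'Jan', 'num': 1}, {'name': 'Feb', 'num': 1}]
        self.assertFalse(has_unique_keys(records, 'num'))
```

Saved under a name such as test_unique_keys.py (an assumed file name), the module would be picked up by the discover command without further configuration.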

Backlog

  • versioned docs?
  • Summarize the investigation of using Source/Sink vs. ParDo (and GroupBy) for IO
  • more nuggets: WriteToCsv
  • Investigate the readiness of SDF (Splittable DoFn) ParDo and whether it can be used for relational_db.ReadFromDB
  • integration tests
  • Failure handling in the DB IO transforms
  • more nuggets: Elasticsearch, Mongo
  • WriteToRelationalDB, logging

Contributions by

mohaseeb, astrocox, 2514millerj, alfredo, shivangkumar

Licence

MIT