spark-bigquery


This Spark module allows saving a DataFrame as a BigQuery table.

The project was inspired by spotify/spark-bigquery, but there are several differences and enhancements:

  • Use of the Structured Streaming API

  • Use within Pyspark

  • Saving via Decorators

  • Allow saving to partitioned tables

  • Easy integration with Databricks

  • Use of Standard SQL

  • Use of time-ingested partition columns

  • Run Data Manipulation Language (DML) queries

  • Update schemas on writes using setSchemaUpdateOptions

  • JSON is used as an intermediate format instead of Avro. This allows fields at different nesting levels to share the same name:

{
  "obj": {
    "data": {
      "data": {}
    }
  }
}
  • The DataFrame's schema is automatically adapted to a legal BigQuery schema (see the sketch after this list):

    1. Illegal characters are replaced with _
    2. Field names are converted to lower case to avoid ambiguity
    3. Duplicate field names are given a numeric suffix (_1, _2, etc.)
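
As an illustration, the three rules amount to a field-name cleanup along these lines. This is a minimal sketch of the idea (handling top-level fields only), not the connector's actual code:

import org.apache.spark.sql.types._

def cleanFieldNames(schema: StructType): StructType = {
  // Rules 1 and 2: replace illegal characters with '_' and lower-case names
  val sanitized = schema.fields.map { field =>
    field.copy(name = field.name.replaceAll("[^a-zA-Z0-9_]", "_").toLowerCase)
  }
  // Rule 3: give duplicate names a numeric suffix (_1, _2, ...)
  val seen = scala.collection.mutable.Map.empty[String, Int]
  StructType(sanitized.map { field =>
    val count = seen.getOrElse(field.name, 0)
    seen(field.name) = count + 1
    if (count == 0) field else field.copy(name = s"${field.name}_$count")
  })
}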

Docker!

I created a container that launches Zeppelin with Spark and the connector for ease of use and quick startup. You can find it here

Usage

Including spark-bigquery in your project

Maven

<repositories>
  <repository>
    <id>oss-sonatype</id>
    <name>oss-sonatype</name>
    <url>https://oss.sonatype.org/content/repositories/releases/</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>com.github.samelamin</groupId>
    <artifactId>spark-bigquery_${scala.binary.version}</artifactId>
    <version>0.2.6</version>
  </dependency>
</dependencies>

SBT

To use it in a local SBT console, first add the package as a dependency and then set up your project details:

resolvers += Opts.resolver.sonatypeReleases

libraryDependencies += "com.github.samelamin" %% "spark-bigquery" % "0.2.6"
import com.samelamin.spark.bigquery._

// Set up GCP credentials
sqlContext.setGcpJsonKeyFile("<JSON_KEY_FILE>")

// Set up BigQuery project and bucket
sqlContext.setBigQueryProjectId("<BILLING_PROJECT>")
sqlContext.setBigQueryGcsBucket("<GCS_BUCKET>")

// Set up BigQuery dataset location, default is US
sqlContext.setBigQueryDatasetLocation("<DATASET_LOCATION>")

Structured Streaming from S3/HDFS to BigQuery

S3, Blob Storage, and HDFS are the de facto technologies for storage in the cloud. This package allows you to stream any data added to them into a BigQuery table of your choice:

import com.samelamin.spark.bigquery._

val df = spark.readStream.json("s3a://bucket")

df.writeStream
      .option("checkpointLocation", "s3a://checkpoint/dir")
      .option("tableReferenceSink","my-project:my_dataset.my_table")
      .format("com.samelamin.spark.bigquery")
      .start()

Structured Streaming from BigQuery Table

You can use this connector to stream from a BigQuery table. The connector uses a timestamp column to derive offsets.

import com.samelamin.spark.bigquery._

val df = spark
          .readStream
          .option("tableReferenceSource","my-project:my_dataset.my_table")
          .format("com.samelamin.spark.bigquery")
          .load()

You can also specify a custom timestamp column:

import com.samelamin.spark.bigquery._

sqlContext.setBQTableTimestampColumn("column_name")

You can also specify a custom time-ingested partition column:

import com.samelamin.spark.bigquery._

sqlContext.setBQTimePartitioningField("column_name")

Saving DataFrame using BigQuery Hadoop writer API

By default, any table created by this connector has a timestamp column named bq_load_timestamp, whose value is the current timestamp at load time.

import com.samelamin.spark.bigquery._

val df = ...
df.saveAsBigQueryTable("project-id:dataset-id.table-name")

You can also save to a table decorator by saving to dataset-id.table-name$YYYYMMDD.
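
For example (project, dataset, table, and date are hypothetical), writing to the $20180101 decorator targets that day's partition:

import com.samelamin.spark.bigquery._

// Hypothetical names and date: this writes into the 2018-01-01 partition
// of a day-partitioned table via its decorator.
df.saveAsBigQueryTable("project-id:dataset-id.table-name$20180101")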

Saving DataFrame using Pyspark

from pyspark.sql import SparkSession

BQ_PROJECT_ID = "projectId"
DATASET_ID = "datasetId"
TABLE_NAME = "tableName"

KEY_FILE = "/path/to/service_account.json" # When not on GCP
STAGING_BUCKET = "gcs-bucket"              # Intermediate JSON files
DATASET_LOCATION = "US"                    # Location for dataset creation

# Start session and reference the JVM package via py4j for convenience
session = SparkSession.builder.getOrCreate()
bigquery = session._sc._jvm.com.samelamin.spark.bigquery

# Prepare the bigquery context
bq = bigquery.BigQuerySQLContext(session._wrapped._jsqlContext)
bq.setGcpJsonKeyFile(KEY_FILE)
bq.setBigQueryProjectId(BQ_PROJECT_ID)
bq.setGSProjectId(BQ_PROJECT_ID)
bq.setBigQueryGcsBucket(STAGING_BUCKET)
bq.setBigQueryDatasetLocation(DATASET_LOCATION)

# Extract and Transform a dataframe
# df = session.read.csv(...)

# Load into a table or table partition
bqDF = bigquery.BigQueryDataFrame(df._jdf)
bqDF.saveAsBigQueryTable(
    "{0}:{1}.{2}".format(BQ_PROJECT_ID, DATASET_ID, TABLE_NAME),
    False, # Day-partitioned when created
    0,     # Partition expiration when created
    bigquery.__getattr__("package$WriteDisposition$").__getattr__("MODULE$").WRITE_EMPTY(),
    bigquery.__getattr__("package$CreateDisposition$").__getattr__("MODULE$").CREATE_IF_NEEDED(),
)

Submit with:

pyspark yourjob.py --packages com.github.samelamin:spark-bigquery_2.11:0.2.6

Or

gcloud dataproc jobs submit pyspark yourjob.py --properties spark.jars.packages=com.github.samelamin:spark-bigquery_2.11:0.2.6

Reading DataFrame From BigQuery

import com.samelamin.spark.bigquery._
val sqlContext = spark.sqlContext

sqlContext.setBigQueryGcsBucket("bucketname")
sqlContext.setBigQueryProjectId("projectid")
sqlContext.setGcpJsonKeyFile("keyfilepath")
sqlContext.hadoopConf.set("fs.gs.project.id","projectid")

val df = spark.sqlContext.read
  .format("com.samelamin.spark.bigquery")
  .option("tableReferenceSource", "bigquery-public-data:samples.shakespeare")
  .load()

Reading DataFrame From BigQuery in Pyspark

from pyspark.sql.dataframe import DataFrame

bq = spark._sc._jvm.com.samelamin.spark.bigquery.BigQuerySQLContext(spark._wrapped._jsqlContext)
df = DataFrame(bq.bigQuerySelect("SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]"), spark._wrapped)

Running DML Queries

import com.samelamin.spark.bigquery._

// Load results from a SQL query
sqlContext.runDMLQuery("UPDATE dataset-id.table-name SET test_col = new_value WHERE test_col = old_value")

Please note that DML queries must be written in Standard SQL.
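
As a sketch with hypothetical names, a Standard SQL statement references the table in dotted (optionally backticked) form rather than the legacy bracketed form:

import com.samelamin.spark.bigquery._

// Hypothetical table and values; Standard SQL uses `project.dataset.table`
// rather than the legacy [project:dataset.table] reference style.
sqlContext.runDMLQuery(
  "UPDATE `my-project.my_dataset.my_table` SET test_col = 'new_value' WHERE test_col = 'old_value'")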

Update Schemas

You can also allow writes of a DataFrame to update an existing table's schema:

import com.samelamin.spark.bigquery._

sqlContext.setAllowSchemaUpdates()
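
A minimal sketch of the workflow (the DataFrame and table names are hypothetical): enable schema updates, then save a DataFrame whose schema includes a new column.

import com.samelamin.spark.bigquery._

// Opt in to schema updates on writes (see setSchemaUpdateOptions above).
sqlContext.setAllowSchemaUpdates()

// Hypothetical: dfWithNewColumn carries a field the existing table lacks; with
// schema updates allowed, the save adds the field instead of failing.
dfWithNewColumn.saveAsBigQueryTable("project-id:dataset-id.table-name")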

Notes on using this API:

  • Structured Streaming needs a partitioned table, which is created by default when writing a stream
  • Structured Streaming needs a timestamp column from which offsets are retrieved; by default, all tables are created with a bq_load_timestamp column whose default value is the current timestamp
  • For use with Databricks, please follow this guide

License

Copyright 2016 samelamin.

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
