
lightcopy / Parquet Index

Licence: apache-2.0
Spark SQL index for Parquet tables

Programming Languages

scala
5932 projects

Projects that are alternatives of or similar to Parquet Index

Parquet Generator
Parquet file generator
Stars: ✭ 16 (-85.32%)
Mutual labels:  sql, spark, parquet
Datafusion
DataFusion has now been donated to the Apache Arrow project
Stars: ✭ 611 (+460.55%)
Mutual labels:  sql, spark
Mlinterview
A curated awesome list of AI Startups in India & Machine Learning Interview Guide. Feel free to contribute!
Stars: ✭ 410 (+276.15%)
Mutual labels:  sql, statistics
Scriptis
Scriptis is for interactive data analysis with script development(SQL, Pyspark, HiveQL), task submission(Spark, Hive), UDF, function, resource management and intelligent diagnosis.
Stars: ✭ 696 (+538.53%)
Mutual labels:  sql, spark
Kyuubi
Kyuubi is a unified multi-tenant JDBC interface for large-scale data processing and analytics, built on top of Apache Spark
Stars: ✭ 363 (+233.03%)
Mutual labels:  sql, spark
Iceberg
Iceberg is a table format for large, slow-moving tabular data
Stars: ✭ 393 (+260.55%)
Mutual labels:  spark, parquet
Devops Python Tools
80+ DevOps & Data CLI Tools - AWS, GCP, GCF Python Cloud Function, Log Anonymizer, Spark, Hadoop, HBase, Hive, Impala, Linux, Docker, Spark Data Converters & Validators (Avro/Parquet/JSON/CSV/INI/XML/YAML), Travis CI, AWS CloudFormation, Elasticsearch, Solr etc.
Stars: ✭ 406 (+272.48%)
Mutual labels:  spark, parquet
Data Science Best Resources
Carefully curated resource links for data science in one place
Stars: ✭ 1,104 (+912.84%)
Mutual labels:  sql, statistics
Rumble
⛈️ Rumble 1.11.0 "Banyan Tree"🌳 for Apache Spark | Run queries on your large-scale, messy JSON-like data (JSON, text, CSV, Parquet, ROOT, AVRO, SVM...) | No install required (just a jar to download) | Declarative Machine Learning and more
Stars: ✭ 58 (-46.79%)
Mutual labels:  spark, parquet
Pucket
Bucketing and partitioning system for Parquet
Stars: ✭ 29 (-73.39%)
Mutual labels:  spark, parquet
Metorikku
A simplified, lightweight ETL Framework based on Apache Spark
Stars: ✭ 361 (+231.19%)
Mutual labels:  sql, spark
Spark Website
Apache Spark Website
Stars: ✭ 75 (-31.19%)
Mutual labels:  sql, spark
Oap
Optimized Analytics Package for Spark* Platform
Stars: ✭ 343 (+214.68%)
Mutual labels:  spark, parquet
Sqlindexmanager
Free GUI Tool for Index Maintenance on SQL Server and Azure
Stars: ✭ 403 (+269.72%)
Mutual labels:  sql, index
Roapi
Create full-fledged APIs for static datasets without writing a single line of code.
Stars: ✭ 253 (+132.11%)
Mutual labels:  sql, parquet
experiments
Code examples for my blog posts
Stars: ✭ 21 (-80.73%)
Mutual labels:  spark, parquet
Linkis
Linkis helps easily connect to various back-end computation/storage engines(Spark, Python, TiDB...), exposes various interfaces(REST, JDBC, Java ...), with multi-tenancy, high performance, and resource control.
Stars: ✭ 2,323 (+2031.19%)
Mutual labels:  sql, spark
Xsql
Unified SQL Analytics Engine Based on SparkSQL
Stars: ✭ 176 (+61.47%)
Mutual labels:  sql, spark
Spark
Apache Spark - A unified analytics engine for large-scale data processing
Stars: ✭ 31,618 (+28907.34%)
Mutual labels:  sql, spark
Kamu Cli
Next generation tool for decentralized exchange and transformation of semi-structured data
Stars: ✭ 69 (-36.7%)
Mutual labels:  sql, spark

parquet-index

Spark SQL index for Parquet tables

Build Status | Coverage Status | Gitter chat: https://gitter.im/lightcopy/parquet-index

Overview

The package allows creating an index for Parquet tables (both datasource and persistent tables) to reduce query latency in Spark SQL for near-interactive analysis or point queries. It is designed for the use case where a table does not change frequently but is queried often, e.g. through the Thrift JDBC/ODBC server. Once a table is indexed, its schema and list of files (including partitioning) are automatically resolved from the index metastore instead of being inferred every time the datasource is created.

The project is experimental. Any feedback, issues, or PRs are welcome.

The documentation reflects changes in the master branch; for documentation on a specific version, please select the corresponding version tag or branch.

Metastore

The metastore keeps information about all indexed tables and can be created on a local file system or HDFS (see available options below), with support for an in-memory cache of the index (after the first scan). Each created index includes basic statistics (min/max/null) and, optionally, column filter statistics (e.g. bloom filters) on indexed columns.

Supported predicates

The index is automatically enabled for a scan when the provided predicate contains one or more filters on indexed columns; if no filters on indexed columns are provided, a normal scan is used, but still with the benefit of already resolved partitions and schema. Min/max statistics and column filter statistics (if available) are applied after partition pruning. Statistics are kept per Parquet block metadata. Note that performance also depends on value distribution and predicate selectivity. The Spark Parquet reader is used to read data.

Most Spark SQL predicates (EqualTo, In, GreaterThan, LessThan, and others) are supported and can use statistics and/or column filters. Predicates work best for equality or isin conditions combined with logical operators (And, Or, Not), e.g. $"a" === 1 && $"b" === "abc" or $"a".isin("a", "b", "c").
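
For illustration, the sketch below shows predicates that can take advantage of the index; it assumes an index was already created on the id and code columns of the temp/codes.parquet table from the example section further down (runnable in spark-shell).

import com.github.lightcopy.implicits._

// Equality and isin filters on indexed columns can use min/max statistics
// and, if enabled, column filter statistics
val hits = spark.index.parquet("temp/codes.parquet").
  filter($"id" === 123 && $"code".isin("123", "456"))

// A filter only on non-indexed columns falls back to a normal scan,
// but still benefits from the already resolved schema and partitions
val rest = spark.index.parquet("temp/codes.parquet").
  filter($"name" === "xyz")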

Supported Spark SQL types

Currently only these types are supported for indexed columns:

  • IntegerType
  • LongType
  • StringType
  • DateType
  • TimestampType

Limitations

  • Indexed columns must be top-level primitive columns with one of the types above
  • Indexed columns cannot be the same as partitioning columns
  • Append mode is not yet supported for Parquet tables when creating an index
  • Only certain Spark versions are supported (see the table below)

Requirements

Spark version   parquet-index latest version
1.6.x           Not supported
2.0.0           0.2.3
2.0.1           0.2.3
2.0.2           0.2.3
2.1.x           0.3.0
2.2.x           0.4.0
2.3.x           fd442d (not released yet)
2.4.x           5051f9 (not released yet)
3.0.0           0.5.0
  • Scala 2.12.x
  • JDK 8+

Previous versions support Scala 2.11.x, Scala 2.10.x, and JDK 7; see the README and build.sbt for the corresponding tag or branch. See the build section to compile for the desired Java/Scala versions.

If using the Python API, Python 3.x with a working version of pyspark is required.

The current version drops Python 2 support entirely. Python 2.7 is officially deprecated, which is why we opted not to write a backwards-compatible wrapper around the Scala API.

Linking

The parquet-index package can be added to Spark by using the --packages command line option. For example, run this to include it when starting spark-shell (Scala 2.12.x):

 $SPARK_HOME/bin/spark-shell --packages lightcopy:parquet-index:0.5.0-s_2.12

Or, to use the Python 3 API with pyspark (see the section below):

$SPARK_HOME/bin/pyspark --packages lightcopy:parquet-index:0.5.0-s_2.12

Options

Currently supported options are listed below. Use --conf key=value on the command line, as with other Spark configuration, or add them to the spark-defaults.conf file.

spark.sql.index.metastore
  Index metastore location, created if it does not exist (file:/folder, hdfs://host:port/folder).
  Default: ./index_metastore

spark.sql.index.parquet.filter.enabled
  When set to true, write filter statistics for indexed columns when creating a table index; otherwise only min/max statistics are used. Filter statistics are used during the filtering stage, if applicable (true, false).
  Default: true

spark.sql.index.parquet.filter.type
  When filter statistics are enabled, selects the type of statistics to use when creating an index (bloom, dict).
  Default: bloom

spark.sql.index.parquet.filter.eagerLoading
  When set to true, read and load all filter statistics in memory the first time the catalog is resolved; otherwise load them lazily as needed when evaluating a predicate (true, false).
  Default: false

spark.sql.index.createIfNotExists
  When set to true, create an index if one does not exist in the metastore for the table, using all available columns for indexing (true, false).
  Default: false

spark.sql.index.partitions
  Number of partitions used when creating an index. If the value is non-positive or not provided, uses sc.defaultParallelism * 3 or the spark.sql.shuffle.partitions configuration value, whichever is smaller.
  Default: min(default parallelism * 3, shuffle partitions)
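
The same options can also be set when building the SparkSession. Below is a minimal sketch, assuming builder configuration is picked up the same way as --conf or spark-defaults.conf values; the application name and HDFS path are placeholders.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().
  appName("parquet-index example").
  // Keep the index metastore on HDFS instead of the default ./index_metastore
  config("spark.sql.index.metastore", "hdfs://namenode:8020/tmp/index_metastore").
  // Use dictionary filter statistics instead of bloom filters
  config("spark.sql.index.parquet.filter.type", "dict").
  // Load all filter statistics into memory the first time the catalog is resolved
  config("spark.sql.index.parquet.filter.eagerLoading", "true").
  getOrCreate()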

Example

Scala API

Most of the API is defined in DataFrameIndexManager. Usage is similar to Spark's DataFrameReader, but accessed through spark.index. See the example below for the different commands (runnable in spark-shell).

// Start spark-shell and create dummy table "codes.parquet", use repartition
// to create more or less generic situation with value distribution
spark.range(0, 1000000).
  select($"id", $"id".cast("string").as("code"), lit("xyz").as("name")).
  repartition(400).
  write.partitionBy("name").parquet("temp/codes.parquet")

import com.github.lightcopy.implicits._
// Create index for table, this will create index files in index_metastore,
// you can configure different options - see table above

// All Spark SQL modes are available ('append', 'overwrite', 'ignore', 'error')
// You can also use `.indexByAll` to choose all columns in schema that
// can be indexed
spark.index.create.
  mode("overwrite").indexBy($"id", $"code").parquet("temp/codes.parquet")

// Check if index for table exists, should return "true"
spark.index.exists.parquet("temp/codes.parquet")

// Query table using index, should return 1 record and scan only a small
// number of files (usually 1 file if filter statistics are enabled). This
// example uses filters on both columns, though any filters can be used,
// e.g. only on id or code
// Metastore will cache index catalog to reduce time for subsequent calls
spark.index.parquet("temp/codes.parquet").
  filter($"id" === 123 && $"code" === "123").collect

// Delete index in metastore, also invalidates cache;
// no-op if the index does not exist
// (does NOT delete the original table)
spark.index.delete.parquet("temp/codes.parquet")

// You can compare performance with this
spark.read.parquet("temp/codes.parquet").
  filter($"id" === 123 && $"code" === "123").collect

Java API

To use indexing in Java, create a QueryContext based on a SparkSession and invoke the index() method to access index functionality. The example below illustrates how to use indexing in a standalone application.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.github.lightcopy.QueryContext;

// Optionally use `config(key, value)` to specify additional index configuration
SparkSession spark = SparkSession.
  builder().
  master("local[*]").
  appName("Java example").
  getOrCreate();

// Create query context, entry point to working with parquet-index
QueryContext context = new QueryContext(spark);

// Create index by inferring columns from Parquet table
context.index().create().indexByAll().parquet("table.parquet");

// Create index by specifying index columns, you can also provide `Column` instances, e.g.
// `new Column[] { new Column("col1"), new Column("col2") }`.
// Mode can be provided as `org.apache.spark.sql.SaveMode` or String value
context.index().create().
  mode("overwrite").
  indexBy(new String[] { "col1", "col2" }).
  parquet("table.parquet");

// Check if index exists for the table
boolean exists = context.index().exists().parquet("table.parquet");

// Run query for indexed table
Dataset<Row> df = context.index().parquet("table.parquet").filter("col2 = 'c'");

// Delete index from metastore
context.index().delete().parquet("table.parquet");

Python 3.x API

The following example shows usage of the Python 3 API (runnable in pyspark):

from lightcopy.index import QueryContext

# Create QueryContext from SparkSession
context = QueryContext(spark)

# Create index in metastore for Parquet table 'table.parquet' using 'col1' and 'col2' columns
context.index.create.indexBy('col1', 'col2').parquet('table.parquet')

# Create index in metastore for Parquet table 'table.parquet' using all inferred columns and
# overwrite any existing index for this table
context.index.create.mode('overwrite').indexByAll().parquet('table.parquet')

# Check if index exists, returns 'True' if exists, otherwise 'False'
context.index.exists.parquet('table.parquet')

# Query index for table, returns DataFrame
df = context.index.parquet('table.parquet').filter('col1 = 123')

# Delete index from metastore, if index does not exist - no-op
context.index.delete.parquet('table.parquet')

Persistent tables API

The package also supports indexing persistent tables that are saved using saveAsTable() in Parquet format and accessible via spark.table(tableName). When working with persistent tables, just replace .parquet(path_to_the_file) with .table(table_name). The API is available in Scala, Java, and Python 3.

Scala

import com.github.lightcopy.implicits._

// Create index for table name that exists in Spark catalog
spark.index.create.indexBy("col1", "col2", "col3").table("table_name")

// Check if index exists for persistent table
val exists: Boolean = spark.index.exists.table("table_name")

// Query index for persistent table
val df = spark.index.table("table_name").filter("col1 > 1")

// Delete index for persistent table (does not drop table itself)
spark.index.delete.table("table_name")

Java

// Java API is very similar to Scala API
import com.github.lightcopy.QueryContext;

SparkSession spark = ...;

QueryContext context = new QueryContext(spark);

// Create index for persistent table
context.index().create().indexByAll().table("table_name");

// Check if index exists for persistent table
boolean exists = context.index().exists().table("table_name");

// Run query for indexed persistent table
Dataset<Row> df = context.index().table("table_name").filter("col2 = 'c'");

// Delete index from metastore (does not drop table)
context.index().delete().table("table_name");

Python 3

from lightcopy.index import QueryContext

context = QueryContext(spark)

# Create index from Spark persistent table
context.index.create.mode('overwrite').indexBy('col1', 'col2').table('table_name')

# Check if index exists for persistent table. 'True' if exists in metastore, 'False' otherwise
context.index.exists.table('table_name')

# Query indexed persistent table
df = context.index.table('table_name').filter('col1 = 123')

# Delete index for persistent table (does not drop table)
context.index.delete.table('table_name')

Building From Source

This library is built using sbt; to build a JAR file, simply run sbt package from the project root.

Testing

Run sbt test from project root. See .travis.yml for CI build matrix.
