PySpark Cassandra

License: Apache-2.0

pyspark-cassandra is a Python port of the awesome DataStax Spark Cassandra Connector.

This module provides Python support for Apache Spark's Resilient Distributed Datasets from Apache Cassandra CQL rows, using the Cassandra Spark Connector within PySpark, both in the interactive shell and in Python programs submitted with spark-submit.

This project was initially forked from @TargetHolding since they no longer maintain it.

Contents:

  • Compatibility
  • Using with PySpark
  • Using with PySpark shell
  • Building
  • API
  • Examples
  • Releasing
  • Problems / ideas?
  • Contributing

Compatibility

Feedback on (in-)compatibility is much appreciated.

Spark

The current version of PySpark Cassandra (2.4.x) is successfully used with Spark version 2.4.x.

  • For Spark 2.3.x: use version 2.3.x
  • For Spark 2.0.x, 2.1.x and 2.2.x: use version 0.9.0
  • For Spark 1.5.x and 1.6.x: use older versions

Cassandra

PySpark Cassandra is compatible with Cassandra:

  • 2.2.x
  • 3.0.x
  • 3.11.x

Python

PySpark Cassandra is used with Python 2.7 and Python 3.4+.

Scala

PySpark Cassandra is currently only packaged for Scala 2.11

Using with PySpark

With Spark Packages

PySpark Cassandra is published at Spark Packages. This allows easy usage with Spark through:

spark-submit \
	--packages anguenot/pyspark-cassandra:<version> \
	--conf spark.cassandra.connection.host=your,cassandra,node,names

Without Spark Packages

spark-submit \
	--jars /path/to/pyspark-cassandra-assembly-<version>.jar \
	--py-files /path/to/pyspark-cassandra-assembly-<version>.jar \
	--conf spark.cassandra.connection.host=your,cassandra,node,names \
	--master spark://spark-master:7077 \
	yourscript.py

(also note that the assembly includes the Python source files, quite similar to a Python source distribution)

Using with PySpark shell

Replace spark-submit with pyspark to start the interactive shell (without providing a script as argument) and then import PySpark Cassandra. Note that when performing this import, the sc variable in pyspark is augmented with the cassandraTable(...) method; see the sketch after the import below.

import pyspark_cassandra

Building

For Spark Packages, PySpark Cassandra can be compiled using:

sbt compile

The package can be published locally with:

sbt spPublishLocal

The package can be published to Spark Packages with (requires authentication and authorization):

make publish

For local testing / without Spark Packages

A Java/JVM library as well as a Python library is required to use PySpark Cassandra. They can be built with:

make dist

This creates a fat jar with the Spark Cassandra Connector and additional classes for bridging Spark and PySpark for Cassandra data and the .py source files at: target/scala-2.11/pyspark-cassandra-assembly-<version>.jar

API

The PySpark Cassandra API aims to stay close to the Cassandra Spark Connector API. Reading its documentation is a good place to start.

pyspark_cassandra.RowFormat

The primary representation of CQL rows in PySpark Cassandra is the ROW format. However, sc.cassandraTable(...) supports the row_format argument, which can be any of the constants from RowFormat (see the sketch after this list):

  • DICT: A CQL row is represented as a python dict with the CQL row columns as keys.
  • TUPLE: A CQL row is represented as a python tuple with the values in CQL table column order / the order of the selected columns.
  • ROW: A pyspark_cassandra.Row object representing a CQL row (the default).
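
For example, to read rows as plain dicts or tuples instead of Row objects, pass row_format explicitly. A minimal sketch using the constants above (keyspace and table names are placeholders):

from pyspark_cassandra import RowFormat

# CQL rows as python dicts keyed by column name
dicts = sc.cassandraTable("keyspace", "table", row_format=RowFormat.DICT)

# CQL rows as tuples in the order of the selected columns
tuples = sc.cassandraTable("keyspace", "table", row_format=RowFormat.TUPLE)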

Column values are related between CQL and python as follows:

CQL         Python
ascii       unicode string
bigint      long
blob        bytearray
boolean     boolean
counter     int, long
decimal     decimal
double      float
float       float
inet        str
int         int
map         dict
set         set
list        list
text        unicode string
date        datetime.date
timestamp   datetime.datetime
timeuuid    uuid.UUID
varchar     unicode string
varint      long
uuid        uuid.UUID
UDT         pyspark_cassandra.UDT
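
To illustrate the mapping, a row from the 'test' table used in the collection examples further below would, when read in DICT format, look roughly like this sketch (not actual output):

# text -> unicode string, set<text> -> set, list<text> -> list, map<text,text> -> dict
{"user_id": "123", "city": "berlin", "test_set": {"a"}, "test_list": ["a"], "test_map": {"a": "1"}}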

pyspark_cassandra.Row

This is the default type to which CQL rows are mapped. It is directly compatible with pyspark.sql.Row but is (correctly) mutable and provides some other improvements.
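
A minimal sketch of working with Row objects; keyword construction and attribute access are assumptions based on the compatibility with pyspark.sql.Row:

from pyspark_cassandra import Row

row = Row(key="x", value=1)  # keyword construction, assumed to mirror pyspark.sql.Row
print(row.key, row.value)    # attribute-style access
row.value = 2                # fields are mutable, unlike pyspark.sql.Row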

pyspark_cassandra.UDT

This type is structurally identical to pyspark_cassandra.Row but serves user defined types. Mapping to custom python types (e.g. via CQLEngine) is not yet supported.

pyspark_cassandra.CassandraSparkContext

A CassandraSparkContext is very similar to a regular SparkContext. It is created in the same way, can be used to read files, parallelize local data, broadcast a variable, etc. See the Spark Programming Guide for more details. But it exposes one additional method:

  • cassandraTable(keyspace, table, ...): Returns a CassandraRDD for the given keyspace and table; see the sketch after this list. Additional arguments which can be provided:

    • row_format can be set to any of the pyspark_cassandra.RowFormat values (defaults to ROW)
    • split_size sets the size of partitions in number of CQL rows (defaults to 100000)
    • fetch_size sets the number of rows to fetch per request from Cassandra (defaults to 1000)
    • consistency_level sets the consistency level with which to read the data (defaults to LOCAL_ONE)
    • connection_config: a map with connection information that can be used to connect to a non-default cluster (defaults to None)
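
Putting the arguments above together, a read from a non-default cluster might look like the following sketch (keyspace, table and host name are placeholders):

from pyspark_cassandra import RowFormat

rdd = sc.cassandraTable(
	"keyspace", "table",
	row_format=RowFormat.DICT,  # any pyspark_cassandra.RowFormat constant
	split_size=100000,          # CQL rows per Spark partition
	fetch_size=1000,            # rows fetched per request from Cassandra
	connection_config={"spark_cassandra_connection_host": "cas-2"},
)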

pyspark.RDD

PySpark Cassandra supports saving arbitrary RDD's to Cassandra using:

  • rdd.saveToCassandra(keyspace, table, ...): Saves an RDD to Cassandra. The RDD is expected to contain dicts with keys mapping to CQL columns. Additional arguments which can be supplied are listed below; a sketch follows the list:

    • columns(iterable): The columns to save, i.e. which keys to take from the dicts in the RDD. This can be:
      • an array: a list of columns to be saved with the default behaviour (overwrite)
      • a dictionary: columns (keys) mapped to the Cassandra collection modify operation to perform (values):
        • "": emulates the 'array' default behaviour; to be used for non-collection fields
        • "append" | "add": append to a collection (lists, sets, maps)
        • "prepend": prepend to a collection (lists)
        • "remove": remove from a collection (lists, sets)
        • "overwrite": overwrite a collection (lists, sets, maps)
    • batch_size(int): The size in bytes to batch up in an unlogged batch of CQL inserts.
    • batch_buffer_size(int): The maximum number of batches which are 'pending'.
    • batch_grouping_key(string): The way batches are formed (defaults to "partition"):
      • all: any row can be added to any batch
      • replicaset: rows are batched for replica sets
      • partition: rows are batched by their partition key
    • consistency_level(cassandra.ConsistencyLevel): The consistency level used in writing to Cassandra.
    • parallelism_level(int): The maximum number of batches written in parallel.
    • throughput_mibps: Maximum write throughput allowed per single core in MB/s.
    • ttl(int or timedelta): The time to live as milliseconds or timedelta to use for the values.
    • timestamp(int, date or datetime): The timestamp in milliseconds, date or datetime to use for the values.
    • metrics_enabled(bool): Whether to enable task metrics updates.
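
A sketch combining several of the write options above; the keyspace, table and values are placeholders, and the keyword names follow the list above:

from datetime import datetime, timedelta

rdd.saveToCassandra(
	"keyspace", "table",
	columns=["key", "stamp", "val"],  # only these keys are taken from each dict
	batch_size=16384,                 # bytes per unlogged batch
	batch_grouping_key="partition",   # group batches by partition key
	parallelism_level=8,              # at most 8 batches written in parallel
	ttl=timedelta(hours=1),
	timestamp=datetime.now(),
)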

pyspark_cassandra.CassandraRDD

A CassandraRDD is very similar to a regular RDD in pyspark. It is extended with the following methods:

  • select(*columns): Creates a CassandraRDD with the select clause applied.
  • where(clause, *args): Creates a CassandraRDD with a CQL where clause applied. The clause can contain ? markers with the arguments supplied as *args.
  • limit(num): Creates a CassandraRDD with the limit clause applied.
  • take(num): Takes at most num records from the Cassandra table. Note that if limit() was invoked before take(), a normal pyspark take() is performed. Otherwise, the limit is set first and then a take() is performed.
  • cassandraCount(): Lets Cassandra perform a count instead of loading the data into Spark first; see the sketch after this list.
  • saveToCassandra(...): As above, but the keyspace and/or table may be omitted to save to the same keyspace and/or table.
  • spanBy(*columns): Groups rows by the given columns without shuffling.
  • joinWithCassandraTable(keyspace, table): Join an RDD with a Cassandra table on the partition key. Use .on(...) to specify other columns to join on. .select(...), .where(...) and .limit(...) can be used as well.
  • deleteFromCassandra(keyspace, table, ...): Deletes rows and columns from Cassandra via an implicit deleteFromCassandra call.
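
For example, a count can be pushed down to Cassandra instead of loading the rows into Spark first. A minimal sketch (keyspace, table and the where clause are placeholders):

count = sc \
	.cassandraTable("keyspace", "table") \
	.where("key=?", "x") \
	.cassandraCount()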

pyspark_cassandra.streaming

When importing pyspark_cassandra.streaming, the method saveToCassandra(...) is made available on DStreams. Support for joining with a Cassandra table is also added:

  • joinWithCassandraTable(keyspace, table, selected_columns, join_columns): Joins a DStream with a Cassandra table on the partition key. Use .on(...) to specify other columns to join on. .select(...), .where(...) and .limit(...) can be used as well.
  • deleteFromCassandra(keyspace, table, ...): Deletes rows and columns from Cassandra via an implicit deleteFromCassandra call.

Examples

Creating a SparkContext with Cassandra support

import pyspark_cassandra
from pyspark import SparkConf
from pyspark_cassandra import CassandraSparkContext

conf = SparkConf() \
	.setAppName("PySpark Cassandra Test") \
	.setMaster("spark://spark-master:7077") \
	.set("spark.cassandra.connection.host", "cas-1")

sc = CassandraSparkContext(conf=conf)

Using select and where to narrow the data in an RDD and then filter, map, reduce and collect it:

sc \
	.cassandraTable("keyspace", "table") \
	.select("col-a", "col-b") \
	.where("key=?", "x") \
	.filter(lambda r: "foo" in r["col-b"]) \
	.map(lambda r: (r["col-a"], 1)) \
	.reduceByKey(lambda a, b: a + b) \
	.collect()

Reading from different clusters:

rdd_one = sc \
	.cassandraTable("keyspace", "table_one", connection_config={"spark_cassandra_connection_host": "cas-1"})

rdd_two = sc \
	.cassandraTable("keyspace", "table_two", connection_config={"spark_cassandra_connection_host": "cas-2"})

Storing data in Cassandra:

from datetime import datetime, timedelta
from random import random

rdd = sc.parallelize([{
	"key": k,
	"stamp": datetime.now(),
	"val": random() * 10,
	"tags": ["a", "b", "c"],
	"options": {
		"foo": "bar",
		"baz": "qux",
	}
} for k in ["x", "y", "z"]])

rdd.saveToCassandra(
	"keyspace",
	"table",
	ttl=timedelta(hours=1),
)

# Storing to non-default cluster
rdd.saveToCassandra(
	"keyspace",
	"table",
	ttl=timedelta(hours=1),
    connection_config={"spark_cassandra_connection_host": "cas-2"}
)

Modify CQL collections:

# Cassandra test table schema
# create table test (user_id text, city text,  test_set set<text>, test_list list<text>, test_map map<text,text>, PRIMARY KEY (user_id));

rdd = sc.parallelize([{"user_id":"123","city":"berlin","test_set":["a"],"test_list":["a"],"test_map":{"a":"1"}}])

rdd.saveToCassandra("ks","test")

rdd = sc.parallelize([{"user_id":"123","city":"berlin","test_set":["a"],"test_list":["b"],"test_map":{"b":"2"}}])

rdd.saveToCassandra("ks","test", {"user_id":"", "city":"", "test_set":"remove", "test_list":"prepend", "test_map":"append"})

Create a streaming context and convert every line to a generator of words, which are saved to Cassandra. Through this example, all unique words are stored in Cassandra.

The words are wrapped as a tuple so that they are in a format which can be stored. A dict or a pyspark_cassandra.Row object would have worked as well.

from pyspark.streaming import StreamingContext
from pyspark_cassandra import streaming

ssc = StreamingContext(sc, 2)

ssc \
    .socketTextStream("localhost", 9999) \
    .flatMap(lambda l: ((w,) for w in l.split())) \
    .saveToCassandra('keyspace', 'words')

ssc.start()
ssc.awaitTermination()

Joining with Cassandra:

joined = rdd \
    .joinWithCassandraTable('keyspace', 'accounts') \
    .on('id') \
    .select('e-mail', 'followers')

for left, right in joined.collect():
    ...

Or with a DStream:

joined = dstream.joinWithCassandraTable('keyspace', 'accounts',
    ['e-mail', 'followers'], ['id'])

Releasing

$ pip install bumpversion

$ bumpversion --dry-run --verbose $CURRENT_VERSION --new-version=$NEW_VERSION

$ bumpversion $CURRENT_VERSION --new-version=$NEW_VERSION

$ git push

$ git push --tags origin

Problems / ideas?

Feel free to use the issue tracker to propose new functionality and/or report bugs. In case of bugs, please provide some code to reproduce the issue, or at least some context information such as the software used, CQL schema, etc.

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request