
qubole / Kinesis Sql

License: apache-2.0
Kinesis Connector for Structured Streaming

Programming Languages

scala

Projects that are alternatives to, or similar to, Kinesis Sql

Learningspark
Scala examples for learning to use Spark
Stars: ✭ 421 (+250.83%)
Mutual labels:  spark, spark-streaming
Mobius
C# and F# language binding and extensions to Apache Spark
Stars: ✭ 929 (+674.17%)
Mutual labels:  spark, spark-streaming
Cdap
An open source framework for building data analytic applications.
Stars: ✭ 509 (+324.17%)
Mutual labels:  spark, spark-streaming
Gimel
Big Data Processing Framework - Unified Data API or SQL on Any Storage
Stars: ✭ 216 (+80%)
Mutual labels:  spark, spark-streaming
Spark Mllib Twitter Sentiment Analysis
🌟 ✨ Analyze and visualize Twitter Sentiment on a world map using Spark MLlib
Stars: ✭ 113 (-5.83%)
Mutual labels:  spark, spark-streaming
Data Accelerator
Data Accelerator for Apache Spark simplifies onboarding to Streaming of Big Data. It offers a rich, easy to use experience to help with creation, editing and management of Spark jobs on Azure HDInsights or Databricks while enabling the full power of the Spark engine.
Stars: ✭ 247 (+105.83%)
Mutual labels:  spark, spark-streaming
Angel
A Flexible and Powerful Parameter Server for large-scale machine learning
Stars: ✭ 6,458 (+5281.67%)
Mutual labels:  spark, spark-streaming
Azure Event Hubs Spark
Enabling Continuous Data Processing with Apache Spark and Azure Event Hubs
Stars: ✭ 140 (+16.67%)
Mutual labels:  spark, spark-streaming
Waterdrop
Production-ready data integration product
Stars: ✭ 1,856 (+1446.67%)
Mutual labels:  spark, spark-streaming
Real Time Stream Processing Engine
This is an example of real time stream processing using Spark Streaming, Kafka & Elasticsearch.
Stars: ✭ 37 (-69.17%)
Mutual labels:  spark, spark-streaming
Example Spark
Spark, Spark Streaming and Spark SQL unit testing strategies
Stars: ✭ 205 (+70.83%)
Mutual labels:  spark, spark-streaming
Pyspark Examples
Code examples on Apache Spark using python
Stars: ✭ 58 (-51.67%)
Mutual labels:  spark, spark-streaming
Spark Streaming With Kafka
Self-contained examples of Apache Spark streaming integrated with Apache Kafka.
Stars: ✭ 180 (+50%)
Mutual labels:  spark, spark-streaming
Coolplayspark
Coolplay Spark: Spark source code analysis, Spark libraries, and more
Stars: ✭ 3,318 (+2665%)
Mutual labels:  spark, spark-streaming
Pyspark Learning
Updated repository
Stars: ✭ 147 (+22.5%)
Mutual labels:  spark, spark-streaming
Sparta
Real Time Analytics and Data Pipelines based on Spark Streaming
Stars: ✭ 513 (+327.5%)
Mutual labels:  spark, spark-streaming
Example Spark Kafka
Apache Spark and Apache Kafka integration example
Stars: ✭ 120 (+0%)
Mutual labels:  spark, spark-streaming
Spark
.NET for Apache® Spark™ makes Apache Spark™ easily accessible to .NET developers.
Stars: ✭ 1,721 (+1334.17%)
Mutual labels:  spark, spark-streaming
Learning Spark
Learn Spark from scratch; big data learning materials
Stars: ✭ 37 (-69.17%)
Mutual labels:  spark, spark-streaming
Utils4s
A collection of test cases and related materials gathered while working with Scala and Spark
Stars: ✭ 1,070 (+791.67%)
Mutual labels:  spark, spark-streaming

Kinesis Connector for Structured Streaming

An implementation of a Kinesis source provider for Spark Structured Streaming. SPARK-18165 describes the need for such an implementation, and more details can be found in the accompanying blog post.

Downloading and Using the Connector

The connector is available from the Maven Central repository. It can be added to a job with the --packages option or the spark.jars.packages configuration property. Use one of the following connector artifacts:

Spark 3.0: com.qubole.spark/spark-sql-kinesis_2.12/1.2.0-spark_3.0
Spark 2.4: com.qubole.spark/spark-sql-kinesis_2.11/1.2.0-spark_2.4
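
For example, with the Spark 3.0 build, the connector can be pulled straight from Maven Central when launching the shell (the same flag works with spark-submit):

$SPARK_HOME/bin/spark-shell --packages com.qubole.spark:spark-sql-kinesis_2.12:1.2.0-spark_3.0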

Developer Setup

Check out the kinesis-sql branch that matches your Spark version. Use the master branch for the latest Spark version.

Spark version 3.0.x
git clone git@github.com:qubole/kinesis-sql.git
git checkout master
cd kinesis-sql
mvn install -DskipTests

This will create the target/spark-sql-kinesis_2.12-*.jar file, which contains the connector code and its dependency jars.

How to use it

Setup Kinesis

Refer to the Amazon Kinesis documentation for more options.

Create Kinesis Stream
$ aws kinesis create-stream --stream-name test --shard-count 2
Add Records to the Stream
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Kinesis'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Connector'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'for'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Apache'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Spark'

Example Streaming Job

In the examples below, $SPARK_HOME refers to the Spark installation directory.

Open Spark Shell
$SPARK_HOME/bin/spark-shell --jars target/spark-sql-kinesis_2.12-*.jar
Subscribe to Kinesis Source
// Subscribe to the "test" stream
scala> :paste
val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streamName", "test")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsAccessKeyId", [ACCESS_KEY])
  .option("awsSecretKey", [SECRET_KEY])
  .option("startingPosition", "TRIM_HORIZON")
  .load()
Check Schema
scala> kinesis.printSchema
root
 |-- data: binary (nullable = true)
 |-- streamName: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- sequenceNumber: string (nullable = true)
 |-- approximateArrivalTimestamp: timestamp (nullable = true)
Word Count
// Cast data to string and group by the data column
scala> :paste

kinesis
  .selectExpr("CAST(data AS STRING)").as[String]
  .groupBy("data").count()
  .writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()
Output in Console
+------------+-----+
|        data|count|
+------------+-----+
|         for|    1|
|      Apache|    1|
|       Spark|    1|
|     Kinesis|    1|
|   Connector|    1|
+------------+-----+ 
Using the Kinesis Sink
// Add a random partition key, cast data to string, and group by the data column
scala> :paste
kinesis
  .selectExpr("CAST(rand() AS STRING) AS partitionKey", "CAST(data AS STRING)").as[(String, String)]
  .groupBy("data").count()
  .writeStream
  .format("kinesis")
  .outputMode("update")
  .option("streamName", "spark-sink-example")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsAccessKeyId", [ACCESS_KEY])
  .option("awsSecretKey", [SECRET_KEY])
  .start()
  .awaitTermination()

Kinesis Source Configuration

| Option Name | Default Value | Description |
| ----------- | ------------- | ----------- |
| streamName | - | Name of the Kinesis stream to read from |
| endpointUrl | https://kinesis.us-east-1.amazonaws.com | Endpoint URL for the Kinesis stream |
| awsAccessKeyId | - | AWS credentials for Kinesis describe and read-record operations |
| awsSecretKey | - | AWS credentials for Kinesis describe and read-record operations |
| awsSTSRoleARN | - | AWS STS role ARN for Kinesis describe and read-record operations |
| awsSTSSessionName | - | AWS STS session name for Kinesis describe and read-record operations |
| awsUseInstanceProfile | true | Use instance profile credentials if no credentials are provided |
| startingPosition | LATEST | Starting position in Kinesis to fetch data from. Possible values are "latest", "trim_horizon", "earliest" (an alias for trim_horizon), or a JSON-serialized map of shardId to KinesisPosition |
| failondataloss | true | Fail the streaming job if any active shard is missing or expired |
| kinesis.executor.maxFetchTimeInMs | 1000 | Maximum time (ms) spent in the executor fetching records from Kinesis, per shard |
| kinesis.executor.maxFetchRecordsPerShard | 100000 | Maximum number of records to fetch per shard |
| kinesis.executor.maxRecordPerRead | 10000 | Maximum number of records to fetch per getRecords API call |
| kinesis.executor.addIdleTimeBetweenReads | false | Add a delay between two consecutive getRecords API calls |
| kinesis.executor.idleTimeBetweenReadsInMs | 1000 | Minimum delay (ms) between two consecutive getRecords calls |
| kinesis.client.describeShardInterval | 1s (1 second) | Minimum interval between two ListShards API calls used to detect resharding |
| kinesis.client.numRetries | 3 | Maximum number of retries for Kinesis API requests |
| kinesis.client.retryIntervalMs | 1000 | Cool-off period (ms) before retrying a Kinesis API call |
| kinesis.client.maxRetryIntervalMs | 10000 | Maximum cool-off period (ms) between two retries |
| kinesis.client.avoidEmptyBatches | false | Avoid creating an empty micro-batch by checking upfront whether there is any unread data in the stream before the batch is started |
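
As an illustration of these options, the sketch below (not part of the original examples) reads the same "test" stream in the spark-shell session, using STS role credentials instead of static keys and tuning the per-shard fetch behavior; the role ARN and session name are placeholders.

// A minimal sketch, assuming a spark-shell session; the STS role ARN and
// session name below are placeholders, not values from this project.
val kinesisViaSTS = spark
  .readStream
  .format("kinesis")
  .option("streamName", "test")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsSTSRoleARN", "arn:aws:iam::123456789012:role/kinesis-reader") // placeholder
  .option("awsSTSSessionName", "spark-kinesis")                             // placeholder
  .option("startingPosition", "TRIM_HORIZON")
  // Cap each micro-batch at 50,000 records per shard, and wait at least
  // 500 ms between consecutive getRecords calls to avoid throttling.
  .option("kinesis.executor.maxFetchRecordsPerShard", "50000")
  .option("kinesis.executor.addIdleTimeBetweenReads", "true")
  .option("kinesis.executor.idleTimeBetweenReadsInMs", "500")
  .load()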

Kinesis Sink Configuration

| Option Name | Default Value | Description |
| ----------- | ------------- | ----------- |
| streamName | - | Name of the Kinesis stream to write to |
| endpointUrl | https://kinesis.us-east-1.amazonaws.com | AWS endpoint of the Kinesis stream |
| awsAccessKeyId | - | AWS credentials for Kinesis describe and write-record operations |
| awsSecretKey | - | AWS credentials for Kinesis describe and write-record operations |
| awsSTSRoleARN | - | AWS STS role ARN for Kinesis describe and write-record operations |
| awsSTSSessionName | - | AWS STS session name for Kinesis describe and write-record operations |
| awsUseInstanceProfile | true | Use instance profile credentials if no credentials are provided |
| kinesis.executor.recordMaxBufferedTime | 1000 (ms) | Maximum time (ms) a record may be buffered before being sent |
| kinesis.executor.maxConnections | 1 | Maximum number of connections to Kinesis |
| kinesis.executor.aggregationEnabled | true | Whether records should be aggregated before being sent to Kinesis |
| kinesis.executor.flushwaittimemillis | 100 | Wait time (ms) while flushing records to Kinesis on task end |
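
A minimal sketch of how these sink options combine, assuming the kinesis DataFrame from the source example above and instance-profile credentials; the checkpoint path is a placeholder.

// Trade latency for throughput: disable aggregation, raise the connection
// count, and flush buffered records after at most 500 ms.
kinesis
  .selectExpr("CAST(rand() AS STRING) AS partitionKey", "CAST(data AS STRING)")
  .writeStream
  .format("kinesis")
  .outputMode("append")
  .option("streamName", "spark-sink-example")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  // Credentials are resolved via the instance profile
  // (awsUseInstanceProfile defaults to true).
  .option("kinesis.executor.aggregationEnabled", "false")
  .option("kinesis.executor.maxConnections", "4")
  .option("kinesis.executor.recordMaxBufferedTime", "500")
  // checkpointLocation is a standard Structured Streaming option; placeholder path.
  .option("checkpointLocation", "/tmp/kinesis-sink-checkpoint")
  .start()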

Roadmap

  • We need to migrate to DataSource V2 APIs for MicroBatchExecution.
  • Maintain per-micro-batch shard commit state in DynamoDB

Acknowledgement

This connector would not have been possible without the reference implementations of the Kafka connector for Structured Streaming, the Kinesis connector for legacy Spark Streaming, and the Kinesis Client Library. The structure of some parts of the code is influenced by the excellent work of various Apache Spark contributors.
