Spark CEP

Spark CEP is a stream processing engine built on top of Apache Spark that supports a continuous query language. It offers the following improvements over the existing Spark Streaming query engines:

  • More efficient windowed aggregation.
  • Support for the "Insert Into" query (see the sketch after this list).

Quick Start

Creating StreamSQLContext

StreamSQLContext is the main entry point for all streaming SQL functionality. A StreamSQLContext can be created from an existing StreamingContext and SQLContext:

val ssc: StreamingContext   // an existing StreamingContext
val sqlContext: SQLContext  // an existing SQLContext
    
val streamSqlContext = new StreamSQLContext(ssc, sqlContext)

Alternatively, you can use a HiveContext to get full Hive semantics support:

val ssc: StreamingContext    // an existing StreamingContext
val hiveContext: HiveContext // an existing HiveContext

val streamSqlContext = new StreamSQLContext(ssc, hiveContext)
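
For completeness, a minimal end-to-end setup might look like the sketch below, using the standard Spark 1.5-era APIs; the application name and batch interval are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
// The import for StreamSQLContext is omitted here; its package depends on the
// Spark CEP build you are using.

val conf = new SparkConf().setAppName("spark-cep-demo") // illustrative app name
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))          // 1-second batch interval
val sqlContext = new SQLContext(sc)

val streamSqlContext = new StreamSQLContext(ssc, sqlContext)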

Running SQL on DStreams

case class Person(name: String, age: Int)

// Create a DStream of Person objects and register it as a stream.
val people: DStream[Person] = ssc.socketTextStream(serverIP, serverPort)
  .map(_.split(","))
  .map(p => Person(p(0), p(1).toInt))
    
val schemaPeopleStream = streamSqlContext.createSchemaDStream(people)
schemaPeopleStream.registerAsTable("people")
    
val teenagers = streamSqlContext.sql("SELECT name FROM people WHERE age >= 10 AND age <= 19")
    
// The results of SQL queries are themselves DStreams and support all the normal operations
teenagers.map(t => "Name: " + t(0)).print()
ssc.start()
ssc.awaitTerminationOrTimeout(30 * 1000)
ssc.stop()

Stream Relation Join

val userStream: DStream[User]
streamSqlContext.registerDStreamAsTable(userStream, "user")
    
val itemStream: DStream[Item]
streamSqlContext.registerDStreamAsTable(itemStream, "item")
    
sql("SELECT * FROM user JOIN item ON user.id = item.id").print()
    
val historyItem: DataFrame
historyItem.registerTempTable("history")
sql("SELECT * FROM user JOIN item ON user.id = history.id").print()

Time Based Windowing Join/Aggregation

streamSqlContext.sql(
  """
    |SELECT t.word, COUNT(t.word)
    |FROM (SELECT * FROM test) OVER (WINDOW '9' SECONDS, SLIDE '3' SECONDS) AS t
    |GROUP BY t.word
  """.stripMargin)

streamSqlContext.sql(
  """
    |SELECT * FROM
    |  user1 OVER (WINDOW '9' SECONDS, SLIDE '6' SECONDS) AS u
    |JOIN
    |  user2 OVER (WINDOW '9' SECONDS, SLIDE '6' SECONDS) AS v
    |ON u.id = v.id
    |WHERE u.id > 1 AND u.id < 3 AND v.id > 1 AND v.id < 3
  """.stripMargin)

Note: for a time-based windowed join, the slide size must be the same for all of the joined streams; this is a limitation of Spark Streaming.
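
The constraint comes from the underlying DStream API: conceptually, the windowed join above corresponds to the plain Spark Streaming sketch below, and join() requires both sides to share the same slide duration. user1Stream and user2Stream are hypothetical DStreams of records with an id field.

import org.apache.spark.streaming.Seconds

// Window both streams, key them by id, and join; join() only works when the
// two windowed DStreams have the same slide duration.
val u = user1Stream.window(Seconds(9), Seconds(6)).map(r => (r.id, r))
val v = user2Stream.window(Seconds(9), Seconds(6)).map(r => (r.id, r))
val joined = u.join(v) // (id, (user1Record, user2Record)) pairs per window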

External Source API Support for Kafka

streamSqlContext.command(
  s"""
     |CREATE TEMPORARY TABLE t_kafka (
     |  word string,
     |  num int
     |)
     |USING org.apache.spark.sql.streaming.sources.KafkaSource
     |OPTIONS(
     |  zkQuorum "10.10.10.1:2181",
     |  brokerList "10.10.10.1:9092,10.10.10.2:9092",
     |  groupId "test",
     |  topics "aa:10",
     |  messageToRow "org.apache.spark.sql.streaming.sources.MessageDelimiter")
   """.stripMargin)

How to Build and Deploy

Spark CEP is built with sbt; the standard sbt commands can be used to test, compile, and package it.

Spark CEP is built against Spark 1.5. You can change the Spark version in Build.scala to the one you want; any Spark 1.5+ release should work.

To use Spark CEP, make the packaged jar available where Spark can access it, for example by passing it via spark-submit --jars.

Spark CEP job submission sample

${SPARK_HOME}/bin/spark-submit \
     --class StreamHQL \
     --name "CQLDemo" \
     --master yarn-cluster \
     --num-executors 4 \
     --driver-memory 256m \
     --executor-memory 512m \
     --executor-cores 1 \
     --conf spark.default.parallelism=5 \
     lib/spark-cep-assembly-0.1.0-SNAPSHOT.jar \
     "{ \
    \"kafka.zookeeper.quorum\": \"10.10.10.1:2181\", \
    \"redis.shards\": \"shard1\",\
    \"redis.sentinels\": \"10.10.10.2:26379\",\
    \"redis.database\": \"0\", \
    \"redis.expire.sec\": \"600\", \
    \"spark.sql.shuffle.partitions\": \"10\" \
    }" \
    sample_query \
    SELECT COUNT(DISTINCT t.duid) FROM stream_test OVER (WINDOW '300' SECONDS, SLIDE '5' SECONDS) AS t

A few arguments are passed to the Spark CEP job. First, it requires the ZooKeeper quorum (kafka.zookeeper.quorum) for consuming the stream from Kafka. Since it stores the results of each window in Redis, it also requires the Redis connection information. The last two arguments are a query name (sample_query here) and the continuous query itself, run against a Kafka topic (stream_test).

If you want to contribute to our project, please refer to Governance.


Contact: Robert B. Kim, Jun-Seok Heo

