
bluejoe2008 / spark-http-stream

License: BSD-2-Clause
spark structured streaming via HTTP communication

Programming Languages

scala, java

Projects that are alternatives of or similar to spark-http-stream

spark-structured-streaming-examples
Spark structured streaming examples using version 3.0.0
Stars: ✭ 23 (+35.29%)
Mutual labels:  spark, spark-structured-streaming
spark learning
尚硅谷 Big Data Spark course (2019 edition): the latest Spark study materials
Stars: ✭ 42 (+147.06%)
Mutual labels:  spark
visions
Type System for Data Analysis in Python
Stars: ✭ 136 (+700%)
Mutual labels:  spark
blog
blog entries
Stars: ✭ 39 (+129.41%)
Mutual labels:  spark
Casper
A compiler for automatically re-targeting sequential Java code to Apache Spark.
Stars: ✭ 45 (+164.71%)
Mutual labels:  spark
confluent-spark-avro
Spark UDFs to deserialize Avro messages with schemas stored in Schema Registry.
Stars: ✭ 18 (+5.88%)
Mutual labels:  spark
aut
The Archives Unleashed Toolkit is an open-source toolkit for analyzing web archives.
Stars: ✭ 111 (+552.94%)
Mutual labels:  spark
daf-kylo
Kylo integration with PDND (previously DAF).
Stars: ✭ 20 (+17.65%)
Mutual labels:  spark
spark-data-sources
Developing Spark External Data Sources using the V2 API
Stars: ✭ 36 (+111.76%)
Mutual labels:  spark
SparkV
🤖⚡ | The most POWERFUL multipurpose chat/meme bot that will boost the activity in your server.
Stars: ✭ 24 (+41.18%)
Mutual labels:  spark
data-algorithms-with-spark
O'Reilly Book: [Data Algorithms with Spark] by Mahmoud Parsian
Stars: ✭ 34 (+100%)
Mutual labels:  spark
bigdata-fun
A complete (distributed) BigData stack, running in containers
Stars: ✭ 14 (-17.65%)
Mutual labels:  spark
bigkube
Minikube for big data with Scala and Spark
Stars: ✭ 16 (-5.88%)
Mutual labels:  spark
smolder
HL7 Apache Spark Datasource
Stars: ✭ 33 (+94.12%)
Mutual labels:  spark
Spotify-Song-Recommendation-ML
UC Berkeley team's submission for RecSys Challenge 2018
Stars: ✭ 70 (+311.76%)
Mutual labels:  spark
spark-demos
Collection of different demo applications using Apache Spark
Stars: ✭ 15 (-11.76%)
Mutual labels:  spark
trembita
Model complex data transformation pipelines easily
Stars: ✭ 44 (+158.82%)
Mutual labels:  spark
Covid19Tracker
A Robinhood style COVID-19 🦠 Android tracking app for the US. Open source and built with Kotlin.
Stars: ✭ 65 (+282.35%)
Mutual labels:  spark
basin
Basin is a visual programming editor for building Spark and PySpark pipelines. Easily build, debug, and deploy complex ETL pipelines from your browser
Stars: ✭ 25 (+47.06%)
Mutual labels:  spark
dllib
dllib is a distributed deep learning library running on Apache Spark
Stars: ✭ 32 (+88.24%)
Mutual labels:  spark

spark-http-stream

spark-http-stream transfers Spark structured streams over the HTTP protocol. Unlike TCP streams, Kafka streams, and HDFS file streams, HTTP streams can flow across distributed big data clusters on the Web. This is very helpful for building global data processing pipelines across different data centers (scientific research institutes, for example) that own separate data sets.

spark-http-stream provides:

  • HttpStreamServer: an HTTP server which receives, collects, and serves HTTP streams
  • HttpStreamSource: reads messages from a HttpStreamServer, acts as a structured streaming Source
  • HttpStreamSink: sends messages to a HttpStreamServer using HTTP POST requests, acts as a structured streaming Sink

spark-http-stream also provides:

  • HttpStreamClient: a client used to communicate with a HttpStreamServer, developed upon Apache HttpClient
  • HttpStreamSourceProvider: a StreamSourceProvider which creates HttpStreamSource
  • HttpStreamSinkProvider: a StreamSinkProvider which creates HttpStreamSink

The simple architecture of spark-http-stream is shown below:

Importing spark-http-stream

Use Maven to import spark-http-stream:

<dependency>
    <groupId>org.grapheco</groupId>
    <artifactId>spark-http-stream</artifactId>
    <version>0.9.1</version>
</dependency>

Starting a standalone HttpStreamServer

HttpStreamServer is actually a Jetty server with an HttpStreamServlet. It can be started using the following code:

val server = HttpStreamServer.start("/xxxx", 8080);

When http://localhost:8080/xxxx is requested, the HttpStreamServlet uses an embedded ActionsHandler to parse the request message, perform the requested action (fetchSchema, fetchStream, etc.), and return a response message.
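
Since HttpStreamServer wraps a Jetty server, it keeps running until explicitly stopped. A minimal lifecycle sketch (the stop() method is an assumption here, based on the server being a wrapped Jetty instance):

val server = HttpStreamServer.start("/xxxx", 8080);
try {
	// the server now answers requests on http://localhost:8080/xxxx;
	// keep it alive while streaming queries run
	Thread.sleep(60 * 1000);
}
finally {
	// stop() is assumed to shut down the wrapped Jetty server
	server.stop();
}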

By default, a NullActionsHandler is provided. Of course, it can be replaced with a MemoryBufferAsReceiver:

server.withBuffer()
	.addListener(new ObjectArrayPrinter())
	.createTopic[(String, Int, Boolean, Float, Double, Long, Byte)]("topic-1")
	.createTopic[String]("topic-2");

or with a KafkaAsReceiver:

server.withKafka("vm105:9092,vm106:9092,vm107:9092,vm181:9092,vm182:9092")
	.addListener(new ObjectArrayPrinter());

As shown above, several kinds of ActionsHandler are defined in spark-http-stream:

  • NullActionsHandler: does nothing
  • MemoryBufferAsReceiver: maintains a local memory buffer, stores data sent from producers into buffer, and allows consumers to fetch data in batch
  • KafkaAsReceiver: forwards all received data to Kafka

Note that MemoryBufferAsReceiver maintains a server-side message buffer, while KafkaAsReceiver only forwards messages to the Kafka cluster.

HttpStreamSource, HttpStreamSink

The following code shows how to load messages from an HttpStreamSource:

val lines = spark.readStream.format(classOf[HttpStreamSourceProvider].getName)
	.option("httpServletUrl", "http://localhost:8080/xxxx")
	.option("topic", "topic-1");
	.option("includesTimestamp", "true")
	.load();

options:

  • httpServletUrl: URL of the servlet
  • topic: topic name of the messages to be consumed
  • includesTimestamp: tells whether each row in the loaded DataFrame includes a timestamp; default value is false
  • timestampColumnName: name assigned to the timestamp column; default value is '_TIMESTAMP_'
  • msFetchPeriod: time interval in milliseconds for message polling; default value is 1 (1 ms)
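
Once loaded, lines is an ordinary streaming DataFrame and can be processed with the standard structured streaming API. For example, a minimal sketch that prints the received messages to the console:

val query = lines.writeStream
	.format("console")
	.start();

// block until the streaming query terminates
query.awaitTermination();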

The following code shows how to output messages to a HttpStreamSink:

val query = lines.writeStream
	.format(classOf[HttpStreamSinkProvider].getName)
	.option("httpServletUrl", "http://localhost:8080/xxxx")
	.option("topic", "topic-1")
	.start();

options:

  • httpServletUrl: URL of the servlet
  • topic: topic name of the produced messages
  • maxPacketSize: max size in bytes of each message packet; if the actual DataFrame is too large, it will be split into several packets; default value is 10*1024*1024 (10 MB)
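
Putting source and sink together, the following sketch (mirroring the HttpStreamDemo described in the Tests section below) reads text lines from a socket and forwards them to a HttpStreamServer; host, port, and URL are placeholders:

// read text lines from a TCP socket (e.g. fed by `nc -lk 9999`)
val socketLines = spark.readStream
	.format("socket")
	.option("host", "localhost")
	.option("port", 9999)
	.load();

// forward them to the HttpStreamServer as topic-1; depending on your Spark
// configuration, a checkpointLocation option may also be required
val query = socketLines.writeStream
	.format(classOf[HttpStreamSinkProvider].getName)
	.option("httpServletUrl", "http://localhost:8080/xxxx")
	.option("topic", "topic-1")
	.start();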

Note that HttpStreamSource is only available when the HttpStreamServer is equipped with a MemoryBufferAsReceiver (use withBuffer, as shown above). If the HttpStreamServer uses Kafka as its back-end message system (use withKafka), data cannot be consumed via HttpStreamSource; use KafkaSource (see http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) instead:

val df = spark
	.readStream
	.format("kafka")
	.option("kafka.bootstrap.servers", "vm105:9092,vm106:9092,vm107:9092,vm181:9092,vm182:9092")
	.option("subscribe", "topic-1")
	.load()
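
Note that the Kafka source exposes binary key and value columns; a typical follow-up step (standard Spark Kafka integration, not specific to spark-http-stream) is to cast the value to a string:

import spark.implicits._

// the payload arrives as binary; decode it before further processing
val messages = df.selectExpr("CAST(value AS STRING)").as[String]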

See https://github.com/bluejoe2008/spark-http-stream/blob/master/src/test/scala/HttpStreamSourceSinkTest.scala and https://github.com/bluejoe2008/spark-http-stream/blob/master/src/test/scala/HttpStreamKafkaTest.scala for complete example code.

Understanding ActionsHandler

As shown in the previous section, several kinds of ActionsHandler are defined in spark-http-stream: NullActionsHandler, MemoryBufferAsReceiver, and KafkaAsReceiver.

Users can also implement their own ActionsHandler. The interface looks like:

trait ActionsHandler {
	def listActionHandlerEntries(requestBody: Map[String, Any]): ActionHandlerEntries;
	def destroy();
}

Here ActionHandlerEntries is just an alias of PartialFunction[String, Map[String, Any]], which accepts an input argument action: String and returns an output argument responseBody: Map[String, Any]. The listActionHandlerEntries method is often written as a set of case expressions:

override def listActionHandlerEntries(requestBody: Map[String, Any])
	: PartialFunction[String, Map[String, Any]] = {
	case "actionSendStream" ⇒ handleSendStream(requestBody);
}

The code shown above says: this ActionsHandler only handles the action actionSendStream; in that case it calls the method handleSendStream(requestBody) to handle the request and returns its result as the response. If any other action is requested, an UnsupportedActionException will be thrown by the HttpStreamServer.
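
For illustration, here is a sketch of a hypothetical custom ActionsHandler; the action name actionEcho and the response key are invented for this example:

class EchoActionsHandler extends ActionsHandler {
	override def listActionHandlerEntries(requestBody: Map[String, Any])
		: PartialFunction[String, Map[String, Any]] = {
		// "actionEcho" is a made-up action: it just returns the request body
		case "actionEcho" ⇒ Map("echoed" -> requestBody);
	}

	// nothing to clean up in this sketch
	override def destroy() {}
}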

ActionsHandlerFactory is defined to tell how to create an ActionsHandler with the required parameters:

trait ActionsHandlerFactory {
	def createInstance(params: Params): ActionsHandler;
}
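
A matching factory for the hypothetical handler above could look like this; how the params values map to servlet init-params is shown in the next section:

class EchoActionsHandlerFactory extends ActionsHandlerFactory {
	// EchoActionsHandler needs no configuration, so params is ignored
	override def createInstance(params: Params): ActionsHandler = new EchoActionsHandler();
}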

Embedding HttpStreamServer in Web application servers

spark-http-stream provides a servlet named ConfigurableHttpStreamServlet; users can configure the servlet in web.xml:

<servlet>
	<servlet-name>httpStreamServlet</servlet-name>
	<servlet-class>org.apache.spark.sql.execution.streaming.http.ConfigurableHttpStreamServlet</servlet-class>
	<init-param>
		<param-name>handlerFactoryName</param-name>
		<param-value>org.apache.spark.sql.execution.streaming.http.KafkaAsReceiverFactory</param-value>
	</init-param>
	<init-param>
		<param-name>bootstrapServers</param-name>
		<param-value>vm105:9092,vm106:9092,vm107:9092,vm181:9092,vm182:9092</param-value>
	</init-param>
</servlet>

<servlet-mapping>
	<servlet-name>httpStreamServlet</servlet-name>
	<url-pattern>/xxxx</url-pattern>
</servlet-mapping>

As shown above, a ConfigurableHttpStreamServlet is defined with an ActionsHandlerFactory of KafkaAsReceiverFactory; the parameters required by the ActionsHandlerFactory (bootstrapServers, for example) are defined as a set of init-params.

Using HttpStreamClient

HttpStreamClient provides an HTTP client used to communicate with a HttpStreamServer. It contains several methods:

  • sendDataFrame: sends a DataFrame to the server; if the DataFrame is too large, it will be split into smaller packets
  • sendRows: sends data (as Array[Row]) to the server
  • fetchSchema: retrieves the schema of a certain topic
  • fecthStream: retrieves data (as Array[RowEx]) from the server
  • subscribe: subscribes to a topic and retrieves a subscriberId
  • unsubscribe: cancels a subscription

Note that some methods are only available when the server is equipped with the right ActionsHandler. As an example, KafkaAsReceiver only handles the action actionSendStream; that means calls to the sendRows and sendDataFrame methods of HttpStreamClient work well, but a call to fecthStream or subscribe will fail with an UnsupportedActionException.

+---------------+------------------------+-----------------+
|  methods      | MemoryBufferAsReceiver | KafkaAsReceiver |
+---------------+------------------------+-----------------+
| sendDataFrame |             √          |        √        |
+---------------+------------------------+-----------------+
| sendRows      |             √          |        √        |
+---------------+------------------------+-----------------+
| fetchSchema   |             √          |        X        |
+---------------+------------------------+-----------------+
| fecthStream   |             √          |        X        |
+---------------+------------------------+-----------------+
| subscribe     |             √          |        X        |
+---------------+------------------------+-----------------+
| unsubscribe   |             √          |        X        |
+---------------+------------------------+-----------------+
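
As a rough sketch of client usage against a MemoryBufferAsReceiver-backed server; the connect(...) factory method and the exact signatures below are assumptions based on the method list above, not verified API:

// build a client for the given servlet URL (constructor shape assumed)
val client = HttpStreamClient.connect("http://localhost:8080/xxxx");

val schema = client.fetchSchema("topic-1");       // MemoryBufferAsReceiver only
val subscriberId = client.subscribe("topic-1");   // return shape assumed
val rows = client.fecthStream(subscriberId);      // Array[RowEx], signature assumed
client.unsubscribe(subscriberId);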

StreamListener

A StreamListener is notified when new data arrives and is consumed by the ActionsHandler:

trait StreamListener {
	def onArrive(topic: String, objects: Array[RowEx]);
}

Two kinds of StreamListeners are provided:

  • StreamCollector: collects data in a local memory buffer
  • StreamPrinter: prints data as it arrives
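
Custom listeners can be written against the trait above. As a sketch, a hypothetical listener that counts arriving rows per topic:

import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

class CountingListener extends StreamListener {
	// per-topic row counters; onArrive may be invoked from server threads
	private val counters = TrieMap[String, AtomicLong]();

	override def onArrive(topic: String, objects: Array[RowEx]) {
		counters.getOrElseUpdate(topic, new AtomicLong()).addAndGet(objects.length);
	}
}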

Example messages, as printed by a listener, look like this:

++++++++topic=topic-1++++++++
RowEx([hello1,1,true,0.1,0.1,1,49],1,0,2017-08-27 20:37:56.432)
RowEx([hello2,2,false,0.2,0.2,2,50],1,1,2017-08-27 20:37:56.432)
RowEx([hello3,3,true,0.3,0.3,3,51],1,2,2017-08-27 20:37:56.432)

Schema, data types, RowEx

spark-http-stream only supports data types which can be recognized by Spark Encoders. These data types include: String, Boolean, Int, Long, Float, Double, Byte, Array[].

A row is wrapped as a RowEx object when received. RowEx is a data structure richer than Row. It contains the following members and methods:

  • originalRow: original row
  • batchId: batch id passed by Spark
  • offsetInBatch: offset of this row in current batch
  • withTimestamp(): returns a Row with a timestamp
  • withId(): returns a Row with its id
  • extra(): returns a triple (batchId, offsetInBatch, timestamp)
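
A small sketch of how these members might be used; the member names come from the list above, but whether they are defs or vals (and their exact types) is an assumption:

// unpack the extra fields carried by a RowEx
def describe(rowEx: RowEx): String = {
	val (batchId, offsetInBatch, timestamp) = rowEx.extra();
	s"batch=$batchId offset=$offsetInBatch time=$timestamp row=${rowEx.originalRow}"
}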

Suppose an original row has the values [hello1,1,true,0.1,0.1,1,49]; the following tables show the contents of the structures mentioned above:

originalRow:

+---------------+-------+--------------+-----------+------------+--------+---------+
| String:hello1 | Int:1 | Boolean:true | Float:0.1 | Double:0.1 | Long:1 | Byte:49 | 
+---------------+-------+--------------+-----------+------------+--------+---------+

RowEx:

+---------------+-------+--------------+-----++--------+-------+-------------------------------+
| String:hello1 | Int:1 | Boolean:true | ... || Long:1 | Int:0 | Timestamp:2017-08-27 20:37:56 |
+---------------+-------+--------------+-----++--------+-------+-------------------------------+

withTimestamp():

+---------------+-------+--------------+-----------+-----+-------------------------------+
| String:hello1 | Int:1 | Boolean:true | Float:0.1 | ... | Timestamp:2017-08-27 20:37:56 |
+---------------+-------+--------------+-----------+-----+-------------------------------+

withId():

+---------------+-------+--------------+-----------+------------+--------+---------+------------+
| String:hello1 | Int:1 | Boolean:true | Float:0.1 | Double:0.1 | Long:1 | Byte:49 | String:1-0 |
+---------------+-------+--------------+-----------+------------+--------+---------+------------+

extra():

+--------+-------+-------------------------------+
| Long:1 | Int:0 | Timestamp:2017-08-27 20:37:56 |
+--------+-------+-------------------------------+

SerDe

spark-http-stream defines a SerializerFactory to create a SerializerInstance:

trait SerializerFactory {
	def getSerializerInstance(serializerName: String): SerializerInstance;
}

A SerializerFactory.DEFAULT object is provided, which is able to create two kinds of serializers:

  • java: creates a JavaSerializer
  • kryo: creates a KryoSerializer

New kinds of Serializer (a JSON serializer, for example) are welcome.

By default, HttpStreamClient and HttpStreamServer use the kryo serializer.
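
For example, a round-trip sketch using the default factory; this assumes SerializerInstance follows Spark's SerializerInstance API (serialize to a java.nio.ByteBuffer, deserialize back):

// obtain the kryo serializer from the default factory
val serializer = SerializerFactory.DEFAULT.getSerializerInstance("kryo");

// round-trip a small tuple through bytes
val bytes = serializer.serialize(("hello", 42));
val restored: (String, Int) = serializer.deserialize[(String, Int)](bytes);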

Tests

Steps to test HttpStreamDemo:

  1. choose machine A, run HttpStreamDemo start-server-on 8080 /xxxx; this starts an HTTP server which receives data from machine B
  2. choose machine B, run nc -lk 9999
  3. run HttpStreamDemo read-from http://machine-a-host:8080/xxxx on machine B
  4. run HttpStreamDemo write-into http://machine-a-host:8080/xxxx on machine C
  5. type some text into nc; the data will be received by the HttpStreamSink, then consumed via the HttpStreamSource, and finally displayed on the console

Dependencies

  • kafka-clients-0.10: used by KafkaAsReceiver
  • httpclient-4.5: HttpStreamClient is built upon the Apache HttpClient project
  • jetty-9.0: HttpStreamServer is developed upon Jetty
  • spark-2.1: Spark structured streaming library