streamnative / Pulsar Spark

Licence: apache-2.0
When Apache Pulsar meets Apache Spark

pulsar-spark

Unified data processing with Apache Pulsar and Apache Spark.

Prerequisites

  • Java 8 or later
  • Spark 2.4.0 or later
  • Pulsar 2.4.0 or later

Preparations

Link

Client library

For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:

    groupId = io.streamnative.connectors
    artifactId = pulsar-spark-connector_{{SCALA_BINARY_VERSION}}
    version = {{PULSAR_SPARK_VERSION}}

Currently, the artifact is available in the StreamNative Bintray Maven repository. For a Maven project, add the repository to your pom.xml as follows:

  <repositories>
    <repository>
      <id>central</id>
      <layout>default</layout>
      <url>https://repo1.maven.org/maven2</url>
    </repository>
    <repository>
      <id>bintray-streamnative-maven</id>
      <name>bintray</name>
      <url>https://dl.bintray.com/streamnative/maven</url>
    </repository>
  </repositories>
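
For SBT projects, the same coordinates and repository might be declared as in the sketch below. This is an assumed translation of the Maven snippet above, not something taken from the project's documentation: the resolver name is arbitrary, `%%` asks SBT to append the Scala binary version to the artifact ID, and `{{PULSAR_SPARK_VERSION}}` remains a placeholder for the release you use.

```scala
// build.sbt (sketch): the resolver name is arbitrary; the URL is the one from the Maven snippet
resolvers += "bintray-streamnative-maven" at "https://dl.bintray.com/streamnative/maven"

// %% appends the Scala binary version, matching pulsar-spark-connector_{{SCALA_BINARY_VERSION}}
libraryDependencies += "io.streamnative.connectors" %% "pulsar-spark-connector" % "{{PULSAR_SPARK_VERSION}}"
```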

Deploy

Client library

As with any Spark application, use spark-submit to launch your application.
You can add pulsar-spark-connector_{{SCALA_BINARY_VERSION}} and its dependencies directly to spark-submit using --packages.

Example

$ ./bin/spark-submit \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  --repositories https://dl.bintray.com/streamnative/maven \
  ...

CLI

To experiment in spark-shell (or pyspark for Python), you can also use --packages to add pulsar-spark-connector_{{SCALA_BINARY_VERSION}} and its dependencies directly.

Example

$ ./bin/spark-shell \
  --packages io.streamnative.connectors:pulsar-spark-connector_{{SCALA_BINARY_VERSION}}:{{PULSAR_SPARK_VERSION}} \
  --repositories https://dl.bintray.com/streamnative/maven \
  ...

When locating an artifact or library, the --packages option checks the following repositories in order:

  1. Local maven repository

  2. Maven central repository

  3. Other repositories specified by --repositories

The format for the coordinates should be groupId:artifactId:version.

For more information about submitting applications with external dependencies, see Application Submission Guide.
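
As a small illustration of the coordinate format, the sketch below assembles the string passed to --packages from its three parts. The helper name and the version numbers in the call are hypothetical and used only for illustration; the group ID and artifact prefix are the ones given earlier in this document.

```scala
// Builds the groupId:artifactId:version coordinate string expected by --packages.
// The Scala binary version is appended to the artifact name, as in the artifact ID above.
def packageCoordinate(scalaBinaryVersion: String, connectorVersion: String): String = {
  val groupId = "io.streamnative.connectors"
  val artifactId = s"pulsar-spark-connector_$scalaBinaryVersion"
  s"$groupId:$artifactId:$connectorVersion"
}

// Hypothetical versions, for illustration only
println(packageCoordinate("2.11", "2.4.0"))
// prints: io.streamnative.connectors:pulsar-spark-connector_2.11:2.4.0
```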

Usage

Read data from Pulsar

Create a Pulsar source for streaming queries

The following examples are in Scala.

// Needed for .as[(String, String)] outside spark-shell
import spark.implicits._

// Subscribe to 1 topic
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topics", "topic1,topic2")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topicsPattern", "topic.*")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Tip

For more information on how to use other language bindings for Spark Structured Streaming, see Structured Streaming Programming Guide.

Create a Pulsar source for batch queries

If you have a use case that is better suited to batch processing, you can create a Dataset/DataFrame for a defined range of offsets.

The following examples are in Scala.

// Needed for .as[(String, String)] outside spark-shell
import spark.implicits._

// Subscribe to 1 topic; defaults to the earliest and latest offsets
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Pulsar offsets
import org.apache.spark.sql.pulsar.JsonUtils._
val startingOffsets = topicOffsets(Map("topic1" -> messageId1, "topic2" -> messageId2))
val endingOffsets = topicOffsets(...)
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("endingOffsets", endingOffsets)
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topicsPattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

The following options must be set for the Pulsar source for both batch and streaming queries.

| Option | Value | Description |
| --- | --- | --- |
| `topic` | A topic name string | The topic to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
| `topics` | A comma-separated list of topics | The topic list to be consumed. Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
| `topicsPattern` | A Java regex string | The pattern used to subscribe to topic(s). Only one of `topic`, `topics` or `topicsPattern` options can be specified for Pulsar source. |
| `service.url` | A service URL of your Pulsar cluster | The Pulsar `serviceUrl` configuration. |
| `admin.url` | A service HTTP URL of your Pulsar cluster | The Pulsar `serviceHttpUrl` configuration. |
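
The rules for the required options can be checked before a query is built. The following is a minimal sketch, assuming that exactly one of the three topic selectors is expected; `validateSourceOptions` is a hypothetical helper written for this example and is not part of the connector.

```scala
// The three mutually exclusive ways to select topics, as listed above
val topicSelectors: Set[String] = Set("topic", "topics", "topicsPattern")

// Required connection options, as listed above
val requiredOptions: Set[String] = Set("service.url", "admin.url")

// Hypothetical helper: returns Right(options) when the map is usable as source options,
// or Left(message) describing the first problem found.
def validateSourceOptions(options: Map[String, String]): Either[String, Map[String, String]] = {
  val selectors = options.keySet.intersect(topicSelectors)
  val missing = requiredOptions.diff(options.keySet)
  if (missing.nonEmpty)
    Left(s"missing required option(s): ${missing.toSeq.sorted.mkString(", ")}")
  else if (selectors.size != 1)
    Left(s"exactly one of topic, topics, topicsPattern must be set, found ${selectors.size}")
  else
    Right(options)
}

val checked = validateSourceOptions(Map(
  "service.url" -> "pulsar://localhost:6650",
  "admin.url" -> "http://localhost:8080",
  "topics" -> "topic1,topic2"))
```

When the result is a `Right`, the map it holds can be handed to Spark in one call, for example `spark.read.format("pulsar").options(validOptions).load()`, instead of chaining individual `.option(...)` calls.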

The following configurations are optional.

`startingOffsets`

  • Valid values:

      • "earliest" (streaming and batch queries)

      • "latest" (streaming query)

      • A JSON string, for example:

        """ {"topic-1":[8,11,16,101,24,1,32,1],"topic-5":[8,15,16,105,24,5,32,5]} """

  • Default: "earliest" (batch query), "latest" (streaming query)

  • Query type: streaming and batch queries

  • Description: the startingOffsets option controls where a reader starts reading data.

      • "earliest": when there is no valid offset, the reader reads all the data in the partition, starting from the very beginning.

      • "latest": when there is no valid offset, the reader reads the newest records, that is, those written after the reader starts running.

      • A JSON string: specifies a starting offset for each topic.
        You can use org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, MessageId]) to convert message offsets to a JSON string.

  • Notes:

      • For batch queries, "latest" is not allowed, whether it is specified implicitly or by using MessageId.latest ([8,-1,-1,-1,-1,-1,-1,-1,-1,127,16,-1,-1,-1,-1,-1,-1,-1,-1,127]) in the JSON string.

      • For streaming queries, "latest" applies only when a new query is started; a resumed query always picks up from where it left off. Partitions newly discovered during a query start at "earliest".

`endingOffsets`

  • Valid values:

      • "latest" (batch query)

      • A JSON string, for example:

        {"topic-1":[8,12,16,102,24,2,32,2],"topic-5":[8,16,16,106,24,6,32,6]}

  • Default: "latest"

  • Query type: batch query

  • Description: the endingOffsets option controls where a reader stops reading data.

      • "latest": the reader stops reading data at the latest record.

      • A JSON string: specifies an ending offset for each topic.

  • Note: MessageId.earliest ([8,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,16,-1,-1,-1,-1,-1,-1,-1,-1,-1,1]) is not allowed.

`failOnDataLoss`

  • Valid values: true, false

  • Default: true

  • Query type: streaming query

  • Description: the failOnDataLoss option controls whether to fail a query when data is lost (for example, topics are deleted, or messages are deleted because of a retention policy).

    This check may raise a false alarm. You can set the option to false when it does not work as you expect.

    A batch query always fails if it fails to read any data from the provided offsets due to data loss.

Schema of Pulsar source

  • For topics without a schema or with a primitive schema in Pulsar, the message payload is loaded into a value column whose type corresponds to the Pulsar schema.

  • For topics with Avro or JSON schema, their field names and field types are kept in the result rows.

In addition, each row in the source has the following metadata fields.

| Column | Type |
| --- | --- |
| `__key` | Binary |
| `__topic` | String |
| `__messageId` | Binary |
| `__publishTime` | Timestamp |
| `__eventTime` | Timestamp |

Example

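A Pulsar topic with the AVRO schema s defined below:

  import org.apache.pulsar.client.api.Schema

  case class Foo(i: Int, f: Float, bar: Bar)
  case class Bar(b: Boolean, s: String)
  // classOf[Foo] is the class of Foo itself; Foo.getClass would be the companion object's class
  val s = Schema.AVRO(classOf[Foo])

has the following schema as a DataFrame/DataSet in Spark: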

root
 |-- i: integer (nullable = false)
 |-- f: float (nullable = false)
 |-- bar: struct (nullable = true)
 |    |-- b: boolean (nullable = false)
 |    |-- s: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)

For a Pulsar topic with Schema.DOUBLE, its schema as a DataFrame is:

root
|-- value: double (nullable = false)
|-- __key: binary (nullable = true)
|-- __topic: string (nullable = true)
|-- __messageId: binary (nullable = true)
|-- __publishTime: timestamp (nullable = true)
|-- __eventTime: timestamp (nullable = true)

Write data to Pulsar

The DataFrame written to Pulsar can have an arbitrary schema. Each record in the DataFrame is transformed into one message sent to Pulsar, and the fields of the DataFrame are divided into two groups: the __key and __eventTime fields are encoded as metadata of the Pulsar message; the other fields are grouped, encoded using AVRO, and put in value():

producer.newMessage().key(__key).value(avro_encoded_fields).eventTime(__eventTime)

Create a Pulsar sink for streaming queries

The following examples are in Scala.

// Write key-value data from a DataFrame to a specific Pulsar topic specified in an option
val ds = df
  .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Pulsar using a topic specified in the data
val ds = df
  .selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .start()

Write the output of batch queries to Pulsar

The following examples are in Scala.

// Write key-value data from a DataFrame to a specific Pulsar topic specified in an option
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Pulsar using a topic specified in the data
df.selectExpr("__topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .save()

Limitations

Currently, the connector provides at-least-once semantics. Consequently, when writing either streaming queries or batch queries to Pulsar, some records may be duplicated. One possible way to remove duplicates is to introduce a primary (unique) key into the written data and use it to de-duplicate when reading the data back.
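
A minimal sketch of key-based de-duplication on the reading side, using Spark's built-in `dropDuplicates`. The column name `recordId`, the helper `dedupeByKey`, and the in-memory rows are assumptions made for this example; in practice the DataFrame would come from a Pulsar source, and the key column would be whatever unique field you add to your records.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Keeps one row per value of keyColumn; which duplicate survives is not specified.
def dedupeByKey(df: DataFrame, keyColumn: String): DataFrame =
  df.dropDuplicates(keyColumn)

// Local demo with in-memory rows standing in for data read back from Pulsar
val spark = SparkSession.builder().master("local[1]").appName("dedupe-sketch").getOrCreate()
import spark.implicits._

val readBack = Seq(("id-1", "a"), ("id-1", "a"), ("id-2", "b")).toDF("recordId", "value")
val deduped = dedupeByKey(readBack, "recordId")
println(deduped.count()) // prints 2
```

On a streaming DataFrame, `dropDuplicates` keeps state for every key it has seen, so it is usually combined with `withWatermark` to bound that state.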

Pulsar specific configurations

Client, producer, and reader configurations of Pulsar can be set via DataStreamReader.option with the pulsar.client., pulsar.producer., or pulsar.reader. prefix, e.g., stream.option("pulsar.reader.receiverQueueSize", "1000000"). For the available Pulsar parameters, see the Pulsar client libraries documentation.
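
When several Pulsar settings are needed, the prefix can be applied in one place. The sketch below shows one way to do that; `withPrefix` is a hypothetical helper, and `receiverQueueSize` is the only setting name taken from this document.

```scala
// Prefixes each key so that the setting is recognized as a Pulsar client/producer/reader option.
// The prefix strings come from the paragraph above; the helper itself is hypothetical.
def withPrefix(prefix: String, settings: Map[String, String]): Map[String, String] =
  settings.map { case (key, value) => (prefix + key, value) }

val readerOptions = withPrefix("pulsar.reader.", Map("receiverQueueSize" -> "1000000"))
println(readerOptions)
// prints: Map(pulsar.reader.receiverQueueSize -> 1000000)
```

The resulting map can then be passed in a single call, for example `spark.readStream.format("pulsar").options(readerOptions)`, alongside the required connection and topic options.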

Build Spark Pulsar Connector

If you want to build a Spark-Pulsar connector reading data from Pulsar and writing results to Pulsar, follow the steps below.

  1. Check out the source code.

     $ git clone https://github.com/streamnative/pulsar-spark.git
     $ cd pulsar-spark

  2. Install Docker.

     The pulsar-spark connector uses Testcontainers for integration tests. To run the integration tests, make sure Docker is installed.

  3. Set a Scala version.

     Change scala.version and scala.binary.version in pom.xml.

     Note

     The Scala version should be consistent with the Scala version of the Spark distribution you use.

  4. Build the project.

     $ mvn clean install -DskipTests

  5. Run the tests.

     $ mvn clean install

Once the installation is finished, a fat JAR is generated in both the local Maven repository and the target directory.
