
BenFradet / Spark Kafka Writer

Licence: apache-2.0
Write your Spark data to Kafka seamlessly

Programming Languages

scala
5932 projects

Projects that are alternatives of or similar to Spark Kafka Writer

Logisland
Scalable stream processing platform for advanced real-time analytics on top of Kafka and Spark. LogIsland also supports MQTT and Kafka Streams (Flink is on the roadmap). The platform performs complex event processing and is suitable for time-series analysis. A large set of valuable, ready-to-use processors, data sources, and sinks is available.
Stars: ✭ 97 (-44.57%)
Mutual labels:  kafka, spark
Flink Learning
Flink learning blog. http://www.54tianzhisheng.cn/ Covers Flink fundamentals, concepts, internals, hands-on practice, performance tuning, and source-code analysis. Includes study examples for Flink Connector, Metrics, Library, DataStream API, Table API & SQL, plus large real-world Flink project case studies (PV/UV counting, log storage, real-time deduplication of tens of billions of records, monitoring and alerting). The author also maintains a column on real-time big data computation and performance optimization with Flink.
Stars: ✭ 11,378 (+6401.71%)
Mutual labels:  kafka, spark
Bigdata Notes
A getting-started guide to big data ⭐
Stars: ✭ 10,991 (+6180.57%)
Mutual labels:  kafka, spark
Model Serving Tutorial
Code and presentation for Strata Model Serving tutorial
Stars: ✭ 57 (-67.43%)
Mutual labels:  kafka, spark
Spark Structured Streaming Examples
Spark Structured Streaming / Kafka / Cassandra / Elastic
Stars: ✭ 168 (-4%)
Mutual labels:  kafka, spark
Thingsboard
Open-source IoT Platform - Device management, data collection, processing and visualization.
Stars: ✭ 10,526 (+5914.86%)
Mutual labels:  kafka, spark
Seldon Server
Machine Learning Platform and Recommendation Engine built on Kubernetes
Stars: ✭ 1,435 (+720%)
Mutual labels:  kafka, spark
Bigdata Interview
🎯 🌟 [Big data interview questions] A collection of big-data interview questions gathered from around the web, together with the author's own answer summaries. Currently covers the Hadoop/Hive/Spark/Flink/HBase/Kafka/Zookeeper frameworks.
Stars: ✭ 857 (+389.71%)
Mutual labels:  kafka, spark
Iot Traffic Monitor
Stars: ✭ 131 (-25.14%)
Mutual labels:  kafka, spark
Abris
Avro SerDe for Apache Spark structured APIs.
Stars: ✭ 130 (-25.71%)
Mutual labels:  kafka, spark
Awesome Recommendation Engine
The purpose of this tiny project is to put together the know-how learned in the big data expert course from formacionhadoop.com. The idea is to show how to play with Apache Spark Streaming, Kafka, MongoDB, and Spark machine learning algorithms.
Stars: ✭ 47 (-73.14%)
Mutual labels:  kafka, spark
Azure Event Hubs Spark
Enabling Continuous Data Processing with Apache Spark and Azure Event Hubs
Stars: ✭ 140 (-20%)
Mutual labels:  kafka, spark
Delta Architecture
Streaming data changes to a Data Lake with Debezium and Delta Lake pipeline
Stars: ✭ 43 (-75.43%)
Mutual labels:  kafka, spark
Repository
A personal knowledge base covering data warehouse modeling, real-time computing, big data, Java, algorithms, and more.
Stars: ✭ 92 (-47.43%)
Mutual labels:  kafka, spark
Real Time Stream Processing Engine
This is an example of real time stream processing using Spark Streaming, Kafka & Elasticsearch.
Stars: ✭ 37 (-78.86%)
Mutual labels:  kafka, spark
Bigdata Notebook
Stars: ✭ 100 (-42.86%)
Mutual labels:  kafka, spark
Szt Bigdata
A big-data passenger-flow analysis system for the Shenzhen metro 🚇🚄🌟
Stars: ✭ 826 (+372%)
Mutual labels:  kafka, spark
Dockerfiles
50+ DockerHub public images for Docker & Kubernetes - Hadoop, Kafka, ZooKeeper, HBase, Cassandra, Solr, SolrCloud, Presto, Apache Drill, Nifi, Spark, Consul, Riak, TeamCity and DevOps tools built on the major Linux distros: Alpine, CentOS, Debian, Fedora, Ubuntu
Stars: ✭ 847 (+384%)
Mutual labels:  kafka, spark
Example Spark Kafka
Apache Spark and Apache Kafka integration example
Stars: ✭ 120 (-31.43%)
Mutual labels:  kafka, spark
Aliyun Emapreduce Datasources
Extended datasource support for Spark/Hadoop on Aliyun E-MapReduce.
Stars: ✭ 132 (-24.57%)
Mutual labels:  kafka, spark

spark-kafka-writer

Join the chat at https://gitter.im/BenFradet/spark-kafka-writer

Write your Spark data to Kafka seamlessly

Installation

spark-kafka-writer is available on Maven Central with the following coordinates, depending on whether you're using Kafka 0.8 or 0.10 and on your version of Spark:

  • Spark 2.4.X (Kafka 0.10): "com.github.benfradet" %% "spark-kafka-writer" % "0.5.0"
  • Spark 2.2.X (Kafka 0.10): "com.github.benfradet" %% "spark-kafka-writer" % "0.4.0"
  • Spark 2.1.X: "com.github.benfradet" %% "spark-kafka-0-8-writer" % "0.3.0" (Kafka 0.8) or "com.github.benfradet" %% "spark-kafka-0-10-writer" % "0.3.0" (Kafka 0.10)
  • Spark 2.0.X: "com.github.benfradet" %% "spark-kafka-0-8-writer" % "0.2.0" (Kafka 0.8) or "com.github.benfradet" %% "spark-kafka-0-10-writer" % "0.2.0" (Kafka 0.10)
  • Spark 1.6.X (Kafka 0.8): "com.github.benfradet" %% "spark-kafka-writer" % "0.1.0"
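For example, with sbt the dependency is a single line; the sketch below assumes Spark 2.4.x with Kafka 0.10 — adjust the artifact name and version to match the row for your Spark/Kafka combination:

```scala
// build.sbt — assuming Spark 2.4.x and Kafka 0.10
libraryDependencies += "com.github.benfradet" %% "spark-kafka-writer" % "0.5.0"
```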

Usage

Without callbacks

  • if you want to save an RDD to Kafka:
import com.github.benfradet.spark.kafka.writer._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.rdd.RDD

val topic = "my-topic"
val producerConfig = Map(
  "bootstrap.servers" -> "127.0.0.1:9092",
  "key.serializer" -> classOf[StringSerializer].getName,
  "value.serializer" -> classOf[StringSerializer].getName
)

val rdd: RDD[String] = ...
rdd.writeToKafka(
  producerConfig,
  s => new ProducerRecord[String, String](topic, s)
)
  • if you want to save a DStream to Kafka:
import com.github.benfradet.spark.kafka.writer._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.streaming.dstream.DStream

val dStream: DStream[String] = ...
dStream.writeToKafka(
  producerConfig,
  s => new ProducerRecord[String, String](topic, s)
)
  • if you want to save a Dataset to Kafka:
import com.github.benfradet.spark.kafka.writer._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.Dataset

case class Foo(a: Int, b: String)
val dataset: Dataset[Foo] = ...
dataset.writeToKafka(
  producerConfig,
  foo => new ProducerRecord[String, String](topic, foo.toString)
)
  • if you want to write a DataFrame to Kafka:
import com.github.benfradet.spark.kafka.writer._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.DataFrame

val dataFrame: DataFrame = ...
dataFrame.writeToKafka(
  producerConfig,
  row => new ProducerRecord[String, String](topic, row.toString)
)
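All four flavors follow the same shape: a producer configuration plus a function turning each element into a ProducerRecord. Conceptually, writing amounts to sending every transformed element of every partition. The dependency-free sketch below simulates that flow with plain collections — StubProducer and the other names here are purely illustrative stand-ins, not the library's internals:

```scala
// Partitions of a dataset, simulated as plain collections.
val partitions: Seq[Seq[String]] = Seq(Seq("a", "b"), Seq("c"))

// Stand-in for a Kafka producer: it only records what it "sends".
final class StubProducer {
  val sent = scala.collection.mutable.ListBuffer.empty[(String, String)]
  def send(record: (String, String)): Unit = sent += record
}

val topic = "my-topic"
val producer = new StubProducer

// Plays the role of s => new ProducerRecord[String, String](topic, s).
val transform: String => (String, String) = s => (topic, s)

// Conceptually: for each partition, send every transformed element.
partitions.foreach(partition => partition.foreach(s => producer.send(transform(s))))
```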

With callbacks

It is also possible to pass a Callback from the Kafka producer API that will be triggered after each write; this parameter defaults to None.

The Callback must implement the onCompletion method; its Exception parameter is null when the write succeeded.

Any Callback implementation needs to be serializable to be used in Spark.
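The null-means-success convention is easy to get wrong; wrapping the exception in an Option keeps the branch explicit. Here is a tiny standalone sketch of that check (describeResult is a hypothetical helper, not part of the library):

```scala
// Wraps Kafka's "null Exception means success" convention in an Option:
// Option(null) is None, so fold picks the success branch.
def describeResult(e: Exception): String =
  Option(e).fold("write succeeded")(ex => s"write failed: ${ex.getMessage}")
```

With this helper, describeResult(null) yields "write succeeded", while a non-null exception produces the failure message.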

For example, if you want to use a Callback when saving an RDD to Kafka:

// replace by kafka08 if you're using Kafka 0.8
import com.github.benfradet.spark.kafka010.writer._
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

@transient lazy val log = org.apache.log4j.Logger.getLogger("spark-kafka-writer")

val rdd: RDD[String] = ...
rdd.writeToKafka(
  producerConfig,
  s => new ProducerRecord[String, String](topic, s),
  Some(new Callback with Serializable {
    override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
      if (Option(e).isDefined) {
        log.warn("error sending message", e)
      } else {
        log.info(s"write succeeded! offset: ${metadata.offset()}")
      }
    }
  })
)

Check out the Kafka documentation to learn more about callbacks.

Java usage

It's also possible to use the library from Java; for example, writing a DStream to Kafka looks like this:

// Imports needed for this snippet
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import scala.Option;
import scala.collection.JavaConverters;
import scala.runtime.AbstractFunction1;

// Define a serializable Function1 separately
abstract class SerializableFunc1<T, R> extends AbstractFunction1<T, R> implements Serializable {}

Map<String, Object> producerConfig = new HashMap<String, Object>();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("key.serializer", StringSerializer.class);
producerConfig.put("value.serializer", StringSerializer.class);

KafkaWriter<String> kafkaWriter = new DStreamKafkaWriter<>(javaDStream.dstream(),
    scala.reflect.ClassTag$.MODULE$.apply(String.class));
// writeToKafka expects a Scala Map, so convert the Java map first
kafkaWriter.writeToKafka(
    JavaConverters.mapAsScalaMapConverter(producerConfig).asScala().toMap(scala.Predef.$conforms()),
    new SerializableFunc1<String, ProducerRecord<String, String>>() {
        @Override
        public ProducerRecord<String, String> apply(final String s) {
            return new ProducerRecord<>(topic, s);
        }
    },
    //new Some<>((metadata, exception) -> {}), // with a callback: define your lambda here
    Option.empty()                             // or without a callback
);

However, #59 will provide a better Java API.

Scaladoc

You can find the full scaladoc at https://benfradet.github.io/spark-kafka-writer.

Credit

The original code was written by Hari Shreedharan.

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].