
InterestingLab / seatunnel-example

Licence: other
seatunnel plugin development examples.

Programming Languages

Scala, Java

Projects that are alternatives of or similar to seatunnel-example

Waterdrop
Production Ready Data Integration Product, documentation:
Stars: ✭ 1,856 (+6774.07%)
Mutual labels:  spark-streaming, flink, sql-engine, etl-framework, etl-pipeline
cassandra.realtime
Different ways to process data into Cassandra in realtime with technologies such as Kafka, Spark, Akka, Flink
Stars: ✭ 25 (-7.41%)
Mutual labels:  spark-streaming, flink
redis-connect-dist
Real-Time Event Streaming & Change Data Capture
Stars: ✭ 21 (-22.22%)
Mutual labels:  etl-framework, etl-pipeline
open-stream-processing-benchmark
This repository contains the code base for the Open Stream Processing Benchmark.
Stars: ✭ 37 (+37.04%)
Mutual labels:  spark-streaming, flink
hamilton
A scalable general purpose micro-framework for defining dataflows. You can use it to create dataframes, numpy matrices, python objects, ML models, etc.
Stars: ✭ 612 (+2166.67%)
Mutual labels:  etl-framework, etl-pipeline
etlflow
EtlFlow is an ecosystem of functional libraries in Scala based on ZIO for writing various different tasks, jobs on GCP and AWS.
Stars: ✭ 38 (+40.74%)
Mutual labels:  etl-framework, etl-pipeline
Sylph
Stream computing platform for bigdata
Stars: ✭ 362 (+1240.74%)
Mutual labels:  spark-streaming, flink
litemall-dw
A big-data project based on the open-source Litemall e-commerce project, covering front-end tracking (openresty + lua), back-end tracking, a five-layer data warehouse, real-time computation, and user profiling. The platform runs on CDH 6.3.2 (scripted with vagrant + ansible) and also includes Azkaban workflows.
Stars: ✭ 36 (+33.33%)
Mutual labels:  spark-streaming, flink
Streamline
StreamLine - Streaming Analytics
Stars: ✭ 151 (+459.26%)
Mutual labels:  spark-streaming, flink
Registry
Schema Registry
Stars: ✭ 184 (+581.48%)
Mutual labels:  spark-streaming, flink
csvplus
csvplus extends the standard Go encoding/csv package with fluent interface, lazy stream operations, indices and joins.
Stars: ✭ 67 (+148.15%)
Mutual labels:  etl-framework, etl-pipeline
DIRECT
DIRECT, the Data Integration Run-time Execution Control Tool, is a data logistics framework that can be used to monitor, log, audit and control data integration / ETL processes.
Stars: ✭ 20 (-25.93%)
Mutual labels:  etl-framework, etl-pipeline
datalake-etl-pipeline
Simplified ETL process in Hadoop using Apache Spark. Has complete ETL pipeline for datalake. SparkSession extensions, DataFrame validation, Column extensions, SQL functions, and DataFrame transformations
Stars: ✭ 39 (+44.44%)
Mutual labels:  etl-framework, etl-pipeline
DaFlow
Apache-Spark based Data Flow(ETL) Framework which supports multiple read, write destinations of different types and also support multiple categories of transformation rules.
Stars: ✭ 24 (-11.11%)
Mutual labels:  etl-framework, etl-pipeline
Streaming Readings
A reading list of papers on streaming systems
Stars: ✭ 554 (+1951.85%)
Mutual labels:  spark-streaming, flink
vixtract
www.vixtract.ru
Stars: ✭ 40 (+48.15%)
Mutual labels:  etl-framework, etl-pipeline
fdp-modelserver
An umbrella project for multiple implementations of model serving
Stars: ✭ 47 (+74.07%)
Mutual labels:  spark-streaming, flink
flink-deployer
A tool that helps automate deployment to an Apache Flink cluster
Stars: ✭ 143 (+429.63%)
Mutual labels:  flink
flink-connector-kudu
A flink-connector-kudu based on the Apache bahir kudu connector, supporting Flink 1.11.x DynamicTableSource/Sink, Range partitioning, and more
Stars: ✭ 40 (+48.15%)
Mutual labels:  flink
Real-time-log-analysis-system
🐧 A real-time log processing and analysis system based on Spark Streaming + Flume + Kafka + HBase (available as a console version and as a Web UI visualization version built with Spring Boot, ECharts, etc.)
Stars: ✭ 31 (+14.81%)
Mutual labels:  spark-streaming

This article is aimed at all seatunnel plugin developers. It tries to describe, as clearly as possible, the process of developing a seatunnel plugin, in the hope of clearing up any confusion developers may have.

Why seatunnel needs a plugin mechanism

  1. The built-in plugins may only cover 80% of your needs; the remaining 20% you will have to develop yourself.
  2. With a plugin mechanism, developers only need to focus on the processing logic of their specific plugin; the common concerns of data processing are handled uniformly by the framework.
  3. General-purpose plugins greatly increase code reuse.

Overview of the seatunnel plugin system

seatunnel plugins fall into three categories: Input, Filter, and Output.

Input

The Input plugin reads data from an external source and wraps it as a Spark Dataset[Row]. There are three kinds of Input plugins:

  1. BaseStaticInput, for reading static, offline data
  2. BaseStreamingInput, for real-time stream processing; on top of it we also provide BaseJavaStreamingInput for convenient use from Java
  3. BaseStructuredStreamingInput, for real-time stream processing with Spark Structured Streaming

Filter

The Filter plugin takes a Spark Dataset[Row] as input and returns a dataset of the same type, Dataset[Row].

Output

The Output plugin writes a Spark Dataset[Row] to an external data source or prints the result to the terminal.

How to develop a plugin

Step 1: Create the project

Create a new project, or simply clone this example project:

git clone https://github.com/InterestingLab/seatunnel-example.git

If you have read this far, this step presumably needs no further explanation.

Step 2: Configure the seatunnel-api dependency

sbt

libraryDependencies += "io.github.interestinglab.waterdrop" %% "waterdrop-apis" % "1.5.0"

maven

<dependency>
    <groupId>io.github.interestinglab.waterdrop</groupId>
    <artifactId>waterdrop-apis_2.11</artifactId>
    <version>1.5.0</version>
</dependency>

Step 3: Extend the base classes and implement the interfaces

1. Input

Create a new class that extends the parent class provided by waterdrop-apis and implements the relevant methods to complete an Input plugin.

1.1 BaseStreamingInput

BaseStreamingInput is used to implement a streaming Input plugin. It is generic, so you can choose the type parameter that matches your data.

Note that in seatunnel the class name of a streaming plugin must end with Stream, e.g. hdfsStream.

The BaseStreamingInput interface is defined as follows:

abstract class BaseStreamingInput[T] extends Plugin {

  /**
   * Things to do after filter and before output
   * */
  def beforeOutput: Unit = {}

  /**
   * Things to do after output, such as update offset
   * */
  def afterOutput: Unit = {}

  /**
   * This must be implemented to convert RDD[T] to Dataset[Row] for later processing
   * */
  def rdd2dataset(spark: SparkSession, rdd: RDD[T]): Dataset[Row]

  /**
   * start should be invoked when data is ready.
   * */
  def start(spark: SparkSession, ssc: StreamingContext, handler: Dataset[Row] => Unit): Unit = {

    getDStream(ssc).foreachRDD(rdd => {
      val dataset = rdd2dataset(spark, rdd)
      handler(dataset)
    })
  }

  /**
   * Create spark dstream from data source, you can specify type parameter.
   * */
  def getDStream(ssc: StreamingContext): DStream[T]
}

The methods of BaseStreamingInput are:

  • beforeOutput: logic to run after Filter and before Output.
  • afterOutput: logic to run after Output, typically used to commit offsets to achieve at-least-once semantics.
  • rdd2dataset: defines how an RDD[T] is converted into a Dataset[Row].
  • start: internal driver logic; you normally do not need to care about it.
  • getDStream: defines how to read from the external data source and must produce a DStream[T].

In short, what we need to define is how data from the external source is converted into a Dataset[Row].
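
As an illustration, here is a minimal sketch of a streaming Input plugin. The class name, the socket source, and the raw_message column are made up for this example, and the configuration-related methods inherited from Plugin are omitted; a real plugin would read the host, port, and column name from its configuration.

import io.github.interestinglab.waterdrop.apis.BaseStreamingInput
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class SocketStream extends BaseStreamingInput[String] {

  // Read lines of text from a TCP socket; host and port are hard-coded
  // here only for the example.
  override def getDStream(ssc: StreamingContext): DStream[String] = {
    ssc.socketTextStream("localhost", 9999)
  }

  // Wrap each line into a single-column Dataset[Row].
  override def rdd2dataset(spark: SparkSession, rdd: RDD[String]): Dataset[Row] = {
    import spark.implicits._
    rdd.toDF("raw_message")
  }
}

Note that the class name ends with Stream, as the naming rule above requires.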

1.2 BaseStaticInput

BaseStaticInput is used to implement an offline/static data source plugin. The interface is defined as follows:

abstract class BaseStaticInput extends Plugin {

  /**
   * Get Dataset from this Static Input.
   * */
  def getDataset(spark: SparkSession): Dataset[Row]
}

The interface has a single method:

  • getDataset: converts the external data source into a Dataset[Row]
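
A minimal sketch of a static Input plugin might look like the following; the CSV path is a hypothetical placeholder and would normally come from the plugin configuration, and the configuration-related methods inherited from Plugin are again omitted.

import io.github.interestinglab.waterdrop.apis.BaseStaticInput
import org.apache.spark.sql.{Dataset, Row, SparkSession}

class CsvStatic extends BaseStaticInput {

  // Read a CSV file with a header row into a Dataset[Row];
  // the path is a placeholder for this example.
  override def getDataset(spark: SparkSession): Dataset[Row] = {
    spark.read
      .option("header", "true")
      .csv("/tmp/example.csv")
  }
}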

1.3 BaseStructuredStreamingInput

The definition of BaseStructuredStreamingInput is similar to that of BaseStaticInput, so it is not repeated here.

2. Filter

Create a new class that extends the parent class provided by waterdrop-apis and implements the relevant methods to complete a Filter plugin.

BaseFilter

Spark unifies batch and stream processing quite well, so there is only one Filter API, and most Filter plugins can support both batch and streaming.

The BaseFilter interface is defined as follows:

abstract class BaseFilter extends Plugin {

  def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row]

  /**
   * Allow to register user defined UDFs
   * @return empty list if there are no UDFs to be registered
   * */
  def getUdfList(): List[(String, UserDefinedFunction)] = List.empty

  /**
   * Allow to register user defined UDAFs
   * @return empty list if there are no UDAFs to be registered
   * */
  def getUdafList(): List[(String, UserDefinedAggregateFunction)] = List.empty
}

The methods of BaseFilter are:

  • process: the Filter plugin's concrete processing logic; both the input and the output are Dataset[Row]
  • getUdfList: the list of UDFs to be registered
  • getUdafList: the list of UDAFs to be registered

In most cases, implementing the process method to define the data-processing logic is all you need.
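
For example, a Filter plugin that only implements process might look like the sketch below; the column names are illustrative and would normally be configurable, and the Plugin configuration methods are again omitted.

import io.github.interestinglab.waterdrop.apis.BaseFilter
import org.apache.spark.sql.functions.{col, upper}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

class Uppercase extends BaseFilter {

  // Add an upper-cased copy of the raw_message column;
  // both the input and the output are Dataset[Row].
  override def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row] = {
    df.withColumn("message_upper", upper(col("raw_message")))
  }
}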

3. Output

Create a new class that extends the parent class provided by waterdrop-apis and implements the relevant methods to complete an Output plugin.

3.1 BaseOutput

The BaseOutput plugin supports batch processing and Spark Streaming, but not Spark Structured Streaming.

The BaseOutput interface is defined as follows:

abstract class BaseOutput extends Plugin {

  def process(df: Dataset[Row])
}

The method of BaseOutput is:

  • process: writes the Dataset[Row] to the external data source. Note that this method must trigger an action.
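
As a sketch, an Output plugin that simply prints the result to the terminal could be as small as this (Plugin configuration methods omitted); show() is the action that triggers the computation.

import io.github.interestinglab.waterdrop.apis.BaseOutput
import org.apache.spark.sql.{Dataset, Row}

class ConsoleOutput extends BaseOutput {

  // show() is an action, so calling it here actually triggers
  // execution of the whole pipeline.
  override def process(df: Dataset[Row]): Unit = {
    df.show(20, truncate = false)
  }
}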

3.2 BaseStructuredStreamingOutputIntra

BaseStructuredStreamingOutputIntra provides output plugins for Spark Structured Streaming.

The BaseStructuredStreamingOutputIntra interface is defined as follows:

trait BaseStructuredStreamingOutputIntra extends Plugin {

  def process(df: Dataset[Row]): DataStreamWriter[Row]
}

The method of BaseStructuredStreamingOutputIntra is:

  • process: unlike BaseOutput, this method returns a DataStreamWriter[Row]

3.3 BaseStructuredStreamingOutput

BaseStructuredStreamingOutputIntra is mainly meant to expose the output sinks that Spark already provides, while BaseStructuredStreamingOutput is for implementing a custom output sink.

The BaseStructuredStreamingOutput interface is defined as follows:

trait BaseStructuredStreamingOutput extends ForeachWriter[Row] with BaseStructuredStreamingOutputIntra {

  /**
   * Things to do before process.
   * */
  def open(partitionId: Long, epochId: Long): Boolean

  /**
   * Things to do with each Row.
   * */
  def process(row: Row): Unit

  /**
   * Things to do after process.
   * */
  def close(errorOrNull: Throwable): Unit

  /**
   * seatunnel Structured Streaming process.
   * */
  def process(df: Dataset[Row]): DataStreamWriter[Row]
}

The methods of BaseStructuredStreamingOutput are:

  • open: logic to run before processing starts
  • process(row: Row): the processing logic for each row
  • close: logic to run after processing finishes
  • process(df: Dataset[Row]): seatunnel's internal processing logic, which must return a DataStreamWriter[Row]
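
Putting the four methods together, a sketch of a custom Structured Streaming output that writes every row to stdout might look like this; registering the class itself as the ForeachWriter via writeStream.foreach is one straightforward way to connect the two process methods, though a real plugin may organize this differently (Plugin configuration methods are again omitted).

import io.github.interestinglab.waterdrop.apis.BaseStructuredStreamingOutput
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.{Dataset, Row}

class StdoutStream extends BaseStructuredStreamingOutput {

  // Nothing to set up before a partition is processed.
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // Per-row logic: print the row.
  override def process(row: Row): Unit = println(row.mkString(", "))

  // Nothing to clean up afterwards.
  override def close(errorOrNull: Throwable): Unit = {}

  // seatunnel-facing entry point: register this instance as the ForeachWriter.
  override def process(df: Dataset[Row]): DataStreamWriter[Row] = {
    df.writeStream.foreach(this)
  }
}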

Step 4: Package and use the plugin

  1. Compile and package the plugin (example commands below).
  2. Put the resulting Jar into the directory seatunnel expects, so that seatunnel can load it at startup.
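
For item 1, a typical packaging invocation, assuming the sbt or maven setup from step 2, might be:

# with sbt
sbt clean package

# or with maven
mvn clean package

Then create the plugin directory under the seatunnel installation for item 2:
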
cd seatunnel
mkdir -p plugins/my_plugins/lib
cd plugins/my_plugins/lib

seatunnel requires third-party Jars to be placed in a lib directory that you must create:

plugins/your_plugin_name/lib/your_jar_name

Other files go into:

plugins/your_plugin_name/files/your_file_name

  3. Use the plugin in the configuration file

When using a third-party plugin, you must reference it by its full package path, for example:

org.interestinglab.seatunnel.output.JavaOutput

output {                                        
    org.interestinglab.seatunnel.output.JavaStdout {
        limit = 2
    }
}

Step 5: Run it

That's it: we have developed a plugin and used it in seatunnel.
