All Projects → collabH → flink-connector-kudu

collabH / flink-connector-kudu

Licence: other
基于Apache-bahir-kudu-connector的flink-connector-kudu,支持Flink1.11.x DynamicTableSource/Sink,支持Range分区等

Programming Languages

java
68154 projects - #9 most used programming language

Projects that are alternatives of or similar to flink-connector-kudu

flink-demo
Flink Demo
Stars: ✭ 39 (-2.5%)
Mutual labels:  flink, flink-sql
Real-time-Data-Warehouse
Real-time Data Warehouse with Apache Flink & Apache Kafka & Apache Hudi
Stars: ✭ 52 (+30%)
Mutual labels:  flink, flink-sql
LarkMidTable
LarkMidTable 是一站式开源的数据中台,实现中台的 基础建设,数据治理,数据开发,监控告警,数据服务,数据的可视化,实现高效赋能数据前台并提供数据服务的产品。
Stars: ✭ 873 (+2082.5%)
Mutual labels:  flink, flink-sql
Nussknacker
Process authoring tool for Apache Flink
Stars: ✭ 182 (+355%)
Mutual labels:  flink
Flink Spector
Framework for Apache Flink unit tests
Stars: ✭ 190 (+375%)
Mutual labels:  flink
flink-spark-submiter
从本地IDEA提交Flink/Spark任务到Yarn/k8s集群
Stars: ✭ 157 (+292.5%)
Mutual labels:  flink
bigdata-doc
大数据学习笔记,学习路线,技术案例整理。
Stars: ✭ 37 (-7.5%)
Mutual labels:  flink
Flink Commodity Recommendation System
🐳基于 Flink 的商品实时推荐系统。使用了 redis 缓存热点数据。当用户产生评分行为时,数据由 kafka 发送到 flink,根据用户历史评分行为进行实时和离线推荐。实时推荐包括:基于行为和实时热门,离线推荐包括:历史热门、历史优质商品和 itemcf 。
Stars: ✭ 167 (+317.5%)
Mutual labels:  flink
parquet-flinktacular
How to use Parquet in Flink
Stars: ✭ 29 (-27.5%)
Mutual labels:  flink
Lidea
大型分布式系统实时监控平台
Stars: ✭ 28 (-30%)
Mutual labels:  flink
Addax
Addax is an open source universal ETL tool that supports most of those RDBMS and NoSQLs on the planet, helping you transfer data from any one place to another.
Stars: ✭ 615 (+1437.5%)
Mutual labels:  kudu
Flink Sql Cookbook
The Apache Flink SQL Cookbook is a curated collection of examples, patterns, and use cases of Apache Flink SQL. Many of the recipes are completely self-contained and can be run in Ververica Platform as is.
Stars: ✭ 189 (+372.5%)
Mutual labels:  flink
FlinkExperiments
Experiments with Apache Flink.
Stars: ✭ 3 (-92.5%)
Mutual labels:  flink
Registry
Schema Registry
Stars: ✭ 184 (+360%)
Mutual labels:  flink
dpkb
大数据相关内容汇总,包括分布式存储引擎、分布式计算引擎、数仓建设等。关键词:Hadoop、HBase、ES、Kudu、Hive、Presto、Spark、Flink、Kylin、ClickHouse
Stars: ✭ 123 (+207.5%)
Mutual labels:  flink
Sparkstreaming
💥 🚀 封装sparkstreaming动态调节batch time(有数据就执行计算);🚀 支持运行过程中增删topic;🚀 封装sparkstreaming 1.6 - kafka 010 用以支持 SSL。
Stars: ✭ 179 (+347.5%)
Mutual labels:  flink
flink-deployer
A tool that help automate deployment to an Apache Flink cluster
Stars: ✭ 143 (+257.5%)
Mutual labels:  flink
Flink Doc Zh
Apache Flink 中文文档
Stars: ✭ 242 (+505%)
Mutual labels:  flink
Flink Boot
懒松鼠Flink-Boot 脚手架让Flink全面拥抱Spring生态体系,使得开发者可以以Java WEB开发模式开发出分布式运行的流处理程序,懒松鼠让跨界变得更加简单。懒松鼠旨在让开发者以更底上手成本(不需要理解分布式计算的理论知识和Flink框架的细节)便可以快速编写业务代码实现。为了进一步提升开发者使用懒松鼠脚手架开发大型项目的敏捷的度,该脚手架默认集成Spring框架进行Bean管理,同时将微服务以及WEB开发领域中经常用到的框架集成进来,进一步提升开发速度。比如集成Mybatis ORM框架,Hibernate Validator校验框架,Spring Retry重试框架等,具体见下面的脚手架特性。
Stars: ✭ 209 (+422.5%)
Mutual labels:  flink
fdp-modelserver
An umbrella project for multiple implementations of model serving
Stars: ✭ 47 (+17.5%)
Mutual labels:  flink

flink-connector-kudu

Kudu Connector

  • 基于Apache-Bahir-Kudu-Connector改造而来的满足公司内部使用的Kudu Connector,支持特性Range分区、定义Hash分桶数、支持Flink1.11.x动态数据源等,改造后已贡献部分功能给社区。

Tag版本

  • v1.0.0使用kudu-client:1.10.0
    • 注意点:目前使用kudu1.13之前的版本,kudu不支持delete ignore,因此在数据delete的时候该条数据一定要存在否则会出现not primary key异常,目前的connector中解决方法为如果判断是Delete,则根据主键查询,查询不到数据则不进行删除(这样存在的问题是Delete操作需要一次查询IO,个人建议升级Kudu版本至1.14, 改造RowDataUpsertOperationMapper将newDelete改成newDeleteIgnore即可。)
  • v1.1.0使用kudu-client:1.14.0
    • 不存在v1.0.0问题

Version

  • branch:feature_support_with_flink113x
    • 兼容Flink1.13.x主要改造点为KuduCatalogFactory,1.13.x过期的TableSchema类并未修改(不影响使用)

使用姿势

  • clone代码后,改造pom项目坐标后上传公司私服使用

Kudu Catalog使用

创建Catalog

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
catalog = new KuduCatalog("cdh01:7051,cdh02:7051,cdh03:7051");
tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env);
tableEnv.registerCatalog("kudu", catalog);
tableEnv.useCatalog("kudu");

Catalog API

// dropTable
 catalog.dropTable(new ObjectPath("default_database", "test_Replice_kudu"), true);
 // 通过catalog操作表
tableEnv.sqlQuery("select * from test");
tableEnv.executeSql("drop table test");
tableEnv.executeSql("insert into testRange values(1,'hsm')");

FlinkSQL

KuduTable Properties

  • 通过connector.typeconnector区分使用TableSourceFactory还是KuduDynamicTableSource
kudu.table=指定映射的kudu表
kudu.masters=指定的kudu master地址
kudu.hash-columns=指定的表的hash分区键,多个使用","分割
kudu.replicas=kudu tablet副本数,默认为3
kudu.hash-partition-nums=hash分区的桶个数,默认为2 * replicas
kudu.range-partition-rule=range分区规则,rangeKey#leftValue,RightValue:rangeKey#leftValue1,RightValue1,rangeKey必须为主键
kudu.primary-key-columns=kudu表主键,多个实用","分割,主键定义必须有序
kudu.lookup.cache.max-rows=kudu时态表缓存最大缓存行,默认为不开启
kudu.lookup.cache.ttl=kudu时态表cache过期时间
kudu.lookup.max-retries=时态表join时报错重试次数,默认为3

Flink1.10.x版本

CREATE TABLE TestTableTableSourceFactory (
  first STRING,
  second STRING,
  third INT NOT NULL
) WITH (
  'connector.type' = 'kudu',
  'kudu.masters' = '...',
  'kudu.table' = 'TestTable',
  'kudu.hash-columns' = 'first',
  'kudu.primary-key-columns' = 'first,second'
)

Flink1.11.x版本

CREATE TABLE TestTableKuduDynamicTableSource (
  first STRING,
  second STRING,
  third INT NOT NULL
) WITH (
  'connector' = 'kudu',
  'kudu.masters' = '...',
  'kudu.table' = 'TestTable',
  'kudu.hash-columns' = 'first',
  'kudu.primary-key-columns' = 'first,second'
)

DataStream使用

  • DataStream使用方式具体查看bahir-flink官方,目前对于数仓工程师使用场景偏少。

版本迭代

1.1版本Feature

  • 增加Hash分区bucket属性配置,通过kudu.hash-partition-nums配置
  • 增加Range分区规则,支持Hash和Range分区同时使用,通过参数kudu.range-partition-rule 配置,规则格式如:range分区规则,rangeKey#leftValue,RightValue:rangeKey#leftValue1,RightValue1
  • 增加Kudu时态表支持,通过kudu.lookup.*相关函数控制内存数据的大小和TTL
 /**
     * lookup缓存最大行数
     */
  public static final String KUDU_LOOKUP_CACHE_MAX_ROWS = "kudu.lookup.cache.max-rows";
    /**
     * lookup缓存过期时间
     */
    public static final String KUDU_LOOKUP_CACHE_TTL = "kudu.lookup.cache.ttl";
    /**
     * kudu连接重试次数
     */
    public static final String KUDU_LOOKUP_MAX_RETRIES = "kudu.lookup.max-retries";

实现机制

  • 自定义KuduLookupFunction,使得KuduTableSource实现LookupableTableSource接口将自定义LookupFunction 返回已提供时态表的功能,底层缓存没有使用Flink JDBCGuava Cache而是使用效率更高的Caffeine Cache使得其缓存效率更高,同时也减轻了因大量请求为Kudu带来的压力

未来展望

当前问题

  1. SQL语句主键无法自动推断

目前基于Apache Bahir Kudu Connector增强的功能主要是为了服务公司业务,在使用该版本的connector也遇到了问题,SQL的主键无法自动推断导致数据无法直接传递到下游,内部通过天宫引擎通过Flink Table APIsqlQuery方法将结果集查询为一个Table对象,然后将Table转换为DataStream<Tuple2<Boolean,Row>>撤回流,最终通过Kudu Connector提供的KuduSinkUpsertOperationMapper对象将撤回流输出到Kudu中。

后续计划

  • 计划提供动态数据源来解决这一问题,将Flink 1.11.x之前的KuduTableSource/KuduTableSink改造为DynamicSource/Sink接口实现Source/Sink,以此解决主键推断问题。

1.2版本Feature

  • 改造支持Flink 1.11.x之后的DynamicSource/Sink,以此解决SQL语句主键无法推断问题,支持流批JOIN功能的SQL语句方式,无需在通过转换成DataStream的方式进行多表Join操作。
  • 内嵌Metrics上报机制,通过对Flink动态工厂入口处对操作的kudu表进行指标埋点,从而更加可视化的监控kudu表数据上报问题。
Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].