
ivi-ru / Flink Clickhouse Sink

License: MIT
Flink sink for ClickHouse

Programming Languages

Java

Projects that are alternatives of or similar to Flink Clickhouse Sink

Szt Bigdata
Shenzhen Metro big data passenger flow analysis system 🚇🚄🌟
Stars: ✭ 826 (+400.61%)
Mutual labels:  flink, clickhouse
np-flink
Detailed Flink learning and practice
Stars: ✭ 26 (-84.24%)
Mutual labels:  clickhouse, flink
litemall-dw
A big data project based on the open source Litemall e-commerce project, including front-end tracking (OpenResty + Lua) and back-end tracking, a five-layer data warehouse, real-time computing, and user profiling. The big data platform runs on CDH 6.3.2 (scripted with Vagrant + Ansible) and also includes Azkaban workflows.
Stars: ✭ 36 (-78.18%)
Mutual labels:  clickhouse, flink
Flink Learning
Flink learning blog. http://www.54tianzhisheng.cn/ Covers Flink fundamentals, concepts, internals, hands-on practice, performance tuning, source-code analysis, and more. Includes study examples for Flink Connectors, Metrics, Libraries, the DataStream API, and the Table API & SQL, as well as large production case studies (PV/UV counting, log storage, real-time deduplication of tens of billions of records, monitoring and alerting). Everyone is welcome to support the author's column "Big Data Real-Time Computing Engine Flink: Practice and Performance Optimization".
Stars: ✭ 11,378 (+6795.76%)
Mutual labels:  flink, clickhouse
Quicksql
A Flexible, Fast, Federated(3F) SQL Analysis Middleware for Multiple Data Sources
Stars: ✭ 1,821 (+1003.64%)
Mutual labels:  flink
Datax
DataX is an open source universal ETL tool that supports Cassandra, ClickHouse, DBF, Hive, InfluxDB, Kudu, MySQL, Oracle, Presto (Trino), PostgreSQL, and SQL Server
Stars: ✭ 116 (-29.7%)
Mutual labels:  clickhouse
Waterdrop
Production Ready Data Integration Product, documentation:
Stars: ✭ 1,856 (+1024.85%)
Mutual labels:  flink
Flinkstreamsql
Extends real-time SQL on top of open source Flink; mainly implements joins between streams and dimension tables, and supports all native Flink SQL syntax
Stars: ✭ 1,682 (+919.39%)
Mutual labels:  flink
Big Whale
Scheduling of offline jobs such as Spark and Flink, and monitoring of real-time jobs
Stars: ✭ 163 (-1.21%)
Mutual labels:  flink
Pg2ch
Data streaming from PostgreSQL to ClickHouse via the logical replication mechanism
Stars: ✭ 149 (-9.7%)
Mutual labels:  clickhouse
Pandahouse
Pandas interface for the ClickHouse database
Stars: ✭ 126 (-23.64%)
Mutual labels:  clickhouse
Apijson
🚀 A zero-code, hot-update, fully automatic ORM library: zero-code back-end APIs and docs, with the front end (client) customizing the returned JSON data and structure. 🚀 A JSON Transmission Protocol and an ORM Library for automatically providing APIs and Docs.
Stars: ✭ 12,559 (+7511.52%)
Mutual labels:  clickhouse
Carbon Clickhouse
Graphite metrics receiver with ClickHouse as storage
Stars: ✭ 139 (-15.76%)
Mutual labels:  clickhouse
Clickhouse Rs
Asynchronous ClickHouse client library for the Rust programming language.
Stars: ✭ 113 (-31.52%)
Mutual labels:  clickhouse
Streamline
StreamLine - Streaming Analytics
Stars: ✭ 151 (-8.48%)
Mutual labels:  flink
Java learning practice
The road to advanced Java: frequently asked interview algorithms, Akka, multithreading, NIO, Netty, Spring Boot, Spark && Flink, and more
Stars: ✭ 110 (-33.33%)
Mutual labels:  flink
Hadoopcryptoledger
Hadoop Crypto Ledger - Analyzing CryptoLedgers, such as Bitcoin Blockchain, on Big Data platforms, such as Hadoop/Spark/Flink/Hive
Stars: ✭ 126 (-23.64%)
Mutual labels:  flink
Clickhouse Net
Fully managed .NET client for Yandex ClickHouse
Stars: ✭ 142 (-13.94%)
Mutual labels:  clickhouse
Pulsar Flink
Elastic data processing with Apache Pulsar and Apache Flink
Stars: ✭ 126 (-23.64%)
Mutual labels:  flink
Sqli
An ORM SQL interface: Criteria, CriteriaBuilder, ResultMapBuilder
Stars: ✭ 1,644 (+896.36%)
Mutual labels:  clickhouse

Flink-ClickHouse-Sink


Description

Flink sink for the ClickHouse database. Powered by Async Http Client.

A high-performance library for loading data into ClickHouse.

It has two triggers for flushing buffered data: by timeout and by buffer size.

Version map

flink    flink-clickhouse-sink
1.3.*    1.0.0
1.9.0    1.3.0

Install

Maven Central
<dependency>
  <groupId>ru.ivi.opensource</groupId>
  <artifactId>flink-clickhouse-sink</artifactId>
  <version>1.3.0</version>
</dependency>
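
If you build with Gradle, the same Maven Central artifact can be declared like this (a sketch; the coordinates are taken from the Maven snippet above):

dependencies {
    implementation 'ru.ivi.opensource:flink-clickhouse-sink:1.3.0'
}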

Usage

Properties

The flink-clickhouse-sink uses two groups of configuration properties: a common (global) group and one group for each sink in your operator chain.

The common part (set globally):

clickhouse.sink.num-writers - number of writers that build and send requests,

clickhouse.sink.queue-max-capacity - maximum capacity (in batches) of the queue of pending batches,

clickhouse.sink.timeout-sec - timeout (in seconds) for loading data,

clickhouse.sink.retries - max number of retries,

clickhouse.sink.failed-records-path - path where failed records are stored,

clickhouse.sink.ignoring-clickhouse-sending-exception-enabled - required boolean parameter that controls whether a ClickHouse sending exception is raised in the main thread (false) or not (true). If ignoring-clickhouse-sending-exception-enabled is true, an exception during sending to ClickHouse is ignored and the failed data automatically goes to disk. If it is false, the sending exception is thrown in the "main" thread (the thread that called ClickHouseSink::invoke) and the data also goes to disk.
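
For illustration, a minimal sketch of setting the common part, using the ClickHouseSinkConsts constants shown in the "In code" section below; all values here are placeholders, not recommended defaults:

Map<String, String> globalParameters = new HashMap<>();

// illustrative values only
globalParameters.put(ClickHouseSinkConsts.NUM_WRITERS, "2");
globalParameters.put(ClickHouseSinkConsts.QUEUE_MAX_CAPACITY, "10");
globalParameters.put(ClickHouseSinkConsts.TIMEOUT_SEC, "1");
globalParameters.put(ClickHouseSinkConsts.NUM_RETRIES, "2");
globalParameters.put(ClickHouseSinkConsts.FAILED_RECORDS_PATH, "/tmp/failed-records");
globalParameters.put(ClickHouseSinkConsts.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "true");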

The sink part (set per sink in the chain):

clickhouse.sink.target-table - target table in ClickHouse,

clickhouse.sink.max-buffer-size - buffer size.

In code

The main thing: the clickhouse-sink works with events as strings in ClickHouse insert format (CSV-like). You have to convert your event to this CSV format, as you would for a usual database insert.

For example, suppose you have this event POJO:

class A {
   public final String str;
   public final int integer;
   
   public A(String str, int i){
       this.str = str;
       this.integer = i;
   }
}

You have to convert this POJO like this:

public static String convertToCsv(A a) {
    StringBuilder builder = new StringBuilder();
    builder.append("(");
    
    // add a.str
    builder.append("'");
    builder.append(a.str);
    builder.append("', ");
    
    // add a.integer
    builder.append(String.valueOf(a.integer));
    builder.append(" )");
    return builder.toString();
}
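
For instance, a quick sanity check (the values are arbitrary; note that real string values would need embedded single quotes escaped, which this sketch ignores):

A a = new A("foo", 42);
String csv = convertToCsv(a); // "('foo', 42 )"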

And then add the record to the sink.

You have to add the global parameters to the Flink environment:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
Map<String, String> globalParameters = new HashMap<>();

// ClickHouse cluster properties
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, ...);
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, ...);
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, ...);

// sink common
globalParameters.put(ClickHouseSinkConsts.TIMEOUT_SEC, ...);
globalParameters.put(ClickHouseSinkConsts.FAILED_RECORDS_PATH, ...);
globalParameters.put(ClickHouseSinkConsts.NUM_WRITERS, ...);
globalParameters.put(ClickHouseSinkConsts.NUM_RETRIES, ...);
globalParameters.put(ClickHouseSinkConsts.QUEUE_MAX_CAPACITY, ...);
globalParameters.put(ClickHouseSinkConsts.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, ...);

// set global parameters
ParameterTool parameters = ParameterTool.fromMap(globalParameters);
environment.getConfig().setGlobalJobParameters(parameters);

And add your sink like this:

// create converter
public class YourEventConverter {
    static String toClickHouseInsertFormat (YourEvent yourEvent){
        String chFormat = ...;
        ....
        return chFormat;
    }
}

// create props for sink
Properties props = new Properties();
props.put(ClickHouseSinkConsts.TARGET_TABLE_NAME, "your_table");
props.put(ClickHouseSinkConsts.MAX_BUFFER_SIZE, "10000");

// build chain
DataStream<YourEvent> dataStream = ...;
dataStream.map(YourEventConverter::toClickHouseInsertFormat)
          .name("convert YourEvent to ClickHouse table format")
          .addSink(new ClickHouseSink(props))
          .name("your_table ClickHouse sink);

Roadmap

  • [ ] reading files from "failed-records-path"
  • [ ] migrate to gradle