
Eneco / kafka-connect-ftp

Licence: Apache-2.0
A Kafka Connect Source for FTP servers - Monitors files on an FTP server and feeds changes into Kafka

Programming Languages

Scala, Shell

Projects that are alternatives to or similar to kafka-connect-ftp

kafka-connect-fs
Kafka Connect FileSystem Connector
Stars: ✭ 107 (+132.61%)
Mutual labels:  ftp, kafka-connect
Stream Reactor
Streaming reference architecture for ETL with Kafka and Kafka Connect. See http://lenses.io for more on how we provide a unified solution to manage your connectors, the most advanced SQL engine for Kafka and Kafka Streams, cluster monitoring and alerting, and more.
Stars: ✭ 753 (+1536.96%)
Mutual labels:  ftp, kafka-connect
DecoyMini
🐝 A highly scalable, safe, free enterprise honeypot system
Stars: ✭ 213 (+363.04%)
Mutual labels:  ftp
maxwell-sink
Consumes Maxwell-generated messages from Kafka and exports them to another MySQL database.
Stars: ✭ 16 (-65.22%)
Mutual labels:  kafka-connect
cassandra.realtime
Different ways to process data into Cassandra in realtime with technologies such as Kafka, Spark, Akka, Flink
Stars: ✭ 25 (-45.65%)
Mutual labels:  kafka-connect
one-ftpserver
Simple, portable FTP server for one user, shipped as a single executable jar file.
Stars: ✭ 26 (-43.48%)
Mutual labels:  ftp
C-Browser-Password-Cracker
C++ Firefox & Google Chrome Cracker Source Code
Stars: ✭ 24 (-47.83%)
Mutual labels:  ftp
kafkacli
CLI and Go Clients to manage Kafka components (Kafka Connect & SchemaRegistry)
Stars: ✭ 28 (-39.13%)
Mutual labels:  kafka-connect
docker-kafka-connect
Docker Image for kafka-connect
Stars: ✭ 16 (-65.22%)
Mutual labels:  kafka-connect
wget-lua
Wget-AT is a modern Wget with Lua hooks, Zstandard (+dictionary) WARC compression and URL-agnostic deduplication.
Stars: ✭ 52 (+13.04%)
Mutual labels:  ftp
scylla-cdc-source-connector
A Kafka source connector capturing Scylla CDC changes
Stars: ✭ 19 (-58.7%)
Mutual labels:  kafka-connect
acd
ACD helps you download Adobe Connect session videos and audios, download files from FTP servers, and transfer files using Shift I/O.
Stars: ✭ 117 (+154.35%)
Mutual labels:  ftp
bigquery-kafka-connect
☁️ nodejs kafka connect connector for Google BigQuery
Stars: ✭ 17 (-63.04%)
Mutual labels:  kafka-connect
connor
A commandline tool for resetting Kafka Connect source connector offsets.
Stars: ✭ 17 (-63.04%)
Mutual labels:  kafka-connect
Bash-Backup-Script
A small script to upload a backup tar to an external storage service
Stars: ✭ 21 (-54.35%)
Mutual labels:  ftp
kafka-with-springboot
Demonstrations for Kafka with Spring Boot
Stars: ✭ 17 (-63.04%)
Mutual labels:  kafka-connect
ftp-server
An FTP server based on Spring Boot and Apache FtpServer. 😝
Stars: ✭ 17 (-63.04%)
Mutual labels:  ftp
kafka-connect-arangodb
🥑 Kafka connect sink connector for ArangoDB
Stars: ✭ 22 (-52.17%)
Mutual labels:  kafka-connect
simple http server
Simple HTTP server for uploads and downloads
Stars: ✭ 101 (+119.57%)
Mutual labels:  ftp
docker-vsftpd
vsftpd Docker Image
Stars: ✭ 39 (-15.22%)
Mutual labels:  ftp

Kafka Connect FTP


Monitors files on an FTP server and feeds changes into Kafka.

Remote directories of interest are provided via the configuration. On a specified interval, the list of files in these directories is refreshed. Files are downloaded when they were not seen before, or when their timestamp or size has changed. Only files with a timestamp younger than the specified maximum age are considered. Hashes of the files are maintained and used to check for content changes. Changed files are then fed into Kafka, either as a whole (update) or only the appended part (tail), depending on the configuration. Optionally, file bodies can be transformed through a pluggable system before they are put into Kafka.

Data Types

Each Kafka record represents a file and is typed as follows.

  • The format of the keys is configurable through ftp.keystyle=string|struct. It can be a string with the file name, or a FileInfo structure with name: string and offset: long. The offset is always 0 for files that are updated as a whole, and hence is only relevant for tailed files. A sketch of the struct form follows this list.
  • The values of the records contain the body of the file as bytes.
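
For illustration, with ftp.keystyle=struct the key corresponds to a Connect Struct carrying the file name and offset. Here is a minimal sketch using Kafka Connect's SchemaBuilder; the schema construction below is an assumption for illustration, not the connector's literal definition:

import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}

// Sketch of a struct-style key: the file name plus an offset into that file.
val fileInfoSchema: Schema = SchemaBuilder.struct()
  .field("name", Schema.STRING_SCHEMA)
  .field("offset", Schema.INT64_SCHEMA)
  .build()

// For a file that is updated as a whole, the offset is always 0.
val key = new Struct(fileInfoSchema)
  .put("name", "/inbound/data.csv") // hypothetical path
  .put("offset", 0L)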

Setup

Properties

In addition to the general configuration for Kafka connectors (name, connector.class, etc.), the following options are available.

name                       data type  required  default       description
ftp.address                string     yes       -             host[:port] of the FTP server
ftp.user                   string     yes       -             username
ftp.password               string     yes       -             password
ftp.refresh                string     yes       -             ISO 8601 duration; the interval at which the server is polled
ftp.file.maxage            string     yes       -             ISO 8601 duration; the maximum age a file may have to be considered
ftp.keystyle               string     yes       -             string or struct, see above
ftp.monitor.tail           list       no        -             comma-separated list of path:destinationtopic
ftp.monitor.update         list       no        -             comma-separated list of path:destinationtopic
ftp.sourcerecordconverter  string     no        no operation  SourceRecordConverter class name, see below

An example file is included with the project.
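
For orientation, a minimal .properties file might look like the following; the connector class name, host, credentials, paths and topic names are illustrative assumptions, not taken from the project:

name=ftp-source
connector.class=com.eneco.trading.kafka.connect.ftp.source.FtpSourceConnector
tasks.max=1
ftp.address=ftp.example.org:21
ftp.user=alice
ftp.password=secret
ftp.refresh=PT1M
ftp.file.maxage=P7D
ftp.keystyle=string
ftp.monitor.tail=/logs/server.log:ftp-logs
ftp.monitor.update=/inbound/:ftp-inbound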

Tailing Versus Update as a Whole

The following rules are used; a toy sketch of the decision logic follows the list.

  • Tailed files are only allowed to grow. Bytes appended since the last inspection are yielded; preceding bytes are not allowed to change.
  • Updated files can grow, shrink and change anywhere. The entire contents are yielded.
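
To make these rules concrete, here is a toy sketch of the decision logic; it is not the connector's actual code, and it assumes the connector tracks the last seen size and a content hash per file:

import java.security.MessageDigest

case class Seen(size: Long, hash: String)

def md5(bytes: Array[Byte]): String =
  MessageDigest.getInstance("MD5").digest(bytes).map("%02x".format(_)).mkString

// Returns the (offset, bytes) to emit for a file, or None when there is nothing new.
def changeToEmit(prev: Option[Seen], body: Array[Byte], tailed: Boolean): Option[(Long, Array[Byte])] =
  prev match {
    case None => Some((0L, body)) // unseen file: emit everything at offset 0
    case Some(Seen(size, _)) if tailed && body.length > size =>
      Some((size, body.drop(size.toInt))) // tailed: only the appended bytes, offset = previous size
    case Some(Seen(_, hash)) if !tailed && md5(body) != hash =>
      Some((0L, body)) // updated: the entire contents again, offset 0
    case _ => None // unchanged: nothing to emit
  }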

Usage

Build.

mvn clean package

Put the jar on the CLASSPATH.

export CLASSPATH=`realpath ./target/kafka-connect-ftp-0.1-jar-with-dependencies.jar` 

With $CONFLUENT_HOME pointing to the root of your Confluent Platform installation, start the connector in standalone mode.

$CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties your.specific.properties

Data Converters

Instead of dumping whole file bodies (and risking exceeding Kafka's message.max.bytes), one might want to give an interpretation to the data contained in the files before putting it into Kafka. For example, if the files fetched from the FTP server are comma-separated values (CSVs), one might prefer a stream of CSV records instead. To allow this, the connector provides a pluggable conversion of SourceRecords. Right before a SourceRecord is handed to the Connect framework, it is run through an object that implements:

package com.eneco.trading.kafka.connect.ftp.source

import org.apache.kafka.common.Configurable
import org.apache.kafka.connect.source.SourceRecord

trait SourceRecordConverter extends Configurable {
  // Maps one incoming record to zero or more records to be produced.
  def convert(in: SourceRecord): java.util.List[SourceRecord]
}

(for the Java people, read: interface instead of trait).

The default object that is used is a pass-through converter, an instance of:

import java.util
import org.apache.kafka.connect.source.SourceRecord
import scala.collection.JavaConverters._

class NopSourceRecordConverter extends SourceRecordConverter {
  override def configure(props: util.Map[String, _]): Unit = {}
  override def convert(in: SourceRecord): util.List[SourceRecord] = Seq(in).asJava
}

To override it, create your own implementation of SourceRecordConverter, put its jar on your $CLASSPATH, and point the connector at it via the .properties file:

ftp.sourcerecordconverter=your.name.space.YourConverter
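
For instance, a hypothetical converter that splits a fetched file body into one record per line could look like the sketch below; the class name and the splitting behaviour are illustrative assumptions, not part of the project:

import java.util
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.source.SourceRecord
import scala.collection.JavaConverters._

// Hypothetical example: emit one record per line of the file body.
class LineSplittingConverter extends SourceRecordConverter {
  override def configure(props: util.Map[String, _]): Unit = {}
  override def convert(in: SourceRecord): util.List[SourceRecord] =
    new String(in.value.asInstanceOf[Array[Byte]], "UTF-8")
      .split("\n").toSeq
      .map(line => new SourceRecord(
        in.sourcePartition, in.sourceOffset, in.topic,
        in.keySchema, in.key,
        Schema.BYTES_SCHEMA, line.getBytes("UTF-8")))
      .asJava
}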