
komamitsu / fluency

Licence: Apache-2.0 License
High throughput data ingestion logger to Fluentd, AWS S3 and Treasure Data


Projects that are alternatives of or similar to fluency

s3-edit
Edit directly a file on Amazon S3 in CLI
Stars: ✭ 69 (-48.89%)
Mutual labels:  aws-s3, s3
Bucket-Flaws
Bucket Flaws ( S3 Bucket Mass Scanner ): A Simple Lightweight Script to Check for Common S3 Bucket Misconfigurations
Stars: ✭ 43 (-68.15%)
Mutual labels:  aws-s3, s3
reaction-file-collections-sa-s3
An S3 storage adapter for Reaction Commerce's reaction-file-collections
Stars: ✭ 14 (-89.63%)
Mutual labels:  aws-s3, s3
fluent-forward-go
A high-performance Go client for Fluentd and Fluent Bit
Stars: ✭ 26 (-80.74%)
Mutual labels:  logger, fluentd
ionic-image-upload
Ionic Plugin for Uploading Images to Amazon S3
Stars: ✭ 26 (-80.74%)
Mutual labels:  aws-s3, s3
BlobHelper
BlobHelper is a common, consistent storage interface for Microsoft Azure, Amazon S3, Komodo, Kvpbase, and local filesystem written in C#.
Stars: ✭ 23 (-82.96%)
Mutual labels:  aws-s3, s3
terraform-aws-s3-object
Terraform module which creates S3 object resources on AWS
Stars: ✭ 15 (-88.89%)
Mutual labels:  aws-s3, s3
s3-fuzzer
🔐 A concurrent, command-line AWS S3 Fuzzer. Written in Go.
Stars: ✭ 43 (-68.15%)
Mutual labels:  aws-s3, s3
fluent-bit-go-s3
[Deprecated] The predecessor of the fluent-bit output plugin for Amazon S3. https://aws.amazon.com/s3/
Stars: ✭ 34 (-74.81%)
Mutual labels:  s3, fluentd
Dive-Into-AWS
Links to the Repos and Sections in our Dive into AWS Course.
Stars: ✭ 27 (-80%)
Mutual labels:  aws-s3, s3
django-s3file
A lightweight file upload input for Django and Amazon S3
Stars: ✭ 66 (-51.11%)
Mutual labels:  aws-s3, s3
laravel-uppy-s3-multipart-upload
Multipart Uploads using Laravel, AWS S3, and Uppy
Stars: ✭ 30 (-77.78%)
Mutual labels:  aws-s3, s3
s3cli
Command line tool for S3
Stars: ✭ 21 (-84.44%)
Mutual labels:  aws-s3, s3
simple-flask-s3-uploader
Simple and easy to use Flask app to upload files to Amazon S3. Based on Python, Flask, and using Boto3. Securely storing your AWS credentials as environment variables. Quick AWS S3 Flask uploader example.
Stars: ✭ 24 (-82.22%)
Mutual labels:  aws-s3, s3
Laravel-FluentLogger
fluent logger for laravel (with Monolog handler for Fluentd)
Stars: ✭ 55 (-59.26%)
Mutual labels:  logger, fluentd
flask-drive
A simple Flask app to upload and download files off Amazon's S3
Stars: ✭ 23 (-82.96%)
Mutual labels:  aws-s3, s3
Node S3 Uploader
Flexible and efficient resize, rename, and upload images to Amazon S3 disk storage. Uses the official AWS Node SDK for transfer, and ImageMagick for image processing. Support for multiple image versions targets.
Stars: ✭ 237 (+75.56%)
Mutual labels:  aws-s3, s3
lamba-thumbnailer
AWS S3 Video Thumbnailer with Lambda
Stars: ✭ 21 (-84.44%)
Mutual labels:  aws-s3, s3
fss3
FSS3 is an S3 filesystem abstraction layer for Golang
Stars: ✭ 52 (-61.48%)
Mutual labels:  aws-s3, s3
punic
Punic is a remote cache CLI built for Carthage and Apple .xcframework
Stars: ✭ 25 (-81.48%)
Mutual labels:  aws-s3, s3

Fluency



This document is for version 2. If you're looking for a document for version 1, see this.

Ingestion to Fluentd

Features

Install

Gradle

dependencies {
    implementation "org.komamitsu:fluency-core:${fluency.version}"
    implementation "org.komamitsu:fluency-fluentd:${fluency.version}"
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-fluentd</artifactId>
    <version>${fluency.version}</version>
</dependency>

Usage

Create Fluency instance

For single Fluentd
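import org.komamitsu.fluency.Fluency;
import org.komamitsu.fluency.fluentd.FluencyBuilderForFluentd;

// Single Fluentd(localhost:24224 by default)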
//   - TCP heartbeat (by default)
//   - Asynchronous flush (by default)
//   - Without ack response (by default)
//   - Flush attempt interval is 600ms (by default)
//   - Initial chunk buffer size is 1MB (by default)
//   - Threshold chunk buffer size to flush is 4MB (by default)
//   - Threshold chunk buffer retention time to flush is 1000 ms (by default)
//   - Max total buffer size is 512MB (by default)
//   - Use off heap memory for buffer pool (by default)
//   - Max retries of sending events is 8 (by default)
//   - Max wait until all buffers are flushed is 10 seconds (by default)
//   - Max wait until the flusher is terminated is 10 seconds (by default)
//   - Socket connection timeout is 5000 ms (by default)
//   - Socket read timeout is 5000 ms (by default)
Fluency fluency = new FluencyBuilderForFluentd().build();
For multiple Fluentd with failover
import java.net.InetSocketAddress;
import java.util.Arrays;

// Multiple Fluentd(localhost:24224, localhost:24225)
Fluency fluency = new FluencyBuilderForFluentd().build(
        Arrays.asList(
                new InetSocketAddress("localhost", 24224),
                new InetSocketAddress("localhost", 24225)));
Enable ACK response mode
// Single Fluentd(localhost:24224)
//   - With ack response
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setAckResponseMode(true);
Fluency fluency = builder.build();
Enable file backup mode

In this mode, Fluency backs up unsent in-memory buffers to files when it is closed, and resends them when it restarts.

// Single Fluentd(localhost:24224)
//   - Backup directory is the temporary directory
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setFileBackupDir(System.getProperty("java.io.tmpdir"));
Fluency fluency = builder.build();
Buffer configuration

Fluency has several parameters that control when buffered data is flushed; the project's buffer-flush diagram illustrates how they interact.
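As a rough mental model, assumed from the parameter names and the defaults listed above rather than taken from Fluency's source: on every flush attempt, a chunk is flushed once its size reaches the retention size or its age reaches the retention time. `FlushPolicy` below is a hypothetical class that encodes only that rule.

```java
/**
 * Hypothetical model of the flush decision (not Fluency's implementation).
 * A chunk is ready to flush when it is large enough OR old enough.
 */
final class FlushPolicy {
    final long retentionSizeBytes;  // cf. setBufferChunkRetentionSize
    final long retentionTimeMillis; // cf. setBufferChunkRetentionTimeMillis

    FlushPolicy(long retentionSizeBytes, long retentionTimeMillis) {
        this.retentionSizeBytes = retentionSizeBytes;
        this.retentionTimeMillis = retentionTimeMillis;
    }

    /** Evaluated on each flush attempt (every 600 ms by default). */
    boolean shouldFlush(long chunkSizeBytes, long chunkCreatedAtMillis, long nowMillis) {
        boolean bigEnough = chunkSizeBytes >= retentionSizeBytes;
        boolean oldEnough = nowMillis - chunkCreatedAtMillis >= retentionTimeMillis;
        return bigEnough || oldEnough;
    }
}
```

With the Fluentd defaults (4MB, 1000 ms), a small chunk waits about a second, while a busy stream is flushed as soon as a chunk fills up.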

For high throughput data ingestion with high latency
// Single Fluentd(xxx.xxx.xxx.xxx:24224)
//   - Initial chunk buffer size is 16MB
//   - Threshold chunk buffer size to flush is 64MB
//     Keep this value (BufferChunkRetentionSize) between `Initial chunk buffer size` and `Max total buffer size`
//   - Max total buffer size is 1024MB
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setBufferChunkInitialSize(16 * 1024 * 1024);
builder.setBufferChunkRetentionSize(64 * 1024 * 1024);
builder.setMaxBufferSize(1024 * 1024 * 1024L);
Fluency fluency = builder.build("xxx.xxx.xxx.xxx", 24224);
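The comment above asks for initial chunk size <= retention size <= max total buffer size. A hypothetical startup check like this one can catch a misconfiguration early. Note the `long` arithmetic: it starts to matter at 2GB, because `2048 * 1024 * 1024` overflows `int`.

```java
/** Hypothetical sanity check for the buffer size ordering described above. */
final class BufferSizeCheck {
    static void validate(long chunkInitialSize, long chunkRetentionSize, long maxBufferSize) {
        if (chunkInitialSize > chunkRetentionSize || chunkRetentionSize > maxBufferSize) {
            throw new IllegalArgumentException(String.format(
                    "Expected initial (%d) <= retention (%d) <= max (%d)",
                    chunkInitialSize, chunkRetentionSize, maxBufferSize));
        }
    }
}
```

For the values above: `BufferSizeCheck.validate(16L * 1024 * 1024, 64L * 1024 * 1024, 1024L * 1024 * 1024);`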
Socket configuration
// Single Fluentd(localhost:24224)
//   - Socket connection timeout is 15000 ms
//   - Socket read timeout is 10000 ms
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setConnectionTimeoutMilli(15000);
builder.setReadTimeoutMilli(10000);
Fluency fluency = builder.build();
Waits on close sequence
// Single Fluentd(localhost:24224)
//   - Max wait until all buffers are flushed is 30 seconds
//   - Max wait until the flusher is terminated is 40 seconds
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setWaitUntilBufferFlushed(30);
builder.setWaitUntilFlusherTerminated(40);
Fluency fluency = builder.build();
Register Jackson modules
// Single Fluentd(localhost:24224)
//   - SimpleModule that has FooSerializer is enabled
SimpleModule simpleModule = new SimpleModule();
simpleModule.addSerializer(Foo.class, new FooSerializer());
FluentdRecordFormatter.Config recordFormatterConfig =
        new FluentdRecordFormatter.Config();
recordFormatterConfig.setJacksonModules(
        Collections.singletonList(simpleModule));
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setRecordFormatter(new FluentdRecordFormatter(recordFormatterConfig));
Fluency fluency = builder.build();
Set a custom error handler
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setErrorHandler(ex -> {
  // Send a notification
});
Fluency fluency = builder.build();

    :

// If flushing events to Fluentd fails and all retries are exhausted, the error handler is called.
fluency.emit("foo.bar", event);
Send requests over SSL/TLS
// Single Fluentd(localhost:24224)
//   - Enable SSL/TLS
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setSslEnabled(true);
Fluency fluency = builder.build();

If you want to use a custom truststore, specify the JKS file path using -Djavax.net.ssl.trustStore (and -Djavax.net.ssl.trustStorePassword if needed). You can create a custom truststore like this:

$ keytool -import -file server.crt -alias mytruststore -keystore truststore.jks
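If passing JVM flags is inconvenient, the same standard JSSE system properties can be set programmatically, provided this happens before the JVM initializes its default SSL context (in practice, before the first TLS connection). A minimal sketch; `TrustStoreSetup`, the path and the password are placeholders.

```java
/** Hypothetical helper equivalent to -Djavax.net.ssl.trustStore / -Djavax.net.ssl.trustStorePassword. */
final class TrustStoreSetup {
    static void useTrustStore(String jksPath, String password) {
        // Must run before the default SSLContext is first used, or it has no effect
        System.setProperty("javax.net.ssl.trustStore", jksPath);
        if (password != null) {
            System.setProperty("javax.net.ssl.trustStorePassword", password);
        }
    }
}
```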

For server side configuration, see https://docs.fluentd.org/v1.0/articles/in_forward#how-to-enable-tls/ssl-encryption .

Mutual TLS

See this project.

Other configurations
// Multiple Fluentd(localhost:24224, localhost:24225)
//   - Flush attempt interval is 200ms
//   - Max retries of sending events is 12
//   - Use JVM heap memory for buffer pool
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setFlushAttemptIntervalMillis(200);
builder.setSenderMaxRetryCount(12);
builder.setJvmHeapBufferMode(true);
Fluency fluency = builder.build(
        Arrays.asList(
                new InetSocketAddress("localhost", 24224),
                new InetSocketAddress("localhost", 24225)));

Emit event

String tag = "foo_db.bar_tbl";
Map<String, Object> event = new HashMap<String, Object>();
event.put("name", "komamitsu");
event.put("age", 42);
event.put("rate", 3.14);
fluency.emit(tag, event);

If you want to use EventTime as a timestamp, call Fluency#emit with an EventTime object as follows:

int epochSeconds;
int nanoseconds;
    :
EventTime eventTime = EventTime.fromEpoch(epochSeconds, nanoseconds);

// You can also create an EventTime object like this
// EventTime eventTime = EventTime.fromEpochMilli(System.currentTimeMillis());

fluency.emit(tag, eventTime, event);
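For reference, the Fluentd Forward Protocol (v1) carries EventTime as a MessagePack extension of type 0 in fixext 8 form: 32-bit big-endian seconds followed by 32-bit big-endian nanoseconds. The sketch below builds those bytes from a `java.time.Instant` using only the JDK. It illustrates the wire format and is not Fluency's own serializer; `EventTimeBytes` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.time.Instant;

final class EventTimeBytes {
    /** Encodes an Instant as MessagePack fixext 8 (0xD7), ext type 0, as the Forward Protocol describes. */
    static byte[] encode(Instant t) {
        ByteBuffer buf = ByteBuffer.allocate(10); // ByteBuffer is big-endian by default
        buf.put((byte) 0xD7);                     // fixext 8 marker
        buf.put((byte) 0x00);                     // ext type 0 = EventTime
        buf.putInt((int) t.getEpochSecond());     // seconds since epoch, 32 bits
        buf.putInt(t.getNano());                  // nanoseconds within that second
        return buf.array();
    }
}
```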

Error handling

Fluency#emit keeps buffered data in memory even if a retriable exception occurs. However, when the buffer is full, the method throws org.komamitsu.fluency.BufferFullException. There are two ways to handle this exception.

a) Ignore the exception so that the main application isn't blocked
try {
    fluency.emit(tag, event);
}
catch (BufferFullException e) {
    // Just log the error and move forward
    logger.warn("Fluency's buffer is full", e);
}
b) Retry until the data is successfully buffered
// Consider also capping the number of retries
while (true) {
    try {
        fluency.emit(tag, event);
        break;
    }
    catch (BufferFullException e) {
        // Log the error, sleep and retry
        // (TimeUnit#sleep throws InterruptedException, which the enclosing method must handle)
        logger.warn("Fluency's buffer is full. Retrying", e);
        TimeUnit.SECONDS.sleep(5);
    }
}

Which to choose depends on how important the data is and how long the application can be blocked.
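Option b) can be bounded, as its comment suggests. Below is a minimal, library-independent sketch; `BoundedRetry` and `Action` are hypothetical names, and the retriable exception type is passed in so the helper can be tested without Fluency on the classpath.

```java
import java.util.concurrent.TimeUnit;

final class BoundedRetry {
    /** Anything that may throw, e.g. () -> fluency.emit(tag, event). */
    @FunctionalInterface
    interface Action {
        void run() throws Exception;
    }

    /**
     * Runs the action up to maxAttempts times, sleeping sleepMillis after each
     * attempt that throws the retriable exception type. Returns true on success,
     * false if every attempt failed that way. Other exceptions propagate immediately.
     */
    static boolean runWithRetry(Action action, Class<? extends Exception> retriable,
                                int maxAttempts, long sleepMillis) throws Exception {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return true;
            } catch (Exception e) {
                if (!retriable.isInstance(e)) {
                    throw e;
                }
            }
            if (attempt < maxAttempts) {
                TimeUnit.MILLISECONDS.sleep(sleepMillis);
            }
        }
        return false;
    }
}
```

With Fluency, a call would look like `BoundedRetry.runWithRetry(() -> fluency.emit(tag, event), BufferFullException.class, 5, 5000)`; a `false` result means the event was never buffered and can be logged and dropped as in option a).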

Wait until buffered data is flushed and release resources

fluency.close();

Check how much memory Fluency has allocated

LOG.debug("Memory size allocated by Fluency is {}", fluency.getAllocatedBufferSize());

Check how much unsent data Fluency is buffering in memory

LOG.debug("Unsent data size buffered by Fluency in memory is {}", fluency.getBufferedDataSize());
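To act on these numbers, for example to warn before emit starts throwing BufferFullException, one option is to compare them with the max total buffer size you configured (512MB by default for Fluentd). `BufferUsage` and the 0.8 threshold are hypothetical choices, not part of Fluency.

```java
/** Hypothetical monitoring helper: how close is Fluency to its configured max buffer size? */
final class BufferUsage {
    /** Returns bytes / maxBufferBytes as a fraction, clamped to at most 1.0. */
    static double utilization(long bytes, long maxBufferBytes) {
        if (maxBufferBytes <= 0) {
            throw new IllegalArgumentException("maxBufferBytes must be positive");
        }
        return Math.min(1.0, (double) bytes / maxBufferBytes);
    }

    static boolean nearlyFull(long bytes, long maxBufferBytes, double threshold) {
        return utilization(bytes, maxBufferBytes) >= threshold;
    }
}
```

For example: `if (BufferUsage.nearlyFull(fluency.getAllocatedBufferSize(), 512L * 1024 * 1024, 0.8)) { LOG.warn("Fluency's buffer is over 80% allocated"); }`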

Ingestion to Treasure Data

Features

  • Asynchronous flush
  • Backup of buffered data on local disk
  • Automatic database/table creation

Install

Gradle

dependencies {
    implementation "org.komamitsu:fluency-core:${fluency.version}"
    implementation "org.komamitsu:fluency-treasuredata:${fluency.version}"
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-treasuredata</artifactId>
    <version>${fluency.version}</version>
</dependency>

Create Fluency instance

Default configuration
// Asynchronous flush (by default)
// Flush attempt interval is 600ms (by default)
// Initial chunk buffer size is 4MB (by default)
// Threshold chunk buffer size to flush is 64MB (by default)
// Threshold chunk buffer retention time to flush is 30000 ms (by default)
// Max total buffer size is 512MB (by default)
// Use off heap memory for buffer pool (by default)
// Max retries of sending events is 10 (by default)
// Max wait until all buffers are flushed is 10 seconds (by default)
// Max wait until the flusher is terminated is 10 seconds (by default)
Fluency fluency = new FluencyBuilderForTreasureData().build(yourApiKey);
Buffer configuration for high throughput data ingestion with high latency
// Initial chunk buffer size is 32MB
// Threshold chunk buffer size to flush is 256MB
// Threshold chunk buffer retention time to flush is 120 seconds
// Max total buffer size is 1024MB
// Sender's working buffer size is 32KB
FluencyBuilderForTreasureData builder = new FluencyBuilderForTreasureData();
builder.setBufferChunkInitialSize(32 * 1024 * 1024);
builder.setMaxBufferSize(1024 * 1024 * 1024L);
builder.setBufferChunkRetentionSize(256 * 1024 * 1024);
builder.setBufferChunkRetentionTimeMillis(120 * 1000);
builder.setSenderWorkBufSize(32 * 1024);
Fluency fluency = builder.build(yourApiKey);
Customize Treasure Data endpoint
Fluency fluency = new FluencyBuilderForTreasureData()
        .build(yourApiKey, tdEndpoint);
Other configurations

Other usage is the same as for ingestion to Fluentd. See Ingestion to Fluentd > Usage above.

Ingestion to AWS S3

Features

  • Asynchronous flush
  • Backup of buffered data on local disk
  • Support for several formats
    • CSV
    • JSONL
    • MessagePack
  • GZIP compression
  • Customizable S3 bucket/key decision rule

Install

Gradle

dependencies {
    implementation "org.komamitsu:fluency-core:${fluency.version}"
    implementation "org.komamitsu:fluency-aws-s3:${fluency.version}"
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-aws-s3</artifactId>
    <version>${fluency.version}</version>
</dependency>

Create Fluency instance

Default configuration for JSONL format
// Asynchronous flush (by default)
// Flush attempt interval is 600ms (by default)
// Initial chunk buffer size is 4MB (by default)
// Threshold chunk buffer size to flush is 64MB (by default)
// Threshold chunk buffer retention time to flush is 30000 ms (by default)
// Max total buffer size is 512MB (by default)
// Use off heap memory for buffer pool (by default)
// Sender's working buffer size is 8KB (by default)
// Max retries of sending events is 10 (by default)
// Initial retry interval of sending events is 1000 ms (by default)
// Retry backoff factor of sending events is 2.0 (by default)
// Max retry interval of sending events is 30000 ms (by default)
// Max wait until all buffers are flushed is 10 seconds (by default)
// Max wait until the flusher is terminated is 10 seconds (by default)
// Destination S3 bucket is specified by Fluency#emit()'s "tag" parameter (by default)
// Destination S3 key format is "yyyy/MM/dd/HH/mm-ss-SSSSSS" (by default)
// Destination S3 key is decided as UTC (by default)
// GZIP compression is enabled (by default)
// File format is JSONL
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
Fluency fluency = builder.build();
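To make the default key format concrete, the sketch below renders a timestamp with the documented pattern `yyyy/MM/dd/HH/mm-ss-SSSSSS` in UTC, using `java.time`. It assumes the key is just that rendering; which timestamp Fluency uses and any file-name suffix it appends are not covered here. `DefaultS3KeySketch` is a hypothetical name.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class DefaultS3KeySketch {
    // Pattern copied from the documented default S3 key format
    private static final String PATTERN = "yyyy/MM/dd/HH/mm-ss-SSSSSS";

    /** Renders the timestamp in the given zone (see setS3KeyTimeZoneId). */
    static String key(Instant time, ZoneId zone) {
        return DateTimeFormatter.ofPattern(PATTERN).withZone(zone).format(time);
    }

    /** UTC is the documented default zone. */
    static String key(Instant time) {
        return key(time, ZoneOffset.UTC);
    }
}
```

For `2021-03-04T05:06:07.123456Z` this gives `2021/03/04/05/06-07-123456` in UTC and `2021/03/04/14/06-07-123456` in `Asia/Tokyo`, which is why changing the key time zone shifts the hour directory.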
Default configuration for MessagePack format
// File format is MessagePack
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.MESSAGE_PACK);
Fluency fluency = builder.build();
Default configuration for CSV format
// File format is CSV
// Expected columns are "time", "age", "name", "comment"
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.CSV);
builder.setFormatCsvColumnNames(Arrays.asList("time", "age", "name", "comment"));
Fluency fluency = builder.build();
AWS S3 configuration

fluency-aws-s3 uses the default AWS credential provider chain. To specify credentials explicitly, use the following APIs.

// AWS S3 region is "us-east-1"
// AWS S3 endpoint is "https://another.s3.endpoi.nt"
// AWS access key id is "ABCDEFGHIJKLMNOPQRST"
// AWS secret access key is "ZaQ1XsW2CdE3VfR4BgT5NhY6"
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setAwsRegion("us-east-1");
builder.setAwsEndpoint("https://another.s3.endpoi.nt");
builder.setAwsAccessKeyId("ABCDEFGHIJKLMNOPQRST");
builder.setAwsSecretAccessKey("ZaQ1XsW2CdE3VfR4BgT5NhY6");
Fluency fluency = builder.build();
Disable compression
// GZIP compression is disabled
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setCompressionEnabled(false);
Fluency fluency = builder.build();
Change timezone used in S3 key decision rule
// Destination S3 key is decided as JST
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setS3KeyTimeZoneId(ZoneId.of("JST", ZoneId.SHORT_IDS));
Fluency fluency = builder.build();
Customize S3 destination decision rule
// Destination S3 bucket is "fixed-bucket-name"
// Destination S3 key is "<tag>-<whole hours since the UNIX epoch>" (epoch seconds divided by 3600)
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setCustomS3DestinationDecider((tag, time) ->
    new S3DestinationDecider.S3Destination(
        "fixed-bucket-name",
        String.format("%s-%d", tag, time.getEpochSecond() / 3600)
));
Fluency fluency = builder.build();
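Pulling the key expression out of the lambda makes it easy to see which events share an S3 object: everything with a timestamp in the same UTC hour maps to the same key. `HourlyKeySketch` is a hypothetical helper that only repeats the expression above.

```java
import java.time.Instant;

final class HourlyKeySketch {
    /** Same expression as the decider above: "<tag>-<whole hours since the UNIX epoch>". */
    static String key(String tag, Instant time) {
        return String.format("%s-%d", tag, time.getEpochSecond() / 3600);
    }
}
```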
Buffer configuration for high throughput data ingestion with high latency
// Initial chunk buffer size is 32MB
// Threshold chunk buffer size to flush is 256MB
// Threshold chunk buffer retention time to flush is 120 seconds
// Max total buffer size is 1024MB
// Sender's working buffer size is 32KB
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setBufferChunkInitialSize(32 * 1024 * 1024);
builder.setMaxBufferSize(1024 * 1024 * 1024L);
builder.setBufferChunkRetentionSize(256 * 1024 * 1024);
builder.setBufferChunkRetentionTimeMillis(120 * 1000);
builder.setSenderWorkBufSize(32 * 1024);
Fluency fluency = builder.build();
Retry configuration
// Max retries of sending events is 16
// Initial retry interval of sending events is 500 ms
// Retry backoff factor of sending events is 1.5
// Max retry interval of sending events is 20000 ms
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setSenderRetryMax(16);
builder.setSenderRetryIntervalMillis(500);
builder.setSenderRetryFactor(1.5f);
builder.setSenderMaxRetryIntervalMillis(20000);
Fluency fluency = builder.build();
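Assuming these four parameters combine as conventional capped exponential backoff (the n-th retry waits initial interval × factor^n, capped at the max interval; this is an assumption, not taken from Fluency's source), the wait schedules can be compared with a small hypothetical helper:

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSchedule {
    /** Waits before each retry under capped exponential backoff (hypothetical model). */
    static List<Long> intervals(int retryMax, long initialMillis, double factor, long maxMillis) {
        List<Long> waits = new ArrayList<>();
        double next = initialMillis;
        for (int n = 0; n < retryMax; n++) {
            waits.add((long) Math.min(next, (double) maxMillis)); // cap, then truncate to ms
            next *= factor;
        }
        return waits;
    }
}
```

Under this model the defaults (10 retries, 1000 ms, factor 2.0, max 30000 ms) wait 1000, 2000, 4000, 8000, 16000 ms and then 30000 ms five times, about 181 seconds in total, while the configuration above starts at 500, 750, 1125, 1687 ms.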
Other configurations

Other usage is the same as for ingestion to Fluentd. See Ingestion to Fluentd > Usage above.
