
lifion / Lifion Kinesis

License: MIT
A native Node.js producer and consumer library for Amazon Kinesis Data Streams

Programming Languages

javascript
184084 projects - #8 most used programming language

Projects that are alternatives of or similar to Lifion Kinesis

Aws Sdk Java
The official AWS SDK for Java.
Stars: ✭ 3,662 (+6681.48%)
Mutual labels:  aws, amazon
Ansible For Devops
Ansible for DevOps examples.
Stars: ✭ 5,265 (+9650%)
Mutual labels:  aws, amazon
Shep
A framework for building JavaScript Applications with AWS API Gateway and Lambda
Stars: ✭ 376 (+596.3%)
Mutual labels:  aws, amazon
Aws.s3
Amazon Simple Storage Service (S3) API Client
Stars: ✭ 302 (+459.26%)
Mutual labels:  aws, amazon
Apai Io
DISCONTINUED Amazon Product Advertising Library based on PHP REST and SOAP (only V1) using the Product Advertising API.
Stars: ✭ 647 (+1098.15%)
Mutual labels:  aws, amazon
Ec2instances.info
Amazon EC2 instance comparison site
Stars: ✭ 3,619 (+6601.85%)
Mutual labels:  aws, amazon
Data Science Ipython Notebooks
Data science Python notebooks: Deep learning (TensorFlow, Theano, Caffe, Keras), scikit-learn, Kaggle, big data (Spark, Hadoop MapReduce, HDFS), matplotlib, pandas, NumPy, SciPy, Python essentials, AWS, and various command lines.
Stars: ✭ 22,048 (+40729.63%)
Mutual labels:  aws, big-data
Utern
Multi group and stream log tailing for AWS CloudWatch Logs.
Stars: ✭ 241 (+346.3%)
Mutual labels:  aws, amazon
Amazon Sagemaker Examples
Example 📓 Jupyter notebooks that demonstrate how to build, train, and deploy machine learning models using 🧠 Amazon SageMaker.
Stars: ✭ 6,346 (+11651.85%)
Mutual labels:  aws, amazon
Saws
A supercharged AWS command line interface (CLI).
Stars: ✭ 4,886 (+8948.15%)
Mutual labels:  aws, client
Ccat
Cloud Container Attack Tool (CCAT) is a tool for testing security of container environments.
Stars: ✭ 300 (+455.56%)
Mutual labels:  aws, amazon
Aws Toolkit Vscode
AWS Toolkit for Visual Studio Code, an extension for working with AWS services including AWS Lambda.
Stars: ✭ 823 (+1424.07%)
Mutual labels:  aws, amazon
Laravel Aws Eb
Ready-to-deploy configuration to run Laravel on AWS Elastic Beanstalk.
Stars: ✭ 247 (+357.41%)
Mutual labels:  aws, amazon
Dynamon
😈 Dynamon is a GUI client for DynamoDB that can connect to a local DynamoDB instance.
Stars: ✭ 324 (+500%)
Mutual labels:  aws, client
Aws Etl Orchestrator
A serverless architecture for orchestrating ETL jobs in arbitrarily-complex workflows using AWS Step Functions and AWS Lambda.
Stars: ✭ 245 (+353.7%)
Mutual labels:  aws, big-data
Aws Google Auth
Provides AWS STS credentials based on Google Apps SAML SSO auth (what a jumble!)
Stars: ✭ 428 (+692.59%)
Mutual labels:  aws, amazon
Cognito Express
Authenticates API requests on a Node application by verifying the JWT signature of AccessToken or IDToken generated by Amazon Cognito.
Stars: ✭ 165 (+205.56%)
Mutual labels:  aws, amazon
Aws Lambda Fastify
Inspired by aws-serverless-express to work with Fastify with inject functionality.
Stars: ✭ 190 (+251.85%)
Mutual labels:  aws, amazon
Aws
A collection of bash shell scripts for automating various tasks with Amazon Web Services using the AWS CLI and jq.
Stars: ✭ 493 (+812.96%)
Mutual labels:  aws, amazon
Cfn nag
Linting tool for CloudFormation templates
Stars: ✭ 808 (+1396.3%)
Mutual labels:  aws, amazon

lifion-kinesis


Lifion's Node.js client for Amazon Kinesis Data Streams.

Getting Started

To install the module:

npm install lifion-kinesis --save

The main module export is a Kinesis class that instantiates as a readable stream.

const Kinesis = require('lifion-kinesis');

const kinesis = new Kinesis({
  streamName: 'sample-stream'
  /* other options from AWS.Kinesis */
});
kinesis.on('data', (data) => {
  console.log('Incoming data:', data);
});
kinesis.startConsumer();

To take advantage of back-pressure, the client can be piped to a writable stream:

const Kinesis = require('lifion-kinesis');
const { Writable, pipeline } = require('stream');

pipeline(
  new Kinesis(/* options */),
  new Writable({
    objectMode: true,
    write(data, encoding, callback) {
      console.log(data);
      callback();
    }
  }),
  (err) => {
    if (err) console.error(err);
  }
);

Features

  • Standard Node.js stream abstraction of Kinesis streams.
  • Node.js implementation of the new enhanced fan-out feature.
  • Optional auto-creation, encryption, and tagging of Kinesis streams.
  • Support for a polling mode, using the GetRecords API, with automatic checkpointing.
  • Support for multiple concurrent consumers through automatic assignment of shards.
  • Support for sending messages to streams, with auto-retries.

API Reference

Kinesis ⇐ PassThrough

A pass-through stream class specialization implementing a consumer of Kinesis Data Streams using the AWS SDK for JavaScript. Incoming data can be retrieved through either the data event or by piping the instance to other streams.

Kind: Exported class
Extends: PassThrough

new Kinesis(options)

Initializes a new instance of the Kinesis client.

Param Type Default Description
options object The initialization options. In addition to the below options, it can also contain any of the AWS.Kinesis options.
[options.compression] string The kind of data compression to use with records. The currently available compression options are either "LZ-UTF8" or none.
[options.consumerGroup] string The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module.
[options.createStreamIfNeeded] boolean true Whether the Kinesis stream should be automatically created if it doesn't exist upon connection.
[options.dynamoDb] object {} The initialization options for the DynamoDB client used to store the state of the consumers. In addition to tableNames and tags, it can also contain any of the AWS.DynamoDB options.
[options.dynamoDb.tableName] string The name of the table in which to store the state of consumers. If not provided, it defaults to "lifion-kinesis-state".
[options.dynamoDb.tags] object If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged.
[options.encryption] object The encryption options to enforce in the stream.
[options.encryption.type] string The encryption type to use.
[options.encryption.keyId] string The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/".
[options.limit] number 10000 The limit of records per get records call (only applicable when useEnhancedFanOut is set to false)
[options.logger] object An object with the warn, debug, and error functions that will be used for logging purposes. If not provided, logging will be omitted.
[options.maxEnhancedConsumers] number 5 An option to set the number of enhanced fan-out consumer ARNs that the module should initialize. Defaults to 5. Providing a number above the AWS limit (20) or below 1 will result in using the default.
[options.noRecordsPollDelay] number 1000 The delay in milliseconds before attempting to get more records when there were none in the previous attempt (only applicable when useEnhancedFanOut is set to false)
[options.pollDelay] number 250 When the usePausedPolling option is false, this option defines the delay in milliseconds in between poll requests for more records (only applicable when useEnhancedFanOut is set to false)
[options.s3] object {} The initialization options for the S3 client used to store large items in buckets. In addition to bucketName and endpoint, it can also contain any of the AWS.S3 options.
[options.s3.bucketName] string The name of the bucket in which to store large messages. If not provided, it defaults to the name of the Kinesis stream.
[options.s3.largeItemThreshold] number 900 The size in KB above which an item should automatically be stored in S3.
[options.s3.nonS3Keys] Array.<string> [] If the useS3ForLargeItems option is set to true, the nonS3Keys option lists the keys that will be sent normally on the Kinesis record.
[options.s3.tags] string If provided, the client will ensure that the S3 bucket is tagged with these tags. If the bucket already has tags, they will be merged.
[options.shardCount] number 1 The number of shards that the newly-created stream will use (if the createStreamIfNeeded option is set)
[options.shouldParseJson] string | boolean "auto" Whether retrieved records' data should be parsed as JSON. Set to "auto" to only attempt parsing if the data looks like JSON. Set to true to always parse the data.
[options.statsInterval] number 30000 The interval in milliseconds for how often to emit the "stats" event. The event is only available while the consumer is running.
options.streamName string The name of the stream to consume data from (required)
[options.supressThroughputWarnings] boolean false Set to true to make the client log ProvisionedThroughputExceededException as debug rather than warning.
[options.tags] object If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them.
[options.useAutoCheckpoints] boolean true Set to true to make the client automatically store shard checkpoints using the sequence number of the most-recently received record. If set to false consumers can use the setCheckpoint() function to store any sequence number as the checkpoint for the shard.
[options.useAutoShardAssignment] boolean true Set to true to automatically assign the stream shards to the active consumers in the same group (so only one client reads from one shard at the same time). Set to false to make the client read from all shards.
[options.useEnhancedFanOut] boolean false Set to true to make the client use enhanced fan-out consumers to read from shards.
[options.usePausedPolling] boolean false Set to true to make the client not poll for more records until the consumer calls continuePolling(). This option is useful when consumers want to make sure the records are fully processed before receiving more (only applicable when useEnhancedFanOut is set to false)
[options.useS3ForLargeItems] boolean false Whether to automatically use an S3 bucket to store large items or not.
[options.leaseAcquisitionInterval] number 20000 The interval in milliseconds for how often to attempt lease acquisitions.
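
The options above can be combined freely; the following sketch shows an illustrative configuration (values are examples, not recommendations, and only streamName is required):

```javascript
// Illustrative options object; every key is documented in the table above.
const options = {
  streamName: 'sample-stream', // required
  consumerGroup: 'sample-app', // defaults to the application name
  shardCount: 2, // only used if the stream is auto-created
  compression: 'LZ-UTF8',
  useEnhancedFanOut: false, // poll with GetRecords instead
  noRecordsPollDelay: 1000, // default
  statsInterval: 30000, // default
  dynamoDb: { tableName: 'lifion-kinesis-state' }, // default table name
  logger: console // console provides warn, debug, and error
};

// const kinesis = new Kinesis(options);
```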

kinesis.startConsumer() ⇒ Promise

Starts the stream consumer by ensuring that the stream exists, that it's ready, and that it's configured as requested. The internal managers that deal with heartbeats, state, and consumers will also be started.

Kind: instance method of Kinesis
Fulfil: undefined - Once the consumer has successfully started.
Reject: Error - On any unexpected error while trying to start.
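
Since the method returns a promise, callers will typically await it and handle startup failures; a minimal sketch, where startSafely is a hypothetical wrapper around any object exposing startConsumer():

```javascript
// Start the consumer and report the outcome. `client` is any object that
// exposes the startConsumer() method described above.
async function startSafely(client) {
  try {
    await client.startConsumer();
    return 'started';
  } catch (err) {
    return `failed to start: ${err.message}`;
  }
}
```

In an application this would be called as startSafely(kinesis) with an instance created as in Getting Started.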

kinesis.stopConsumer()

Stops the stream consumer. The internal managers will also be stopped.

Kind: instance method of Kinesis

kinesis.putRecord(params) ⇒ Promise

Writes a single data record into a stream.

Kind: instance method of Kinesis
Fulfil: Object - The de-serialized data returned from the request.
Reject: Error - On any unexpected error while writing to the stream.

Param Type Description
params object The parameters.
params.data * The data to put into the record.
[params.explicitHashKey] string The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash.
[params.partitionKey] string Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data.
[params.sequenceNumberForOrdering] string Set this to the sequence number obtained from the last put record operation to guarantee strictly increasing sequence numbers, for puts from the same client and to the same partition key. If omitted, records are coarsely ordered based on arrival time.
[params.streamName] string If provided, the record will be put into the specified stream instead of the stream name provided during the consumer instantiation.
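
Put together, a call might look like the following sketch (the payload and keys are illustrative; kinesis is an instance created as in Getting Started):

```javascript
// Illustrative putRecord parameters. `data` is serialized by the client;
// omitting partitionKey makes the client derive one from a SHA-1 of the data.
const params = {
  data: { id: 42, status: 'ok' },
  partitionKey: 'user-42'
};

// kinesis.putRecord(params)
//   .then((result) => console.log('record stored', result))
//   .catch((err) => console.error('write failed', err));
```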

kinesis.listShards(params) ⇒ Promise

Lists the shards of a stream.

Kind: instance method of Kinesis
Fulfil: Object - The de-serialized data returned from the request.
Reject: Error - On any unexpected error while listing the shards.

Param Type Description
params object The parameters.
[params.streamName] string If provided, the method will list the shards of the specific stream instead of the stream name provided during the consumer instantiation.
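
As a sketch, the resolved value follows the shape of the AWS ListShards response (a Shards array of shard descriptors); shardIds below is a hypothetical helper to extract the shard IDs:

```javascript
// Extract the shard IDs from a de-serialized ListShards response. The
// `Shards`/`ShardId` property names follow the AWS ListShards API.
function shardIds(response) {
  return (response.Shards || []).map((shard) => shard.ShardId);
}

// kinesis.listShards()
//   .then((response) => console.log(shardIds(response)));
```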

kinesis.putRecords(params) ⇒ Promise

Writes multiple data records into a stream in a single call.

Kind: instance method of Kinesis
Fulfil: Object - The de-serialized data returned from the request.
Reject: Error - On any unexpected error while writing to the stream.

Param Type Description
params object The parameters.
params.records Array.<object> The records associated with the request.
params.records[].data * The record data.
[params.records[].explicitHashKey] string The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash.
[params.records[].partitionKey] string Determines which shard in the stream the data record is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data.
[params.streamName] string If provided, the records will be put into the specified stream instead of the stream name provided during the consumer instantiation.
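
A batch call might be sketched as follows (payloads illustrative; the second record omits partitionKey so the client derives one from the data):

```javascript
// Illustrative putRecords parameters: a small batch of two records.
const batch = {
  records: [
    { data: { id: 1, event: 'created' }, partitionKey: 'user-1' },
    { data: { id: 2, event: 'updated' } }
  ]
};

// kinesis.putRecords(batch)
//   .then((result) => console.log('batch stored', result))
//   .catch((err) => console.error('batch failed', err));
```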

kinesis.getStats() ⇒ object

Returns statistics for the instance of the client.

Kind: instance method of Kinesis
Returns: object - An object with the statistics.

Kinesis.getStats() ⇒ object

Returns the aggregated statistics of all the instances of the client.

Kind: static method of Kinesis
Returns: object - An object with the statistics.

License

MIT
