aspecto-io / sns-sqs-big-payload

License: Apache-2.0
Amazon SNS/SQS client library that enables sending and receiving messages with payload larger than 256KiB via Amazon S3.

sns-sqs-big-payload

SQS/SNS producer/consumer library that can pass payloads through S3.

Motivation

Aspecto helps modern development teams solve production issues before they evolve. We collect real production data and perform deep API analysis over it to autogenerate tests and monitor services stability. As a result, we often need to handle large payloads which can't be used with SQS & SNS due to the hard size limit. This library was developed to overcome this challenge - it enables you to manage Amazon SNS & SQS message payloads with Amazon S3 when dealing with payloads larger than 256KB. Key functionality includes:

  • Control whether message payloads are always stored in Amazon S3 or only when a message's size exceeds 256KB.
  • Send a message that references a single message object stored in an Amazon S3 bucket.
  • Get the corresponding message object from an Amazon S3 bucket.
  • Pass large messages from SNS to SQS, with an S3 bucket acting as the intermediary.
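
The first point above, choosing between inline delivery and S3, can be sketched roughly as follows. This is a minimal illustration and not the library's implementation: `chooseRoute`, `byteLength`, and `MAX_INLINE_BYTES` are hypothetical names, the threshold is the 256 KiB figure quoted in this README, and the sketch ignores message attributes, which may also count toward the limit.

```typescript
// Threshold taken from the 256 KiB figure quoted in this README.
const MAX_INLINE_BYTES = 256 * 1024;

type Route = 'inline' | 's3';

// Size is measured in UTF-8 bytes, not in string characters.
function byteLength(text: string): number {
    return new TextEncoder().encode(text).length;
}

// Hypothetical helper: decides whether a payload travels inline or via S3.
function chooseRoute(payload: unknown, alwaysUseS3 = false): Route {
    const body = JSON.stringify(payload);
    if (alwaysUseS3 || byteLength(body) > MAX_INLINE_BYTES) {
        return 's3';
    }
    return 'inline';
}

console.log(chooseRoute({ id: 1 })); // small payload, stays inline
console.log(chooseRoute({ blob: 'x'.repeat(300 * 1024) })); // exceeds the threshold
console.log(chooseRoute({ id: 1 }, true)); // forced through S3
```

Measuring bytes rather than characters matters for non-ASCII payloads, where one character can occupy several bytes.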

Installation

npm install sns-sqs-big-payload

Important:

Make sure you also have aws-sdk installed. It is listed as a peer dependency, so it won't be installed automatically.

Usage

The library exports 3 clients:

  • SNS producer
  • SQS producer
  • SQS consumer

All 3 clients are under the same repository since they share a similar contract when sending payloads via S3.

SNS Producer

import { SnsProducer } from 'sns-sqs-big-payload';

const snsProducer = SnsProducer.create({
    topicArn: '<topic-arn>',
    region: 'us-east-1',
    // to enable sending large payloads (>256KiB) through S3
    largePayloadThoughS3: true,
    // Opt-in to enable compatibility with
    // Amazon SQS Extended Client Java Library (and other compatible libraries).
    // see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
    extendedLibraryCompatibility: false, // boolean; set to true to opt in
    s3EndpointUrl: '...',
});

await snsProducer.sendJSON({
    // ...
});

SQS Producer

import { SqsProducer } from 'sns-sqs-big-payload';

const sqsProducer = SqsProducer.create({
    queueUrl: '...',
    region: 'us-east-1',
    // to enable sending large payloads (>256KiB) through S3
    largePayloadThoughS3: true,
    // Opt-in to enable compatibility with
    // Amazon SQS Extended Client Java Library (and other compatible libraries).
    // see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
    extendedLibraryCompatibility: false, // boolean; set to true to opt in
    s3Bucket: '...',
});

await sqsProducer.sendJSON({
    // ...
});

SQS Consumer

import { SqsConsumer, SqsConsumerEvents } from 'sns-sqs-big-payload';

const sqsConsumer = SqsConsumer.create({
    queueUrl: '...',
    region: 'us-east-1',
    // to enable loading payloads from S3 automatically
    getPayloadFromS3: true,
    s3Bucket: '...',
    // if the queue is subscribed to SNS
    // the message will arrive wrapped in sns envelope
    // so we need to unwrap it first
    transformMessageBody: (body) => {
        const snsMessage = JSON.parse(body);
        return snsMessage.Message;
    },
    // if you expect json payload - use `parsePayload` hook to parse it
    parsePayload: (raw) => JSON.parse(raw),
    // message handler, payload already parsed at this point
    handleMessage: async ({ payload }) => {
        // ...
    },
    // Opt-in to enable compatibility with
    // Amazon SQS Extended Client Java Library (and other compatible libraries).
    // see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
    extendedLibraryCompatibility: false, // boolean; set to true to opt in
});

// to subscribe for events
sqsConsumer.on(SqsConsumerEvents.messageProcessed, () => {
    // ...
});

sqsConsumer.start();

// to stop processing
sqsConsumer.stop();

  • The queue is polled continuously for messages using long polling.
  • Messages are deleted from the queue once the handler function has completed successfully.
  • Throwing an error (or returning a rejected promise) from the handler function will cause the message to be left on the queue. An SQS redrive policy can be used to move messages that cannot be processed to a dead letter queue.
  • By default, messages are processed in batches of 10; a new batch is not received until the previous one has been processed. To adjust the number of messages processed in parallel, use the batchSize option.
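
The delete-on-success rule described above can be sketched without any AWS calls. This is an illustration under assumptions, not the library's internals: `processBatch`, `QueueMessage`, and `BatchResult` are hypothetical names, and a real consumer would call the SQS delete API for every id reported as `deleted`.

```typescript
interface QueueMessage {
    id: string;
    body: string;
}

interface BatchResult {
    deleted: string[]; // handler succeeded, safe to delete from the queue
    leftOnQueue: string[]; // handler failed, message stays on the queue
}

// Runs the handler for every message in the batch in parallel and
// resolves only once the whole batch has settled.
async function processBatch(
    batch: QueueMessage[],
    handler: (message: QueueMessage) => Promise<void>,
): Promise<BatchResult> {
    const outcomes = await Promise.all(
        batch.map(async (message) => {
            try {
                await handler(message);
                return { id: message.id, ok: true };
            } catch (err) {
                // A thrown error or rejected promise leaves the message on the queue.
                return { id: message.id, ok: false };
            }
        }),
    );
    return {
        deleted: outcomes.filter((o) => o.ok).map((o) => o.id),
        leftOnQueue: outcomes.filter((o) => !o.ok).map((o) => o.id),
    };
}
```

Because the function resolves only after every handler has finished, a caller that awaits it before polling again reproduces the "one batch at a time" behavior.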

Usage in lambda

If you have a Lambda function subscribed to an SQS queue, you can use SqsConsumer there too. The project repository includes a short guide.
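
For orientation, the sketch below shows the shape of the data a Lambda function receives from SQS and how the SNS envelope mentioned earlier would be unwrapped by hand. It deliberately does not call SqsConsumer, whose Lambda-specific API is not described in this README. The interfaces are trimmed to the fields used here, and `unwrapSnsEnvelope` and `extractPayloads` are hypothetical helper names.

```typescript
// Trimmed shapes: a Lambda SQS event carries a Records array,
// and each record exposes the raw message text in `body`.
interface SqsRecord {
    messageId: string;
    body: string;
}

interface SqsEvent {
    Records: SqsRecord[];
}

// When a queue is subscribed to an SNS topic (without raw message delivery),
// the body is a JSON envelope whose `Message` field holds the original text.
function unwrapSnsEnvelope(body: string): string {
    const envelope = JSON.parse(body) as { Message?: unknown } | null;
    if (envelope === null || typeof envelope !== 'object' || typeof envelope.Message !== 'string') {
        throw new Error('body is not an SNS envelope');
    }
    return envelope.Message;
}

// Unwraps every record and parses the inner message as JSON.
function extractPayloads(event: SqsEvent): unknown[] {
    return event.Records.map((record) => JSON.parse(unwrapSnsEnvelope(record.body)));
}
```

This performs the same unwrapping as the transformMessageBody and parsePayload hooks in the SQS Consumer example, with an explicit check that the envelope has the expected shape.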

Compatibility with libraries in other languages

If you turn on extendedLibraryCompatibility, the library will be compatible with the Amazon SQS Extended Client Library for Java and other libraries that follow the same format.

Please be careful: this mode is not compatible with the standard mode, because the S3 payload format differs.
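
As a rough illustration of what "the same format" means, the sketch below parses the S3 pointer that the Java Extended Client is understood to place in the message body: a two-element JSON array holding a class name and an object with `s3BucketName` and `s3Key`. That layout is an assumption based on the Java library's behavior and should be verified against the AWS documentation linked in the examples above. `parseExtendedPointer` is a hypothetical helper, not part of this library.

```typescript
interface S3Pointer {
    s3BucketName: string;
    s3Key: string;
}

// Returns the pointer if the body matches the assumed layout
// ["<class name>", {"s3BucketName": "...", "s3Key": "..."}], else null.
function parseExtendedPointer(body: string): S3Pointer | null {
    let parsed: unknown;
    try {
        parsed = JSON.parse(body);
    } catch (err) {
        return null; // not JSON at all
    }
    if (!Array.isArray(parsed) || parsed.length !== 2) {
        return null;
    }
    const pointer = parsed[1] as Partial<S3Pointer> | null;
    if (
        typeof parsed[0] !== 'string' ||
        pointer === null ||
        typeof pointer !== 'object' ||
        typeof pointer.s3BucketName !== 'string' ||
        typeof pointer.s3Key !== 'string'
    ) {
        return null;
    }
    return { s3BucketName: pointer.s3BucketName, s3Key: pointer.s3Key };
}
```

A body that fails this kind of check is the sort of situation the s3-extended-payload-error event reports.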

Credentials

By default the consumer will look for AWS credentials in the places specified by the AWS SDK. The simplest option is to export your credentials as environment variables:

export AWS_SECRET_ACCESS_KEY=...
export AWS_ACCESS_KEY_ID=...

If you need to specify your credentials manually, you can use a pre-configured instance of the AWS SQS client:

import { SqsConsumer } from 'sns-sqs-big-payload';
import * as aws from 'aws-sdk';

aws.config.update({
    region: 'us-east-1',
    accessKeyId: '...',
    secretAccessKey: '...',
});

const consumer = SqsConsumer.create({
    queueUrl: 'https://sqs.us-east-1.amazonaws.com/account-id/queue-name',
    handleMessage: async (message) => {
        // ...
    },
    sqs: new aws.SQS(),
});

consumer.start();

Events and logging

SqsConsumer has an internal EventEmitter, so you can subscribe to events like this:

sqsConsumer.on(SqsConsumerEvents.messageProcessed, () => {
    // ...
});

It sends the following events:

| Event | Params | Description |
|---|---|---|
| started | None | Fires when polling starts |
| message-received | message | Fires when a message is received (once per message, not per batch) |
| message-processed | message | Fires after the message is successfully processed and removed from the queue |
| batch-processed | None | Fires after the current batch of messages is processed |
| poll-ended | None | Fires after the polling cycle has ended. Useful for graceful shutdown. |
| stopped | None | Fires when polling stops |
| error | {err, message} | Fires in case of a processing error |
| s3-payload-error | {err, message} | Fires when an error occurs while downloading the payload from S3 |
| s3-extended-payload-error | {err, message} | Fires when a payload from S3 using extended compatibility is not in the expected format |
| processing-error | {err, message} | Fires when an error occurs during processing (only inside the handleMessage function) |
| connection-error | err | Fires when a connection error occurs during polling (retriable) |
| payload-parse-error | err | Fires when an error occurs while parsing the payload |

You can also use this enum if you're using TypeScript:

enum SqsConsumerEvents {
    started = 'started',
    messageReceived = 'message-received',
    messageProcessed = 'message-processed',
    batchProcessed = 'batch-processed',
    pollEnded = 'poll-ended',
    stopped = 'stopped',
    error = 'error',
    s3PayloadError = 's3-payload-error',
    s3ExtendedPayloadError = 's3-extended-payload-error',
    processingError = 'processing-error',
    connectionError = 'connection-error',
    payloadParseError = 'payload-parse-error',
}

For example, you can subscribe to these events to add logging.
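
One possible way to wire up such logging is sketched below. The event names are copied from the enum above; `attachLogging` and the structural `EventSource` type are hypothetical, and the sketch assumes the consumer's `on` method accepts a string event name and a listener, as in the earlier examples.

```typescript
// Event names copied from the SqsConsumerEvents enum.
const EVENT_NAMES: string[] = [
    'started',
    'message-received',
    'message-processed',
    'batch-processed',
    'poll-ended',
    'stopped',
    'error',
    's3-payload-error',
    's3-extended-payload-error',
    'processing-error',
    'connection-error',
    'payload-parse-error',
];

// Structural type: anything exposing an `on` method, such as an EventEmitter.
interface EventSource {
    on(event: string, listener: (...args: unknown[]) => void): unknown;
}

// Registers one listener per event and writes a single line per occurrence.
function attachLogging(source: EventSource, log: (line: string) => void): void {
    EVENT_NAMES.forEach((name) => {
        const isError = name === 'error' || name.slice(-6) === '-error';
        source.on(name, (...args: unknown[]) => {
            log(`${isError ? 'ERROR' : 'INFO'} ${name} (${args.length} arg(s))`);
        });
    });
}
```

With a consumer created as in the SQS Consumer section, this would be called as `attachLogging(sqsConsumer, console.log)` before `sqsConsumer.start()`.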

Testing

Since this library relies heavily on AWS APIs, isolated tests that use mocks are of limited value. We recommend testing it against LocalStack or against real SQS queues and SNS topics.

To run LocalStack on macOS:

TMPDIR=/private$TMPDIR docker-compose up

To run unit tests:

npm test