matrus2 / Dynamodb Stream Elasticsearch
DynamoDB --> Stream --> Elasticsearch
The missing blueprint for an AWS Lambda function that reads a stream of changes from AWS DynamoDB and writes them to Elasticsearch.
Whenever data is changed (inserted, modified or removed) in DynamoDB, an AWS Lambda function can capture that change and update an Elasticsearch index immediately. Further reading:
Indexing Amazon DynamoDB Content with Amazon Elasticsearch Service Using AWS Lambda
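For orientation, the event the Lambda receives contains a `Records` array; each record carries the change type and, under `dynamodb`, the typed `NewImage`/`OldImage` attribute maps. A simplified sketch (the field values are illustrative, not taken from a real stream):

```javascript
// Simplified shape of a DynamoDB Streams event (illustrative values).
const event = {
  Records: [
    {
      eventName: 'INSERT', // also 'MODIFY' or 'REMOVE'
      dynamodb: {
        Keys: { id: { S: '42' } },
        // NewImage is present for INSERT/MODIFY; OldImage for MODIFY/REMOVE.
        NewImage: { id: { S: '42' }, title: { S: 'hello' } }
      }
    }
  ]
}

console.log(`Received ${event.Records.length} record(s), first: ${event.Records[0].eventName}`)
```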
Getting Started
Install:
npm i dynamodb-stream-elasticsearch
Use it in your lambda:
const { pushStream } = require('dynamodb-stream-elasticsearch');

const { ES_ENDPOINT, INDEX } = process.env;

function myHandler(event, context, callback) {
  console.log('Received event:', JSON.stringify(event, null, 2));
  pushStream({ event, endpoint: ES_ENDPOINT, index: INDEX })
    .then(() => {
      callback(null, `Successfully processed ${event.Records.length} records.`);
    })
    .catch((e) => {
      callback(`Error ${e}`, null);
    });
}

exports.handler = myHandler;
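On Node 8.10+ runtimes the same logic can also be written with async/await instead of a callback. In this sketch `pushStream` is passed in as an argument purely so the snippet is self-contained; in a real Lambda you would require it from the package exactly as above:

```javascript
// Async/await variant of the handler above. `pushStream` is injected here
// only to keep the sketch self-contained; in a real Lambda you would
// require it from 'dynamodb-stream-elasticsearch'.
const makeHandler = (pushStream, endpoint, index) =>
  async (event) => {
    await pushStream({ event, endpoint, index })
    return `Successfully processed ${event.Records.length} records.`
  }
```

Wiring it up would then look like `exports.handler = makeHandler(pushStream, ES_ENDPOINT, INDEX)`; an async handler resolves or rejects a promise instead of calling `callback`.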
Upload the Lambda to AWS and star this repository if it works as expected!
Parameters
Param | Description | Required |
---|---|---|
event | Event object generated by the stream (pass it as is; don't modify it) | required |
endpoint | Exact URL of the Elasticsearch instance (works with AWS ES and standard ES) (string) | required |
index | Name of the Elasticsearch index (string). Defaults to the DynamoDB table name if not provided | optional |
refresh | Force Elasticsearch to refresh the index immediately (boolean). Default: true | optional |
useBulk | Enables bulk upserts and removals (boolean). Default: false | optional |
transformFunction | A function/promise that transforms each record before sending it to ES. Applies to INSERT and UPDATE operations. If it returns an empty object or false, the row is skipped. It receives body (NewImage), oldBody (OldImage) and record (the whole record) as arguments | optional |
elasticSearchOptions | Additional options passed to the elasticsearch Client | optional |
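As an illustration of transformFunction, the sketch below drops a made-up internal field and skips rows flagged as private. The field names (`private`, `internalNotes`) are hypothetical, chosen only for this example; the signature follows the parameter table above:

```javascript
// Hypothetical transform: receives body (NewImage), oldBody (OldImage) and
// the whole record, per the parameter table above. Field names are made up.
function stripInternalFields (body, oldBody, record) {
  if (body.private) return false // returning false skips the row
  const { internalNotes, ...publicFields } = body
  return publicFields
}
```

It would then be passed along with the other options, e.g. `pushStream({ event, endpoint: ES_ENDPOINT, index: INDEX, transformFunction: stripInternalFields })`.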
Running the tests
Set up an Elasticsearch node
Docker can be used to host an Elasticsearch node. By default Docker tries to pull the elasticsearch:latest tag if no version is specified. The `latest` tag does not exist, so a specific version must be given, e.g. `7.2.0`.
To run tests locally you need to have an Elasticsearch Docker container running. Simply type:
docker run -i -p 9200:9200 --name my_elastic_7_10 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.10.1
Running tests
Tests are written using Mocha (https://mochajs.org/) and can be launched with:
npm test
Contributing
If you want to commit changes, make sure you follow these rules:
- All code changes should come with a proper integration test;
- Code should follow the JavaScript Standard Style;
- Commit messages should be written according to this article.
TODO
- Introduce Continuous Integration;
- Add Elasticsearch bulk operation instead of index for multiple records
Authors & Contributors
License
This project is licensed under the MIT License - see the LICENSE.md file for details
Donate
If you find this project useful and would like to support the author in maintaining it, consider making a donation via this link:
Release notes:
Compatible with Node 8.10. (If for some reason you need Node 6.10, use version 1.0.0 of this module.)
Version 3 does not support types, as they were deprecated in ES 7.0.