vascokk / Rivus_cep

Licence: apache-2.0
Complex event processing in Erlang

Join the chat at https://gitter.im/vascokk/rivus_cep

Overview

Rivus CEP is an Erlang application for Complex Event Processing. It uses a declarative SQL-like DSL to define operations over event streams. In CEP-based systems, events (data) are processed as they arrive, as opposed to a database, where data is first persisted and then fetched and processed:

                               |------CEP Engine------| 
                               |                      | 
DataSource ------------------->|  Continuous Query    |-------------> Result Subscriber 
/Provider/     Event stream    | over a time interval |   Result         /Consumer/
                               |----------------------|
              

#Queries

There are two types of queries:

  • simple queries:
select 
    ev1.eventparam1, ev2.eventparam2, sum(ev2.eventparam3) 
from 
    event1 as ev1, event2 as ev2
where 
    ev1.eventparam2 = ev2.eventparam2
within 60 seconds

The above query joins all events of type event1 and event2 that arrived within the last 60 seconds (a "sliding window"). For "join" queries, the events within the time window are kept in memory. For queries that do not require a join, the result is calculated immediately, without persisting events.

  • pattern matching queries:
select 
    ev1.eventparam1, ev2.eventparam2, ev2.eventparam3, ev2.eventparam4
from 
    event1 as ev1 -> event2 as ev2
where
    ev1.eventparam2 = ev2.eventparam2
within 60 seconds

Here a result is generated only when event2 strictly follows event1 within a 60-second window. Pattern-based queries always persist the events. The pattern-matching mechanism is based on a directed-graph FSM built with the digraph module.
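To illustrate the idea of a digraph-based FSM, here is a minimal sketch that uses only the standard digraph module. It is not Rivus's actual matcher: the module name pattern_fsm_sketch, the integer state numbering, and the reset-and-retry rule for unexpected events are assumptions made for the example, and it ignores the "where" condition and the time window.

```erlang
-module(pattern_fsm_sketch).
-export([new/1, step/3]).

%% Builds a linear FSM for a pattern such as [event1, event2].
%% Vertex 0 is the start state; each edge is labelled with the event
%% name that moves the matcher to the next state.
new(EventNames) ->
    G = digraph:new(),
    digraph:add_vertex(G, 0),
    lists:foldl(fun(Name, State) ->
                        Next = State + 1,
                        digraph:add_vertex(G, Next),
                        digraph:add_edge(G, State, Next, Name),
                        Next
                end, 0, EventNames),
    G.

%% Returns the state reached after seeing Event in State.
%% An event that does not label an outgoing edge resets the matcher,
%% and is then retried from the start state (assumed policy).
step(G, State, Event) ->
    Name = element(1, Event),
    Next = [V2 || E <- digraph:out_edges(G, State),
                  {_, _, V2, Label} <- [digraph:edge(G, E)],
                  Label =:= Name],
    case Next of
        [S | _] -> S;
        [] when State =/= 0 -> step(G, 0, Event);
        [] -> 0
    end.
```

For the pattern [event1, event2], reaching state 2 means that event2 arrived directly after event1. The graph is backed by ETS tables, so call digraph:delete/1 when it is no longer needed.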

#Windows

A "window" in Rivus actually means two things:

  • a "time window" - the time interval, which the query operates on, and:
  • the temporary storage where the events within the "time window" are being persisted;

There are two types of windows currently implemented:

  • sliding window (default) - a moving window that contains the events from the last X seconds. A new Result is generated after each received event.
  • batch window - contains the events from a given moment in the past (t0) up to the moment t0+X. The Result will be generated only after the amount of time X expires. Once the Result is calculated the window will be cleaned up.

The underlying persistence mechanism is pluggable (see the rivus_cep_window.erl module). Default implementation is based on ETS tables.
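The difference between the two window types can be sketched with a plain ETS table. This is a hypothetical illustration, not the callback interface of rivus_cep_window.erl (which is not shown here): the module name ets_window_sketch, the function names, and the use of integer timestamps in seconds are all assumptions.

```erlang
-module(ets_window_sketch).
-export([new/0, add/3, sliding/3, batch_flush/1]).

new() ->
    ets:new(window, [ordered_set, public]).

%% Stores Event under {Timestamp, UniqueRef} so that two events with
%% the same timestamp do not overwrite each other.
add(Tab, Timestamp, Event) ->
    ets:insert(Tab, {{Timestamp, make_ref()}, Event}).

%% Sliding window: evict events with a timestamp of Now - X or older,
%% then return the remaining events, oldest first.
sliding(Tab, Now, X) ->
    Cutoff = Now - X,
    ets:select_delete(Tab, [{{{'$1', '_'}, '_'},
                             [{'=<', '$1', Cutoff}],
                             [true]}]),
    events(Tab).

%% Batch window: return everything collected so far and clean up,
%% as would happen once the interval X has expired.
batch_flush(Tab) ->
    Events = events(Tab),
    ets:delete_all_objects(Tab),
    Events.

events(Tab) ->
    [E || {_Key, E} <- lists:sort(ets:tab2list(Tab))].
```

In this sketch, sliding/3 would be called after every received event, while batch_flush/1 would be triggered by a timer after X seconds.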

##Filters

Filters remove events from the stream based on given criteria, which speeds up result generation and reduces memory requirements:

select 
	ev1.eventparam1, ev2.eventparam2, ev2.eventparam3, ev1.eventparam2
from 
	event1(eventparam1>5, eventparam1<30) as ev1, 
	event2(eventparam1<50) as ev2
where 
	ev1.eventparam2 = ev2.eventparam2
within 60 seconds

Multiple criteria can be given, separated by ','. The above query filters out all event1 events that do not satisfy eventparam1>5 AND eventparam1<30, as well as the event2 events for which eventparam1<50 is not true. The difference from the where clause is that filters are executed before the event is persisted in memory, so the user can reduce the event volume that enters the Result calculation.
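The effect of a filter can be pictured as a predicate check that runs before an event is stored in the window. The following is a minimal sketch in plain Erlang, not the code Rivus generates from the DSL: the module name filter_sketch and the map-of-predicates representation are assumptions.

```erlang
-module(filter_sketch).
-export([accept/2]).

%% Filters maps an event name to a list of predicates over the event
%% tuple. An event is kept only if every predicate for its name returns
%% true (logical AND); events without an entry are always kept.
accept(Event, Filters) ->
    Preds = maps:get(element(1, Event), Filters, []),
    lists:all(fun(P) -> P(Event) end, Preds).
```

The filters of the query above would then correspond to something like:

```erlang
Filters = #{event1 => [fun(E) -> element(2, E) > 5 end,
                       fun(E) -> element(2, E) < 30 end],
            event2 => [fun(E) -> element(2, E) < 50 end]}.
filter_sketch:accept({event1, 10, b, c}, Filters).
```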

#Aggregations

The following aggregation functions are currently supported:

  • sum
  • count
  • min
  • max

#Events representation

Events are tuples in the format: {<name>, <attribute 1>, <attribute 2>,.....,<attribute N>}. The <name> must be unique. For each event type there must be a module that implements event_behavior and has the same name as the event. The important function to implement is get_param_by_name(Event, ParamName). You can also define events at runtime, using the following statement with rivus_cep:execute/1:

define <name> as (<attribute 1>, <attribute 2>,.....,<attribute N>);

See the following example in "Usage".
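For a statically defined event, a hand-written event module might look like the sketch below. Only the function name get_param_by_name/2 comes from the text above. That parameter names are passed as atoms, and that no further callbacks are required, are assumptions; check the event_behavior module in the repository for the real contract.

```erlang
-module(event1).
%% -behaviour(event_behavior).  % enable when compiling inside Rivus
-export([get_param_by_name/2]).

%% Maps an attribute name to its position in {event1, P1, P2, P3}.
%% Element 1 of the tuple is the event name itself.
get_param_by_name(Event, eventparam1) -> element(2, Event);
get_param_by_name(Event, eventparam2) -> element(3, Event);
get_param_by_name(Event, eventparam3) -> element(4, Event).
```

With this module, event1:get_param_by_name({event1, 10, b, c}, eventparam2) evaluates to b.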

#Usage

Here is how to use Rivus:

Clone and build:

$ git clone https://github.com/vascokk/rivus_cep.git
$ cd rivus_cep
$ ./rebar get-deps
$ ./rebar compile

Update rel/vars.config according to your preferences or use the default values.

Create a release using relx:

$ ./relx

Start the application:

$ ./_rel/rivus_cep/bin/rivus_cep console

Try the following in the Erlang console:

%% define the events to be recognised by the engine:
EventDefStr1 = "define event1 as (eventparam1, eventparam2, eventparam3);".
EventDefStr2 = "define event2 as (eventparam1, eventparam2, eventparam3);".

rivus_cep:execute(EventDefStr1).
rivus_cep:execute(EventDefStr2).

% deploy the query
QueryStr = "define correlation1 as
                     select ev1.eventparam1, ev2.eventparam2, ev2.eventparam3, ev1.eventparam2
                     from event1 as ev1, event2 as ev2
                     where ev1.eventparam2 = ev2.eventparam2
                     within 60 seconds; ".

Producer = event_producer_1.
{ok, SubscriberPid} = result_subscriber:start_link().

{ok, QueryPid, QueryDetails} = rivus_cep:execute(QueryStr, [Producer], [SubscriberPid], [{shared_streams, true}]).
    
%% create some events
Event1 = {event1, 10, b, c}.
Event2 = {event1, 15, bbb, c}.
Event3 = {event1, 20, b, c}.
Event4 = {event2, 30, b, cc}.
Event5 = {event2, 40, bb, cc}.

%% send the events (if you do not care about the producers, you can use notify/1)
rivus_cep:notify(Producer, Event1).
rivus_cep:notify(Producer, Event2).
rivus_cep:notify(Producer, Event3).
rivus_cep:notify(Producer, Event4).
rivus_cep:notify(Producer, Event5).

%% result
%% you should get:
%% {ok,[{10,b,cc,b},{20,b,cc,b}]} 

gen_server:call(SubscriberPid, get_result).
	

The query is started with rivus_cep:execute/4 (previously load_query/4, which is now deprecated). Its arguments are: the query string, a list of event producers, a list of query result subscribers, and options as a proplist.

Each query worker registers itself with the gproc process registry for the events listed in the "from" clause.

If events are sent via rivus_cep:notify/1, each event is received by every query subscribed to that event type. With notify/2, only the queries subscribed to the particular Producer receive the event.

For each query there must be at least one Subscriber to receive the query results.

See tests/rivus_cep_tests.erl for more examples.
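The result tuples from the console session can be checked by hand with a plain list comprehension that mimics the correlation1 query. This sketch is independent of Rivus, and the module name join_sketch is made up for the example. It ignores the time window, since all five events arrive well within 60 seconds.

```erlang
-module(join_sketch).
-export([join/1]).

%% Mirrors: select ev1.eventparam1, ev2.eventparam2, ev2.eventparam3,
%%                 ev1.eventparam2
%%          from event1 as ev1, event2 as ev2
%%          where ev1.eventparam2 = ev2.eventparam2
join(Events) ->
    [{A1, B2, B3, A2}
     || {event1, A1, A2, _} <- Events,
        {event2, _, B2, B3} <- Events,
        A2 =:= B2].
```

Applied to the five events from the session, join/1 returns [{10,b,cc,b},{20,b,cc,b}]: only the two event1 tuples whose second attribute is b find a matching event2.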

#Streaming events via TCP

You can stream events via TCP connection:

  {ok, {Host, Port}} = application:get_env(rivus_cep, rivus_tcp_serv),
  {ok, Socket} = gen_tcp:connect(Host, Port, [{active, false}, {nodelay, true}, {packet, 4}, binary]),

  Event1 = {event1, 10, b, c},
  Event2 = {event1, 15, bbb, c},
  Event3 = {event1, 20, b, c},
  Event4 = {event2, 30, b, cc, d},
  Event5 = {event2, 40, bb, cc, dd},

  gen_tcp:send(Socket, term_to_binary({event, test_query_1, Event1})),
  gen_tcp:send(Socket, term_to_binary({event, test_query_1, Event2})),
  gen_tcp:send(Socket, term_to_binary({event, test_query_1, Event3})),
  gen_tcp:send(Socket, term_to_binary({event, test_query_1, Event4})),
  gen_tcp:send(Socket, term_to_binary({event, test_query_1, Event5})).

Currently only Erlang clients are supported; event serialization using BERT is a work in progress.
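The wire format used by the client above follows from the socket options: with {packet, 4} every message is prefixed by a 4-byte big-endian length, and the payload is the Erlang external term format produced by term_to_binary/1. The loopback sketch below shows one frame being sent and decoded. It does not talk to the Rivus TCP server, and the module name tcp_framing_sketch and its local listener are assumptions for illustration only.

```erlang
-module(tcp_framing_sketch).
-export([roundtrip/1]).

%% Sends Term over a loopback connection with {packet, 4} framing and
%% returns the term decoded on the receiving side.
roundtrip(Term) ->
    Opts = [binary, {packet, 4}, {active, false}],
    %% Port 0 lets the OS pick a free port.
    {ok, Listen} = gen_tcp:listen(0, [{ip, {127,0,0,1}} | Opts]),
    {ok, Port} = inet:port(Listen),
    {ok, Client} = gen_tcp:connect({127,0,0,1}, Port,
                                   [{nodelay, true} | Opts]),
    {ok, Server} = gen_tcp:accept(Listen, 5000),
    ok = gen_tcp:send(Client, term_to_binary(Term)),
    %% With {packet, 4}, recv/3 returns exactly one whole frame.
    {ok, Bin} = gen_tcp:recv(Server, 0, 5000),
    ok = gen_tcp:close(Client),
    ok = gen_tcp:close(Server),
    ok = gen_tcp:close(Listen),
    binary_to_term(Bin).
```

Calling tcp_framing_sketch:roundtrip({event, test_query_1, {event1, 10, b, c}}) returns the same tuple that was sent.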

#Shared streams

For memory efficiency, {shared_streams, true} can be provided in the options list. In this case, the query works with a shared sliding event window. The window's size equals the largest "within" clause among all sharing queries. Pattern-based queries use only non-shared windows.

#Benchmarking

You can load-test your queries using Basho Bench. Use the Basho Bench driver basho_bench_driver_rivus.erl and the configuration file rivus.config provided in the /priv directory. Edit rivus.config according to your needs. Here is the result of the single query (single process) defined in rivus.config, running on a GCE cloud instance (16 CPU, 15GB RAM) for 5 minutes:

Benchmark result

#Dependencies

#Disclaimer

This is not a production-ready project. Use it at your own risk.
