All Projects → weiboad → Aidp

weiboad / Aidp

Licence: apache-2.0
weiboAd Infrastructure Data Processor : kafka consumer embedded Lua scripting language in data process framework

Programming Languages

lua
6591 projects

Projects that are alternatives of or similar to Aidp

Sarama Cluster
Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 [DEPRECATED]
Stars: ✭ 969 (+4745%)
Mutual labels:  consumer, kafka
Remora
Kafka consumer lag-checking application for monitoring, written in Scala and Akka HTTP; a wrap around the Kafka consumer group command. Integrations with Cloudwatch and Datadog. Authentication recently added
Stars: ✭ 183 (+815%)
Mutual labels:  consumer, kafka
Anotherkafkamonitor Akm
Another app which used to monitor the progress of Kafka Producer and Consumer
Stars: ✭ 36 (+80%)
Mutual labels:  consumer, kafka
Kafka Flow
KafkaFlow is a .NET framework to consume and produce Kafka messages with multi-threading support. It's very simple to use and very extendable. You just need to install, configure, start/stop the bus with your app and create a middleware/handler to process the messages.
Stars: ✭ 118 (+490%)
Mutual labels:  consumer, kafka
Kminion
KMinion is a feature-rich Prometheus exporter for Apache Kafka written in Go. It is lightweight and highly configurable so that it will meet your requirements.
Stars: ✭ 274 (+1270%)
Mutual labels:  consumer, kafka
Yivnet
Yivnet is a microservice game server base on go-kit
Stars: ✭ 237 (+1085%)
Mutual labels:  kafka, server
Kafka Zk Restapi
Kafka Zookeeper RESTful API to perform topic/consumer group administration/metric(offset\lag\message) collection and monitor
Stars: ✭ 121 (+505%)
Mutual labels:  consumer, kafka
Mmo Server
Distributed Java game server, including login, gateway, game demo
Stars: ✭ 114 (+470%)
Mutual labels:  kafka, server
Qbusbridge
The Apache Kafka Client SDK
Stars: ✭ 272 (+1260%)
Mutual labels:  consumer, kafka
Node Sinek
🎩 Most advanced high level Node.js Kafka client
Stars: ✭ 262 (+1210%)
Mutual labels:  consumer, kafka
Qmq
QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。
Stars: ✭ 2,420 (+12000%)
Mutual labels:  consumer, kafka
Zerocode
A community-developed, free, open source, microservices API automation and load testing framework built using JUnit core runners for Http REST, SOAP, Security, Database, Kafka and much more. Zerocode Open Source enables you to create, change, orchestrate and maintain your automated test cases declaratively with absolute ease.
Stars: ✭ 482 (+2310%)
Mutual labels:  consumer, kafka
Kafka Go
Kafka library in Go
Stars: ✭ 4,200 (+20900%)
Mutual labels:  consumer, kafka
Librdkafka
The Apache Kafka C/C++ library
Stars: ✭ 5,617 (+27985%)
Mutual labels:  consumer, kafka
Deprecated
🚀 Framework for building universal web app and static website in Vue.js (beta)
Stars: ✭ 858 (+4190%)
Mutual labels:  server
Cdc Kafka Hadoop
MySQL to NoSQL real time dataflow
Stars: ✭ 13 (-35%)
Mutual labels:  kafka
Blink Java
Simplified pure Java http server
Stars: ✭ 10 (-50%)
Mutual labels:  server
Hazelcast Jet
Distributed Stream and Batch Processing
Stars: ✭ 855 (+4175%)
Mutual labels:  kafka
Tinytcpserver
A small tcp server working under Mono or .NET (4.0) and provides hooks for handling data exchange with clients (works under mono and .net). Behaviour/protocol/reaction could be specified via custom C# script.
Stars: ✭ 14 (-30%)
Mutual labels:  server
Kafkatools
CLI tools for monitoring and managing Apache Kafka
Stars: ✭ 13 (-35%)
Mutual labels:  kafka

AIDP (Data Process)

中文文档

AIDP is responsible for a series of messages such as message queue consumption. After getting the message, the specific processing is handled by Lua script. The developer can customize the processing of the consumption queue, and the framework provides http server service. You can use the Lua script to customize the http API Interface, and then consumption message logic and http server between the framework provides a simple Key-Val storage structure for data sharing. The figure is the structure of the AIDP data processing framework:

structure

AIDP invoke flow:

data process flow

Why use lua script customize ?

Most of the application scenarios for the message queue are back-end with a consumer module for data processing, the final data stored in a DB, and finally through the RPC interface for the user use. Some application scenarios data process logic may be particularly simple, but most of the language is very complicated for Kafka's consumption (Kafka consumption logic) When the specific data process logic to increase the development costs, AIDP through C++ this general development model abstract package, in order to increase the custom through the embedded Lua scripting language. To complete the custom part of the logic. AIDP is a compromise between development difficulty, cost and flexibility. Can meet a large number of data processing demand

Quick Start

RPM install

RPM currently support centos7.x version, other versions will be uploaded, there are other system version of the demand can add the issue

Complie install

First, install AIDP depend package adbase, Installation method refer : https://nmred.gitbooks.io/adbase/content/en/basic/install.html

Second, complie and install aidp

git clone [email protected]:weiboad/aidp.git
cd aidp
./cmake.sh
cd build
make
make install

Example

For example, using aidp get kafka cluster topic test data write to the local disk, and statistics the number of messages, the final number of messages through the http api interface can be obtained. In the installed aidp directory modify the configuration conf/system.ini, configure kafka consumption calls lua script path, by modifying consumer section scriptName field. Set up on the lua script can be configured to handle the script as follows:

--
--
-- This sctipt process message write to the local disk and statistics the number of messages store in local storage
--
--
local obj = aidp.message.get()
local filename = os.date('%Y-%m-%d-%H') .. '.log'
local files = {};
-- get storage object
local storage = aidp.storage()
for k,v in pairs(obj) do
    if #v == 3 then
        if files[v[2]] == nil then
            files[v[2]] = io.open(v[2]..filename, 'a+');
            files[v[2]]:setvbuf('full')
        end
        files[v[2]]:write(v[3]..'\n')
	-- store number of message  in local storage
        storage:incr(v[2] .. ':message_size', 1)
    end
end
for k,v in pairs(files) do
    v:flush()
    v:close()
end

Modify the configuration file http section scriptName field set http interface call lua script path, lua script processing logic as follows:

-- get a topic number of has process message
local request = aidp.http.request();
local topic_name = request:get_query('t')
local response = aidp.http.response();
local storage = aidp.storage()
response:set_content(topic_name .. ':message_size ' .. storage:get(topic_name .. ':message_size'));

Start aidp service:

cd /usr/local/adinf/aidp/bin
./aidp -c ../conf/system.ini

Message Consumer

The message consumption module is responsible for getting the message from Kafka and invoking the Lua consumption script after getting the message. The Lua script can invoke the aidp.message.get() function to get the message

aidp.message.get

Get kafka message

Params:None

Scope:Only the message consumption script valid

Return:Data structure is follow:

-- {
-- 	{
-- 		id, 
-- 		topicName,
-- 		messageBody
-- 	},
-- 	{
-- 		id, 
-- 		topicName,
-- 		messageBody
-- 	},
-- 	{
-- 		id, 
-- 		topicName,
-- 		messageBody
-- 	}
-- }
--This sctipt process message write to the local disk
--
-- get message
local obj = aidp.message.get()

local filename = os.date('%Y-%m-%d-%H') .. '.log'
local files = {};
local storage = aidp.storage()
for k,v in pairs(obj) do
    if #v == 3 then
        if files[v[2]] == nil then
            files[v[2]] = io.open(v[2]..filename, 'a+');
            files[v[2]]:setvbuf('full')
        end
        files[v[2]]:write(v[3]..'\n')
        storage:incr(v[2] .. ':message_size', 1)
    end
end
for k,v in pairs(files) do
    v:flush()
    v:close()
end

Http Server

Provide HTTP interface Lua custom development, for the Lua side to provide Request, Response object, through the Request to obtain the request data, and through the Lua script. The final response data will be customized through the Response interface, Request and Response object methods only in the Http Server processing script is valid for the Message consumer processing script is invalid

Request

Method Method Type Description
aidp.http.request() Static Construct request object
request:get_uri() Object Method Get request URI
request:get_remote_address() Object Method Get remote address
request:get_post_data() Object Method Get origin post data
request:get_post([key]) Object Method Get POST Form-Data type data
request:get_query(key) Object Method Get GET method data
request:get_header(key) Object Method Get Header val
request:get_location() Object Method Get location
request:get_method() Object Method Get request method, will return enum type METHOD_GET、METHOD_POST、METHOD_OTHER

Response

Method Method Type Description
aidp.http.response() Static Construct response object
response:set_header(key, val) Object Method Set response header, if already exists will cover header
response:add_header(key, val) Object Method Add response header
response:set_content(body) Object Method Set response body
response:append_content(body) Object Method Append response body
response:get_code() Object Method Get response http code
response:set_body_size(size) Object Method Set response body size

Sample

local request = aidp.http.request();
local topic_name = request:get_query('t')
local response = aidp.http.response();
local storage = aidp.storage()
response:set_content(topic_name .. ':message_size ' .. storage:get(topic_name .. ':message_size'));

Simple Storage

Storage provides a simple stand-alone storage similar to Redis's key-val structure, helping to implement some data cache storage logic.

The following Lua Api is common in the Http lua script and the Message Consumer valid.

Method Method Type Description
aidp.storage() Static Construct storage object
storage:set(key, val, [ttl]) Object Method Set the data in the storage, if set ttl is expired after the key will be recovered, ttl units is second
storage:incr(key, step) Object Method Increment val step
storage:decr(key, step) Object Method Decrement val step
storage:get(key) Object Method Get value
storage:exists(key) Object Method Get whether a key exists
storage:del(key) Object Method Delete a key
Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].