
cloudflare / Flow Pipeline

A set of tools and examples to run a flow-pipeline (sFlow, NetFlow)

Programming Languages

go
31211 projects - #10 most used programming language

Projects that are alternatives to or similar to Flow Pipeline

Myth
Reliable messages resolve distributed transactions
Stars: ✭ 1,470 (+1609.3%)
Mutual labels:  protobuf, kafka
goflow2
High performance sFlow/IPFIX/NetFlow Collector
Stars: ✭ 125 (+45.35%)
Mutual labels:  netflow, protobuf
Pq
a command-line Protobuf parser with Kafka support and JSON output
Stars: ✭ 120 (+39.53%)
Mutual labels:  protobuf, kafka
Synch
Sync data from other databases to ClickHouse (cluster)
Stars: ✭ 200 (+132.56%)
Mutual labels:  kafka, clickhouse
Plumber
A swiss army knife CLI tool for interacting with Kafka, RabbitMQ and other messaging systems.
Stars: ✭ 514 (+497.67%)
Mutual labels:  protobuf, kafka
Schema Registry
Confluent Schema Registry for Kafka
Stars: ✭ 1,647 (+1815.12%)
Mutual labels:  protobuf, kafka
Istio Micro
istio microservices example code: grpc+protobuf+echo+websocket+mysql+redis+kafka+docker-compose
Stars: ✭ 194 (+125.58%)
Mutual labels:  protobuf, kafka
Storagetapper
StorageTapper is a scalable realtime MySQL change data streaming, logical backup and logical replication service
Stars: ✭ 232 (+169.77%)
Mutual labels:  kafka, clickhouse
Goflow
The high-scalability sFlow/NetFlow/IPFIX collector used internally at Cloudflare.
Stars: ✭ 460 (+434.88%)
Mutual labels:  kafka, netflow
Trubka
A CLI tool for Kafka
Stars: ✭ 296 (+244.19%)
Mutual labels:  protobuf, kafka
Flink Learning
flink learning blog. http://www.54tianzhisheng.cn/ Covers Flink basics, concepts, principles, hands-on practice, performance tuning and source-code analysis, with learning examples for Flink Connector, Metrics, Library, DataStream API, Table API & SQL, plus case studies of large production Flink applications (PV/UV, log storage, real-time deduplication of tens of billions of records, monitoring and alerting). You are welcome to support my column "Big Data Real-Time Computing Engine Flink in Practice and Performance Optimization".
Stars: ✭ 11,378 (+13130.23%)
Mutual labels:  kafka, clickhouse
Vflow
Enterprise Network Flow Collector (IPFIX, sFlow, Netflow) from Verizon Media
Stars: ✭ 776 (+802.33%)
Mutual labels:  kafka, netflow
Netty Learning Example
🥚 Hands-on Netty learning examples: see the big picture from the small details! Follow the tutorial with your heart; I believe you can do it.
Stars: ✭ 2,146 (+2395.35%)
Mutual labels:  protobuf, kafka
Clickhouse sinker
Easily load data from kafka to ClickHouse
Stars: ✭ 256 (+197.67%)
Mutual labels:  kafka, clickhouse
Pmacct
pmacct is a small set of multi-purpose passive network monitoring tools [NetFlow IPFIX sFlow libpcap BGP BMP RPKI IGP Streaming Telemetry].
Stars: ✭ 677 (+687.21%)
Mutual labels:  kafka, netflow
Szt Bigdata
Shenzhen Metro big-data passenger flow analysis system 🚇🚄🌟
Stars: ✭ 826 (+860.47%)
Mutual labels:  kafka, clickhouse
Ue4protobuf
A protobuf source integration for UE4.
Stars: ✭ 80 (-6.98%)
Mutual labels:  protobuf
Karapace
Karapace - Your Kafka essentials in one tool
Stars: ✭ 83 (-3.49%)
Mutual labels:  kafka
Kafka Connect Protobuf Converter
Protobuf converter plugin for Kafka Connect
Stars: ✭ 79 (-8.14%)
Mutual labels:  kafka
Community
A discussion community modeled after Nowcoder that implements basic registration, login, posting, commenting, likes and replies, plus sensitive-word filtering with a prefix trie, long-image and PDF generation with wkhtmltopdf, site UV and DAU statistics, and storage of user avatars and other assets on Qiniu Cloud.
Stars: ✭ 80 (-6.98%)
Mutual labels:  kafka

flow-pipeline

This repository contains a set of tools and examples for GoFlow, a NetFlow/IPFIX/sFlow collector by Cloudflare.

Start a flow pipeline

The compose directory contains a startup file for an example pipeline, including:

  • GoFlow: an sFlow/NetFlow collector
  • A mock collector
  • Kafka/Zookeeper
  • A database (Postgres/Clickhouse)
  • An inserter, which inserts the flows into the database (Postgres only)

It will listen on port 6343/UDP for sFlow and 2055/UDP for NetFlow.

The protobuf definition provided in this repository is a lighter version of the original GoFlow one; only a handful of fields are inserted.
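
For illustration only, a plain Go view of that handful of fields could look like the following. The field names mirror the flows_raw table shown further down; the authoritative definition is the protobuf shipped in this repository (generated with protoc, so the real type also implements proto.Message):

// Hypothetical Go equivalent of the light flow message, for illustration only.
package flowpb

type FlowMessage struct {
    TimeReceived   uint64
    TimeFlowStart  uint64
    SequenceNum    uint32
    SamplingRate   uint64
    SamplerAddress []byte // 4 or 16 bytes, depending on the address family
    SrcAddr        []byte
    DstAddr        []byte
    SrcAS          uint32
    DstAS          uint32
    Etype          uint32 // Ethernet type: 0x800 = IPv4, 0x86dd = IPv6
    Proto          uint32
    SrcPort        uint32
    DstPort        uint32
    Bytes          uint64
    Packets        uint64
}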

A basic pipeline looks like this:




                   +------+         +-----+
     sFlow/NetFlow |goflow+--------->Kafka|
                   +------+         +-----+
                                       |
                                       +--------------+
                      Topic: flows     |              |
                                       |              |
                                 +-----v----+       +-v---------+
                                 | inserter |       |new service|
                                 +----------+       +-----------+
                                      |
                                      |
                                   +--v--+
                                   |  DB |
                                   +-----+

You can add a processor that enriches the data by consuming from Kafka and re-injecting the enriched data into Kafka or directly into the database.

For instance, IP addresses can be mapped to countries, ASN or customer information.

One suggestion is to extend the GoFlow protobuf with new fields.
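
As a rough sketch of such a processor, the loop below consumes from Kafka, annotates each message and re-publishes it. The broker address, the topic names ("flows" in, "flows-enriched" out) and the choice of the segmentio/kafka-go client are assumptions for the example, not what this repository ships:

// Hypothetical enrichment processor sketch.
package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    ctx := context.Background()

    // Consume the raw flows produced by GoFlow (or the mocker).
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"kafka:9092"}, // assumed broker address
        GroupID: "enricher",
        Topic:   "flows",
    })
    defer r.Close()

    // Re-inject the enriched flows into a second, hypothetical topic.
    w := &kafka.Writer{
        Addr:  kafka.TCP("kafka:9092"),
        Topic: "flows-enriched",
    }
    defer w.Close()

    for {
        m, err := r.ReadMessage(ctx)
        if err != nil {
            log.Fatal(err)
        }
        if err := w.WriteMessages(ctx, kafka.Message{Value: enrich(m.Value)}); err != nil {
            log.Fatal(err)
        }
    }
}

// enrich is a placeholder: decode the protobuf, look up country/ASN/customer
// information for the addresses, fill the extra fields, and re-encode.
func enrich(raw []byte) []byte { return raw }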

Run a mock insertion

A mock insertion replaces the GoFlow decoding part. A mocker generates protobuf messages and sends them to Kafka.
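
A minimal sketch of what such a mocker can do is shown below; flowpb stands in for the repository's generated protobuf package, and the random values follow the documentation ranges seen in the sample data later on. Producing the resulting bytes to the flows topic follows the same Kafka writer pattern as the enrichment sketch above.

// Hypothetical mocker sketch: build fake flows and serialize them to protobuf.
package mocker

import (
    "math/rand"
    "time"

    "google.golang.org/protobuf/proto"

    flowpb "example.invalid/flow-pipeline/pb" // hypothetical import path
)

// randomFlow builds one fake flow message and returns its protobuf encoding.
func randomFlow() ([]byte, error) {
    msg := &flowpb.FlowMessage{
        TimeReceived: uint64(time.Now().Unix()),
        SamplingRate: 1000,
        SrcAddr:      randomAddr(),
        DstAddr:      randomAddr(),
        SrcAS:        uint32(65000 + rand.Intn(3)),
        DstAS:        uint32(65000 + rand.Intn(3)),
        Etype:        0x86dd, // IPv6
        Proto:        6,      // TCP
        Bytes:        uint64(rand.Intn(1500) + 64),
        Packets:      uint64(rand.Intn(64) + 1),
    }
    return proto.Marshal(msg)
}

// randomAddr returns a random address inside 2001:db8:0:1::/64
// (a documentation prefix, as in the sample data below).
func randomAddr() []byte {
    addr := []byte{0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 1,
        0, 0, 0, 0, 0, 0, 0, 0}
    addr[15] = byte(rand.Intn(256))
    return addr
}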

Clone the repository, then run the following (for Postgres):

$ cd compose
$ docker-compose -f docker-compose-postgres-mock.yaml up

Wait a minute for all the components to start.

You can connect to the local Grafana at http://localhost:3000 (admin/admin) to look at the flows being collected.

Run a GoFlow insertion

If you want to send sFlow/NetFlow/IPFIX to a GoFlow collector, run the following:

Using Postgres:

$ cd compose
$ docker-compose -f docker-compose-postgres-collect.yml up

Using Clickhouse (see next section):

$ cd compose
$ docker-compose -f docker-compose-clickhouse-collect.yml up

Keep in mind this is a development/prototype setup. Some components will likely not be able to process more than a few thousand rows per second. You will likely have to tweak configuration settings and the number of workers.

In a production setup, GoFlow was able to process more than 100k flows per second and insert them into a Clickhouse database.

About the Clickhouse setup

If you choose to visualize in Grafana, you will need a Clickhouse data source plugin. You can connect to the compose Grafana, which has the plugin installed.

The insertion is handled natively by Clickhouse.

Note: the protobuf messages have to be written to Kafka prefixed with their lengths.
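
Concretely, each Kafka message starts with the serialized size encoded as a protobuf varint, which is the delimiter Clickhouse's length-delimited Protobuf input expects. A minimal sketch of that framing, using the standard protobuf-go runtime (the message type is whatever generated type the producer uses):

// Prefix the serialized flow message with its length as a varint.
package mocker

import (
    "google.golang.org/protobuf/encoding/protowire"
    "google.golang.org/protobuf/proto"
)

func frameForClickhouse(msg proto.Message) ([]byte, error) {
    body, err := proto.Marshal(msg)
    if err != nil {
        return nil, err
    }
    framed := protowire.AppendVarint(nil, uint64(len(body)))
    return append(framed, body...), nil
}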

Clickhouse connects to Kafka periodically and fetches the content. Materialized views store the data persistently and allow aggregating over fields.

To connect to the database, you have to run the following:

$ docker exec -ti compose_db_1 clickhouse-client

Once in the client CLI, a handful of tables are available:

  • flows is directly connected to Kafka; it fetches from the current offset
  • flows_raw contains the materialized view of flows
  • flows_5m contains 5-minute aggregates by ASN

Example commands:

:) DESCRIBE flows_raw

DESCRIBE TABLE flows_raw

┌─name───────────┬─type────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ Date           │ Date            │              │                    │         │                  │                │
│ TimeReceived   │ DateTime        │              │                    │         │                  │                │
│ TimeFlowStart  │ DateTime        │              │                    │         │                  │                │
│ SequenceNum    │ UInt32          │              │                    │         │                  │                │
│ SamplingRate   │ UInt64          │              │                    │         │                  │                │
│ SamplerAddress │ FixedString(16) │              │                    │         │                  │                │
│ SrcAddr        │ FixedString(16) │              │                    │         │                  │                │
│ DstAddr        │ FixedString(16) │              │                    │         │                  │                │
│ SrcAS          │ UInt32          │              │                    │         │                  │                │
│ DstAS          │ UInt32          │              │                    │         │                  │                │
│ EType          │ UInt32          │              │                    │         │                  │                │
│ Proto          │ UInt32          │              │                    │         │                  │                │
│ SrcPort        │ UInt32          │              │                    │         │                  │                │
│ DstPort        │ UInt32          │              │                    │         │                  │                │
│ Bytes          │ UInt64          │              │                    │         │                  │                │
│ Packets        │ UInt64          │              │                    │         │                  │                │
└────────────────┴─────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘

:) SELECT Date,TimeReceived,IPv6NumToString(SrcAddr), IPv6NumToString(DstAddr), Bytes, Packets FROM flows_raw;

SELECT
    Date,
    TimeReceived,
    IPv6NumToString(SrcAddr),
    IPv6NumToString(DstAddr),
    Bytes,
    Packets
FROM flows_raw

┌───────Date─┬────────TimeReceived─┬─IPv6NumToString(SrcAddr)─┬─IPv6NumToString(DstAddr)─┬─Bytes─┬─Packets─┐
│ 2020-03-22 │ 2020-03-22 21:26:38 │ 2001:db8:0:1::80         │ 2001:db8:0:1::20         │   105 │      63 │
│ 2020-03-22 │ 2020-03-22 21:26:38 │ 2001:db8:0:1::c2         │ 2001:db8:0:1::           │   386 │      43 │
│ 2020-03-22 │ 2020-03-22 21:26:38 │ 2001:db8:0:1::6b         │ 2001:db8:0:1::9c         │   697 │      29 │
│ 2020-03-22 │ 2020-03-22 21:26:38 │ 2001:db8:0:1::81         │ 2001:db8:0:1::           │  1371 │      54 │
│ 2020-03-22 │ 2020-03-22 21:26:39 │ 2001:db8:0:1::87         │ 2001:db8:0:1::32         │   123 │      23 │

To look at the aggregates, run the following (optimizing triggers the summing operation). The Nested structure allows keeping sums per sub-structure (in our case, per Ethernet type).

:) OPTIMIZE TABLE flows_5m;

OPTIMIZE TABLE flows_5m

Ok.

:) SELECT * FROM flows_5m WHERE SrcAS = 65001;

SELECT *
FROM flows_5m
WHERE SrcAS = 65001

┌───────Date─┬────────────Timeslot─┬─SrcAS─┬─DstAS─┬─ETypeMap.EType─┬─ETypeMap.Bytes─┬─ETypeMap.Packets─┬─ETypeMap.Count─┬─Bytes─┬─Packets─┬─Count─┐
│ 2020-03-22 │ 2020-03-22 21:25:00 │ 65001 │ 65000 │ [34525]        │ [2930]         │ [152]            │ [4]            │  2930 │     152 │     4 │
│ 2020-03-22 │ 2020-03-22 21:25:00 │ 65001 │ 65001 │ [34525]        │ [1935]         │ [190]            │ [3]            │  1935 │     190 │     3 │
│ 2020-03-22 │ 2020-03-22 21:25:00 │ 65001 │ 65002 │ [34525]        │ [4820]         │ [288]            │ [6]            │  4820 │     288 │     6 │

Regarding the storage of IP addresses: at the moment, the Clickhouse table does not perform any transformation of the addresses before insertion. The bytes are inserted into a FixedString(16) regardless of the address family (IPv4 or IPv6). In the dashboards, the function IPv6NumToString(SrcAddr) is used.

For example, 192.168.1.1 will end up being 101:a8c0::

WITH toFixedString(reinterpretAsString(ipv4), 16) AS ipv4c
SELECT
    '192.168.1.1' AS ip,
    IPv4StringToNum(ip) AS ipv4,
    IPv6NumToString(ipv4c) AS ipv6

┌─ip──────────┬───────ipv4─┬─ipv6───────┐
│ 192.168.1.1 │ 3232235777 │ 101:a8c0:: │
└─────────────┴────────────┴────────────┘

In order to convert it:

WITH IPv6StringToNum(ip) AS ipv6
SELECT
    '101:a8c0::' AS ip,
    reinterpretAsUInt32(ipv6) AS ipv6c,
    IPv4NumToString(ipv6c) AS ipv4

┌─ip─────────┬──────ipv6c─┬─ipv4────────┐
│ 101:a8c0:: │ 3232235777 │ 192.168.1.1 │
└────────────┴────────────┴─────────────┘

This allows, for instance, displaying either the IPv4 or the IPv6 representation in a single query:

SELECT
    if(EType = 0x800, IPv4NumToString(reinterpretAsUInt32(SrcAddr)), IPv6NumToString(SrcAddr)) AS SrcIP

This will be fixed in a future dashboard/DB schema version.

Information and roadmap

This repository is an example and does not offer any warranties. I try to update it whenever I can. Contributions are welcome.

The main purpose is for users to get started quickly and provide a basic system. This should not be used in production.

I received requests to publish the Flink aggregator source code, as you may have seen it used in GoFlow presentations. Unfortunately, we moved entirely to Clickhouse and the old code has not been updated in a while. It may get published at some point, but this is currently low priority.

Issue troubleshooting

The compose files don't pin specific versions of the containers. You will likely need to down in order to clean the setup (volumes, network), pull to resynchronize images like GoFlow, and build to rebuild components like the inserter.

$ docker-compose -f some-yaml-listed-above.yml down
$ docker-compose -f some-yaml-listed-above.yml pull
$ docker-compose -f some-yaml-listed-above.yml build
$ docker-compose -f some-yaml-listed-above.yml up