sadikovi / spark-netflow

License: Apache-2.0
NetFlow data source for Spark SQL and DataFrames

Programming Languages

java
scala

Projects that are alternatives of or similar to spark-netflow

Fastnetmon
FastNetMon - very fast DDoS sensor with sFlow/Netflow/IPFIX/SPAN support
Stars: ✭ 2,860 (+17775%)
Mutual labels:  netflow, cisco
ddal
DDAL(Distributed Data Access Layer) is a simple solution to access database shard.
Stars: ✭ 33 (+106.25%)
Mutual labels:  datasource
opcua-datasource
An OPC UA datasource for reading from OPC UA servers (DA/HDA/AE) into Grafana directly
Stars: ✭ 43 (+168.75%)
Mutual labels:  datasource
flowanalyzer
Manito Networks Flow Analyzer
Stars: ✭ 42 (+162.5%)
Mutual labels:  netflow
ARF-Converter
Bulk ARF file converter
Stars: ✭ 15 (-6.25%)
Mutual labels:  cisco
Firepower O365 Feed Parser
This is a Sample Script that can parse the O365 Web Service API and upload it to Firepower Management Center as Group Objects.
Stars: ✭ 56 (+250%)
Mutual labels:  cisco
cheat-sheets
Cheat sheets to help you in daily hands-on tasks of troubleshooting, configuration, and diagnostics with Fortinet, HP/Aruba, Cisco, Checkpoint and others' gear.
Stars: ✭ 63 (+293.75%)
Mutual labels:  cisco
finesseGadgets
Collection of gadgets for Cisco Finesse
Stars: ✭ 25 (+56.25%)
Mutual labels:  cisco
dne-security-code
No description or website provided.
Stars: ✭ 30 (+87.5%)
Mutual labels:  cisco
getting-started
pyATS Getting Started guide sources, URL: https://developer.cisco.com/docs/pyats-getting-started/
Stars: ✭ 24 (+50%)
Mutual labels:  cisco
chartjs-plugin-datasource-prometheus
Chart.js plugin for Prometheus data loading
Stars: ✭ 77 (+381.25%)
Mutual labels:  datasource
graylog-plugin-netflow
[DEPRECATED] Graylog NetFlow plugin
Stars: ✭ 35 (+118.75%)
Mutual labels:  netflow
topolograph
Topolograph.com is an online project which can visualize OSPF/ISIS topology based on a single OSPF LinkState DataBase scraped from one network device (thanks OSPF =). Then you can not only see (and check) the shortest path from source to destination, but also see the outcome of a link or node failure along the path to the destination. The exist…
Stars: ✭ 84 (+425%)
Mutual labels:  cisco
cdp-spark-datasource
Spark data source for Cognite Data Fusion
Stars: ✭ 18 (+12.5%)
Mutual labels:  datasource
catalyst9k-network-automation
Sample python scripts for automation workflows for feature sets present in Catalyst Switching using openly available YANG data models
Stars: ✭ 40 (+150%)
Mutual labels:  cisco
grafana-pandas-datasource
Grafana Pandas Datasource - using Python for generating timeseries-, table-data and annotations
Stars: ✭ 38 (+137.5%)
Mutual labels:  datasource
PowerBI
This repository hosts well-written DAX formulas and tested Power BI PBIX samples with data sources.
Stars: ✭ 38 (+137.5%)
Mutual labels:  datasource
DirectFire Converter
DirectFire Firewall Converter - Network Security, Next-Generation Firewall Configuration Conversion, Firewall Syntax Translation and Firewall Migration Tool - supports Cisco ASA, Fortinet FortiGate (FortiOS), Juniper SRX (JunOS), SSG / Netscreen (ScreenOS) and WatchGuard (support for further devices in development). Similar to FortiConverter, Sm…
Stars: ✭ 34 (+112.5%)
Mutual labels:  cisco
pyaci
Python Bindings for Cisco ACI REST API
Stars: ✭ 42 (+162.5%)
Mutual labels:  cisco
gen-cisco
🧨 Generates Cisco scripts based on YAML files
Stars: ✭ 29 (+81.25%)
Mutual labels:  cisco

spark-netflow

A library for reading NetFlow files in Spark SQL.


Requirements

Spark version   spark-netflow latest version
1.4.x           1.3.1
1.5.x           1.3.1
1.6.x           1.3.1
2.0.x           2.0.4
2.1.x           2.0.4
3.0.x           2.1.0

Documentation reflects changes in the master branch; for documentation on a specific version, please select the corresponding version tag or branch.

Linking

The spark-netflow library can be added to Spark by using the --packages command-line option. For example, run this to include it when starting the Spark shell:

 $SPARK_HOME/bin/spark-shell --packages com.github.sadikovi:spark-netflow_2.12:2.1.0

See other available versions at http://spark-packages.org/package/sadikovi/spark-netflow.
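
If you manage dependencies with sbt instead, the same coordinate can be declared as a library dependency. A minimal sketch, assuming the artifact is resolvable from Maven Central or the Spark Packages repository:

// build.sbt
libraryDependencies += "com.github.sadikovi" % "spark-netflow_2.12" % "2.1.0"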

Features

  • Column pruning
  • Predicate pushdown to the NetFlow file
  • Auto statistics based on file header information
  • Fields conversion (IP addresses, protocol, etc.)
  • NetFlow version 5 support (list of columns)
  • NetFlow version 7 support (list of columns)
  • Reading files from local file system and HDFS
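
For example, column pruning and predicate pushdown combine so that a query reads only the selected fields and can skip records using the pushed-down filter. A minimal sketch, assuming NetFlow version 5 files at a hypothetical path:

// Column pruning keeps only the listed fields; the filter below is a
// candidate for pushdown into the NetFlow reader.
val flows = spark.read.format("com.github.sadikovi.spark.netflow")
  .option("version", "5")
  .load("file:/path/to/netflow/files") // hypothetical path
  .select("srcip", "dstip", "packets") // column pruning
  .where("packets > 100")              // predicate pushdown candidate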

Options

Currently supported options:

Name                 Example              Description
version              5, 7                 Version to use when parsing NetFlow files. Optional; by default the package resolves the version from the provided files.
buffer               1024, 32Kb, 3Mb      Buffer size for the NetFlow compressed stream (default 1Mb).
stringify            true, false          Enables conversion of certain supported fields (e.g. IP, protocol) into a human-readable format. If performance is essential, consider disabling it (default true).
predicate-pushdown   true, false          Enables predicate pushdown at the NetFlow library level (default true).
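
A sketch combining these options in a single read (option names and defaults are taken from the table above; the path is hypothetical):

val df = spark.read.format("com.github.sadikovi.spark.netflow")
  .option("version", "5")               // skip version inference
  .option("buffer", "32Kb")             // buffer size for the compressed stream
  .option("stringify", "false")         // keep raw numeric fields for performance
  .option("predicate-pushdown", "true") // default value, shown for completeness
  .load("file:/path/to/netflow/files")  // hypothetical path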

Dealing with corrupt files

The package supports the Spark option spark.sql.files.ignoreCorruptFiles. When set to true, corrupt files are either ignored entirely (corrupt header, wrong format) or partially read (corrupt data block in the middle of a file). By default the option is set to false, meaning an exception is raised when such a file is encountered; this behaviour is consistent with Spark's built-in data sources.
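
A minimal sketch of enabling this behaviour for a session; spark.sql.files.ignoreCorruptFiles is a standard Spark SQL configuration key:

// Corrupt NetFlow files are skipped or partially read instead of failing the query.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
val df = spark.read.format("com.github.sadikovi.spark.netflow").load("file:/path/to/files") // hypothetical path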

Other NetFlow formats

If you would like the package to support other NetFlow formats, e.g. NetFlow 9, feel free to open an issue or a pull request.

Example

Scala API

// You can provide only the format; the package will infer the version from
// the provided files, or you can enforce the version with the `version` option.
val df = spark.read.format("com.github.sadikovi.spark.netflow").load("...")

// You can read files from the local file system or HDFS.
val df = spark.read.format("com.github.sadikovi.spark.netflow")
  .option("version", "5")
  .load("file:/...")
  .select("srcip", "dstip", "packets")

// You can also specify buffer size when reading compressed NetFlow files.
val df = spark.read.format("com.github.sadikovi.spark.netflow")
  .option("version", "5")
  .option("buffer", "2Mb")
  .load("hdfs://sandbox:8020/tmp/...")

Alternatively, you can use shortcuts for NetFlow files:

import com.github.sadikovi.spark.netflow._

// This will read version 5 with the default buffer size.
val df = spark.read.netflow5("hdfs:/...")

// This will read version 7 without field conversion.
val df = spark.read.option("stringify", "false").netflow7("file:/...")

Python API

df = spark.read.format("com.github.sadikovi.spark.netflow") \
  .option("version", "5") \
  .load("file:/...") \
  .select("srcip", "srcport")

res = df.where("srcip > 10")

SQL API

CREATE TEMPORARY TABLE ips
USING com.github.sadikovi.spark.netflow
OPTIONS (path "file:/...", version "5");

SELECT srcip, dstip, srcport, dstport FROM ips LIMIT 10;

Building From Source

This library is built with sbt. To build a JAR file, run sbt package from the project root.

Testing

Run sbt test from the project root.

Running benchmark

Run sbt package to package the project, then run spark-submit with the following options:

$ spark-submit --class com.github.sadikovi.spark.benchmark.NetFlowReadBenchmark \
  target/scala-2.12/spark-netflow_2.12-2.1.0.jar \
  --iterations 5 \
  --files 'file:/Users/sadikovi/developer/spark-netflow/temp/ftn/0[1,2,3]/ft*' \
  --version 5

Latest benchmarks:

- Iterations: 5
- Files: file:/tmp/spark-netflow/files/0[1,2,3]/ft*
- Version: 5

Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Mac OS X 10.12.4
Intel(R) Core(TM) i5-4258U CPU @ 2.40GHz
NetFlow full scan:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Scan, stringify = F                            567 /  633          0.0       56726.7       1.0X
Scan, stringify = T                            968 / 1049          0.0       96824.6       0.6X

Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Mac OS X 10.12.4
Intel(R) Core(TM) i5-4258U CPU @ 2.40GHz
NetFlow predicate scan:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Predicate pushdown = F, high                  1148 / 1200          0.0      114845.4       1.0X
Predicate pushdown = T, high                  1208 / 1257          0.0      120818.0       1.0X
Predicate pushdown = F, low                    706 /  732          0.0       70559.3       1.6X
Predicate pushdown = T, low                    226 /  243          0.0       22575.0       5.1X

Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Mac OS X 10.12.4
Intel(R) Core(TM) i5-4258U CPU @ 2.40GHz
NetFlow aggregated report:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Aggregated report                             2171 / 2270          0.0      217089.9       1.0X

Using netflowlib library separately

You can use netflowlib without the spark-netflow package. Here are some basic concepts and examples:

  • com.github.sadikovi.netflowlib.predicate.Columns.*: all available column types in the library; check the com.github.sadikovi.netflowlib.version.* classes to see which columns are already defined for a specific NetFlow format.
  • com.github.sadikovi.netflowlib.predicate.FilterApi: utility class to create predicates for a NetFlow file.
  • com.github.sadikovi.netflowlib.statistics.StatisticsTypes: statistics that you can use to narrow filter boundaries or to allow a filter to be evaluated before scanning the file. For example, the library keeps statistics on time, so a time filter can be resolved upfront.
  • com.github.sadikovi.netflowlib.NetFlowReader: the main entry point for working with a NetFlow file; gives access to the file header and an iterator of rows, and accepts an additional predicate and statistics.
  • com.github.sadikovi.netflowlib.NetFlowHeader: header information can be accessed through this class via NetFlowReader.getHeader(); see the class for more information on the available flags.

Here is the general usage pattern:

import java.io.DataInputStream

import com.github.sadikovi.netflowlib.NetFlowReader
import com.github.sadikovi.netflowlib.version.NetFlowV5

// Create an input stream by opening a NetFlow file, e.g. `fs.open(hadoopFile)`
val stm: DataInputStream = ...
// Prepare the reader based on the input stream and buffer size; you can use
// the overloaded alternative with a default buffer size
val reader = NetFlowReader.prepareReader(stm, 10000)
// Check out the header (optional)
val header = reader.getHeader()
// Actual NetFlow version of the file
val actualVersion = header.getFlowVersion()
// Whether or not the file is compressed
val isCompressed = header.isCompressed()

// This is the list of fields that will be returned by the iterator as values
// in an array (same order)
val fields = Array(
  NetFlowV5.FIELD_UNIX_SECS,
  NetFlowV5.FIELD_SRCADDR,
  NetFlowV5.FIELD_DSTADDR,
  NetFlowV5.FIELD_SRCPORT,
  NetFlowV5.FIELD_DSTPORT
)

// Build a record buffer and an iterator that you can use to get values.
// Note that you can also pass a predicate if you only want
// particular records
val recordBuffer = reader.prepareRecordBuffer(fields)
val iter = recordBuffer.iterator()

while (iter.hasNext) {
  // print every row with values
  println(iter.next)
}

Here is an example of using a predicate to keep only certain records:

import com.github.sadikovi.netflowlib.predicate.FilterApi

val predicate = FilterApi.and(
  FilterApi.eq(NetFlowV5.FIELD_SRCPORT, 123),
  FilterApi.eq(NetFlowV5.FIELD_DSTPORT, 456)
)

...
val recordBuffer = reader.prepareRecordBuffer(fields, predicate)