
yahoo / bandar-log

License: Apache-2.0
Monitoring tool to measure flow throughput of data sources and processing components that are part of Data Ingestion and ETL pipelines.

Programming Languages

scala
shell

Projects that are alternatives of or similar to bandar-log

Bandar Log
Monitoring tool to measure flow throughput of data sources and processing components that are part of Data Ingestion and ETL pipelines.
Stars: ✭ 19 (-5%)
Mutual labels:  big-data, presto, etl, spark-streaming
Presto
The official home of the Presto distributed SQL query engine for big data
Stars: ✭ 12,957 (+64685%)
Mutual labels:  big-data, presto
Hydrograph
A visual ETL development and debugging tool for big data
Stars: ✭ 144 (+620%)
Mutual labels:  big-data, etl
Gimel
Big Data Processing Framework - Unified Data API or SQL on Any Storage
Stars: ✭ 216 (+980%)
Mutual labels:  big-data, spark-streaming
Setl
A simple Spark-powered ETL framework that just works 🍺
Stars: ✭ 79 (+295%)
Mutual labels:  big-data, etl
Maha
A framework for rapid reporting API development; with out of the box support for high cardinality dimension lookups with druid.
Stars: ✭ 101 (+405%)
Mutual labels:  big-data, presto
Presto Go Client
A Presto client for the Go programming language.
Stars: ✭ 183 (+815%)
Mutual labels:  big-data, presto
Smooks
An extensible Java framework for building XML and non-XML streaming applications
Stars: ✭ 293 (+1365%)
Mutual labels:  big-data, etl
Aws Etl Orchestrator
A serverless architecture for orchestrating ETL jobs in arbitrarily-complex workflows using AWS Step Functions and AWS Lambda.
Stars: ✭ 245 (+1125%)
Mutual labels:  big-data, etl
Data Accelerator
Data Accelerator for Apache Spark simplifies onboarding to Streaming of Big Data. It offers a rich, easy to use experience to help with creation, editing and management of Spark jobs on Azure HDInsights or Databricks while enabling the full power of the Spark engine.
Stars: ✭ 247 (+1135%)
Mutual labels:  big-data, spark-streaming
datalake-etl-pipeline
Simplified ETL process in Hadoop using Apache Spark. Has complete ETL pipeline for datalake. SparkSession extensions, DataFrame validation, Column extensions, SQL functions, and DataFrame transformations
Stars: ✭ 39 (+95%)
Mutual labels:  big-data, etl
bigquery-kafka-connect
☁️ nodejs kafka connect connector for Google BigQuery
Stars: ✭ 17 (-15%)
Mutual labels:  big-data, etl
Metorikku
A simplified, lightweight ETL Framework based on Apache Spark
Stars: ✭ 361 (+1705%)
Mutual labels:  big-data, etl
Eel Sdk
Big Data Toolkit for the JVM
Stars: ✭ 140 (+600%)
Mutual labels:  big-data, etl
Sylph
Stream computing platform for bigdata
Stars: ✭ 362 (+1710%)
Mutual labels:  big-data, spark-streaming
Bigdata Playground
A complete example of a big data application using : Kubernetes (kops/aws), Apache Spark SQL/Streaming/MLib, Apache Flink, Scala, Python, Apache Kafka, Apache Hbase, Apache Parquet, Apache Avro, Apache Storm, Twitter Api, MongoDB, NodeJS, Angular, GraphQL
Stars: ✭ 177 (+785%)
Mutual labels:  big-data, spark-streaming
Cube.js
📊 Cube — Open-Source Analytics API for Building Data Apps
Stars: ✭ 11,983 (+59815%)
Mutual labels:  presto, athena
Trino
Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Stars: ✭ 4,581 (+22805%)
Mutual labels:  big-data, presto
Eland
Python Client and Toolkit for DataFrames, Big Data, Machine Learning and ETL in Elasticsearch
Stars: ✭ 235 (+1075%)
Mutual labels:  big-data, etl
architect big data solutions with spark
code, labs and lectures for the course
Stars: ✭ 40 (+100%)
Mutual labels:  etl, spark-streaming

Bandar-Log: Monitoring Tool

Build Status

Intro

Bandar-Log makes it possible to monitor the flow throughput of data sources and processing components that are part of Data Ingestion and ETL pipelines.

A typical ETL pipeline has processing logic between data sources, which adds some delay, i.e. "resistance", that should be measured. For example:

  • how many events a Spark app processes per minute, compared to how many events arrive in Kafka topics
  • how many unprocessed events are currently sitting in Kafka topics
  • how much time has passed since the last aggregation was processed

Collected metrics can be sent to a monitoring service such as Datadog.

A typical ETL pipeline accepts incoming data and comprises a chain of processing components with a certain flow geometry:

  1. There is incoming data arriving at a certain rate; we call this rate IN.

    Examples: Kafka topics that accept incoming messages.

  2. There are ETL components that pull data from one data source and write it to another. This consuming rate is called OUT.

    Examples: Spark Streaming jobs, Hive/Presto aggregations that pull a portion of data from one table and aggregate it into another, replicators that mirror data from one data source to another.

Bandarlog is a standalone service that tracks IN (incoming rate) and OUT (consuming rate) between two or more data sources.

In addition, it measures LAG, defined as LAG = IN - OUT.
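
For example, with purely illustrative numbers for a Kafka-backed pipeline:

IN  = 10 000 messages written to a topic so far
OUT =  9 200 messages committed by the consumer group
LAG = IN - OUT = 800 unconsumed messages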

The particular semantics of the IN, OUT, and LAG metrics depend on the specific data sources and the contracts that Bandarlog expects (see Metric).

Why Bandar-Log?

  1. Easy to use. Create your own Bandar-Log in 10 minutes; just follow the Start Bandar-Log in 3 steps section.
  2. Battle-tested on real-time big data pipelines over a long period of time; Bandar-Log has proven to be a straightforward and stable component.
  3. Support. Bandar-Log is in active use, so we care about its stability and new features.
  4. No need to modify your apps. Bandar-Log is a separate application that monitors metrics from the outside.
  5. Easy to extend with custom data sources.

Getting Started

See How to Start Bandar-Log in 3 steps.

Bandar-Log concepts

  • Data source config
  • Bandarlog
  • Metric
  • Reporter

Data source config

A data source is an abstraction over a persistence component which can provide or store data.

In Bandar-Log, a data source is represented by a configuration object called a data source config.

A data source config specifies driver/connection properties such as host, username, and password. A data source config can be shared between multiple data sources.

Kafka data source

Kafka data source configuration:

kafka-config {                   # kafka configuration id (can be any id)
  brokers = 1.1.1.1:9092         # default list of brokers
}

example-bandarlog {
  connector = "kafka-config"     # reference to the particular kafka config
}

SQL data source

SQL data source configuration:

sql-data-source-config {             # data source configuration id (can be any id)
  host = "example.host.com"          # data source host
  port = "5433"                      # data source port
  dbname = "dbname"                  # data source database name
  username = "username"              # data source username
  password = "password"              # data source password 
  schema = "schema"                  # data source schema  
  use-ssl = true                     # data source SSL mode flag (by default = true)
  max.pool.size = 10                 # connection pool size (by default = 10) 
  connection.timeout.ms = 60000      # connection timeout in ms (by default = 60000)
}

aws-glue-source-config {
  region = "region"                  # aws region
  dbname = "database"                # database name
  access.key = "accesskey"           # access key provided by AWS account
  secret.key = "secretaccesskey"     # secret key provided by AWS account
  fetch.size = 10                    # the maximum number of partitions to return in a single response
  segment.total.number = 10          # the total number of segments - non-overlapping region of a table's partitions. Maximum possible value - 10
  maxwait.timeout.seconds = 60       # maximum wait time until all the parallel requests become completed
}

Supported data source versions:

  • Kafka (version >= 0.10.2)
  • SQL
    • Vertica (compatible with vertica driver 6.0.0)
    • Presto (compatible with presto driver 0.181)
    • AWS Glue Data Catalog (compatible and tested with aws-java-sdk-glue 1.11.388)
      * you can easily add a new data source

Bandarlog

A bandarlog is a unit of data-flow monitoring for one data source or between several data sources.
Each bandarlog, depending on its type, has one or more connectors -- objects that reference a specific data source config. A bandarlog monitors the flow between the linked data sources.

Currently there are two supported bandarlog types:

  • SQL -- measures the performance of specific ETL component(s) which read data from a SQL-compliant data source and write data to SQL-compliant data source(s). To use the Glue connector, you need a metadata table, for example one created by the AWS Glue crawlers. Crawlers connect to a source or target data store, determine the schema, automatically compute statistics and register partitions, and then create a metadata table in the AWS Glue Data Catalog.
  • Kafka -- measures the performance of a specific Kafka consumer and the incoming rate.

SQL connectors are divided into IN and OUT connector types with a one-to-many relation (one IN connector and several OUT connectors):

IN - connector for the input data source; the IN metric (input rate) is fetched from it.
OUT - list of connectors for the output data sources; the OUT metrics (output rate) are fetched from them.

You can use the IN or OUT connector separately, according to your requirements (as in the quick start examples).

SQL connector configuration:

in-connector {                               
  type = "presto"                   # data source type (vertica, presto)
  config-id = "presto-config"       # reference to the data source config id
  tag = "presto-tag-name"           # reporting tag value
}
out-connectors = [{                         
  type = "vertica"                  # data source type (vertica, presto)
  config-id = "vertica-config"      # reference to the data source config id
  tag = "vertica-tag-name"          # reporting tag value
}]

The Bandar-Log app accepts a list of bandarlog units that run in parallel, so a single Bandar-Log instance can run everything you need.

bandarlogs {               # bandarlogs list, every bandarlog should be inside it (can't be renamed)
    <bandarlog-1> {        # bandarlog unit (can be renamed to any name)
        ...
    }
    ...
    
    <bandarlog-n> {
        ...
    }
}

Bandarlogs are isolated, so connection or semantics issues affecting one bandarlog won't affect the others.

Each bandarlog has several mandatory properties:

  1. Enabled - flag to enable/disable the bandarlog unit

enabled = true

  2. Bandarlog type - can be kafka or sql, according to the data source

bandarlog-type = "kafka"

  3. Column type - for sql bandarlogs (see the SQL section)

column-type = "timestamp"

  4. Data source & Connector - see the Data source config section.

  5. Metrics - list of metrics which should be calculated and reported (see Metric)

metrics = ["IN", "OUT", "LAG"]

  6. Reporters - see the Reporter section.

  7. Scheduler - specifies the bandarlog execution schedule

scheduler {
  delay.seconds = 0              # delay in seconds before the bandarlog is first executed
  scheduling.seconds = 60        # time in seconds between bandarlog executions
}

  8. Tables/Topics (according to the bandarlog type) - each metric will be calculated and reported for each table/topic in the list.

SQL tables:

tables = [                                                 # example of tables config when column-type = datetime
  {
    in-table = "in_table_1"                                # table name for the IN metric
    in-columns = ["year=yyyy", "month=MM", "day=dd"]       # <column>=<format> pairs for the IN metric
    out-table = "out_table_1"                              # table name for the OUT metric
    out-columns = ["year=yyyy", "month=MM", "day=dd"]      # <column>=<format> pairs for the OUT metric
  },
  {
    in-table = "in_table_n"
    in-columns = ["date=yyyy-MM-dd HH:mm:ss"]
    out-table = "out_table_n"
    out-columns = ["date=yyyy-MM-dd HH:mm:ss"]
  },
  ...
]
tables = [                                                 # example of tables config when column-type = timestamp
  {
    in-table = "in_table_1"                                # table name for the IN metric
    in-columns = ["in_column_1"]                           # column name for the IN metric
    out-table = "out_table_1"                              # table name for the OUT metric
    out-columns = ["out_column_1"]                         # column name for the OUT metric
  },
  ...
]

Kafka topics:

topics = [                                        
  {
    topic-id = "topic_id"                     # user-friendly topic id, every metric will be tagged with this value
    topic = ["topic_1", "topic_2"]            # kafka topics
    group-id = "group_id"                     # kafka group id
  },
  ...
]

Metric

Bandar-Log measures three fundamental metrics, IN, OUT, and LAG, whose semantics depend heavily on the bandarlog-type (kafka, sql). The bandarlog object contains a metrics section to specify either all of them or just a required subset.

metrics = ["IN", "OUT", "LAG"]

Kafka

Note

Bandar-Log assumes that the Kafka consumer components to be monitored commit their offsets back to Kafka using the Kafka API.

The following metrics are available for a bandarlog of type kafka:

  • IN - reported as *.in_messages, value: incoming messages (long), required params: topic. Number of incoming messages across all topic partitions, calculated as the SUM of leader offsets** for all topic partitions fetched from the Kafka API (getLatestLeaderOffsets), using topic from the topics list.
  • OUT - reported as *.out_messages, value: consumed messages (long), required params: topic, group-id. Number of consumed messages across all topic partitions, calculated as the SUM of consumer offsets** for all topic partitions fetched from the Kafka API (getConsumerOffsets), using topic and group-id from the topics list.
  • LAG - reported as *.lag, value: unconsumed messages (long), required params: topic, group-id. Number of unconsumed messages, calculated as SUM(leader offsets - consumer offsets)** per topic.

* reporting prefix
** in Kafka, an offset is the sequential position of a message within a topic partition
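
Putting the pieces above together, a complete kafka bandarlog unit might look like the following sketch. The names and values (kafka-example-bandarlog, example_topic, topic_1, example_group) are illustrative, and the exact set and placement of properties should be verified against the How to Start Bandar-Log in 3 steps guide:

bandarlogs {
  kafka-example-bandarlog {               # bandarlog unit name (illustrative)
    enabled = true
    bandarlog-type = "kafka"
    metrics = ["IN", "OUT", "LAG"]
    connector = "kafka-config"            # reference to a kafka data source config
    scheduler {
      delay.seconds = 0
      scheduling.seconds = 60
    }
    report {
      prefix = "kafka_metrics"            # metrics reported as kafka_metrics.in_messages, kafka_metrics.out_messages, kafka_metrics.lag
      interval.sec = 180
    }
    reporters = [
      {
        type = "datadog"
        config-id = "datadog-config"
      }
    ]
    topics = [
      {
        topic-id = "example_topic"        # every metric will be tagged with topic:example_topic
        topic = ["topic_1"]
        group-id = "example_group"
      }
    ]
  }
}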

SQL

Note

Bandar-Log assumes:

  1. ETL components use dedicated column(s) to mark and isolate a specific piece of processed data.
  2. The column can be one of two types, according to the data type of the partition column: timestamp or datetime.
  3. The semantics of a timestamp column (here and further called batch_id; the name is configurable) is a Unix timestamp measured in milliseconds (UTC by definition) which identifies the moment when a piece of data was processed.
  4. This column is fetched using the query SELECT MAX(batch_id) FROM :table.
  5. The semantics of datetime is a date/timestamp (e.g. '2013-01-01' or '2015-04-09 14:07'). There can be several columns of type datetime. Along with the column name, the appropriate format must be provided in the config for parsing the partition values into a date represented in milliseconds. The format follows the Date and Time Patterns of Java's SimpleDateFormat.
  6. These columns are fetched using the query SELECT DISTINCT year, month, day FROM :table
  7. In the case of the AWS Glue connector, the dedicated column (for example, batch_id) should be a partition column. Thus, given a metadata table generated by an AWS Glue crawler, the Glue client can extract the maximum value of the partition column without scanning the whole table in AWS Athena.

(for Presto, with the optimize_metadata_queries=true connection setting)

The following metrics are available for a bandarlog of type sql:

  • IN - reported as *.in_timestamp, value: timestamp (long), required params: in-table. Timestamp fetched from the in-connector data source using the <table>:<column> pair from the in-table property.
  • OUT - reported as *.out_timestamp, value: timestamp (long), required params: out-table. Timestamp fetched from the out-connectors data sources using the <table>:<column> pair from the out-table property.
  • LAG - reported as *.lag, value: difference in milliseconds (long), required params: in-table, out-table. Difference between the IN and OUT timestamps (LAG = IN - OUT).
  • REALTIME_LAG - reported as *.realtime_lag, value: difference in milliseconds (long), required params: out-table. Difference between the current UTC timestamp and the OUT timestamp (REALTIME_LAG = System.currentTimeMillis() - OUT).

* reporting prefix
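
As with the kafka type, the pieces above can be combined into a single sql bandarlog unit. The following sketch is illustrative (sql-example-bandarlog and the table/column names are placeholders; verify the exact property set and placement against the quick start guide):

bandarlogs {
  sql-example-bandarlog {                 # bandarlog unit name (illustrative)
    enabled = true
    bandarlog-type = "sql"
    column-type = "timestamp"             # the dedicated column holds a Unix timestamp in milliseconds
    metrics = ["IN", "OUT", "LAG", "REALTIME_LAG"]
    in-connector {
      type = "presto"
      config-id = "presto-config"
      tag = "presto-tag-name"
    }
    out-connectors = [{
      type = "vertica"
      config-id = "vertica-config"
      tag = "vertica-tag-name"
    }]
    scheduler {
      delay.seconds = 0
      scheduling.seconds = 60
    }
    report {
      prefix = "vertica_metrics"
      interval.sec = 180
    }
    reporters = [
      {
        type = "datadog"
        config-id = "datadog-config"
      }
    ]
    tables = [
      {
        in-table = "in_table_1"
        in-columns = ["batch_id"]
        out-table = "out_table_1"
        out-columns = ["batch_id"]
      }
    ]
  }
}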

Reporter

A reporter is the API layer for a specific monitoring service such as Datadog.

Reporters configuration:

report {                                           # each metric will be reported using these properties
  prefix = "vertica_metrics"                       # report prefix used for reported metrics (e.g. vertica_metrics.in_timestamp)
  interval.sec = 180                               # reporter running interval
}    

reporters = [                                      # list of reporters, where each metric should be reported
  {
    type = "datadog"                               # reporter type
    config-id = "datadog-config"                   # reference to reporter config
  }
]

Datadog

Currently, we use the Datadog reporter as the single reporter for bandarlog metrics.
Inside the Datadog reporter configuration you can specify the host, metrics prefix, and running interval.
We also use Datadog tags to keep the metrics data aggregated; see the Reporting Tags section for more details.

configuration:

datadog-config {                      # datadog reporter config id
  host = null                         # use 'null' to use local datadog agent or specify host value  
}

Reporting Tags

Tags are a way of adding dimensions to metrics so that they can be sliced, aggregated, and filtered.

Default Tags

Kafka

  • IN: topic:<topic-id>
  • OUT: topic:<topic-id>, group-id:<group-id>
  • LAG: topic:<topic-id>, group-id:<group-id>

* <topic-id> and <group-id> are placeholders for values from the topics=[...] list inside the kafka bandarlog config

SQL

  • IN: in_table:<in-table>, in_connector:<in-connector>
  • OUT: out_table:<out-table>, out_connector:<out-connector>
  • LAG: in_table:<in-table>, in_connector:<in-connector>, out_table:<out-table>, out_connector:<out-connector>
  • REALTIME_LAG: out_table:<out-table>, out_connector:<out-connector>

* <in-table> and <out-table> are table placeholders from the tables=[...] list inside the sql bandarlog config.
<in-connector> and <out-connector> are placeholders for the tag value inside the in/out connector config.
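
For illustration, using the example report prefix and connector tags shown earlier, the LAG metric for the first table pair would be reported roughly as:

vertica_metrics.lag
  tags: in_table:in_table_1, in_connector:presto-tag-name, out_table:out_table_1, out_connector:vertica-tag-name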

Environment Tag

env:<environment>
The <environment> value is fetched from the APP_ENVIRONMENT environment variable (e.g. APP_ENVIRONMENT=production yields the tag env:production); if the variable is not set, the tag is not reported.

Custom Tags

You can specify your own reporting tags; just add the following config to your bandarlog with the required key-value pairs.

bandarlogs {
  example-bandarlog {
    ...
    tags = [
      {
        key = "<tag_name>"
        value = "<tag_value>"
      }
    ]
    ...
  }
}

Architecture overview


  1. Data source
    Kafka, Vertica, Presto, AWS Glue Data Catalog.

  2. Connector
    API layer over a data source.

  3. Metric Provider
    A metric provider calls the connector API to fetch data from the data source and calculates specific business metrics based on it. Each metric has an appropriate metric provider according to the bandarlog-type.
    For example, the IN metric for bandarlog-type = "kafka" uses KafkaInMessagesProvider, while for bandarlog-type = "sql" it uses SqlTimestampProvider.

  4. Bandarlog manager
    A bandarlog is a monitoring unit for one or several data sources, responsible for managing data providers and reporters.

  5. Metric Reporter
    Reporter API for a specific monitoring service such as Datadog.

  6. Monitor
    A metrics monitor in the monitoring service.

Docker image

Published as bandarlog

Build custom image

Publish the bandarlog image locally:

sbt bandarlog/docker:publishLocal

Push the image to a Docker registry:

docker login -u <docker_user> -p <docker_password> <docker_registry>
docker push <image>

License

Bandar-Log is released under the Apache License, Version 2.0

Credits

Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].