
itkpi / trembita

Licence: MIT License
Model complex data transformation pipelines easily

Project     Current version    Scala version
Trembita    0.8.5-SNAPSHOT     2.11.12, 2.12.8


Description

Project Trembita - Functional Data Pipelining library. Lets you query and transform your data in a pure functional, typesafe & declarative way. Trembita allows you to build complicated transformation pipelines where some steps are executed locally and sequentially, some locally in parallel, and others in other environments (for instance on a Spark cluster, see below).
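To make the idea of mixing execution environments concrete, here is a minimal plain-Scala sketch that runs the same pure step sequentially and then in parallel with standard-library Futures. It does not use trembita's API; `Pipelines`, `transform`, `runSequential` and `runParallel` are hypothetical names chosen for illustration.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Wrapped in an object so the Futures below only run fully initialized code.
object Pipelines {
  // A pure step: safe to run sequentially, in parallel or on another machine.
  def transform(i: Int): Int = (i + 1) * 2

  // Sequential: elements are processed one after another.
  def runSequential(input: Seq[Int]): Seq[Int] = input.map(transform)

  // Parallel: each element gets its own Future; Future.traverse keeps input order.
  def runParallel(input: Seq[Int])(implicit ec: ExecutionContext): Seq[Int] =
    Await.result(Future.traverse(input)(i => Future(transform(i))), 10.seconds)
}

val input = Seq(1, 2, 3, 20, 40, 60)
val sequential = Pipelines.runSequential(input)
val parallel = Pipelines.runParallel(input)(ExecutionContext.global)
```

Because `transform` is pure, both runs produce the same values; that property is what makes it safe to move a step from one environment to another.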

resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
val trembitaV = "0.8.5-SNAPSHOT" // top-level, so the module snippets below can reuse it
libraryDependencies ++= Seq(
  "ua.pp.itkpi" %% "trembita-kernel" % trembitaV,    // kernel
  "ua.pp.itkpi" %% "trembita-cassandra" % trembitaV, // cassandra
  "ua.pp.itkpi" %% "trembita-phantom" % trembitaV,   // phantom
  "ua.pp.itkpi" %% "trembita-slf4j" % trembitaV      // slf4j, for logging
)

Core features

Available Integrations

Processing modules

  • kernel - lazy (parallel) data pipelines, QL for grouping/aggregations and stateful computations using Cats and Shapeless
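The kernel's QL expresses grouping and aggregation declaratively. The computation underneath has a group-then-aggregate shape, sketched here with plain Scala collections rather than trembita's QL syntax; `Sale`, `Summary` and `summarize` are hypothetical.

```scala
// Hypothetical input record.
final case class Sale(region: String, product: String, amount: Double)

// Per-group aggregates: count, total and average.
final case class Summary(region: String, count: Int, total: Double, average: Double)

def summarize(sales: Seq[Sale]): Seq[Summary] =
  sales
    .groupBy(_.region) // group by region
    .map { case (region, group) =>
      val total = group.map(_.amount).sum
      Summary(region, group.size, total, total / group.size)
    }
    .toSeq
    .sortBy(_.region) // groupBy yields an unordered Map, so sort for stable output

val sales = Seq(
  Sale("north", "apples", 10.0),
  Sale("south", "pears", 4.0),
  Sale("north", "pears", 6.0)
)
val report = summarize(sales)
```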

Data sources

  • Any Iterable - just wrap your collection into DataPipeline
  • cassandra connector - fetch rows from your Cassandra database with CassandraSource
  • cassandra phantom - provides Phantom library support
  • akka stream - allows creating a pipeline from an akka stream (e.g. from any data source compatible with akka)
  • spark RDD / DataSet - allows creating a pipeline from an RDD / DataSet (e.g. from any non-streaming data source compatible with Spark)
  • spark DStreams - allows creating a pipeline from discrete streams (e.g. from any streaming data source compatible with Spark)
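Laziness means that wrapping a collection does no work until a result is requested. The sketch below shows that behaviour with a plain Scala `Iterator` and a counter of how often the mapping step runs; it illustrates the concept only and is not `DataPipeline` itself (`evaluated`, `source` and `pipeline` are hypothetical names).

```scala
// Counts how many elements the mapping step has actually processed.
var evaluated = 0

val source: Iterable[Int] = List(1, 2, 3, 4, 5)

// Building the pipeline only describes the work; nothing runs yet.
val pipeline: Iterator[Int] =
  source.iterator
    .map { i => evaluated += 1; i * 10 }
    .filter(_ > 20)

val before = evaluated // no element has been pulled yet

// Forcing the pipeline runs each step once per element.
val result = pipeline.toList
val after = evaluated
```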

Miscellaneous

Spark support

Introducing spark pipelines

You can run some of your transformations on a Spark cluster. To do that, add the following dependencies:

libraryDependencies ++= Seq(
    "ua.pp.itkpi" %% "trembita-spark" % trembitaV,
    "org.apache.spark" %% "spark-core" % "2.4.0" // first spark version with scala 2.12 support
)

Asynchronous computations in spark

With the Spark integration you can even run asynchronous computations on Spark using Futures:

import trembita._
import trembita.spark._
import org.apache.spark._
import cats.effect.IO
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._ // for 5.minutes
import java.util.concurrent.Executors

implicit val sc: SparkContext = ??? // requires implicit SparkContext in scope
implicit val timeout: Timeout = Timeout(5.minutes) // requires implicit timeout for async operations
implicit val ec: ExecutionContext = ???

val cachedThreadPool =
    ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
    
Input
  .sequentialF[SerializableFuture, Seq]
  .create(SerializableFuture.pure(Seq(1, 2, 3, 20, 40, 60)))
  .to[Spark]
  // will be executed on spark
  .map(_ + 1)
  .mapM { i: Int =>
    val n = SerializableFuture.start { i + 1 }(cachedThreadPool)
    val b = SerializableFuture
      .start {
        val x = 1 + 2
        x * 3
      }
      .flatTap(
        xx =>
          SerializableFuture.start {
            println(s"spark debug: $xx") // you won't see this in submit logs
        }
      )

    val result: SerializableFuture[Int] =
      n.bind { nx =>
        b.where(nx > _).fmap(_ + nx)
      }

    result.attempt
  }
  .mapK(serializableFutureToIO)
  .map(_.getOrElse(-100500))
  .mapM { i =>
    IO { scala.util.Random.nextInt(10) + i }
  }
  // will be executed locally in parallel
  .to[Parallel]
  .info(i => s"parallel debug: $i") // you will see it in console
  .map(_ + 1)

Trembita does its best to transform an async lambda into a serializable form. By default, a special macro detects all references to ExecutionContext within the lambda you pass into mapM. All ExecutionContexts must be globally accessible (e.g. declared as a def or val in some object); otherwise your code won't compile, and you get an appropriate error. If everything is ok, the macro creates a helper object with references to all found ExecutionContexts, making them @transient lazy vals (a well-known technique), and rewrites your lambda so that all async transformations reference fields of that object. You can find a full example here.
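The @transient lazy val technique can be shown without Spark: a serializable holder marks its ExecutionContext as transient, so Java serialization skips it, and the value is rebuilt lazily on first access after deserialization. This is a minimal sketch assuming Scala 2.12 or later; `TransientDemo`, `AsyncHelpers` and `roundTrip` are hypothetical and are not the code trembita's macro actually generates.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TransientDemo {
  // Java-serializes a value and reads it back, roughly what happens when a
  // closure is shipped from the Spark driver to an executor.
  def roundTrip[A](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with this code's own class loader (helps in scripts/REPLs).
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[A]
    finally in.close()
  }

  // Returns (was the ExecutionContext rebuilt?, result computed on it).
  def run(): (Boolean, Int) = {
    // Hypothetical holder of a non-serializable ExecutionContext.
    class AsyncHelpers extends Serializable {
      // @transient: skipped by serialization; lazy: re-created on first access.
      @transient lazy val ec: ExecutionContext =
        ExecutionContext.fromExecutor(Executors.newCachedThreadPool(new ThreadFactory {
          def newThread(r: Runnable): Thread = {
            val t = new Thread(r)
            t.setDaemon(true) // daemon threads do not keep the JVM alive
            t
          }
        }))
    }

    val helpers = new AsyncHelpers
    val original = helpers.ec     // initialized before serialization
    val copy = roundTrip(helpers) // succeeds: the transient field is not written
    val rebuilt = copy.ec         // lazily re-created on the "remote" side
    val answer = Await.result(Future(21 * 2)(rebuilt), 5.seconds)
    (rebuilt ne original, answer)
  }
}

val (freshContext, answer) = TransientDemo.run()
```

Without @transient, `writeObject` would fail here, because the thread-pool-backed ExecutionContext is not serializable.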

Happy to say that using cats.effect.IO on spark is also supported =)

FSM on Spark Datasets

You can now define stateful transformations on Spark Datasets using finite state machines. It's implemented using Dataset.mapWithState. Defining an FSM for Spark is as simple as defining an FSM for a regular pipeline, except that state is preserved only per key (due to a mapWithState limitation). To do so, use fsmByKey:

val pipeline: DataPipelineT[F, A, Spark] = ???
pipeline.fsmByKey(getKey = ???)(... /* your FSM definition here */)

Full example can be found here.
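Keeping state per key means a keyed FSM behaves like an independent state machine for each key. The following plain-Scala sketch illustrates those semantics with a fold over keyed events; it is not the fsmByKey signature, and `Event`, `Status`, `step` and `runPerKey` are hypothetical.

```scala
// Hypothetical domain: users emit login/logout events.
sealed trait Event extends Product with Serializable
case object Login extends Event
case object Logout extends Event

sealed trait Status extends Product with Serializable
case object Offline extends Status
case object Online extends Status

// Transition function: (current state, event) => (next state, output).
def step(state: Status, event: Event): (Status, String) = (state, event) match {
  case (Offline, Login) => (Online, "came online")
  case (Online, Logout) => (Offline, "went offline")
  case _                => (state, s"ignored $event while $state")
}

// Runs an independent FSM per key: one key's state never affects another's,
// mirroring the per-key restriction described above.
def runPerKey[K](events: Seq[(K, Event)], initial: Status): Vector[(K, String)] =
  events.foldLeft((Map.empty[K, Status], Vector.empty[(K, String)])) {
    case ((states, outputs), (key, event)) =>
      val (next, out) = step(states.getOrElse(key, initial), event)
      (states.updated(key, next), outputs :+ (key -> out))
  }._2

val log = runPerKey(Seq("alice" -> Login, "bob" -> Logout, "alice" -> Logout), Offline)
```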

Typesafe QL on RDD

See the full example here

Limitations

  • Be careful not to create closures over the SparkContext or SparkSession, because they will fail at runtime
  • Closures over other non-serializable resources will also fail at runtime. This will be addressed later
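Both limitations follow from the same rule: whatever a closure captures is Java-serialized when Spark ships it to executors. The sketch below checks that rule in plain Scala. It uses explicit function classes so the captured state is visible; since Scala 2.12, function literals behave the same way, being serializable only if everything they capture is. `Connection`, `Scale`, `Lookup`, `isSerializable` and `check` are hypothetical.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// True if `value` can be Java-serialized, which Spark requires of shipped closures.
def isSerializable(value: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value)
    true
  } catch {
    case _: NotSerializableException => false
  }

def check(): (Boolean, Boolean) = {
  // Hypothetical resource, like a DB connection or a SparkContext: not Serializable.
  class Connection { def lookup(i: Int): Int = i * 2 }

  // A closure is an object whose fields are the values it captures.
  class Scale(factor: Int) extends (Int => Int) with Serializable {
    def apply(i: Int): Int = i * factor // captures only an Int
  }
  class Lookup(conn: Connection) extends (Int => Int) with Serializable {
    def apply(i: Int): Int = conn.lookup(i) // captures the resource
  }

  (isSerializable(new Scale(3)), isSerializable(new Lookup(new Connection)))
}

val (safeOk, unsafeOk) = check()
```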

Examples

You can find a script to run the example on a Spark cluster within Docker:

# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run.sh

To run the Spark FSM example in Docker, use the following script:

# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run_fsm.sh

To run the Spark QL example in Docker, use the following script:

# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run_ql.sh

Before running the QL example, remove the spire jars from the Spark classpath to avoid dependency conflicts.

Akka streams support

Trembita now supports running a part of your transformations on akka-streams. To use it, add the following dependency:

libraryDependencies += "ua.pp.itkpi" %% "trembita-akka-streams" % trembitaV

You can run existing pipeline through akka stream or create a pipeline from source directly:

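import akka.stream.IOResult
import akka.stream.scaladsl._
import akka.util.ByteString
import cats.effect.IO
import java.nio.file.Paths
import scala.concurrent.Future
import trembita._
import trembita.akka_streams._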

val fileLines =
  Input.fromSourceF[IO, ByteString, Future[IOResult]](IO {
    FileIO
      .fromPath(Paths.get("examples/src/main/resources/words.txt"))
  })

Akka streaming pipelines also support FSM using custom graph state:

val pipeline: DataPipelineT[IO, Int, Akka] = ???
val stateful = pipeline.fsm(/* your FSM definition here */)

You can find full examples here
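A stateful stage such as an FSM threads one state value through the stream, element by element, and emits something derived from it. This is a plain-Scala sketch of that idea using `scanLeft` on an `Iterator`; it does not use Akka Streams or trembita, and `AlarmState`, `Counting`, `Alarm`, `next` and `states` are hypothetical.

```scala
// Hypothetical FSM: raise an alarm after three consecutive readings above a threshold.
sealed trait AlarmState extends Product with Serializable
final case class Counting(consecutiveHigh: Int) extends AlarmState
case object Alarm extends AlarmState

// Transition function: the next state depends only on the current state and input.
def next(state: AlarmState, reading: Int, threshold: Int): AlarmState = state match {
  case Alarm => Alarm // absorbing state: once raised, the alarm stays on
  case Counting(n) if reading > threshold =>
    if (n + 1 >= 3) Alarm else Counting(n + 1)
  case Counting(_) => Counting(0) // a normal reading resets the count
}

// scanLeft threads a single state through the stream lazily and emits the state
// reached after each element; drop(1) skips the initial state.
def states(readings: Iterator[Int], threshold: Int): Iterator[AlarmState] =
  readings.scanLeft(Counting(0): AlarmState)(next(_, _, threshold)).drop(1)

val trace = states(Iterator(5, 12, 15, 3, 11, 13, 14, 2), threshold = 10).toList
```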

Seamless Akka to Spark integration

Add the following dependency if you want to run your pipeline through both akka streams and spark RDD:

libraryDependencies += "ua.pp.itkpi" %% "trembita-seamless-akka-spark" % trembitaV

Its goal is to avoid additional overhead when switching between Akka and Spark. Akka -> Spark is implemented using a custom Sink. Spark -> Akka is implemented using toLocalIterator.
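Spark's `RDD.toLocalIterator` returns data to the driver one partition at a time, so the driver needs only as much memory as the largest partition. The sketch below imitates that pull-based behaviour in plain Scala, with partitions modelled as a `Seq[Seq[Int]]`; `partitions`, `fetchPartition` and `localIterator` are hypothetical stand-ins, not Spark or trembita calls.

```scala
// Partitions as they might live on executors (hypothetical data).
val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6))

var fetched = 0 // how many partitions have been transferred so far

// Stands in for a job that transfers a single partition to the driver.
def fetchPartition(index: Int): Seq[Int] = {
  fetched += 1
  partitions(index)
}

// Pull-based iterator: the next partition is fetched only when the consumer
// (e.g. a downstream stream stage) asks for more elements.
def localIterator: Iterator[Int] =
  partitions.indices.iterator.flatMap(fetchPartition)

val it = localIterator
val firstThree = it.take(3).toList
val fetchedAfterThree = fetched
```

Taking three elements fetches only two of the three partitions; the last one is never transferred.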

Spark streaming support

Trembita now lets you write QL queries and FSMs on top of Spark DStreams.

libraryDependencies += "ua.pp.itkpi" %%  "trembita-spark-streaming" % trembitaV

For examples, see here. Run scripts:

java.util.stream integration

libraryDependencies += "ua.pp.itkpi" %%  "trembita-java-streams" % trembitaV

See sources and tests for examples

Seamless akka infinispan integration

libraryDependencies += "ua.pp.itkpi" %% "trembita-seamless-akka-infinispan" % trembitaV

Allows caching an akka stream. See example.

To be done

  • caching
  • integration with distributed streaming frameworks
  • tensorflow
  • slick

Additional information

My speech about trembita at the Scalaua conference: https://youtu.be/PDBVCVv4mVc

What does trembita mean?


Trembita is an alpine horn made of wood. It is common among the Ukrainian highlanders, the Hutsuls, who used to live in western Ukraine, eastern Poland, Slovakia and northern Romania. In southern Poland it's called trombita, in the north bazuna, and in central Poland ligawka.
