
O2-Czech-Republic / proxima-platform

Licence: Apache-2.0 (LICENSE); additional license header templates found: license-header.txt, license-header-spotless.txt
The Proxima platform.

Programming Languages

java
68154 projects - #9 most used programming language

Projects that are alternatives of or similar to proxima-platform

Pulsar Spark
When Apache Pulsar meets Apache Spark
Stars: ✭ 55 (+223.53%)
Mutual labels:  apache-spark, batch-processing
generic-batch-processor
Building a concurrent and distributed system for batch processing which is fault tolerant and can scale up or scale out using Akka.NET (based on the actor model).
Stars: ✭ 18 (+5.88%)
Mutual labels:  batch-processing
fab-oidc
Flask-AppBuilder SecurityManager for OpenIDConnect
Stars: ✭ 28 (+64.71%)
Mutual labels:  analytical-platform
streamsx.kafka
Repository for integration with Apache Kafka
Stars: ✭ 13 (-23.53%)
Mutual labels:  apache-spark
aws-batch-example
Example use of AWS batch
Stars: ✭ 96 (+464.71%)
Mutual labels:  batch-processing
corb2
MarkLogic tool for processing and reporting on content, enhanced from the original CoRB
Stars: ✭ 18 (+5.88%)
Mutual labels:  batch-processing
data-product-analytics
Template to deploy a Data Product for analytics and data science use-cases into a Data Landing Zone of the Data Management & Analytics Scenario (former Enterprise-Scale Analytics). The Data Product template can be used by cross-functional teams to create insights and products for external users.
Stars: ✭ 62 (+264.71%)
Mutual labels:  data-mesh
ElasticBatch
Elasticsearch tool for easily collecting and batch inserting Python data and pandas DataFrames
Stars: ✭ 21 (+23.53%)
Mutual labels:  batch-processing
rack-cargo
🚚 Batch requests for Rack apps (works with Rails, Sinatra, etc)
Stars: ✭ 17 (+0%)
Mutual labels:  batch-processing
learn-by-examples
Real-world Spark pipelines examples
Stars: ✭ 84 (+394.12%)
Mutual labels:  apache-spark
Location-based-Restaurants-Recommendation-System
Big Data Management and Analysis Final Project
Stars: ✭ 44 (+158.82%)
Mutual labels:  apache-spark
data-landing-zone
Template to deploy a single Data Landing Zone of the Data Management & Analytics Scenario (former Enterprise-Scale Analytics). The Data Landing Zone is a logical construct and a unit of scale in the architecture that enables data retention and execution of data workloads for generating insights and value with data.
Stars: ✭ 136 (+700%)
Mutual labels:  data-mesh
spark-twitter-sentiment-analysis
Sentiment Analysis of a Twitter Topic with Spark Structured Streaming
Stars: ✭ 55 (+223.53%)
Mutual labels:  apache-spark
fink-broker
Astronomy Broker based on Apache Spark
Stars: ✭ 18 (+5.88%)
Mutual labels:  apache-spark
daily-home
dailyhome - open home automation platform powered by openfaas targeted easy adaptation
Stars: ✭ 28 (+64.71%)
Mutual labels:  iot-platform
micrOS
micrOS - mini automation OS for DIY projects requires reliable direct communication
Stars: ✭ 55 (+223.53%)
Mutual labels:  iot-platform
mmtf-spark
Methods for the parallel and distributed analysis and mining of the Protein Data Bank using MMTF and Apache Spark.
Stars: ✭ 20 (+17.65%)
Mutual labels:  apache-spark
python-batch-runner
A tiny framework for building batch applications as a collection of tasks in a workflow.
Stars: ✭ 22 (+29.41%)
Mutual labels:  batch-processing
awesome-tools
curated list of awesome tools and libraries for specific domains
Stars: ✭ 31 (+82.35%)
Mutual labels:  apache-spark
treenet
Recursive Neural Networks for PyTorch
Stars: ✭ 29 (+70.59%)
Mutual labels:  batch-processing

Build Status · Sonar · Maven Version

The Proxima platform

The platform is a generic data ingestion, manipulation and retrieval framework. At a high level, it can be described by the following scheme:

high-level scheme

Design document

The high-level design document can be found here.

Scheme definition

First, let's introduce some glossary:

  • entity: a named dictionary consisting of a string key and one or more attributes
  • attribute: an atomic field of an entity, with a string name and a scheme defining its data type
  • attribute family: a logical grouping of attributes of the same entity into a named group
  • storage: a physical store for data

Example scheme definition

The scheme definition uses HOCON. As a short example, we will show the definition of data processing for a hypothetical e-commerce site. The site has some goods and some users, and it generates events which describe how users interact with the goods. We will use protocol buffers for serialization.

First, let's define our data model. We will model a system that processes events coming from some source in a given format and, based on these events, builds a model of user preferences.

entities {
  # user entity, let's make this really simple
  user {
    attributes {

      # some details of user - e.g. name, email, ...
      details { scheme: "proto:cz.o2.proxima.example.Example.UserDetails" }

      # model of preferences based on events
      preferences { scheme: "proto:cz.o2.proxima.example.Example.UserPreferences" }

      # selected events are stored to user's history
      "event.*" { scheme: "proto:cz.o2.proxima.example.Example.BaseEvent" }

    }
  }
  # entity describing a single good we want to sell
  product {
    # note: we have to split to separate attributes each attribute that we want to be able
    # to update *independently*
    attributes {

      # price, with some possible additional information, like VAT and other stuff
      price { scheme: "proto:cz.o2.proxima.example.Example.Price" }

      # some general details of the product
      details { scheme: "proto:cz.o2.proxima.example.Example.ProductDetails" }

      # list of associated categories
      "category.*" { scheme: "proto:cz.o2.proxima.example.Example.ProductCategory" }

    }
  }

  # the events which link users to goods
  event {
    attributes {

      # the event is atomic entity with just a single attribute
      data { scheme: "proto:cz.o2.proxima.example.Example.BaseEvent" }

    }
  }

}

Next, after defining our data model, we need to specify attribute families for our entities. This definition is highly dependent on the access patterns to the data. Mostly, we have to worry about how we are going to read the data. The relevant questions are:

  • are we going to need a random access (get or list request) for data by entity key and attribute name?
  • are we going to be reading the data as continuously updated stream?
  • do we want to be able to read all historical updates, or are we interested only in the last updated value for each attribute?
  • are we going to process the data in batch fashion to build some sort of model?

Let's describe our intentions as follows:

  • we need to be able to batch reprocess all events (maybe limited by some global time window, say two years back), in order to build a model that will be used to update user's preferences with incoming events
  • we need random access to data stored per user and per product
  • we need access to stream of events to be able to do real-time updates to user preferences
  • we want to be able to select some events to be stored in user's history and then list this history by time from newest to oldest

To fulfill these requirements, we have chosen the following storages: Apache Kafka as the commit log for the incoming stream of events, Apache Cassandra for random access to user and product data, and HDFS for batch processing of historical events.

This yields the following setup for attribute families (some details are omitted for simplicity):

 attributeFamilies {

    # we need this to be able to read user attributes 'details' and 'preferences' by user's key
    user-random-access {
      entity: user
      attributes: [ "details", "preferences" ]
      storage: "cassandra://"${cassandra.seed}/${cassandra.user-table}"?primary=user"
      type: primary
      access: random-access
    }

    # store incoming events to user's history
    user-event-history-store {
      entity: event
      attributes: [ "data" ]
      storage: "cassandra://"${cassandra.seed}/${cassandra.user-event-table}/
      # this class defines how we transform incoming event to CQL
      cqlFactory: cz.o2.proxima.example.EventHistoryCqlFactory
      # this is filtering condition, we want to select only some events
      filter: cz.o2.proxima.example.EventHistoryFilter
      type: replica
      access: write-only
    }

    # this family defines read access to the stored event history
    user-event-history-read {
      entity: user
      attributes: [ "event.*" ]
      storage: "cassandra://"${cassandra.seed}/${cassandra.user-event-table}"?primary=user&secondary=stamp&data=event&reversed=true"
      # ignore this for now
      converter: cz.o2.proxima.storage.cassandra.DateToLongConverter
      type: replica
      # we will not explicitly modify this, it will be updated automatically by incoming events
      access: read-only
    }

    # random access to products
    product-random-access {
      entity: product
      attributes: [ "*" ]
      storage: "cassandra://"${cassandra.seed}/${cassandra.product-table}
      type: primary
      access: random-access
    }

    # event stream storage
    event-commit-log {
      entity: event
      attributes: [ "*" ]
      storage: "kafka://"${kafka.brokers}/${kafka.events-topic}
      # this is our commit log
      type: primary
      access: commit-log
    }
    # store events for batch analytics
    event-batch-storage {
      entity: event
      attributes: [ "*" ]
      storage: "hdfs://"${hdfs.authority}/${hdfs.event-path}
      type: replica
      access: batch-updates
    }

  }

  cassandra {
    seed = "cassandra:9042"
    user-table = "user"
    product-table = "product"
    user-event-table = "user_event"
  }

  kafka {
    brokers = "kafka1:9092,kafka2:9092,kafka3:9092"
    events-topic = "events"
  }

  hdfs {
    authority = "hdfs-master"
    event-path = "/events"
  }

With this definition, we have a (somewhat simplified) working description of a Proxima platform scheme for data manipulation, which can be fed into the ingestion/retrieval service and will start working as described above.
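For completeness, a minimal sketch of loading such a configuration programmatically. It assumes the configuration above is available on the classpath (e.g. as application.conf) and uses Repository.of as the entry point; consult the javadocs for the exact factory methods:

  import com.typesafe.config.Config;
  import com.typesafe.config.ConfigFactory;
  import cz.o2.proxima.repository.Repository;

  // load and resolve the HOCON configuration (assumed to be on the classpath)
  Config config = ConfigFactory.load().resolve();
  // create the Repository describing entities, attributes and attribute families
  Repository repo = Repository.of(config);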

Platform's data model

Generally, data is modelled as an unbounded stream of updates to attributes of entities. Each update consists of the following:

  • name of entity
  • name of attribute
  • value of attribute (or flag representing delete)
  • timestamp of the update
  • UUID of the update

Each stream can then be represented as a table (a.k.a. table-stream duality), which is essentially a snapshot of the stream at a certain time (called a batch snapshot in Proxima platform terms).
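For illustration, a minimal sketch of what a single update carries, written as a plain value class (for exposition only; this is not the platform's actual StreamElement API):

  import java.time.Instant;
  import java.util.Optional;
  import java.util.UUID;

  /** Illustrative only: a single update in the stream of attribute updates. */
  class AttributeUpdate {
    final String entity;           // name of entity, e.g. "user"
    final String key;              // key of the entity (entities are keyed by string, see glossary)
    final String attribute;        // name of attribute, e.g. "preferences"
    final Optional<byte[]> value;  // serialized value; empty represents a delete
    final Instant timestamp;       // timestamp of the update
    final UUID uuid;               // UUID of the update

    AttributeUpdate(
        String entity, String key, String attribute,
        Optional<byte[]> value, Instant timestamp, UUID uuid) {
      this.entity = entity;
      this.key = key;
      this.attribute = attribute;
      this.value = value;
      this.timestamp = timestamp;
      this.uuid = uuid;
    }
  }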

Compiling scheme definition to access classes

The platform contains a Maven plugin that compiles the scheme specification into Java access classes; it is configured as follows:

     <plugin>
       <groupId>cz.o2.proxima</groupId>
       <artifactId>compiler-maven-plugin</artifactId>
       <version>0.9.0</version>
       <configuration>
         <outputDir>${project.build.directory}/generated-sources/model</outputDir>
         <javaPackage>cz.o2.proxima.testing.model</javaPackage>
         <className>Model</className>
         <config>${basedir}/src/main/resources/test-readme.conf</config>
       </configuration>
       <executions>
         <execution>
           <phase>generate-sources</phase>
           <goals>
             <goal>compile</goal>
           </goals>
         </execution>
       </executions>
       <dependencies>
         <!--
           Use direct data operator access, see later
         -->
         <dependency>
           <groupId>${project.groupId}</groupId>
           <artifactId>proxima-direct-compiler-plugin</artifactId>
           <version>0.9.0</version>
         </dependency>
         <!--
           The following dependencies define additional
           dependencies for this example
         -->
         <dependency>
           <groupId>${project.groupId}</groupId>
           <artifactId>proxima-core</artifactId>
           <version>${project.version}</version>
           <classifier>tests</classifier>
         </dependency>
         <dependency>
           <groupId>${project.groupId}</groupId>
           <artifactId>proxima-scheme-proto</artifactId>
           <version>0.9.0</version>
         </dependency>
         <dependency>
           <groupId>${project.groupId}</groupId>
           <artifactId>proxima-scheme-proto</artifactId>
           <version>0.9.0</version>
           <classifier>tests</classifier>
         </dependency>
       </dependencies>
     </plugin>

This plugin then generates the class cz.o2.proxima.testing.model.Model into target/generated-sources/model. The class can be instantiated via

  Model model = Model.of(ConfigFactory.defaultApplication());

or (in case of tests, where some validations and initializations are skipped)

 Model model = Model.ofTest(ConfigFactory.defaultApplication());
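A hedged usage sketch putting these together, assuming the example configuration above is available on the classpath as test-readme.conf (the file configured for the Maven plugin):

  import com.typesafe.config.ConfigFactory;
  import cz.o2.proxima.testing.model.Model;

  // load the example configuration by resource name and construct the generated Model
  Model model = Model.of(ConfigFactory.load("test-readme.conf").resolve());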

Platform's DataOperators

The platform offers various modes of access to data. As of version 0.9.0, these types are:

  • direct
  • Apache Beam
  • Apache Flink (experimental)

Direct access to data

This operator is used when accessing data from inside a single JVM (or potentially multiple JVMs, e.g. coordinated via distributed consumption of the commit log). The operator is constructed as follows:

   private DirectDataOperator createDataOperator(Model model) {
     Repository repo = model.getRepo();
     return repo.getOrCreateOperator(DirectDataOperator.class);
   }

Next, we can use the operator to create instances of data accessors, namely:

  • CommitLogReader
  • BatchLogReader
  • RandomAccessReader

For instance, observing the commit log can be done as follows:

   DirectDataOperator operator = model.getRepo().getOrCreateOperator(DirectDataOperator.class);
   CommitLogReader commitLog = operator.getCommitLogReader(
       model.getEvent().getDataDescriptor())
       .orElseThrow(() -> new IllegalArgumentException("Missing commit log for "
           + model.getEvent().getDataDescriptor()));
   commitLog.observe("MyObservationProcess", new LogObserver() {

     @Override
     public boolean onError(Throwable error) {
       throw new RuntimeException(error);
     }

     @Override
     public boolean onNext(StreamElement elem, OnNextContext context) {
       log.info("Consumed element {}", elem);
       // commit processing, so that it is not redelivered
       context.confirm();
       // continue processing
       return true;
     }

   });

Creating a BatchLogReader or a RandomAccessReader is analogous.
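For illustration, here is a hedged sketch of a random-access read of the user's details attribute. The method names (getRandomAccess, getUser().getDetailsDescriptor(), reader.get) follow the pattern above but are assumptions; check the javadocs for the exact signatures:

   // assumption: the operator exposes an optional RandomAccessReader for given attributes
   RandomAccessReader reader = operator
       .getRandomAccess(model.getUser().getDetailsDescriptor())
       .orElseThrow(() -> new IllegalArgumentException(
           "Missing random access for user details"));
   // read the 'details' attribute of a (hypothetical) user key
   reader.get("user-1234", model.getUser().getDetailsDescriptor())
       .ifPresent(kv -> log.info("Got details {}", kv));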

Apache Beam access to data

First, create BeamDataOperator as follows:

  BeamDataOperator operator = model.getRepo().getOrCreateOperator(BeamDataOperator.class);

Next, use this operator to create a PCollection from the Model.

  // some imports omitted; these are shown for clarity
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.transforms.Count;
  import org.apache.beam.sdk.transforms.WithKeys;
  import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
  import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.joda.time.Duration;

  Pipeline pipeline = Pipeline.create();
  PCollection<StreamElement> input = operator.getStream(
      pipeline, Position.OLDEST, false, true,
      model.getEvent().getDataDescriptor());
  PCollection<KV<String, Long>> counted =
      input
          .apply(
              Window.<StreamElement>into(FixedWindows.of(Duration.standardMinutes(1)))
                  .triggering(AfterWatermark.pastEndOfWindow())
                  .discardingFiredPanes())
          .apply(
              WithKeys.of(
                  el ->
                      model
                          .getEvent()
                          .getDataDescriptor()
                          .valueOf(el)
                          .map(BaseEvent::getProductId)
                          .orElse("")))
          .apply(Count.perKey());

  // alternatively, the same counting expressed with the Euphoria DSL extension to Beam
  PCollection<KV<String, Long>> countedEuphoria = CountByKey.of(input)
      .keyBy(el -> model.getEvent()
          .getDataDescriptor()
          .valueOf(el)
          .map(BaseEvent::getProductId)
          .orElse(""))
      .windowBy(FixedWindows.of(Duration.standardMinutes(1)))
      .triggeredBy(AfterWatermark.pastEndOfWindow())
      .accumulationMode(AccumulationMode.DISCARDING_FIRED_PANES)
      .output();
  // do something with the output
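Finally, a short sketch of consuming the counts and executing the pipeline (the logging DoFn is illustrative only; imports of ParDo and DoFn are omitted as above):

  // illustrative only: log each per-product count, then run the pipeline
  counted.apply(ParDo.of(
      new DoFn<KV<String, Long>, Void>() {
        @ProcessElement
        public void process(@Element KV<String, Long> element) {
          log.info("Product {} counted {} times", element.getKey(), element.getValue());
        }
      }));
  pipeline.run().waitUntilFinish();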

Online Java docs

Build notes

CI is run only against changed modules (and their dependents) in pull requests. To completely rebuild the whole project in a PR, push a commit with the commit message 'rebuild'. After the build, you can squash and remove that commit.
