fs2-google-pubsub

Google Cloud Pub/Sub stream-based client built on top of cats-effect, fs2 and http4s.

fs2-google-pubsub provides a mix of APIs, depending on the exact module. Consumers are provided as fs2 streams, while the producers are effect-based, utilising cats-effect.

Module overview

Public modules

  • fs2-google-pubsub-grpc - an implementation that utilises Google's own Java library
  • fs2-google-pubsub-http - an implementation that uses http4s and communicates via the REST API

Internal modules

  • fs2-google-pubsub - shared classes for all implementations

Dependencies

Add one (or more) of the following to your build.sbt (see Releases for the latest version):

libraryDependencies += "com.permutive" %% "fs2-google-pubsub-grpc" % Version

OR

libraryDependencies += "com.permutive" %% "fs2-google-pubsub-http" % Version

The HTTP module also needs an explicit HTTP client implementation. http4s provides several client backends, including blaze, async-http-client, jetty and okhttp.

If async-http-client is desired, add the following to build.sbt:

libraryDependencies += "org.http4s" %% "http4s-async-http-client" % Version

Examples

Consumer (Google)

See PubsubGoogleConsumerConfig for more configuration options.

package com.permutive.pubsub.consumer.google

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.permutive.pubsub.consumer.Model
import com.permutive.pubsub.consumer.decoder.MessageDecoder

object SimpleDriver extends IOApp {
  case class ValueHolder(value: String) extends AnyVal

  implicit val decoder: MessageDecoder[ValueHolder] = (bytes: Array[Byte]) => {
    Right(ValueHolder(new String(bytes)))
  }

  override def run(args: List[String]): IO[ExitCode] = {
    val stream = PubsubGoogleConsumer.subscribe[IO, ValueHolder](
      Model.ProjectId("test-project"),
      Model.Subscription("example-sub"),
      (msg, err, ack, _) => IO(println(s"Msg $msg got error $err")) >> ack,
      config = PubsubGoogleConsumerConfig(
        onFailedTerminate = _ => IO.unit
      )
    )

    stream
      .evalTap(t => t.ack >> IO(println(s"Got: ${t.value}")))
      .compile
      .drain
      .as(ExitCode.Success)
  }
}
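
The decoder above builds the string with `new String(bytes)`, which uses the JVM's default charset. The following is a minimal sketch, assuming the payload is UTF-8 text, of a decoder that decodes strictly and returns a `Left` on malformed input. `Decoder` is a hypothetical stand-in trait defined here for illustration, not the library's `MessageDecoder`, and the sketch uses only the standard library.

```scala
import java.nio.ByteBuffer
import java.nio.charset.{CodingErrorAction, StandardCharsets}
import scala.util.Try

// Hypothetical stand-in for a decoder type class; not the library's MessageDecoder.
trait Decoder[A] {
  def decode(bytes: Array[Byte]): Either[Throwable, A]
}

final case class ValueHolder(value: String)

object Utf8Decoding {
  // Strict UTF-8 decoding: malformed input becomes a Left instead of being
  // silently replaced with U+FFFD, as `new String(bytes, UTF_8)` would do.
  val valueHolderDecoder: Decoder[ValueHolder] = new Decoder[ValueHolder] {
    def decode(bytes: Array[Byte]): Either[Throwable, ValueHolder] =
      Try {
        val charsetDecoder = StandardCharsets.UTF_8
          .newDecoder()
          .onMalformedInput(CodingErrorAction.REPORT)
          .onUnmappableCharacter(CodingErrorAction.REPORT)
        ValueHolder(charsetDecoder.decode(ByteBuffer.wrap(bytes)).toString)
      }.toEither
  }
}
```

The same body could presumably be used inside a `MessageDecoder` instance, since the examples here show that type taking an `Array[Byte]` and returning an `Either`.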

Consumer (HTTP)

See PubsubHttpConsumerConfig for more configuration options.

package com.permutive.pubsub.consumer.http

import cats.effect._
import cats.syntax.all._
import com.permutive.pubsub.consumer.Model
import com.permutive.pubsub.consumer.decoder.MessageDecoder
import org.http4s.client.asynchttpclient.AsyncHttpClient
import fs2.Stream

import scala.util.Try

object Example extends IOApp {
  case class ValueHolder(value: String) extends AnyVal

  implicit val decoder: MessageDecoder[ValueHolder] = (bytes: Array[Byte]) => {
    Try(ValueHolder(new String(bytes))).toEither
  }

  override def run(args: List[String]): IO[ExitCode] = {
    val client = AsyncHttpClient.resource[IO]()

    val mkConsumer = PubsubHttpConsumer.subscribe[IO, ValueHolder](
      Model.ProjectId("test-project"),
      Model.Subscription("example-sub"),
      Some("/path/to/service/account"),
      PubsubHttpConsumerConfig(
        host = "localhost",
        port = 8085,
        isEmulator = true,
      ),
      _,
      (msg, err, ack, _) => IO(println(s"Msg $msg got error $err")) >> ack,
    )

    Stream.resource(client)
      .flatMap(mkConsumer)
      .evalTap(t => t.ack >> IO(println(s"Got: ${t.value}")))
      .compile
      .drain
      .as(ExitCode.Success)
  }
}

Producer (Google)

See PubsubProducerConfig for more configuration options.

package com.permutive.pubsub.producer.google

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.permutive.pubsub.producer.Model
import com.permutive.pubsub.producer.encoder.MessageEncoder

import scala.concurrent.duration._

object PubsubProducerExample extends IOApp {

  case class Value(v: Int) extends AnyVal

  implicit val encoder: MessageEncoder[Value] = new MessageEncoder[Value] {
    override def encode(a: Value): Either[Throwable, Array[Byte]] =
      Right(BigInt(a.v).toByteArray)
  }

  override def run(args: List[String]): IO[ExitCode] = {
    GooglePubsubProducer.of[IO, Value](
      Model.ProjectId("test-project"),
      Model.Topic("values"),
      config = PubsubProducerConfig[IO](
        batchSize = 100,
        delayThreshold = 100.millis,
        onFailedTerminate = e => IO(println(s"Got error $e"))
      )
    ).use { producer =>
      producer.produce(
        Value(10),
      )
    }.map(_ => ExitCode.Success)
  }
}

Producer (HTTP)

See PubsubHttpProducerConfig for more configuration options.

package com.permutive.pubsub.producer.http

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros._
import com.permutive.pubsub.producer.Model
import com.permutive.pubsub.producer.encoder.MessageEncoder
import org.http4s.client.asynchttpclient.AsyncHttpClient

import scala.concurrent.duration._
import scala.util.Try

object ExampleGoogle extends IOApp {

  final implicit val Codec: JsonValueCodec[ExampleObject] =
    JsonCodecMaker.make[ExampleObject](CodecMakerConfig)

  implicit val encoder: MessageEncoder[ExampleObject] = (a: ExampleObject) => {
    Try(writeToArray(a)).toEither
  }

  case class ExampleObject(
    projectId: String,
    url: String,
  )

  override def run(args: List[String]): IO[ExitCode] = {
    val mkProducer = HttpPubsubProducer.resource[IO, ExampleObject](
      projectId = Model.ProjectId("test-project"),
      topic = Model.Topic("example-topic"),
      googleServiceAccountPath = Some("/path/to/service/account"),
      config = PubsubHttpProducerConfig(
        host = "pubsub.googleapis.com",
        port = 443,
        oauthTokenRefreshInterval = 30.minutes,
      ),
      _
    )

    val http = AsyncHttpClient.resource[IO]()
    http.flatMap(mkProducer).use { producer =>
      producer.produce(
        data = ExampleObject("70251cf8-5ffb-4c3f-8f2f-40b9bfe4147c", "example.com")
      )
    }.flatTap(output => IO(println(output))) >> IO.pure(ExitCode.Success)
  }
}

Producer (HTTP) automatic-batching

See PubsubHttpProducerConfig and BatchingHttpProducerConfig for more configuration options.

package com.permutive.pubsub.producer.http

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros._
import com.permutive.pubsub.producer.Model
import com.permutive.pubsub.producer.encoder.MessageEncoder
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import org.http4s.client.asynchttpclient.AsyncHttpClient

import scala.concurrent.duration._
import scala.util.Try

object ExampleBatching extends IOApp {

  private[this] final implicit val unsafeLogger: Logger[IO] = Slf4jLogger.unsafeCreate[IO]

  final implicit val Codec: JsonValueCodec[ExampleObject] =
    JsonCodecMaker.make[ExampleObject](CodecMakerConfig)

  implicit val encoder: MessageEncoder[ExampleObject] = (a: ExampleObject) => {
    Try(writeToArray(a)).toEither
  }

  case class ExampleObject(
    projectId: String,
    url: String,
  )

  override def run(args: List[String]): IO[ExitCode] = {
    val mkProducer = BatchingHttpPubsubProducer.resource[IO, ExampleObject](
      projectId = Model.ProjectId("test-project"),
      topic = Model.Topic("example-topic"),
      googleServiceAccountPath = Some("/path/to/service/account"),
      config = PubsubHttpProducerConfig(
        host = "localhost",
        port = 8085,
        oauthTokenRefreshInterval = 30.minutes,
        isEmulator = true,
      ),

      batchingConfig = BatchingHttpProducerConfig(
        batchSize = 10,
        maxLatency = 100.millis,

        retryTimes = 0,
        retryInitialDelay = 0.millis,
        retryNextDelay = _ + 250.millis,
      ),
      _
    )

    val messageCallback: Either[Throwable, Unit] => IO[Unit] = {
      case Right(_) => Logger[IO].info("Async message was sent successfully!")
      case Left(e) => Logger[IO].warn(e)("Async message was sent unsuccessfully!")
    }

    // Acquire an HTTP client, then build the producer from it
    AsyncHttpClient
      .resource[IO]()
      .flatMap(mkProducer)
      .use { producer =>
        val produceOne = producer.produce(
          data = ExampleObject("1f9774be-9d7c-4dd9-8d97-855b681938a9", "example.com"),
        )

        val produceOneAsync = producer.produceAsync(
          data = ExampleObject("a84a3318-adbd-4eac-af78-eacf33be91ef", "example.com"),
          callback = messageCallback
        )

        for {
          result1 <- produceOne
          result2 <- produceOne
          result3 <- produceOne
          _       <- result1
          _       <- Logger[IO].info("First message was sent!")
          _       <- result2
          _       <- Logger[IO].info("Second message was sent!")
          _       <- result3
          _       <- Logger[IO].info("Third message was sent!")
          _       <- produceOneAsync
          _       <- IO.never
        } yield ()
      }
      .as(ExitCode.Success)
  }
}
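
The batching configuration above takes `retryTimes`, `retryInitialDelay` and `retryNextDelay`. As a rough illustration of how such parameters could combine, the sketch below computes the sequence of waits, assuming the initial delay applies before the first retry and the function derives each later delay from the previous one. That reading, and the helper `retryDelays`, are assumptions made for illustration, not the library's implementation.

```scala
import scala.concurrent.duration._

object RetrySchedule {
  // Hypothetical helper: the delay before each retry, assuming the initial
  // delay is used first and `next` derives every later delay from the previous one.
  def retryDelays(
      times: Int,
      initial: FiniteDuration,
      next: FiniteDuration => FiniteDuration
  ): List[FiniteDuration] =
    Iterator.iterate(initial)(next).take(times).toList
}
```

Under these assumptions, `RetrySchedule.retryDelays(3, 0.millis, _ + 250.millis)` gives waits of 0 ms, 250 ms and 500 ms, while `retryTimes = 0` as in the example gives an empty schedule, meaning no retries.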

HTTP vs Google

Google pros and cons

Pros of using the Google library

  • Underlying library well supported (theoretically)
  • Uses gRPC and HTTP/2 (should be faster)
  • Automatically handles authentication

Cons of using the Google library

  • Uses gRPC (if you use multiple Google libraries with different gRPC versions, something will break)
  • Bloated
  • More dependencies
  • Less functional
  • Doesn't work with the official PubSub emulator (is in feature backlog)
  • Google API can change at any point (shouldn't be exposed to users of fs2-google-pubsub, but slows development/updating)

HTTP pros and cons

Pros of using HTTP variant

  • Fewer dependencies
  • Works with the PubSub emulator
  • Fully functional
  • Stable API
  • Theoretically less memory usage, especially for producer

Cons of using HTTP variant

  • Authentication is handled manually, hence potentially less secure/reliable
  • Uses HTTP/1.1 by default (potentially slower), but can be configured to use HTTP/2 if an HTTP client backend that supports it is chosen

Licence

   Copyright 2018-2019 Permutive, Inc.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.